Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,13 +1030,8 @@ func (app *App) HandleClose() error {
}
}

// Close state store (SeiDB) - critical for cleaning up background goroutines
if app.stateStore != nil {
if err := app.stateStore.Close(); err != nil {
app.Logger().Error("failed to close state store", "error", err)
errs = append(errs, fmt.Errorf("failed to close state store: %w", err))
}
}
// Note: stateStore (ssStore) is already closed by cms.Close() in BaseApp.Close()
// No need to close it again here.

if len(errs) > 0 {
return fmt.Errorf("errors during close: %v", errs)
Expand Down
5 changes: 5 additions & 0 deletions sei-db/db_engine/pebbledb/mvcc/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ func (db *Database) writeAsyncInBackground() {
// it has been updated. This occurs when that module's keys are updated in between pruning runs and the node is restarted afterwards.
// This is not a large issue given the next time that module is updated, it will be properly pruned thereafter.
func (db *Database) Prune(version int64) (_err error) {
// Defensive check: ensure database is not closed
if db.storage == nil {
return errors.New("pebbledb: database is closed")
}

startTime := time.Now()
defer func() {
otelMetrics.pruneLatency.Record(
Expand Down
5 changes: 5 additions & 0 deletions sei-db/db_engine/rocksdb/mvcc/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ func (db *Database) writeAsyncInBackground() {
// lazy prune. Future compactions will honor the increased full_history_ts_low
// and trim history when possible.
func (db *Database) Prune(version int64) error {
// Defensive check: ensure database is not closed
if db.storage == nil {
return fmt.Errorf("rocksdb: database is closed")
}

tsLow := version + 1 // we increment by 1 to include the provided version

var ts [TimestampSize]byte
Expand Down
76 changes: 58 additions & 18 deletions sei-db/state_db/ss/pruning/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"fmt"
"math/rand"
"sync"
"time"

"github.com/sei-protocol/sei-chain/sei-db/common/logger"
Expand All @@ -14,7 +15,12 @@
stateStore types.StateStore
keepRecent int64
pruneInterval int64
started bool

// Lifecycle management
startOnce sync.Once
stopCh chan struct{}
stopOnce sync.Once
wg sync.WaitGroup
}

// NewPruningManager creates a new pruning manager for state store
Expand All @@ -30,31 +36,65 @@
stateStore: stateStore,
keepRecent: keepRecent,
pruneInterval: pruneInterval,
stopCh: make(chan struct{}),
}
}

func (m *Manager) Start() {
if m.keepRecent <= 0 || m.pruneInterval <= 0 || m.started {
if m.keepRecent <= 0 || m.pruneInterval <= 0 {
return
}
m.started = true
go func() {
for {
pruneStartTime := time.Now()
latestVersion := m.stateStore.GetLatestVersion()
pruneVersion := latestVersion - m.keepRecent
if pruneVersion > 0 {
// prune all versions up to and including the pruneVersion
if err := m.stateStore.Prune(pruneVersion); err != nil {
m.logger.Error("failed to prune versions till", "version", pruneVersion, "err", err)
}
m.startOnce.Do(func() {
m.wg.Add(1)
go m.pruneLoop()

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
})
}

// Stop signals the pruning loop to exit and then blocks until the loop's
// goroutine has returned. The stop channel is closed at most once, so repeated
// or concurrent Stop calls are safe; later calls only wait on the WaitGroup.
// NOTE(review): a Stop that overlaps the very first Start would run wg.Wait
// alongside wg.Add — confirm callers never race the two.
func (m *Manager) Stop() {
	signalStop := func() { close(m.stopCh) }
	m.stopOnce.Do(signalStop)

	// Returns immediately when no pruning goroutine was ever started.
	m.wg.Wait()
}

func (m *Manager) pruneLoop() {
defer m.wg.Done()

for {
// Check for stop signal before pruning
select {
case <-m.stopCh:
m.logger.Info("Pruning manager stopped")
return
default:
}

pruneStartTime := time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
latestVersion := m.stateStore.GetLatestVersion()
pruneVersion := latestVersion - m.keepRecent
if pruneVersion > 0 {
// prune all versions up to and including the pruneVersion
if err := m.stateStore.Prune(pruneVersion); err != nil {
m.logger.Error("failed to prune versions till", "version", pruneVersion, "err", err)
} else {
m.logger.Info(fmt.Sprintf("Pruned state store till version %d took %s\n", pruneVersion, time.Since(pruneStartTime)))
}
}

// Generate a random percentage (between 0% and 100%) of the fixed interval as a delay
randomPercentage := rand.Float64() // Generate a random float between 0 and 1
randomDelay := int64(float64(m.pruneInterval) * randomPercentage)
time.Sleep(time.Duration(m.pruneInterval+randomDelay) * time.Second)
// Generate a random percentage (between 0% and 100%) of the fixed interval as a delay
randomPercentage := rand.Float64()
randomDelay := int64(float64(m.pruneInterval) * randomPercentage)

Check notice

Code scanning / CodeQL

Floating point arithmetic Note

Floating point arithmetic operations are not associative and a possible source of non-determinism
sleepDuration := time.Duration(m.pruneInterval+randomDelay) * time.Second

// Wait with stop signal check
select {
case <-m.stopCh:
m.logger.Info("Pruning manager stopped")
return
case <-time.After(sleepDuration):
// Continue to next iteration
}
}()
}
}
160 changes: 160 additions & 0 deletions sei-db/state_db/ss/pruning/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package pruning

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/sei-protocol/sei-chain/sei-db/common/logger"
"github.com/sei-protocol/sei-chain/sei-db/proto"
"github.com/sei-protocol/sei-chain/sei-db/state_db/ss/types"
"github.com/stretchr/testify/require"
)

// mockStateStore is a stub state store used to exercise the pruning manager.
// Only the latest-version accessors, Prune and Close touch any state; every
// other method ignores its arguments and returns zero values.
type mockStateStore struct {
	latestVersion int64        // value reported by GetLatestVersion
	pruneCount    atomic.Int32 // number of Prune calls observed
	closeCount    atomic.Int32 // number of Close calls observed
}

// --- Methods that read or record state ---

// GetLatestVersion reports the configured latest version.
func (s *mockStateStore) GetLatestVersion() int64 { return s.latestVersion }

// SetLatestVersion overwrites the configured latest version.
func (s *mockStateStore) SetLatestVersion(version int64) error {
	s.latestVersion = version
	return nil
}

// Prune counts the call and always succeeds.
func (s *mockStateStore) Prune(_ int64) error {
	s.pruneCount.Add(1)
	return nil
}

// Close counts the call and always succeeds.
func (s *mockStateStore) Close() error {
	s.closeCount.Add(1)
	return nil
}

// --- Inert stubs: arguments are ignored, results are fixed ---

func (*mockStateStore) Get(_ string, _ int64, _ []byte) ([]byte, error) { return nil, nil }

func (*mockStateStore) Has(_ string, _ int64, _ []byte) (bool, error) { return false, nil }

func (*mockStateStore) GetEarliestVersion() int64 { return 1 }

func (*mockStateStore) SetEarliestVersion(_ int64, _ bool) error { return nil }

func (*mockStateStore) GetLatestMigratedKey() ([]byte, error) { return nil, nil }

func (*mockStateStore) GetLatestMigratedModule() (string, error) { return "", nil }

func (*mockStateStore) WriteBlockRangeHash(_ string, _, _ int64, _ []byte) error { return nil }

func (*mockStateStore) Iterator(_ string, _ int64, _, _ []byte) (types.DBIterator, error) {
	return nil, nil
}

func (*mockStateStore) ReverseIterator(_ string, _ int64, _, _ []byte) (types.DBIterator, error) {
	return nil, nil
}

func (*mockStateStore) ApplyChangesetAsync(_ int64, _ []*proto.NamedChangeSet) error { return nil }

func (*mockStateStore) ApplyChangesetSync(_ int64, _ []*proto.NamedChangeSet) error { return nil }

func (*mockStateStore) RawIterate(_ string, _ func([]byte, []byte, int64) bool) (bool, error) {
	return false, nil
}

func (*mockStateStore) Import(_ int64, _ <-chan types.SnapshotNode) error { return nil }

func (*mockStateStore) RawImport(_ <-chan types.RawSnapshotNode) error { return nil }

// TestManagerStartStop verifies that a started manager performs at least one
// prune and that Stop then shuts the pruning loop down.
func TestManagerStartStop(t *testing.T) {
	store := &mockStateStore{latestVersion: 100}
	manager := NewPruningManager(logger.NewNopLogger(), store, 10, 1)

	// Start should launch the pruning goroutine.
	manager.Start()
	// Stop is idempotent, so registering it here is safe and guarantees the
	// goroutine is reaped even if an assertion below aborts the test.
	t.Cleanup(manager.Stop)

	// Poll for the first prune instead of sleeping a fixed amount: this is
	// both faster on a healthy machine and tolerant of a slow scheduler.
	require.Eventually(t, func() bool {
		return store.pruneCount.Load() >= 1
	}, 2*time.Second, 5*time.Millisecond, "pruning loop never called Prune")

	// Stop should gracefully terminate and wait for the loop to exit.
	manager.Stop()

	// The count can only grow, so it must still show at least one prune.
	require.GreaterOrEqual(t, store.pruneCount.Load(), int32(1))
}

func TestManagerStopIdempotent(t *testing.T) {
store := &mockStateStore{latestVersion: 100}
manager := NewPruningManager(logger.NewNopLogger(), store, 10, 1)

manager.Start()
time.Sleep(50 * time.Millisecond)

// Stop multiple times should not panic
manager.Stop()
manager.Stop()
manager.Stop()
}

// TestManagerStartIdempotent verifies that repeated Start calls launch only a
// single pruning goroutine, observed as exactly one Prune call.
func TestManagerStartIdempotent(t *testing.T) {
	store := &mockStateStore{latestVersion: 100}
	// Use a one-hour prune interval so that a single pruning goroutine cannot
	// prune more than once during the test; any count above one therefore
	// means an extra goroutine was started.
	manager := NewPruningManager(logger.NewNopLogger(), store, 10, 3600)
	// Stop is idempotent; this reaps the goroutine if an assertion aborts.
	t.Cleanup(manager.Stop)

	// Start multiple times should only launch one goroutine.
	manager.Start()
	manager.Start()
	manager.Start()

	// Wait for the first prune rather than assuming it happened.
	require.Eventually(t, func() bool {
		return store.pruneCount.Load() >= 1
	}, 2*time.Second, 5*time.Millisecond, "pruning loop never called Prune")

	// Short grace period so that any wrongly started extra goroutine gets a
	// chance to run its own prune before the stop signal is sent.
	time.Sleep(50 * time.Millisecond)
	manager.Stop()

	require.Equal(t, int32(1), store.pruneCount.Load(),
		"repeated Start calls must not launch additional pruning goroutines")
}

func TestManagerStopConcurrent(t *testing.T) {
store := &mockStateStore{latestVersion: 100}
manager := NewPruningManager(logger.NewNopLogger(), store, 10, 1)

manager.Start()
time.Sleep(50 * time.Millisecond)

// Concurrent Stop calls should not panic
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
manager.Stop()
}()
}
wg.Wait()
}

// TestManagerDisabledPruning checks that a manager configured with a
// non-positive keepRecent or pruneInterval never calls Prune.
func TestManagerDisabledPruning(t *testing.T) {
	store := &mockStateStore{latestVersion: 100}

	// Both configurations share one store, so the counter must stay at zero
	// across the whole sequence.
	disabledConfigs := []struct {
		keepRecent    int64
		pruneInterval int64
	}{
		{keepRecent: 0, pruneInterval: 1},  // keepRecent <= 0 disables pruning
		{keepRecent: 10, pruneInterval: 0}, // pruneInterval <= 0 disables pruning
	}

	for _, cfg := range disabledConfigs {
		mgr := NewPruningManager(logger.NewNopLogger(), store, cfg.keepRecent, cfg.pruneInterval)
		mgr.Start()
		time.Sleep(50 * time.Millisecond)
		mgr.Stop()

		require.Equal(t, int32(0), store.pruneCount.Load())
	}
}
34 changes: 32 additions & 2 deletions sei-db/state_db/ss/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ss

import (
"fmt"
"sync"

"github.com/sei-protocol/sei-chain/sei-db/common/logger"
"github.com/sei-protocol/sei-chain/sei-db/common/utils"
Expand Down Expand Up @@ -31,6 +32,30 @@ func RegisterBackend(backendType BackendType, initializer BackendInitializer) {
backends[backendType] = initializer
}

// PrunableStateStore couples a StateStore with the pruning manager that runs
// against it, so that closing the store also shuts the pruner down first.
type PrunableStateStore struct {
	types.StateStore // underlying store; its methods are promoted, Close is overridden below

	pruningManager *pruning.Manager // background pruner; Close tolerates nil
	closeOnce      sync.Once        // ensures the shutdown sequence runs a single time
	closeErr       error            // result of the underlying Close, returned on every call
}

// Close shuts the wrapper down exactly once: the pruning manager (if any) is
// stopped and waited for, and only then is the underlying store closed, so no
// prune can be in flight while the store closes. Every call, including repeat
// calls, returns the error produced by that single underlying Close.
func (p *PrunableStateStore) Close() error {
	shutdown := func() {
		// Order matters: stop the pruner before the store it operates on.
		if mgr := p.pruningManager; mgr != nil {
			mgr.Stop()
		}
		p.closeErr = p.StateStore.Close()
	}

	p.closeOnce.Do(shutdown)
	return p.closeErr
}

// NewStateStore Create a new state store with the specified backend type
func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) {
initializer, ok := backends[BackendType(ssConfig.Backend)]
Expand All @@ -51,10 +76,15 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt
return nil, err
}

// Start the pruning manager for DB
// Create pruning manager
pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds))
pruningManager.Start()
return stateStore, nil

// Return wrapped store with pruning lifecycle management
return &PrunableStateStore{
StateStore: stateStore,
pruningManager: pruningManager,
}, nil
}

// RecoverStateStore will be called during initialization to recover the state from rlog
Expand Down
Loading
Loading