diff --git a/app/app.go b/app/app.go index 141a2531a4..a337bd3055 100644 --- a/app/app.go +++ b/app/app.go @@ -1030,13 +1030,8 @@ func (app *App) HandleClose() error { } } - // Close state store (SeiDB) - critical for cleaning up background goroutines - if app.stateStore != nil { - if err := app.stateStore.Close(); err != nil { - app.Logger().Error("failed to close state store", "error", err) - errs = append(errs, fmt.Errorf("failed to close state store: %w", err)) - } - } + // Note: stateStore (ssStore) is already closed by cms.Close() in BaseApp.Close() + // No need to close it again here. if len(errs) > 0 { return fmt.Errorf("errors during close: %v", errs) diff --git a/sei-db/db_engine/pebbledb/mvcc/db.go b/sei-db/db_engine/pebbledb/mvcc/db.go index a321ea792e..ce119da6ab 100644 --- a/sei-db/db_engine/pebbledb/mvcc/db.go +++ b/sei-db/db_engine/pebbledb/mvcc/db.go @@ -492,6 +492,11 @@ func (db *Database) writeAsyncInBackground() { // it has been updated. This occurs when that module's keys are updated in between pruning runs, the node after is restarted. // This is not a large issue given the next time that module is updated, it will be properly pruned thereafter. func (db *Database) Prune(version int64) (_err error) { + // Defensive check: ensure database is not closed + if db.storage == nil { + return errors.New("pebbledb: database is closed") + } + startTime := time.Now() defer func() { otelMetrics.pruneLatency.Record( diff --git a/sei-db/db_engine/rocksdb/mvcc/db.go b/sei-db/db_engine/rocksdb/mvcc/db.go index 53f1a67a79..c3a65e8aec 100644 --- a/sei-db/db_engine/rocksdb/mvcc/db.go +++ b/sei-db/db_engine/rocksdb/mvcc/db.go @@ -291,6 +291,11 @@ func (db *Database) writeAsyncInBackground() { // lazy prune. Future compactions will honor the increased full_history_ts_low // and trim history when possible. func (db *Database) Prune(version int64) error { + // Defensive check: ensure database is not closed + if db.storage == nil { + return fmt.Errorf("rocksdb: database is closed") + } + tsLow := version + 1 // we increment by 1 to include the provided version var ts [TimestampSize]byte diff --git a/sei-db/state_db/ss/pruning/manager.go b/sei-db/state_db/ss/pruning/manager.go index 6e4e1cb876..b41995527d 100644 --- a/sei-db/state_db/ss/pruning/manager.go +++ b/sei-db/state_db/ss/pruning/manager.go @@ -3,6 +3,7 @@ package pruning import ( "fmt" "math/rand" + "sync" "time" "github.com/sei-protocol/sei-chain/sei-db/common/logger" @@ -14,7 +15,12 @@ type Manager struct { stateStore types.StateStore keepRecent int64 pruneInterval int64 - started bool + + // Lifecycle management + startOnce sync.Once + stopCh chan struct{} + stopOnce sync.Once + wg sync.WaitGroup } // NewPruningManager creates a new pruning manager for state store @@ -30,31 +36,65 @@ func NewPruningManager( stateStore: stateStore, keepRecent: keepRecent, pruneInterval: pruneInterval, + stopCh: make(chan struct{}), } } func (m *Manager) Start() { - if m.keepRecent <= 0 || m.pruneInterval <= 0 || m.started { + if m.keepRecent <= 0 || m.pruneInterval <= 0 { return } - m.started = true - go func() { - for { - pruneStartTime := time.Now() - latestVersion := m.stateStore.GetLatestVersion() - pruneVersion := latestVersion - m.keepRecent - if pruneVersion > 0 { - // prune all versions up to and including the pruneVersion - if err := m.stateStore.Prune(pruneVersion); err != nil { - m.logger.Error("failed to prune versions till", "version", pruneVersion, "err", err) - } + m.startOnce.Do(func() { + m.wg.Add(1) + go m.pruneLoop() + }) +} + +// Stop gracefully stops the pruning goroutine and waits for it to exit. +// Safe to call multiple times (idempotent). +func (m *Manager) Stop() { + m.stopOnce.Do(func() { + close(m.stopCh) + }) + m.wg.Wait() // Safe: WaitGroup.Wait() is idempotent when counter is 0 +} + +func (m *Manager) pruneLoop() { + defer m.wg.Done() + + for { + // Check for stop signal before pruning + select { + case <-m.stopCh: + m.logger.Info("Pruning manager stopped") + return + default: + } + + pruneStartTime := time.Now() + latestVersion := m.stateStore.GetLatestVersion() + pruneVersion := latestVersion - m.keepRecent + if pruneVersion > 0 { + // prune all versions up to and including the pruneVersion + if err := m.stateStore.Prune(pruneVersion); err != nil { + m.logger.Error("failed to prune versions till", "version", pruneVersion, "err", err) + } else { m.logger.Info(fmt.Sprintf("Pruned state store till version %d took %s\n", pruneVersion, time.Since(pruneStartTime))) } + } - // Generate a random percentage (between 0% and 100%) of the fixed interval as a delay - randomPercentage := rand.Float64() // Generate a random float between 0 and 1 - randomDelay := int64(float64(m.pruneInterval) * randomPercentage) - time.Sleep(time.Duration(m.pruneInterval+randomDelay) * time.Second) + // Generate a random percentage (between 0% and 100%) of the fixed interval as a delay + randomPercentage := rand.Float64() + randomDelay := int64(float64(m.pruneInterval) * randomPercentage) + sleepDuration := time.Duration(m.pruneInterval+randomDelay) * time.Second + + // Wait with stop signal check + select { + case <-m.stopCh: + m.logger.Info("Pruning manager stopped") + return + case <-time.After(sleepDuration): + // Continue to next iteration } - }() + } } diff --git a/sei-db/state_db/ss/pruning/manager_test.go b/sei-db/state_db/ss/pruning/manager_test.go new file mode 100644 index 0000000000..49333d81df --- /dev/null +++ b/sei-db/state_db/ss/pruning/manager_test.go @@ -0,0 +1,160 @@ +package pruning + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/sei-protocol/sei-chain/sei-db/common/logger" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/types" + "github.com/stretchr/testify/require" +) + +// mockStateStore is a minimal StateStore implementation for testing +type mockStateStore struct { + latestVersion int64 + pruneCount atomic.Int32 + closeCount atomic.Int32 +} + +func (m *mockStateStore) Get(storeKey string, version int64, key []byte) ([]byte, error) { + return nil, nil +} +func (m *mockStateStore) Has(storeKey string, version int64, key []byte) (bool, error) { + return false, nil +} +func (m *mockStateStore) GetLatestVersion() int64 { + return m.latestVersion +} +func (m *mockStateStore) SetLatestVersion(version int64) error { + m.latestVersion = version + return nil +} +func (m *mockStateStore) GetEarliestVersion() int64 { + return 1 +} +func (m *mockStateStore) SetEarliestVersion(version int64, ignoreVersion bool) error { + return nil +} +func (m *mockStateStore) GetLatestMigratedKey() ([]byte, error) { + return nil, nil +} +func (m *mockStateStore) GetLatestMigratedModule() (string, error) { + return "", nil +} +func (m *mockStateStore) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error { + return nil +} +func (m *mockStateStore) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + return nil, nil +} +func (m *mockStateStore) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + return nil, nil +} +func (m *mockStateStore) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { + return nil +} +func (m *mockStateStore) ApplyChangesetSync(version int64, changesets []*proto.NamedChangeSet) error { + return nil +} +func (m *mockStateStore) Prune(version int64) error { + m.pruneCount.Add(1) + return nil +} +func (m *mockStateStore) Close() error { + m.closeCount.Add(1) + return nil +} +func (m *mockStateStore) RawIterate(storeKey string, fn func([]byte, []byte, int64) bool) (bool, error) { + return false, nil +} +func (m *mockStateStore) Import(version int64, ch <-chan types.SnapshotNode) error { + return nil +} +func (m *mockStateStore) RawImport(ch <-chan types.RawSnapshotNode) error { + return nil +} + +func TestManagerStartStop(t *testing.T) { + store := &mockStateStore{latestVersion: 100} + manager := NewPruningManager(logger.NewNopLogger(), store, 10, 1) + + // Start should launch the goroutine + manager.Start() + + // Give it time to run at least one prune cycle + time.Sleep(100 * time.Millisecond) + + // Stop should gracefully terminate + manager.Stop() + + // Verify prune was called at least once + require.GreaterOrEqual(t, store.pruneCount.Load(), int32(1)) +} + +func TestManagerStopIdempotent(t *testing.T) { + store := &mockStateStore{latestVersion: 100} + manager := NewPruningManager(logger.NewNopLogger(), store, 10, 1) + + manager.Start() + time.Sleep(50 * time.Millisecond) + + // Stop multiple times should not panic + manager.Stop() + manager.Stop() + manager.Stop() +} + +func TestManagerStartIdempotent(t *testing.T) { + store := &mockStateStore{latestVersion: 100} + manager := NewPruningManager(logger.NewNopLogger(), store, 10, 1) + + // Start multiple times should only launch one goroutine + manager.Start() + manager.Start() + manager.Start() + + time.Sleep(50 * time.Millisecond) + manager.Stop() +} + +func TestManagerStopConcurrent(t *testing.T) { + store := &mockStateStore{latestVersion: 100} + manager := NewPruningManager(logger.NewNopLogger(), store, 10, 1) + + manager.Start() + time.Sleep(50 * time.Millisecond) + + // Concurrent Stop calls should not panic + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + manager.Stop() + }() + } + wg.Wait() +} + +func TestManagerDisabledPruning(t *testing.T) { + store := &mockStateStore{latestVersion: 100} + + // keepRecent <= 0 should disable pruning + manager := NewPruningManager(logger.NewNopLogger(), store, 0, 1) + manager.Start() + time.Sleep(50 * time.Millisecond) + manager.Stop() + + require.Equal(t, int32(0), store.pruneCount.Load()) + + // pruneInterval <= 0 should also disable pruning + manager2 := NewPruningManager(logger.NewNopLogger(), store, 10, 0) + manager2.Start() + time.Sleep(50 * time.Millisecond) + manager2.Stop() + + require.Equal(t, int32(0), store.pruneCount.Load()) +} diff --git a/sei-db/state_db/ss/store.go b/sei-db/state_db/ss/store.go index 83d3f7eda1..bf7dd1b5c5 100644 --- a/sei-db/state_db/ss/store.go +++ b/sei-db/state_db/ss/store.go @@ -2,6 +2,7 @@ package ss import ( "fmt" + "sync" "github.com/sei-protocol/sei-chain/sei-db/common/logger" "github.com/sei-protocol/sei-chain/sei-db/common/utils" @@ -31,6 +32,30 @@ func RegisterBackend(backendType BackendType, initializer BackendInitializer) { backends[backendType] = initializer } +// PrunableStateStore wraps a StateStore with pruning lifecycle management. +// When Close() is called, it first stops the pruning goroutine, then closes the underlying store. +type PrunableStateStore struct { + types.StateStore + pruningManager *pruning.Manager + closeOnce sync.Once + closeErr error +} + +// Close stops the pruning goroutine and then closes the underlying state store. +// This ensures no pruning operations are in progress when the store is closed. +// Safe to call multiple times (idempotent). +func (p *PrunableStateStore) Close() error { + p.closeOnce.Do(func() { + // First, stop the pruning goroutine and wait for it to exit + if p.pruningManager != nil { + p.pruningManager.Stop() + } + // Then close the underlying store + p.closeErr = p.StateStore.Close() + }) + return p.closeErr +} + // NewStateStore Create a new state store with the specified backend type func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) { initializer, ok := backends[BackendType(ssConfig.Backend)] @@ -51,10 +76,15 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt return nil, err } - // Start the pruning manager for DB + // Create pruning manager pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds)) pruningManager.Start() - return stateStore, nil + + // Return wrapped store with pruning lifecycle management + return &PrunableStateStore{ + StateStore: stateStore, + pruningManager: pruningManager, + }, nil } // RecoverStateStore will be called during initialization to recover the state from rlog diff --git a/sei-db/state_db/ss/store_test.go b/sei-db/state_db/ss/store_test.go index 9db08d4c41..93194dd7de 100644 --- a/sei-db/state_db/ss/store_test.go +++ b/sei-db/state_db/ss/store_test.go @@ -4,11 +4,15 @@ import ( "fmt" "os" "path/filepath" + "sync" + "sync/atomic" "testing" "github.com/sei-protocol/sei-chain/sei-db/common/logger" "github.com/sei-protocol/sei-chain/sei-db/config" "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/pruning" + "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/types" iavl "github.com/sei-protocol/sei-chain/sei-iavl" "github.com/stretchr/testify/require" ) @@ -57,3 +61,111 @@ func TestNewStateStore(t *testing.T) { } } + +// mockStateStore is a minimal StateStore implementation for testing Close idempotency +type mockStateStore struct { + closeCount atomic.Int32 +} + +func (m *mockStateStore) Get(storeKey string, version int64, key []byte) ([]byte, error) { + return nil, nil +} +func (m *mockStateStore) Has(storeKey string, version int64, key []byte) (bool, error) { + return false, nil +} +func (m *mockStateStore) GetLatestVersion() int64 { + return 0 +} +func (m *mockStateStore) SetLatestVersion(version int64) error { + return nil +} +func (m *mockStateStore) GetEarliestVersion() int64 { + return 0 +} +func (m *mockStateStore) SetEarliestVersion(version int64, ignoreVersion bool) error { + return nil +} +func (m *mockStateStore) GetLatestMigratedKey() ([]byte, error) { + return nil, nil +} +func (m *mockStateStore) GetLatestMigratedModule() (string, error) { + return "", nil +} +func (m *mockStateStore) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error { + return nil +} +func (m *mockStateStore) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + return nil, nil +} +func (m *mockStateStore) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + return nil, nil +} +func (m *mockStateStore) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { + return nil +} +func (m *mockStateStore) ApplyChangesetSync(version int64, changesets []*proto.NamedChangeSet) error { + return nil +} +func (m *mockStateStore) Prune(version int64) error { + return nil +} +func (m *mockStateStore) Close() error { + m.closeCount.Add(1) + return nil +} +func (m *mockStateStore) RawIterate(storeKey string, fn func([]byte, []byte, int64) bool) (bool, error) { + return false, nil +} +func (m *mockStateStore) Import(version int64, ch <-chan types.SnapshotNode) error { + return nil +} +func (m *mockStateStore) RawImport(ch <-chan types.RawSnapshotNode) error { + return nil +} + +func TestPrunableStateStoreCloseIdempotent(t *testing.T) { + mock := &mockStateStore{} + manager := pruning.NewPruningManager(logger.NewNopLogger(), mock, 0, 0) // disabled pruning + + pss := &PrunableStateStore{ + StateStore: mock, + pruningManager: manager, + } + + // Close multiple times should only close underlying store once + err := pss.Close() + require.NoError(t, err) + + err = pss.Close() + require.NoError(t, err) + + err = pss.Close() + require.NoError(t, err) + + // Verify underlying Close was called exactly once + require.Equal(t, int32(1), mock.closeCount.Load()) +} + +func TestPrunableStateStoreCloseConcurrent(t *testing.T) { + mock := &mockStateStore{} + manager := pruning.NewPruningManager(logger.NewNopLogger(), mock, 0, 0) + + pss := &PrunableStateStore{ + StateStore: mock, + pruningManager: manager, + } + + // Concurrent Close calls should not panic and only close once + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = pss.Close() + }() + } + wg.Wait() + + // Verify underlying Close was called exactly once + require.Equal(t, int32(1), mock.closeCount.Load()) +}