diff --git a/app/seidb.go b/app/seidb.go
index fcd5c54ba3..289d3cdcdb 100644
--- a/app/seidb.go
+++ b/app/seidb.go
@@ -21,8 +21,8 @@ const (
 	FlagSCSnapshotKeepRecent        = "state-commit.sc-keep-recent"
 	FlagSCSnapshotInterval          = "state-commit.sc-snapshot-interval"
 	FlagSCSnapshotMinTimeInterval   = "state-commit.sc-snapshot-min-time-interval"
-	FlagSCSnapshotWriterLimit       = "state-commit.sc-snapshot-writer-limit"
 	FlagSCSnapshotPrefetchThreshold = "state-commit.sc-snapshot-prefetch-threshold"
+	FlagSCSnapshotWriteRateMBps     = "state-commit.sc-snapshot-write-rate-mbps"
 	FlagSCCacheSize                 = "state-commit.sc-cache-size"

 	FlagSCOnlyAllowExportOnSnapshotVersion = "state-commit.sc-only-allow-export-on-snapshot-version"
@@ -88,8 +88,8 @@ func parseSCConfigs(appOpts servertypes.AppOptions) config.StateCommitConfig {
 	scConfig.SnapshotKeepRecent = cast.ToUint32(appOpts.Get(FlagSCSnapshotKeepRecent))
 	scConfig.SnapshotInterval = cast.ToUint32(appOpts.Get(FlagSCSnapshotInterval))
 	scConfig.SnapshotMinTimeInterval = cast.ToUint32(appOpts.Get(FlagSCSnapshotMinTimeInterval))
-	scConfig.SnapshotWriterLimit = cast.ToInt(appOpts.Get(FlagSCSnapshotWriterLimit))
 	scConfig.SnapshotPrefetchThreshold = cast.ToFloat64(appOpts.Get(FlagSCSnapshotPrefetchThreshold))
+	scConfig.SnapshotWriteRateMBps = cast.ToInt(appOpts.Get(FlagSCSnapshotWriteRateMBps))
 	scConfig.OnlyAllowExportOnSnapshotVersion = cast.ToBool(appOpts.Get(FlagSCOnlyAllowExportOnSnapshotVersion))
 	return scConfig
 }
diff --git a/app/seidb_test.go b/app/seidb_test.go
index 56cb9f97af..0b7db45c0b 100644
--- a/app/seidb_test.go
+++ b/app/seidb_test.go
@@ -26,10 +26,10 @@ func (t TestSeiDBAppOpts) Get(s string) interface{} {
 		return config.DefaultStateCommitConfig().SnapshotKeepRecent
 	case FlagSCSnapshotMinTimeInterval:
 		return config.DefaultStateCommitConfig().SnapshotMinTimeInterval
-	case FlagSCSnapshotWriterLimit:
-		return config.DefaultStateCommitConfig().SnapshotWriterLimit
 	case FlagSCSnapshotPrefetchThreshold:
 		return config.DefaultStateCommitConfig().SnapshotPrefetchThreshold
+	case FlagSCSnapshotWriteRateMBps:
+		return config.DefaultStateCommitConfig().SnapshotWriteRateMBps
 	case FlagSSEnable:
 		return config.DefaultStateStoreConfig().Enable
 	case FlagSSBackend:
diff --git a/sei-db/config/config.go b/sei-db/config/config.go
index 0589497f27..4421c20f40 100644
--- a/sei-db/config/config.go
+++ b/sei-db/config/config.go
@@ -6,6 +6,7 @@ const (
 	DefaultSnapshotMinTimeInterval   = 60 * 60 // 1 hour in seconds
 	DefaultAsyncCommitBuffer         = 100
 	DefaultSnapshotPrefetchThreshold = 0.8 // prefetch if <80% pages in cache
+	DefaultSnapshotWriteRateMBps     = 100 // 100 MB/s to prevent page cache eviction on most validators
 	DefaultSSKeepRecent              = 100000
 	DefaultSSPruneInterval           = 600
 	DefaultSSImportWorkers           = 1
@@ -45,7 +46,9 @@ type StateCommitConfig struct {
 	// This prevents excessive snapshot creation during catch-up. Default to 3600 seconds (1 hour).
 	SnapshotMinTimeInterval uint32 `mapstructure:"snapshot-min-time-interval"`

-	// SnapshotWriterLimit defines the concurrency for taking commit store snapshot
+	// SnapshotWriterLimit defines the concurrency for taking commit store snapshot.
+	// Default to 2 for lower I/O pressure. Higher values speed up snapshot but increase page cache eviction.
+	// With rate limiting enabled, this mainly affects CPU/goroutine overhead rather than I/O.
 	SnapshotWriterLimit int `mapstructure:"snapshot-writer-limit"`

 	// SnapshotPrefetchThreshold defines the page cache residency threshold (0.0-1.0)
@@ -56,6 +59,14 @@ type StateCommitConfig struct {
 	// Setting to 0 disables prefetching. Defaults to 0.8
 	SnapshotPrefetchThreshold float64 `mapstructure:"snapshot-prefetch-threshold"`

+	// SnapshotWriteRateMBps defines the maximum write rate (MB/s) for snapshot creation.
+	// This is a GLOBAL limit shared across all trees and files in a single snapshot operation.
+	// This helps prevent page cache eviction on machines with limited RAM.
+	// Default: 100 MB/s (conservative for most validators including 64GB RAM machines).
+	// Set to a very high value (e.g., 10000) for effectively unlimited.
+	// 0 or unset will use the default (100 MB/s).
+	SnapshotWriteRateMBps int `mapstructure:"snapshot-write-rate-mbps"`
+
 	// CacheSize defines the size of the cache for each memiavl store.
 	// Deprecated: this is removed, we will just rely on mmap page cache
 	CacheSize int `mapstructure:"cache-size"`
@@ -115,6 +126,7 @@ func DefaultStateCommitConfig() StateCommitConfig {
 		SnapshotKeepRecent:        DefaultSnapshotKeepRecent,
 		SnapshotMinTimeInterval:   DefaultSnapshotMinTimeInterval,
 		SnapshotPrefetchThreshold: DefaultSnapshotPrefetchThreshold,
+		SnapshotWriteRateMBps:     DefaultSnapshotWriteRateMBps,
 	}
 }
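For intuition: because this is a hard cap shared by the whole snapshot operation, it also sets a floor on how long a snapshot write can take. A back-of-the-envelope sketch of that trade-off (the 30 GB figure is hypothetical, not a number from this PR):

```go
package main

import "fmt"

func main() {
	// Hypothetical total snapshot size; real sizes depend on chain state.
	const snapshotGB = 30
	const rateMBps = 100 // DefaultSnapshotWriteRateMBps

	// The global limiter caps aggregate throughput, so this is a lower
	// bound on write time no matter how many tree writers run in parallel.
	seconds := float64(snapshotGB*1024) / float64(rateMBps)
	fmt.Printf("%d GB at %d MB/s => at least ~%.0fs (~%.1f min)\n",
		snapshotGB, rateMBps, seconds, seconds/60)
}
```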
diff --git a/sei-db/config/toml.go b/sei-db/config/toml.go
index 4697282ecc..72529ea87d 100644
--- a/sei-db/config/toml.go
+++ b/sei-db/config/toml.go
@@ -36,9 +36,6 @@ sc-snapshot-interval = {{ .StateCommit.SnapshotInterval }}
 # to allow more frequent snapshots during normal operation.
 sc-snapshot-min-time-interval = {{ .StateCommit.SnapshotMinTimeInterval }}

-# SnapshotWriterLimit defines the max concurrency for taking commit store snapshot
-sc-snapshot-writer-limit = {{ .StateCommit.SnapshotWriterLimit }}
-
 # SnapshotPrefetchThreshold defines the page cache residency threshold (0.0-1.0) to trigger snapshot prefetch.
 # Prefetch sequentially reads nodes/leaves files into page cache for faster cold-start replay.
 # Only active trees (evm/bank/acc) are prefetched, skipping sparse kv files to save memory.
@@ -46,6 +43,15 @@ sc-snapshot-writer-limit = {{ .StateCommit.SnapshotWriterLimit }}
 # Setting to 0 disables prefetching. Defaults to 0.8
 sc-snapshot-prefetch-threshold = {{ .StateCommit.SnapshotPrefetchThreshold }}

+# SnapshotWriteRateMBps defines the maximum write rate (MB/s) for snapshot creation.
+# This is a GLOBAL limit shared across all trees and files in a single snapshot operation.
+# This helps prevent page cache eviction on machines with limited RAM, which can cause
+# block execution cache misses and consensus delays.
+# Default: 100 MB/s (conservative for most validators including 64GB RAM machines).
+# Set to a very high value (e.g., 10000) for effectively unlimited.
+# 0 or unset will use the default (100 MB/s).
+sc-snapshot-write-rate-mbps = {{ .StateCommit.SnapshotWriteRateMBps }}
+
 # OnlyAllowExportOnSnapshotVersion defines whether we only allow state sync
 # snapshot creation happens after the memiavl snapshot is created.
 sc-only-allow-export-on-snapshot-version = {{ .StateCommit.OnlyAllowExportOnSnapshotVersion }}
diff --git a/sei-db/sc/memiavl/db.go b/sei-db/sc/memiavl/db.go
index 7c3e551823..1898d41065 100644
--- a/sei-db/sc/memiavl/db.go
+++ b/sei-db/sc/memiavl/db.go
@@ -12,6 +12,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/alitto/pond"
@@ -69,9 +70,9 @@ type DB struct {
 	// timestamp of the last successful snapshot creation
 	// Protected by db.mtx (only accessed in Commit call chain)
 	lastSnapshotTime time.Time
+	// snapshot write rate limit in MB/s, 0 means unlimited
+	snapshotWriteRateMBps int

-	// pruneSnapshotLock guards concurrent prune operations; use TryLock in pruneSnapshots
-	pruneSnapshotLock sync.Mutex
 	// closed guards against double Close(), protected by db.mtx
 	closed bool

@@ -91,6 +92,9 @@ type DB struct {
 	mtx sync.Mutex
 	// worker goroutine IdleTimeout = 5s
 	snapshotWriterPool *pond.WorkerPool
+
+	// pruningInProgress guards concurrent prune operations (CAS-based)
+	pruningInProgress atomic.Bool
 }

 const (
@@ -276,6 +280,7 @@ func OpenDB(logger logger.Logger, targetVersion int64, opts Options) (database *
 		snapshotInterval:        opts.SnapshotInterval,
 		snapshotMinTimeInterval: opts.SnapshotMinTimeInterval,
 		lastSnapshotTime:        lastSnapshotTime,
+		snapshotWriteRateMBps:   opts.SnapshotWriteRateMBps,
 		snapshotWriterPool:      workerPool,
 		opts:                    opts,
 	}
@@ -494,18 +499,19 @@ func (db *DB) checkBackgroundSnapshotRewrite() error {

 // pruneSnapshot prune the old snapshots
 func (db *DB) pruneSnapshots() {
-	if !db.pruneSnapshotLock.TryLock() {
-		db.logger.Info("pruneSnapshots skipped, previous prune still in progress")
+	// CAS: only one prune can run at a time
+	if !db.pruningInProgress.CompareAndSwap(false, true) {
 		return
 	}
-	defer db.pruneSnapshotLock.Unlock()
+	defer db.pruningInProgress.Store(false)

-	db.logger.Info("pruneSnapshots started")
 	startTime := time.Now()
 	defer func() {
 		db.logger.Info("pruneSnapshots completed", "duration_sec", fmt.Sprintf("%.2fs", time.Since(startTime).Seconds()))
 	}()

+	db.logger.Info("pruneSnapshots started")
+
 	currentVersion, err := currentVersion(db.dir)
 	if err != nil {
 		db.logger.Error("failed to read current snapshot version", "err", err)
@@ -543,6 +549,9 @@ func (db *DB) pruneSnapshots() {
 		db.logger.Error("failed to find first snapshot", "err", err)
 	}

+	if db.streamHandler == nil {
+		return
+	}
 	if err := db.streamHandler.TruncateBefore(utils.VersionToIndex(earliestVersion+1, db.initialVersion.Load())); err != nil {
 		db.logger.Error("failed to truncate rlog", "err", err, "version", earliestVersion+1)
 	}
@@ -605,11 +614,12 @@ func (db *DB) copy() *DB {
 	mtree := db.MultiTree.Copy()

 	return &DB{
-		MultiTree:          *mtree,
-		logger:             db.logger,
-		dir:                db.dir,
-		snapshotWriterPool: db.snapshotWriterPool,
-		opts:               db.opts,
+		MultiTree:             *mtree,
+		logger:                db.logger,
+		dir:                   db.dir,
+		snapshotWriteRateMBps: db.snapshotWriteRateMBps,
+		snapshotWriterPool:    db.snapshotWriterPool,
+		opts:                  db.opts,
 	}
 }
@@ -644,7 +654,7 @@ func (db *DB) RewriteSnapshot(ctx context.Context) error {
 	path := filepath.Clean(filepath.Join(db.dir, tmpDir))

 	writeStart := time.Now()
-	err := db.MultiTree.WriteSnapshot(ctx, path, db.snapshotWriterPool)
+	err := db.MultiTree.WriteSnapshotWithRateLimit(ctx, path, db.snapshotWriterPool, db.snapshotWriteRateMBps)
 	writeElapsed := time.Since(writeStart).Seconds()

 	if err != nil {
@@ -817,6 +827,20 @@ func (db *DB) rewriteSnapshotBackground() error {
 			ch <- snapshotResult{err: err}
 			return
 		}
+
+		// Switch mmap hints from SEQUENTIAL to RANDOM for tree operations.
+		// NewMmap() applies MADV_SEQUENTIAL by default for cold-start replay performance,
+		// but after loading we need MADV_RANDOM for random tree access patterns.
+		// Without this, the kernel aggressively discards accessed pages and does wrong-direction
+		// readahead, which is catastrophic on high-latency storage (e.g. NAS).
+		// This matches the behavior in OpenDB() which also calls PrepareForRandomRead().
+		for _, tree := range mtree.trees {
+			if tree.snapshot != nil {
+				tree.snapshot.nodesMap.PrepareForRandomRead()
+				tree.snapshot.leavesMap.PrepareForRandomRead()
+			}
+		}
+
 		cloned.logger.Info("loaded multitree after snapshot", "elapsed", time.Since(loadStart).Seconds())

 		// do a best effort catch-up, will do another final catch-up in main thread.
@@ -849,8 +873,9 @@ func (db *DB) Close() error {
 	}
 	db.closed = true
 	// Wait for any ongoing prune to finish, then block new prunes
-	db.pruneSnapshotLock.Lock()
-	defer db.pruneSnapshotLock.Unlock()
+	for !db.pruningInProgress.CompareAndSwap(false, true) {
+		time.Sleep(time.Millisecond)
+	}
 	errs := []error{}
 	// Close stream handler
 	db.logger.Info("Closing stream handler...")
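The locking change swaps sync.Mutex.TryLock for an atomic.Bool CompareAndSwap but keeps the same skip-if-busy semantics for prunes, while Close() now spins until it wins the flag and never releases it, permanently blocking new prunes. A standalone sketch of the pattern (toy code, not the PR's types):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var pruning atomic.Bool // mirrors DB.pruningInProgress

// prune admits exactly one runner; concurrent callers return immediately
// instead of queueing, matching the old TryLock behavior.
func prune(id int) {
	if !pruning.CompareAndSwap(false, true) {
		fmt.Printf("prune %d skipped, already running\n", id)
		return
	}
	defer pruning.Store(false)
	time.Sleep(10 * time.Millisecond) // simulate work
	fmt.Printf("prune %d completed\n", id)
}

// shutdown waits for any in-flight prune, then holds the flag forever so
// no new prune can start, like the spin loop in DB.Close().
func shutdown() {
	for !pruning.CompareAndSwap(false, true) {
		time.Sleep(time.Millisecond)
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) { defer wg.Done(); prune(i) }(i)
	}
	wg.Wait()
	shutdown()
	fmt.Println("closed; prunes permanently blocked")
}
```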
diff --git a/sei-db/sc/memiavl/db_rewrite_test.go b/sei-db/sc/memiavl/db_rewrite_test.go
index 5609be93ba..d6f9e931e6 100644
--- a/sei-db/sc/memiavl/db_rewrite_test.go
+++ b/sei-db/sc/memiavl/db_rewrite_test.go
@@ -6,6 +6,7 @@ import (
 	"path/filepath"
 	"testing"

+	"github.com/cosmos/iavl"
 	"github.com/sei-protocol/sei-db/common/logger"
 	"github.com/sei-protocol/sei-db/proto"
 	"github.com/stretchr/testify/require"
@@ -87,3 +88,153 @@ func TestLoadMultiTreeWithPrefetchDisabled(t *testing.T) {
 	tree := db2.TreeByName("test")
 	require.NotNil(t, tree)
 }
+
+// TestBackgroundSnapshotSwitchMadvise verifies that after a background snapshot
+// load and switch via ReplaceWith(), the snapshot mmap files use MADV_RANDOM
+// (suitable for random tree access) instead of the default MADV_SEQUENTIAL.
+//
+// Background: PR #2497 changed NewMmap() to apply MADV_SEQUENTIAL by default
+// for cold-start replay performance. OpenDB() correctly switches to MADV_RANDOM
+// after loading, but the background snapshot switch path was missing this call.
+// MADV_SEQUENTIAL causes the kernel to aggressively discard accessed pages,
+// which is catastrophic on high-latency storage (e.g. NAS) where each page
+// fault requires a network round-trip.
+//
+// This test exercises the background snapshot switch path (LoadMultiTree with
+// prefetch disabled -> ReplaceWith) and verifies data remains accessible after
+// the switch, confirming the safety net in ReplaceWith() works correctly.
+func TestBackgroundSnapshotSwitchMadvise(t *testing.T) {
+	dir := t.TempDir()
+
+	db, err := OpenDB(logger.NewNopLogger(), 0, Options{
+		Dir:             dir,
+		CreateIfMissing: true,
+		InitialStores:   []string{"test"},
+	})
+	require.NoError(t, err)
+	defer func() { require.NoError(t, db.Close()) }()
+
+	// Populate the tree with data
+	for i, changes := range ChangeSets {
+		cs := []*proto.NamedChangeSet{
+			{Name: "test", Changeset: changes},
+		}
+		require.NoError(t, db.ApplyChangeSets(cs))
+		v, err := db.Commit()
+		require.NoError(t, err)
+		require.Equal(t, int64(i+1), v)
+	}
+
+	// Verify initial data
+	tree := db.TreeByName("test")
+	require.NotNil(t, tree)
+	val := tree.Get([]byte("hello1"))
+	require.Equal(t, []byte("world1"), val)
+
+	// Create a snapshot (simulating background snapshot write)
+	require.NoError(t, db.RewriteSnapshot(context.Background()))
+
+	// Simulate the background snapshot load path:
+	// LoadMultiTree with prefetch disabled (exactly like rewriteSnapshotBackground does)
+	loadOpts := db.opts
+	loadOpts.PrefetchThreshold = 0
+	mtree, err := LoadMultiTree(currentPath(dir), loadOpts)
+	require.NoError(t, err)
+
+	// At this point, mtree's mmap files have MADV_SEQUENTIAL from NewMmap().
+	// The safety net in Tree.ReplaceWith() should switch them to MADV_RANDOM.
+
+	// Switch to the new snapshot via reloadMultiTree (same path as checkBackgroundSnapshotRewrite)
+	require.NoError(t, db.reloadMultiTree(mtree))
+
+	// Verify data is still accessible after the switch.
+	// With MADV_SEQUENTIAL and no safety net, repeated random reads would cause
+	// the kernel to discard pages, leading to performance degradation on NAS.
+	// Here we verify correctness; the madvise fix ensures performance.
+	tree = db.TreeByName("test")
+	require.NotNil(t, tree)
+	val = tree.Get([]byte("hello1"))
+	require.Equal(t, []byte("world1"), val)
+
+	// Perform multiple random reads to exercise the tree access pattern.
+	// ChangeSets[2] sets hello2=world1 and hello3=world1.
+	val = tree.Get([]byte("hello2"))
+	require.Equal(t, []byte("world1"), val)
+	val = tree.Get([]byte("hello3"))
+	require.Equal(t, []byte("world1"), val)
+
+	// Re-read previously accessed keys (would cause page faults with MADV_SEQUENTIAL)
+	val = tree.Get([]byte("hello1"))
+	require.Equal(t, []byte("world1"), val)
+}
+
+// TestReplaceWithPreservesDataIntegrity verifies that Tree.ReplaceWith() correctly
+// switches snapshot and the new snapshot is functional for random access patterns.
+func TestReplaceWithPreservesDataIntegrity(t *testing.T) {
+	dir := t.TempDir()
+
+	db, err := OpenDB(logger.NewNopLogger(), 0, Options{
+		Dir:             dir,
+		CreateIfMissing: true,
+		InitialStores:   []string{"evm", "bank"},
+	})
+	require.NoError(t, err)
+	defer func() { require.NoError(t, db.Close()) }()
+
+	// Populate multiple stores with different data
+	for i := 0; i < 10; i++ {
+		cs := []*proto.NamedChangeSet{
+			{
+				Name: "evm",
+				Changeset: iavl.ChangeSet{
+					Pairs: []*iavl.KVPair{
+						{Key: []byte{byte(i)}, Value: []byte{byte(i * 2)}},
+					},
+				},
+			},
+			{
+				Name: "bank",
+				Changeset: iavl.ChangeSet{
+					Pairs: []*iavl.KVPair{
+						{Key: []byte{byte(i + 100)}, Value: []byte{byte(i * 3)}},
+					},
+				},
+			},
+		}
+		require.NoError(t, db.ApplyChangeSets(cs))
+		_, err := db.Commit()
+		require.NoError(t, err)
+	}
+
+	// Create snapshot
+	require.NoError(t, db.RewriteSnapshot(context.Background()))
+
+	// Load snapshot without prefetch (background load path)
+	loadOpts := db.opts
+	loadOpts.PrefetchThreshold = 0
+	mtree, err := LoadMultiTree(currentPath(dir), loadOpts)
+	require.NoError(t, err)
+
+	// Switch via ReplaceWith (triggers PrepareForRandomRead safety net)
+	require.NoError(t, db.reloadMultiTree(mtree))
+
+	// Verify all data across multiple stores after switch
+	evmTree := db.TreeByName("evm")
+	require.NotNil(t, evmTree)
+	bankTree := db.TreeByName("bank")
+	require.NotNil(t, bankTree)
+
+	for i := 0; i < 10; i++ {
+		val := evmTree.Get([]byte{byte(i)})
+		require.Equal(t, []byte{byte(i * 2)}, val, "evm key %d", i)
+
+		val = bankTree.Get([]byte{byte(i + 100)})
+		require.Equal(t, []byte{byte(i * 3)}, val, "bank key %d", i)
+	}
+
+	// Random access pattern: re-read in reverse order
+	for i := 9; i >= 0; i-- {
+		val := evmTree.Get([]byte{byte(i)})
+		require.Equal(t, []byte{byte(i * 2)}, val, "evm reverse key %d", i)
+	}
+}
diff --git a/sei-db/sc/memiavl/multitree.go b/sei-db/sc/memiavl/multitree.go
index 3312c42f5e..9b965aefb2 100644
--- a/sei-db/sc/memiavl/multitree.go
+++ b/sei-db/sc/memiavl/multitree.go
@@ -13,6 +13,7 @@ import (

 	"github.com/alitto/pond"
 	"golang.org/x/exp/slices"
+	"golang.org/x/time/rate"

 	"github.com/cosmos/iavl"
 	"github.com/sei-protocol/sei-db/common/errors"
@@ -412,19 +413,35 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersion
 }

 func (t *MultiTree) WriteSnapshot(ctx context.Context, dir string, wp *pond.WorkerPool) error {
-	t.logger.Info("starting snapshot write", "trees", len(t.trees))
+	return t.WriteSnapshotWithRateLimit(ctx, dir, wp, 0)
+}
+
+// WriteSnapshotWithRateLimit writes snapshot with optional rate limiting.
+// rateMBps is the rate limit in MB/s. 0 means unlimited.
+// A single global limiter is shared across ALL trees and files to ensure
+// the total write rate is capped at the configured value.
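Why one limiter must be threaded through rather than one per tree: rate.Limiter is safe for concurrent use, and N writers draining a single token bucket are collectively capped at the configured rate, whereas N per-tree limiters would allow N times the cap. A minimal standalone sketch of the sharing behavior (toy sizes, not the PR's writer types):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// One global bucket: 1 MB/s with a 4 MB burst, the shape NewGlobalRateLimiter(1) builds.
	limiter := rate.NewLimiter(rate.Limit(1<<20), 4<<20)
	ctx := context.Background()

	start := time.Now()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // four "trees" writing in parallel
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each writer consumes 1.5 MB in 512 KB chunks (chunk <= burst).
			for j := 0; j < 3; j++ {
				if err := limiter.WaitN(ctx, 512<<10); err != nil {
					panic(err)
				}
			}
		}()
	}
	wg.Wait()

	// 6 MB total against a 4 MB burst at 1 MB/s: roughly 2s regardless of
	// writer count, because all writers drain the same bucket.
	fmt.Printf("elapsed: %v\n", time.Since(start).Round(100*time.Millisecond))
}
```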
+func (t *MultiTree) WriteSnapshotWithRateLimit(ctx context.Context, dir string, wp *pond.WorkerPool, rateMBps int) error {
+	t.logger.Info("starting snapshot write", "trees", len(t.trees), "rate_limit_mbps", rateMBps)
 	if err := os.MkdirAll(dir, os.ModePerm); err != nil { //nolint:gosec
 		return err
 	}

+	// Create a single global limiter shared by all trees and files
+	// This ensures total write rate is capped regardless of parallelism
+	limiter := NewGlobalRateLimiter(rateMBps)
+	if limiter != nil {
+		t.logger.Info("global rate limiting enabled", "rate_mbps", rateMBps)
+	}
+
 	// Write EVM first to avoid disk I/O contention, then parallel
-	return t.writeSnapshotPriorityEVM(ctx, dir, wp)
+	return t.writeSnapshotPriorityEVM(ctx, dir, wp, limiter)
 }

 // writeSnapshotPriorityEVM writes EVM tree first, then others in parallel
 // Best strategy: reduces disk I/O contention for the largest tree
-func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp *pond.WorkerPool) error {
+// limiter is a shared rate limiter. nil means unlimited.
+func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp *pond.WorkerPool, limiter *rate.Limiter) error {
 	startTime := time.Now()

 	// Phase 1: Write EVM tree first (if it exists)
@@ -444,7 +461,7 @@
 	if evmTree != nil {
 		t.logger.Info("writing evm tree", "phase", "1/2")
 		evmStart := time.Now()
-		if err := evmTree.WriteSnapshot(ctx, filepath.Join(dir, evmName)); err != nil {
+		if err := evmTree.WriteSnapshotWithRateLimit(ctx, filepath.Join(dir, evmName), limiter); err != nil {
 			return err
 		}
 		evmElapsed := time.Since(evmStart).Seconds()
@@ -471,7 +488,8 @@
 		wg.Add(1)
 		wp.Submit(func() {
 			defer wg.Done()
-			if err := entry.Tree.WriteSnapshot(ctx, filepath.Join(dir, entry.Name)); err != nil {
+			// All trees share the same limiter for global rate control
+			if err := entry.Tree.WriteSnapshotWithRateLimit(ctx, filepath.Join(dir, entry.Name), limiter); err != nil {
 				mu.Lock()
 				errs = append(errs, fmt.Errorf("tree %s: %w", entry.Name, err))
 				mu.Unlock()
diff --git a/sei-db/sc/memiavl/opts.go b/sei-db/sc/memiavl/opts.go
index f329ad04de..1db002856b 100644
--- a/sei-db/sc/memiavl/opts.go
+++ b/sei-db/sc/memiavl/opts.go
@@ -2,7 +2,6 @@ package memiavl

 import (
 	"errors"
-	"runtime"
 	"time"

 	"github.com/sei-protocol/sei-db/common/logger"
@@ -48,6 +47,12 @@ type Options struct {
 	// Minimum time interval between snapshots
 	// This prevents excessive snapshot creation during catch-up. Default is 1 hour.
 	SnapshotMinTimeInterval time.Duration
+
+	// SnapshotWriteRateMBps defines the maximum write rate (MB/s) for snapshot creation.
+	// This is a GLOBAL limit shared across all trees and files.
+	// Default: 100. Set to a very high value (e.g., 10000) for effectively unlimited.
+	// 0 or unset will use the default.
+	SnapshotWriteRateMBps int
 }

 func (opts Options) Validate() error {
@@ -68,13 +73,17 @@ func (opts *Options) FillDefaults() {
 	}

 	if opts.SnapshotWriterLimit <= 0 {
-		opts.SnapshotWriterLimit = runtime.NumCPU()
+		opts.SnapshotWriterLimit = 2 // Default to 2 for lower I/O pressure on most validators
 	}

 	if opts.SnapshotMinTimeInterval <= 0 {
 		opts.SnapshotMinTimeInterval = 1 * time.Hour
 	}

+	if opts.SnapshotWriteRateMBps <= 0 {
+		opts.SnapshotWriteRateMBps = config.DefaultSnapshotWriteRateMBps
+	}
+
 	opts.PrefetchThreshold = 0.8
 	opts.Logger = logger.NewNopLogger()
 	opts.SnapshotKeepRecent = config.DefaultSnapshotKeepRecent
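Note that the two layers give 0 different meanings: in Options and the TOML config, 0 means "use the default" (FillDefaults rewrites it to 100 before the DB sees it), while in DB.snapshotWriteRateMBps and NewGlobalRateLimiter, 0 means unlimited; in the normal configuration path only a very large value, not 0, disables throttling. A small sketch of the defaulting behavior (assuming the package is importable at the path implied by this repository layout):

```go
package main

import (
	"fmt"

	"github.com/sei-protocol/sei-db/sc/memiavl"
)

func main() {
	var opts memiavl.Options // nothing configured
	opts.FillDefaults()

	fmt.Println(opts.SnapshotWriterLimit)     // 2 after this PR (was runtime.NumCPU())
	fmt.Println(opts.SnapshotWriteRateMBps)   // 100 (config.DefaultSnapshotWriteRateMBps)
	fmt.Println(opts.SnapshotMinTimeInterval) // 1h0m0s
}
```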
diff --git a/sei-db/sc/memiavl/snapshot.go b/sei-db/sc/memiavl/snapshot.go
index 3fcec56789..fdc56b4ecb 100644
--- a/sei-db/sc/memiavl/snapshot.go
+++ b/sei-db/sc/memiavl/snapshot.go
@@ -18,6 +18,7 @@ import (
 	"github.com/sei-protocol/sei-db/common/logger"
 	"github.com/sei-protocol/sei-db/sc/types"
 	"golang.org/x/sys/unix"
+	"golang.org/x/time/rate"

 	"github.com/sei-protocol/sei-db/common/errors"
 )
@@ -54,6 +55,64 @@ func (w *monitoringWriter) Write(p []byte) (n int, err error) {
 	return n, err
 }

+// rateLimitedWriter wraps an io.Writer with rate limiting to prevent
+// page cache eviction on machines with limited RAM.
+type rateLimitedWriter struct {
+	w       io.Writer
+	limiter *rate.Limiter
+	ctx     context.Context
+}
+
+// NewGlobalRateLimiter creates a shared rate limiter for snapshot writes.
+// rateMBps is the rate limit in MB/s. If <= 0, returns nil (no limit).
+// This limiter should be shared across all files and trees in a single snapshot operation.
+func NewGlobalRateLimiter(rateMBps int) *rate.Limiter {
+	if rateMBps <= 0 {
+		return nil
+	}
+	// Convert MB/s to bytes/s, use rate.Limiter with burst = 4MB (allows some batching)
+	bytesPerSec := rate.Limit(rateMBps * 1024 * 1024)
+	burstBytes := 4 * 1024 * 1024 // 4MB burst for better batching
+	return rate.NewLimiter(bytesPerSec, burstBytes)
+}
+
+// newRateLimitedWriter creates a rate-limited writer with a shared limiter.
+// If limiter is nil, returns the original writer (no limit).
+func newRateLimitedWriter(ctx context.Context, w io.Writer, limiter *rate.Limiter) io.Writer {
+	if limiter == nil {
+		return w
+	}
+	return &rateLimitedWriter{
+		w:       w,
+		limiter: limiter,
+		ctx:     ctx,
+	}
+}
+
+func (w *rateLimitedWriter) Write(p []byte) (n int, err error) {
+	// Wait for rate limiter before writing
+	// For large writes, we may need to wait multiple times
+	remaining := len(p)
+	written := 0
+	for remaining > 0 {
+		// Limit each wait to burst size to avoid very long waits
+		toWrite := remaining
+		if toWrite > w.limiter.Burst() {
+			toWrite = w.limiter.Burst()
+		}
+		if err := w.limiter.WaitN(w.ctx, toWrite); err != nil {
+			return written, err
+		}
+		n, err := w.w.Write(p[written : written+toWrite])
+		written += n
+		remaining -= n
+		if err != nil {
+			return written, err
+		}
+	}
+	return written, nil
+}
+
 // Snapshot manage the lifecycle of mmap-ed files for the snapshot,
 // it must out live the objects that derived from it.
 type Snapshot struct {
@@ -391,6 +450,12 @@ func (snapshot *Snapshot) export(callback func(*types.SnapshotNode) bool) {
 }

 func (t *Tree) WriteSnapshot(ctx context.Context, snapshotDir string) error {
+	return t.WriteSnapshotWithRateLimit(ctx, snapshotDir, nil)
+}
+
+// WriteSnapshotWithRateLimit writes snapshot with optional rate limiting.
+// limiter is a shared rate limiter. nil means unlimited.
+func (t *Tree) WriteSnapshotWithRateLimit(ctx context.Context, snapshotDir string, limiter *rate.Limiter) error {
 	// Estimate tree size: root.Size() returns leaf count, total = leaves + branches ≈ 2x
 	treeSize := int64(0)
 	if t.root != nil {
@@ -400,7 +465,7 @@
 	// Use 128MB buffer for all trees (large buffer for better performance)
 	bufSize := bufIOSize

-	err := writeSnapshotWithBuffer(ctx, snapshotDir, t.version, bufSize, treeSize, t.logger, func(w *snapshotWriter) (uint32, error) {
+	err := writeSnapshotWithBuffer(ctx, snapshotDir, t.version, bufSize, treeSize, limiter, t.logger, func(w *snapshotWriter) (uint32, error) {
 		if t.root == nil {
 			return 0, nil
 		}
@@ -418,12 +483,14 @@
 	return nil
 }

-// writeSnapshotWithBuffer writes snapshot with specified buffer size
+// writeSnapshotWithBuffer writes snapshot with specified buffer size and optional rate limiting.
+// limiter is a shared rate limiter. nil means unlimited.
 func writeSnapshotWithBuffer(
 	ctx context.Context,
 	dir string,
 	version uint32,
 	bufSize int,
 	totalNodes int64,
+	limiter *rate.Limiter,
 	log logger.Logger,
 	doWrite func(*snapshotWriter) (uint32, error),
 ) (returnErr error) {
@@ -470,10 +537,17 @@
 	leavesMonitor := &monitoringWriter{f: fpLeaves}
 	kvsMonitor := &monitoringWriter{f: fpKVs}

+	// Apply rate limiting if configured (shared limiter across all files)
+	// This ensures total write rate is capped regardless of file count
+	var nodesRateLimited, leavesRateLimited, kvsRateLimited io.Writer
+	nodesRateLimited = newRateLimitedWriter(ctx, nodesMonitor, limiter)
+	leavesRateLimited = newRateLimitedWriter(ctx, leavesMonitor, limiter)
+	kvsRateLimited = newRateLimitedWriter(ctx, kvsMonitor, limiter)
+
 	// Create buffered writers with buffers
-	nodesWriter := bufio.NewWriterSize(nodesMonitor, bufSize)
-	leavesWriter := bufio.NewWriterSize(leavesMonitor, bufSize)
-	kvsWriter := bufio.NewWriterSize(kvsMonitor, bufSize)
+	nodesWriter := bufio.NewWriterSize(nodesRateLimited, bufSize)
+	leavesWriter := bufio.NewWriterSize(leavesRateLimited, bufSize)
+	kvsWriter := bufio.NewWriterSize(kvsRateLimited, bufSize)

 	w := newSnapshotWriter(ctx, nodesWriter, leavesWriter, kvsWriter, log)
 	w.treeName = filepath.Base(dir) // Set tree name for progress reporting
@@ -547,8 +621,8 @@ func writeSnapshot(
 	dir string, version uint32,
 	doWrite func(*snapshotWriter) (uint32, error),
 ) error {
-	// Use nop logger for backward compatibility
-	return writeSnapshotWithBuffer(ctx, dir, version, bufIOSize, 0, logger.NewNopLogger(), doWrite)
+	// Use nop logger and no rate limit for backward compatibility
+	return writeSnapshotWithBuffer(ctx, dir, version, bufIOSize, 0, nil, logger.NewNopLogger(), doWrite)
 }

 // kvWriteOp represents a key-value write operation
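Two details of this writer stack are worth calling out. First, WaitN reports an error if n exceeds the limiter's burst, which is why rateLimitedWriter.Write re-splits large writes into burst-sized chunks. Second, bufio sits above the rate-limited writer, so the limiter is only consulted when the (up to 128MB) buffer flushes. A runnable sketch of that layering (simplified stand-in writer, io.Discard in place of the snapshot files):

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"time"

	"golang.org/x/time/rate"
)

// writerFunc adapts a closure to io.Writer for the sketch.
type writerFunc func(p []byte) (int, error)

func (f writerFunc) Write(p []byte) (int, error) { return f(p) }

func main() {
	// 4 MB/s with a 4 MB burst, the same shape as NewGlobalRateLimiter(4).
	limiter := rate.NewLimiter(rate.Limit(4<<20), 4<<20)
	ctx := context.Background()

	// Stand-in for rateLimitedWriter: one WaitN per flushed chunk. The demo
	// buffer (1 MB) is below the burst, so no re-splitting is needed here.
	throttled := writerFunc(func(p []byte) (int, error) {
		if err := limiter.WaitN(ctx, len(p)); err != nil {
			return 0, err
		}
		return io.Discard.Write(p)
	})

	// Order matters: bufio wraps the throttled writer so that many small
	// node/leaf writes are batched before tokens are taken from the bucket.
	w := bufio.NewWriterSize(throttled, 1<<20)

	start := time.Now()
	for i := 0; i < 8; i++ { // 8 MB total: the burst covers 4, the rest is paced
		if _, err := w.Write(make([]byte, 1<<20)); err != nil {
			panic(err)
		}
	}
	if err := w.Flush(); err != nil {
		panic(err)
	}
	fmt.Printf("8 MiB in %v (expect ~1s: 4 MiB burst, then 4 MiB at 4 MB/s)\n",
		time.Since(start).Round(100*time.Millisecond))
}
```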
diff --git a/sei-db/sc/memiavl/snapshot_pipeline_test.go b/sei-db/sc/memiavl/snapshot_pipeline_test.go
index 9f8c5b6ef9..5879fd8b87 100644
--- a/sei-db/sc/memiavl/snapshot_pipeline_test.go
+++ b/sei-db/sc/memiavl/snapshot_pipeline_test.go
@@ -2,7 +2,10 @@ package memiavl

 import (
 	"context"
+	"io"
+	"sync"
 	"testing"
+	"time"

 	"github.com/cosmos/iavl"
 	"github.com/sei-protocol/sei-db/common/logger"
@@ -139,6 +142,7 @@ func TestWriteSnapshotWithBuffer(t *testing.T) {
 		tree.version,
 		1024*1024, // 1MB buffer
 		int64(tree.root.Size()),
+		nil, // no rate limit
 		logger.NewNopLogger(),
 		func(w *snapshotWriter) (uint32, error) {
 			if tree.root == nil {
@@ -153,6 +157,44 @@
 	require.NoError(t, err)
 }

+func TestGlobalRateLimiterSharedAcrossWriters(t *testing.T) {
+	ctx := context.Background()
+	// Use NewGlobalRateLimiter to test the actual API surface
+	limiter := NewGlobalRateLimiter(1) // 1MB/s with 4MB burst
+
+	w1 := newRateLimitedWriter(ctx, io.Discard, limiter)
+	w2 := newRateLimitedWriter(ctx, io.Discard, limiter)
+
+	// Write 5MB total (more than burst) to ensure rate limiting kicks in
+	payload := make([]byte, 2*1024*1024+512*1024) // 2.5MB each = 5MB total
+
+	start := time.Now()
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		_, err := w1.Write(payload)
+		require.NoError(t, err)
+	}()
+	go func() {
+		defer wg.Done()
+		_, err := w2.Write(payload)
+		require.NoError(t, err)
+	}()
+	wg.Wait()
+
+	// 5MB total at 1MB/s should take ~5s, but with 4MB burst initial writes are fast.
+	// After burst is consumed, remaining 1MB is rate-limited.
+	// Use conservative threshold: at least 800ms (allows for burst + some jitter)
+	elapsed := time.Since(start)
+	require.GreaterOrEqual(t, elapsed, 800*time.Millisecond,
+		"Expected rate limiting to slow down writes (5MB at 1MB/s should take >800ms after burst)")
+
+	// Also sanity check it didn't take unreasonably long (e.g., >10s indicates a bug)
+	require.LessOrEqual(t, elapsed, 10*time.Second,
+		"Write took too long, possible deadlock or excessive rate limiting")
+}
+
 // TestPipelineMetrics tests pipeline metrics reporting
 func TestPipelineMetrics(t *testing.T) {
 	tree := New(0)
diff --git a/sei-db/sc/memiavl/tree.go b/sei-db/sc/memiavl/tree.go
index dc25285d9a..fae0f7873a 100644
--- a/sei-db/sc/memiavl/tree.go
+++ b/sei-db/sc/memiavl/tree.go
@@ -329,6 +329,16 @@ func (t *Tree) ReplaceWith(other *Tree) error {
 	t.initialVersion = other.initialVersion
 	t.cowVersion = other.cowVersion
 	t.zeroCopy = other.zeroCopy
+
+	// Safety net: ensure the new snapshot uses MADV_RANDOM for tree operations.
+	// NewMmap() defaults to MADV_SEQUENTIAL for cold-start replay, but tree access
+	// patterns are random. Without this, the kernel aggressively discards accessed
+	// pages, which is catastrophic on high-latency storage (e.g. NAS).
+	if t.snapshot != nil {
+		t.snapshot.nodesMap.PrepareForRandomRead()
+		t.snapshot.leavesMap.PrepareForRandomRead()
+	}
+
 	if snapshot != nil {
 		return snapshot.Close()
 	}
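PrepareForRandomRead itself is outside this diff; judging from the comments, it presumably re-issues madvise on the mapped regions. For reference, a minimal standalone sketch of switching hints with golang.org/x/sys/unix (Linux/macOS; any readable non-empty file works for the demo):

```go
package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

func main() {
	f, err := os.Open("/etc/hosts") // demo file; must be non-empty
	if err != nil {
		panic(err)
	}
	defer f.Close()
	fi, err := f.Stat()
	if err != nil {
		panic(err)
	}

	data, err := unix.Mmap(int(f.Fd()), 0, int(fi.Size()), unix.PROT_READ, unix.MAP_SHARED)
	if err != nil {
		panic(err)
	}
	defer unix.Munmap(data)

	// Cold start: the sequential hint enables aggressive readahead for replay.
	if err := unix.Madvise(data, unix.MADV_SEQUENTIAL); err != nil {
		panic(err)
	}
	_ = data[0] // ... a sequential scan would happen here ...

	// Steady state: the random hint disables readahead and stops the kernel
	// from assuming already-read pages won't be touched again; presumably
	// what PrepareForRandomRead does for nodesMap and leavesMap.
	if err := unix.Madvise(data, unix.MADV_RANDOM); err != nil {
		panic(err)
	}
	fmt.Println("switched mmap hint from MADV_SEQUENTIAL to MADV_RANDOM")
}
```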
diff --git a/sei-db/sc/store.go b/sei-db/sc/store.go
index 3e24e2f2cd..4cdf50a3ff 100644
--- a/sei-db/sc/store.go
+++ b/sei-db/sc/store.go
@@ -34,6 +34,7 @@ func NewCommitStore(homeDir string, logger logger.Logger, config config.StateCom
 		SnapshotKeepRecent:               config.SnapshotKeepRecent,
 		SnapshotMinTimeInterval:          time.Duration(config.SnapshotMinTimeInterval) * time.Second,
 		SnapshotWriterLimit:              config.SnapshotWriterLimit,
+		SnapshotWriteRateMBps:            config.SnapshotWriteRateMBps,
 		PrefetchThreshold:                config.SnapshotPrefetchThreshold,
 		CreateIfMissing:                  true,
 		OnlyAllowExportOnSnapshotVersion: config.OnlyAllowExportOnSnapshotVersion,