4 changes: 2 additions & 2 deletions app/seidb.go
@@ -21,8 +21,8 @@ const (
FlagSCSnapshotKeepRecent = "state-commit.sc-keep-recent"
FlagSCSnapshotInterval = "state-commit.sc-snapshot-interval"
FlagSCSnapshotMinTimeInterval = "state-commit.sc-snapshot-min-time-interval"
FlagSCSnapshotWriterLimit = "state-commit.sc-snapshot-writer-limit"
FlagSCSnapshotPrefetchThreshold = "state-commit.sc-snapshot-prefetch-threshold"
FlagSCSnapshotWriteRateMBps = "state-commit.sc-snapshot-write-rate-mbps"
FlagSCCacheSize = "state-commit.sc-cache-size"
FlagSCOnlyAllowExportOnSnapshotVersion = "state-commit.sc-only-allow-export-on-snapshot-version"

@@ -88,8 +88,8 @@ func parseSCConfigs(appOpts servertypes.AppOptions) config.StateCommitConfig {
scConfig.SnapshotKeepRecent = cast.ToUint32(appOpts.Get(FlagSCSnapshotKeepRecent))
scConfig.SnapshotInterval = cast.ToUint32(appOpts.Get(FlagSCSnapshotInterval))
scConfig.SnapshotMinTimeInterval = cast.ToUint32(appOpts.Get(FlagSCSnapshotMinTimeInterval))
scConfig.SnapshotWriterLimit = cast.ToInt(appOpts.Get(FlagSCSnapshotWriterLimit))
scConfig.SnapshotPrefetchThreshold = cast.ToFloat64(appOpts.Get(FlagSCSnapshotPrefetchThreshold))
scConfig.SnapshotWriteRateMBps = cast.ToInt(appOpts.Get(FlagSCSnapshotWriteRateMBps))
scConfig.OnlyAllowExportOnSnapshotVersion = cast.ToBool(appOpts.Get(FlagSCOnlyAllowExportOnSnapshotVersion))
return scConfig
}
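As a rough illustration of how the new flag resolves when it is not set: AppOptions.Get returns nil for a missing key, cast.ToInt(nil) yields 0, and FillDefaults (see opts.go below) then substitutes the 100 MB/s default. The fakeAppOpts type below is a hypothetical stand-in for servertypes.AppOptions, used only for this sketch.

package main

import (
	"fmt"

	"github.com/spf13/cast"
)

// fakeAppOpts is a hypothetical stand-in for servertypes.AppOptions.
type fakeAppOpts map[string]interface{}

func (f fakeAppOpts) Get(key string) interface{} { return f[key] }

func main() {
	opts := fakeAppOpts{} // flag not present in app.toml or on the CLI
	rateMBps := cast.ToInt(opts.Get("state-commit.sc-snapshot-write-rate-mbps"))
	fmt.Println(rateMBps) // 0 — FillDefaults later replaces this with DefaultSnapshotWriteRateMBps (100)
}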
4 changes: 2 additions & 2 deletions app/seidb_test.go
@@ -26,10 +26,10 @@ func (t TestSeiDBAppOpts) Get(s string) interface{} {
return config.DefaultStateCommitConfig().SnapshotKeepRecent
case FlagSCSnapshotMinTimeInterval:
return config.DefaultStateCommitConfig().SnapshotMinTimeInterval
case FlagSCSnapshotWriterLimit:
return config.DefaultStateCommitConfig().SnapshotWriterLimit
case FlagSCSnapshotPrefetchThreshold:
return config.DefaultStateCommitConfig().SnapshotPrefetchThreshold
case FlagSCSnapshotWriteRateMBps:
return config.DefaultStateCommitConfig().SnapshotWriteRateMBps
case FlagSSEnable:
return config.DefaultStateStoreConfig().Enable
case FlagSSBackend:
14 changes: 13 additions & 1 deletion sei-db/config/config.go
@@ -6,6 +6,7 @@ const (
DefaultSnapshotMinTimeInterval = 60 * 60 // 1 hour in seconds
DefaultAsyncCommitBuffer = 100
DefaultSnapshotPrefetchThreshold = 0.8 // prefetch if <80% pages in cache
DefaultSnapshotWriteRateMBps = 100 // 100 MB/s to prevent page cache eviction on most validators
DefaultSSKeepRecent = 100000
DefaultSSPruneInterval = 600
DefaultSSImportWorkers = 1
@@ -45,7 +46,9 @@ type StateCommitConfig struct {
// This prevents excessive snapshot creation during catch-up. Default to 3600 seconds (1 hour).
SnapshotMinTimeInterval uint32 `mapstructure:"snapshot-min-time-interval"`

// SnapshotWriterLimit defines the concurrency for taking commit store snapshot
// SnapshotWriterLimit defines the concurrency for taking a commit store snapshot.
// Defaults to 2 for lower I/O pressure. Higher values speed up snapshots but increase page cache eviction.
// With rate limiting enabled, this mainly affects CPU/goroutine overhead rather than I/O.
SnapshotWriterLimit int `mapstructure:"snapshot-writer-limit"`

// SnapshotPrefetchThreshold defines the page cache residency threshold (0.0-1.0)
@@ -56,6 +59,14 @@ type StateCommitConfig struct {
// Setting to 0 disables prefetching. Defaults to 0.8
SnapshotPrefetchThreshold float64 `mapstructure:"snapshot-prefetch-threshold"`

// SnapshotWriteRateMBps defines the maximum write rate (MB/s) for snapshot creation.
// This is a GLOBAL limit shared across all trees and files in a single snapshot operation.
// This helps prevent page cache eviction on machines with limited RAM.
// Default: 100 MB/s (conservative for most validators including 64GB RAM machines).
// Set to a very high value (e.g., 10000) for effectively unlimited.
// 0 or unset will use the default (100 MB/s).
SnapshotWriteRateMBps int `mapstructure:"snapshot-write-rate-mbps"`

// CacheSize defines the size of the cache for each memiavl store.
// Deprecated: this is removed, we will just rely on mmap page cache
CacheSize int `mapstructure:"cache-size"`
@@ -115,6 +126,7 @@ func DefaultStateCommitConfig() StateCommitConfig {
SnapshotKeepRecent: DefaultSnapshotKeepRecent,
SnapshotMinTimeInterval: DefaultSnapshotMinTimeInterval,
SnapshotPrefetchThreshold: DefaultSnapshotPrefetchThreshold,
SnapshotWriteRateMBps: DefaultSnapshotWriteRateMBps,
}
}

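For scale, the default cap means a 60 GB snapshot takes roughly 60,000 MB / 100 MB/s = 600 s, i.e. about 10 minutes of throttled writing. A short sketch of overriding the defaults in Go follows; the import path and the 250 MB/s figure are assumptions for illustration, not recommendations from this change.

package main

import (
	"fmt"

	"github.com/sei-protocol/sei-db/config"
)

func main() {
	scConfig := config.DefaultStateCommitConfig()
	// Raise the global write cap on a machine with ample RAM and fast NVMe;
	// at 250 MB/s the same 60 GB snapshot finishes in roughly 4 minutes.
	scConfig.SnapshotWriteRateMBps = 250
	// Extra writers mostly add CPU/goroutine overhead once the global cap applies.
	scConfig.SnapshotWriterLimit = 4
	fmt.Printf("%+v\n", scConfig)
}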
12 changes: 9 additions & 3 deletions sei-db/config/toml.go
@@ -36,16 +36,22 @@ sc-snapshot-interval = {{ .StateCommit.SnapshotInterval }}
# to allow more frequent snapshots during normal operation.
sc-snapshot-min-time-interval = {{ .StateCommit.SnapshotMinTimeInterval }}

# SnapshotWriterLimit defines the max concurrency for taking commit store snapshot
sc-snapshot-writer-limit = {{ .StateCommit.SnapshotWriterLimit }}

# SnapshotPrefetchThreshold defines the page cache residency threshold (0.0-1.0) to trigger snapshot prefetch.
# Prefetch sequentially reads nodes/leaves files into page cache for faster cold-start replay.
# Only active trees (evm/bank/acc) are prefetched, skipping sparse kv files to save memory.
# Skips prefetch if more than threshold of pages already resident (e.g., 0.8 = 80%).
# Setting to 0 disables prefetching. Defaults to 0.8
sc-snapshot-prefetch-threshold = {{ .StateCommit.SnapshotPrefetchThreshold }}

# SnapshotWriteRateMBps defines the maximum write rate (MB/s) for snapshot creation.
# This is a GLOBAL limit shared across all trees and files in a single snapshot operation.
# This helps prevent page cache eviction on machines with limited RAM, which can cause
# block execution cache misses and consensus delays.
# Default: 100 MB/s (conservative for most validators including 64GB RAM machines).
# Set to a very high value (e.g., 10000) for effectively unlimited.
# 0 or unset will use the default (100 MB/s).
sc-snapshot-write-rate-mbps = {{ .StateCommit.SnapshotWriteRateMBps }}

# OnlyAllowExportOnSnapshotVersion defines whether we only allow state sync
# snapshot creation happens after the memiavl snapshot is created.
sc-only-allow-export-on-snapshot-version = {{ .StateCommit.OnlyAllowExportOnSnapshotVersion }}
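The prefetch-threshold setting above depends on measuring how much of a snapshot file is already resident in the page cache. A minimal Linux-only sketch of such a check using unix.Mincore is shown below; it illustrates the idea only and is not necessarily how memiavl implements it.

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

// residentRatio returns the fraction of a file's pages currently in the page cache.
func residentRatio(path string) (float64, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil || fi.Size() == 0 {
		return 0, err
	}

	data, err := unix.Mmap(int(f.Fd()), 0, int(fi.Size()), unix.PROT_READ, unix.MAP_SHARED)
	if err != nil {
		return 0, err
	}
	defer unix.Munmap(data)

	pageSize := os.Getpagesize()
	vec := make([]byte, (len(data)+pageSize-1)/pageSize)
	if err := unix.Mincore(data, vec); err != nil {
		return 0, err
	}

	resident := 0
	for _, b := range vec {
		if b&1 == 1 { // the low bit is set when the page is resident
			resident++
		}
	}
	return float64(resident) / float64(len(vec)), nil
}

func main() {
	ratio, err := residentRatio("snapshot/nodes") // hypothetical path to a nodes file
	if err != nil {
		panic(err)
	}
	// Skip prefetch if more than the threshold (default 0.8) is already resident.
	fmt.Printf("resident fraction: %.2f\n", ratio)
}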
39 changes: 25 additions & 14 deletions sei-db/sc/memiavl/db.go
@@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/alitto/pond"
@@ -69,9 +70,9 @@ type DB struct {
// timestamp of the last successful snapshot creation
// Protected by db.mtx (only accessed in Commit call chain)
lastSnapshotTime time.Time
// snapshot write rate limit in MB/s, 0 means unlimited
snapshotWriteRateMBps int

// pruneSnapshotLock guards concurrent prune operations; use TryLock in pruneSnapshots
pruneSnapshotLock sync.Mutex
// closed guards against double Close(), protected by db.mtx
closed bool

@@ -91,6 +92,9 @@ type DB struct {
mtx sync.Mutex
// worker goroutine IdleTimeout = 5s
snapshotWriterPool *pond.WorkerPool

// pruningInProgress guards concurrent prune operations (CAS-based)
pruningInProgress atomic.Bool
}

const (
@@ -276,6 +280,7 @@ func OpenDB(logger logger.Logger, targetVersion int64, opts Options) (database *
snapshotInterval: opts.SnapshotInterval,
snapshotMinTimeInterval: opts.SnapshotMinTimeInterval,
lastSnapshotTime: lastSnapshotTime,
snapshotWriteRateMBps: opts.SnapshotWriteRateMBps,
snapshotWriterPool: workerPool,
opts: opts,
}
@@ -494,18 +499,19 @@ func (db *DB) checkBackgroundSnapshotRewrite() error {

// pruneSnapshots prunes the old snapshots
func (db *DB) pruneSnapshots() {
if !db.pruneSnapshotLock.TryLock() {
db.logger.Info("pruneSnapshots skipped, previous prune still in progress")
// CAS: only one prune can run at a time
if !db.pruningInProgress.CompareAndSwap(false, true) {
return
}
defer db.pruneSnapshotLock.Unlock()
defer db.pruningInProgress.Store(false)

db.logger.Info("pruneSnapshots started")
startTime := time.Now()
defer func() {
db.logger.Info("pruneSnapshots completed", "duration_sec", fmt.Sprintf("%.2fs", time.Since(startTime).Seconds()))
}()

db.logger.Info("pruneSnapshots started")

currentVersion, err := currentVersion(db.dir)
if err != nil {
db.logger.Error("failed to read current snapshot version", "err", err)
@@ -543,6 +549,9 @@ func (db *DB) pruneSnapshots() {
db.logger.Error("failed to find first snapshot", "err", err)
}

if db.streamHandler == nil {
return
}
if err := db.streamHandler.TruncateBefore(utils.VersionToIndex(earliestVersion+1, db.initialVersion.Load())); err != nil {
db.logger.Error("failed to truncate rlog", "err", err, "version", earliestVersion+1)
}
@@ -605,11 +614,12 @@ func (db *DB) copy() *DB {
mtree := db.MultiTree.Copy()

return &DB{
MultiTree: *mtree,
logger: db.logger,
dir: db.dir,
snapshotWriterPool: db.snapshotWriterPool,
opts: db.opts,
MultiTree: *mtree,
logger: db.logger,
dir: db.dir,
snapshotWriteRateMBps: db.snapshotWriteRateMBps,
snapshotWriterPool: db.snapshotWriterPool,
opts: db.opts,
}
}

@@ -644,7 +654,7 @@ func (db *DB) RewriteSnapshot(ctx context.Context) error {
path := filepath.Clean(filepath.Join(db.dir, tmpDir))

writeStart := time.Now()
err := db.MultiTree.WriteSnapshot(ctx, path, db.snapshotWriterPool)
err := db.MultiTree.WriteSnapshotWithRateLimit(ctx, path, db.snapshotWriterPool, db.snapshotWriteRateMBps)
writeElapsed := time.Since(writeStart).Seconds()

if err != nil {
@@ -849,8 +859,9 @@ func (db *DB) Close() error {
}
db.closed = true
// Wait for any ongoing prune to finish, then block new prunes
db.pruneSnapshotLock.Lock()
defer db.pruneSnapshotLock.Unlock()
for !db.pruningInProgress.CompareAndSwap(false, true) {
time.Sleep(time.Millisecond)
}
errs := []error{}
// Close stream handler
db.logger.Info("Closing stream handler...")
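The atomic.Bool flag replaces the earlier pruneSnapshotLock mutex with a compare-and-swap guard: a prune either claims the flag or returns immediately, and Close spins until it can claim the flag itself, which blocks further prunes. A standalone sketch of the pattern (illustration only, not the actual DB code):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type pruner struct {
	pruningInProgress atomic.Bool
}

// prune runs at most once at a time: a concurrent caller simply returns.
func (p *pruner) prune(id int) {
	if !p.pruningInProgress.CompareAndSwap(false, true) {
		fmt.Printf("prune %d skipped, another prune is running\n", id)
		return
	}
	defer p.pruningInProgress.Store(false)

	time.Sleep(20 * time.Millisecond) // stand-in for the real pruning work
	fmt.Printf("prune %d finished\n", id)
}

func main() {
	var p pruner
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			p.prune(id)
		}(i)
	}
	wg.Wait()
}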
28 changes: 23 additions & 5 deletions sei-db/sc/memiavl/multitree.go
@@ -13,6 +13,7 @@ import (

"github.com/alitto/pond"
"golang.org/x/exp/slices"
"golang.org/x/time/rate"

"github.com/cosmos/iavl"
"github.com/sei-protocol/sei-db/common/errors"
@@ -412,19 +413,35 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio
}

func (t *MultiTree) WriteSnapshot(ctx context.Context, dir string, wp *pond.WorkerPool) error {
t.logger.Info("starting snapshot write", "trees", len(t.trees))
return t.WriteSnapshotWithRateLimit(ctx, dir, wp, 0)
}

// WriteSnapshotWithRateLimit writes snapshot with optional rate limiting.
// rateMBps is the rate limit in MB/s. 0 means unlimited.
// A single global limiter is shared across ALL trees and files to ensure
// the total write rate is capped at the configured value.
func (t *MultiTree) WriteSnapshotWithRateLimit(ctx context.Context, dir string, wp *pond.WorkerPool, rateMBps int) error {
t.logger.Info("starting snapshot write", "trees", len(t.trees), "rate_limit_mbps", rateMBps)

if err := os.MkdirAll(dir, os.ModePerm); err != nil { //nolint:gosec
return err
}

// Create a single global limiter shared by all trees and files
// This ensures total write rate is capped regardless of parallelism
limiter := NewGlobalRateLimiter(rateMBps)
if limiter != nil {
t.logger.Info("global rate limiting enabled", "rate_mbps", rateMBps)
}

// Write the EVM tree first to avoid disk I/O contention, then write the rest in parallel
return t.writeSnapshotPriorityEVM(ctx, dir, wp)
return t.writeSnapshotPriorityEVM(ctx, dir, wp, limiter)
}

// writeSnapshotPriorityEVM writes EVM tree first, then others in parallel
// Best strategy: reduces disk I/O contention for the largest tree
func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp *pond.WorkerPool) error {
// limiter is a shared rate limiter. nil means unlimited.
func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp *pond.WorkerPool, limiter *rate.Limiter) error {
startTime := time.Now()

// Phase 1: Write EVM tree first (if it exists)
@@ -444,7 +461,7 @@ func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp
if evmTree != nil {
t.logger.Info("writing evm tree", "phase", "1/2")
evmStart := time.Now()
if err := evmTree.WriteSnapshot(ctx, filepath.Join(dir, evmName)); err != nil {
if err := evmTree.WriteSnapshotWithRateLimit(ctx, filepath.Join(dir, evmName), limiter); err != nil {
return err
}
evmElapsed := time.Since(evmStart).Seconds()
@@ -471,7 +488,8 @@ func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp
wg.Add(1)
wp.Submit(func() {
defer wg.Done()
if err := entry.Tree.WriteSnapshot(ctx, filepath.Join(dir, entry.Name)); err != nil {
// All trees share the same limiter for global rate control
if err := entry.Tree.WriteSnapshotWithRateLimit(ctx, filepath.Join(dir, entry.Name), limiter); err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("tree %s: %w", entry.Name, err))
mu.Unlock()
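NewGlobalRateLimiter and the tree-level WriteSnapshotWithRateLimit are not visible in these hunks. One plausible shape, sketched below on golang.org/x/time/rate, is a single bytes-per-second token bucket shared by every tree, with writers waiting per chunk; the constructor name matches the call above, but the chunk size and the writer wrapper are assumptions for illustration.

package memiavl

import (
	"context"
	"io"

	"golang.org/x/time/rate"
)

const rateLimitChunk = 1 << 20 // request tokens in 1 MiB chunks so WaitN never exceeds the burst

// NewGlobalRateLimiter returns a token bucket measured in bytes per second,
// or nil when rateMBps <= 0 (callers treat nil as unlimited).
func NewGlobalRateLimiter(rateMBps int) *rate.Limiter {
	if rateMBps <= 0 {
		return nil
	}
	bytesPerSec := rateMBps * 1024 * 1024
	// Allow a burst of one second's worth of writes.
	return rate.NewLimiter(rate.Limit(bytesPerSec), bytesPerSec)
}

// rateLimitedWriter throttles an io.Writer against a shared limiter.
// Because the limiter is shared, the combined rate of all concurrent
// snapshot writers stays under the configured cap.
type rateLimitedWriter struct {
	ctx     context.Context
	w       io.Writer
	limiter *rate.Limiter // nil disables throttling
}

func (r *rateLimitedWriter) Write(p []byte) (int, error) {
	if r.limiter == nil {
		return r.w.Write(p)
	}
	written := 0
	for len(p) > 0 {
		n := len(p)
		if n > rateLimitChunk {
			n = rateLimitChunk
		}
		if err := r.limiter.WaitN(r.ctx, n); err != nil {
			return written, err
		}
		m, err := r.w.Write(p[:n])
		written += m
		if err != nil {
			return written, err
		}
		p = p[n:]
	}
	return written, nil
}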
13 changes: 11 additions & 2 deletions sei-db/sc/memiavl/opts.go
@@ -2,7 +2,6 @@ package memiavl

import (
"errors"
"runtime"
"time"

"github.com/sei-protocol/sei-db/common/logger"
@@ -48,6 +47,12 @@ type Options struct {
// Minimum time interval between snapshots
// This prevents excessive snapshot creation during catch-up. Default is 1 hour.
SnapshotMinTimeInterval time.Duration

// SnapshotWriteRateMBps defines the maximum write rate (MB/s) for snapshot creation.
// This is a GLOBAL limit shared across all trees and files.
// Default: 100. Set to a very high value (e.g., 10000) for effectively unlimited.
// 0 or unset will use the default.
SnapshotWriteRateMBps int
}

func (opts Options) Validate() error {
Expand All @@ -68,13 +73,17 @@ func (opts *Options) FillDefaults() {
}

if opts.SnapshotWriterLimit <= 0 {
opts.SnapshotWriterLimit = runtime.NumCPU()
opts.SnapshotWriterLimit = 2 // Default to 2 for lower I/O pressure on most validators
}

if opts.SnapshotMinTimeInterval <= 0 {
opts.SnapshotMinTimeInterval = 1 * time.Hour
}

if opts.SnapshotWriteRateMBps <= 0 {
opts.SnapshotWriteRateMBps = config.DefaultSnapshotWriteRateMBps
}

opts.PrefetchThreshold = 0.8
opts.Logger = logger.NewNopLogger()
opts.SnapshotKeepRecent = config.DefaultSnapshotKeepRecent
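With the changed defaults above, zero-value Options now resolve to a writer limit of 2 and a 100 MB/s write cap rather than runtime.NumCPU() workers with unthrottled I/O. A quick sanity-check sketch (the import path is assumed from the repository layout shown in this diff):

package main

import (
	"fmt"

	"github.com/sei-protocol/sei-db/sc/memiavl"
)

func main() {
	opts := memiavl.Options{}
	opts.FillDefaults()
	fmt.Println(opts.SnapshotWriterLimit)   // 2 (previously runtime.NumCPU())
	fmt.Println(opts.SnapshotWriteRateMBps) // 100 (DefaultSnapshotWriteRateMBps)
}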