diff --git a/evmrpc/bloom.go b/evmrpc/bloom.go index 135e31abad..0d7cdb8238 100644 --- a/evmrpc/bloom.go +++ b/evmrpc/bloom.go @@ -3,91 +3,18 @@ package evmrpc import ( "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" + "github.com/sei-protocol/sei-chain/evmrpc/ethbloom" ) -var BitMasks = [8]uint8{1, 2, 4, 8, 16, 32, 64, 128} - -// bloomIndexes represents the bit indexes inside the bloom filter that belong -// to some key. -type bloomIndexes [3]uint - -// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key. -func calcBloomIndexes(b []byte) bloomIndexes { - b = crypto.Keccak256(b) - - var idxs bloomIndexes - for i := 0; i < len(idxs); i++ { - idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1]) - } - return idxs -} +// BloomIndexes is re-exported for backward compatibility. +type BloomIndexes = ethbloom.BloomIndexes -// res: AND on outer level, OR on mid level, AND on inner level (i.e. all 3 bits) -func EncodeFilters(addresses []common.Address, topics [][]common.Hash) (res [][]bloomIndexes) { - filters := make([][][]byte, 1+len(topics)) - if len(addresses) > 0 { - filter := make([][]byte, len(addresses)) - for i, address := range addresses { - filter[i] = address.Bytes() - } - filters = append(filters, filter) - } - for _, topicList := range topics { - filter := make([][]byte, len(topicList)) - for i, topic := range topicList { - filter[i] = topic.Bytes() - } - filters = append(filters, filter) - } - for _, filter := range filters { - // Gather the bit indexes of the filter rule, special casing the nil filter - if len(filter) == 0 { - continue - } - bloomBits := make([]bloomIndexes, len(filter)) - for i, clause := range filter { - if clause == nil { - bloomBits = nil - break - } - bloomBits[i] = calcBloomIndexes(clause) - } - // Accumulate the filter rules if no nil rule was within - if bloomBits != nil { - res = append(res, bloomBits) - } - } - return -} - -// TODO: parallelize if filters too large -func MatchFilters(bloom ethtypes.Bloom, filters [][]bloomIndexes) bool { - for _, filter := range filters { - if !matchFilter(bloom, filter) { - return false - } - } - return true -} +var BitMasks = [8]uint8{1, 2, 4, 8, 16, 32, 64, 128} -func matchFilter(bloom ethtypes.Bloom, filter []bloomIndexes) bool { - for _, possibility := range filter { - if matchBloomIndexes(bloom, possibility) { - return true - } - } - return false +func EncodeFilters(addresses []common.Address, topics [][]common.Hash) [][]BloomIndexes { + return ethbloom.EncodeFilters(addresses, topics) } -func matchBloomIndexes(bloom ethtypes.Bloom, idx bloomIndexes) bool { - for _, bit := range idx { - // big endian - whichByte := bloom[ethtypes.BloomByteLength-1-bit/8] - mask := BitMasks[bit%8] - if whichByte&mask == 0 { - return false - } - } - return true +func MatchFilters(bloom ethtypes.Bloom, filters [][]BloomIndexes) bool { + return ethbloom.MatchFilters(bloom, filters) } diff --git a/evmrpc/ethbloom/bloom.go b/evmrpc/ethbloom/bloom.go new file mode 100644 index 0000000000..29ac596f86 --- /dev/null +++ b/evmrpc/ethbloom/bloom.go @@ -0,0 +1,126 @@ +package ethbloom + +import ( + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/filters" +) + +var bitMasks = [8]uint8{1, 2, 4, 8, 16, 32, 64, 128} + +// BloomIndexes represents the bit indexes inside the bloom filter that belong +// to some 
key. +type BloomIndexes [3]uint + +func calcBloomIndexes(b []byte) BloomIndexes { + b = crypto.Keccak256(b) + + var idxs BloomIndexes + for i := 0; i < len(idxs); i++ { + idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1]) + } + return idxs +} + +// EncodeFilters builds bloom-index slices from filter criteria. +// Result semantics: AND on outer level, OR on mid level, AND on inner level (all 3 bits). +func EncodeFilters(addresses []common.Address, topics [][]common.Hash) (res [][]BloomIndexes) { + filters := make([][][]byte, 1+len(topics)) + if len(addresses) > 0 { + filter := make([][]byte, len(addresses)) + for i, address := range addresses { + filter[i] = address.Bytes() + } + filters = append(filters, filter) + } + for _, topicList := range topics { + filter := make([][]byte, len(topicList)) + for i, topic := range topicList { + filter[i] = topic.Bytes() + } + filters = append(filters, filter) + } + for _, filter := range filters { + if len(filter) == 0 { + continue + } + bloomBits := make([]BloomIndexes, len(filter)) + for i, clause := range filter { + if clause == nil { + bloomBits = nil + break + } + bloomBits[i] = calcBloomIndexes(clause) + } + if bloomBits != nil { + res = append(res, bloomBits) + } + } + return +} + +// MatchFilters returns true when bloom matches all filter groups. +func MatchFilters(bloom ethtypes.Bloom, filterGroups [][]BloomIndexes) bool { + for _, filter := range filterGroups { + if !matchFilter(bloom, filter) { + return false + } + } + return true +} + +func matchFilter(bloom ethtypes.Bloom, filter []BloomIndexes) bool { + for _, possibility := range filter { + if matchBloomIndexes(bloom, possibility) { + return true + } + } + return false +} + +func matchBloomIndexes(bloom ethtypes.Bloom, idx BloomIndexes) bool { + for _, bit := range idx { + whichByte := bloom[ethtypes.BloomByteLength-1-bit/8] + mask := bitMasks[bit%8] + if whichByte&mask == 0 { + return false + } + } + return true +} + +// MatchesCriteria checks if a log matches the filter criteria (exact match). 
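+// Matching follows the usual eth_getLogs rules: an empty address list or an
+// empty topic position is a wildcard, values inside one topic position are ORed,
+// and the address check plus every constrained topic position are ANDed. A log
+// with fewer topics than a constrained position cannot match.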
+func MatchesCriteria(log *ethtypes.Log, crit filters.FilterCriteria) bool { + if len(crit.Addresses) > 0 { + found := false + for _, addr := range crit.Addresses { + if log.Address == addr { + found = true + break + } + } + if !found { + return false + } + } + for i, topicList := range crit.Topics { + if len(topicList) == 0 { + continue + } + if i >= len(log.Topics) { + return false + } + found := false + for _, topic := range topicList { + if log.Topics[i] == topic { + found = true + break + } + } + if !found { + return false + } + } + return true +} diff --git a/evmrpc/filter.go b/evmrpc/filter.go index 5115e56105..727d44c894 100644 --- a/evmrpc/filter.go +++ b/evmrpc/filter.go @@ -16,6 +16,7 @@ import ( ethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/hashicorp/golang-lru/v2/expirable" evmrpcconfig "github.com/sei-protocol/sei-chain/evmrpc/config" + "github.com/sei-protocol/sei-chain/evmrpc/ethbloom" "github.com/sei-protocol/sei-chain/sei-cosmos/client" sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types" "github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt" @@ -957,7 +958,7 @@ func (f *LogFetcher) collectLogs(block *coretypes.ResultBlock, crit filters.Filt // Pre-encode bloom filter indexes for fast per-receipt filtering hasFilters := len(crit.Addresses) != 0 || len(crit.Topics) != 0 - var filterIndexes [][]bloomIndexes + var filterIndexes [][]BloomIndexes if hasFilters { filterIndexes = EncodeFilters(crit.Addresses, crit.Topics) } @@ -996,7 +997,7 @@ func (f *LogFetcher) collectLogs(block *coretypes.ResultBlock, crit filters.Filt } logIndex++ - if !matchesCriteria(ethLog, crit) { + if !MatchesCriteria(ethLog, crit) { continue } collector.Append(ethLog) @@ -1004,47 +1005,13 @@ func (f *LogFetcher) collectLogs(block *coretypes.ResultBlock, crit filters.Filt } } -// matchesCriteria checks if a log matches the filter criteria -func matchesCriteria(log *ethtypes.Log, crit filters.FilterCriteria) bool { - // Check address filter - if len(crit.Addresses) > 0 { - found := false - for _, addr := range crit.Addresses { - if log.Address == addr { - found = true - break - } - } - if !found { - return false - } - } - - // Check topics filter - for i, topicList := range crit.Topics { - if len(topicList) == 0 { - continue - } - if i >= len(log.Topics) { - return false - } - found := false - for _, topic := range topicList { - if log.Topics[i] == topic { - found = true - break - } - } - if !found { - return false - } - } - - return true +// MatchesCriteria checks if a log matches the filter criteria. 
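+// The implementation now lives in the ethbloom package; this exported wrapper
+// keeps the existing evmrpc call sites (e.g. collectLogs) unchanged.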
+func MatchesCriteria(log *ethtypes.Log, crit filters.FilterCriteria) bool { + return ethbloom.MatchesCriteria(log, crit) } // Optimized fetchBlocksByCrit with batch processing -func (f *LogFetcher) fetchBlocksByCrit(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64, bloomIndexes [][]bloomIndexes) (chan *coretypes.ResultBlock, int64, bool, error) { +func (f *LogFetcher) fetchBlocksByCrit(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64, bloomIndexes [][]BloomIndexes) (chan *coretypes.ResultBlock, int64, bool, error) { if crit.BlockHash != nil { // Check for invalid zero hash zeroHash := common.Hash{} @@ -1143,7 +1110,7 @@ func (f *LogFetcher) fetchBlocksByCrit(ctx context.Context, crit filters.FilterC } // Batch processing function for blocks -func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit filters.FilterCriteria, bloomIndexes [][]bloomIndexes, res chan *coretypes.ResultBlock, errChan chan error) { +func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit filters.FilterCriteria, bloomIndexes [][]BloomIndexes, res chan *coretypes.ResultBlock, errChan chan error) { defer func() { metrics.IncrementRpcRequestCounter("num_blocks_fetched", "blocks", true) }() diff --git a/sei-db/db_engine/pebbledb/mvcc/db.go b/sei-db/db_engine/pebbledb/mvcc/db.go index e1940671c2..2ce5d674ee 100644 --- a/sei-db/db_engine/pebbledb/mvcc/db.go +++ b/sei-db/db_engine/pebbledb/mvcc/db.go @@ -80,6 +80,7 @@ type Database struct { type VersionedChangesets struct { Version int64 Changesets []*proto.NamedChangeSet + Done chan struct{} // non-nil for barrier: closed when this entry is processed } func OpenDB(dataDir string, config config.StateStoreConfig) (*Database, error) { @@ -216,6 +217,15 @@ func (db *Database) Close() error { return err } +// PebbleMetrics returns the underlying Pebble DB metrics for observability (e.g. compaction/flush counts). +// Returns nil if the database is closed. 
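+// The result is a snapshot taken at call time; a hypothetical caller could read
+// counters such as m.Compact.Count or m.Flush.Count from it (see the pebble
+// write benchmark in sei-db/ledger_db/receipt for an example of this usage).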
+func (db *Database) PebbleMetrics() *pebble.Metrics { + if db.storage == nil { + return nil + } + return db.storage.Metrics() +} + func (db *Database) SetLatestVersion(version int64) error { if version < 0 { return fmt.Errorf("version must be non-negative") @@ -457,11 +467,6 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named int64(len(db.pendingChanges)), ) }() - // Add to pending changes first - db.pendingChanges <- VersionedChangesets{ - Version: version, - Changesets: changesets, - } // Write to WAL if db.streamHandler != nil { entry := proto.ChangelogEntry{ @@ -474,13 +479,21 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named return err } } - + // Add to pending changes first + db.pendingChanges <- VersionedChangesets{ + Version: version, + Changesets: changesets, + } return nil } func (db *Database) writeAsyncInBackground() { defer db.asyncWriteWG.Done() for nextChange := range db.pendingChanges { + if nextChange.Done != nil { + close(nextChange.Done) + continue + } version := nextChange.Version if err := db.ApplyChangesetSync(version, nextChange.Changesets); err != nil { panic(err) @@ -488,6 +501,13 @@ func (db *Database) writeAsyncInBackground() { } } +// WaitForPendingWrites waits for all pending writes to be processed +func (db *Database) WaitForPendingWrites() { + done := make(chan struct{}) + db.pendingChanges <- VersionedChangesets{Done: done} + <-done +} + // Prune attempts to prune all versions up to and including the current version // Get the range of keys, manually iterate over them and delete them // We add a heuristic to skip over a module's keys during pruning if it hasn't been updated diff --git a/sei-db/ledger_db/parquet/store.go b/sei-db/ledger_db/parquet/store.go index 040a89be01..f3e4723cc4 100644 --- a/sei-db/ledger_db/parquet/store.go +++ b/sei-db/ledger_db/parquet/store.go @@ -172,6 +172,14 @@ func (s *Store) CacheRotateInterval() uint64 { return s.config.MaxBlocksPerFile } +// SetBlockFlushInterval overrides the number of blocks buffered before +// flushing to a parquet file. Intended for testing. +func (s *Store) SetBlockFlushInterval(interval uint64) { + s.mu.Lock() + defer s.mu.Unlock() + s.config.BlockFlushInterval = interval +} + // GetReceiptByTxHash retrieves a receipt by transaction hash. func (s *Store) GetReceiptByTxHash(ctx context.Context, txHash common.Hash) (*ReceiptResult, error) { return s.Reader.GetReceiptByTxHash(ctx, txHash) @@ -482,6 +490,14 @@ func (s *Store) initWriters() error { return nil } +// Flush acquires the write lock and flushes all buffered data to disk. +// Mostly used for testing and benchmarking. 
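+// It takes the store's write lock and delegates to flushLocked, so it
+// serializes with any other operation guarded by the same mutex.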
+func (s *Store) Flush() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.flushLocked() +} + func (s *Store) flushLocked() error { if len(s.receiptsBuffer) == 0 { return nil diff --git a/sei-db/ledger_db/receipt/receipt_bench_parquet_test.go b/sei-db/ledger_db/receipt/receipt_bench_parquet_test.go new file mode 100644 index 0000000000..313201015d --- /dev/null +++ b/sei-db/ledger_db/receipt/receipt_bench_parquet_test.go @@ -0,0 +1,68 @@ +package receipt + +import ( + "fmt" + "testing" + + dbLogger "github.com/sei-protocol/sei-chain/sei-db/common/logger" + dbconfig "github.com/sei-protocol/sei-chain/sei-db/config" +) + +func benchmarkParquetWriteAsync(b *testing.B, receiptsPerBlock int, blocks int) { + b.Helper() + + ctx, storeKey := newTestContext() + cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.DBDirectory = b.TempDir() + fmt.Println("cfg.DBDirectory = ", cfg.DBDirectory) + cfg.KeepRecent = 1000 + cfg.PruneIntervalSeconds = 10 + cfg.Backend = receiptBackendParquet + + store, err := newReceiptBackend(dbLogger.NewNopLogger(), cfg, storeKey) + if err != nil { + b.Fatalf("failed to create receipt store: %v", err) + } + b.Cleanup(func() { _ = store.Close() }) + + pqs, ok := store.(*parquetReceiptStore) + if !ok { + b.Fatalf("expected parquet receipt store, got %T", store) + } + + pqs.store.SetBlockFlushInterval(100) + + totalReceipts := receiptsPerBlock * blocks + addrs := addressPool(5) + t0s := topicPool(3, 0x01) + t1s := topicPool(3, 0x02) + batch := makeDiverseReceiptBatch(1, receiptsPerBlock, 0, addrs, t0s, t1s, nil) + bytePerReceipt := len(batch[0].ReceiptBytes) + totalBytes := int64(bytePerReceipt * totalReceipts) + + b.SetBytes(totalBytes) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for block := 0; block < blocks; block++ { + blockNumber := int64(i*blocks + block + 1) + for j := range batch { + batch[j].Receipt.BlockNumber = uint64(blockNumber) //nolint:gosec + } + if err := pqs.SetReceipts(ctx.WithBlockHeight(blockNumber), batch); err != nil { + b.Fatalf("failed to write receipts: %v", err) + } + if (block+1)%1000 == 0 { + fmt.Printf("parquet: block %d/%d (%.1f%%)\n", block+1, blocks, float64(block+1)/float64(blocks)*100) + } + } + fmt.Println("parquet: starting final flush...") + if err := pqs.store.Flush(); err != nil { + b.Fatalf("failed to flush parquet files: %v", err) + } + fmt.Println("parquet: final flush complete") + } + b.StopTimer() + + reportBenchMetrics(b, totalReceipts, totalBytes, blocks) +} diff --git a/sei-db/ledger_db/receipt/receipt_bench_read_test.go b/sei-db/ledger_db/receipt/receipt_bench_read_test.go new file mode 100644 index 0000000000..e2bb661bec --- /dev/null +++ b/sei-db/ledger_db/receipt/receipt_bench_read_test.go @@ -0,0 +1,525 @@ +package receipt + +import ( + "encoding/binary" + "fmt" + "sync" + "sync/atomic" + "testing" + + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/filters" + "github.com/sei-protocol/sei-chain/evmrpc/ethbloom" + sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types" + dbLogger "github.com/sei-protocol/sei-chain/sei-db/common/logger" + dbconfig "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/x/evm/types" +) + +const ( + readBenchWriteBlocks = 2000 + readBenchReceiptsPerBlock = 3000 + readBenchNumAddresses = 50 + readBenchNumTopic0 = 7 // coprime with numAddresses so all (addr, topic0) combos occur + readBenchNumTopic1 = 97 // coprime with both +) + +type rangeConfig 
struct { + size int + store bool // true → target early blocks (outside cache); false → target tail (cache) +} + +var ( + readBenchRanges = []rangeConfig{ + {100, false}, + {500, false}, + {100, true}, + {500, true}, + } + readBenchConcurrences = []int{1, 2, 4, 8, 16} +) + +type filterConfig struct { + name string + crit func(env *readBenchEnv) filters.FilterCriteria +} + +var readBenchFilters = []filterConfig{ + { + name: "none", + crit: func(_ *readBenchEnv) filters.FilterCriteria { + return filters.FilterCriteria{} + }, + }, + { + name: "address", + crit: func(env *readBenchEnv) filters.FilterCriteria { + return filters.FilterCriteria{ + Addresses: []common.Address{env.addrs[0]}, + } + }, + }, + { + name: "topic0", + crit: func(env *readBenchEnv) filters.FilterCriteria { + return filters.FilterCriteria{ + Topics: [][]common.Hash{{env.topic0s[0]}}, + } + }, + }, + { + name: "address+topic0", + crit: func(env *readBenchEnv) filters.FilterCriteria { + return filters.FilterCriteria{ + Addresses: []common.Address{env.addrs[0]}, + Topics: [][]common.Hash{{env.topic0s[0]}}, + } + }, + }, +} + +// BenchmarkReceiptReadStore100 is a focused benchmark: blocks=[1,100] (store-only), +// concurrency=16, filter=address+topic0, for both pebble and parquet. +func BenchmarkReceiptReadStore100(b *testing.B) { + for _, backend := range []string{receiptBackendPebble, receiptBackendParquet} { + b.Run(backend, func(b *testing.B) { + // Setup in parent scope so the probe run (b.N=1) and real run share it. + env := setupReadBenchmark(b, backend, + readBenchWriteBlocks, readBenchReceiptsPerBlock, + readBenchNumAddresses, readBenchNumTopic0, readBenchNumTopic1, + ) + + rc := rangeConfig{size: 100, store: true} + crit := filters.FilterCriteria{ + Addresses: []common.Address{env.addrs[0]}, + Topics: [][]common.Hash{{env.topic0s[0]}}, + } + b.Run("query", func(b *testing.B) { + runFilterLogsBenchmark(b, env, backend, rc, 16, crit) + }) + }) + } +} + +// This takes very long to run, use regex to run specific benchmarks, example: +// Example: go test -run='^$' -bench 'BenchmarkReceiptRead/.*/range=100_store/concurrency=4/filter=address.topic0' -benchtime=3x -count=1 -timeout=30m ./sei-db/ledger_db/receipt/ +func BenchmarkReceiptRead(b *testing.B) { + backends := []string{receiptBackendPebble, receiptBackendParquet} + + for _, backend := range backends { + b.Run(backend, func(b *testing.B) { + env := setupReadBenchmark(b, backend, + readBenchWriteBlocks, readBenchReceiptsPerBlock, + readBenchNumAddresses, readBenchNumTopic0, readBenchNumTopic1, + ) + + fmt.Printf("[bench %s] starting read benchmarks ...\n", backend) + + for _, rc := range readBenchRanges { + label := fmt.Sprintf("range=%d", rc.size) + if rc.store { + label += "_store" + } else { + label += "_cache" + } + b.Run(label, func(b *testing.B) { + for _, conc := range readBenchConcurrences { + b.Run(fmt.Sprintf("concurrency=%d", conc), func(b *testing.B) { + for _, fc := range readBenchFilters { + crit := fc.crit(env) + b.Run(fmt.Sprintf("filter=%s", fc.name), func(b *testing.B) { + runFilterLogsBenchmark(b, env, backend, rc, conc, crit) + }) + } + }) + } + }) + } + }) + } +} + +// runFilterLogsBenchmark executes b.N FilterLogs queries spread across +// the given number of concurrent goroutines. +// +// rc.store=false → target the most recent blocks (cache hits). +// rc.store=true → target early blocks outside the cache window (store reads). 
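+//
+// Besides the default ns/op, the run reports queries/s, logs/query and ns/log,
+// derived from b.Elapsed() and the log count recorded by the first completed query.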
+func runFilterLogsBenchmark(b *testing.B, env *readBenchEnv, backend string, rc rangeConfig, concurrency int, crit filters.FilterCriteria) { + b.Helper() + + var fromBlock, toBlock uint64 + if rc.store { + fromBlock = 1 + toBlock = uint64(rc.size) + } else { + toBlock = uint64(env.blocks) + fromBlock = toBlock - uint64(rc.size) + 1 + } + + cacheHint := "cache" + if rc.store { + cacheHint = "store" + } + fmt.Printf("[bench %s] FilterLogs blocks=[%d,%d] concurrency=%d filter=%s (%s)\n", + backend, fromBlock, toBlock, concurrency, filterName(crit), cacheHint) + + b.ResetTimer() + + var remaining atomic.Int64 + remaining.Store(int64(b.N)) + + // Track the number of logs per query for reporting, -1 means not yet set + // using atomic int64 to avoid race conditions + var logsPerQuery atomic.Int64 + logsPerQuery.Store(-1) + + fmt.Printf("[bench %s] Starting benchmark queries\n", backend) + var wg sync.WaitGroup + for g := 0; g < concurrency; g++ { + wg.Add(1) + go func() { + defer wg.Done() + for remaining.Add(-1) >= 0 { + logs, qerr := execFilterLogs(env, fromBlock, toBlock, crit) + if qerr != nil { + b.Errorf("FilterLogs failed: %v", qerr) + return + } + logsPerQuery.CompareAndSwap(-1, int64(len(logs))) + } + }() + } + wg.Wait() + + b.StopTimer() + + elapsed := b.Elapsed() + lpq := logsPerQuery.Load() + if elapsed > 0 && b.N > 0 { + queriesPerSec := float64(b.N) / elapsed.Seconds() + b.ReportMetric(queriesPerSec, "queries/s") + + if lpq >= 0 { + b.ReportMetric(float64(lpq), "logs/query") + } + + if lpq > 0 { + totalLogs := int64(b.N) * lpq + nsPerLog := float64(elapsed.Nanoseconds()) / float64(totalLogs) + b.ReportMetric(nsPerLog, "ns/log") + } + } +} + +// execFilterLogs dispatches to the correct read path for the backend. +func execFilterLogs(env *readBenchEnv, fromBlock, toBlock uint64, crit filters.FilterCriteria) ([]*ethtypes.Log, error) { + if env.isPebble { + return pebbleFilterLogs(env.store, env.ctx, fromBlock, toBlock, env.idx, crit) + } + return env.store.FilterLogs(env.ctx, fromBlock, toBlock, crit) +} + +// readBenchEnv holds everything produced by the write phase that the read +// benchmarks need. +type readBenchEnv struct { + store ReceiptStore + ctx sdk.Context + idx *readBenchIndex + addrs []common.Address + topic0s []common.Hash + topic1s []common.Hash + blocks int + isPebble bool +} + +// setupReadBenchmark writes diverse receipt data through the cached layer and +// returns an environment ready for read benchmarks. The write phase is not +// timed. backend must be "pebble" or "parquet". +func setupReadBenchmark(b *testing.B, backend string, blocks, receiptsPerBlock, numAddrs, numTopic0, numTopic1 int) *readBenchEnv { + b.Helper() + + ctx, storeKey := newTestContext() + + cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.DBDirectory = b.TempDir() + cfg.KeepRecent = 0 + cfg.PruneIntervalSeconds = 0 + cfg.Backend = backend + + store, err := NewReceiptStore(dbLogger.NewNopLogger(), cfg, storeKey) + if err != nil { + b.Fatalf("failed to create %s receipt store: %v", backend, err) + } + b.Cleanup(func() { _ = store.Close() }) + + cached, ok := store.(*cachedReceiptStore) + if !ok { + b.Fatalf("expected cachedReceiptStore, got %T", store) + } + + // For parquet, set a reasonable flush interval so data lands in parquet files. 
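+	// The interval of 100 blocks mirrors the parquet write benchmark; it is a
+	// test-sizing choice rather than a recommended production value.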
+ if backend == receiptBackendParquet { + pqs, ok := cached.backend.(*parquetReceiptStore) + if !ok { + b.Fatalf("expected parquetReceiptStore backend, got %T", cached.backend) + } + pqs.store.SetBlockFlushInterval(100) + } + + addrs := addressPool(numAddrs) + t0s := topicPool(numTopic0, 0x01) + t1s := topicPool(numTopic1, 0x02) + idx := newReadBenchIndex() + + fmt.Printf("[setup %s] writing %d blocks x %d receipts/block ...\n", backend, blocks, receiptsPerBlock) + + logInterval := blocks / 5 // log at ~20%, 40%, 60%, 80%, 100% + if logInterval == 0 { + logInterval = 1 + } + + var seed uint64 + for block := 0; block < blocks; block++ { + blockNumber := uint64(block + 1) + batch := makeDiverseReceiptBatch(blockNumber, receiptsPerBlock, seed, addrs, t0s, t1s, idx) + if err := store.SetReceipts(ctx.WithBlockHeight(int64(blockNumber)), batch); err != nil { + b.Fatalf("failed to write block %d: %v", blockNumber, err) + } + seed += uint64(receiptsPerBlock) + + if (block+1)%logInterval == 0 { + fmt.Printf("[setup %s] wrote block %d/%d (%.0f%%)\n", backend, block+1, blocks, float64(block+1)/float64(blocks)*100) + } + } + + fmt.Printf("[setup %s] flushing async writes ...\n", backend) + + // Drain all async writes so the underlying store is fully populated. + switch backend { + case receiptBackendPebble: + rs, ok := cached.backend.(*receiptStore) + if !ok { + b.Fatalf("expected receiptStore backend, got %T", cached.backend) + } + rs.db.WaitForPendingWrites() + case receiptBackendParquet: + pqs, ok := cached.backend.(*parquetReceiptStore) + if !ok { + b.Fatalf("expected parquetReceiptStore backend, got %T", cached.backend) + } + if err := pqs.store.Flush(); err != nil { + b.Fatalf("failed to flush parquet: %v", err) + } + } + + fmt.Printf("[setup %s] done (%d blocks x %d receipts = %d total)\n", backend, blocks, receiptsPerBlock, blocks*receiptsPerBlock) + + return &readBenchEnv{ + store: store, + ctx: ctx, + idx: idx, + addrs: addrs, + topic0s: t0s, + topic1s: t1s, + blocks: blocks, + isPebble: backend == receiptBackendPebble, + } +} + +// pebbleFilterLogs mirrors the production evmrpc fallback path for pebble: +// iterate each block in [fromBlock, toBlock], use the block-level bloom to skip +// non-matching blocks, use per-tx bloom to skip non-matching receipts, then +// exact-match filter the remaining logs. 
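+//
+// Per-block log indexes are rebuilt by accumulating len(receipt.Logs) into
+// logStartIndex, so receipts skipped by the bloom check still advance the
+// counter that getLogsForTx expects.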
+func pebbleFilterLogs( + store ReceiptStore, + ctx sdk.Context, + fromBlock, toBlock uint64, + txIndex *readBenchIndex, + crit filters.FilterCriteria, +) ([]*ethtypes.Log, error) { + hasFilters := len(crit.Addresses) > 0 || len(crit.Topics) > 0 + var filterIdxs [][]ethbloom.BloomIndexes + if hasFilters { + filterIdxs = ethbloom.EncodeFilters(crit.Addresses, crit.Topics) + } + + var result []*ethtypes.Log + for block := fromBlock; block <= toBlock; block++ { + if hasFilters { + blockBloom := txIndex.blockBlooms[block] + if blockBloom != (ethtypes.Bloom{}) && !ethbloom.MatchFilters(blockBloom, filterIdxs) { + continue + } + } + + hashes := txIndex.blockTxHashes[block] + var logStartIndex uint + for _, txHash := range hashes { + receipt, err := store.GetReceipt(ctx, txHash) + if err != nil { + return nil, fmt.Errorf("block %d tx %s: %w", block, txHash.Hex(), err) + } + + if hasFilters && len(receipt.LogsBloom) > 0 { + txBloom := ethtypes.Bloom(receipt.LogsBloom) + if !ethbloom.MatchFilters(txBloom, filterIdxs) { + logStartIndex += uint(len(receipt.Logs)) + continue + } + } + + txLogs := getLogsForTx(receipt, logStartIndex) + logStartIndex += uint(len(txLogs)) + for _, lg := range txLogs { + if ethbloom.MatchesCriteria(lg, crit) { + result = append(result, lg) + } + } + } + } + return result, nil +} + +// readBenchIndex tracks block-to-txHash mappings and block-level bloom filters +// built during the write phase. +type readBenchIndex struct { + blockTxHashes map[uint64][]common.Hash + blockBlooms map[uint64]ethtypes.Bloom +} + +func newReadBenchIndex() *readBenchIndex { + return &readBenchIndex{ + blockTxHashes: make(map[uint64][]common.Hash), + blockBlooms: make(map[uint64]ethtypes.Bloom), + } +} + +// record indexes a single receipt's tx hash for its block. +func (idx *readBenchIndex) record(blockNumber uint64, txHash common.Hash) { + idx.blockTxHashes[blockNumber] = append(idx.blockTxHashes[blockNumber], txHash) +} + +// addressPool generates a deterministic set of unique contract addresses. +func addressPool(n int) []common.Address { + pool := make([]common.Address, n) + for i := range pool { + var buf [20]byte + binary.BigEndian.PutUint64(buf[12:], uint64(i)+1) + buf[0] = 0xAA // prefix to distinguish from topic hashes + pool[i] = common.BytesToAddress(buf[:]) + } + return pool +} + +// topicPool generates a deterministic set of unique topic hashes. +// The prefix byte differentiates pools (e.g. 0x01 for event sigs, 0x02 for indexed params). +func topicPool(n int, prefix byte) []common.Hash { + pool := make([]common.Hash, n) + for i := range pool { + var buf [32]byte + buf[0] = prefix + binary.BigEndian.PutUint64(buf[24:], uint64(i)+1) + pool[i] = common.BytesToHash(buf[:]) + } + return pool +} + +// makeDiverseReceiptBatch creates a batch of receipts with varied addresses and +// topics drawn from the provided pools. Assignment is deterministic based on +// seed so results are reproducible. +// +// Each receipt gets exactly 1 log with: +// - Address: addrs[pick(seed+i, len(addrs))] +// - Topic[0]: topic0s[pick(seed+i, len(topic0s))] (event signature) +// - Topic[1]: topic1s[pick(seed+i, len(topic1s))] (indexed param) +// +// The function also records every log into idx for later correctness checking. 
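+//
+// With the read-benchmark pool sizes (50 addresses, 7 topic0s, 97 topic1s, all
+// pairwise coprime) the (address, topic0, topic1) assignment repeats only every
+// 50*7*97 = 33,950 receipts, and every (address, topic0) pair occurs within any
+// run of 350 consecutive receipts, i.e. well inside a single 3,000-receipt block.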
+func makeDiverseReceiptBatch( + blockNumber uint64, + count int, + seed uint64, + addrs []common.Address, + topic0s []common.Hash, + topic1s []common.Hash, + idx *readBenchIndex, +) []ReceiptRecord { + records := make([]ReceiptRecord, count) + + logData := make([]byte, 32) + var blockBloom ethtypes.Bloom + + for i := 0; i < count; i++ { + txHash := hashFromUint64(seed + uint64(i)) + + base := seed*3 + uint64(i) + addr := addrs[pick(base, len(addrs))] + t0 := topic0s[pick(base+1, len(topic0s))] + t1 := topic1s[pick(base+2, len(topic1s))] + + receipt := &types.Receipt{ + TxHashHex: txHash.Hex(), + BlockNumber: blockNumber, + TransactionIndex: uint32(i), + GasUsed: 52000, + CumulativeGasUsed: uint64(52000 * (i + 1)), + Status: 1, + Logs: []*types.Log{ + { + Address: addr.Hex(), + Topics: []string{t0.Hex(), t1.Hex()}, + Data: logData, + Index: 0, + }, + }, + } + + txBloom := ethtypes.CreateBloom(ðtypes.Receipt{Logs: getLogsForTx(receipt, 0)}) + receipt.LogsBloom = txBloom.Bytes() + for j := range blockBloom { + blockBloom[j] |= txBloom[j] + } + + receiptBytes, err := receipt.Marshal() + if err != nil { + panic(fmt.Sprintf("makeDiverseReceiptBatch: marshal failed: %v", err)) + } + + records[i] = ReceiptRecord{ + TxHash: txHash, + Receipt: receipt, + ReceiptBytes: receiptBytes, + } + + if idx != nil { + idx.record(blockNumber, txHash) + } + } + + if idx != nil { + idx.blockBlooms[blockNumber] = blockBloom + } + + return records +} + +// filterName returns a human-readable label for a FilterCriteria. +func filterName(crit filters.FilterCriteria) string { + hasAddr := len(crit.Addresses) > 0 + hasTopic := len(crit.Topics) > 0 && len(crit.Topics[0]) > 0 + switch { + case hasAddr && hasTopic: + return "address+topic0" + case hasAddr: + return "address" + case hasTopic: + return "topic0" + default: + return "none" + } +} + +// pick returns a deterministic index in [0, poolSize) derived from val. +func pick(val uint64, poolSize int) int { + return int(val % uint64(poolSize)) //nolint:gosec +} diff --git a/sei-db/ledger_db/receipt/receipt_bench_test.go b/sei-db/ledger_db/receipt/receipt_bench_test.go new file mode 100644 index 0000000000..30dde1a5b7 --- /dev/null +++ b/sei-db/ledger_db/receipt/receipt_bench_test.go @@ -0,0 +1,140 @@ +package receipt + +import ( + "encoding/binary" + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/common" + dbLogger "github.com/sei-protocol/sei-chain/sei-db/common/logger" + dbconfig "github.com/sei-protocol/sei-chain/sei-db/config" + "github.com/sei-protocol/sei-chain/sei-db/proto" + iavl "github.com/sei-protocol/sei-chain/sei-iavl" + "github.com/sei-protocol/sei-chain/x/evm/types" +) + +// BenchmarkReceiptWriteAsync compares async write throughput between pebble and parquet. 
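+// Like BenchmarkReceiptRead it takes a long time; something like
+//
+//	go test -run='^$' -bench 'BenchmarkReceiptWriteAsync' -benchtime=1x -timeout=60m ./sei-db/ledger_db/receipt/
+//
+// limits the run to a single pass per backend.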
+func BenchmarkReceiptWriteAsync(b *testing.B) { + const ( + blocks = 5000 // 5k + receiptsPerBlock = 3000 + ) + b.Run(fmt.Sprintf("blocks=%d/per_block=%d", blocks, receiptsPerBlock), func(b *testing.B) { + b.Run("pebble-async-with-wal", func(b *testing.B) { + benchmarkPebbleWriteAsync(b, receiptsPerBlock, blocks) + }) + b.Run("parquet-async-with-wal", func(b *testing.B) { + benchmarkParquetWriteAsync(b, receiptsPerBlock, blocks) + }) + }) +} + +func benchmarkPebbleWriteAsync(b *testing.B, receiptsPerBlock int, blocks int) { + b.Helper() + + _, storeKey := newTestContext() + cfg := dbconfig.DefaultReceiptStoreConfig() + cfg.DBDirectory = b.TempDir() + cfg.KeepRecent = 1000 + cfg.PruneIntervalSeconds = 10 + cfg.Backend = receiptBackendPebble + + store, err := newReceiptBackend(dbLogger.NewNopLogger(), cfg, storeKey) + if err != nil { + b.Fatalf("failed to create receipt store: %v", err) + } + b.Cleanup(func() { _ = store.Close() }) + + rs, ok := store.(*receiptStore) + if !ok { + b.Fatalf("expected pebble receipt store, got %T", store) + } + + totalReceipts := receiptsPerBlock * blocks + addrs := addressPool(5) + t0s := topicPool(3, 0x01) + t1s := topicPool(3, 0x02) + batch := makeDiverseReceiptBatch(1, receiptsPerBlock, 0, addrs, t0s, t1s, nil) + totalBytes := int64(len(batch[0].ReceiptBytes) * totalReceipts) + + // Get Pebble metrics before writing mostly to track compaction and flush counts. + before := rs.db.PebbleMetrics() + beforeCompactCount := int64(before.Compact.Count) + beforeCompactDuration := before.Compact.Duration.Seconds() + beforeFlushCount := int64(before.Flush.Count) + beforeFlushBytes := int64(before.Flush.WriteThroughput.Bytes) + + b.SetBytes(totalBytes) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for block := 0; block < blocks; block++ { + blockNumber := int64(i*blocks + block + 1) + if err := applyReceiptsAsync(rs, blockNumber, batch); err != nil { + b.Fatalf("failed to write receipts: %v", err) + } + if (block+1)%1000 == 0 { + fmt.Printf("pebble: block %d/%d (%.1f%%)\n", block+1, blocks, float64(block+1)/float64(blocks)*100) + } + } + } + rs.db.WaitForPendingWrites() + b.StopTimer() + + // Get Pebble metrics after writing to track compaction and flush counts. + after := rs.db.PebbleMetrics() + afterCompactCount := int64(after.Compact.Count) + afterCompactDuration := after.Compact.Duration.Seconds() + afterFlushCount := int64(after.Flush.Count) + afterFlushBytes := int64(after.Flush.WriteThroughput.Bytes) + b.ReportMetric(float64(afterCompactCount-beforeCompactCount), "compactions") + b.ReportMetric(afterCompactDuration-beforeCompactDuration, "compaction_s") + b.ReportMetric(float64(afterFlushCount-beforeFlushCount), "flushes") + b.ReportMetric(float64(afterFlushBytes-beforeFlushBytes), "flush_bytes") + + reportBenchMetrics(b, totalReceipts, totalBytes, blocks) +} + +// applyReceiptsAsync writes receipts to pebble with async durability. 
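+// All records for a block are packed into a single NamedChangeSet keyed by
+// types.ReceiptStoreKey and handed to ApplyChangesetAsync, so the benchmark
+// exercises the changelog/WAL plus background-writer path rather than a
+// synchronous commit per block.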
+func applyReceiptsAsync(store *receiptStore, version int64, receipts []ReceiptRecord) error { + pairs := make([]*iavl.KVPair, 0, len(receipts)) + for _, record := range receipts { + if record.Receipt == nil { + continue + } + kvPair := &iavl.KVPair{ + Key: types.ReceiptKey(record.TxHash), + Value: record.ReceiptBytes, + } + pairs = append(pairs, kvPair) + } + + ncs := &proto.NamedChangeSet{ + Name: types.ReceiptStoreKey, + Changeset: iavl.ChangeSet{Pairs: pairs}, + } + return store.db.ApplyChangesetAsync(version, []*proto.NamedChangeSet{ncs}) +} + +func hashFromUint64(value uint64) common.Hash { + var buf [32]byte + binary.BigEndian.PutUint64(buf[24:], value) + return common.BytesToHash(buf[:]) +} + +func reportBenchMetrics(b *testing.B, totalReceipts int, totalBytes int64, blocks int) { + b.Helper() + elapsed := b.Elapsed() + if elapsed > 0 && b.N > 0 { + perOpSeconds := elapsed.Seconds() / float64(b.N) + if perOpSeconds > 0 { + receiptsPerSecond := float64(totalReceipts) / perOpSeconds + b.ReportMetric(receiptsPerSecond, "receipts/s") + bytesPerSecond := float64(totalBytes) / perOpSeconds + b.ReportMetric(bytesPerSecond, "bytes/s") + } + } + b.ReportMetric(float64(totalReceipts), "receipts/op") + b.ReportMetric(float64(totalBytes), "bytes/op") + b.ReportMetric(float64(blocks), "blocks/op") +} diff --git a/sei-db/ledger_db/receipt/receipt_store.go b/sei-db/ledger_db/receipt/receipt_store.go index 6c461c7e76..edff027f8a 100644 --- a/sei-db/ledger_db/receipt/receipt_store.go +++ b/sei-db/ledger_db/receipt/receipt_store.go @@ -54,6 +54,7 @@ type receiptStore struct { db *mvcc.Database storeKey sdk.StoreKey stopPruning chan struct{} + pruneWg sync.WaitGroup closeOnce sync.Once } @@ -111,13 +112,13 @@ func newReceiptBackend(log dbLogger.Logger, config dbconfig.ReceiptStoreConfig, _ = db.Close() return nil, err } - stopPruning := make(chan struct{}) - startReceiptPruning(log, db, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds), stopPruning) - return &receiptStore{ + rs := &receiptStore{ db: db, storeKey: storeKey, - stopPruning: stopPruning, - }, nil + stopPruning: make(chan struct{}), + } + startReceiptPruning(log, db, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds), rs.stopPruning, &rs.pruneWg) + return rs, nil default: return nil, fmt.Errorf("unsupported receipt store backend: %s", config.Backend) } @@ -236,10 +237,10 @@ func (s *receiptStore) FilterLogs(_ sdk.Context, _, _ uint64, _ filters.FilterCr func (s *receiptStore) Close() error { var err error s.closeOnce.Do(func() { - // Signal the pruning goroutine to stop if s.stopPruning != nil { close(s.stopPruning) } + s.pruneWg.Wait() err = s.db.Close() }) return err @@ -298,12 +299,21 @@ func recoverReceiptStore(log dbLogger.Logger, changelogPath string, db *mvcc.Dat return nil } -func startReceiptPruning(log dbLogger.Logger, db *mvcc.Database, keepRecent int64, pruneInterval int64, stopCh <-chan struct{}) { +func startReceiptPruning(log dbLogger.Logger, db *mvcc.Database, keepRecent int64, pruneInterval int64, stopCh <-chan struct{}, wg *sync.WaitGroup) { if keepRecent <= 0 || pruneInterval <= 0 { return } + wg.Add(1) go func() { + defer wg.Done() for { + select { + case <-stopCh: + log.Info("Receipt store pruning goroutine stopped") + return + default: + } + pruneStartTime := time.Now() latestVersion := db.GetLatestVersion() pruneVersion := latestVersion - keepRecent