Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
de009ae
feat: add ledger cache layer for receipt store
jewei1997 Feb 2, 2026
215a286
feat: add parquet receipt store with DuckDB range queries
jewei1997 Feb 2, 2026
982cf16
fix: update fakeReceiptStore.FilterLogs to new signature
jewei1997 Feb 2, 2026
432bd51
fix: update fakeReceiptStore.FilterLogs to new signature
jewei1997 Feb 2, 2026
3da7c1c
rename flushBatch to fillCache
jewei1997 Feb 3, 2026
3f64159
Merge branch 'main' into ledger-cache-layer
jewei1997 Feb 3, 2026
75dd167
go bench receipts parquet vs pebble
jewei1997 Feb 3, 2026
eb54490
Merge branch 'main' into ledger-cache-layer
jewei1997 Feb 4, 2026
6aa9c6b
fix test
jewei1997 Feb 4, 2026
a9dd371
make parquet its own package
jewei1997 Feb 4, 2026
4fe5492
Merge branch 'ledger-cache-layer' into parquet-receiptdb
jewei1997 Feb 4, 2026
e744ecd
fix build issue
jewei1997 Feb 4, 2026
9d1d18f
remove duckdb stub
jewei1997 Feb 4, 2026
2c364bc
remove parquet enabled
jewei1997 Feb 4, 2026
e807617
fix go lint
jewei1997 Feb 4, 2026
d6f9cdc
receipt: allow pre-marshaled bytes and expand parquet benchmarks
jewei1997 Feb 5, 2026
4b294d9
feat: optimize WAL write path with binary encoding and batched writes
jewei1997 Feb 9, 2026
69423d9
feat: add store helpers and pebble pre-marshaled bytes support
jewei1997 Feb 9, 2026
793dc6b
Merge branch 'parquet-receiptdb' into go-bench-receipts-unoptimized
jewei1997 Feb 9, 2026
6f87c37
fixes like waiting for all writes to flush
jewei1997 Feb 23, 2026
954f7af
fixed up parquet writing bench
jewei1997 Feb 23, 2026
d9b0832
add read benchmark
jewei1997 Feb 23, 2026
b8a5dce
increase concurrency in standalone read bench
jewei1997 Feb 23, 2026
fc364a2
Merge branch 'main' into go-bench-receipts-unoptimized
jewei1997 Feb 23, 2026
d50cf78
delete dead code
jewei1997 Feb 23, 2026
6318fbf
remove dead code
jewei1997 Feb 23, 2026
a477943
refactor
jewei1997 Feb 23, 2026
8ff7fdb
add bloom.go
jewei1997 Feb 23, 2026
01860a9
revert unnecessary changes
jewei1997 Feb 23, 2026
db2f5d1
Merge branch 'main' into go-bench-receipts-unoptimized
jewei1997 Feb 23, 2026
9babd0f
refactor
jewei1997 Feb 23, 2026
98a56be
Merge branch 'main' into go-bench-receipts-unoptimized
jewei1997 Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 8 additions & 81 deletions evmrpc/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,91 +3,18 @@ package evmrpc
import (
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/sei-protocol/sei-chain/evmrpc/ethbloom"
)

var BitMasks = [8]uint8{1, 2, 4, 8, 16, 32, 64, 128}

// bloomIndexes represents the bit indexes inside the bloom filter that belong
// to some key.
type bloomIndexes [3]uint

// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
func calcBloomIndexes(b []byte) bloomIndexes {
b = crypto.Keccak256(b)

var idxs bloomIndexes
for i := 0; i < len(idxs); i++ {
idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
}
return idxs
}
// BloomIndexes is re-exported for backward compatibility.
type BloomIndexes = ethbloom.BloomIndexes

// res: AND on outer level, OR on mid level, AND on inner level (i.e. all 3 bits)
func EncodeFilters(addresses []common.Address, topics [][]common.Hash) (res [][]bloomIndexes) {
filters := make([][][]byte, 1+len(topics))
if len(addresses) > 0 {
filter := make([][]byte, len(addresses))
for i, address := range addresses {
filter[i] = address.Bytes()
}
filters = append(filters, filter)
}
for _, topicList := range topics {
filter := make([][]byte, len(topicList))
for i, topic := range topicList {
filter[i] = topic.Bytes()
}
filters = append(filters, filter)
}
for _, filter := range filters {
// Gather the bit indexes of the filter rule, special casing the nil filter
if len(filter) == 0 {
continue
}
bloomBits := make([]bloomIndexes, len(filter))
for i, clause := range filter {
if clause == nil {
bloomBits = nil
break
}
bloomBits[i] = calcBloomIndexes(clause)
}
// Accumulate the filter rules if no nil rule was within
if bloomBits != nil {
res = append(res, bloomBits)
}
}
return
}

// TODO: parallelize if filters too large
func MatchFilters(bloom ethtypes.Bloom, filters [][]bloomIndexes) bool {
for _, filter := range filters {
if !matchFilter(bloom, filter) {
return false
}
}
return true
}
var BitMasks = [8]uint8{1, 2, 4, 8, 16, 32, 64, 128}

func matchFilter(bloom ethtypes.Bloom, filter []bloomIndexes) bool {
for _, possibility := range filter {
if matchBloomIndexes(bloom, possibility) {
return true
}
}
return false
func EncodeFilters(addresses []common.Address, topics [][]common.Hash) [][]BloomIndexes {
return ethbloom.EncodeFilters(addresses, topics)
}

func matchBloomIndexes(bloom ethtypes.Bloom, idx bloomIndexes) bool {
for _, bit := range idx {
// big endian
whichByte := bloom[ethtypes.BloomByteLength-1-bit/8]
mask := BitMasks[bit%8]
if whichByte&mask == 0 {
return false
}
}
return true
func MatchFilters(bloom ethtypes.Bloom, filters [][]BloomIndexes) bool {
return ethbloom.MatchFilters(bloom, filters)
}
126 changes: 126 additions & 0 deletions evmrpc/ethbloom/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package ethbloom
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a refactor, not new code added


import (
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/filters"
)

var bitMasks = [8]uint8{1, 2, 4, 8, 16, 32, 64, 128}

// BloomIndexes represents the bit indexes inside the bloom filter that belong
// to some key.
type BloomIndexes [3]uint

func calcBloomIndexes(b []byte) BloomIndexes {
b = crypto.Keccak256(b)

var idxs BloomIndexes
for i := 0; i < len(idxs); i++ {
idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
}
return idxs
}

// EncodeFilters builds bloom-index slices from filter criteria.
// Result semantics: AND on outer level, OR on mid level, AND on inner level (all 3 bits).
func EncodeFilters(addresses []common.Address, topics [][]common.Hash) (res [][]BloomIndexes) {
filters := make([][][]byte, 1+len(topics))
if len(addresses) > 0 {
filter := make([][]byte, len(addresses))
for i, address := range addresses {
filter[i] = address.Bytes()
}
filters = append(filters, filter)
}
for _, topicList := range topics {
filter := make([][]byte, len(topicList))
for i, topic := range topicList {
filter[i] = topic.Bytes()
}
filters = append(filters, filter)
}
for _, filter := range filters {
if len(filter) == 0 {
continue
}
bloomBits := make([]BloomIndexes, len(filter))
for i, clause := range filter {
if clause == nil {
bloomBits = nil
break
}
bloomBits[i] = calcBloomIndexes(clause)
}
if bloomBits != nil {
res = append(res, bloomBits)
}
}
return
}

// MatchFilters returns true when bloom matches all filter groups.
func MatchFilters(bloom ethtypes.Bloom, filterGroups [][]BloomIndexes) bool {
for _, filter := range filterGroups {
if !matchFilter(bloom, filter) {
return false
}
}
return true
}

func matchFilter(bloom ethtypes.Bloom, filter []BloomIndexes) bool {
for _, possibility := range filter {
if matchBloomIndexes(bloom, possibility) {
return true
}
}
return false
}

func matchBloomIndexes(bloom ethtypes.Bloom, idx BloomIndexes) bool {
for _, bit := range idx {
whichByte := bloom[ethtypes.BloomByteLength-1-bit/8]
mask := bitMasks[bit%8]
if whichByte&mask == 0 {
return false
}
}
return true
}

// MatchesCriteria checks if a log matches the filter criteria (exact match).
func MatchesCriteria(log *ethtypes.Log, crit filters.FilterCriteria) bool {
if len(crit.Addresses) > 0 {
found := false
for _, addr := range crit.Addresses {
if log.Address == addr {
found = true
break
}
}
if !found {
return false
}
}
for i, topicList := range crit.Topics {
if len(topicList) == 0 {
continue
}
if i >= len(log.Topics) {
return false
}
found := false
for _, topic := range topicList {
if log.Topics[i] == topic {
found = true
break
}
}
if !found {
return false
}
}
return true
}
49 changes: 8 additions & 41 deletions evmrpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
ethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/hashicorp/golang-lru/v2/expirable"
evmrpcconfig "github.com/sei-protocol/sei-chain/evmrpc/config"
"github.com/sei-protocol/sei-chain/evmrpc/ethbloom"
"github.com/sei-protocol/sei-chain/sei-cosmos/client"
sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types"
"github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt"
Expand Down Expand Up @@ -957,7 +958,7 @@ func (f *LogFetcher) collectLogs(block *coretypes.ResultBlock, crit filters.Filt

// Pre-encode bloom filter indexes for fast per-receipt filtering
hasFilters := len(crit.Addresses) != 0 || len(crit.Topics) != 0
var filterIndexes [][]bloomIndexes
var filterIndexes [][]BloomIndexes
if hasFilters {
filterIndexes = EncodeFilters(crit.Addresses, crit.Topics)
}
Expand Down Expand Up @@ -996,55 +997,21 @@ func (f *LogFetcher) collectLogs(block *coretypes.ResultBlock, crit filters.Filt
}
logIndex++

if !matchesCriteria(ethLog, crit) {
if !MatchesCriteria(ethLog, crit) {
continue
}
collector.Append(ethLog)
}
}
}

// matchesCriteria checks if a log matches the filter criteria
func matchesCriteria(log *ethtypes.Log, crit filters.FilterCriteria) bool {
// Check address filter
if len(crit.Addresses) > 0 {
found := false
for _, addr := range crit.Addresses {
if log.Address == addr {
found = true
break
}
}
if !found {
return false
}
}

// Check topics filter
for i, topicList := range crit.Topics {
if len(topicList) == 0 {
continue
}
if i >= len(log.Topics) {
return false
}
found := false
for _, topic := range topicList {
if log.Topics[i] == topic {
found = true
break
}
}
if !found {
return false
}
}

return true
// MatchesCriteria checks if a log matches the filter criteria.
func MatchesCriteria(log *ethtypes.Log, crit filters.FilterCriteria) bool {
return ethbloom.MatchesCriteria(log, crit)
}

// Optimized fetchBlocksByCrit with batch processing
func (f *LogFetcher) fetchBlocksByCrit(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64, bloomIndexes [][]bloomIndexes) (chan *coretypes.ResultBlock, int64, bool, error) {
func (f *LogFetcher) fetchBlocksByCrit(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64, bloomIndexes [][]BloomIndexes) (chan *coretypes.ResultBlock, int64, bool, error) {
if crit.BlockHash != nil {
// Check for invalid zero hash
zeroHash := common.Hash{}
Expand Down Expand Up @@ -1143,7 +1110,7 @@ func (f *LogFetcher) fetchBlocksByCrit(ctx context.Context, crit filters.FilterC
}

// Batch processing function for blocks
func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit filters.FilterCriteria, bloomIndexes [][]bloomIndexes, res chan *coretypes.ResultBlock, errChan chan error) {
func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit filters.FilterCriteria, bloomIndexes [][]BloomIndexes, res chan *coretypes.ResultBlock, errChan chan error) {
defer func() {
metrics.IncrementRpcRequestCounter("num_blocks_fetched", "blocks", true)
}()
Expand Down
32 changes: 26 additions & 6 deletions sei-db/db_engine/pebbledb/mvcc/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Database struct {
type VersionedChangesets struct {
Version int64
Changesets []*proto.NamedChangeSet
Done chan struct{} // non-nil for barrier: closed when this entry is processed
}

func OpenDB(dataDir string, config config.StateStoreConfig) (*Database, error) {
Expand Down Expand Up @@ -216,6 +217,15 @@ func (db *Database) Close() error {
return err
}

// PebbleMetrics returns the underlying Pebble DB metrics for observability (e.g. compaction/flush counts).
// Returns nil if the database is closed.
func (db *Database) PebbleMetrics() *pebble.Metrics {
if db.storage == nil {
return nil
}
return db.storage.Metrics()
}

func (db *Database) SetLatestVersion(version int64) error {
if version < 0 {
return fmt.Errorf("version must be non-negative")
Expand Down Expand Up @@ -457,11 +467,6 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named
int64(len(db.pendingChanges)),
)
}()
// Add to pending changes first
db.pendingChanges <- VersionedChangesets{
Version: version,
Changesets: changesets,
}
// Write to WAL
if db.streamHandler != nil {
entry := proto.ChangelogEntry{
Expand All @@ -474,20 +479,35 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named
return err
}
}

// Add to pending changes first
db.pendingChanges <- VersionedChangesets{
Version: version,
Changesets: changesets,
}
return nil
}

func (db *Database) writeAsyncInBackground() {
defer db.asyncWriteWG.Done()
for nextChange := range db.pendingChanges {
if nextChange.Done != nil {
close(nextChange.Done)
continue
}
version := nextChange.Version
if err := db.ApplyChangesetSync(version, nextChange.Changesets); err != nil {
panic(err)
}
}
}

// WaitForPendingWrites waits for all pending writes to be processed
func (db *Database) WaitForPendingWrites() {
done := make(chan struct{})
db.pendingChanges <- VersionedChangesets{Done: done}
<-done
}

// Prune attempts to prune all versions up to and including the current version
// Get the range of keys, manually iterate over them and delete them
// We add a heuristic to skip over a module's keys during pruning if it hasn't been updated
Expand Down
Loading
Loading