Autobahn migrated from sei-v3 #2791
Conversation
```go
for _, vs := range i.votes[lane].q[n].byHeader {
	if len(vs) >= c.LaneQuorum() {
		return types.NewLaneQC(vs[:c.LaneQuorum()]), true
	}
}
```
Check warning (Code scanning / CodeQL): Iteration over map
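CodeQL flags this (and the similar `range` loops below) because Go deliberately randomizes map iteration order, so logic that picks "the first" matching entry can differ between runs and, in consensus code, between nodes. Whether that matters here depends on whether any LaneQuorum()-sized subset of votes is equally valid. A minimal sketch of the usual remedy, iterating over sorted keys; the data and types here are hypothetical, not the PR's:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Hypothetical stand-in for a votes-by-header map.
	byHeader := map[string][]string{
		"headerB": {"v1", "v2"},
		"headerA": {"v3"},
	}
	// Collect and sort the keys so the iteration order is deterministic.
	keys := make([]string, 0, len(byHeader))
	for k := range byHeader {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, byHeader[k]) // always headerA first, then headerB
	}
}
```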
```go
for lane := range i.votes {
	lr := commitQC.LaneRange(lane)
	i.votes[lr.Lane()].prune(lr.First())
	i.blocks[lr.Lane()].prune(lr.First())
}
```
Check warning (Code scanning / CodeQL): Iteration over map
```go
for lane, bq := range inner.blocks {
	for i := max(bq.first, r.next[lane]); i < bq.next; i++ {
		batch = append(batch, bq.q[i].Msg().Block().Header())
	}
	r.next[lane] = bq.next
}
```
Check warning (Code scanning / CodeQL): Iteration over map
```go
for _, v := range cv.byHash[h] {
	votes = append(votes, v)
}
```
Check warning (Code scanning / CodeQL): Iteration over map
```go
for _, v := range pv.byHash[h] {
	votes = append(votes, v)
}
```
Check warning (Code scanning / CodeQL): Iteration over map
```go
for _, qc := range m.laneQCs {
	laneQCs = append(laneQCs, qc)
}
```
Check warning (Code scanning / CodeQL): Iteration over map
```go
// GenProposal generates a random Proposal.
func GenProposal(rng utils.Rng) *Proposal {
	return newProposal(GenView(rng), time.Now(), utils.GenSlice(rng, GenLaneRange), utils.Some(GenAppProposal(rng)))
}
```
Check warning (Code scanning / CodeQL): Calling the system time [test]
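This warning is about reproducibility: a generator that calls time.Now() produces a different Proposal on every run even with a fixed RNG seed. A hedged sketch of one way around it, deriving the timestamp from the RNG; GenTime and the rng interface shown here are assumptions, not the repo's actual API:

```go
package gen // hypothetical package name

import "time"

// GenTime derives a timestamp from the test RNG instead of the wall clock,
// so the same seed always yields the same generated Proposal.
func GenTime(rng interface{ Uint64() uint64 }) time.Time {
	// Map the random value into a fixed range of Unix seconds.
	return time.Unix(int64(rng.Uint64()%1_000_000_000), 0).UTC()
}
```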
```go
for _, l := range p.laneRanges {
	laneRanges = append(laneRanges, l)
}
```
Check warning (Code scanning / CodeQL): Iteration over map
```go
pb "github.com/tendermint/tendermint/internal/autobahn/pb"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
```
Check notice (Code scanning / CodeQL): Sensitive package import
```go
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
unsafe "unsafe"
```
Check notice (Code scanning / CodeQL): Sensitive package import
```go
	}
	time.Sleep(time.Second)
	t.Logf("wait for the blocks used in test")
	utils.OrPanic1(waitForBlock(ctx, primary, 3))
```
**Reviewer:** Just curious, why do we need to change this test?

**Author:** The test is flaky; it is not related to autobahn in any sense.
```proto
// Equivalent of `google.protobuf.Timestamp` but supports canonical encoding.
// See `google.protobuf.Timestamp` for more detailed specification.
message Timestamp {
  option (hashable.hashable) = true;
```
**Reviewer:** nit: comment what this does?

**Author:** This is documented in the hashable package.
```go
		}
	}
}
// TODO(gprusak): this is counterintuitive asymmetric behavior:
```
**Reviewer:** Sorry, I'm confused. I didn't find this in the v3 code. Is this migrated from some other repo?

**Author:** No, this is my own code. I found a bug when plugging the mux into autobahn, which is a result of this inconsistency.
```go
// Ping implements pb.StreamAPIServer.
// Note that we use streaming RPC, because unary RPC apparently causes 10ms extra delay on avg (empirically tested).
func (x *Service) serverPing(ctx context.Context, server rpc.Server[API]) error {
```
**Reviewer:** And from here on is from stream/consensus/server.go? Why don't we keep the original client/server separation?

**Author:** The client and server implementations are tightly coupled, given that we use streaming RPCs. However, the way I've distributed the code across the files is somewhat arbitrary; we can clean it up later. A lot of it will need to be moved around anyway.
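For readers unfamiliar with the layout: the serverPing comment above explains that a single long-lived stream is used because unary RPCs were measured to add roughly 10ms per call. A hedged sketch of what such a streaming ping loop typically looks like; the stream interface with Recv/Send is an assumption for illustration, since the repo's rpc layer is custom and its actual API may differ:

```go
package rpcsketch // hypothetical

import "context"

// serverPingSketch echoes each incoming ping on a persistent stream.
// A long-lived stream amortizes the per-call setup cost that a unary
// RPC would pay on every ping.
func serverPingSketch(ctx context.Context, stream interface {
	Recv() ([]byte, error)
	Send([]byte) error
}) error {
	for {
		msg, err := stream.Recv() // wait for the next ping
		if err != nil {
			return err
		}
		if err := stream.Send(msg); err != nil { // echo it back
			return err
		}
	}
}
```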
| "github.com/tendermint/tendermint/internal/p2p/rpc" | ||
| ) | ||
|
|
||
| func (x *Service) serverStreamLaneProposals(ctx context.Context, server rpc.Server[API]) error { |
**Reviewer:** So from here on it's stream/consensus/avail/server.go?

**Author:** Yeah.
```go
	})
}

func (x *Service) clientStreamLaneProposals(ctx context.Context, c rpc.Client[API]) error {
```
**Reviewer:** nit: If we have to put client and server in the same file, can we always do client first, or always do server first?

**Author:** Sure. This is a work-in-progress layout; it will definitely need a cleanup.
```diff
@@ -0,0 +1,33 @@
package avail
```
**Reviewer:** Since queue sounds like a general enough data structure, can you maybe comment on why we need it defined in avail/ rather than at some more generic location?

**Author:** My rule of thumb is to write abstractions general enough to avoid tangling logic. The question of visibility (where the code should be located, and which packages should have access to it) is a totally separate one. So far this struct has been under a lot of code churn, which is why it lives in the package that uses it.
```diff
@@ -0,0 +1,103 @@
package avail
```
**Reviewer:** I believe this file is new. Can you explain a little why it's organized this way? How many receiver instances do we plan to have for each kind?

**Author:** There is currently one receiver per connection. I wanted to separate autobahn from the transport layer. Autobahn's API surface is far too large at the moment (the number of receivers, custom receivers for each consensus message type, proper buffer sizes in RPCs, etc.), so it will require some work to simplify. Alternatively, we could merge the transport layer (currently under p2p/giga) back into autobahn. This is TBD.
```go
for _, v := range cv.byHash[h] {
	votes = append(votes, v)
}
cv.qc.Store(utils.Some(types.NewCommitQC(votes)))
```
**Reviewer:** Why are we changing from cv.qc.Update to cv.qc.Load and cv.qc.Store now?

**Author:** Thanks for reminding me. AtomicWatch has evolved since I implemented it in sei-v3: it is now split into AtomicSend and AtomicRecv, and it does not provide atomic updates by default. You need to wrap AtomicSend in a mutex, which simplifies the AtomicSend API and also lets AtomicSend share a mutex with other data, which is usually very convenient.

**Author:** I've left a bunch of TODOs, because this change introduces some race conditions. Let me address them.

**Author:** Fixed.
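To make the described pattern concrete: a hedged sketch of wrapping an AtomicSend-style value in a mutex so that read-modify-write updates stay atomic, with the same mutex also guarding neighboring state. The atomicSend type below is a stand-in, not the real utils API:

```go
package state // hypothetical

import "sync"

// atomicSend stands in for utils.AtomicSend: per the discussion above it
// exposes only Load/Store and no atomic read-modify-write.
type atomicSend[T any] struct{ v T }

func (a *atomicSend[T]) Load() T   { return a.v }
func (a *atomicSend[T]) Store(v T) { a.v = v }

// commitVotes guards both the watched value and neighboring state with
// one mutex, which is what makes Load+Store updates atomic.
type commitVotes struct {
	mu    sync.Mutex
	votes []string        // extra state sharing the same mutex
	qc    atomicSend[int] // watched value
}

func (cv *commitVotes) addVote(v string) {
	cv.mu.Lock()
	defer cv.mu.Unlock()
	cv.votes = append(cv.votes, v)
	cv.qc.Store(cv.qc.Load() + 1) // read-modify-write, safe under mu
}
```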
```go
	return types.NewLaneQC(votes)
}

func TestCommitQC(
```
**Reviewer:** Nice, I like that the helpers are all moved here.
```go
MaxGasPerBlock  uint64
MaxTxsPerBlock  uint64
MaxTxsPerSecond utils.Option[uint64]
MempoolSize     uint64
```
**Reviewer:** Do we still have a mempool in Giga?

**Author:** The block production pipeline in Giga still needs to be designed and implemented. I've copied over the producer for reference.
```go
)

// Sends a consensus message to the peer whenever atomic watch is updated.
func sendUpdates[T interface {
```
**Reviewer:** Okay, this one is moved from stream/consensus/client.go?

**Author:** Yes.
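Based on the comment above sendUpdates, the function forwards a watched value to the peer whenever it changes. A hedged sketch of that loop with the watch and send steps abstracted as function parameters; recvUpdate and send are stand-ins, since the real AtomicRecv and rpc client APIs may differ:

```go
package rpcsketch // hypothetical

import "context"

// sendUpdatesSketch blocks until the watched value changes, then pushes
// the new value to the peer, repeating until the context is canceled or
// the stream fails.
func sendUpdatesSketch[T any](
	ctx context.Context,
	recvUpdate func(context.Context) (T, error), // e.g. wait on an AtomicRecv
	send func(context.Context, T) error,         // e.g. write to the stream
) error {
	for {
		msg, err := recvUpdate(ctx) // returns when the value changes
		if err != nil {
			return err
		}
		if err := send(ctx, msg); err != nil {
			return err
		}
	}
}
```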
```go
inner := utils.Alloc(utils.NewAtomicSend(inner{}))
return &State{
	cfg: cfg,
	// metrics: NewMetrics(),
```
**Reviewer:** We do plan to add metrics back, right?

**Author:** Yes, we need to adapt them to the sei-chain metrics.
Migrated the consensus, avail, data, producer, and types modules, along with the corresponding tests. Replaced gRPC usage with a custom RPC layer on top of the mux. GigaRouter is still hardcoded to be disabled.

NOTE: there are a lot of missing parts and gaps in coverage, but this PR will allow us to stop development of autobahn in sei-v3, so that we can start making it production-ready in sei-chain.