Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/ant_protocol/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ pub enum ChunkQuoteResponse {
quote: Vec<u8>,
/// `true` when the chunk already exists on this node (skip payment).
already_stored: bool,
/// Up to `CLOSE_GROUP_SIZE` peer IDs (raw 32-byte BLAKE3 hashes) this
/// node considers closest to the content address, **excluding itself**
/// (the local node is filtered out by the DHT query). Clients combine
/// these views from multiple nodes to verify close-group quorum before
/// paying.
close_group: Vec<[u8; 32]>,
},
/// Quote generation failed.
Error(ProtocolError),
Expand Down
6 changes: 6 additions & 0 deletions src/devnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ impl Devnet {
evm: evm_config,
cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
local_rewards_address: rewards_address,
local_peer_id: *identity.peer_id().as_bytes(),
};
let payment_verifier = PaymentVerifier::new(payment_config);
let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS);
Expand All @@ -594,6 +595,7 @@ impl Devnet {
Arc::new(storage),
Arc::new(payment_verifier),
Arc::new(quote_generator),
None,
))
}

Expand Down Expand Up @@ -635,6 +637,10 @@ impl Devnet {
*node.state.write().await = NodeState::Running;

if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
// Inject P2P node into protocol handler for close-group lookups.
if protocol.set_p2p_node(Arc::clone(p2p)).is_err() {
warn!("P2P node already set on protocol handler for devnet node {index}");
}
let mut events = p2p.subscribe_events();
let p2p_clone = Arc::clone(p2p);
let protocol_clone = Arc::clone(protocol);
Expand Down
16 changes: 11 additions & 5 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,23 @@ impl NodeBuilder {
None
};

// Wrap P2P node in Arc early so it can be shared with the protocol handler.
let p2p_node = Arc::new(p2p_node);

// Initialize ANT protocol handler for chunk storage and
// wire the fresh-write channel so PUTs trigger replication.
let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled {
let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel();
let mut protocol = Self::build_ant_protocol(&self.config, &identity).await?;
let mut protocol =
Self::build_ant_protocol(&self.config, &identity, Some(Arc::clone(&p2p_node)))
.await?;
protocol.set_fresh_write_sender(fresh_write_tx);
(Some(Arc::new(protocol)), Some(fresh_write_rx))
} else {
info!("Chunk storage disabled");
(None, None)
};

let p2p_arc = Arc::new(p2p_node);

// Initialize replication engine (if storage is enabled)
let replication_engine =
if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) {
Expand All @@ -140,7 +143,7 @@ impl NodeBuilder {
let payment_verifier_arc = protocol.payment_verifier_arc();
match ReplicationEngine::new(
repl_config,
Arc::clone(&p2p_arc),
Arc::clone(&p2p_node),
storage_arc,
payment_verifier_arc,
&self.config.root_dir,
Expand All @@ -161,7 +164,7 @@ impl NodeBuilder {

let node = RunningNode {
config: self.config,
p2p_node: p2p_arc,
p2p_node,
shutdown,
events_tx,
events_rx: Some(events_rx),
Expand Down Expand Up @@ -352,6 +355,7 @@ impl NodeBuilder {
async fn build_ant_protocol(
config: &NodeConfig,
identity: &NodeIdentity,
p2p_node: Option<Arc<P2PNode>>,
) -> Result<AntProtocol> {
// Create LMDB storage
let storage_config = LmdbStorageConfig {
Expand Down Expand Up @@ -385,6 +389,7 @@ impl NodeBuilder {
},
cache_capacity: config.payment.cache_capacity,
local_rewards_address: rewards_address,
local_peer_id: *identity.peer_id().as_bytes(),
};
let payment_verifier = PaymentVerifier::new(payment_config);
let metrics_tracker = QuotingMetricsTracker::new(0);
Expand All @@ -398,6 +403,7 @@ impl NodeBuilder {
Arc::new(storage),
Arc::new(payment_verifier),
Arc::new(quote_generator),
p2p_node,
);

info!(
Expand Down
Loading
Loading