diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index f1beaabd4..f5927b7b8 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -822,6 +822,21 @@ impl PeerNetworkManager { } }); } + Some(NetworkRequest::BroadcastMessage(msg)) => { + tracing::debug!("Request processor: broadcasting {}", msg.cmd()); + let this = this.clone(); + tokio::spawn(async move { + let results = this.broadcast(msg).await; + let failures = results.iter().filter(|r| r.is_err()).count(); + if failures > 0 { + tracing::warn!( + "Request processor: broadcast had {} failures out of {} peers", + failures, + results.len() + ); + } + }); + } None => { tracing::info!("Request processor: channel closed"); break; diff --git a/dash-spv/src/network/mod.rs b/dash-spv/src/network/mod.rs index b4f2b2866..a428ddde1 100644 --- a/dash-spv/src/network/mod.rs +++ b/dash-spv/src/network/mod.rs @@ -47,6 +47,8 @@ pub enum NetworkRequest { SendMessage(NetworkMessage), /// Send a message to a specific peer. SendMessageToPeer(NetworkMessage, SocketAddr), + /// Broadcast a message to all connected peers. + BroadcastMessage(NetworkMessage), } /// Handle for managers to queue outgoing network requests. @@ -81,6 +83,13 @@ impl RequestSender { .map_err(|e| NetworkError::ProtocolError(e.to_string())) } + /// Queue a message to be broadcast to all connected peers. + pub(crate) fn broadcast(&self, msg: NetworkMessage) -> NetworkResult<()> { + self.tx + .send(NetworkRequest::BroadcastMessage(msg)) + .map_err(|e| NetworkError::ProtocolError(e.to_string())) + } + /// Request inventory from a specific peer. pub fn request_inventory( &self, diff --git a/dash-spv/src/sync/mempool/manager.rs b/dash-spv/src/sync/mempool/manager.rs index 333894220..fc8fd23ea 100644 --- a/dash-spv/src/sync/mempool/manager.rs +++ b/dash-spv/src/sync/mempool/manager.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use dashcore::ephemerealdata::instant_lock::InstantLock; +use dashcore::network::message::NetworkMessage; use dashcore::network::message_blockdata::Inventory; use dashcore::{Amount, Transaction, Txid}; use rand::seq::IteratorRandom; @@ -42,6 +43,9 @@ const MAX_PENDING_IS_LOCKS: usize = 1000; /// Covers the window where multiple peers respond to the initial `mempool` request. const SEEN_TXID_EXPIRY: Duration = Duration::from_secs(180); +/// Per-transaction interval between rebroadcast attempts (10 minutes). +const REBROADCAST_INTERVAL: Duration = Duration::from_secs(600); + /// Mempool manager that monitors unconfirmed transactions from the P2P network. /// /// Tracks connected peers via a unified map where: @@ -436,6 +440,36 @@ impl MempoolManager { } } + /// Rebroadcast unconfirmed self-sent transactions to all peers. + /// + /// Each transaction in `recent_sends` tracks when it was last broadcast. + /// Transactions whose last broadcast was more than `REBROADCAST_INTERVAL` + /// ago are rebroadcast and their timestamp is reset. + pub(super) async fn rebroadcast_if_due(&mut self, requests: &RequestSender) { + self.rebroadcast_if_due_at(requests, Instant::now()).await + } + + /// `now`-injected variant of [`Self::rebroadcast_if_due`]. Tests project `now` + /// forward instead of subtracting from `Instant::now()`, which underflows on + /// Windows when the QPC-based monotonic clock has a small value at boot. + async fn rebroadcast_if_due_at(&mut self, requests: &RequestSender, now: Instant) { + let mut count: usize = 0; + for (txid, last_broadcast) in &mut self.recent_sends { + if now.saturating_duration_since(*last_broadcast) < REBROADCAST_INTERVAL { + continue; + } + if let Some(unconfirmed) = self.transactions.get(txid) { + let _ = requests.broadcast(NetworkMessage::Tx(unconfirmed.transaction.clone())); + *last_broadcast = now; + count += 1; + } + } + + if count > 0 { + tracing::info!("Rebroadcast {} unconfirmed transaction(s) to all peers", count); + } + } + fn is_queued(&self, txid: &Txid) -> bool { self.peers.values().filter_map(|v| v.as_ref()).any(|q| q.contains(txid)) } @@ -1603,6 +1637,69 @@ mod tests { ); } + #[tokio::test] + async fn test_rebroadcast_sends_old_recent_sends() { + let (mut manager, requests, mut rx) = create_test_manager(); + + let tx = Transaction { + version: 10, + lock_time: 0, + input: vec![], + output: vec![], + special_transaction_payload: None, + }; + let txid = tx.txid(); + + let t0 = Instant::now(); + let later = t0 + REBROADCAST_INTERVAL + Duration::from_secs(1); + + manager.transactions.insert( + txid, + UnconfirmedTransaction::new(tx, Amount::from_sat(0), false, true, Vec::new(), -100_000), + ); + manager.recent_sends.insert(txid, t0); + + manager.rebroadcast_if_due_at(&requests, later).await; + + // Should have sent a BroadcastMessage for the transaction + let msg = rx.try_recv().expect("expected a rebroadcast message"); + assert!( + matches!(msg, NetworkRequest::BroadcastMessage(NetworkMessage::Tx(_))), + "expected BroadcastMessage(Tx), got {:?}", + msg + ); + + // Timestamp should be reset to `later`, so a second call at the same instant + // must not rebroadcast. + manager.rebroadcast_if_due_at(&requests, later).await; + assert!(rx.try_recv().is_err(), "should not rebroadcast immediately after reset"); + } + + #[tokio::test] + async fn test_rebroadcast_skips_recent_transactions() { + let (mut manager, requests, mut rx) = create_test_manager(); + + let tx = Transaction { + version: 11, + lock_time: 0, + input: vec![], + output: vec![], + special_transaction_payload: None, + }; + let txid = tx.txid(); + + // Add a transaction that was just sent (within the rebroadcast interval) + manager.transactions.insert( + txid, + UnconfirmedTransaction::new(tx, Amount::from_sat(0), false, true, Vec::new(), -50_000), + ); + manager.recent_sends.insert(txid, Instant::now()); + + manager.rebroadcast_if_due(&requests).await; + + assert!(rx.try_recv().is_err(), "recently sent transactions should not be rebroadcast"); + } + #[test] fn test_peer_disconnect_keeps_other_peers_intact() { let (mut manager, _requests, _rx) = create_test_manager(); diff --git a/dash-spv/src/sync/mempool/sync_manager.rs b/dash-spv/src/sync/mempool/sync_manager.rs index e618ee0f9..806905224 100644 --- a/dash-spv/src/sync/mempool/sync_manager.rs +++ b/dash-spv/src/sync/mempool/sync_manager.rs @@ -117,6 +117,9 @@ impl SyncManager for MempoolManager { // Send queued getdata requests now that slots may have freed up self.send_queued(requests).await?; + // Rebroadcast unconfirmed self-sent transactions on a randomized interval + self.rebroadcast_if_due(requests).await; + // Rebuild bloom filter if the wallet's monitored set has changed. // // We poll the revision counter rather than using push-based wallet events