Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,21 @@ impl PeerNetworkManager {
}
});
}
Some(NetworkRequest::BroadcastMessage(msg)) => {
tracing::debug!("Request processor: broadcasting {}", msg.cmd());
let this = this.clone();
tokio::spawn(async move {
let results = this.broadcast(msg).await;
let failures = results.iter().filter(|r| r.is_err()).count();
if failures > 0 {
tracing::warn!(
"Request processor: broadcast had {} failures out of {} peers",
failures,
results.len()
);
}
});
}
None => {
tracing::info!("Request processor: channel closed");
break;
Expand Down
9 changes: 9 additions & 0 deletions dash-spv/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub enum NetworkRequest {
SendMessage(NetworkMessage),
/// Send a message to a specific peer.
SendMessageToPeer(NetworkMessage, SocketAddr),
/// Broadcast a message to all connected peers.
BroadcastMessage(NetworkMessage),
}

/// Handle for managers to queue outgoing network requests.
Expand Down Expand Up @@ -81,6 +83,13 @@ impl RequestSender {
.map_err(|e| NetworkError::ProtocolError(e.to_string()))
}

/// Queue a message to be broadcast to all connected peers.
pub(crate) fn broadcast(&self, msg: NetworkMessage) -> NetworkResult<()> {
self.tx
.send(NetworkRequest::BroadcastMessage(msg))
.map_err(|e| NetworkError::ProtocolError(e.to_string()))
}

/// Request inventory from a specific peer.
pub fn request_inventory(
&self,
Expand Down
97 changes: 97 additions & 0 deletions dash-spv/src/sync/mempool/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use dashcore::ephemerealdata::instant_lock::InstantLock;
use dashcore::network::message::NetworkMessage;
use dashcore::network::message_blockdata::Inventory;
use dashcore::{Amount, Transaction, Txid};
use rand::seq::IteratorRandom;
Expand Down Expand Up @@ -42,6 +43,9 @@ const MAX_PENDING_IS_LOCKS: usize = 1000;
/// Covers the window where multiple peers respond to the initial `mempool` request.
const SEEN_TXID_EXPIRY: Duration = Duration::from_secs(180);

/// Per-transaction interval between rebroadcast attempts (10 minutes).
const REBROADCAST_INTERVAL: Duration = Duration::from_secs(600);

/// Mempool manager that monitors unconfirmed transactions from the P2P network.
///
/// Tracks connected peers via a unified map where:
Expand Down Expand Up @@ -436,6 +440,36 @@ impl<W: WalletInterface> MempoolManager<W> {
}
}

/// Rebroadcast unconfirmed self-sent transactions to all peers.
///
/// Each transaction in `recent_sends` tracks when it was last broadcast.
/// Transactions whose last broadcast was more than `REBROADCAST_INTERVAL`
/// ago are rebroadcast and their timestamp is reset.
pub(super) async fn rebroadcast_if_due(&mut self, requests: &RequestSender) {
self.rebroadcast_if_due_at(requests, Instant::now()).await
}

/// `now`-injected variant of [`Self::rebroadcast_if_due`]. Tests project `now`
/// forward instead of subtracting from `Instant::now()`, which underflows on
/// Windows when the QPC-based monotonic clock has a small value at boot.
async fn rebroadcast_if_due_at(&mut self, requests: &RequestSender, now: Instant) {
let mut count: usize = 0;
for (txid, last_broadcast) in &mut self.recent_sends {
if now.saturating_duration_since(*last_broadcast) < REBROADCAST_INTERVAL {
continue;
}
if let Some(unconfirmed) = self.transactions.get(txid) {
let _ = requests.broadcast(NetworkMessage::Tx(unconfirmed.transaction.clone()));
*last_broadcast = now;
Comment thread
xdustinface marked this conversation as resolved.
count += 1;
}
}

if count > 0 {
tracing::info!("Rebroadcast {} unconfirmed transaction(s) to all peers", count);
}
}

fn is_queued(&self, txid: &Txid) -> bool {
self.peers.values().filter_map(|v| v.as_ref()).any(|q| q.contains(txid))
}
Expand Down Expand Up @@ -1603,6 +1637,69 @@ mod tests {
);
}

#[tokio::test]
async fn test_rebroadcast_sends_old_recent_sends() {
let (mut manager, requests, mut rx) = create_test_manager();

let tx = Transaction {
version: 10,
lock_time: 0,
input: vec![],
output: vec![],
special_transaction_payload: None,
};
let txid = tx.txid();

let t0 = Instant::now();
let later = t0 + REBROADCAST_INTERVAL + Duration::from_secs(1);

manager.transactions.insert(
txid,
UnconfirmedTransaction::new(tx, Amount::from_sat(0), false, true, Vec::new(), -100_000),
);
manager.recent_sends.insert(txid, t0);

manager.rebroadcast_if_due_at(&requests, later).await;

// Should have sent a BroadcastMessage for the transaction
let msg = rx.try_recv().expect("expected a rebroadcast message");
assert!(
matches!(msg, NetworkRequest::BroadcastMessage(NetworkMessage::Tx(_))),
"expected BroadcastMessage(Tx), got {:?}",
msg
);

// Timestamp should be reset to `later`, so a second call at the same instant
// must not rebroadcast.
manager.rebroadcast_if_due_at(&requests, later).await;
assert!(rx.try_recv().is_err(), "should not rebroadcast immediately after reset");
}

#[tokio::test]
async fn test_rebroadcast_skips_recent_transactions() {
let (mut manager, requests, mut rx) = create_test_manager();

let tx = Transaction {
version: 11,
lock_time: 0,
input: vec![],
output: vec![],
special_transaction_payload: None,
};
let txid = tx.txid();

// Add a transaction that was just sent (within the rebroadcast interval)
manager.transactions.insert(
txid,
UnconfirmedTransaction::new(tx, Amount::from_sat(0), false, true, Vec::new(), -50_000),
);
manager.recent_sends.insert(txid, Instant::now());

manager.rebroadcast_if_due(&requests).await;

assert!(rx.try_recv().is_err(), "recently sent transactions should not be rebroadcast");
}

#[test]
fn test_peer_disconnect_keeps_other_peers_intact() {
let (mut manager, _requests, _rx) = create_test_manager();
Expand Down
3 changes: 3 additions & 0 deletions dash-spv/src/sync/mempool/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ impl<W: WalletInterface + 'static> SyncManager for MempoolManager<W> {
// Send queued getdata requests now that slots may have freed up
self.send_queued(requests).await?;

// Rebroadcast unconfirmed self-sent transactions on a randomized interval
self.rebroadcast_if_due(requests).await;

// Rebuild bloom filter if the wallet's monitored set has changed.
//
// We poll the revision counter rather than using push-based wallet events
Expand Down
Loading