-
Notifications
You must be signed in to change notification settings - Fork 445
Defer ChainMonitor updates and persistence to flush() #4351
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a7af611
e03d010
939f798
dcfb304
52081b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1118,9 +1118,18 @@ where | |
| None => {}, | ||
| } | ||
|
|
||
| // We capture pending_operation_count inside the persistence branch to | ||
| // avoid a race: ChannelManager handlers queue deferred monitor ops | ||
| // before the persistence flag is set. Capturing outside would let us | ||
| // observe pending ops while the flag is still unset, causing us to | ||
| // flush monitor writes without persisting the ChannelManager. | ||
| // Declared before futures so it outlives the Joiner (drop order). | ||
| let pending_monitor_writes; | ||
|
|
||
| let mut futures = Joiner::new(); | ||
|
|
||
| if channel_manager.get_cm().get_and_clear_needs_persistence() { | ||
| pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); | ||
| log_trace!(logger, "Persisting ChannelManager..."); | ||
|
|
||
| let fut = async { | ||
|
|
@@ -1131,7 +1140,12 @@ where | |
| CHANNEL_MANAGER_PERSISTENCE_KEY, | ||
| channel_manager.get_cm().encode(), | ||
| ) | ||
| .await | ||
| .await?; | ||
|
|
||
| // Flush monitor operations that were pending before we persisted. New updates | ||
| // that arrived after are left for the next iteration. | ||
| chain_monitor.get_cm().flush(pending_monitor_writes, &logger); | ||
| Ok(()) | ||
| }; | ||
| // TODO: Once our MSRV is 1.68 we should be able to drop the Box | ||
| let mut fut = Box::pin(fut); | ||
|
|
@@ -1373,6 +1387,7 @@ where | |
| // After we exit, ensure we persist the ChannelManager one final time - this avoids | ||
| // some races where users quit while channel updates were in-flight, with | ||
| // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. | ||
| let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); | ||
| kv_store | ||
| .write( | ||
| CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, | ||
|
|
@@ -1381,6 +1396,10 @@ where | |
| channel_manager.get_cm().encode(), | ||
| ) | ||
| .await?; | ||
|
|
||
| // Flush monitor operations that were pending before final persistence. | ||
| chain_monitor.get_cm().flush(pending_monitor_writes, &logger); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Haven't tried it, but can we move this into the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved it into the fut. It only is long-running with sync persistence, and then it blocks a thread, but if there are multiple threads it may still improve parallelism? |
||
|
|
||
| if let Some(ref scorer) = scorer { | ||
| kv_store | ||
| .write( | ||
|
|
@@ -1684,7 +1703,17 @@ impl BackgroundProcessor { | |
| channel_manager.get_cm().timer_tick_occurred(); | ||
| last_freshness_call = Instant::now(); | ||
| } | ||
|
|
||
| // We capture pending_operation_count inside the persistence | ||
| // branch to avoid a race: ChannelManager handlers queue | ||
| // deferred monitor ops before the persistence flag is set. | ||
| // Capturing outside would let us observe pending ops while the | ||
| // flag is still unset, causing us to flush monitor writes | ||
| // without persisting the ChannelManager. | ||
| let mut pending_monitor_writes = 0; | ||
|
|
||
| if channel_manager.get_cm().get_and_clear_needs_persistence() { | ||
| pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); | ||
| log_trace!(logger, "Persisting ChannelManager..."); | ||
| (kv_store.write( | ||
| CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, | ||
|
|
@@ -1695,6 +1724,10 @@ impl BackgroundProcessor { | |
| log_trace!(logger, "Done persisting ChannelManager."); | ||
| } | ||
|
|
||
| // Flush monitor operations that were pending before we persisted. New | ||
| // updates that arrived after are left for the next iteration. | ||
| chain_monitor.get_cm().flush(pending_monitor_writes, &logger); | ||
|
|
||
| if let Some(liquidity_manager) = liquidity_manager.as_ref() { | ||
| log_trace!(logger, "Persisting LiquidityManager..."); | ||
| let _ = liquidity_manager.get_lm().persist().map_err(|e| { | ||
|
|
@@ -1809,12 +1842,17 @@ impl BackgroundProcessor { | |
| // After we exit, ensure we persist the ChannelManager one final time - this avoids | ||
| // some races where users quit while channel updates were in-flight, with | ||
| // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. | ||
| let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); | ||
| kv_store.write( | ||
| CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, | ||
| CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, | ||
| CHANNEL_MANAGER_PERSISTENCE_KEY, | ||
| channel_manager.get_cm().encode(), | ||
| )?; | ||
|
|
||
| // Flush monitor operations that were pending before final persistence. | ||
| chain_monitor.get_cm().flush(pending_monitor_writes, &logger); | ||
|
|
||
| if let Some(ref scorer) = scorer { | ||
| kv_store.write( | ||
| SCORER_PERSISTENCE_PRIMARY_NAMESPACE, | ||
|
|
@@ -1896,9 +1934,10 @@ mod tests { | |
| use bitcoin::transaction::{Transaction, TxOut}; | ||
| use bitcoin::{Amount, ScriptBuf, Txid}; | ||
| use core::sync::atomic::{AtomicBool, Ordering}; | ||
| use lightning::chain::chainmonitor; | ||
| use lightning::chain::channelmonitor::ANTI_REORG_DELAY; | ||
| use lightning::chain::transaction::OutPoint; | ||
| use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter}; | ||
| use lightning::chain::{BestBlock, Confirm, Filter}; | ||
| use lightning::events::{Event, PathFailure, ReplayEvent}; | ||
| use lightning::ln::channelmanager; | ||
| use lightning::ln::channelmanager::{ | ||
|
|
@@ -2444,6 +2483,7 @@ mod tests { | |
| Arc::clone(&kv_store), | ||
| Arc::clone(&keys_manager), | ||
| keys_manager.get_peer_storage_key(), | ||
| true, | ||
| )); | ||
| let best_block = BestBlock::from_network(network); | ||
| let params = ChainParameters { network, best_block }; | ||
|
|
@@ -2567,6 +2607,8 @@ mod tests { | |
| (persist_dir, nodes) | ||
| } | ||
|
|
||
| /// Opens a channel between two nodes without a running `BackgroundProcessor`, | ||
| /// so deferred monitor operations are flushed manually at each step. | ||
| macro_rules! open_channel { | ||
| ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ | ||
| begin_open_channel!($node_a, $node_b, $channel_value); | ||
|
|
@@ -2582,19 +2624,31 @@ mod tests { | |
| tx.clone(), | ||
| ) | ||
| .unwrap(); | ||
| // funding_transaction_generated does not call watch_channel, so no | ||
| // deferred op is queued and FundingCreated is available immediately. | ||
| let msg_a = get_event_msg!( | ||
| $node_a, | ||
| MessageSendEvent::SendFundingCreated, | ||
| $node_b.node.get_our_node_id() | ||
| ); | ||
| $node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a); | ||
| // Flush node_b's new monitor (watch_channel) so it releases the | ||
| // FundingSigned message. | ||
| $node_b | ||
| .chain_monitor | ||
| .flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger); | ||
| get_event!($node_b, Event::ChannelPending); | ||
| let msg_b = get_event_msg!( | ||
| $node_b, | ||
| MessageSendEvent::SendFundingSigned, | ||
| $node_a.node.get_our_node_id() | ||
| ); | ||
| $node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b); | ||
| // Flush node_a's new monitor (watch_channel) queued by | ||
| // handle_funding_signed. | ||
| $node_a | ||
| .chain_monitor | ||
| .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger); | ||
| get_event!($node_a, Event::ChannelPending); | ||
| tx | ||
| }}; | ||
|
|
@@ -2720,6 +2774,20 @@ mod tests { | |
| confirm_transaction_depth(node, tx, ANTI_REORG_DELAY); | ||
| } | ||
|
|
||
| /// Waits until the background processor has flushed all pending deferred monitor | ||
| /// operations for the given node. Panics if the pending count does not reach zero | ||
| /// within `EVENT_DEADLINE`. | ||
| fn wait_for_flushed(chain_monitor: &ChainMonitor) { | ||
| let start = std::time::Instant::now(); | ||
| while chain_monitor.pending_operation_count() > 0 { | ||
| assert!( | ||
| start.elapsed() < EVENT_DEADLINE, | ||
| "Pending monitor operations were not flushed within deadline" | ||
| ); | ||
| std::thread::sleep(Duration::from_millis(10)); | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_background_processor() { | ||
| // Test that when a new channel is created, the ChannelManager needs to be re-persisted with | ||
|
|
@@ -3060,11 +3128,21 @@ mod tests { | |
| .node | ||
| .funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone()) | ||
| .unwrap(); | ||
| // funding_transaction_generated does not call watch_channel, so no deferred op is | ||
| // queued and the FundingCreated message is available immediately. | ||
| let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id); | ||
| nodes[1].node.handle_funding_created(node_0_id, &msg_0); | ||
| // Node 1 has no bg processor, flush its new monitor (watch_channel) manually so | ||
| // events and FundingSigned are released. | ||
| nodes[1] | ||
| .chain_monitor | ||
| .flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger); | ||
| get_event!(nodes[1], Event::ChannelPending); | ||
| let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id); | ||
| nodes[0].node.handle_funding_signed(node_1_id, &msg_1); | ||
| // Wait for the bg processor to flush the new monitor (watch_channel) queued by | ||
| // handle_funding_signed. | ||
| wait_for_flushed(&nodes[0].chain_monitor); | ||
| channel_pending_recv | ||
| .recv_timeout(EVENT_DEADLINE) | ||
| .expect("ChannelPending not handled within deadline"); | ||
|
|
@@ -3125,6 +3203,9 @@ mod tests { | |
| error_message.to_string(), | ||
| ) | ||
| .unwrap(); | ||
| // Wait for the bg processor to flush the monitor update triggered by force close | ||
| // so the commitment tx is broadcast. | ||
| wait_for_flushed(&nodes[0].chain_monitor); | ||
| let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); | ||
| confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.