diff --git a/Cargo.lock b/Cargo.lock index 14a10e7a594a..d9b4275127d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -504,6 +504,7 @@ dependencies = [ "anyhow", "bytes", "cln-grpc", + "cln-rpc", "futures", "log", "serde", diff --git a/contrib/pyln-testing/pyln/testing/fixtures.py b/contrib/pyln-testing/pyln/testing/fixtures.py index 335667cfe979..2b5556c65215 100644 --- a/contrib/pyln-testing/pyln/testing/fixtures.py +++ b/contrib/pyln-testing/pyln/testing/fixtures.py @@ -193,13 +193,7 @@ def bitcoind(request, directory, teardown_checks): yield bitcoind - try: - bitcoind.stop() - except Exception: - bitcoind.proc.kill() - bitcoind.proc.wait() - - bitcoind.cleanup_files() + bitcoind.kill() class TeardownErrors(object): diff --git a/contrib/pyln-testing/pyln/testing/utils.py b/contrib/pyln-testing/pyln/testing/utils.py index 1065838c5867..e1d1cfa56b3d 100644 --- a/contrib/pyln-testing/pyln/testing/utils.py +++ b/contrib/pyln-testing/pyln/testing/utils.py @@ -13,6 +13,7 @@ from pyln.client import NodeVersion import ephemeral_port_reserve # type: ignore +import tempfile import json import logging import lzma @@ -166,25 +167,46 @@ def get_tx_p2wsh_outnum(bitcoind, tx, amount): return None -unused_port_lock = threading.Lock() -unused_port_set = set() +_PORT_LOCK_DIR = Path(tempfile.gettempdir()) / "pyln-testing-ports" +_PORT_LOCK_DIR.mkdir(exist_ok=True) def reserve_unused_port(): """Get an unused port: avoids handing out the same port unless it's been returned""" - with unused_port_lock: - while True: - port = ephemeral_port_reserve.reserve() - if port not in unused_port_set: - break - unused_port_set.add(port) + while True: + port = ephemeral_port_reserve.reserve() - return port + lock_path = _PORT_LOCK_DIR / f"{port}.lock" + try: + fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY) + os.write(fd, str(os.getpid()).encode()) + os.close(fd) + return port + except FileExistsError: + continue def drop_unused_port(port): - unused_port_set.remove(port) + if port: + lock_path = _PORT_LOCK_DIR / f"{port}.lock" + lock_path.unlink(missing_ok=True) + + +def cleanup_stale_port_locks(): + """Remove lockfiles whose owning process no longer exists.""" + try: + for lock_path in _PORT_LOCK_DIR.glob("*.lock"): + try: + pid = int(lock_path.read_text()) + try: + os.kill(pid, 0) # signal 0 = existence check, no actual signal + except ProcessLookupError: + lock_path.unlink(missing_ok=True) + except (ValueError, PermissionError, FileNotFoundError): + pass + except Exception: + pass # best-effort, never crash the test run over cleanup class TailableProc(object): @@ -285,6 +307,8 @@ def kill(self): def cleanup_files(self): """Ensure files are closed.""" + cleanup_stale_port_locks() + for f in ["stdout_write", "stderr_write", "stdout_read", "stderr_read"]: try: getattr(self, f).close() @@ -454,10 +478,7 @@ def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None): TailableProc.__init__(self, bitcoin_dir, verbose=False) if rpcport is None: - self.reserved_rpcport = reserve_unused_port() - rpcport = self.reserved_rpcport - else: - self.reserved_rpcport = None + rpcport = reserve_unused_port() self.bitcoin_dir = bitcoin_dir self.rpcport = rpcport @@ -494,9 +515,15 @@ def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None): self.rpc = SimpleBitcoinProxy(btc_conf_file=self.conf_file) self.proxies = [] - def __del__(self): - if self.reserved_rpcport is not None: - drop_unused_port(self.reserved_rpcport) + def kill(self): + try: + self.stop() + except Exception: + self.proc.kill() + self.proc.wait() + + self.cleanup_files() + drop_unused_port(self.rpcport) def start(self, wallet_file=None): TailableProc.start(self) diff --git a/plugins/Cargo.toml b/plugins/Cargo.toml index 421b047636a2..4991e9dc17a2 100644 --- a/plugins/Cargo.toml +++ b/plugins/Cargo.toml @@ -35,3 +35,4 @@ tracing = { version = "^0.1", features = ["async-await", "log"] } [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } cln-grpc = { workspace = true } +cln-rpc = { workspace = true } diff --git a/plugins/examples/cln-subscribe-wildcard.rs b/plugins/examples/cln-subscribe-wildcard.rs index c403e3ee9956..a8e67167c140 100644 --- a/plugins/examples/cln-subscribe-wildcard.rs +++ b/plugins/examples/cln-subscribe-wildcard.rs @@ -22,6 +22,11 @@ async fn main() -> Result<()> { async fn handle_wildcard_notification(_plugin: Plugin<()>, value: serde_json::Value) -> Result<()> { let notification_type: String = value.as_object().unwrap().keys().next().unwrap().into(); + let notification: cln_rpc::Notification = serde_json::from_value(value)?; + + if let cln_rpc::Notification::Shutdown(_shutdown_notification) = notification { + std::process::exit(0); + } log::info!("Received notification {}", notification_type); Ok(()) diff --git a/tests/plugins/misc_notifications.py b/tests/plugins/misc_notifications.py index f11b4da0614a..9180ce7def77 100755 --- a/tests/plugins/misc_notifications.py +++ b/tests/plugins/misc_notifications.py @@ -2,9 +2,8 @@ """Plugin to be used to test miscellaneous notifications. """ -from pyln.client import Plugin, RpcError +from pyln.client import Plugin import sys -import pytest plugin = Plugin() @@ -36,14 +35,10 @@ def shutdown(plugin, **kwargs): plugin.rpc.getinfo() plugin.rpc.datastore(key='test', string='Allowed', mode="create-or-append") plugin.log("via plugin stop, datastore success") - except RpcError as e: - if e.error == {'code': -5, 'message': 'lightningd is shutting down'}: - # JSON RPC is disabled by now, but can do logging - with pytest.raises(RpcError, match=r'-5.*lightningd is shutting down'): - plugin.rpc.datastore(key='test', string='Not allowed', mode="create-or-append") - plugin.log("via lightningd shutdown, datastore failed") - else: - raise + except ConnectionRefusedError as e: + # lightningd shutdown, refusing RPC calls + plugin.log(str(e)) + sys.exit(0) sys.exit(0) diff --git a/tests/test_cln_rs.py b/tests/test_cln_rs.py index ea76002cd0e8..fab0d98ab67c 100644 --- a/tests/test_cln_rs.py +++ b/tests/test_cln_rs.py @@ -370,6 +370,8 @@ def test_grpc_listpeerchannels(bitcoind, node_factory): announce_channels=True, # Do not enforce scid-alias ) + wait_for_grpc_start(l1) + stub = l1.grpc res = stub.ListPeerChannels(clnpb.ListpeerchannelsRequest(id=None)) @@ -419,6 +421,8 @@ def test_rust_plugin_subscribe_wildcard(node_factory): def test_grpc_block_added_notifications(node_factory, bitcoind): l1 = node_factory.get_node() + wait_for_grpc_start(l1) + # Test the block_added notification # Start listening to block added events over grpc block_added_stream = l1.grpc.SubscribeBlockAdded(clnpb.StreamBlockAddedRequest()) @@ -435,6 +439,8 @@ def test_grpc_block_added_notifications(node_factory, bitcoind): def test_grpc_connect_notification(node_factory): l1, l2 = node_factory.get_nodes(2) + wait_for_grpc_start(l1) + # Test the connect notification connect_stream = l1.grpc.SubscribeConnect(clnpb.StreamConnectRequest()) @@ -451,6 +457,8 @@ def test_grpc_connect_notification(node_factory): def test_grpc_custommsg_notification(node_factory): l1, l2 = node_factory.get_nodes(2) + wait_for_grpc_start(l1) + # Test the connect notification custommsg_stream = l1.grpc.SubscribeCustomMsg(clnpb.StreamCustomMsgRequest()) l2.connect(l1)