Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions contrib/pyln-testing/pyln/testing/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,7 @@ def bitcoind(request, directory, teardown_checks):

yield bitcoind

try:
bitcoind.stop()
except Exception:
bitcoind.proc.kill()
bitcoind.proc.wait()

bitcoind.cleanup_files()
bitcoind.kill()


class TeardownErrors(object):
Expand Down
68 changes: 51 additions & 17 deletions contrib/pyln-testing/pyln/testing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pyln.client import NodeVersion

import ephemeral_port_reserve # type: ignore
import tempfile
import json
import logging
import lzma
Expand Down Expand Up @@ -166,25 +167,53 @@ def get_tx_p2wsh_outnum(bitcoind, tx, amount):
return None


unused_port_lock = threading.Lock()
unused_port_set = set()
_PORT_LOCK_DIR = Path(tempfile.gettempdir()) / "pyln-testing-ports"
_PORT_LOCK_DIR.mkdir(exist_ok=True)


def reserve_unused_port():
"""Get an unused port: avoids handing out the same port unless it's been
returned"""
with unused_port_lock:
while True:
port = ephemeral_port_reserve.reserve()
if port not in unused_port_set:
break
unused_port_set.add(port)
while True:
port = ephemeral_port_reserve.reserve()

return port
lock_path = _PORT_LOCK_DIR / f"{port}.lock"
try:
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.write(fd, str(os.getpid()).encode())
os.close(fd)
return port
except FileExistsError:
continue


def drop_unused_port(port):
unused_port_set.remove(port)
if port:
lock_path = _PORT_LOCK_DIR / f"{port}.lock"
lock_path.unlink(missing_ok=True)


def cleanup_stale_port_locks():
"""Remove lockfiles whose owning process no longer exists."""
logging.info("Cleaning up stale port locks")
try:
for lock_path in _PORT_LOCK_DIR.glob("*.lock"):
logging.info(f"lock path: {str(lock_path)}")
try:
pid = int(lock_path.read_text())
logging.info(f"pid: {str(pid)}")
try:
os.kill(pid, 0) # signal 0 = existence check, no actual signal
logging.warning(f"process still alive, not cleaning up {str(lock_path)}")
except ProcessLookupError:
logging.info(f"process gone, cleaning up {str(lock_path)}")
lock_path.unlink(missing_ok=True)
except (ValueError, PermissionError, FileNotFoundError) as e:
logging.info(f"skipping {str(lock_path)}: {str(e)}")
pass
except Exception as e:
logging.info(f"cleanup error: {str(e)}")
pass # best-effort, never crash the test run over cleanup


class TailableProc(object):
Expand Down Expand Up @@ -285,6 +314,8 @@ def kill(self):

def cleanup_files(self):
"""Ensure files are closed."""
cleanup_stale_port_locks()

for f in ["stdout_write", "stderr_write", "stdout_read", "stderr_read"]:
try:
getattr(self, f).close()
Expand Down Expand Up @@ -454,10 +485,7 @@ def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None):
TailableProc.__init__(self, bitcoin_dir, verbose=False)

if rpcport is None:
self.reserved_rpcport = reserve_unused_port()
rpcport = self.reserved_rpcport
else:
self.reserved_rpcport = None
rpcport = reserve_unused_port()

self.bitcoin_dir = bitcoin_dir
self.rpcport = rpcport
Expand Down Expand Up @@ -494,9 +522,15 @@ def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None):
self.rpc = SimpleBitcoinProxy(btc_conf_file=self.conf_file)
self.proxies = []

def __del__(self):
if self.reserved_rpcport is not None:
drop_unused_port(self.reserved_rpcport)
def kill(self):
try:
self.stop()
except Exception:
self.proc.kill()
self.proc.wait()

self.cleanup_files()
drop_unused_port(self.rpcport)

def start(self, wallet_file=None):
TailableProc.start(self)
Expand Down
1 change: 1 addition & 0 deletions plugins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ tracing = { version = "^0.1", features = ["async-await", "log"] }
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
cln-grpc = { workspace = true }
cln-rpc = { workspace = true }
5 changes: 5 additions & 0 deletions plugins/examples/cln-subscribe-wildcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ async fn main() -> Result<()> {

async fn handle_wildcard_notification(_plugin: Plugin<()>, value: serde_json::Value) -> Result<()> {
let notification_type: String = value.as_object().unwrap().keys().next().unwrap().into();
let notification: cln_rpc::Notification = serde_json::from_value(value)?;

if let cln_rpc::Notification::Shutdown(_shutdown_notification) = notification {
std::process::exit(0);
}

log::info!("Received notification {}", notification_type);
Ok(())
Expand Down
95 changes: 91 additions & 4 deletions plugins/grpc-plugin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use anyhow::{Context, Result};
use anyhow::{Context, Result, anyhow};
use cln_grpc::pb::node_server::NodeServer;
use cln_plugin::{Builder, Plugin, options};
use cln_rpc::notifications::Notification;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::broadcast;

mod tls;
Expand Down Expand Up @@ -159,15 +160,101 @@ async fn run_interface(bind_addr: SocketAddr, state: PluginState) -> Result<()>
.serve(bind_addr);

log::info!(
"Connecting to {:?} and serving grpc on {:?}",
&state.rpc_path,
"Connecting to {} and serving grpc on {}",
state.rpc_path.display(),
&bind_addr
);

server.await.context("serving requests")?;
match server.await {
Ok(()) => (),
Err(e) => {
debug_who_uses_port(bind_addr.port());
tokio::time::sleep(Duration::from_secs(2)).await;
return Err(anyhow!("serving requests: {e}"));
}
}

Ok(())
}
fn debug_who_uses_port(port: u16) {
log::info!("🔍 Investigating who is using port {}...", port);

// First, get the raw socket info
let output = std::process::Command::new("ss")
.args(["-ltnp", &format!("sport = :{}", port)])
.output();

if let Ok(out) = output {
let text = String::from_utf8_lossy(&out.stdout);
if !text.trim().is_empty() {
log::info!("Socket information:\n{}", text);
}
}

// Now extract PIDs and get detailed process info
if let Ok(out) = std::process::Command::new("ss")
.args(["-ltnp", &format!("sport = :{}", port)])
.output()
{
let text = String::from_utf8_lossy(&out.stdout);
let lines: Vec<&str> = text.lines().collect();

for line in lines {
if let Some(pid_start) = line.find("pid=") {
if let Some(pid_part) = line[pid_start..].split(',').next() {
let pid_str = pid_part.trim_start_matches("pid=");
if let Ok(pid) = pid_str.parse::<u32>() {
log::info!("Found process with PID {} holding the port", pid);
show_process_details(pid);
}
}
}
}
}

// Fallback tools
for (name, args) in [
("lsof", vec!["-i", &format!(":{}", port)]),
("netstat", vec!["-ltnp"]),
] {
if let Ok(out) = std::process::Command::new(name).args(args).output() {
let s = String::from_utf8_lossy(&out.stdout);
if !s.trim().is_empty() {
log::info!("\n{} output:\n{}", name.to_uppercase(), s);
}
}
}
}

fn show_process_details(pid: u32) {
let pid_str = pid.to_string();
let env = format!("/proc/{}/environ", pid);
let commands = vec![
// Full command line
vec!["ps", "-p", &pid_str, "-o", "pid,ppid,user,comm,args"],
// Process tree
vec!["pstree", "-p", "-a", &pid_str],
// Environment (useful in CI)
vec!["cat", &env],
];

for cmd in commands {
log::info!("\nRunning: {} {}", cmd[0], cmd[1..].join(" "));
match std::process::Command::new(cmd[0]).args(&cmd[1..]).output() {
Ok(output) => {
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
if !stdout.trim().is_empty() {
log::info!("{}", stdout);
}
if !stderr.trim().is_empty() {
log::warn!("{}", stderr);
}
}
Err(e) => log::warn!("Failed to run {:?}: {}", cmd, e),
}
}
}

async fn handle_notification(plugin: Plugin<PluginState>, value: serde_json::Value) -> Result<()> {
let notification: Result<Notification, _> = serde_json::from_value(value);
Expand Down
15 changes: 5 additions & 10 deletions tests/plugins/misc_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
"""Plugin to be used to test miscellaneous notifications.
"""

from pyln.client import Plugin, RpcError
from pyln.client import Plugin
import sys
import pytest

plugin = Plugin()

Expand Down Expand Up @@ -36,14 +35,10 @@ def shutdown(plugin, **kwargs):
plugin.rpc.getinfo()
plugin.rpc.datastore(key='test', string='Allowed', mode="create-or-append")
plugin.log("via plugin stop, datastore success")
except RpcError as e:
if e.error == {'code': -5, 'message': 'lightningd is shutting down'}:
# JSON RPC is disabled by now, but can do logging
with pytest.raises(RpcError, match=r'-5.*lightningd is shutting down'):
plugin.rpc.datastore(key='test', string='Not allowed', mode="create-or-append")
plugin.log("via lightningd shutdown, datastore failed")
else:
raise
except ConnectionRefusedError as e:
# lightningd shutdown, refusing RPC calls
plugin.log(str(e))
sys.exit(0)

sys.exit(0)

Expand Down
8 changes: 8 additions & 0 deletions tests/test_cln_rs.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ def test_grpc_listpeerchannels(bitcoind, node_factory):
announce_channels=True, # Do not enforce scid-alias
)

wait_for_grpc_start(l1)

stub = l1.grpc
res = stub.ListPeerChannels(clnpb.ListpeerchannelsRequest(id=None))

Expand Down Expand Up @@ -419,6 +421,8 @@ def test_rust_plugin_subscribe_wildcard(node_factory):
def test_grpc_block_added_notifications(node_factory, bitcoind):
l1 = node_factory.get_node()

wait_for_grpc_start(l1)

# Test the block_added notification
# Start listening to block added events over grpc
block_added_stream = l1.grpc.SubscribeBlockAdded(clnpb.StreamBlockAddedRequest())
Expand All @@ -435,6 +439,8 @@ def test_grpc_block_added_notifications(node_factory, bitcoind):
def test_grpc_connect_notification(node_factory):
l1, l2 = node_factory.get_nodes(2)

wait_for_grpc_start(l1)

# Test the connect notification
connect_stream = l1.grpc.SubscribeConnect(clnpb.StreamConnectRequest())

Expand All @@ -451,6 +457,8 @@ def test_grpc_connect_notification(node_factory):
def test_grpc_custommsg_notification(node_factory):
l1, l2 = node_factory.get_nodes(2)

wait_for_grpc_start(l1)

# Test the connect notification
custommsg_stream = l1.grpc.SubscribeCustomMsg(clnpb.StreamCustomMsgRequest())
l2.connect(l1)
Expand Down
Loading