From 719d1384d15b3bb782a7f09c14aec6d68edb7ed9 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 18 Jul 2022 21:42:27 +0930 Subject: [PATCH] connectd: give connections a chance to drain when lightningd says to disconnect, or peer disconnects. We want to avoid lost messages in the common cases. This generalizes our drain code, by giving the subds each 5 seconds to close themselves, but continue to allow them to send us traffic (if peer is still connected) and continue to send them traffic. We continue to send traffic *out* to the peer (if it's still connected), until all subds are gone. We still have a 5 second timer to close the connection to peer. On reconnects, we don't do this "drain period" on reconnects: we kill immediately. We fix up one test which was looking for the "disconnect" message explicitly. Signed-off-by: Rusty Russell --- connectd/connectd.c | 19 +++++-- connectd/multiplex.c | 110 +++++++++++++++++++++++++++------------ connectd/multiplex.h | 4 ++ tests/test_connection.py | 4 +- tests/test_opening.py | 2 +- 5 files changed, 97 insertions(+), 42 deletions(-) diff --git a/connectd/connectd.c b/connectd/connectd.c index 5d57fd8f7..08a229ef0 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -211,10 +211,15 @@ static void peer_connected_in(struct daemon *daemon, tal_free(connect); } -/*~ When we free a peer, we remove it from the daemon's hashtable */ +/*~ When we free a peer, we remove it from the daemon's hashtable. + * We also call this manually if we want to elegantly drain peer's + * queues. */ void destroy_peer(struct peer *peer) { - peer_htable_del(&peer->daemon->peers, peer); + assert(!peer->draining); + + if (!peer_htable_del(&peer->daemon->peers, peer)) + abort(); /* Tell gossipd to stop asking this peer gossip queries */ daemon_conn_send(peer->daemon->gossipd, @@ -225,6 +230,10 @@ void destroy_peer(struct peer *peer) take(towire_connectd_peer_disconnect_done(NULL, &peer->id, peer->counter))); + /* This makes multiplex.c routines not feed us more, but + * *also* means that if we're freed directly, the ->to_peer + * destructor won't call drain_peer(). */ + peer->draining = true; } /*~ This is where we create a new peer. */ @@ -282,7 +291,7 @@ struct io_plan *peer_connected(struct io_conn *conn, int subd_fd; bool option_gossip_queries; - /* We remove any previous connection, on the assumption it's dead */ + /* We remove any previous connection immediately, on the assumption it's dead */ peer = peer_htable_get(&daemon->peers, id); if (peer) tal_free(peer); @@ -1858,8 +1867,8 @@ static void peer_discard(struct daemon *daemon, const u8 *msg) /* If it's reconnected already, it will learn soon. */ if (peer->counter != counter) return; - status_peer_debug(&id, "disconnect"); - tal_free(peer); + status_peer_debug(&id, "discard_peer"); + drain_peer(peer); } /* lightningd tells us to send a msg and disconnect. */ diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 2ae1b7514..6d583cc0b 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -81,39 +81,76 @@ static struct subd *find_subd(struct peer *peer, return NULL; } -/* We try to send the final messages, but if buffer is full and they're - * not reading, we have to give up. */ -static void close_timeout(struct peer *peer) +/* Except for a reconnection, we finally free a peer when the io_conn + * is closed and all subds are gone. */ +static void maybe_free_peer(struct peer *peer) { - /* BROKEN means we'll trigger CI if we see it, though it's possible */ - status_peer_broken(&peer->id, "Peer did not close, forcing close"); + if (peer->to_peer) + return; + if (tal_count(peer->subds) != 0) + return; + status_debug("maybe_free_peer freeing peer!"); tal_free(peer); } -/* We just want to send these messages out to the peer's connection, - * then close. We consider the peer dead to us (can be freed). */ -static void drain_peer(struct peer *peer) +/* We try to send the final messages, but if buffer is full and they're + * not reading, we have to give up. */ +static void close_peer_io_timeout(struct peer *peer) { + /* BROKEN means we'll trigger CI if we see it, though it's possible */ + status_peer_broken(&peer->id, "Peer did not close, forcing close"); + io_close(peer->to_peer); +} + +static void close_subd_timeout(struct subd *subd) +{ + /* BROKEN means we'll trigger CI if we see it, though it's possible */ + status_peer_broken(&subd->peer->id, "Subd did not close, forcing close"); + io_close(subd->conn); +} + +void drain_peer(struct peer *peer) +{ + status_debug("drain_peer"); assert(!peer->draining); - /* This is a 5-second leak, worst case! */ - notleak(peer); + /* Since we immediately free any subds we didn't connect yet, + * we need peer->to_peer set so it won't free peer! */ + assert(peer->to_peer); - /* We no longer want subds feeding us more messages (they - * remove themselves from array when freed) */ - while (tal_count(peer->subds)) - tal_free(peer->subds[0]); - peer->draining = true; + /* Give the subds 5 seconds to close their fds to us. */ + for (size_t i = 0; i < tal_count(peer->subds); i++) { + if (!peer->subds[i]->conn) { + /* Deletes itself from array, so be careful! */ + tal_free(peer->subds[i]); + i--; + continue; + } + status_debug("drain_peer draining subd!"); + notleak(new_reltimer(&peer->daemon->timers, + peer->subds[i], time_from_sec(5), + close_subd_timeout, peer->subds[i])); + /* Wake any outgoing queued on subd */ + io_wake(peer->subds[i]->outq); + } - /* You have 5 seconds to drain... */ - notleak(new_reltimer(&peer->daemon->timers, - peer, time_from_sec(5), - close_timeout, peer)); + /* Wake them to ensure they notice the close! */ + io_wake(&peer->subds); + + if (peer->to_peer) { + /* You have 5 seconds to drain... */ + notleak(new_reltimer(&peer->daemon->timers, + peer->to_peer, time_from_sec(5), + close_peer_io_timeout, peer)); + } /* Clean peer from hashtable; we no longer exist. */ destroy_peer(peer); tal_del_destructor(peer, destroy_peer); + /* This is a 5-second leak, worst case! */ + notleak(peer); + /* Start draining process! */ io_wake(peer->peer_outq); } @@ -899,12 +936,13 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn, /* Still nothing to send? */ if (!msg) { - /* Draining? We're done. */ - if (peer->draining) + /* Draining? We're done when subds are done. */ + if (peer->draining && tal_count(peer->subds) == 0) return io_sock_shutdown(peer_conn); /* If they want us to send gossip, do so now. */ - msg = maybe_from_gossip_store(NULL, peer); + if (!peer->draining) + msg = maybe_from_gossip_store(NULL, peer); if (!msg) { /* Tell them to read again, */ io_wake(&peer->subds); @@ -992,6 +1030,9 @@ static void destroy_subd(struct subd *subd) /* Make sure we try to keep reading from peer (might * have been waiting for write_to_subd) */ io_wake(&peer->peer_in); + + /* Maybe we were last subd out? */ + maybe_free_peer(peer); } static struct subd *new_subd(struct peer *peer, @@ -1132,6 +1173,9 @@ static struct io_plan *subd_conn_init(struct io_conn *subd_conn, struct subd *subd) { subd->conn = subd_conn; + + /* subd is a child of the conn: free when it closes! */ + tal_steal(subd->conn, subd); return io_duplex(subd_conn, read_from_subd(subd_conn, subd), write_to_subd(subd_conn, subd)); @@ -1140,18 +1184,15 @@ static struct io_plan *subd_conn_init(struct io_conn *subd_conn, static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer) { assert(peer->to_peer == peer_conn); + + /* If subds need cleaning, this will do it */ + if (!peer->draining) + drain_peer(peer); + peer->to_peer = NULL; - /* Flush internal connections if any: last one out will free peer. */ - if (tal_count(peer->subds) != 0) { - for (size_t i = 0; i < tal_count(peer->subds); i++) - msg_wake(peer->subds[i]->outq); - return; - } - - /* We never had any subds? Free peer (might already be being freed, - * as it's our parent, but that's allowed by tal). */ - tal_free(peer); + /* Or if there were no subds, this will free the peer. */ + maybe_free_peer(peer); } struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn, @@ -1203,8 +1244,9 @@ void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd) subd = new_subd(peer, &channel_id); assert(!subd->conn); - /* This sets subd->conn inside subd_conn_init */ - io_new_conn(subd, fd, subd_conn_init, subd); + + /* This sets subd->conn inside subd_conn_init, and reparents subd! */ + io_new_conn(peer, fd, subd_conn_init, subd); } /* Lightningd says to send a ping */ diff --git a/connectd/multiplex.h b/connectd/multiplex.h index 77a41fa2a..a3bda8ad3 100644 --- a/connectd/multiplex.h +++ b/connectd/multiplex.h @@ -22,6 +22,10 @@ void multiplex_final_msg(struct peer *peer, * this does io logging. */ void inject_peer_msg(struct peer *peer, const u8 *msg TAKES); +/* Start closing the peer: removes itself from hash table, frees itself + * once done. */ +void drain_peer(struct peer *peer); + void setup_peer_gossip_store(struct peer *peer, const struct feature_set *our_features, const u8 *their_features); diff --git a/tests/test_connection.py b/tests/test_connection.py index 24c7cec89..6b6d33ba2 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -2869,8 +2869,8 @@ def test_opener_feerate_reconnect(node_factory, bitcoind): l2.daemon.wait_for_log(r'dev_disconnect: \-WIRE_COMMITMENT_SIGNED') # Wait until they reconnect. - l1.daemon.wait_for_log('Peer transient failure in CHANNELD_NORMAL') - l1.daemon.wait_for_log('peer_disconnect_done') + l1.daemon.wait_for_logs(['Peer transient failure in CHANNELD_NORMAL', + 'peer_disconnect_done']) wait_for(lambda: l1.rpc.getpeer(l2.info['id'])['connected']) # Should work normally. diff --git a/tests/test_opening.py b/tests/test_opening.py index 3a454d50e..8d63b2a92 100644 --- a/tests/test_opening.py +++ b/tests/test_opening.py @@ -354,7 +354,7 @@ def test_v2_rbf_liquidity_ad(node_factory, bitcoind, chainparams): # l1 leases a channel from l2 l1.rpc.connect(l2.info['id'], 'localhost', l2.port) rates = l1.rpc.dev_queryrates(l2.info['id'], amount, amount) - l1.daemon.wait_for_log('disconnect') + wait_for(lambda: l1.rpc.listpeers()['peers'] == []) l1.rpc.connect(l2.info['id'], 'localhost', l2.port) chan_id = l1.rpc.fundchannel(l2.info['id'], amount, request_amt=amount, feerate='{}perkw'.format(feerate),