connectd: give connections a chance to drain when lightningd says to disconnect, or peer disconnects.

We want to avoid lost messages in the common cases.

This generalizes our drain code: we give each subd 5 seconds to
close itself, while continuing to allow it to send us traffic (if the
peer is still connected) and continuing to send traffic to it.

We continue to send traffic *out* to the peer (if it's still
connected) until all subds are gone.  We still have a 5-second timer
to close the connection to the peer.
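
To make the new lifecycle easier to follow, here is a minimal standalone
sketch of the flow (not the real connectd code: the struct fields and
helpers are simplified stand-ins for the peer/subd objects in the diff
below, and the 5-second reltimers are only noted in comments):

/* Minimal standalone model of the drain lifecycle described above.
 * Simplified stand-ins only: the real code uses tal, io_conn and
 * per-object 5-second reltimers, as the diff below shows. */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

struct peer {
	bool draining;      /* set by drain_peer(): peer is dead to us */
	bool to_peer_open;  /* connection to the remote peer still open */
	size_t num_subds;   /* subdaemon connections still attached */
};

/* Free only once the peer connection is closed AND all subds are gone. */
static void maybe_free_peer(struct peer *peer)
{
	if (peer->to_peer_open || peer->num_subds != 0)
		return;
	printf("freeing peer\n");
	free(peer);
}

/* Models drain_peer(): mark draining; the real code also arms one
 * 5-second timer per subd and one for the peer connection, and keeps
 * forwarding queued traffic in the meantime. */
static void drain_peer(struct peer *peer)
{
	peer->draining = true;
	printf("subds have 5s to close; peer conn flushed for up to 5s\n");
}

/* Models destroy_subd(): the last subd out may free the peer. */
static void subd_gone(struct peer *peer)
{
	peer->num_subds--;
	maybe_free_peer(peer);
}

/* Models destroy_peer_conn(): start draining if we weren't already. */
static void peer_conn_gone(struct peer *peer)
{
	if (!peer->draining)
		drain_peer(peer);
	peer->to_peer_open = false;
	maybe_free_peer(peer);
}

int main(void)
{
	struct peer *peer = calloc(1, sizeof(*peer));
	peer->to_peer_open = true;
	peer->num_subds = 2;

	drain_peer(peer);     /* lightningd told us to disconnect */
	subd_gone(peer);      /* e.g. channeld closes its fd */
	subd_gone(peer);      /* last subd gone, but conn still open */
	peer_conn_gone(peer); /* socket to peer closes: peer is freed */
	return 0;
}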

We don't do this "drain period" on reconnects: we kill the old
connection immediately.

We fix up one test which was looking for the "disconnect" message
explicitly.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Rusty Russell 2022-07-18 21:42:27 +09:30 committed by neil saitug
parent 9cff125590
commit 719d1384d1
5 changed files with 97 additions and 42 deletions


@@ -211,10 +211,15 @@ static void peer_connected_in(struct daemon *daemon,
tal_free(connect);
}
/*~ When we free a peer, we remove it from the daemon's hashtable */
/*~ When we free a peer, we remove it from the daemon's hashtable.
* We also call this manually if we want to elegantly drain peer's
* queues. */
void destroy_peer(struct peer *peer)
{
peer_htable_del(&peer->daemon->peers, peer);
assert(!peer->draining);
if (!peer_htable_del(&peer->daemon->peers, peer))
abort();
/* Tell gossipd to stop asking this peer gossip queries */
daemon_conn_send(peer->daemon->gossipd,
@@ -225,6 +230,10 @@ void destroy_peer(struct peer *peer)
take(towire_connectd_peer_disconnect_done(NULL,
&peer->id,
peer->counter)));
/* This makes multiplex.c routines not feed us more, but
* *also* means that if we're freed directly, the ->to_peer
* destructor won't call drain_peer(). */
peer->draining = true;
}
/*~ This is where we create a new peer. */
@@ -282,7 +291,7 @@ struct io_plan *peer_connected(struct io_conn *conn,
int subd_fd;
bool option_gossip_queries;
/* We remove any previous connection, on the assumption it's dead */
/* We remove any previous connection immediately, on the assumption it's dead */
peer = peer_htable_get(&daemon->peers, id);
if (peer)
tal_free(peer);
@@ -1858,8 +1867,8 @@ static void peer_discard(struct daemon *daemon, const u8 *msg)
/* If it's reconnected already, it will learn soon. */
if (peer->counter != counter)
return;
status_peer_debug(&id, "disconnect");
tal_free(peer);
status_peer_debug(&id, "discard_peer");
drain_peer(peer);
}
/* lightningd tells us to send a msg and disconnect. */


@@ -81,39 +81,76 @@ static struct subd *find_subd(struct peer,
return NULL;
}
/* We try to send the final messages, but if buffer is full and they're
* not reading, we have to give up. */
static void close_timeout(struct peer *peer)
/* Except for a reconnection, we finally free a peer when the io_conn
* is closed and all subds are gone. */
static void maybe_free_peer(struct peer *peer)
{
/* BROKEN means we'll trigger CI if we see it, though it's possible */
status_peer_broken(&peer->id, "Peer did not close, forcing close");
if (peer->to_peer)
return;
if (tal_count(peer->subds) != 0)
return;
status_debug("maybe_free_peer freeing peer!");
tal_free(peer);
}
/* We just want to send these messages out to the peer's connection,
* then close. We consider the peer dead to us (can be freed). */
static void drain_peer(struct peer *peer)
/* We try to send the final messages, but if buffer is full and they're
* not reading, we have to give up. */
static void close_peer_io_timeout(struct peer *peer)
{
/* BROKEN means we'll trigger CI if we see it, though it's possible */
status_peer_broken(&peer->id, "Peer did not close, forcing close");
io_close(peer->to_peer);
}
static void close_subd_timeout(struct subd *subd)
{
/* BROKEN means we'll trigger CI if we see it, though it's possible */
status_peer_broken(&subd->peer->id, "Subd did not close, forcing close");
io_close(subd->conn);
}
void drain_peer(struct peer *peer)
{
status_debug("drain_peer");
assert(!peer->draining);
/* This is a 5-second leak, worst case! */
notleak(peer);
/* Since we immediately free any subds we didn't connect yet,
* we need peer->to_peer set so it won't free peer! */
assert(peer->to_peer);
/* We no longer want subds feeding us more messages (they
* remove themselves from array when freed) */
while (tal_count(peer->subds))
tal_free(peer->subds[0]);
peer->draining = true;
/* Give the subds 5 seconds to close their fds to us. */
for (size_t i = 0; i < tal_count(peer->subds); i++) {
if (!peer->subds[i]->conn) {
/* Deletes itself from array, so be careful! */
tal_free(peer->subds[i]);
i--;
continue;
}
status_debug("drain_peer draining subd!");
notleak(new_reltimer(&peer->daemon->timers,
peer->subds[i], time_from_sec(5),
close_subd_timeout, peer->subds[i]));
/* Wake any outgoing queued on subd */
io_wake(peer->subds[i]->outq);
}
/* You have 5 seconds to drain... */
notleak(new_reltimer(&peer->daemon->timers,
peer, time_from_sec(5),
close_timeout, peer));
/* Wake them to ensure they notice the close! */
io_wake(&peer->subds);
if (peer->to_peer) {
/* You have 5 seconds to drain... */
notleak(new_reltimer(&peer->daemon->timers,
peer->to_peer, time_from_sec(5),
close_peer_io_timeout, peer));
}
/* Clean peer from hashtable; we no longer exist. */
destroy_peer(peer);
tal_del_destructor(peer, destroy_peer);
/* This is a 5-second leak, worst case! */
notleak(peer);
/* Start draining process! */
io_wake(peer->peer_outq);
}
@@ -899,12 +936,13 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
/* Still nothing to send? */
if (!msg) {
/* Draining? We're done. */
if (peer->draining)
/* Draining? We're done when subds are done. */
if (peer->draining && tal_count(peer->subds) == 0)
return io_sock_shutdown(peer_conn);
/* If they want us to send gossip, do so now. */
msg = maybe_from_gossip_store(NULL, peer);
if (!peer->draining)
msg = maybe_from_gossip_store(NULL, peer);
if (!msg) {
/* Tell them to read again, */
io_wake(&peer->subds);
@@ -992,6 +1030,9 @@ static void destroy_subd(struct subd *subd)
/* Make sure we try to keep reading from peer (might
* have been waiting for write_to_subd) */
io_wake(&peer->peer_in);
/* Maybe we were last subd out? */
maybe_free_peer(peer);
}
static struct subd *new_subd(struct peer *peer,
@@ -1132,6 +1173,9 @@ static struct io_plan *subd_conn_init(struct io_conn *subd_conn,
struct subd *subd)
{
subd->conn = subd_conn;
/* subd is a child of the conn: free when it closes! */
tal_steal(subd->conn, subd);
return io_duplex(subd_conn,
read_from_subd(subd_conn, subd),
write_to_subd(subd_conn, subd));
@@ -1140,18 +1184,15 @@ static struct io_plan *subd_conn_init(struct io_conn *subd_conn,
static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer)
{
assert(peer->to_peer == peer_conn);
/* If subds need cleaning, this will do it */
if (!peer->draining)
drain_peer(peer);
peer->to_peer = NULL;
/* Flush internal connections if any: last one out will free peer. */
if (tal_count(peer->subds) != 0) {
for (size_t i = 0; i < tal_count(peer->subds); i++)
msg_wake(peer->subds[i]->outq);
return;
}
/* We never had any subds? Free peer (might already be being freed,
* as it's our parent, but that's allowed by tal). */
tal_free(peer);
/* Or if there were no subds, this will free the peer. */
maybe_free_peer(peer);
}
struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn,
@@ -1203,8 +1244,9 @@ void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd)
subd = new_subd(peer, &channel_id);
assert(!subd->conn);
/* This sets subd->conn inside subd_conn_init */
io_new_conn(subd, fd, subd_conn_init, subd);
/* This sets subd->conn inside subd_conn_init, and reparents subd! */
io_new_conn(peer, fd, subd_conn_init, subd);
}
/* Lightningd says to send a ping */


@@ -22,6 +22,10 @@ void multiplex_final_msg(struct peer *peer,
* this does io logging. */
void inject_peer_msg(struct peer *peer, const u8 *msg TAKES);
/* Start closing the peer: removes itself from hash table, frees itself
* once done. */
void drain_peer(struct peer *peer);
void setup_peer_gossip_store(struct peer *peer,
const struct feature_set *our_features,
const u8 *their_features);


@@ -2869,8 +2869,8 @@ def test_opener_feerate_reconnect(node_factory, bitcoind):
l2.daemon.wait_for_log(r'dev_disconnect: \-WIRE_COMMITMENT_SIGNED')
# Wait until they reconnect.
l1.daemon.wait_for_log('Peer transient failure in CHANNELD_NORMAL')
l1.daemon.wait_for_log('peer_disconnect_done')
l1.daemon.wait_for_logs(['Peer transient failure in CHANNELD_NORMAL',
'peer_disconnect_done'])
wait_for(lambda: l1.rpc.getpeer(l2.info['id'])['connected'])
# Should work normally.


@@ -354,7 +354,7 @@ def test_v2_rbf_liquidity_ad(node_factory, bitcoind, chainparams):
# l1 leases a channel from l2
l1.rpc.connect(l2.info['id'], 'localhost', l2.port)
rates = l1.rpc.dev_queryrates(l2.info['id'], amount, amount)
l1.daemon.wait_for_log('disconnect')
wait_for(lambda: l1.rpc.listpeers()['peers'] == [])
l1.rpc.connect(l2.info['id'], 'localhost', l2.port)
chan_id = l1.rpc.fundchannel(l2.info['id'], amount, request_amt=amount,
feerate='{}perkw'.format(feerate),