connectd: fix forwarding after tx_abort.

If we get a WIRE_TX_ABORT then another message, we send the other message to the same
subd (even though the tx abort causes it to shutdown).  This means we effectively
lose the next message, and timeout (see below from CI, reproduced locally).

So, have connectd ignore the subd after it forwards the WIRE_TX_ABORT.  The next
message will, correctly, cause a fresh subdaemon to be spawned.

```
    @unittest.skipIf(TEST_NETWORK != 'regtest', 'elementsd doesnt yet support PSBT features we need')
    @pytest.mark.openchannel('v2')
    def test_v2_rbf_multi(node_factory, bitcoind, chainparams):
        l1, l2 = node_factory.get_nodes(2,
                                        opts={'may_reconnect': True,
                                              'dev-no-reconnect': None,
                                              'allow_warning': True})
    
        l1.rpc.connect(l2.info['id'], 'localhost', l2.port)
        amount = 2**24
        chan_amount = 100000
        bitcoind.rpc.sendtoaddress(l1.rpc.newaddr()['bech32'], amount / 10**8 + 0.01)
        bitcoind.generate_block(1)
        # Wait for it to arrive.
        wait_for(lambda: len(l1.rpc.listfunds()['outputs']) > 0)
    
        res = l1.rpc.fundchannel(l2.info['id'], chan_amount)
        chan_id = res['channel_id']
        vins = bitcoind.rpc.decoderawtransaction(res['tx'])['vin']
        assert(only_one(vins))
        prev_utxos = ["{}:{}".format(vins[0]['txid'], vins[0]['vout'])]
    
        # Check that we're waiting for lockin
        l1.daemon.wait_for_log(' to DUALOPEND_AWAITING_LOCKIN')
    
        # Attempt to do abort, should fail since we've
        # already gotten an inflight
        with pytest.raises(RpcError):
            l1.rpc.openchannel_abort(chan_id)
    
        rate = int(find_next_feerate(l1, l2)[:-5])
        # We 4x the feerate to beat the min-relay fee
        next_feerate = '{}perkw'.format(rate * 4)
    
        # Initiate an RBF
        startweight = 42 + 172  # base weight, funding output
        initpsbt = l1.rpc.utxopsbt(chan_amount, next_feerate, startweight,
                                   prev_utxos, reservedok=True,
                                   min_witness_weight=110,
                                   excess_as_change=True)
    
        # Do the bump
        bump = l1.rpc.openchannel_bump(chan_id, chan_amount,
                                       initpsbt['psbt'],
                                       funding_feerate=next_feerate)
    
        # Abort this open attempt! We will re-try
        aborted = l1.rpc.openchannel_abort(chan_id)
        assert not aborted['channel_canceled']
        # We no longer disconnect on aborts, because magic!
        assert only_one(l1.rpc.listpeers()['peers'])['connected']
    
        # Do the bump, again, same feerate
>       bump = l1.rpc.openchannel_bump(chan_id, chan_amount,
                                       initpsbt['psbt'],
                                       funding_feerate=next_feerate)

tests/test_opening.py:668: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
contrib/pyln-client/pyln/client/lightning.py:1206: in openchannel_bump
    return self.call("openchannel_bump", payload)
contrib/pyln-testing/pyln/testing/utils.py:718: in call
    res = LightningRpc.call(self, method, payload, cmdprefix, filter)
contrib/pyln-client/pyln/client/lightning.py:398: in call
    resp, buf = self._readobj(sock, buf)
contrib/pyln-client/pyln/client/lightning.py:315: in _readobj
    b = sock.recv(max(1024, len(buff)))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyln.client.lightning.UnixSocket object at 0x7f34675aae80>
length = 1024

    def recv(self, length: int) -> bytes:
        if self.sock is None:
            raise socket.error("not connected")
    
>       return self.sock.recv(length)
E       Failed: Timeout >1200.0s
```
This commit is contained in:
Rusty Russell 2023-10-23 16:34:35 +10:30
parent 920e50db6b
commit 4216affe90
1 changed files with 21 additions and 2 deletions

View File

@ -58,6 +58,9 @@ struct subd {
/* Output buffer */
struct msg_queue *outq;
/* After we've told it to tx_abort, we don't send anything else. */
bool rcvd_tx_abort;
};
static struct subd *find_subd(struct peer *peer,
@ -66,6 +69,10 @@ static struct subd *find_subd(struct peer *peer,
for (size_t i = 0; i < tal_count(peer->subds); i++) {
struct subd *subd = peer->subds[i];
/* Once we sent it tx_abort, we pretend it doesn't exist */
if (subd->rcvd_tx_abort)
continue;
/* Once we see a message using the real channel_id, we
* clear the temporary_channel_id */
if (channel_id_eq(&subd->channel_id, channel_id)) {
@ -1040,6 +1047,7 @@ static struct subd *new_subd(struct peer *peer,
subd->temporary_channel_id = NULL;
subd->opener_revocation_basepoint = NULL;
subd->conn = NULL;
subd->rcvd_tx_abort = false;
/* Connect it to the peer */
tal_arr_expand(&peer->subds, subd);
@ -1056,6 +1064,8 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
u8 *decrypted;
struct channel_id channel_id;
struct subd *subd;
enum peer_wire type;
decrypted = cryptomsg_decrypt_body(tmpctx, &peer->cs,
peer->peer_in);
@ -1066,6 +1076,8 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
}
tal_free(peer->peer_in);
type = fromwire_peektype(decrypted);
/* dev_disconnect can disable read */
if (!peer->dev_read_enabled)
return read_hdr_from_peer(peer_conn, peer);
@ -1083,8 +1095,6 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
/* After this we should be able to match to subd by channel_id */
if (!extract_channel_id(decrypted, &channel_id)) {
enum peer_wire type = fromwire_peektype(decrypted);
/* We won't log this anywhere else, so do it here. */
status_peer_io(LOG_IO_IN, &peer->id, decrypted);
@ -1137,6 +1147,15 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
/* Tell them to write. */
msg_enqueue(subd->outq, take(decrypted));
/* Is this a tx_abort? Ignore from now on, and close after sending! */
if (type == WIRE_TX_ABORT) {
subd->rcvd_tx_abort = true;
/* In case it doesn't close by itself */
notleak(new_reltimer(&peer->daemon->timers, subd,
time_from_sec(5),
close_subd_timeout, subd));
}
/* Wait for them to wake us */
return io_wait(peer_conn, &peer->peer_in, read_hdr_from_peer, peer);
}