diff --git a/lightningd/htlc_end.c b/lightningd/htlc_end.c index ff89ada43..02a5ce97e 100644 --- a/lightningd/htlc_end.c +++ b/lightningd/htlc_end.c @@ -28,6 +28,22 @@ struct htlc_in *find_htlc_in(const struct htlc_in_map *map, return htlc_in_map_get(map, &key); } +struct htlc_in *remove_htlc_in_by_dbid(struct htlc_in_map *remaining_htlcs_in, + u64 dbid) +{ + struct htlc_in *hin; + struct htlc_in_map_iter ini; + + for (hin = htlc_in_map_first(remaining_htlcs_in, &ini); hin; + hin = htlc_in_map_next(remaining_htlcs_in, &ini)) { + if (hin->dbid == dbid) { + htlc_in_map_del(remaining_htlcs_in, hin); + return hin; + } + } + return NULL; +} + static void destroy_htlc_in(struct htlc_in *hend, struct htlc_in_map *map) { htlc_in_map_del(map, hend); diff --git a/lightningd/htlc_end.h b/lightningd/htlc_end.h index 16e20bb1e..bd9bd883a 100644 --- a/lightningd/htlc_end.h +++ b/lightningd/htlc_end.h @@ -58,7 +58,6 @@ struct htlc_out { * is saved to the database, must be >0 after saving to the * database. */ u64 dbid; - u64 origin_htlc_id; struct htlc_key key; struct amount_msat msat; u32 cltv_expiry; @@ -123,6 +122,10 @@ struct htlc_in *find_htlc_in(const struct htlc_in_map *map, const struct channel *channel, u64 htlc_id); +/* FIXME: Slow function only used at startup. */ +struct htlc_in *remove_htlc_in_by_dbid(struct htlc_in_map *remaining_htlcs_in, + u64 dbid); + struct htlc_out *find_htlc_out(const struct htlc_out_map *map, const struct channel *channel, u64 htlc_id); diff --git a/lightningd/lightningd.c b/lightningd/lightningd.c index 75f250a4a..876cc9834 100644 --- a/lightningd/lightningd.c +++ b/lightningd/lightningd.c @@ -630,7 +630,7 @@ int main(int argc, char *argv[]) int stop_fd; struct timers *timers; const char *stop_response; - struct htlc_in_map *unprocessed_htlcs; + struct htlc_in_map *unconnected_htlcs_in; struct rlimit nofile = {1024, 1024}; /*~ Make sure that we limit ourselves to something reasonable. Modesty @@ -778,7 +778,7 @@ int main(int argc, char *argv[]) * topology is initialized since some decisions rely on being able to * know the blockheight. */ db_begin_transaction(ld->wallet->db); - unprocessed_htlcs = load_channels_from_wallet(ld); + unconnected_htlcs_in = load_channels_from_wallet(ld); db_commit_transaction(ld->wallet->db); /*~ Create RPC socket: now lightning-cli can send us JSON RPC commands @@ -792,7 +792,7 @@ int main(int argc, char *argv[]) /*~ Process any HTLCs we were in the middle of when we exited, now * that plugins (who might want to know via htlc_accepted hook) are * active. */ - htlcs_resubmit(ld, unprocessed_htlcs); + htlcs_resubmit(ld, unconnected_htlcs_in); /*~ Activate connect daemon. Needs to be after the initialization of * chaintopology, otherwise peers may connect and ask for diff --git a/lightningd/peer_control.c b/lightningd/peer_control.c index 22806dca7..676f4fa17 100644 --- a/lightningd/peer_control.c +++ b/lightningd/peer_control.c @@ -1579,27 +1579,47 @@ void activate_peers(struct lightningd *ld) struct htlc_in_map *load_channels_from_wallet(struct lightningd *ld) { struct peer *peer; + struct htlc_in_map *unconnected_htlcs_in = tal(ld, struct htlc_in_map); /* Load channels from database */ if (!wallet_init_channels(ld->wallet)) fatal("Could not load channels from the database"); - /* This is a poor-man's db join :( */ + /* First we load the incoming htlcs */ list_for_each(&ld->peers, peer, list) { struct channel *channel; list_for_each(&peer->channels, channel, list) { - if (!wallet_htlcs_load_for_channel(ld->wallet, - channel, - &ld->htlcs_in, - &ld->htlcs_out)) { + if (!wallet_htlcs_load_in_for_channel(ld->wallet, + channel, + &ld->htlcs_in)) { fatal("could not load htlcs for channel"); } } } - /* Now connect HTLC pointers together */ - return htlcs_reconnect(ld, &ld->htlcs_in, &ld->htlcs_out); + /* Make a copy of the htlc_map: entries removed as they're matched */ + htlc_in_map_copy(unconnected_htlcs_in, &ld->htlcs_in); + + /* Now we load the outgoing HTLCs, so we can connect them. */ + list_for_each(&ld->peers, peer, list) { + struct channel *channel; + + list_for_each(&peer->channels, channel, list) { + if (!wallet_htlcs_load_out_for_channel(ld->wallet, + channel, + &ld->htlcs_out, + unconnected_htlcs_in)) { + fatal("could not load outgoing htlcs for channel"); + } + } + } + +#ifdef COMPAT_V061 + fixup_htlcs_out(ld); +#endif /* COMPAT_V061 */ + + return unconnected_htlcs_in; } static struct command_result *json_disconnect(struct command *cmd, diff --git a/lightningd/peer_htlcs.c b/lightningd/peer_htlcs.c index 1edb6b50b..77370da4a 100644 --- a/lightningd/peer_htlcs.c +++ b/lightningd/peer_htlcs.c @@ -1274,7 +1274,7 @@ static bool update_out_htlc(struct channel *channel, } /* For our own HTLCs, we commit payment to db lazily */ - if (hout->origin_htlc_id == 0) + if (hout->am_origin) payment_store(ld, &hout->payment_hash, hout->partid); } @@ -2089,94 +2089,35 @@ static void fixup_hout(struct lightningd *ld, struct htlc_out *hout) &hout->key.channel->peer->id), fix); } + +void fixup_htlcs_out(struct lightningd *ld) +{ + struct htlc_out_map_iter outi; + struct htlc_out *hout; + + for (hout = htlc_out_map_first(&ld->htlcs_out, &outi); + hout; + hout = htlc_out_map_next(&ld->htlcs_out, &outi)) { + if (!hout->am_origin) + fixup_hout(ld, hout); + } +} #endif /* COMPAT_V061 */ -/** - * htlcs_reconnect -- Link outgoing HTLCs to their origins after initial db load - * - * For each outgoing HTLC find the incoming HTLC that triggered it. If - * we are the origin of the transfer then we cannot resolve the - * incoming HTLC in which case we just leave it `NULL`. - * - * Returns a map of any htlcs we need to retry. - */ -struct htlc_in_map *htlcs_reconnect(struct lightningd *ld, - struct htlc_in_map *htlcs_in, - struct htlc_out_map *htlcs_out) -{ - struct htlc_in_map_iter ini; - struct htlc_out_map_iter outi; - struct htlc_in *hin; - struct htlc_out *hout; - struct htlc_in_map *unprocessed = tal(NULL, struct htlc_in_map); - - /* Any HTLCs which happened to be incoming and weren't forwarded before - * we shutdown/crashed: fail them now. - * - * Note that since we do local processing synchronously, so this never - * captures local payments. But if it did, it would be a tiny corner - * case. */ - htlc_in_map_init(unprocessed); - for (hin = htlc_in_map_first(htlcs_in, &ini); hin; - hin = htlc_in_map_next(htlcs_in, &ini)) { - if (hin->hstate == RCVD_ADD_ACK_REVOCATION) - htlc_in_map_add(unprocessed, hin); - } - - for (hout = htlc_out_map_first(htlcs_out, &outi); hout; - hout = htlc_out_map_next(htlcs_out, &outi)) { - - if (hout->am_origin) { - continue; - } - - /* For fulfilled HTLCs, we fulfill incoming before outgoing is - * completely resolved, so it's possible that we don't find - * the incoming. */ - for (hin = htlc_in_map_first(htlcs_in, &ini); hin; - hin = htlc_in_map_next(htlcs_in, &ini)) { - if (hout->origin_htlc_id == hin->dbid) { - log_debug(ld->log, - "Found corresponding htlc_in %" PRIu64 - " for htlc_out %" PRIu64, - hin->dbid, hout->dbid); - htlc_out_connect_htlc_in(hout, hin); - break; - } - } - - if (!hout->in && !hout->preimage) { -#ifdef COMPAT_V061 - log_broken(ld->log, - "Missing preimage for orphaned HTLC; replacing with zeros"); - hout->preimage = talz(hout, struct preimage); -#else - fatal("Unable to find corresponding htlc_in %"PRIu64 - " for unfulfilled htlc_out %"PRIu64, - hout->origin_htlc_id, hout->dbid); -#endif - } -#ifdef COMPAT_V061 - fixup_hout(ld, hout); -#endif - - if (hout->in) - htlc_in_map_del(unprocessed, hout->in); - } - - return unprocessed; -} - -void htlcs_resubmit(struct lightningd *ld, struct htlc_in_map *unprocessed) +void htlcs_resubmit(struct lightningd *ld, + struct htlc_in_map *unconnected_htlcs_in) { struct htlc_in *hin; struct htlc_in_map_iter ini; enum onion_type failcode COMPILER_WANTS_INIT("gcc7.4.0 bad, 8.3 OK"); - /* Now fail any which were stuck. */ - for (hin = htlc_in_map_first(unprocessed, &ini); + /* Now retry any which were stuck. */ + for (hin = htlc_in_map_first(unconnected_htlcs_in, &ini); hin; - hin = htlc_in_map_next(unprocessed, &ini)) { + hin = htlc_in_map_next(unconnected_htlcs_in, &ini)) { + if (hin->hstate != RCVD_ADD_ACK_REVOCATION) + continue; + log_unusual(hin->key.channel->log, "Replaying old unprocessed HTLC #%"PRIu64, hin->key.id); @@ -2192,8 +2133,8 @@ void htlcs_resubmit(struct lightningd *ld, struct htlc_in_map *unprocessed) } /* Don't leak memory! */ - htlc_in_map_clear(unprocessed); - tal_free(unprocessed); + htlc_in_map_clear(unconnected_htlcs_in); + tal_free(unconnected_htlcs_in); } #if DEVELOPER diff --git a/lightningd/peer_htlcs.h b/lightningd/peer_htlcs.h index da8e75623..f6634d12b 100644 --- a/lightningd/peer_htlcs.h +++ b/lightningd/peer_htlcs.h @@ -64,11 +64,11 @@ void onchain_fulfilled_htlc(struct channel *channel, void htlcs_notify_new_block(struct lightningd *ld, u32 height); -struct htlc_in_map *htlcs_reconnect(struct lightningd *ld, - struct htlc_in_map *htlcs_in, - struct htlc_out_map *htlcs_out); +/* Only defined if COMPAT_V061 */ +void fixup_htlcs_out(struct lightningd *ld); -void htlcs_resubmit(struct lightningd *ld, struct htlc_in_map *unprocessed); +void htlcs_resubmit(struct lightningd *ld, + struct htlc_in_map *unconnected_htlcs_in); /* For HTLCs which terminate here, invoice payment calls one of these. */ void fulfill_htlc(struct htlc_in *hin, const struct preimage *preimage); diff --git a/lightningd/test/run-find_my_abspath.c b/lightningd/test/run-find_my_abspath.c index c769c41d4..deb55d493 100644 --- a/lightningd/test/run-find_my_abspath.c +++ b/lightningd/test/run-find_my_abspath.c @@ -97,7 +97,8 @@ void hsm_init(struct lightningd *ld UNNEEDED) void htlcs_notify_new_block(struct lightningd *ld UNNEEDED, u32 height UNNEEDED) { fprintf(stderr, "htlcs_notify_new_block called!\n"); abort(); } /* Generated stub for htlcs_resubmit */ -void htlcs_resubmit(struct lightningd *ld UNNEEDED, struct htlc_in_map *unprocessed UNNEEDED) +void htlcs_resubmit(struct lightningd *ld UNNEEDED, + struct htlc_in_map *unconnected_htlcs_in UNNEEDED) { fprintf(stderr, "htlcs_resubmit called!\n"); abort(); } /* Generated stub for jsonrpc_listen */ void jsonrpc_listen(struct jsonrpc *rpc UNNEEDED, struct lightningd *ld UNNEEDED) diff --git a/lightningd/test/run-invoice-select-inchan.c b/lightningd/test/run-invoice-select-inchan.c index 8768e22eb..33e078df9 100644 --- a/lightningd/test/run-invoice-select-inchan.c +++ b/lightningd/test/run-invoice-select-inchan.c @@ -96,6 +96,9 @@ void fatal(const char *fmt UNNEEDED, ...) /* Generated stub for feature_is_set */ bool feature_is_set(const u8 *features UNNEEDED, size_t bit UNNEEDED) { fprintf(stderr, "feature_is_set called!\n"); abort(); } +/* Generated stub for fixup_htlcs_out */ +void fixup_htlcs_out(struct lightningd *ld UNNEEDED) +{ fprintf(stderr, "fixup_htlcs_out called!\n"); abort(); } /* Generated stub for fromwire_channel_dev_memleak_reply */ bool fromwire_channel_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED) { fprintf(stderr, "fromwire_channel_dev_memleak_reply called!\n"); abort(); } @@ -133,11 +136,6 @@ bool htlc_is_trimmed(enum side htlc_owner UNNEEDED, struct amount_sat dust_limit UNNEEDED, enum side side UNNEEDED) { fprintf(stderr, "htlc_is_trimmed called!\n"); abort(); } -/* Generated stub for htlcs_reconnect */ -struct htlc_in_map *htlcs_reconnect(struct lightningd *ld UNNEEDED, - struct htlc_in_map *htlcs_in UNNEEDED, - struct htlc_out_map *htlcs_out UNNEEDED) -{ fprintf(stderr, "htlcs_reconnect called!\n"); abort(); } /* Generated stub for json_add_address */ void json_add_address(struct json_stream *response UNNEEDED, const char *fieldname UNNEEDED, const struct wireaddr *addr UNNEEDED) @@ -504,12 +502,17 @@ void wallet_channeltxs_add(struct wallet *w UNNEEDED, struct channel *chan UNNEE const int type UNNEEDED, const struct bitcoin_txid *txid UNNEEDED, const u32 input_num UNNEEDED, const u32 blockheight UNNEEDED) { fprintf(stderr, "wallet_channeltxs_add called!\n"); abort(); } -/* Generated stub for wallet_htlcs_load_for_channel */ -bool wallet_htlcs_load_for_channel(struct wallet *wallet UNNEEDED, - struct channel *chan UNNEEDED, - struct htlc_in_map *htlcs_in UNNEEDED, - struct htlc_out_map *htlcs_out UNNEEDED) -{ fprintf(stderr, "wallet_htlcs_load_for_channel called!\n"); abort(); } +/* Generated stub for wallet_htlcs_load_in_for_channel */ +bool wallet_htlcs_load_in_for_channel(struct wallet *wallet UNNEEDED, + struct channel *chan UNNEEDED, + struct htlc_in_map *htlcs_in UNNEEDED) +{ fprintf(stderr, "wallet_htlcs_load_in_for_channel called!\n"); abort(); } +/* Generated stub for wallet_htlcs_load_out_for_channel */ +bool wallet_htlcs_load_out_for_channel(struct wallet *wallet UNNEEDED, + struct channel *chan UNNEEDED, + struct htlc_out_map *htlcs_out UNNEEDED, + struct htlc_in_map *remaining_htlcs_in UNNEEDED) +{ fprintf(stderr, "wallet_htlcs_load_out_for_channel called!\n"); abort(); } /* Generated stub for wallet_init_channels */ bool wallet_init_channels(struct wallet *w UNNEEDED) { fprintf(stderr, "wallet_init_channels called!\n"); abort(); } diff --git a/wallet/test/run-wallet.c b/wallet/test/run-wallet.c index 4fc199d9a..ca2c5b785 100644 --- a/wallet/test/run-wallet.c +++ b/wallet/test/run-wallet.c @@ -1156,7 +1156,7 @@ static bool test_htlc_crud(struct lightningd *ld, const tal_t *ctx) struct channel *chan = tal(ctx, struct channel); struct peer *peer = talz(ctx, struct peer); struct wallet *w = create_test_wallet(ld, ctx); - struct htlc_in_map *htlcs_in = tal(ctx, struct htlc_in_map); + struct htlc_in_map *htlcs_in = tal(ctx, struct htlc_in_map), *rem; struct htlc_out_map *htlcs_out = tal(ctx, struct htlc_out_map); /* Make sure we have our references correct */ @@ -1220,11 +1220,16 @@ static bool test_htlc_crud(struct lightningd *ld, const tal_t *ctx) db_begin_transaction(w->db); CHECK(!wallet_err); - CHECK_MSG(wallet_htlcs_load_for_channel(w, chan, htlcs_in, htlcs_out), - "Failed loading HTLCs"); + CHECK_MSG(wallet_htlcs_load_in_for_channel(w, chan, htlcs_in), + "Failed loading in HTLCs"); + /* Freed by htlcs_resubmit */ + rem = tal(NULL, struct htlc_in_map); + htlc_in_map_copy(rem, htlcs_in); + CHECK_MSG(wallet_htlcs_load_out_for_channel(w, chan, htlcs_out, rem), + "Failed loading out HTLCs"); db_commit_transaction(w->db); - htlcs_resubmit(w->ld, htlcs_reconnect(w->ld, htlcs_in, htlcs_out)); + htlcs_resubmit(w->ld, rem); CHECK(!wallet_err); hin = htlc_in_map_get(htlcs_in, &in.key); diff --git a/wallet/wallet.c b/wallet/wallet.c index 1af89b55d..3379c2b97 100644 --- a/wallet/wallet.c +++ b/wallet/wallet.c @@ -1647,7 +1647,6 @@ void wallet_htlc_save_out(struct wallet *wallet, /* We absolutely need the incoming HTLC to be persisted before * we can persist it's dependent */ assert(out->in == NULL || out->in->dbid != 0); - out->origin_htlc_id = out->in?out->in->dbid:0; stmt = db_prepare_v2( wallet->db, @@ -1770,8 +1769,11 @@ static bool wallet_stmt2htlc_in(struct channel *channel, return ok; } -static bool wallet_stmt2htlc_out(struct channel *channel, - struct db_stmt *stmt, struct htlc_out *out) +/* Removes matching htlc from unconnected_htlcs_in */ +static bool wallet_stmt2htlc_out(struct wallet *wallet, + struct channel *channel, + struct db_stmt *stmt, struct htlc_out *out, + struct htlc_in_map *unconnected_htlcs_in) { bool ok = true; out->dbid = db_column_u64(stmt, 0); @@ -1795,20 +1797,32 @@ static bool wallet_stmt2htlc_out(struct channel *channel, out->failuremsg = db_column_arr(out, stmt, 8, u8); out->failcode = db_column_int_or_default(stmt, 9, 0); + out->in = NULL; if (!db_column_is_null(stmt, 10)) { - out->origin_htlc_id = db_column_u64(stmt, 10); + u64 in_id = db_column_u64(stmt, 10); + struct htlc_in *hin; + + hin = remove_htlc_in_by_dbid(unconnected_htlcs_in, in_id); + if (hin) + htlc_out_connect_htlc_in(out, hin); out->am_origin = false; + if (!out->in && !out->preimage) { +#ifdef COMPAT_V061 + log_broken(wallet->log, + "Missing preimage for orphaned HTLC; replacing with zeros"); + out->preimage = talz(out, struct preimage); +#else + fatal("Unable to find corresponding htlc_in %"PRIu64 + " for unfulfilled htlc_out %"PRIu64, + in_id, out->dbid); +#endif + } } else { - out->origin_htlc_id = 0; out->partid = db_column_u64(stmt, 13); out->am_origin = true; } - /* Need to defer wiring until we can look up all incoming - * htlcs, will wire using origin_htlc_id */ - out->in = NULL; - return ok; } @@ -1849,16 +1863,15 @@ static void fixup_hin(struct wallet *wallet, struct htlc_in *hin) #endif } -bool wallet_htlcs_load_for_channel(struct wallet *wallet, - struct channel *chan, - struct htlc_in_map *htlcs_in, - struct htlc_out_map *htlcs_out) +bool wallet_htlcs_load_in_for_channel(struct wallet *wallet, + struct channel *chan, + struct htlc_in_map *htlcs_in) { struct db_stmt *stmt; bool ok = true; - int incount = 0, outcount = 0; + int incount = 0; - log_debug(wallet->log, "Loading HTLCs for channel %"PRIu64, chan->dbid); + log_debug(wallet->log, "Loading in HTLCs for channel %"PRIu64, chan->dbid); stmt = db_prepare_v2(wallet->db, SQL("SELECT" " id" ", channel_htlc_id" @@ -1892,6 +1905,19 @@ bool wallet_htlcs_load_for_channel(struct wallet *wallet, } tal_free(stmt); + log_debug(wallet->log, "Restored %d incoming HTLCS", incount); + return ok; +} + +bool wallet_htlcs_load_out_for_channel(struct wallet *wallet, + struct channel *chan, + struct htlc_out_map *htlcs_out, + struct htlc_in_map *unconnected_htlcs_in) +{ + struct db_stmt *stmt; + bool ok = true; + int outcount = 0; + stmt = db_prepare_v2(wallet->db, SQL("SELECT" " id" ", channel_htlc_id" @@ -1918,7 +1944,8 @@ bool wallet_htlcs_load_for_channel(struct wallet *wallet, while (db_step(stmt)) { struct htlc_out *out = tal(chan, struct htlc_out); - ok &= wallet_stmt2htlc_out(chan, stmt, out); + ok &= wallet_stmt2htlc_out(wallet, chan, stmt, out, + unconnected_htlcs_in); connect_htlc_out(htlcs_out, out); /* Cannot htlc_out_check because we haven't wired the * dependencies in yet */ @@ -1926,7 +1953,7 @@ bool wallet_htlcs_load_for_channel(struct wallet *wallet, } tal_free(stmt); - log_debug(wallet->log, "Restored %d incoming and %d outgoing HTLCS", incount, outcount); + log_debug(wallet->log, "Restored %d outgoing HTLCS", outcount); return ok; } diff --git a/wallet/wallet.h b/wallet/wallet.h index 927007748..9cfbda300 100644 --- a/wallet/wallet.h +++ b/wallet/wallet.h @@ -594,29 +594,35 @@ void wallet_htlc_update(struct wallet *wallet, const u64 htlc_dbid, enum onion_type failcode, const u8 *failuremsg); /** - * wallet_htlcs_load_for_channel - Load HTLCs associated with chan from DB. + * wallet_htlcs_load_in_for_channel - Load incoming HTLCs associated with chan from DB. * * @wallet: wallet to load from * @chan: load HTLCs associated with this channel * @htlcs_in: htlc_in_map to store loaded htlc_in in - * @htlcs_out: htlc_out_map to store loaded htlc_out in * - * This function looks for HTLCs that are associated with the given - * channel and loads them into the provided maps. One caveat is that - * the `struct htlc_out` instances are not wired up with the - * corresponding `struct htlc_in` in the forwarding case nor are they - * associated with a `struct pay_command` in the case we originated - * the payment. In the former case the corresponding `struct htlc_in` - * may not have been loaded yet. In the latter case the pay_command - * does not exist anymore since we restarted. - * - * Use `htlcs_reconnect` to wire htlc_out instances to the - * corresponding htlc_in after loading all channels. + * This function looks for incoming HTLCs that are associated with the given + * channel and loads them into the provided map. */ -bool wallet_htlcs_load_for_channel(struct wallet *wallet, - struct channel *chan, - struct htlc_in_map *htlcs_in, - struct htlc_out_map *htlcs_out); +bool wallet_htlcs_load_in_for_channel(struct wallet *wallet, + struct channel *chan, + struct htlc_in_map *htlcs_in); + +/** + * wallet_htlcs_load_out_for_channel - Load outgoing HTLCs associated with chan from DB. + * + * @wallet: wallet to load from + * @chan: load HTLCs associated with this channel + * @htlcs_out: htlc_out_map to store loaded htlc_out in. + * @remaining_htlcs_in: htlc_in_map with unconnected htlcs (removed as we progress) + * + * We populate htlc_out->in by looking up in remaining_htlcs_in. It's + * possible that it's still NULL, since we can have outgoing HTLCs + * outlive their corresponding incoming. + */ +bool wallet_htlcs_load_out_for_channel(struct wallet *wallet, + struct channel *chan, + struct htlc_out_map *htlcs_out, + struct htlc_in_map *remaining_htlcs_in); /** * wallet_announcement_save - Save remote announcement information with channel.