htlcs: remove origin_htlc_id from htlc_out.

This is a transient field, so rework things so we don't leave it in
struct htlc_out.  Instead, load htlc_in first and connect htlc_out to
them as we go.

This also changes one place where we use it instead of the am_origin
flag.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2019-12-12 10:09:10 +10:30 committed by Christian Decker
parent 345ca9b122
commit 12985331f7
11 changed files with 170 additions and 148 deletions

View File

@ -28,6 +28,22 @@ struct htlc_in *find_htlc_in(const struct htlc_in_map *map,
return htlc_in_map_get(map, &key);
}
struct htlc_in *remove_htlc_in_by_dbid(struct htlc_in_map *remaining_htlcs_in,
u64 dbid)
{
struct htlc_in *hin;
struct htlc_in_map_iter ini;
for (hin = htlc_in_map_first(remaining_htlcs_in, &ini); hin;
hin = htlc_in_map_next(remaining_htlcs_in, &ini)) {
if (hin->dbid == dbid) {
htlc_in_map_del(remaining_htlcs_in, hin);
return hin;
}
}
return NULL;
}
static void destroy_htlc_in(struct htlc_in *hend, struct htlc_in_map *map)
{
htlc_in_map_del(map, hend);

View File

@ -58,7 +58,6 @@ struct htlc_out {
* is saved to the database, must be >0 after saving to the
* database. */
u64 dbid;
u64 origin_htlc_id;
struct htlc_key key;
struct amount_msat msat;
u32 cltv_expiry;
@ -123,6 +122,10 @@ struct htlc_in *find_htlc_in(const struct htlc_in_map *map,
const struct channel *channel,
u64 htlc_id);
/* FIXME: Slow function only used at startup. */
struct htlc_in *remove_htlc_in_by_dbid(struct htlc_in_map *remaining_htlcs_in,
u64 dbid);
struct htlc_out *find_htlc_out(const struct htlc_out_map *map,
const struct channel *channel,
u64 htlc_id);

View File

@ -630,7 +630,7 @@ int main(int argc, char *argv[])
int stop_fd;
struct timers *timers;
const char *stop_response;
struct htlc_in_map *unprocessed_htlcs;
struct htlc_in_map *unconnected_htlcs_in;
struct rlimit nofile = {1024, 1024};
/*~ Make sure that we limit ourselves to something reasonable. Modesty
@ -778,7 +778,7 @@ int main(int argc, char *argv[])
* topology is initialized since some decisions rely on being able to
* know the blockheight. */
db_begin_transaction(ld->wallet->db);
unprocessed_htlcs = load_channels_from_wallet(ld);
unconnected_htlcs_in = load_channels_from_wallet(ld);
db_commit_transaction(ld->wallet->db);
/*~ Create RPC socket: now lightning-cli can send us JSON RPC commands
@ -792,7 +792,7 @@ int main(int argc, char *argv[])
/*~ Process any HTLCs we were in the middle of when we exited, now
* that plugins (who might want to know via htlc_accepted hook) are
* active. */
htlcs_resubmit(ld, unprocessed_htlcs);
htlcs_resubmit(ld, unconnected_htlcs_in);
/*~ Activate connect daemon. Needs to be after the initialization of
* chaintopology, otherwise peers may connect and ask for

View File

@ -1579,27 +1579,47 @@ void activate_peers(struct lightningd *ld)
struct htlc_in_map *load_channels_from_wallet(struct lightningd *ld)
{
struct peer *peer;
struct htlc_in_map *unconnected_htlcs_in = tal(ld, struct htlc_in_map);
/* Load channels from database */
if (!wallet_init_channels(ld->wallet))
fatal("Could not load channels from the database");
/* This is a poor-man's db join :( */
/* First we load the incoming htlcs */
list_for_each(&ld->peers, peer, list) {
struct channel *channel;
list_for_each(&peer->channels, channel, list) {
if (!wallet_htlcs_load_for_channel(ld->wallet,
channel,
&ld->htlcs_in,
&ld->htlcs_out)) {
if (!wallet_htlcs_load_in_for_channel(ld->wallet,
channel,
&ld->htlcs_in)) {
fatal("could not load htlcs for channel");
}
}
}
/* Now connect HTLC pointers together */
return htlcs_reconnect(ld, &ld->htlcs_in, &ld->htlcs_out);
/* Make a copy of the htlc_map: entries removed as they're matched */
htlc_in_map_copy(unconnected_htlcs_in, &ld->htlcs_in);
/* Now we load the outgoing HTLCs, so we can connect them. */
list_for_each(&ld->peers, peer, list) {
struct channel *channel;
list_for_each(&peer->channels, channel, list) {
if (!wallet_htlcs_load_out_for_channel(ld->wallet,
channel,
&ld->htlcs_out,
unconnected_htlcs_in)) {
fatal("could not load outgoing htlcs for channel");
}
}
}
#ifdef COMPAT_V061
fixup_htlcs_out(ld);
#endif /* COMPAT_V061 */
return unconnected_htlcs_in;
}
static struct command_result *json_disconnect(struct command *cmd,

View File

@ -1274,7 +1274,7 @@ static bool update_out_htlc(struct channel *channel,
}
/* For our own HTLCs, we commit payment to db lazily */
if (hout->origin_htlc_id == 0)
if (hout->am_origin)
payment_store(ld,
&hout->payment_hash, hout->partid);
}
@ -2089,94 +2089,35 @@ static void fixup_hout(struct lightningd *ld, struct htlc_out *hout)
&hout->key.channel->peer->id),
fix);
}
void fixup_htlcs_out(struct lightningd *ld)
{
struct htlc_out_map_iter outi;
struct htlc_out *hout;
for (hout = htlc_out_map_first(&ld->htlcs_out, &outi);
hout;
hout = htlc_out_map_next(&ld->htlcs_out, &outi)) {
if (!hout->am_origin)
fixup_hout(ld, hout);
}
}
#endif /* COMPAT_V061 */
/**
* htlcs_reconnect -- Link outgoing HTLCs to their origins after initial db load
*
* For each outgoing HTLC find the incoming HTLC that triggered it. If
* we are the origin of the transfer then we cannot resolve the
* incoming HTLC in which case we just leave it `NULL`.
*
* Returns a map of any htlcs we need to retry.
*/
struct htlc_in_map *htlcs_reconnect(struct lightningd *ld,
struct htlc_in_map *htlcs_in,
struct htlc_out_map *htlcs_out)
{
struct htlc_in_map_iter ini;
struct htlc_out_map_iter outi;
struct htlc_in *hin;
struct htlc_out *hout;
struct htlc_in_map *unprocessed = tal(NULL, struct htlc_in_map);
/* Any HTLCs which happened to be incoming and weren't forwarded before
* we shutdown/crashed: fail them now.
*
* Note that since we do local processing synchronously, so this never
* captures local payments. But if it did, it would be a tiny corner
* case. */
htlc_in_map_init(unprocessed);
for (hin = htlc_in_map_first(htlcs_in, &ini); hin;
hin = htlc_in_map_next(htlcs_in, &ini)) {
if (hin->hstate == RCVD_ADD_ACK_REVOCATION)
htlc_in_map_add(unprocessed, hin);
}
for (hout = htlc_out_map_first(htlcs_out, &outi); hout;
hout = htlc_out_map_next(htlcs_out, &outi)) {
if (hout->am_origin) {
continue;
}
/* For fulfilled HTLCs, we fulfill incoming before outgoing is
* completely resolved, so it's possible that we don't find
* the incoming. */
for (hin = htlc_in_map_first(htlcs_in, &ini); hin;
hin = htlc_in_map_next(htlcs_in, &ini)) {
if (hout->origin_htlc_id == hin->dbid) {
log_debug(ld->log,
"Found corresponding htlc_in %" PRIu64
" for htlc_out %" PRIu64,
hin->dbid, hout->dbid);
htlc_out_connect_htlc_in(hout, hin);
break;
}
}
if (!hout->in && !hout->preimage) {
#ifdef COMPAT_V061
log_broken(ld->log,
"Missing preimage for orphaned HTLC; replacing with zeros");
hout->preimage = talz(hout, struct preimage);
#else
fatal("Unable to find corresponding htlc_in %"PRIu64
" for unfulfilled htlc_out %"PRIu64,
hout->origin_htlc_id, hout->dbid);
#endif
}
#ifdef COMPAT_V061
fixup_hout(ld, hout);
#endif
if (hout->in)
htlc_in_map_del(unprocessed, hout->in);
}
return unprocessed;
}
void htlcs_resubmit(struct lightningd *ld, struct htlc_in_map *unprocessed)
void htlcs_resubmit(struct lightningd *ld,
struct htlc_in_map *unconnected_htlcs_in)
{
struct htlc_in *hin;
struct htlc_in_map_iter ini;
enum onion_type failcode COMPILER_WANTS_INIT("gcc7.4.0 bad, 8.3 OK");
/* Now fail any which were stuck. */
for (hin = htlc_in_map_first(unprocessed, &ini);
/* Now retry any which were stuck. */
for (hin = htlc_in_map_first(unconnected_htlcs_in, &ini);
hin;
hin = htlc_in_map_next(unprocessed, &ini)) {
hin = htlc_in_map_next(unconnected_htlcs_in, &ini)) {
if (hin->hstate != RCVD_ADD_ACK_REVOCATION)
continue;
log_unusual(hin->key.channel->log,
"Replaying old unprocessed HTLC #%"PRIu64,
hin->key.id);
@ -2192,8 +2133,8 @@ void htlcs_resubmit(struct lightningd *ld, struct htlc_in_map *unprocessed)
}
/* Don't leak memory! */
htlc_in_map_clear(unprocessed);
tal_free(unprocessed);
htlc_in_map_clear(unconnected_htlcs_in);
tal_free(unconnected_htlcs_in);
}
#if DEVELOPER

View File

@ -64,11 +64,11 @@ void onchain_fulfilled_htlc(struct channel *channel,
void htlcs_notify_new_block(struct lightningd *ld, u32 height);
struct htlc_in_map *htlcs_reconnect(struct lightningd *ld,
struct htlc_in_map *htlcs_in,
struct htlc_out_map *htlcs_out);
/* Only defined if COMPAT_V061 */
void fixup_htlcs_out(struct lightningd *ld);
void htlcs_resubmit(struct lightningd *ld, struct htlc_in_map *unprocessed);
void htlcs_resubmit(struct lightningd *ld,
struct htlc_in_map *unconnected_htlcs_in);
/* For HTLCs which terminate here, invoice payment calls one of these. */
void fulfill_htlc(struct htlc_in *hin, const struct preimage *preimage);

View File

@ -97,7 +97,8 @@ void hsm_init(struct lightningd *ld UNNEEDED)
void htlcs_notify_new_block(struct lightningd *ld UNNEEDED, u32 height UNNEEDED)
{ fprintf(stderr, "htlcs_notify_new_block called!\n"); abort(); }
/* Generated stub for htlcs_resubmit */
void htlcs_resubmit(struct lightningd *ld UNNEEDED, struct htlc_in_map *unprocessed UNNEEDED)
void htlcs_resubmit(struct lightningd *ld UNNEEDED,
struct htlc_in_map *unconnected_htlcs_in UNNEEDED)
{ fprintf(stderr, "htlcs_resubmit called!\n"); abort(); }
/* Generated stub for jsonrpc_listen */
void jsonrpc_listen(struct jsonrpc *rpc UNNEEDED, struct lightningd *ld UNNEEDED)

View File

@ -96,6 +96,9 @@ void fatal(const char *fmt UNNEEDED, ...)
/* Generated stub for feature_is_set */
bool feature_is_set(const u8 *features UNNEEDED, size_t bit UNNEEDED)
{ fprintf(stderr, "feature_is_set called!\n"); abort(); }
/* Generated stub for fixup_htlcs_out */
void fixup_htlcs_out(struct lightningd *ld UNNEEDED)
{ fprintf(stderr, "fixup_htlcs_out called!\n"); abort(); }
/* Generated stub for fromwire_channel_dev_memleak_reply */
bool fromwire_channel_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED)
{ fprintf(stderr, "fromwire_channel_dev_memleak_reply called!\n"); abort(); }
@ -133,11 +136,6 @@ bool htlc_is_trimmed(enum side htlc_owner UNNEEDED,
struct amount_sat dust_limit UNNEEDED,
enum side side UNNEEDED)
{ fprintf(stderr, "htlc_is_trimmed called!\n"); abort(); }
/* Generated stub for htlcs_reconnect */
struct htlc_in_map *htlcs_reconnect(struct lightningd *ld UNNEEDED,
struct htlc_in_map *htlcs_in UNNEEDED,
struct htlc_out_map *htlcs_out UNNEEDED)
{ fprintf(stderr, "htlcs_reconnect called!\n"); abort(); }
/* Generated stub for json_add_address */
void json_add_address(struct json_stream *response UNNEEDED, const char *fieldname UNNEEDED,
const struct wireaddr *addr UNNEEDED)
@ -504,12 +502,17 @@ void wallet_channeltxs_add(struct wallet *w UNNEEDED, struct channel *chan UNNEE
const int type UNNEEDED, const struct bitcoin_txid *txid UNNEEDED,
const u32 input_num UNNEEDED, const u32 blockheight UNNEEDED)
{ fprintf(stderr, "wallet_channeltxs_add called!\n"); abort(); }
/* Generated stub for wallet_htlcs_load_for_channel */
bool wallet_htlcs_load_for_channel(struct wallet *wallet UNNEEDED,
struct channel *chan UNNEEDED,
struct htlc_in_map *htlcs_in UNNEEDED,
struct htlc_out_map *htlcs_out UNNEEDED)
{ fprintf(stderr, "wallet_htlcs_load_for_channel called!\n"); abort(); }
/* Generated stub for wallet_htlcs_load_in_for_channel */
bool wallet_htlcs_load_in_for_channel(struct wallet *wallet UNNEEDED,
struct channel *chan UNNEEDED,
struct htlc_in_map *htlcs_in UNNEEDED)
{ fprintf(stderr, "wallet_htlcs_load_in_for_channel called!\n"); abort(); }
/* Generated stub for wallet_htlcs_load_out_for_channel */
bool wallet_htlcs_load_out_for_channel(struct wallet *wallet UNNEEDED,
struct channel *chan UNNEEDED,
struct htlc_out_map *htlcs_out UNNEEDED,
struct htlc_in_map *remaining_htlcs_in UNNEEDED)
{ fprintf(stderr, "wallet_htlcs_load_out_for_channel called!\n"); abort(); }
/* Generated stub for wallet_init_channels */
bool wallet_init_channels(struct wallet *w UNNEEDED)
{ fprintf(stderr, "wallet_init_channels called!\n"); abort(); }

View File

@ -1156,7 +1156,7 @@ static bool test_htlc_crud(struct lightningd *ld, const tal_t *ctx)
struct channel *chan = tal(ctx, struct channel);
struct peer *peer = talz(ctx, struct peer);
struct wallet *w = create_test_wallet(ld, ctx);
struct htlc_in_map *htlcs_in = tal(ctx, struct htlc_in_map);
struct htlc_in_map *htlcs_in = tal(ctx, struct htlc_in_map), *rem;
struct htlc_out_map *htlcs_out = tal(ctx, struct htlc_out_map);
/* Make sure we have our references correct */
@ -1220,11 +1220,16 @@ static bool test_htlc_crud(struct lightningd *ld, const tal_t *ctx)
db_begin_transaction(w->db);
CHECK(!wallet_err);
CHECK_MSG(wallet_htlcs_load_for_channel(w, chan, htlcs_in, htlcs_out),
"Failed loading HTLCs");
CHECK_MSG(wallet_htlcs_load_in_for_channel(w, chan, htlcs_in),
"Failed loading in HTLCs");
/* Freed by htlcs_resubmit */
rem = tal(NULL, struct htlc_in_map);
htlc_in_map_copy(rem, htlcs_in);
CHECK_MSG(wallet_htlcs_load_out_for_channel(w, chan, htlcs_out, rem),
"Failed loading out HTLCs");
db_commit_transaction(w->db);
htlcs_resubmit(w->ld, htlcs_reconnect(w->ld, htlcs_in, htlcs_out));
htlcs_resubmit(w->ld, rem);
CHECK(!wallet_err);
hin = htlc_in_map_get(htlcs_in, &in.key);

View File

@ -1647,7 +1647,6 @@ void wallet_htlc_save_out(struct wallet *wallet,
/* We absolutely need the incoming HTLC to be persisted before
* we can persist it's dependent */
assert(out->in == NULL || out->in->dbid != 0);
out->origin_htlc_id = out->in?out->in->dbid:0;
stmt = db_prepare_v2(
wallet->db,
@ -1770,8 +1769,11 @@ static bool wallet_stmt2htlc_in(struct channel *channel,
return ok;
}
static bool wallet_stmt2htlc_out(struct channel *channel,
struct db_stmt *stmt, struct htlc_out *out)
/* Removes matching htlc from unconnected_htlcs_in */
static bool wallet_stmt2htlc_out(struct wallet *wallet,
struct channel *channel,
struct db_stmt *stmt, struct htlc_out *out,
struct htlc_in_map *unconnected_htlcs_in)
{
bool ok = true;
out->dbid = db_column_u64(stmt, 0);
@ -1795,20 +1797,32 @@ static bool wallet_stmt2htlc_out(struct channel *channel,
out->failuremsg = db_column_arr(out, stmt, 8, u8);
out->failcode = db_column_int_or_default(stmt, 9, 0);
out->in = NULL;
if (!db_column_is_null(stmt, 10)) {
out->origin_htlc_id = db_column_u64(stmt, 10);
u64 in_id = db_column_u64(stmt, 10);
struct htlc_in *hin;
hin = remove_htlc_in_by_dbid(unconnected_htlcs_in, in_id);
if (hin)
htlc_out_connect_htlc_in(out, hin);
out->am_origin = false;
if (!out->in && !out->preimage) {
#ifdef COMPAT_V061
log_broken(wallet->log,
"Missing preimage for orphaned HTLC; replacing with zeros");
out->preimage = talz(out, struct preimage);
#else
fatal("Unable to find corresponding htlc_in %"PRIu64
" for unfulfilled htlc_out %"PRIu64,
in_id, out->dbid);
#endif
}
} else {
out->origin_htlc_id = 0;
out->partid = db_column_u64(stmt, 13);
out->am_origin = true;
}
/* Need to defer wiring until we can look up all incoming
* htlcs, will wire using origin_htlc_id */
out->in = NULL;
return ok;
}
@ -1849,16 +1863,15 @@ static void fixup_hin(struct wallet *wallet, struct htlc_in *hin)
#endif
}
bool wallet_htlcs_load_for_channel(struct wallet *wallet,
struct channel *chan,
struct htlc_in_map *htlcs_in,
struct htlc_out_map *htlcs_out)
bool wallet_htlcs_load_in_for_channel(struct wallet *wallet,
struct channel *chan,
struct htlc_in_map *htlcs_in)
{
struct db_stmt *stmt;
bool ok = true;
int incount = 0, outcount = 0;
int incount = 0;
log_debug(wallet->log, "Loading HTLCs for channel %"PRIu64, chan->dbid);
log_debug(wallet->log, "Loading in HTLCs for channel %"PRIu64, chan->dbid);
stmt = db_prepare_v2(wallet->db, SQL("SELECT"
" id"
", channel_htlc_id"
@ -1892,6 +1905,19 @@ bool wallet_htlcs_load_for_channel(struct wallet *wallet,
}
tal_free(stmt);
log_debug(wallet->log, "Restored %d incoming HTLCS", incount);
return ok;
}
bool wallet_htlcs_load_out_for_channel(struct wallet *wallet,
struct channel *chan,
struct htlc_out_map *htlcs_out,
struct htlc_in_map *unconnected_htlcs_in)
{
struct db_stmt *stmt;
bool ok = true;
int outcount = 0;
stmt = db_prepare_v2(wallet->db, SQL("SELECT"
" id"
", channel_htlc_id"
@ -1918,7 +1944,8 @@ bool wallet_htlcs_load_for_channel(struct wallet *wallet,
while (db_step(stmt)) {
struct htlc_out *out = tal(chan, struct htlc_out);
ok &= wallet_stmt2htlc_out(chan, stmt, out);
ok &= wallet_stmt2htlc_out(wallet, chan, stmt, out,
unconnected_htlcs_in);
connect_htlc_out(htlcs_out, out);
/* Cannot htlc_out_check because we haven't wired the
* dependencies in yet */
@ -1926,7 +1953,7 @@ bool wallet_htlcs_load_for_channel(struct wallet *wallet,
}
tal_free(stmt);
log_debug(wallet->log, "Restored %d incoming and %d outgoing HTLCS", incount, outcount);
log_debug(wallet->log, "Restored %d outgoing HTLCS", outcount);
return ok;
}

View File

@ -594,29 +594,35 @@ void wallet_htlc_update(struct wallet *wallet, const u64 htlc_dbid,
enum onion_type failcode, const u8 *failuremsg);
/**
* wallet_htlcs_load_for_channel - Load HTLCs associated with chan from DB.
* wallet_htlcs_load_in_for_channel - Load incoming HTLCs associated with chan from DB.
*
* @wallet: wallet to load from
* @chan: load HTLCs associated with this channel
* @htlcs_in: htlc_in_map to store loaded htlc_in in
* @htlcs_out: htlc_out_map to store loaded htlc_out in
*
* This function looks for HTLCs that are associated with the given
* channel and loads them into the provided maps. One caveat is that
* the `struct htlc_out` instances are not wired up with the
* corresponding `struct htlc_in` in the forwarding case nor are they
* associated with a `struct pay_command` in the case we originated
* the payment. In the former case the corresponding `struct htlc_in`
* may not have been loaded yet. In the latter case the pay_command
* does not exist anymore since we restarted.
*
* Use `htlcs_reconnect` to wire htlc_out instances to the
* corresponding htlc_in after loading all channels.
* This function looks for incoming HTLCs that are associated with the given
* channel and loads them into the provided map.
*/
bool wallet_htlcs_load_for_channel(struct wallet *wallet,
struct channel *chan,
struct htlc_in_map *htlcs_in,
struct htlc_out_map *htlcs_out);
bool wallet_htlcs_load_in_for_channel(struct wallet *wallet,
struct channel *chan,
struct htlc_in_map *htlcs_in);
/**
* wallet_htlcs_load_out_for_channel - Load outgoing HTLCs associated with chan from DB.
*
* @wallet: wallet to load from
* @chan: load HTLCs associated with this channel
* @htlcs_out: htlc_out_map to store loaded htlc_out in.
* @remaining_htlcs_in: htlc_in_map with unconnected htlcs (removed as we progress)
*
* We populate htlc_out->in by looking up in remaining_htlcs_in. It's
* possible that it's still NULL, since we can have outgoing HTLCs
* outlive their corresponding incoming.
*/
bool wallet_htlcs_load_out_for_channel(struct wallet *wallet,
struct channel *chan,
struct htlc_out_map *htlcs_out,
struct htlc_in_map *remaining_htlcs_in);
/**
* wallet_announcement_save - Save remote announcement information with channel.