From 7f5fe523205e4fb86324a19fd750b2864c95cf68 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Wed, 31 Jan 2024 14:55:33 +1030 Subject: [PATCH] gossipd: remove online gossip_store compaction. It was an obscure dev command, as it never worked reliably. It would be much easier to re-implement once this is done. This turned out to reveal a tiny leak on tests/test_gossip.py::test_gossip_store_load_amount_truncated where we didn't immedately free chan_ann if it was dangling. Signed-off-by: Rusty Russell --- gossipd/gossip_store.c | 263 +----------------------------------- gossipd/gossip_store.h | 3 - gossipd/gossipd.c | 16 --- gossipd/gossipd_wire.csv | 7 - lightningd/gossip_control.c | 46 ------- tests/test_gossip.py | 39 ------ tools/bench-gossipd.sh | 8 +- 7 files changed, 5 insertions(+), 377 deletions(-) diff --git a/gossipd/gossip_store.c b/gossipd/gossip_store.c index f4f581a73..6b47adbdc 100644 --- a/gossipd/gossip_store.c +++ b/gossipd/gossip_store.c @@ -29,19 +29,10 @@ struct gossip_store { /* Offset of current EOF */ u64 len; - /* Counters for entries in the gossip_store entries. This is used to - * decide whether we should rewrite the on-disk store or not. - * Note: count includes deleted. */ - size_t count, deleted; - /* Handle to the routing_state to retrieve additional information, * should it be needed */ struct routing_state *rstate; - /* Disable compaction if we encounter an error during a prior - * compaction */ - bool disable_compaction; - /* Timestamp of store when we opened it (0 if we created it) */ u32 timestamp; }; @@ -326,7 +317,6 @@ close_old: struct gossip_store *gossip_store_new(struct routing_state *rstate) { struct gossip_store *gs = tal(rstate, struct gossip_store); - gs->count = gs->deleted = 0; gs->writable = true; gs->timestamp = gossip_store_compact_offline(rstate); gs->fd = open(GOSSIP_STORE_FILENAME, O_RDWR|O_CREAT, 0600); @@ -335,7 +325,6 @@ struct gossip_store *gossip_store_new(struct routing_state *rstate) "Opening gossip_store store: %s", strerror(errno)); gs->rstate = rstate; - gs->disable_compaction = false; gs->len = sizeof(gs->version); tal_add_destructor(gs, gossip_store_destroy); @@ -368,244 +357,6 @@ struct gossip_store *gossip_store_new(struct routing_state *rstate) return gs; } -/* Returns bytes transferred, or 0 on error */ -static size_t transfer_store_msg(int from_fd, size_t from_off, - int to_fd, size_t to_off, - int *type) -{ - struct gossip_hdr hdr; - u16 flags, msglen; - u8 *msg; - const u8 *p; - size_t tmplen; - - *type = -1; - if (pread(from_fd, &hdr, sizeof(hdr), from_off) != sizeof(hdr)) { - status_broken("Failed reading header from to gossip store @%zu" - ": %s", - from_off, strerror(errno)); - return 0; - } - - flags = be16_to_cpu(hdr.flags); - if (flags & GOSSIP_STORE_DELETED_BIT) { - status_broken("Can't transfer deleted msg from gossip store @%zu", - from_off); - return 0; - } - - msglen = be16_to_cpu(hdr.len); - - /* FIXME: Reuse buffer? */ - msg = tal_arr(tmpctx, u8, sizeof(hdr) + msglen); - memcpy(msg, &hdr, sizeof(hdr)); - if (pread(from_fd, msg + sizeof(hdr), msglen, from_off + sizeof(hdr)) - != msglen) { - status_broken("Failed reading %u from to gossip store @%zu" - ": %s", - msglen, from_off, strerror(errno)); - return 0; - } - - if (pwrite(to_fd, msg, msglen + sizeof(hdr), to_off) - != msglen + sizeof(hdr)) { - status_broken("Failed writing to gossip store: %s", - strerror(errno)); - return 0; - } - - /* Can't use peektype here, since we have header on front */ - p = msg + sizeof(hdr); - tmplen = msglen; - *type = fromwire_u16(&p, &tmplen); - if (!p) - *type = -1; - tal_free(msg); - return sizeof(hdr) + msglen; -} - -/* We keep a htable map of old gossip_store offsets to new ones. */ -struct offset_map { - size_t from, to; -}; - -static size_t offset_map_key(const struct offset_map *omap) -{ - return omap->from; -} - -static size_t hash_offset(size_t from) -{ - /* Crappy fast hash is "good enough" */ - return (from >> 5) ^ from; -} - -static bool offset_map_eq(const struct offset_map *omap, const size_t from) -{ - return omap->from == from; -} -HTABLE_DEFINE_TYPE(struct offset_map, - offset_map_key, hash_offset, offset_map_eq, offmap); - -static void move_broadcast(struct offmap *offmap, - struct broadcastable *bcast, - const char *what) -{ - struct offset_map *omap; - - if (!bcast->index) - return; - - omap = offmap_get(offmap, bcast->index); - if (!omap) - status_failed(STATUS_FAIL_INTERNAL_ERROR, - "Could not relocate %s at offset %u", - what, bcast->index); - bcast->index = omap->to; - offmap_del(offmap, omap); -} - -/** - * Rewrite the on-disk gossip store, compacting it along the way - * - * Creates a new file, writes all the updates from the `broadcast_state`, and - * then atomically swaps the files. - */ -bool gossip_store_compact(struct gossip_store *gs) -{ - size_t count = 0, deleted = 0; - int fd; - u64 off, len = sizeof(gs->version), idx; - struct offmap *offmap; - struct gossip_hdr hdr; - struct offmap_iter oit; - struct node_map_iter nit; - struct offset_map *omap; - - if (gs->disable_compaction) - return false; - - status_debug( - "Compacting gossip_store with %zu entries, %zu of which are stale", - gs->count, gs->deleted); - - fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_TRUNC|O_CREAT, 0600); - - if (fd < 0) { - status_broken( - "Could not open file for gossip_store compaction"); - goto disable; - } - - if (write(fd, &gs->version, sizeof(gs->version)) - != sizeof(gs->version)) { - status_broken("Writing version to store: %s", strerror(errno)); - goto unlink_disable; - } - - /* Walk old file, copy everything and remember new offsets. */ - offmap = tal(tmpctx, struct offmap); - offmap_init_sized(offmap, gs->count); - - /* Start by writing all channel announcements and updates. */ - off = 1; - while (pread(gs->fd, &hdr, sizeof(hdr), off) == sizeof(hdr)) { - u16 msglen; - u32 wlen; - int msgtype; - - msglen = be16_to_cpu(hdr.len); - if (be16_to_cpu(hdr.flags) & GOSSIP_STORE_DELETED_BIT) { - off += sizeof(hdr) + msglen; - deleted++; - continue; - } - - count++; - wlen = transfer_store_msg(gs->fd, off, fd, len, &msgtype); - if (wlen == 0) - goto unlink_disable; - - /* We track location of all these message types. */ - if (msgtype == WIRE_CHANNEL_ANNOUNCEMENT - || msgtype == WIRE_CHANNEL_UPDATE - || msgtype == WIRE_NODE_ANNOUNCEMENT) { - omap = tal(offmap, struct offset_map); - omap->from = off; - omap->to = len; - offmap_add(offmap, omap); - } - len += wlen; - off += wlen; - } - - /* OK, now we've written file successfully, we can move broadcasts. */ - /* Remap node announcements. */ - for (struct node *n = node_map_first(gs->rstate->nodes, &nit); - n; - n = node_map_next(gs->rstate->nodes, &nit)) { - move_broadcast(offmap, &n->bcast, "node_announce"); - } - - /* Remap channel announcements and updates */ - for (struct chan *c = uintmap_first(&gs->rstate->chanmap, &idx); - c; - c = uintmap_after(&gs->rstate->chanmap, &idx)) { - move_broadcast(offmap, &c->bcast, "channel_announce"); - move_broadcast(offmap, &c->half[0].bcast, "channel_update"); - move_broadcast(offmap, &c->half[1].bcast, "channel_update"); - } - - /* That should be everything. */ - omap = offmap_first(offmap, &oit); - if (omap) - status_failed(STATUS_FAIL_INTERNAL_ERROR, - "gossip_store: Entry at %zu->%zu not updated?", - omap->from, omap->to); - - if (count != gs->count - gs->deleted) - status_failed(STATUS_FAIL_INTERNAL_ERROR, - "gossip_store: Expected %zu msgs in new" - " gossip store, got %zu", - gs->count - gs->deleted, count); - - if (deleted != gs->deleted) - status_failed(STATUS_FAIL_INTERNAL_ERROR, - "gossip_store: Expected %zu deleted msgs in old" - " gossip store, got %zu", - gs->deleted, deleted); - - if (rename(GOSSIP_STORE_TEMP_FILENAME, GOSSIP_STORE_FILENAME) == -1) - status_failed(STATUS_FAIL_INTERNAL_ERROR, - "Error swapping compacted gossip_store into place:" - " %s", - strerror(errno)); - - status_debug( - "Compaction completed: dropped %zu messages, new count %zu, len %"PRIu64, - deleted, count, len); - - /* Write end marker now new one is ready */ - append_msg(gs->fd, towire_gossip_store_ended(tmpctx, len), - 0, false, false, false, &gs->len); - - gs->count = count; - gs->deleted = 0; - gs->len = len; - close(gs->fd); - gs->fd = fd; - - return true; - -unlink_disable: - unlink(GOSSIP_STORE_TEMP_FILENAME); -disable: - status_debug("Encountered an error while compacting, disabling " - "future compactions."); - gs->disable_compaction = true; - return false; -} - u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg, u32 timestamp, bool zombie, bool spam, bool dying, const u8 *addendum) @@ -626,9 +377,6 @@ u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg, return 0; } - gs->count++; - if (addendum) - gs->count++; return off; } @@ -708,7 +456,6 @@ static u32 flag_by_index(struct gossip_store *gs, u32 index, int flag, int type) status_failed(STATUS_FAIL_INTERNAL_ERROR, "Failed writing flags to delete @%u: %s", index, strerror(errno)); - gs->deleted++; return index + sizeof(struct gossip_hdr) + be16_to_cpu(hdr.belen); } @@ -916,6 +663,7 @@ u32 gossip_store_load(struct routing_state *rstate, struct gossip_store *gs) const char *bad; size_t stats[] = {0, 0, 0, 0}; struct timeabs start = time_now(); + size_t deleted = 0; u8 *chan_ann = NULL; u64 chan_ann_off = 0; /* Spurious gcc-9 (Ubuntu 9-20190402-1ubuntu1) 9.0.1 20190402 (experimental) warning */ @@ -940,9 +688,7 @@ u32 gossip_store_load(struct routing_state *rstate, struct gossip_store *gs) /* Skip deleted entries */ if (be16_to_cpu(hdr.flags) & GOSSIP_STORE_DELETED_BIT) { - /* Count includes deleted! */ - gs->count++; - gs->deleted++; + deleted++; goto next; } spam = (be16_to_cpu(hdr.flags) & GOSSIP_STORE_RATELIMIT_BIT); @@ -1013,13 +759,13 @@ u32 gossip_store_load(struct routing_state *rstate, struct gossip_store *gs) goto badmsg; } - gs->count++; next: gs->len += sizeof(hdr) + msglen; clean_tmpctx(); } if (chan_ann) { + tal_free(chan_ann); bad = "dangling channel_announcement"; goto corrupt; } @@ -1045,7 +791,6 @@ corrupt: status_failed(STATUS_FAIL_INTERNAL_ERROR, "Truncating new store file: %s", strerror(errno)); remove_all_gossip(rstate); - gs->count = gs->deleted = 0; gs->len = 1; gs->timestamp = 0; out: @@ -1053,7 +798,7 @@ out: status_debug("total store load time: %"PRIu64" msec", time_to_msec(time_between(time_now(), start))); status_debug("gossip_store: Read %zu/%zu/%zu/%zu cannounce/cupdate/nannounce/cdelete from store (%zu deleted) in %"PRIu64" bytes", - stats[0], stats[1], stats[2], stats[3], gs->deleted, + stats[0], stats[1], stats[2], stats[3], deleted, gs->len); return gs->timestamp; diff --git a/gossipd/gossip_store.h b/gossipd/gossip_store.h index 4f86a5b60..8a4752ba0 100644 --- a/gossipd/gossip_store.h +++ b/gossipd/gossip_store.h @@ -126,9 +126,6 @@ u32 gossip_store_get_timestamp(struct gossip_store *gs, u64 offset); */ void gossip_store_set_timestamp(struct gossip_store *gs, u64 offset, u32 timestamp); -/* Exposed for dev-compact-gossip-store to force compaction. */ -bool gossip_store_compact(struct gossip_store *gs); - /** * Get a readonly fd for the gossip_store. * @gs: the gossip store. diff --git a/gossipd/gossipd.c b/gossipd/gossipd.c index 866cf77c7..c8a1adc48 100644 --- a/gossipd/gossipd.c +++ b/gossipd/gossipd.c @@ -745,15 +745,6 @@ static void dev_gossip_memleak(struct daemon *daemon, const u8 *msg) found_leak))); } -static void dev_compact_store(struct daemon *daemon, const u8 *msg) -{ - bool done = gossip_store_compact(daemon->rstate->gs); - - daemon_conn_send(daemon->master, - take(towire_gossipd_dev_compact_store_reply(NULL, - done))); -} - static void dev_gossip_set_time(struct daemon *daemon, const u8 *msg) { u32 time; @@ -895,12 +886,6 @@ static struct io_plan *recv_req(struct io_conn *conn, goto done; } /* fall thru */ - case WIRE_GOSSIPD_DEV_COMPACT_STORE: - if (daemon->developer) { - dev_compact_store(daemon, msg); - goto done; - } - /* fall thru */ case WIRE_GOSSIPD_DEV_SET_TIME: if (daemon->developer) { dev_gossip_set_time(daemon, msg); @@ -914,7 +899,6 @@ static struct io_plan *recv_req(struct io_conn *conn, case WIRE_GOSSIPD_INIT_REPLY: case WIRE_GOSSIPD_GET_TXOUT: case WIRE_GOSSIPD_DEV_MEMLEAK_REPLY: - case WIRE_GOSSIPD_DEV_COMPACT_STORE_REPLY: case WIRE_GOSSIPD_ADDGOSSIP_REPLY: case WIRE_GOSSIPD_NEW_BLOCKHEIGHT_REPLY: case WIRE_GOSSIPD_GET_ADDRS_REPLY: diff --git a/gossipd/gossipd_wire.csv b/gossipd/gossipd_wire.csv index 63dabd05e..6a0cc5d08 100644 --- a/gossipd/gossipd_wire.csv +++ b/gossipd/gossipd_wire.csv @@ -57,13 +57,6 @@ msgtype,gossipd_dev_memleak,3033 msgtype,gossipd_dev_memleak_reply,3133 msgdata,gossipd_dev_memleak_reply,leak,bool, -# master -> gossipd: please rewrite the gossip_store -msgtype,gossipd_dev_compact_store,3034 - -# gossipd -> master: ok -msgtype,gossipd_dev_compact_store_reply,3134 -msgdata,gossipd_dev_compact_store_reply,success,bool, - # master -> gossipd: blockheight increased. msgtype,gossipd_new_blockheight,3026 msgdata,gossipd_new_blockheight,blockheight,u32, diff --git a/lightningd/gossip_control.c b/lightningd/gossip_control.c index db3d8867e..362a132f0 100644 --- a/lightningd/gossip_control.c +++ b/lightningd/gossip_control.c @@ -174,7 +174,6 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds) case WIRE_GOSSIPD_OUTPOINTS_SPENT: case WIRE_GOSSIPD_DEV_SET_MAX_SCIDS_ENCODE_SIZE: case WIRE_GOSSIPD_DEV_MEMLEAK: - case WIRE_GOSSIPD_DEV_COMPACT_STORE: case WIRE_GOSSIPD_DEV_SET_TIME: case WIRE_GOSSIPD_NEW_BLOCKHEIGHT: case WIRE_GOSSIPD_ADDGOSSIP: @@ -182,7 +181,6 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds) /* This is a reply, so never gets through to here. */ case WIRE_GOSSIPD_INIT_REPLY: case WIRE_GOSSIPD_DEV_MEMLEAK_REPLY: - case WIRE_GOSSIPD_DEV_COMPACT_STORE_REPLY: case WIRE_GOSSIPD_ADDGOSSIP_REPLY: case WIRE_GOSSIPD_NEW_BLOCKHEIGHT_REPLY: case WIRE_GOSSIPD_GET_ADDRS_REPLY: @@ -456,50 +454,6 @@ static const struct json_command dev_set_max_scids_encode_size = { }; AUTODATA(json_command, &dev_set_max_scids_encode_size); -static void dev_compact_gossip_store_reply(struct subd *gossip UNUSED, - const u8 *reply, - const int *fds UNUSED, - struct command *cmd) -{ - bool success; - - if (!fromwire_gossipd_dev_compact_store_reply(reply, &success)) { - was_pending(command_fail(cmd, LIGHTNINGD, - "Gossip gave bad dev_gossip_compact_store_reply")); - return; - } - - if (!success) - was_pending(command_fail(cmd, LIGHTNINGD, - "gossip_compact_store failed")); - else - was_pending(command_success(cmd, json_stream_success(cmd))); -} - -static struct command_result *json_dev_compact_gossip_store(struct command *cmd, - const char *buffer, - const jsmntok_t *obj UNNEEDED, - const jsmntok_t *params) -{ - u8 *msg; - if (!param(cmd, buffer, params, NULL)) - return command_param_failed(); - - msg = towire_gossipd_dev_compact_store(NULL); - subd_req(cmd->ld->gossip, cmd->ld->gossip, - take(msg), -1, 0, dev_compact_gossip_store_reply, cmd); - return command_still_pending(cmd); -} - -static const struct json_command dev_compact_gossip_store = { - "dev-compact-gossip-store", - "developer", - json_dev_compact_gossip_store, - "Ask gossipd to rewrite the gossip store.", - .dev_only = true, -}; -AUTODATA(json_command, &dev_compact_gossip_store); - static struct command_result *json_dev_gossip_set_time(struct command *cmd, const char *buffer, const jsmntok_t *obj UNNEEDED, diff --git a/tests/test_gossip.py b/tests/test_gossip.py index 2079efd8b..a11fd86ed 100644 --- a/tests/test_gossip.py +++ b/tests/test_gossip.py @@ -1236,11 +1236,6 @@ def test_gossip_store_load_announce_before_update(node_factory): wait_for(lambda: l1.daemon.is_in_log(r'gossip_store: Read 1/1/1/0 cannounce/cupdate/nannounce/cdelete from store \(0 deleted\) in 778 bytes')) assert not l1.daemon.is_in_log('gossip_store.*truncating') - # Extra sanity check if we can. - l1.rpc.call('dev-compact-gossip-store') - l1.restart() - l1.rpc.call('dev-compact-gossip-store') - def test_gossip_store_load_amount_truncated(node_factory): """Make sure we can read canned gossip store with truncated amount""" @@ -1259,11 +1254,6 @@ def test_gossip_store_load_amount_truncated(node_factory): wait_for(lambda: l1.daemon.is_in_log(r'gossip_store: Read 0/0/0/0 cannounce/cupdate/nannounce/cdelete from store \(0 deleted\) in 1 bytes')) assert os.path.exists(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'gossip_store.corrupt')) - # Extra sanity check if we can. - l1.rpc.call('dev-compact-gossip-store') - l1.restart() - l1.rpc.call('dev-compact-gossip-store') - @pytest.mark.openchannel('v1') @pytest.mark.openchannel('v2') @@ -1582,7 +1572,6 @@ def test_gossip_store_compact_noappend(node_factory, bitcoind): with open(os.path.join(l2.daemon.lightning_dir, TEST_NETWORK, 'gossip_store.tmp'), 'wb') as f: f.write(bytearray.fromhex("07deadbeef")) - l2.rpc.call('dev-compact-gossip-store') l2.restart() wait_for(lambda: l2.daemon.is_in_log('gossip_store: Read ')) assert not l2.daemon.is_in_log('gossip_store:.*truncate') @@ -1596,32 +1585,6 @@ def test_gossip_store_load_complex(node_factory, bitcoind): wait_for(lambda: l2.daemon.is_in_log('gossip_store: Read ')) -def test_gossip_store_compact(node_factory, bitcoind): - l2 = setup_gossip_store_test(node_factory, bitcoind) - - # Now compact store. - l2.rpc.call('dev-compact-gossip-store') - - # Should still be connected. - time.sleep(1) - assert len(l2.rpc.listpeers()['peers']) == 2 - - # Should restart ok. - l2.restart() - wait_for(lambda: l2.daemon.is_in_log('gossip_store: Read ')) - - -def test_gossip_store_compact_restart(node_factory, bitcoind): - l2 = setup_gossip_store_test(node_factory, bitcoind) - - # Should restart ok. - l2.restart() - wait_for(lambda: l2.daemon.is_in_log('gossip_store: Read ')) - - # Now compact store. - l2.rpc.call('dev-compact-gossip-store') - - def test_gossip_store_load_no_channel_update(node_factory): """Make sure we can read truncated gossip store with a channel_announcement and no channel_update""" l1 = node_factory.get_node(start=False, allow_broken_log=True) @@ -1652,8 +1615,6 @@ def test_gossip_store_load_no_channel_update(node_factory): assert os.path.exists(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'gossip_store.corrupt')) # This should actually result in an empty store. - l1.rpc.call('dev-compact-gossip-store') - with open(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'gossip_store'), "rb") as f: assert bytearray(f.read()) == bytearray.fromhex("0d") diff --git a/tools/bench-gossipd.sh b/tools/bench-gossipd.sh index cd9ba96f1..79540f1fc 100755 --- a/tools/bench-gossipd.sh +++ b/tools/bench-gossipd.sh @@ -5,7 +5,7 @@ set -e DIR="" TARGETS="" -DEFAULT_TARGETS=" store_load_msec vsz_kb store_rewrite_sec listnodes_sec listchannels_sec routing_sec peer_write_all_sec peer_read_all_sec " +DEFAULT_TARGETS=" store_load_msec vsz_kb listnodes_sec listchannels_sec routing_sec peer_write_all_sec peer_read_all_sec " MCP_DIR=../million-channels-project/data/1M/gossip/ CSV=false @@ -111,12 +111,6 @@ if [ -z "${TARGETS##* vsz_kb *}" ]; then ps -o vsz= -p "$(pidof lightning_gossipd)" | print_stat vsz_kb fi -# How long does rewriting the store take? -if [ -z "${TARGETS##* store_rewrite_sec *}" ]; then - # shellcheck disable=SC2086 - /usr/bin/time --append -f %e $LCLI1 dev-compact-gossip-store 2>&1 > /dev/null | print_stat store_rewrite_sec -fi - # Now, how long does listnodes take? if [ -z "${TARGETS##* listnodes_sec *}" ]; then # shellcheck disable=SC2086