gossipd: remove online gossip_store compaction.

It was an obscure dev command, as it never worked reliably.
It would be much easier to re-implement once this is done.

This turned out to reveal a tiny leak on
tests/test_gossip.py::test_gossip_store_load_amount_truncated where we
didn't immediately free chan_ann if it was dangling.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2024-01-31 14:55:33 +10:30
parent 561859da0c
commit 7f5fe52320
7 changed files with 5 additions and 377 deletions

View File

@ -29,19 +29,10 @@ struct gossip_store {
/* Offset of current EOF */
u64 len;
/* Counters for entries in the gossip_store entries. This is used to
* decide whether we should rewrite the on-disk store or not.
* Note: count includes deleted. */
size_t count, deleted;
/* Handle to the routing_state to retrieve additional information,
* should it be needed */
struct routing_state *rstate;
/* Disable compaction if we encounter an error during a prior
* compaction */
bool disable_compaction;
/* Timestamp of store when we opened it (0 if we created it) */
u32 timestamp;
};
@ -326,7 +317,6 @@ close_old:
struct gossip_store *gossip_store_new(struct routing_state *rstate)
{
struct gossip_store *gs = tal(rstate, struct gossip_store);
gs->count = gs->deleted = 0;
gs->writable = true;
gs->timestamp = gossip_store_compact_offline(rstate);
gs->fd = open(GOSSIP_STORE_FILENAME, O_RDWR|O_CREAT, 0600);
@ -335,7 +325,6 @@ struct gossip_store *gossip_store_new(struct routing_state *rstate)
"Opening gossip_store store: %s",
strerror(errno));
gs->rstate = rstate;
gs->disable_compaction = false;
gs->len = sizeof(gs->version);
tal_add_destructor(gs, gossip_store_destroy);
@ -368,244 +357,6 @@ struct gossip_store *gossip_store_new(struct routing_state *rstate)
return gs;
}
/* Copy one gossip_store record (header + payload) from from_fd at from_off
 * to to_fd at to_off.
 *
 * *type is set to the 16-bit wire type parsed from the start of the payload,
 * or -1 if the record could not be transferred or the type could not be read.
 *
 * Returns the number of bytes transferred (header included), or 0 on error. */
static size_t transfer_store_msg(int from_fd, size_t from_off,
				 int to_fd, size_t to_off,
				 int *type)
{
	struct gossip_hdr hdr;
	u16 msglen;
	u8 *record;
	const u8 *cursor;
	size_t remaining;
	int wiretype;

	/* Pessimistic default, overwritten only once the copy succeeded. */
	*type = -1;

	if (pread(from_fd, &hdr, sizeof(hdr), from_off) != sizeof(hdr)) {
		status_broken("Failed reading header from to gossip store @%zu"
			      ": %s",
			      from_off, strerror(errno));
		return 0;
	}

	/* Callers are expected to skip deleted records themselves. */
	if (be16_to_cpu(hdr.flags) & GOSSIP_STORE_DELETED_BIT) {
		status_broken("Can't transfer deleted msg from gossip store @%zu",
			      from_off);
		return 0;
	}

	msglen = be16_to_cpu(hdr.len);

	/* FIXME: Reuse buffer? */
	/* Header and payload are staged contiguously so one pwrite suffices. */
	record = tal_arr(tmpctx, u8, sizeof(hdr) + msglen);
	memcpy(record, &hdr, sizeof(hdr));

	if (pread(from_fd, record + sizeof(hdr), msglen,
		  from_off + sizeof(hdr)) != msglen) {
		status_broken("Failed reading %u from to gossip store @%zu"
			      ": %s",
			      msglen, from_off, strerror(errno));
		return 0;
	}

	if (pwrite(to_fd, record, msglen + sizeof(hdr), to_off)
	    != msglen + sizeof(hdr)) {
		status_broken("Failed writing to gossip store: %s",
			      strerror(errno));
		return 0;
	}

	/* Can't use peektype here, since we have header on front */
	cursor = record + sizeof(hdr);
	remaining = msglen;
	wiretype = fromwire_u16(&cursor, &remaining);
	/* fromwire_u16 leaves cursor NULL when it could not parse a type. */
	*type = cursor ? wiretype : -1;

	tal_free(record);
	return sizeof(hdr) + msglen;
}
/* One entry of the htable that maps old gossip_store offsets to new ones. */
struct offset_map {
	/* Offset of the record in the old store (this is the hash key). */
	size_t from;
	/* Offset of the same record in the new store. */
	size_t to;
};

/* Key extractor for the htable: entries are keyed by their old offset. */
static size_t offset_map_key(const struct offset_map *omap)
{
	const size_t key = omap->from;

	return key;
}

/* Hash an old offset by xor-ing it with itself shifted right by 5 bits. */
static size_t hash_offset(size_t from)
{
	/* Crappy fast hash is "good enough" */
	const size_t folded = from >> 5;

	return from ^ folded;
}

/* Equality callback for the htable: does this entry describe offset `from`? */
static bool offset_map_eq(const struct offset_map *omap, const size_t from)
{
	const bool matches = (from == omap->from);

	return matches;
}
/* Instantiate the `offmap` hash table over struct offset_map, wired to the
 * key/hash/equality helpers named here.  The offmap_* helpers used in this
 * file (offmap_init_sized, offmap_add, offmap_get, offmap_del, offmap_first)
 * and struct offmap / struct offmap_iter presumably come from this macro --
 * NOTE(review): confirm against ccan/htable's htable_type.h. */
HTABLE_DEFINE_TYPE(struct offset_map,
offset_map_key, hash_offset, offset_map_eq, offmap);
/* Point bcast->index at the record's location in the new store.
 *
 * The current index is looked up in offmap, replaced by the mapped offset,
 * and the mapping entry is then removed from offmap.  An index of 0 is left
 * untouched.  A non-zero index with no mapping is reported through
 * status_failed, with `what` naming the kind of record in the message. */
static void move_broadcast(struct offmap *offmap,
			   struct broadcastable *bcast,
			   const char *what)
{
	struct offset_map *entry;

	/* Nothing to relocate for a zero index. */
	if (bcast->index == 0)
		return;

	entry = offmap_get(offmap, bcast->index);
	if (entry == NULL)
		status_failed(STATUS_FAIL_INTERNAL_ERROR,
			      "Could not relocate %s at offset %u",
			      what, bcast->index);

	bcast->index = entry->to;
	/* Consume the entry so the caller can detect leftovers afterwards. */
	offmap_del(offmap, entry);
}
/**
 * Rewrite the on-disk gossip store, compacting it along the way
 *
 * Creates a temporary file, copies every record of the current store that is
 * not flagged deleted into it, remaps the in-memory broadcast indexes of all
 * nodes and channels to the new offsets, and then renames the temporary file
 * over the real one.
 *
 * Returns true on success.  Returns false if compaction has been disabled by
 * an earlier failure, or if creating / writing the temporary file fails; in
 * the latter case the temporary file is closed and unlinked and all future
 * compactions are disabled.  Inconsistencies detected after the copy (an
 * unmapped or leftover offset, count mismatches, a failed rename) are
 * reported through status_failed.
 */
bool gossip_store_compact(struct gossip_store *gs)
{
	size_t count = 0, deleted = 0;
	int fd;
	u64 off, len = sizeof(gs->version), idx;
	struct offmap *offmap;
	struct gossip_hdr hdr;
	struct offmap_iter oit;
	struct node_map_iter nit;
	struct offset_map *omap;

	if (gs->disable_compaction)
		return false;

	status_debug(
	    "Compacting gossip_store with %zu entries, %zu of which are stale",
	    gs->count, gs->deleted);
	fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_TRUNC|O_CREAT, 0600);

	if (fd < 0) {
		status_broken(
		    "Could not open file for gossip_store compaction");
		goto disable;
	}

	if (write(fd, &gs->version, sizeof(gs->version))
	    != sizeof(gs->version)) {
		status_broken("Writing version to store: %s", strerror(errno));
		goto unlink_disable;
	}

	/* Walk old file, copy everything and remember new offsets. */
	offmap = tal(tmpctx, struct offmap);
	offmap_init_sized(offmap, gs->count);

	/* Read the old store from offset 1 onwards.
	 * NOTE(review): presumably 1 == sizeof(gs->version) -- confirm. */
	off = 1;
	while (pread(gs->fd, &hdr, sizeof(hdr), off) == sizeof(hdr)) {
		u16 msglen;
		u32 wlen;
		int msgtype;

		msglen = be16_to_cpu(hdr.len);
		/* Deleted records are skipped (and counted), not copied. */
		if (be16_to_cpu(hdr.flags) & GOSSIP_STORE_DELETED_BIT) {
			off += sizeof(hdr) + msglen;
			deleted++;
			continue;
		}

		count++;
		wlen = transfer_store_msg(gs->fd, off, fd, len, &msgtype);
		if (wlen == 0)
			goto unlink_disable;

		/* We track location of all these message types. */
		if (msgtype == WIRE_CHANNEL_ANNOUNCEMENT
		    || msgtype == WIRE_CHANNEL_UPDATE
		    || msgtype == WIRE_NODE_ANNOUNCEMENT) {
			omap = tal(offmap, struct offset_map);
			omap->from = off;
			omap->to = len;
			offmap_add(offmap, omap);
		}
		len += wlen;
		off += wlen;
	}

	/* OK, now we've written file successfully, we can move broadcasts. */
	/* Remap node announcements. */
	for (struct node *n = node_map_first(gs->rstate->nodes, &nit);
	     n;
	     n = node_map_next(gs->rstate->nodes, &nit)) {
		move_broadcast(offmap, &n->bcast, "node_announce");
	}

	/* Remap channel announcements and updates */
	for (struct chan *c = uintmap_first(&gs->rstate->chanmap, &idx);
	     c;
	     c = uintmap_after(&gs->rstate->chanmap, &idx)) {
		move_broadcast(offmap, &c->bcast, "channel_announce");
		move_broadcast(offmap, &c->half[0].bcast, "channel_update");
		move_broadcast(offmap, &c->half[1].bcast, "channel_update");
	}

	/* That should be everything: move_broadcast removes each entry it
	 * uses, so anything still in the map was never referenced. */
	omap = offmap_first(offmap, &oit);
	if (omap)
		status_failed(STATUS_FAIL_INTERNAL_ERROR,
			      "gossip_store: Entry at %zu->%zu not updated?",
			      omap->from, omap->to);

	if (count != gs->count - gs->deleted)
		status_failed(STATUS_FAIL_INTERNAL_ERROR,
			      "gossip_store: Expected %zu msgs in new"
			      " gossip store, got %zu",
			      gs->count - gs->deleted, count);

	if (deleted != gs->deleted)
		status_failed(STATUS_FAIL_INTERNAL_ERROR,
			      "gossip_store: Expected %zu deleted msgs in old"
			      " gossip store, got %zu",
			      gs->deleted, deleted);

	if (rename(GOSSIP_STORE_TEMP_FILENAME, GOSSIP_STORE_FILENAME) == -1)
		status_failed(STATUS_FAIL_INTERNAL_ERROR,
			      "Error swapping compacted gossip_store into place:"
			      " %s",
			      strerror(errno));

	status_debug(
	    "Compaction completed: dropped %zu messages, new count %zu, len %"PRIu64,
	    deleted, count, len);

	/* Write end marker now new one is ready */
	append_msg(gs->fd, towire_gossip_store_ended(tmpctx, len),
		   0, false, false, false, &gs->len);

	/* Switch our bookkeeping and descriptor over to the compacted file. */
	gs->count = count;
	gs->deleted = 0;
	gs->len = len;
	close(gs->fd);
	gs->fd = fd;

	return true;

unlink_disable:
	/* Only reached with a successfully opened fd: close it so a failed
	 * compaction does not leak the temporary file's descriptor. */
	close(fd);
	unlink(GOSSIP_STORE_TEMP_FILENAME);
disable:
	status_debug("Encountered an error while compacting, disabling "
		     "future compactions.");
	gs->disable_compaction = true;
	return false;
}
u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg,
u32 timestamp, bool zombie,
bool spam, bool dying, const u8 *addendum)
@ -626,9 +377,6 @@ u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg,
return 0;
}
gs->count++;
if (addendum)
gs->count++;
return off;
}
@ -708,7 +456,6 @@ static u32 flag_by_index(struct gossip_store *gs, u32 index, int flag, int type)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Failed writing flags to delete @%u: %s",
index, strerror(errno));
gs->deleted++;
return index + sizeof(struct gossip_hdr) + be16_to_cpu(hdr.belen);
}
@ -916,6 +663,7 @@ u32 gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
const char *bad;
size_t stats[] = {0, 0, 0, 0};
struct timeabs start = time_now();
size_t deleted = 0;
u8 *chan_ann = NULL;
u64 chan_ann_off = 0; /* Spurious gcc-9 (Ubuntu 9-20190402-1ubuntu1) 9.0.1 20190402 (experimental) warning */
@ -940,9 +688,7 @@ u32 gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
/* Skip deleted entries */
if (be16_to_cpu(hdr.flags) & GOSSIP_STORE_DELETED_BIT) {
/* Count includes deleted! */
gs->count++;
gs->deleted++;
deleted++;
goto next;
}
spam = (be16_to_cpu(hdr.flags) & GOSSIP_STORE_RATELIMIT_BIT);
@ -1013,13 +759,13 @@ u32 gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
goto badmsg;
}
gs->count++;
next:
gs->len += sizeof(hdr) + msglen;
clean_tmpctx();
}
if (chan_ann) {
tal_free(chan_ann);
bad = "dangling channel_announcement";
goto corrupt;
}
@ -1045,7 +791,6 @@ corrupt:
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Truncating new store file: %s", strerror(errno));
remove_all_gossip(rstate);
gs->count = gs->deleted = 0;
gs->len = 1;
gs->timestamp = 0;
out:
@ -1053,7 +798,7 @@ out:
status_debug("total store load time: %"PRIu64" msec",
time_to_msec(time_between(time_now(), start)));
status_debug("gossip_store: Read %zu/%zu/%zu/%zu cannounce/cupdate/nannounce/cdelete from store (%zu deleted) in %"PRIu64" bytes",
stats[0], stats[1], stats[2], stats[3], gs->deleted,
stats[0], stats[1], stats[2], stats[3], deleted,
gs->len);
return gs->timestamp;

View File

@ -126,9 +126,6 @@ u32 gossip_store_get_timestamp(struct gossip_store *gs, u64 offset);
*/
void gossip_store_set_timestamp(struct gossip_store *gs, u64 offset, u32 timestamp);
/* Exposed for dev-compact-gossip-store to force compaction. */
bool gossip_store_compact(struct gossip_store *gs);
/**
* Get a readonly fd for the gossip_store.
* @gs: the gossip store.

View File

@ -745,15 +745,6 @@ static void dev_gossip_memleak(struct daemon *daemon, const u8 *msg)
found_leak)));
}
/* Developer request handler: run gossip_store_compact on the daemon's store
 * and send its boolean result back to the master.  `msg` is not inspected. */
static void dev_compact_store(struct daemon *daemon, const u8 *msg)
{
	const bool compacted = gossip_store_compact(daemon->rstate->gs);
	u8 *reply = towire_gossipd_dev_compact_store_reply(NULL, compacted);

	daemon_conn_send(daemon->master, take(reply));
}
static void dev_gossip_set_time(struct daemon *daemon, const u8 *msg)
{
u32 time;
@ -895,12 +886,6 @@ static struct io_plan *recv_req(struct io_conn *conn,
goto done;
}
/* fall thru */
case WIRE_GOSSIPD_DEV_COMPACT_STORE:
if (daemon->developer) {
dev_compact_store(daemon, msg);
goto done;
}
/* fall thru */
case WIRE_GOSSIPD_DEV_SET_TIME:
if (daemon->developer) {
dev_gossip_set_time(daemon, msg);
@ -914,7 +899,6 @@ static struct io_plan *recv_req(struct io_conn *conn,
case WIRE_GOSSIPD_INIT_REPLY:
case WIRE_GOSSIPD_GET_TXOUT:
case WIRE_GOSSIPD_DEV_MEMLEAK_REPLY:
case WIRE_GOSSIPD_DEV_COMPACT_STORE_REPLY:
case WIRE_GOSSIPD_ADDGOSSIP_REPLY:
case WIRE_GOSSIPD_NEW_BLOCKHEIGHT_REPLY:
case WIRE_GOSSIPD_GET_ADDRS_REPLY:

View File

@ -57,13 +57,6 @@ msgtype,gossipd_dev_memleak,3033
msgtype,gossipd_dev_memleak_reply,3133
msgdata,gossipd_dev_memleak_reply,leak,bool,
# master -> gossipd: please rewrite the gossip_store
msgtype,gossipd_dev_compact_store,3034
# gossipd -> master: ok
msgtype,gossipd_dev_compact_store_reply,3134
msgdata,gossipd_dev_compact_store_reply,success,bool,
# master -> gossipd: blockheight increased.
msgtype,gossipd_new_blockheight,3026
msgdata,gossipd_new_blockheight,blockheight,u32,

1 #include <common/cryptomsg.h>
57 msgtype,gossipd_new_blockheight_reply,3126 # Empty string means no problem.
58 # Lightningd tells us to inject a gossip message (for addgossip RPC) msgtype,gossipd_addgossip_reply,3144
59 msgtype,gossipd_addgossip,3044 msgdata,gossipd_addgossip_reply,err,wirestring,
msgdata,gossipd_addgossip,len,u16,
msgdata,gossipd_addgossip,msg,u8,len
# Empty string means no problem.
msgtype,gossipd_addgossip_reply,3144
msgdata,gossipd_addgossip_reply,err,wirestring,
# Lightningd asks gossipd for any known addresses for that node.
msgtype,gossipd_get_addrs,3050
60 msgdata,gossipd_get_addrs,id,node_id, # Lightningd asks gossipd for any known addresses for that node.
61 msgtype,gossipd_get_addrs_reply,3150 msgtype,gossipd_get_addrs,3050
62 msgdata,gossipd_get_addrs_reply,num,u16, msgdata,gossipd_get_addrs,id,node_id,

View File

@ -174,7 +174,6 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds)
case WIRE_GOSSIPD_OUTPOINTS_SPENT:
case WIRE_GOSSIPD_DEV_SET_MAX_SCIDS_ENCODE_SIZE:
case WIRE_GOSSIPD_DEV_MEMLEAK:
case WIRE_GOSSIPD_DEV_COMPACT_STORE:
case WIRE_GOSSIPD_DEV_SET_TIME:
case WIRE_GOSSIPD_NEW_BLOCKHEIGHT:
case WIRE_GOSSIPD_ADDGOSSIP:
@ -182,7 +181,6 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds)
/* This is a reply, so never gets through to here. */
case WIRE_GOSSIPD_INIT_REPLY:
case WIRE_GOSSIPD_DEV_MEMLEAK_REPLY:
case WIRE_GOSSIPD_DEV_COMPACT_STORE_REPLY:
case WIRE_GOSSIPD_ADDGOSSIP_REPLY:
case WIRE_GOSSIPD_NEW_BLOCKHEIGHT_REPLY:
case WIRE_GOSSIPD_GET_ADDRS_REPLY:
@ -456,50 +454,6 @@ static const struct json_command dev_set_max_scids_encode_size = {
};
AUTODATA(json_command, &dev_set_max_scids_encode_size);
/* Reply callback for the compact-store request sent to gossipd.
 *
 * Fails the pending command if the reply cannot be parsed or if it reports
 * failure; otherwise completes the command with an empty success result. */
static void dev_compact_gossip_store_reply(struct subd *gossip UNUSED,
					   const u8 *reply,
					   const int *fds UNUSED,
					   struct command *cmd)
{
	bool success;

	/* Malformed reply from gossipd. */
	if (!fromwire_gossipd_dev_compact_store_reply(reply, &success)) {
		was_pending(command_fail(cmd, LIGHTNINGD,
					 "Gossip gave bad dev_gossip_compact_store_reply"));
		return;
	}

	if (success) {
		was_pending(command_success(cmd, json_stream_success(cmd)));
		return;
	}

	/* Well-formed reply, but the compaction itself did not succeed. */
	was_pending(command_fail(cmd, LIGHTNINGD,
				 "gossip_compact_store failed"));
}
/* JSON handler for "dev-compact-gossip-store".
 *
 * Takes no parameters.  Forwards a compact-store request to gossipd and
 * leaves the command pending; dev_compact_gossip_store_reply finishes it. */
static struct command_result *json_dev_compact_gossip_store(struct command *cmd,
							    const char *buffer,
							    const jsmntok_t *obj UNNEEDED,
							    const jsmntok_t *params)
{
	/* The NULL terminator straight away means: no parameters accepted. */
	if (!param(cmd, buffer, params, NULL))
		return command_param_failed();

	subd_req(cmd->ld->gossip, cmd->ld->gossip,
		 take(towire_gossipd_dev_compact_store(NULL)),
		 -1, 0, dev_compact_gossip_store_reply, cmd);

	return command_still_pending(cmd);
}
/* Command descriptor for "dev-compact-gossip-store", handled by
 * json_dev_compact_gossip_store and flagged as developer-only.
 * NOTE(review): the positional fields look like name, category, handler and
 * description -- confirm against struct json_command. */
static const struct json_command dev_compact_gossip_store = {
"dev-compact-gossip-store",
"developer",
json_dev_compact_gossip_store,
"Ask gossipd to rewrite the gossip store.",
.dev_only = true,
};
/* Presumably registers the descriptor in the json_command autodata set. */
AUTODATA(json_command, &dev_compact_gossip_store);
static struct command_result *json_dev_gossip_set_time(struct command *cmd,
const char *buffer,
const jsmntok_t *obj UNNEEDED,

View File

@ -1236,11 +1236,6 @@ def test_gossip_store_load_announce_before_update(node_factory):
wait_for(lambda: l1.daemon.is_in_log(r'gossip_store: Read 1/1/1/0 cannounce/cupdate/nannounce/cdelete from store \(0 deleted\) in 778 bytes'))
assert not l1.daemon.is_in_log('gossip_store.*truncating')
# Extra sanity check if we can.
l1.rpc.call('dev-compact-gossip-store')
l1.restart()
l1.rpc.call('dev-compact-gossip-store')
def test_gossip_store_load_amount_truncated(node_factory):
"""Make sure we can read canned gossip store with truncated amount"""
@ -1259,11 +1254,6 @@ def test_gossip_store_load_amount_truncated(node_factory):
wait_for(lambda: l1.daemon.is_in_log(r'gossip_store: Read 0/0/0/0 cannounce/cupdate/nannounce/cdelete from store \(0 deleted\) in 1 bytes'))
assert os.path.exists(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'gossip_store.corrupt'))
# Extra sanity check if we can.
l1.rpc.call('dev-compact-gossip-store')
l1.restart()
l1.rpc.call('dev-compact-gossip-store')
@pytest.mark.openchannel('v1')
@pytest.mark.openchannel('v2')
@ -1582,7 +1572,6 @@ def test_gossip_store_compact_noappend(node_factory, bitcoind):
with open(os.path.join(l2.daemon.lightning_dir, TEST_NETWORK, 'gossip_store.tmp'), 'wb') as f:
f.write(bytearray.fromhex("07deadbeef"))
l2.rpc.call('dev-compact-gossip-store')
l2.restart()
wait_for(lambda: l2.daemon.is_in_log('gossip_store: Read '))
assert not l2.daemon.is_in_log('gossip_store:.*truncate')
@ -1596,32 +1585,6 @@ def test_gossip_store_load_complex(node_factory, bitcoind):
wait_for(lambda: l2.daemon.is_in_log('gossip_store: Read '))
def test_gossip_store_compact(node_factory, bitcoind):
    """Compact the gossip store, then check peers stay connected and a restart reloads it."""
    node = setup_gossip_store_test(node_factory, bitcoind)

    # Now compact store.
    node.rpc.call('dev-compact-gossip-store')

    # Should still be connected.
    time.sleep(1)
    peers = node.rpc.listpeers()['peers']
    assert len(peers) == 2

    # Should restart ok.
    node.restart()
    wait_for(lambda: node.daemon.is_in_log('gossip_store: Read '))
def test_gossip_store_compact_restart(node_factory, bitcoind):
    """Restart first so the store is loaded from disk, then compact it."""
    node = setup_gossip_store_test(node_factory, bitcoind)

    # Should restart ok.
    node.restart()
    wait_for(lambda: node.daemon.is_in_log('gossip_store: Read '))

    # Now compact store.
    node.rpc.call('dev-compact-gossip-store')
def test_gossip_store_load_no_channel_update(node_factory):
"""Make sure we can read truncated gossip store with a channel_announcement and no channel_update"""
l1 = node_factory.get_node(start=False, allow_broken_log=True)
@ -1652,8 +1615,6 @@ def test_gossip_store_load_no_channel_update(node_factory):
assert os.path.exists(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'gossip_store.corrupt'))
# This should actually result in an empty store.
l1.rpc.call('dev-compact-gossip-store')
with open(os.path.join(l1.daemon.lightning_dir, TEST_NETWORK, 'gossip_store'), "rb") as f:
assert bytearray(f.read()) == bytearray.fromhex("0d")

View File

@ -5,7 +5,7 @@ set -e
DIR=""
TARGETS=""
DEFAULT_TARGETS=" store_load_msec vsz_kb store_rewrite_sec listnodes_sec listchannels_sec routing_sec peer_write_all_sec peer_read_all_sec "
DEFAULT_TARGETS=" store_load_msec vsz_kb listnodes_sec listchannels_sec routing_sec peer_write_all_sec peer_read_all_sec "
MCP_DIR=../million-channels-project/data/1M/gossip/
CSV=false
@ -111,12 +111,6 @@ if [ -z "${TARGETS##* vsz_kb *}" ]; then
ps -o vsz= -p "$(pidof lightning_gossipd)" | print_stat vsz_kb
fi
# How long does rewriting the store take?
if [ -z "${TARGETS##* store_rewrite_sec *}" ]; then
# shellcheck disable=SC2086
/usr/bin/time --append -f %e $LCLI1 dev-compact-gossip-store 2>&1 > /dev/null | print_stat store_rewrite_sec
fi
# Now, how long does listnodes take?
if [ -z "${TARGETS##* listnodes_sec *}" ]; then
# shellcheck disable=SC2086