diff --git a/gossipd/broadcast.c b/gossipd/broadcast.c index 39c42db59..4ce02fe9f 100644 --- a/gossipd/broadcast.c +++ b/gossipd/broadcast.c @@ -19,12 +19,14 @@ static void destroy_broadcast_state(struct broadcast_state *bstate) } struct broadcast_state *new_broadcast_state(struct routing_state *rstate, - struct gossip_store *gs) + struct gossip_store *gs, + struct list_head *peers) { struct broadcast_state *bstate = tal(rstate, struct broadcast_state); uintmap_init(&bstate->broadcasts); bstate->count = 0; bstate->gs = gs; + bstate->peers = peers; tal_add_destructor(bstate, destroy_broadcast_state); return bstate; } @@ -63,6 +65,8 @@ void insert_broadcast(struct broadcast_state **bstate, const u8 *msg, struct broadcastable *bcast) { + u32 offset; + /* If we're loading from the store, we already have index */ if (!bcast->index) { u64 idx; @@ -79,7 +83,9 @@ void insert_broadcast(struct broadcast_state **bstate, insert_broadcast_nostore(*bstate, bcast); /* If it compacts, it replaces *bstate */ - gossip_store_maybe_compact((*bstate)->gs, bstate); + gossip_store_maybe_compact((*bstate)->gs, bstate, &offset); + if (offset) + update_peers_broadcast_index((*bstate)->peers, offset); } struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate, diff --git a/gossipd/broadcast.h b/gossipd/broadcast.h index a9dc57a07..b4c0b4830 100644 --- a/gossipd/broadcast.h +++ b/gossipd/broadcast.h @@ -24,6 +24,7 @@ struct broadcast_state { UINTMAP(struct broadcastable *) broadcasts; size_t count; struct gossip_store *gs; + struct list_head *peers; }; static inline void broadcastable_init(struct broadcastable *bcast) @@ -32,7 +33,8 @@ static inline void broadcastable_init(struct broadcastable *bcast) } struct broadcast_state *new_broadcast_state(struct routing_state *rstate, - struct gossip_store *gs); + struct gossip_store *gs, + struct list_head *peers); /* Append a queued message for broadcast. Must be explicitly deleted. * Also adds it to the gossip store. */ @@ -68,4 +70,6 @@ u64 broadcast_final_index(const struct broadcast_state *bstate); struct broadcast_state *broadcast_state_check(struct broadcast_state *b, const char *abortstr); +/* Callback for after we compacted the store */ +void update_peers_broadcast_index(struct list_head *peers, u32 offset); #endif /* LIGHTNING_GOSSIPD_BROADCAST_H */ diff --git a/gossipd/gossip_store.c b/gossipd/gossip_store.c index 1b5d1c216..bbf7032eb 100644 --- a/gossipd/gossip_store.c +++ b/gossipd/gossip_store.c @@ -253,9 +253,13 @@ static bool add_local_unnannounced(int in_fd, int out_fd, * * Creates a new file, writes all the updates from the `broadcast_state`, and * then atomically swaps the files. + * + * Returns the amount of shrinkage in @offset on success, otherwise @offset + * is unchanged. */ bool gossip_store_compact(struct gossip_store *gs, - struct broadcast_state **bs) + struct broadcast_state **bs, + u32 *offset) { size_t count = 0; int fd; @@ -274,7 +278,7 @@ bool gossip_store_compact(struct gossip_store *gs, "Compacting gossip_store with %zu entries, %zu of which are stale", gs->count, gs->count - oldb->count); - newb = new_broadcast_state(gs->rstate, gs); + newb = new_broadcast_state(gs->rstate, gs, oldb->peers); fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_APPEND|O_CREAT, 0600); if (fd < 0) { @@ -349,6 +353,7 @@ bool gossip_store_compact(struct gossip_store *gs, "Compaction completed: dropped %zu messages, new count %zu, len %"PRIu64, gs->count - count, count, len); gs->count = count; + *offset = gs->len - len; gs->len = len; close(gs->fd); gs->fd = fd; @@ -368,8 +373,11 @@ disable: } void gossip_store_maybe_compact(struct gossip_store *gs, - struct broadcast_state **bs) + struct broadcast_state **bs, + u32 *offset) { + *offset = 0; + /* Don't compact while loading! */ if (!gs->writable) return; @@ -378,7 +386,7 @@ void gossip_store_maybe_compact(struct gossip_store *gs, if (gs->count < (*bs)->count * 1.25) return; - gossip_store_compact(gs, bs); + gossip_store_compact(gs, bs, offset); } u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg) diff --git a/gossipd/gossip_store.h b/gossipd/gossip_store.h index b37be3855..6caa82109 100644 --- a/gossipd/gossip_store.h +++ b/gossipd/gossip_store.h @@ -51,12 +51,20 @@ const u8 *gossip_store_get(const tal_t *ctx, * If we need to compact the gossip store, do so. * @gs: the gossip store. * @bs: a pointer to the broadcast state: replaced if we compact it. + * @offset: the change in the store, if any. + * + * If @offset is non-zero on return, caller must update peers. */ void gossip_store_maybe_compact(struct gossip_store *gs, - struct broadcast_state **bs); + struct broadcast_state **bs, + u32 *offset); /* Expose for dev-compact-gossip-store to force compaction. */ bool gossip_store_compact(struct gossip_store *gs, - struct broadcast_state **bs); + struct broadcast_state **bs, + u32 *offset); + +/* Callback for when gossip_store indexes move */ + #endif /* LIGHTNING_GOSSIPD_GOSSIP_STORE_H */ diff --git a/gossipd/gossipd.c b/gossipd/gossipd.c index 4e2404b7a..2d5d74fab 100644 --- a/gossipd/gossipd.c +++ b/gossipd/gossipd.c @@ -695,6 +695,21 @@ static u8 *handle_gossip_timestamp_filter(struct peer *peer, const u8 *msg) return NULL; } +/*~ When we compact the gossip store, all the broadcast indexs move. + * We simply offset everyone, which means in theory they could retransmit + * some, but that's a lesser evil than skipping some. */ +void update_peers_broadcast_index(struct list_head *peers, u32 offset) +{ + struct peer *peer; + + list_for_each(peers, peer, list) { + if (peer->broadcast_index < offset) + peer->broadcast_index = 0; + else + peer->broadcast_index -= offset; + } +} + /*~ We can send multiple replies when the peer queries for all channels in * a given range of blocks; each one indicates the range of blocks it covers. */ static void reply_channel_range(struct peer *peer, @@ -1956,6 +1971,7 @@ static struct io_plan *gossip_init(struct io_conn *conn, chainparams_by_chainhash(&daemon->chain_hash), &daemon->id, update_channel_interval * 2, + &daemon->peers, dev_gossip_time, dev_unknown_channel_satoshis); @@ -2565,8 +2581,15 @@ static struct io_plan *dev_compact_store(struct io_conn *conn, struct daemon *daemon, const u8 *msg) { + u32 offset; bool done = gossip_store_compact(daemon->rstate->broadcasts->gs, - &daemon->rstate->broadcasts); + &daemon->rstate->broadcasts, + &offset); + + /* Peers keep an offset into where they are with gossip. */ + if (done) + update_peers_broadcast_index(&daemon->peers, offset); + daemon_conn_send(daemon->master, take(towire_gossip_dev_compact_store_reply(NULL, done))); diff --git a/gossipd/routing.c b/gossipd/routing.c index d4e61e5c0..9d3bf28c4 100644 --- a/gossipd/routing.c +++ b/gossipd/routing.c @@ -181,12 +181,14 @@ struct routing_state *new_routing_state(const tal_t *ctx, const struct chainparams *chainparams, const struct node_id *local_id, u32 prune_timeout, + struct list_head *peers, const u32 *dev_gossip_time, const struct amount_sat *dev_unknown_channel_satoshis) { struct routing_state *rstate = tal(ctx, struct routing_state); rstate->nodes = empty_node_map(rstate); - rstate->broadcasts = new_broadcast_state(rstate, gossip_store_new(rstate)); + rstate->broadcasts + = new_broadcast_state(rstate, gossip_store_new(rstate), peers); rstate->chainparams = chainparams; rstate->local_id = *local_id; rstate->prune_timeout = prune_timeout; diff --git a/gossipd/routing.h b/gossipd/routing.h index a3f4bda24..e5e4f6fcf 100644 --- a/gossipd/routing.h +++ b/gossipd/routing.h @@ -226,6 +226,7 @@ struct routing_state *new_routing_state(const tal_t *ctx, const struct chainparams *chainparams, const struct node_id *local_id, u32 prune_timeout, + struct list_head *peers, const u32 *dev_gossip_time, const struct amount_sat *dev_unknown_channel_satoshis); diff --git a/gossipd/test/run-bench-find_route.c b/gossipd/test/run-bench-find_route.c index 4a83ac992..d61bfd3c2 100644 --- a/gossipd/test/run-bench-find_route.c +++ b/gossipd/test/run-bench-find_route.c @@ -97,6 +97,9 @@ u8 *towire_gossip_store_local_add_channel(const tal_t *ctx UNNEEDED, const u8 *l /* Generated stub for towire_gossip_store_node_announcement */ u8 *towire_gossip_store_node_announcement(const tal_t *ctx UNNEEDED, const u8 *announcement UNNEEDED) { fprintf(stderr, "towire_gossip_store_node_announcement called!\n"); abort(); } +/* Generated stub for update_peers_broadcast_index */ +void update_peers_broadcast_index(struct list_head *peers UNNEEDED, u32 offset UNNEEDED) +{ fprintf(stderr, "update_peers_broadcast_index called!\n"); abort(); } /* Generated stub for wire_type_name */ const char *wire_type_name(int e UNNEEDED) { fprintf(stderr, "wire_type_name called!\n"); abort(); } @@ -214,7 +217,7 @@ int main(int argc, char *argv[]) setup_tmpctx(); me = nodeid(0); - rstate = new_routing_state(tmpctx, NULL, &me, 0, NULL, NULL); + rstate = new_routing_state(tmpctx, NULL, &me, 0, NULL, NULL, NULL); opt_register_noarg("--perfme", opt_set_bool, &perfme, "Run perfme-start and perfme-stop around benchmark"); diff --git a/gossipd/test/run-find_route-specific.c b/gossipd/test/run-find_route-specific.c index 5455177db..9d2d04cef 100644 --- a/gossipd/test/run-find_route-specific.c +++ b/gossipd/test/run-find_route-specific.c @@ -86,6 +86,9 @@ u8 *towire_gossip_store_local_add_channel(const tal_t *ctx UNNEEDED, const u8 *l /* Generated stub for towire_gossip_store_node_announcement */ u8 *towire_gossip_store_node_announcement(const tal_t *ctx UNNEEDED, const u8 *announcement UNNEEDED) { fprintf(stderr, "towire_gossip_store_node_announcement called!\n"); abort(); } +/* Generated stub for update_peers_broadcast_index */ +void update_peers_broadcast_index(struct list_head *peers UNNEEDED, u32 offset UNNEEDED) +{ fprintf(stderr, "update_peers_broadcast_index called!\n"); abort(); } /* Generated stub for wire_type_name */ const char *wire_type_name(int e UNNEEDED) { fprintf(stderr, "wire_type_name called!\n"); abort(); } @@ -171,7 +174,7 @@ int main(void) strlen("02cca6c5c966fcf61d121e3a70e03a1cd9eeeea024b26ea666ce974d43b242e636"), &d); - rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL); + rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL, NULL); /* [{'active': True, 'short_id': '6990:2:1/1', 'fee_per_kw': 10, 'delay': 5, 'message_flags': 0, 'channel_flags': 1, 'destination': '0230ad0e74ea03976b28fda587bb75bdd357a1938af4424156a18265167f5e40ae', 'source': '02ea622d5c8d6143f15ed3ce1d501dd0d3d09d3b1c83a44d0034949f8a9ab60f06', 'last_update': 1504064344}, */ diff --git a/gossipd/test/run-find_route.c b/gossipd/test/run-find_route.c index c60f76bdc..e881b4606 100644 --- a/gossipd/test/run-find_route.c +++ b/gossipd/test/run-find_route.c @@ -84,6 +84,9 @@ u8 *towire_gossip_store_local_add_channel(const tal_t *ctx UNNEEDED, const u8 *l /* Generated stub for towire_gossip_store_node_announcement */ u8 *towire_gossip_store_node_announcement(const tal_t *ctx UNNEEDED, const u8 *announcement UNNEEDED) { fprintf(stderr, "towire_gossip_store_node_announcement called!\n"); abort(); } +/* Generated stub for update_peers_broadcast_index */ +void update_peers_broadcast_index(struct list_head *peers UNNEEDED, u32 offset UNNEEDED) +{ fprintf(stderr, "update_peers_broadcast_index called!\n"); abort(); } /* Generated stub for wire_type_name */ const char *wire_type_name(int e UNNEEDED) { fprintf(stderr, "wire_type_name called!\n"); abort(); } @@ -205,7 +208,7 @@ int main(void) memset(&tmp, 'a', sizeof(tmp)); node_id_from_privkey(&tmp, &a); - rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL); + rstate = new_routing_state(tmpctx, NULL, &a, 0, NULL, NULL, NULL); new_node(rstate, &a);