gossip: Implement gossip_store compaction

Signed-off-by: Christian Decker <decker.christian@gmail.com>
This commit is contained in:
Christian Decker 2018-05-30 16:06:23 +02:00
parent b9a2400a5f
commit 74a1cbd877
4 changed files with 75 additions and 3 deletions

View File

@ -15,6 +15,7 @@
#include <wire/wire.h>
#define GOSSIP_STORE_FILENAME "gossip_store"
#define GOSSIP_STORE_TEMP_FILENAME "gossip_store.tmp"
#define MAX_COUNT_TO_STALE_RATE 10
static u8 gossip_store_version = 0x02;
@ -145,6 +146,63 @@ static bool gossip_store_append(int fd, struct routing_state *rstate, const u8 *
write(fd, msg, msglen) == msglen);
}
/**
* Rewrite the on-disk gossip store, compacting it along the way
*
* Creates a new file, writes all the updates from the `broadcast_state`, and
* then atomically swaps the files.
*/
static void gossip_store_compact(struct gossip_store *gs)
{
size_t count = 0;
u64 index = 0;
int fd;
const u8 *msg;
assert(gs->broadcast);
status_trace(
"Compacting gossip_store with %zu entries, %zu of which are stale",
gs->count, gs->count - gs->broadcast->count);
fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_APPEND|O_CREAT, 0600);
if (fd < 0)
status_failed(
STATUS_FAIL_INTERNAL_ERROR,
"Could not open file for gossip_store compaction");
if (write(fd, &gossip_store_version, sizeof(gossip_store_version))
!= sizeof(gossip_store_version))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Writing version to store: %s", strerror(errno));
while ((msg = next_broadcast(gs->broadcast, 0, UINT32_MAX, &index)) != NULL) {
if (!gossip_store_append(fd, gs->rstate, msg)) {
status_broken("Failed writing to gossip store: %s",
strerror(errno));
unlink(GOSSIP_STORE_TEMP_FILENAME);
return;
}
count++;
}
if (rename(GOSSIP_STORE_TEMP_FILENAME, GOSSIP_STORE_FILENAME) == -1) {
status_broken(
"Error swapping compacted gossip_store into place: %s",
strerror(errno));
unlink(GOSSIP_STORE_TEMP_FILENAME);
return;
}
status_trace(
"Compaction completed: dropped %zu messages, new count %zu",
gs->count - count, count);
gs->count = count;
close(gs->fd);
gs->fd = fd;
}
void gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg)
{
size_t stale;
@ -161,9 +219,8 @@ void gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg)
gs->count++;
stale = gs->count - gs->broadcast->count;
if (gs->count >= 100 && stale * MAX_COUNT_TO_STALE_RATE > gs->count) {
/* FIXME(cdecker) Implement rewriting of gossip_store */
}
if (gs->count >= 100 && stale * MAX_COUNT_TO_STALE_RATE > gs->count)
gossip_store_compact(gs);
}
void gossip_store_add_channel_delete(struct gossip_store *gs,

View File

@ -99,6 +99,11 @@ bool fromwire_wireaddr(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, struct
u64 insert_broadcast(struct broadcast_state *bstate UNNEEDED, const u8 *msg UNNEEDED,
u32 timestamp UNNEEDED)
{ fprintf(stderr, "insert_broadcast called!\n"); abort(); }
/* Generated stub for next_broadcast */
const u8 *next_broadcast(struct broadcast_state *bstate UNNEEDED,
u32 timestamp_min UNNEEDED, u32 timestamp_max UNNEEDED,
u64 *last_index UNNEEDED)
{ fprintf(stderr, "next_broadcast called!\n"); abort(); }
/* Generated stub for onion_type_name */
const char *onion_type_name(int e UNNEEDED)
{ fprintf(stderr, "onion_type_name called!\n"); abort(); }

View File

@ -63,6 +63,11 @@ bool fromwire_wireaddr(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, struct
u64 insert_broadcast(struct broadcast_state *bstate UNNEEDED, const u8 *msg UNNEEDED,
u32 timestamp UNNEEDED)
{ fprintf(stderr, "insert_broadcast called!\n"); abort(); }
/* Generated stub for next_broadcast */
const u8 *next_broadcast(struct broadcast_state *bstate UNNEEDED,
u32 timestamp_min UNNEEDED, u32 timestamp_max UNNEEDED,
u64 *last_index UNNEEDED)
{ fprintf(stderr, "next_broadcast called!\n"); abort(); }
/* Generated stub for onion_type_name */
const char *onion_type_name(int e UNNEEDED)
{ fprintf(stderr, "onion_type_name called!\n"); abort(); }

View File

@ -61,6 +61,11 @@ bool fromwire_wireaddr(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, struct
u64 insert_broadcast(struct broadcast_state *bstate UNNEEDED, const u8 *msg UNNEEDED,
u32 timestamp UNNEEDED)
{ fprintf(stderr, "insert_broadcast called!\n"); abort(); }
/* Generated stub for next_broadcast */
const u8 *next_broadcast(struct broadcast_state *bstate UNNEEDED,
u32 timestamp_min UNNEEDED, u32 timestamp_max UNNEEDED,
u64 *last_index UNNEEDED)
{ fprintf(stderr, "next_broadcast called!\n"); abort(); }
/* Generated stub for onion_type_name */
const char *onion_type_name(int e UNNEEDED)
{ fprintf(stderr, "onion_type_name called!\n"); abort(); }