gossipd: provide new fd to per-peer daemons when we compact it.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2019-05-04 15:23:13 +09:30
parent 13717c6ebb
commit d8db4e871f
10 changed files with 82 additions and 13 deletions

View File

@ -793,18 +793,24 @@ static u8 *wait_sync_reply(const tal_t *ctx,
status_trace("... , awaiting %u", replytype);
for (;;) {
int type;
reply = wire_sync_read(ctx, fd);
if (!reply)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not set sync read from %s: %s",
who, strerror(errno));
if (fromwire_peektype(reply) == replytype) {
type = fromwire_peektype(reply);
if (type == replytype) {
status_trace("Got it!");
break;
}
status_trace("Nope, got %u instead", fromwire_peektype(reply));
status_trace("Nope, got %u instead", type);
msg_enqueue(queue, take(reply));
/* This one has an fd appended */
if (type == WIRE_GOSSIPD_NEW_STORE_FD)
msg_enqueue_fd(queue, fdpass_recv(fd));
}
return reply;
@ -3052,6 +3058,14 @@ int main(int argc, char *argv[])
msg = msg_dequeue(peer->from_gossipd);
if (msg) {
if (fromwire_gossipd_new_store_fd(msg)) {
tal_free(msg);
msg = msg_dequeue(peer->from_gossipd);
new_gossip_store(GOSSIP_STORE_FD,
msg_extract_fd(msg));
tal_free(msg);
continue;
}
status_trace("Now dealing with deferred gossip %u",
fromwire_peektype(msg));
handle_gossip_msg(PEER_FD, &peer->cs, take(msg));
@ -3091,6 +3105,12 @@ int main(int argc, char *argv[])
* connection comes in. */
if (!msg)
peer_failed_connection_lost();
if (fromwire_gossipd_new_store_fd(msg)) {
tal_free(msg);
new_gossip_store(GOSSIP_STORE_FD,
fdpass_recv(GOSSIP_FD));
continue;
}
handle_gossip_msg(PEER_FD, &peer->cs, take(msg));
}
}

View File

@ -1,4 +1,5 @@
#include <bitcoin/script.h>
#include <ccan/fdpass/fdpass.h>
#include <closingd/gen_closing_wire.h>
#include <common/close_tx.h>
#include <common/crypto_sync.h>
@ -16,6 +17,7 @@
#include <common/version.h>
#include <common/wire_error.h>
#include <errno.h>
#include <gossipd/gen_gossip_peerd_wire.h>
#include <hsmd/gen_hsm_wire.h>
#include <inttypes.h>
#include <stdio.h>
@ -99,6 +101,12 @@ static u8 *closing_read_peer_msg(const tal_t *ctx,
msg = peer_or_gossip_sync_read(ctx, PEER_FD, GOSSIP_FD,
cs, &from_gossipd);
if (from_gossipd) {
if (fromwire_gossipd_new_store_fd(msg)) {
tal_free(msg);
new_gossip_store(GOSSIP_STORE_FD,
fdpass_recv(GOSSIP_FD));
continue;
}
handle_gossip_msg(PEER_FD, cs, take(msg));
continue;
}

View File

@ -8,6 +8,7 @@
#include <errno.h>
#include <gossipd/gen_gossip_peerd_wire.h>
#include <sys/select.h>
#include <unistd.h>
#include <wire/peer_wire.h>
#include <wire/wire_sync.h>
@ -152,3 +153,12 @@ handled:
tal_free(msg);
return true;
}
void new_gossip_store(int gossip_store_fd, int new_gossip_store_fd)
{
if (dup2(new_gossip_store_fd, gossip_store_fd) == -1)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not dup2 new fd %i onto %i: %s",
new_gossip_store_fd, gossip_store_fd,
strerror(errno));
}

View File

@ -74,4 +74,10 @@ bool handle_peer_gossip_or_error(int peer_fd, int gossip_fd, int gossip_store_fd
void handle_gossip_msg(int peer_fd, struct crypto_state *cs,
const u8 *msg TAKES);
/**
* new_gossip_store - handle replacement gossip_store_fd.
* @gossip_store_fd: our fixed fd we expect to use to read gossip_store.
* @new_gossip_store_fd: fd received from gossipd.
*/
void new_gossip_store(int gossip_store_fd, int new_gossip_store_fd);
#endif /* LIGHTNING_COMMON_READ_PEER_MSG_H */

View File

@ -85,8 +85,7 @@ void insert_broadcast(struct broadcast_state **bstate,
insert_broadcast_nostore(*bstate, bcast);
/* If it compacts, it replaces *bstate */
gossip_store_maybe_compact((*bstate)->gs, bstate, &offset);
if (offset)
if (gossip_store_maybe_compact((*bstate)->gs, bstate, &offset))
update_peers_broadcast_index((*bstate)->peers, offset);
}

View File

@ -30,3 +30,6 @@ gossipd_local_channel_update,,htlc_minimum_msat,struct amount_msat
gossipd_local_channel_update,,fee_base_msat,u32
gossipd_local_channel_update,,fee_proportional_millionths,u32
gossipd_local_channel_update,,htlc_maximum_msat,struct amount_msat
# Update your gossip_store fd: + gossip_store_fd
gossipd_new_store_fd,3505

1 # Channel daemon can ask for updates for a specific channel, for sending
30 gossipd_new_store_fd,3505
31
32
33
34
35

View File

@ -476,7 +476,7 @@ disable:
return false;
}
void gossip_store_maybe_compact(struct gossip_store *gs,
bool gossip_store_maybe_compact(struct gossip_store *gs,
struct broadcast_state **bs,
u32 *offset)
{
@ -484,13 +484,13 @@ void gossip_store_maybe_compact(struct gossip_store *gs,
/* Don't compact while loading! */
if (!gs->writable)
return;
return false;
if (gs->count < 1000)
return;
return false;
if (gs->count < (*bs)->count * 1.25)
return;
return false;
gossip_store_compact(gs, bs, offset);
return gossip_store_compact(gs, bs, offset);
}
u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg,

View File

@ -54,9 +54,9 @@ const u8 *gossip_store_get(const tal_t *ctx,
* @bs: a pointer to the broadcast state: replaced if we compact it.
* @offset: the change in the store, if any.
*
* If @offset is non-zero on return, caller must update peers.
* If return value is true, caller must update peers.
*/
void gossip_store_maybe_compact(struct gossip_store *gs,
bool gossip_store_maybe_compact(struct gossip_store *gs,
struct broadcast_state **bs,
u32 *offset);

View File

@ -700,13 +700,28 @@ static u8 *handle_gossip_timestamp_filter(struct peer *peer, const u8 *msg)
* some, but that's a lesser evil than skipping some. */
void update_peers_broadcast_index(struct list_head *peers, u32 offset)
{
struct peer *peer;
struct peer *peer, *next;
list_for_each(peers, peer, list) {
list_for_each_safe(peers, peer, next, list) {
int gs_fd;
if (peer->broadcast_index < offset)
peer->broadcast_index = 0;
else
peer->broadcast_index -= offset;
/*~ Since store has been compacted, they need a new fd for the
* new store. The only one will still work, but after this
* any offsets will refer to the new store. */
gs_fd = gossip_store_readonly_fd(peer->daemon->rstate->broadcasts->gs);
if (gs_fd < 0) {
status_broken("Can't get read-only gossip store fd:"
" killing peer");
tal_free(peer);
} else {
u8 *msg = towire_gossipd_new_store_fd(NULL);
daemon_conn_send(peer->dc, take(msg));
daemon_conn_send_fd(peer->dc, gs_fd);
}
}
}
@ -1670,6 +1685,7 @@ static struct io_plan *peer_msg_in(struct io_conn *conn,
/* These are the ones we send, not them */
case WIRE_GOSSIPD_GET_UPDATE_REPLY:
case WIRE_GOSSIPD_SEND_GOSSIP:
case WIRE_GOSSIPD_NEW_STORE_FD:
break;
}

View File

@ -35,6 +35,7 @@
#include <common/version.h>
#include <common/wire_error.h>
#include <errno.h>
#include <gossipd/gen_gossip_peerd_wire.h>
#include <hsmd/gen_hsm_wire.h>
#include <inttypes.h>
#include <openingd/gen_opening_wire.h>
@ -376,6 +377,12 @@ static u8 *opening_negotiate_msg(const tal_t *ctx, struct state *state,
/* Use standard helper for gossip msgs (forwards, if it's an
* error, exits). */
if (from_gossipd) {
if (fromwire_gossipd_new_store_fd(msg)) {
tal_free(msg);
new_gossip_store(GOSSIP_STORE_FD,
fdpass_recv(GOSSIP_FD));
continue;
}
handle_gossip_msg(PEER_FD, &state->cs, take(msg));
continue;
}