channel: infrastructure for gossipd request/response.

The same as master request/response: we queue up incoming replies we
don't want for later processing.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2017-11-29 09:22:29 +10:30 committed by Christian Decker
parent 9de3827199
commit 0a596fb043
1 changed files with 46 additions and 20 deletions

View File

@ -43,6 +43,7 @@
#include <common/version.h>
#include <errno.h>
#include <fcntl.h>
#include <gossipd/gen_gossip_wire.h>
#include <gossipd/routing.h>
#include <hsmd/gen_hsm_client_wire.h>
#include <inttypes.h>
@ -115,9 +116,9 @@ struct peer {
bool post_sabotage;
#endif
/* Messages from master: we queue them since we might be waiting for
* a specific reply. */
struct msg_queue from_master;
/* Messages from master / gossipd: we queue them since we
* might be waiting for a specific reply. */
struct msg_queue from_master, from_gossipd;
struct timers timers;
struct oneshot *commit_timer;
@ -575,43 +576,59 @@ static void maybe_send_shutdown(struct peer *peer)
peer->shutdown_sent[LOCAL] = true;
}
/* This queues other traffic from the master until we get reply. */
static u8 *master_wait_sync_reply(const tal_t *ctx,
struct peer *peer, const u8 *msg,
enum channel_wire_type replytype)
/* This queues other traffic from the fd until we get reply. */
static u8 *wait_sync_reply(const tal_t *ctx,
const u8 *msg,
int replytype,
int fd,
struct msg_queue *queue,
const char *who)
{
u8 *reply;
status_trace("Sending master %s",
channel_wire_type_name(fromwire_peektype(msg)));
status_trace("Sending %s %u", who, fromwire_peektype(msg));
if (!wire_sync_write(MASTER_FD, msg))
if (!wire_sync_write(fd, msg))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not set sync write to master: %s",
strerror(errno));
"Could not set sync write to %s: %s",
who, strerror(errno));
status_trace("... , awaiting %s",
channel_wire_type_name(replytype));
status_trace("... , awaiting %u", replytype);
for (;;) {
reply = wire_sync_read(ctx, MASTER_FD);
reply = wire_sync_read(ctx, fd);
if (!reply)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not set sync read from master: %s",
strerror(errno));
"Could not set sync read from %s: %s",
who, strerror(errno));
if (fromwire_peektype(reply) == replytype) {
status_trace("Got it!");
break;
}
status_trace("Nope, got %s instead",
channel_wire_type_name(fromwire_peektype(reply)));
msg_enqueue(&peer->from_master, take(reply));
status_trace("Nope, got %u instead", fromwire_peektype(reply));
msg_enqueue(queue, take(reply));
}
return reply;
}
static u8 *master_wait_sync_reply(const tal_t *ctx,
struct peer *peer, const u8 *msg,
enum channel_wire_type replytype)
{
return wait_sync_reply(ctx, msg, replytype,
MASTER_FD, &peer->from_master, "master");
}
static UNNEEDED u8 *gossipd_wait_sync_reply(const tal_t *ctx,
struct peer *peer, const u8 *msg,
enum gossip_wire_type replytype)
{
return wait_sync_reply(ctx, msg, replytype,
GOSSIP_FD, &peer->from_gossipd, "gossipd");
}
static struct commit_sigs *calc_commitsigs(const tal_t *ctx,
const struct peer *peer,
u64 commit_index)
@ -2432,6 +2449,7 @@ int main(int argc, char *argv[])
peer->have_sigs[LOCAL] = peer->have_sigs[REMOTE] = false;
peer->announce_depth_reached = false;
msg_queue_init(&peer->from_master, peer);
msg_queue_init(&peer->from_gossipd, peer);
msg_queue_init(&peer->peer_out, peer);
peer->peer_outmsg = NULL;
peer->peer_outoff = 0;
@ -2486,6 +2504,14 @@ int main(int argc, char *argv[])
continue;
}
msg = msg_dequeue(&peer->from_gossipd);
if (msg) {
status_trace("Now dealing with deferred gossip %u",
fromwire_peektype(msg));
gossip_in(peer, msg);
continue;
}
if (timer_earliest(&peer->timers, &first)) {
timeout = timespec_to_timeval(
timemono_between(first, now).ts);