broadcast: invert ownership of messages.

Make the update/announce messages own the element in the broadcast map
not the other way around.

Then we keep a pointer to the message, and when we free it
(eg. channel closed, update replaces it), it gets freed from the
broadcast map automatically.

The result is much nicer!

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2018-05-10 21:52:37 +09:30 committed by Christian Decker
parent 8940528bdb
commit c71e16f784
8 changed files with 60 additions and 129 deletions

View File

@ -26,35 +26,21 @@ static void destroy_queued_message(struct queued_message *msg,
static struct queued_message *new_queued_message(const tal_t *ctx,
struct broadcast_state *bstate,
const u8 *payload TAKES,
const u8 *payload,
u64 index)
{
struct queued_message *msg = tal(ctx, struct queued_message);
msg->payload = tal_dup_arr(msg, u8, payload, tal_len(payload), 0);
msg->payload = payload;
msg->index = index;
uintmap_add(&bstate->broadcasts, index, msg);
tal_add_destructor2(msg, destroy_queued_message, bstate);
return msg;
}
bool replace_broadcast(const tal_t *ctx,
struct broadcast_state *bstate,
u64 *index,
const u8 *payload TAKES)
void insert_broadcast(struct broadcast_state *bstate, const u8 *payload)
{
struct queued_message *msg;
bool evicted = false;
msg = uintmap_get(&bstate->broadcasts, *index);
if (msg) {
tal_free(msg);
evicted = true;
}
/* Now add the message to the queue */
msg = new_queued_message(ctx, bstate, payload, bstate->next_index++);
*index = msg->index;
return evicted;
/* Free payload, free index. */
new_queued_message(payload, bstate, payload, bstate->next_index++);
}
const u8 *next_broadcast(struct broadcast_state *bstate, u64 *last_index)
@ -66,13 +52,3 @@ const u8 *next_broadcast(struct broadcast_state *bstate, u64 *last_index)
return m->payload;
return NULL;
}
const u8 *get_broadcast(struct broadcast_state *bstate, u64 msgidx)
{
struct queued_message *m;
m = uintmap_get(&bstate->broadcasts, msgidx);
if (m)
return m->payload;
return NULL;
}

View File

@ -16,17 +16,10 @@ struct broadcast_state {
struct broadcast_state *new_broadcast_state(tal_t *ctx);
/* Replace a queued message with @index, if it matches the type and
* tag for the new message. The new message will be queued with the
* next highest index. @index is updated to hold the index of the
* newly queued message*/
bool replace_broadcast(const tal_t *ctx,
struct broadcast_state *bstate,
u64 *index,
const u8 *payload TAKES);
/* Append a queued message for broadcast. Freeing the msg will remove it. */
void insert_broadcast(struct broadcast_state *bstate, const u8 *msg);
/* Return the broadcast with index >= *last_index, and update *last_index.
* There's no broadcast with index 0. */
const u8 *next_broadcast(struct broadcast_state *bstate, u64 *last_index);
const u8 *get_broadcast(struct broadcast_state *bstate, u64 msgidx);
#endif /* LIGHTNING_GOSSIPD_BROADCAST_H */

View File

@ -902,15 +902,11 @@ static void handle_get_update(struct peer *peer, const u8 *msg)
&scid));
update = NULL;
} else {
/* We want (public) update that comes from our end. */
/* We want the update that comes from our end. */
if (pubkey_eq(&chan->nodes[0]->id, &peer->daemon->id))
update = get_broadcast(rstate->broadcasts,
chan->half[0]
.channel_update_msgidx);
update = chan->half[0].channel_update;
else if (pubkey_eq(&chan->nodes[1]->id, &peer->daemon->id))
update = get_broadcast(rstate->broadcasts,
chan->half[1]
.channel_update_msgidx);
update = chan->half[1].channel_update;
else {
status_unusual("peer %s scid %s: not our channel?",
type_to_string(tmpctx, struct pubkey,
@ -1253,8 +1249,8 @@ static void append_half_channel(struct gossip_getchannels_entry **entries,
if (!c)
return;
/* Don't mention non-public inactive channels. */
if (!c->active && !c->channel_update_msgidx)
/* Don't mention inactive or unannounced channels. */
if (!c->active && !c->channel_update)
return;
n = tal_count(*entries);
@ -1266,9 +1262,9 @@ static void append_half_channel(struct gossip_getchannels_entry **entries,
e->satoshis = chan->satoshis;
e->active = c->active;
e->flags = c->flags;
e->public = chan->public && (c->channel_update_msgidx != 0);
e->public = chan->public && (c->channel_update != NULL);
e->short_channel_id = chan->scid;
e->last_update_timestamp = c->channel_update_msgidx ? c->last_timestamp : -1;
e->last_update_timestamp = c->channel_update ? c->last_timestamp : -1;
if (e->last_update_timestamp >= 0) {
e->base_fee_msat = c->base_fee;
e->fee_per_millionth = c->proportional_fee;
@ -1459,14 +1455,10 @@ static void gossip_send_keepalive_update(struct routing_state *rstate,
u64 htlc_minimum_msat;
u16 flags, cltv_expiry_delta;
u8 *update, *msg, *err;
const u8 *old_update;
/* Parse old update */
old_update = get_broadcast(rstate->broadcasts,
hc->channel_update_msgidx);
if (!fromwire_channel_update(
old_update, &sig, &chain_hash, &scid, &timestamp,
hc->channel_update, &sig, &chain_hash, &scid, &timestamp,
&flags, &cltv_expiry_delta, &htlc_minimum_msat, &fee_base_msat,
&fee_proportional_millionths)) {
status_failed(
@ -1524,8 +1516,8 @@ static void gossip_refresh_network(struct daemon *daemon)
for (size_t i = 0; i < tal_count(n->chans); i++) {
struct half_chan *hc = half_chan_from(n, n->chans[i]);
if (!hc->channel_update_msgidx) {
/* Connection is not public yet, so don't even
if (!hc->channel_update) {
/* Connection is not announced yet, so don't even
* try to re-announce it */
continue;
}
@ -2329,7 +2321,6 @@ static struct io_plan *handle_disable_channel(struct io_conn *conn,
secp256k1_ecdsa_signature sig;
u64 htlc_minimum_msat;
u8 *err;
const u8 *old_update;
if (!fromwire_gossip_disable_channel(msg, &scid, &direction, &active) ) {
status_unusual("Unable to parse %s",
@ -2352,7 +2343,7 @@ static struct io_plan *handle_disable_channel(struct io_conn *conn,
hc->active = active;
if (!hc->channel_update_msgidx) {
if (!hc->channel_update) {
status_trace(
"Channel %s/%d doesn't have a channel_update yet, can't "
"disable",
@ -2361,11 +2352,8 @@ static struct io_plan *handle_disable_channel(struct io_conn *conn,
goto fail;
}
old_update = get_broadcast(daemon->rstate->broadcasts,
hc->channel_update_msgidx);
if (!fromwire_channel_update(
old_update, &sig, &chain_hash, &scid, &timestamp,
hc->channel_update, &sig, &chain_hash, &scid, &timestamp,
&flags, &cltv_expiry_delta, &htlc_minimum_msat, &fee_base_msat,
&fee_proportional_millionths)) {
status_failed(

View File

@ -147,7 +147,7 @@ static struct node *new_node(struct routing_state *rstate,
n->id = *id;
n->chans = tal_arr(n, struct chan *, 0);
n->alias = NULL;
n->node_announce_msgidx = 0;
n->node_announcement = NULL;
n->last_timestamp = -1;
n->addresses = tal_arr(n, struct wireaddr, 0);
node_map_add(rstate->nodes, n);
@ -193,8 +193,7 @@ static void init_half_chan(struct routing_state *rstate,
{
struct half_chan *c = &chan->half[idx];
c->channel_update_msgidx = 0;
c->private_update = NULL;
c->channel_update = NULL;
c->unroutable_until = 0;
c->active = false;
c->flags = idx;
@ -228,7 +227,7 @@ struct chan *new_chan(struct routing_state *rstate,
chan->nodes[n1idx] = n1;
chan->nodes[!n1idx] = n2;
chan->txout_script = NULL;
chan->channel_announce_msgidx = 0;
chan->channel_announce = NULL;
chan->public = false;
chan->satoshis = 0;
@ -611,8 +610,6 @@ bool routing_add_channel_announcement(struct routing_state *rstate,
struct pubkey node_id_2;
struct pubkey bitcoin_key_1;
struct pubkey bitcoin_key_2;
bool old_chan, old_public;
u64 old_msgidx;
fromwire_channel_announcement(
tmpctx, msg, &node_signature_1, &node_signature_2,
@ -622,35 +619,24 @@ bool routing_add_channel_announcement(struct routing_state *rstate,
* local_add_channel(); normally we don't accept new
* channel_announcements. See handle_channel_announcement. */
chan = get_channel(rstate, &scid);
old_chan = chan;
if (!chan)
chan = new_chan(rstate, &scid, &node_id_1, &node_id_2);
old_public = chan->public;
old_msgidx = chan->channel_announce_msgidx;
/* Channel is now public. */
chan->public = true;
chan->satoshis = satoshis;
if (replace_broadcast(chan, rstate->broadcasts,
&chan->channel_announce_msgidx, msg)) {
status_broken("Announcement %s was replaced: %s, %s, msgidx was %"PRIu64" now %"PRIu64"?",
tal_hex(tmpctx, msg),
old_chan ? "preexisting" : "new channel",
old_public ? "public" : "not public",
old_msgidx, chan->channel_announce_msgidx);
return false;
}
chan->channel_announce = tal_dup_arr(chan, u8, msg, tal_len(msg), 0);
/* If we have previously private updates for channels, process now */
/* Now we can broadcast channel announce */
insert_broadcast(rstate->broadcasts, chan->channel_announce);
/* If we had private updates for channels, we can broadcast them too. */
for (size_t i = 0; i < ARRAY_SIZE(chan->half); i++) {
const u8 *update = chan->half[i].private_update;
if (update) {
chan->half[i].private_update = NULL;
routing_add_channel_update(rstate, take(update));
}
if (!chan->half[i].channel_update)
continue;
insert_broadcast(rstate->broadcasts,
chan->half[i].channel_update);
}
return true;
@ -960,18 +946,18 @@ bool routing_add_channel_update(struct routing_state *rstate,
(flags & ROUTING_FLAGS_DISABLED) == 0, timestamp,
htlc_minimum_msat);
/* Replace any old one. */
tal_free(chan->half[direction].channel_update);
chan->half[direction].channel_update
= tal_dup_arr(chan, u8, update, tal_len(update), 0);
/* For private channels, we get updates without an announce: don't
* broadcast them! */
if (chan->channel_announce_msgidx == 0) {
tal_free(chan->half[direction].private_update);
chan->half[direction].private_update
= tal_dup_arr(chan, u8, update, tal_len(update), 0);
if (!chan->channel_announce)
return true;
}
replace_broadcast(chan, rstate->broadcasts,
&chan->half[direction].channel_update_msgidx,
update);
insert_broadcast(rstate->broadcasts,
chan->half[direction].channel_update);
return true;
}
@ -1131,7 +1117,7 @@ bool routing_add_node_announcement(struct routing_state *rstate, const u8 *msg T
node = get_node(rstate, &node_id);
/* May happen if we accepted the node_announcement due to a local
* channel, for which we didn't have the announcement hust yet. */
* channel, for which we didn't have the announcement yet. */
if (node == NULL)
return false;
@ -1144,9 +1130,9 @@ bool routing_add_node_announcement(struct routing_state *rstate, const u8 *msg T
tal_free(node->alias);
node->alias = tal_dup_arr(node, u8, alias, 32, 0);
replace_broadcast(node, rstate->broadcasts,
&node->node_announce_msgidx,
msg);
tal_free(node->node_announcement);
node->node_announcement = tal_dup_arr(node, u8, msg, tal_len(msg), 0);
insert_broadcast(rstate->broadcasts, node->node_announcement);
return true;
}

View File

@ -34,11 +34,8 @@ struct half_chan {
* things indicated direction wrt the `channel_id` */
u16 flags;
/* Cached `channel_update` we might forward to new peers (or 0) */
u64 channel_update_msgidx;
/* If it's a private update, it's not in the broadcast map. */
const u8 *private_update;
/* Cached `channel_update` we might forward to new peers (or NULL) */
const u8 *channel_update;
/* If greater than current time, this connection should not
* be used for routing. */
@ -57,8 +54,8 @@ struct chan {
/* node[0].id < node[1].id */
struct node *nodes[2];
/* Cached `channel_announcement` we might forward to new peers (or 0) */
u64 channel_announce_msgidx;
/* NULL if not announced yet */
const u8 *channel_announce;
/* Is this a public channel, or was it only added locally? */
bool public;
@ -94,8 +91,8 @@ struct node {
/* Color to be used when displaying the name */
u8 rgb_color[3];
/* Cached `node_announcement` we might forward to new peers (or 0). */
u64 node_announce_msgidx;
/* Cached `node_announcement` we might forward to new peers (or NULL). */
const u8 *node_announcement;
};
const secp256k1_pubkey *node_map_keyof_node(const struct node *n);

View File

@ -89,15 +89,12 @@ u8 fromwire_u8(const u8 **cursor UNNEEDED, size_t *max UNNEEDED)
/* Generated stub for fromwire_wireaddr */
bool fromwire_wireaddr(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, struct wireaddr *addr UNNEEDED)
{ fprintf(stderr, "fromwire_wireaddr called!\n"); abort(); }
/* Generated stub for insert_broadcast */
void insert_broadcast(struct broadcast_state *bstate UNNEEDED, const u8 *msg UNNEEDED)
{ fprintf(stderr, "insert_broadcast called!\n"); abort(); }
/* Generated stub for onion_type_name */
const char *onion_type_name(int e UNNEEDED)
{ fprintf(stderr, "onion_type_name called!\n"); abort(); }
/* Generated stub for replace_broadcast */
bool replace_broadcast(const tal_t *ctx UNNEEDED,
struct broadcast_state *bstate UNNEEDED,
u64 *index UNNEEDED,
const u8 *payload TAKES UNNEEDED)
{ fprintf(stderr, "replace_broadcast called!\n"); abort(); }
/* Generated stub for sanitize_error */
char *sanitize_error(const tal_t *ctx UNNEEDED, const u8 *errmsg UNNEEDED,
struct channel_id *channel_id UNNEEDED)

View File

@ -53,15 +53,12 @@ u8 fromwire_u8(const u8 **cursor UNNEEDED, size_t *max UNNEEDED)
/* Generated stub for fromwire_wireaddr */
bool fromwire_wireaddr(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, struct wireaddr *addr UNNEEDED)
{ fprintf(stderr, "fromwire_wireaddr called!\n"); abort(); }
/* Generated stub for insert_broadcast */
void insert_broadcast(struct broadcast_state *bstate UNNEEDED, const u8 *msg UNNEEDED)
{ fprintf(stderr, "insert_broadcast called!\n"); abort(); }
/* Generated stub for onion_type_name */
const char *onion_type_name(int e UNNEEDED)
{ fprintf(stderr, "onion_type_name called!\n"); abort(); }
/* Generated stub for replace_broadcast */
bool replace_broadcast(const tal_t *ctx UNNEEDED,
struct broadcast_state *bstate UNNEEDED,
u64 *index UNNEEDED,
const u8 *payload TAKES UNNEEDED)
{ fprintf(stderr, "replace_broadcast called!\n"); abort(); }
/* Generated stub for sanitize_error */
char *sanitize_error(const tal_t *ctx UNNEEDED, const u8 *errmsg UNNEEDED,
struct channel_id *channel_id UNNEEDED)

View File

@ -51,15 +51,12 @@ u8 fromwire_u8(const u8 **cursor UNNEEDED, size_t *max UNNEEDED)
/* Generated stub for fromwire_wireaddr */
bool fromwire_wireaddr(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, struct wireaddr *addr UNNEEDED)
{ fprintf(stderr, "fromwire_wireaddr called!\n"); abort(); }
/* Generated stub for insert_broadcast */
void insert_broadcast(struct broadcast_state *bstate UNNEEDED, const u8 *msg UNNEEDED)
{ fprintf(stderr, "insert_broadcast called!\n"); abort(); }
/* Generated stub for onion_type_name */
const char *onion_type_name(int e UNNEEDED)
{ fprintf(stderr, "onion_type_name called!\n"); abort(); }
/* Generated stub for replace_broadcast */
bool replace_broadcast(const tal_t *ctx UNNEEDED,
struct broadcast_state *bstate UNNEEDED,
u64 *index UNNEEDED,
const u8 *payload TAKES UNNEEDED)
{ fprintf(stderr, "replace_broadcast called!\n"); abort(); }
/* Generated stub for sanitize_error */
char *sanitize_error(const tal_t *ctx UNNEEDED, const u8 *errmsg UNNEEDED,
struct channel_id *channel_id UNNEEDED)