Async connection abstraction

This is an attempt at unifying all the async connection handling into
a reusable module.
This commit is contained in:
Christian Decker 2017-03-10 13:37:20 +01:00
parent 0596a6cc2c
commit 408d2f5170
3 changed files with 118 additions and 0 deletions

View File

@ -39,6 +39,7 @@ LIGHTNINGD_LIB_SRC := \
lightningd/channel.c \
lightningd/channel_config.c \
lightningd/commit_tx.c \
lightningd/connection.c \
lightningd/cryptomsg.c \
lightningd/crypto_sync.c \
lightningd/debug.c \

69
lightningd/connection.c Normal file
View File

@ -0,0 +1,69 @@
#include "connection.h"
#include <ccan/take/take.h>
#include <wire/wire_io.h>
static void daemon_conn_enqueue(struct daemon_conn *dc, u8 *msg)
{
size_t n = tal_count(dc->msg_out);
tal_resize(&dc->msg_out, n + 1);
dc->msg_out[n] = tal_dup_arr(dc->ctx, u8, msg, tal_count(msg), 0);
}
static const u8 *daemon_conn_dequeue(struct daemon_conn *dc)
{
const u8 *msg;
size_t n = tal_count(dc->msg_out);
if (n == 0)
return NULL;
msg = dc->msg_out[0];
memmove(dc->msg_out, dc->msg_out + 1, sizeof(dc->msg_in[0]) * (n-1));
tal_resize(&dc->msg_out, n-1);
return msg;
}
struct io_plan *daemon_conn_read_next(struct io_conn *conn,
struct daemon_conn *dc)
{
dc->msg_in = tal_free(dc->msg_in);
return io_read_wire(conn, dc->ctx, &dc->msg_in, dc->daemon_conn_recv,
dc);
}
static struct io_plan *daemon_conn_write_next(struct io_conn *conn,
struct daemon_conn *dc)
{
const u8 *msg = daemon_conn_dequeue(dc);
if (msg) {
return io_write_wire(conn, take(msg), daemon_conn_write_next, dc);
} else {
return io_out_wait(conn, dc, daemon_conn_write_next, dc);
}
}
static struct io_plan *daemon_conn_start(struct io_conn *conn,
struct daemon_conn *dc)
{
dc->conn = conn;
return io_duplex(conn, daemon_conn_read_next(conn, dc),
daemon_conn_write_next(conn, dc));
}
void daemon_conn_init(tal_t *ctx, struct daemon_conn *dc, int fd,
struct io_plan *(*daemon_conn_recv)(struct io_conn *,
struct daemon_conn *))
{
dc->daemon_conn_recv = daemon_conn_recv;
dc->ctx = ctx;
dc->msg_in = NULL;
dc->msg_out = tal_arr(ctx, u8 *, 0);
dc->conn_fd = fd;
io_new_conn(ctx, fd, daemon_conn_start, dc);
}
void daemon_conn_send(struct daemon_conn *dc, u8 *msg)
{
daemon_conn_enqueue(dc, msg);
io_wake(dc);
}

48
lightningd/connection.h Normal file
View File

@ -0,0 +1,48 @@
#ifndef LIGHTNING_LIGHTNINGD_CONNECTION_H
#define LIGHTNING_LIGHTNINGD_CONNECTION_H
#include "config.h"
#include <ccan/io/io.h>
#include <ccan/short_types/short_types.h>
struct daemon_conn {
/* Context to tallocate all things from, possibly the
* container of this connection. */
tal_t *ctx;
/* Last message we received */
u8 *msg_in;
/* Array of queued outgoing messages */
u8 **msg_out;
int conn_fd;
struct io_conn *conn;
/* Callback for incoming messages */
struct io_plan *(*daemon_conn_recv)(struct io_conn *conn, struct daemon_conn *);
};
/**
* daemon_conn_init - Initialize a new daemon connection
*
* @ctx: context to allocate from
* @dc: daemon_conn to initialize
* @fd: socket file descriptor to wrap
* @daemon_conn_recv: callback function to be called upon receiving a message
*/
void daemon_conn_init(tal_t *ctx, struct daemon_conn *dc, int fd,
struct io_plan *(*daemon_conn_recv)(struct io_conn *,
struct daemon_conn *));
/**
* daemon_conn_send - Enqueue an outgoing message to be sent
*/
void daemon_conn_send(struct daemon_conn *dc, u8 *msg);
/**
* daemon_conn_read_next - Read the next message
*/
struct io_plan *daemon_conn_read_next(struct io_conn *conn,
struct daemon_conn *dc);
#endif /* LIGHTNING_LIGHTNINGD_CONNECTION_H */