From 95f41287f0e334a9ea395edb4174f7dccea66d42 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Fri, 10 Mar 2017 21:18:43 +1030 Subject: [PATCH] lightningd/subd: new code for subdaemons. This uses a single fd for both status and control. To make this work, we enforce the convention that replies are the same as requests + 100, and that their name ends in "_REPLY". This also means that various daemons can simply exit when done; there's no race between reading request and closing status fds. Signed-off-by: Rusty Russell --- lightningd/Makefile | 2 + lightningd/lightningd.c | 3 +- lightningd/subd.c | 405 ++++++++++++++++++++++++++++++++++++++++ lightningd/subd.h | 123 ++++++++++++ lightningd/subdaemon.c | 6 - lightningd/subdaemon.h | 2 - 6 files changed, 532 insertions(+), 9 deletions(-) create mode 100644 lightningd/subd.c create mode 100644 lightningd/subd.h diff --git a/lightningd/Makefile b/lightningd/Makefile index bf75acb0c..9727997f6 100644 --- a/lightningd/Makefile +++ b/lightningd/Makefile @@ -59,6 +59,7 @@ LIGHTNINGD_SRC := \ lightningd/hsm_control.c \ lightningd/lightningd.c \ lightningd/peer_control.c \ + lightningd/subd.c \ lightningd/subdaemon.c LIGHTNINGD_OBJS := $(LIGHTNINGD_SRC:.c=.o) @@ -73,6 +74,7 @@ LIGHTNINGD_HEADERS_NOGEN = \ lightningd/hsm_control.h \ lightningd/lightningd.h \ lightningd/peer_control.h \ + lightningd/subd.h \ lightningd/subdaemon.h \ $(LIGHTNINGD_OLD_LIB_HEADERS) \ $(LIGHTNINGD_LIB_HEADERS) \ diff --git a/lightningd/lightningd.c b/lightningd/lightningd.c index 8b81e7350..7e08072a1 100644 --- a/lightningd/lightningd.c +++ b/lightningd/lightningd.c @@ -2,6 +2,7 @@ #include "hsm_control.h" #include "lightningd.h" #include "peer_control.h" +#include "subd.h" #include "subdaemon.h" #include #include @@ -177,7 +178,7 @@ int main(int argc, char *argv[]) ld->daemon_dir = find_my_path(ld, argv[0]); register_opts(&ld->dstate); - opt_register_arg("--dev-debugger=", opt_subdaemon_debug, NULL, + opt_register_arg("--dev-debugger=", opt_subd_debug, NULL, ld, "Wait for gdb attach at start of "); /* Handle options and config; move to .lightningd */ diff --git a/lightningd/subd.c b/lightningd/subd.c new file mode 100644 index 000000000..69958dc96 --- /dev/null +++ b/lightningd/subd.c @@ -0,0 +1,405 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static bool move_fd(int from, int to) +{ + if (dup2(from, to) == -1) + return false; + close(from); + return true; +} + +/* FIXME: Expose the ccan/io version? */ +static void set_blocking(int fd, bool block) +{ + int flags = fcntl(fd, F_GETFL); + + if (block) + flags &= ~O_NONBLOCK; + else + flags |= O_NONBLOCK; + + fcntl(fd, F_SETFL, flags); +} + +struct subd_req { + struct list_node list; + + /* Callback for a reply. */ + int reply_type; + bool (*replycb)(struct subd *, const u8 *msg_in, void *reply_data); + void *replycb_data; + int *fd_in; +}; + +static void free_subd_req(struct subd_req *sr) +{ + list_del(&sr->list); +} + +static void add_req(struct subd *sd, int type, + bool (*replycb)(struct subd *, const u8 *, void *), + void *replycb_data, + int *reply_fd_in) +{ + struct subd_req *sr = tal(sd, struct subd_req); + + sr->reply_type = type + SUBD_REPLY_OFFSET; + sr->replycb = replycb; + sr->replycb_data = replycb_data; + sr->fd_in = reply_fd_in; + if (sr->fd_in) + *sr->fd_in = -1; + assert(strends(sd->msgname(sr->reply_type), "_REPLY")); + + /* Keep in FIFO order: we sent in order, so replies will be too. */ + list_add_tail(&sd->reqs, &sr->list); + tal_add_destructor(sr, free_subd_req); +} + +/* Caller must free. */ +static struct subd_req *get_req(struct subd *sd, int reply_type) +{ + struct subd_req *sr; + + list_for_each(&sd->reqs, sr, list) { + if (sr->reply_type == reply_type) + return sr; + } + return NULL; +} + +/* We use sockets, not pipes, because fds are bidir. */ +static int subd(const char *dir, const char *name, bool debug, + int *msgfd, va_list ap) +{ + int childmsg[2], execfail[2]; + pid_t childpid; + int err, fd; + + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, childmsg) != 0) + goto fail; + + if (pipe(execfail) != 0) + goto close_msgfd_fail; + + if (fcntl(execfail[1], F_SETFD, fcntl(execfail[1], F_GETFD) + | FD_CLOEXEC) < 0) + goto close_execfail_fail; + + childpid = fork(); + if (childpid < 0) + goto close_execfail_fail; + + if (childpid == 0) { + int fdnum = 3; + long max; + const char *debug_arg = NULL; + + close(childmsg[0]); + close(execfail[0]); + + // msg = STDIN + if (childmsg[1] != STDIN_FILENO) { + if (!move_fd(childmsg[1], STDIN_FILENO)) + goto child_errno_fail; + } + + /* Dup any extra fds up first. */ + while ((fd = va_arg(ap, int)) != -1) { + /* If this were stdin, dup2 closed! */ + assert(fd != STDIN_FILENO); + if (!move_fd(fd, fdnum)) + goto child_errno_fail; + fdnum++; + } + close(STDOUT_FILENO); + + /* Make (fairly!) sure all other fds are closed. */ + max = sysconf(_SC_OPEN_MAX); + for (fd = fdnum; fd < max; fd++) + close(fd); + + if (debug) + debug_arg = "--debugger"; + execl(path_join(NULL, dir, name), name, debug_arg, NULL); + + child_errno_fail: + err = errno; + /* Gcc's warn-unused-result fail. */ + if (write(execfail[1], &err, sizeof(err))) { + ; + } + exit(127); + } + + close(childmsg[1]); + close(execfail[1]); + + while ((fd = va_arg(ap, int)) != -1) + close(fd); + + /* Child will close this without writing on successful exec. */ + if (read(execfail[0], &err, sizeof(err)) == sizeof(err)) { + close(execfail[0]); + waitpid(childpid, NULL, 0); + errno = err; + return -1; + } + close(execfail[0]); + *msgfd = childmsg[0]; + return childpid; + +close_execfail_fail: + close_noerr(execfail[0]); + close_noerr(execfail[1]); +close_msgfd_fail: + close_noerr(childmsg[0]); + close_noerr(childmsg[1]); +fail: + return -1; +} + +static struct io_plan *sd_msg_read(struct io_conn *conn, struct subd *sd); + +static struct io_plan *sd_msg_reply(struct io_conn *conn, struct subd *sd, + struct subd_req *sr) +{ + int type = fromwire_peektype(sd->msg_in); + bool keep_open; + + if (sr->fd_in) { + /* Don't trust subd to set it blocking. */ + set_blocking(*sr->fd_in, true); + log_info(sd->log, "REPLY %s with fd %i", sd->msgname(type), + *sr->fd_in); + } else + log_info(sd->log, "REPLY %s", sd->msgname(type)); + + /* If not stolen, we'll free this below. */ + tal_steal(sr, sd->msg_in); + keep_open = sr->replycb(sd, sd->msg_in, sr->replycb_data); + tal_free(sr); + + if (!keep_open) + return io_close(conn); + + return io_read_wire(conn, sd, &sd->msg_in, sd_msg_read, sd); +} + +static struct io_plan *sd_msg_read(struct io_conn *conn, struct subd *sd) +{ + int type = fromwire_peektype(sd->msg_in); + const char *str; + int str_len; + const tal_t *tmpctx; + struct subd_req *sr; + + if (type == -1) { + log_unusual(sd->log, "ERROR: Invalid msg output"); + return io_close(conn); + } + + /* First, check for replies. */ + sr = get_req(sd, type); + if (sr) { + /* If we need fd, read it and call us again. */ + if (sr->fd_in && *sr->fd_in == -1) + return io_recv_fd(conn, sr->fd_in, sd_msg_read, sd); + return sd_msg_reply(conn, sd, sr); + } + + /* If not stolen, we'll free this below. */ + tmpctx = tal_tmpctx(sd); + tal_steal(tmpctx, sd->msg_in); + + /* If it's a string. */ + str_len = tal_count(sd->msg_in) - sizeof(be16); + str = (const char *)sd->msg_in + sizeof(be16); + + if (type == STATUS_TRACE) + log_debug(sd->log, "TRACE: %.*s", str_len, str); + else if (type & STATUS_FAIL) + log_unusual(sd->log, "FAILURE %s: %.*s", + sd->msgname(type), str_len, str); + else { + log_info(sd->log, "UPDATE %s", sd->msgname(type)); + if (sd->msgcb) { + enum subd_msg_ret r; + + /* If received from subd, set blocking. */ + if (sd->fd_in != -1) + set_blocking(sd->fd_in, true); + r = sd->msgcb(sd, sd->msg_in, sd->fd_in); + switch (r) { + case SUBD_NEED_FD: + /* Don't free msg_in: we go around again. */ + tal_steal(sd, sd->msg_in); + tal_free(tmpctx); + return io_recv_fd(conn, &sd->fd_in, + sd_msg_read, sd); + case SUBD_COMPLETE: + break; + default: + fatal("Unknown msgcb return for %s:%s: %u", + sd->name, sd->msgname(type), r); + } + } + } + sd->msg_in = NULL; + sd->fd_in = -1; + tal_free(tmpctx); + return io_read_wire(conn, sd, &sd->msg_in, sd_msg_read, sd); +} + +static void destroy_subd(struct subd *sd) +{ + int status; + + switch (waitpid(sd->pid, &status, WNOHANG)) { + case 0: + log_debug(sd->log, "Status closed, but not exited. Killing"); + kill(sd->pid, SIGKILL); + waitpid(sd->pid, &status, 0); + break; + case -1: + log_unusual(sd->log, "Status closed, but waitpid %i says %s", + sd->pid, strerror(errno)); + status = -1; + break; + } + if (sd->finished) + sd->finished(sd, status); +} + +static struct io_plan *msg_send_next(struct io_conn *conn, struct subd *sd) +{ + const u8 *msg = msg_dequeue(&sd->outq); + + if (sd->fd_to_close != -1) { + close(sd->fd_to_close); + sd->fd_to_close = -1; + } + + /* Nothing to do? Wait for msg_enqueue. */ + if (!msg) + return msg_queue_wait(conn, &sd->outq, msg_send_next, sd); + + /* We overload STATUS_TRACE for outgoing to mean "send an fd" */ + if (fromwire_peektype(msg) == STATUS_TRACE) { + const u8 *p = msg + sizeof(be16); + size_t len = tal_count(msg) - sizeof(be16); + sd->fd_to_close = fromwire_u32(&p, &len); + tal_free(msg); + return io_send_fd(conn, sd->fd_to_close, msg_send_next, sd); + } + return io_write_wire(conn, take(msg), msg_send_next, sd); +} + +static struct io_plan *msg_setup(struct io_conn *conn, struct subd *sd) +{ + return io_duplex(conn, + io_read_wire(conn, sd, &sd->msg_in, sd_msg_read, sd), + msg_send_next(conn, sd)); +} + +struct subd *new_subd(const tal_t *ctx, + struct lightningd *ld, + const char *name, + struct peer *peer, + const char *(*msgname)(int msgtype), + enum subd_msg_ret (*msgcb) + (struct subd *, const u8 *, int fd), + void (*finished)(struct subd *, int), + ...) +{ + va_list ap; + struct subd *sd = tal(ctx, struct subd); + int msg_fd; + bool debug; + + debug = ld->dev_debug_subdaemon + && strends(name, ld->dev_debug_subdaemon); + va_start(ap, finished); + sd->pid = subd(ld->daemon_dir, name, debug, &msg_fd, ap); + va_end(ap); + if (sd->pid == (pid_t)-1) { + log_unusual(ld->log, "subd %s failed: %s", + name, strerror(errno)); + return tal_free(sd); + } + sd->ld = ld; + sd->log = new_log(sd, ld->dstate.log_book, "%s(%u):", name, sd->pid); + sd->name = name; + sd->finished = finished; + sd->msgname = msgname; + sd->msgcb = msgcb; + sd->fd_in = -1; + msg_queue_init(&sd->outq, sd); + sd->fd_to_close = -1; + tal_add_destructor(sd, destroy_subd); + list_head_init(&sd->reqs); + + /* conn actually owns daemon: we die when it does. */ + sd->conn = io_new_conn(ctx, msg_fd, msg_setup, sd); + tal_steal(sd->conn, sd); + + log_info(sd->log, "pid %u, msgfd %i", sd->pid, msg_fd); + + sd->peer = tal_steal(sd, peer); + return sd; +} + +void subd_send_msg(struct subd *sd, const u8 *msg_out) +{ + /* We overload STATUS_TRACE for outgoing to mean "send an fd" */ + assert(fromwire_peektype(msg_out) != STATUS_TRACE); + if (!taken(msg_out)) + msg_out = tal_dup_arr(sd, u8, msg_out, tal_len(msg_out), 0); + msg_enqueue(&sd->outq, msg_out); +} + +void subd_send_fd(struct subd *sd, int fd) +{ + /* We overload STATUS_TRACE for outgoing to mean "send an fd" */ + u8 *fdmsg = tal_arr(sd, u8, 0); + towire_u16(&fdmsg, STATUS_TRACE); + towire_u32(&fdmsg, fd); + msg_enqueue(&sd->outq, fdmsg); +} + +void subd_req_(struct subd *sd, + const u8 *msg_out, + int fd_out, int *fd_in, + bool (*replycb)(struct subd *, const u8 *, void *), + void *replycb_data) +{ + subd_send_msg(sd, msg_out); + if (fd_out >= 0) + subd_send_fd(sd, fd_out); + + add_req(sd, fromwire_peektype(msg_out), replycb, replycb_data, fd_in); +} + +char *opt_subd_debug(const char *optarg, struct lightningd *ld) +{ + ld->dev_debug_subdaemon = optarg; + return NULL; +} diff --git a/lightningd/subd.h b/lightningd/subd.h new file mode 100644 index 000000000..24bbd2f03 --- /dev/null +++ b/lightningd/subd.h @@ -0,0 +1,123 @@ +#ifndef LIGHTNING_LIGHTNINGD_SUBD_H +#define LIGHTNING_LIGHTNINGD_SUBD_H +#include "config.h" +#include +#include +#include +#include +#include + +struct io_conn; + +enum subd_msg_ret { + SUBD_NEED_FD, + SUBD_COMPLETE +}; + +/* By convention, replies are requests + 100 */ +#define SUBD_REPLY_OFFSET 100 + +/* One of our subds. */ +struct subd { + /* Name, like John, or "lightningd_hsm" */ + const char *name; + /* The Big Cheese. */ + struct lightningd *ld; + /* pid, for waiting for status when it dies. */ + int pid; + /* Connection. */ + struct io_conn *conn; + + /* If we are associated with a single peer, this points to it. */ + struct peer *peer; + + /* For logging */ + struct log *log; + + /* Callback when non-reply message comes in. */ + enum subd_msg_ret (*msgcb)(struct subd *, const u8 *, int); + const char *(*msgname)(int msgtype); + void (*finished)(struct subd *sd, int status); + + /* Buffer for input. */ + u8 *msg_in; + /* While we're reading an fd in. */ + int fd_in; + + /* Messages queue up here. */ + struct msg_queue outq; + + /* FD to close (used when we just sent it). */ + int fd_to_close; + + /* Callbacks for replies. */ + struct list_head reqs; +}; + +/** + * new_subd - create a new subdaemon. + * @ctx: context to allocate from + * @ld: global state + * @name: basename of daemon + * @peer: peer to take ownership of if non-NULL + * @msgname: function to get name from messages + * @msgcb: function to call when non-fatal message received (or NULL) + * @finished: function to call when it's finished (with exit status). + * @...: the fds to hand as fd 3, 4... terminated with -1. + * + * @msgcb is called with fd == -1 when a message is received; if it + * returns SUBD_NEED_FD, we read an fd from the daemon and call it + * again with that as the third arg. + * + * If this succeeds subd owns @peer. + */ +struct subd *new_subd(const tal_t *ctx, + struct lightningd *ld, + const char *name, + struct peer *peer, + const char *(*msgname)(int msgtype), + enum subd_msg_ret (*msgcb) + (struct subd *, const u8 *, int fd), + void (*finished)(struct subd *, int), ...); + +/** + * subd_send_msg - queue a message to the subdaemon. + * @sd: subdaemon to request + * @msg_out: message (can be take) + */ +void subd_send_msg(struct subd *sd, const u8 *msg_out); + +/** + * subd_send_msg - queue a file descriptor to pass to the subdaemon. + * @sd: subdaemon to request + * @fd: the file descriptor (closed after passing). + */ +void subd_send_fd(struct subd *sd, int fd); + +/** + * subd_req - queue a request to the subdaemon. + * @sd: subdaemon to request + * @msg_out: request message (can be take) + * @fd_out: if >=0 fd to pass at the end of the message (closed after) + * @fd_in: if not NULL, where to put fd read in at end of reply. + * @replycb: callback when reply comes in, returns false to shutdown daemon. + * @replycb_data: final arg to hand to @replycb + * + * @replycb cannot free @sd, so it returns false to remove it. + */ +#define subd_req(sd, msg_out, fd_out, fd_in, replycb, replycb_data) \ + subd_req_((sd), (msg_out), (fd_out), (fd_in), \ + typesafe_cb_preargs(bool, void *, \ + (replycb), (replycb_data), \ + struct subd *, \ + const u8 *), \ + (replycb_data)) +void subd_req_(struct subd *sd, + const u8 *msg_out, + int fd_out, int *fd_in, + bool (*replycb)(struct subd *, const u8 *, void *), + void *replycb_data); + +char *opt_subd_debug(const char *optarg, struct lightningd *ld); + +#endif /* LIGHTNING_LIGHTNINGD_SUBD_H */ diff --git a/lightningd/subdaemon.c b/lightningd/subdaemon.c index dffc2804e..b067d268b 100644 --- a/lightningd/subdaemon.c +++ b/lightningd/subdaemon.c @@ -413,9 +413,3 @@ void subdaemon_req_(struct subdaemon *sd, list_add_tail(&sd->reqs, &sr->list); io_wake(sd); } - -char *opt_subdaemon_debug(const char *optarg, struct lightningd *ld) -{ - ld->dev_debug_subdaemon = optarg; - return NULL; -} diff --git a/lightningd/subdaemon.h b/lightningd/subdaemon.h index c1de98460..073bf74e4 100644 --- a/lightningd/subdaemon.h +++ b/lightningd/subdaemon.h @@ -98,6 +98,4 @@ void subdaemon_req_(struct subdaemon *sd, void (*reqcb)(struct subdaemon *, const u8 *, void *), void *reqcb_data); -char *opt_subdaemon_debug(const char *optarg, struct lightningd *ld); - #endif /* LIGHTNING_LIGHTNINGD_SUBDAEMON_H */