#include /* To reach into io_plan: not a public header! */ #include #include #include #include #include #include #include #include #include #include #include #include struct json_stream { /* NULL if we ran OOM! */ struct json_out *jout; /* Who is writing to this buffer now; NULL if nobody is. */ struct command *writer; /* Who is io_writing from this buffer now: NULL if nobody is. */ struct io_conn *reader; struct io_plan *(*reader_cb)(struct io_conn *conn, struct json_stream *js, void *arg); void *reader_arg; size_t len_read; /* Where to log I/O */ struct log *log; }; static void adjust_io_write(struct json_out *jout, ptrdiff_t delta, struct json_stream *js) { /* If io_write is in progress, we shift it to point to new buffer pos */ if (js->reader) /* FIXME: This, or something prettier (io_replan?) belong in ccan/io! */ js->reader->plan[IO_OUT].arg.u1.cp += delta; } struct json_stream *new_json_stream(const tal_t *ctx, struct command *writer, struct log *log) { struct json_stream *js = tal(ctx, struct json_stream); /* FIXME: Add magic so tal_resize can fail! */ js->jout = json_out_new(js); json_out_call_on_move(js->jout, adjust_io_write, js); js->writer = writer; js->reader = NULL; js->log = log; return js; } struct json_stream *json_stream_dup(const tal_t *ctx, struct json_stream *original, struct log *log) { struct json_stream *js = tal_dup(ctx, struct json_stream, original); if (original->jout) js->jout = json_out_dup(js, original->jout); js->log = log; return js; } bool json_stream_still_writing(const struct json_stream *js) { return js->writer != NULL; } void json_stream_log_suppress(struct json_stream *js, const char *cmd_name) { /* Really shouldn't be used for anything else */ assert(streq(cmd_name, "getlog")); js->log = NULL; } /* If we have an allocation failure. */ static void COLD js_oom(struct json_stream *js) { js->jout = tal_free(js->jout); } void json_stream_append(struct json_stream *js, const char *str, size_t len) { char *dest; if (!js->jout) return; dest = json_out_direct(js->jout, len); if (!dest) { js_oom(js); return; } memcpy(dest, str, len); } void json_stream_close(struct json_stream *js, struct command *writer) { /* FIXME: We use writer == NULL for malformed: make writer a void *? * I used to assert(writer); here. */ assert(js->writer == writer); /* Should be well-formed at this point! */ json_out_finished(js->jout); json_stream_append(js, "\n\n", strlen("\n\n")); json_stream_flush(js); js->writer = NULL; } /* Also called when we're oom, so it will kill reader. */ void json_stream_flush(struct json_stream *js) { /* Wake the stream reader. FIXME: Could have a flag here to optimize */ io_wake(js); } char *json_member_direct(struct json_stream *js, const char *fieldname, size_t extra) { char *dest; if (!js->jout) return NULL; dest = json_out_member_direct(js->jout, fieldname, extra); if (!dest) js_oom(js); return dest; } void json_array_start(struct json_stream *js, const char *fieldname) { if (js->jout && !json_out_start(js->jout, fieldname, '[')) js_oom(js); } void json_array_end(struct json_stream *js) { if (js->jout && !json_out_end(js->jout, ']')) js_oom(js); } void json_object_start(struct json_stream *js, const char *fieldname) { if (js->jout && !json_out_start(js->jout, fieldname, '{')) js_oom(js); } void json_object_end(struct json_stream *js) { if (js->jout && !json_out_end(js->jout, '}')) js_oom(js); } void json_object_compat_end(struct json_stream *js) { /* In 0.7.1 we upgraded pylightning to no longer need this. */ #ifdef COMPAT_V070 json_stream_append(js, " ", 1); #endif json_object_end(js); } void json_add_member(struct json_stream *js, const char *fieldname, bool quote, const char *fmt, ...) { va_list ap; va_start(ap, fmt); if (js->jout && !json_out_addv(js->jout, fieldname, quote, fmt, ap)) js_oom(js); va_end(ap); } /* This is where we read the json_stream and write it to conn */ static struct io_plan *json_stream_output_write(struct io_conn *conn, struct json_stream *js) { const char *p; /* Out of memory? Nothing we can do but close conn */ if (!js->jout) return io_close(conn); /* For when we've just done some output */ json_out_consume(js->jout, js->len_read); /* Get how much we can write out from js */ p = json_out_contents(js->jout, &js->len_read); /* Nothing in buffer? */ if (!p) { /* We're not doing io_write now, unset. */ js->reader = NULL; if (!json_stream_still_writing(js)) return js->reader_cb(conn, js, js->reader_arg); return io_out_wait(conn, js, json_stream_output_write, js); } js->reader = conn; if (js->log) log_io(js->log, LOG_IO_OUT, "", p, js->len_read); return io_write(conn, p, js->len_read, json_stream_output_write, js); } struct io_plan *json_stream_output_(struct json_stream *js, struct io_conn *conn, struct io_plan *(*cb)(struct io_conn *conn, struct json_stream *js, void *arg), void *arg) { assert(!js->reader); js->reader_cb = cb; js->reader_arg = arg; js->len_read = 0; return json_stream_output_write(conn, js); }