diff --git a/CHANGELOG.md b/CHANGELOG.md index a523f16d8..b77158b75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,9 +22,9 @@ changes. ### Fixed - JSON API: uppercase invoices now parsed correctly (broken in 0.6.2). +- JSON API: commands are once again read even if one hasn't responded yet (broken in 0.6.2). - pylightning: handle multiple simultanous RPC replies reliably. - ### Security diff --git a/lightningd/json_stream.c b/lightningd/json_stream.c index 97667bef0..11ab28150 100644 --- a/lightningd/json_stream.c +++ b/lightningd/json_stream.c @@ -23,7 +23,9 @@ struct json_stream { /* Who is io_writing from this buffer now: NULL if nobody is. */ struct io_conn *reader; - struct io_plan *(*reader_cb)(struct io_conn *conn, void *arg); + struct io_plan *(*reader_cb)(struct io_conn *conn, + struct json_stream *js, + void *arg); void *reader_arg; size_t len_read; @@ -263,7 +265,7 @@ static struct io_plan *json_stream_output_write(struct io_conn *conn, /* We're not doing io_write now, unset. */ js->reader = NULL; if (!json_stream_still_writing(js)) - return js->reader_cb(conn, js->reader_arg); + return js->reader_cb(conn, js, js->reader_arg); return io_out_wait(conn, js, json_stream_output_write, js); } @@ -276,6 +278,7 @@ static struct io_plan *json_stream_output_write(struct io_conn *conn, struct io_plan *json_stream_output_(struct json_stream *js, struct io_conn *conn, struct io_plan *(*cb)(struct io_conn *conn, + struct json_stream *js, void *arg), void *arg) { diff --git a/lightningd/json_stream.h b/lightningd/json_stream.h index a79fff890..29c05fce3 100644 --- a/lightningd/json_stream.h +++ b/lightningd/json_stream.h @@ -85,12 +85,14 @@ json_add_member(struct json_stream *js, const char *fieldname, typesafe_cb_preargs(struct io_plan *, \ void *, \ (cb), (arg), \ - struct io_conn *), \ + struct io_conn *, \ + struct json_stream *), \ (arg)) struct io_plan *json_stream_output_(struct json_stream *js, struct io_conn *conn, struct io_plan *(*cb)(struct io_conn *conn, + struct json_stream *js, void *arg), void *arg); diff --git a/lightningd/jsonrpc.c b/lightningd/jsonrpc.c index 825bc1514..d0adad463 100644 --- a/lightningd/jsonrpc.c +++ b/lightningd/jsonrpc.c @@ -31,6 +31,8 @@ #include #include +/* This represents a JSON RPC connection. It can invoke multiple commands, but + * a command can outlive the connection, which could close any time. */ struct json_connection { /* The global state */ struct lightningd *ld; @@ -53,19 +55,53 @@ struct json_connection { /* We've been told to stop. */ bool stop; - /* Current command. */ - struct command *command; + /* Our commands */ + struct list_head commands; - /* Our json_stream */ - struct json_stream *js; + /* Our json_streams (owned by the commands themselves while running). + * Since multiple streams could start returning data at once, we + * always service these in order, freeing once empty. */ + struct json_stream **js_arr; }; +/* The command itself usually owns the stream, because jcon may get closed. + * The command transfers ownership once it's done though. */ +static struct json_stream *jcon_new_json_stream(const tal_t *ctx, + struct json_connection *jcon, + struct command *writer) +{ + /* Wake writer to start streaming, in case it's not already. */ + io_wake(jcon); + + /* FIXME: Keep streams around for recycling. */ + return *tal_arr_expand(&jcon->js_arr) = new_json_stream(ctx, writer); +} + +static void jcon_remove_json_stream(struct json_connection *jcon, + struct json_stream *js) +{ + for (size_t i = 0; i < tal_count(jcon->js_arr); i++) { + if (js != jcon->js_arr[i]) + continue; + + memmove(jcon->js_arr + i, + jcon->js_arr + i + 1, + (tal_count(jcon->js_arr) - i - 1) + * sizeof(jcon->js_arr[i])); + tal_resize(&jcon->js_arr, tal_count(jcon->js_arr)-1); + return; + } + abort(); +} + /* jcon and cmd have separate lifetimes: we detach them on either destruction */ static void destroy_jcon(struct json_connection *jcon) { - if (jcon->command) { - log_debug(jcon->log, "Abandoning command"); - jcon->command->jcon = NULL; + struct command *c; + + list_for_each(&jcon->commands, c, list) { + log_debug(jcon->log, "Abandoning command %s", c->json_cmd->name); + c->jcon = NULL; } /* Make sure this happens last! */ @@ -313,9 +349,7 @@ static void destroy_command(struct command *cmd) "Command returned result after jcon close"); return; } - - assert(cmd->jcon->command == cmd); - cmd->jcon->command = NULL; + list_del_from(&cmd->jcon->commands, &cmd->list); } void command_success(struct command *cmd, struct json_stream *result) @@ -375,32 +409,29 @@ static void json_command_malformed(struct json_connection *jcon, const char *id, const char *error) { - /* FIXME: We only allow one command at a time */ - assert(!jcon->js); - jcon->js = new_json_stream(jcon, NULL); - io_wake(jcon); + /* NULL writer is OK here, since we close it immediately. */ + struct json_stream *js = jcon_new_json_stream(jcon, jcon, NULL); - json_stream_append_fmt(jcon->js, + json_stream_append_fmt(js, "{ \"jsonrpc\": \"2.0\", \"id\" : %s," " \"error\" : " "{ \"code\" : %d," " \"message\" : \"%s\" } }\n\n", id, JSONRPC2_INVALID_REQUEST, error); - json_stream_close(jcon->js, NULL); + json_stream_close(js, NULL); } static struct json_stream *attach_json_stream(struct command *cmd) { - struct json_stream *js = new_json_stream(cmd, cmd); + struct json_stream *js; + + /* If they still care about the result, attach it to them. */ + if (cmd->jcon) + js = jcon_new_json_stream(cmd, cmd->jcon, cmd); + else + js = new_json_stream(cmd, cmd); - /* If they still care about the result, wake them */ - if (cmd->jcon) { - /* FIXME: We only allow one command at a time */ - assert(!cmd->jcon->js); - cmd->jcon->js = js; - io_wake(cmd->jcon); - } assert(!cmd->have_json_stream); cmd->have_json_stream = true; return js; @@ -447,8 +478,7 @@ struct json_stream *json_stream_fail(struct command *cmd, return r; } -/* Returns true if command already completed. */ -static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[]) +static void parse_request(struct json_connection *jcon, const jsmntok_t tok[]) { const jsmntok_t *method, *id, *params; struct command *c; @@ -456,7 +486,7 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[]) if (tok[0].type != JSMN_OBJECT) { json_command_malformed(jcon, "null", "Expected {} for json command"); - return true; + return; } method = json_get_member(jcon->buffer, tok, "method"); @@ -465,12 +495,12 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[]) if (!id) { json_command_malformed(jcon, "null", "No id"); - return true; + return; } if (id->type != JSMN_STRING && id->type != JSMN_PRIMITIVE) { json_command_malformed(jcon, "null", "Expected string/primitive for id"); - return true; + return; } /* This is a convenient tal parent for duration of command @@ -485,19 +515,19 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[]) json_tok_len(id)); c->mode = CMD_NORMAL; c->ok = NULL; - jcon->command = c; + list_add_tail(&jcon->commands, &c->list); tal_add_destructor(c, destroy_command); if (!method || !params) { command_fail(c, JSONRPC2_INVALID_REQUEST, method ? "No params" : "No method"); - return true; + return; } if (method->type != JSMN_STRING) { command_fail(c, JSONRPC2_INVALID_REQUEST, "Expected string for method"); - return true; + return; } c->json_cmd = find_cmd(jcon->buffer, method); @@ -506,50 +536,52 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[]) "Unknown command '%.*s'", method->end - method->start, jcon->buffer + method->start); - return true; + return; } if (c->json_cmd->deprecated && !deprecated_apis) { command_fail(c, JSONRPC2_METHOD_NOT_FOUND, "Command '%.*s' is deprecated", method->end - method->start, jcon->buffer + method->start); - return true; + return; } db_begin_transaction(jcon->ld->wallet->db); c->json_cmd->dispatch(c, jcon->buffer, params); db_commit_transaction(jcon->ld->wallet->db); - /* If they didn't complete it, they must call command_still_pending */ - if (jcon->command == c) + /* If they didn't complete it, they must call command_still_pending. + * If they completed it, it's freed already. */ + list_for_each(&jcon->commands, c, list) assert(c->pending); - - return jcon->command == NULL; } /* Mutual recursion */ static struct io_plan *stream_out_complete(struct io_conn *conn, + struct json_stream *js, struct json_connection *jcon); static struct io_plan *start_json_stream(struct io_conn *conn, struct json_connection *jcon) { /* If something has created an output buffer, start streaming. */ - if (jcon->js) - return json_stream_output(jcon->js, conn, + if (tal_count(jcon->js_arr)) + return json_stream_output(jcon->js_arr[0], conn, stream_out_complete, jcon); + /* Tell reader it can run next command. */ + io_wake(conn); + /* Wait for attach_json_stream */ return io_out_wait(conn, jcon, start_json_stream, jcon); } /* Command has completed writing, and we've written it all out to conn. */ static struct io_plan *stream_out_complete(struct io_conn *conn, + struct json_stream *js, struct json_connection *jcon) { - /* Free up the json_stream for next command. */ - assert(jcon->js); - jcon->js = tal_free(jcon->js); + jcon_remove_json_stream(jcon, js); if (jcon->stop) { log_unusual(jcon->log, "JSON-RPC shutdown"); @@ -558,9 +590,6 @@ static struct io_plan *stream_out_complete(struct io_conn *conn, return io_close(conn); } - /* Tell reader to run next command. */ - io_wake(conn); - /* Wait for more output. */ return start_json_stream(conn, jcon); } @@ -569,7 +598,7 @@ static struct io_plan *read_json(struct io_conn *conn, struct json_connection *jcon) { jsmntok_t *toks; - bool valid, completed; + bool valid; if (jcon->len_read) log_io(jcon->log, LOG_IO_IN, "", @@ -581,7 +610,7 @@ static struct io_plan *read_json(struct io_conn *conn, tal_resize(&jcon->buffer, jcon->used * 2); /* We wait for pending output to be consumed, to avoid DoS */ - if (jcon->js) { + if (tal_count(jcon->js_arr) != 0) { jcon->len_read = 0; return io_wait(conn, conn, read_json, jcon); } @@ -607,26 +636,20 @@ static struct io_plan *read_json(struct io_conn *conn, goto read_more; } - completed = parse_request(jcon, toks); + parse_request(jcon, toks); /* Remove first {}. */ memmove(jcon->buffer, jcon->buffer + toks[0].end, tal_count(jcon->buffer) - toks[0].end); jcon->used -= toks[0].end; - /* If we haven't completed, wait for cmd completion. */ - jcon->len_read = 0; - if (!completed) { - tal_free(toks); - return io_wait(conn, conn, read_json, jcon); - } - /* If we have more to process, try again. FIXME: this still gets * first priority in io_loop, so can starve others. Hack would be * a (non-zero) timer, but better would be to have io_loop avoid * such livelock */ if (jcon->used) { tal_free(toks); + jcon->len_read = 0; return io_always(conn, read_json, jcon); } @@ -648,9 +671,9 @@ static struct io_plan *jcon_connected(struct io_conn *conn, jcon->used = 0; jcon->buffer = tal_arr(jcon, char, 64); jcon->stop = false; - jcon->js = NULL; + jcon->js_arr = tal_arr(jcon, struct json_stream *, 0); jcon->len_read = 0; - jcon->command = NULL; + list_head_init(&jcon->commands); /* We want to log on destruction, so we free this in destructor. */ jcon->log = new_log(ld->log_book, ld->log_book, "%sjcon fd %i:", diff --git a/lightningd/jsonrpc.h b/lightningd/jsonrpc.h index c8f06f463..c51c75565 100644 --- a/lightningd/jsonrpc.h +++ b/lightningd/jsonrpc.h @@ -3,7 +3,7 @@ #include "config.h" #include #include -#include +#include #include #include #include @@ -19,6 +19,8 @@ enum command_mode { /* Context for a command (from JSON, but might outlive the connection!). */ /* FIXME: move definition into jsonrpc.c */ struct command { + /* Off json_cmd->commands */ + struct list_node list; /* The global state */ struct lightningd *ld; /* The 'id' which we need to include in the response. */ diff --git a/tests/test_misc.py b/tests/test_misc.py index 3cc2829ff..f1974a7af 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -605,7 +605,7 @@ def test_multirpc(node_factory): b'{"id":5,"jsonrpc":"2.0","method":"listpeers","params":[]}', b'{"id":6,"jsonrpc":"2.0","method":"listpeers","params":[]}', b'{"method": "invoice", "params": [100, "foo", "foo"], "jsonrpc": "2.0", "id": 7 }', - # FIXME: b'{"method": "waitinvoice", "params": ["foo"], "jsonrpc" : "2.0", "id": 8 }', + b'{"method": "waitinvoice", "params": ["foo"], "jsonrpc" : "2.0", "id": 8 }', b'{"method": "delinvoice", "params": ["foo", "unpaid"], "jsonrpc" : "2.0", "id": 9 }', ]