From b6650425b9f06d924c399f9e1b952c9357e332e3 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Sat, 2 Jan 2021 14:28:31 +0100 Subject: [PATCH] pyln: Add notifications support to LightningRpc Changelog-Added: pyln-client: Added support for command notifications to LightningRpc via the `notify` context-manager. --- contrib/pyln-client/pyln/client/lightning.py | 73 +++++++++++++++++--- 1 file changed, 63 insertions(+), 10 deletions(-) diff --git a/contrib/pyln-client/pyln/client/lightning.py b/contrib/pyln-client/pyln/client/lightning.py index 4e0fb388e..c99122d21 100644 --- a/contrib/pyln-client/pyln/client/lightning.py +++ b/contrib/pyln-client/pyln/client/lightning.py @@ -1,12 +1,13 @@ -from decimal import Decimal -from math import floor, log10 -from typing import Optional, Union import json import logging import os import socket import warnings +from contextlib import contextmanager +from decimal import Decimal from json import JSONEncoder +from math import floor, log10 +from typing import Optional, Union def _patched_default(self, obj): @@ -283,8 +284,9 @@ class UnixDomainSocketRpc(object): self.decoder = decoder self.executor = executor self.logger = logger + self._notify = None - self.next_id = 0 + self.next_id = 1 def _writeobj(self, sock, obj): s = json.dumps(obj, ensure_ascii=False, cls=self.encoder_cls) @@ -334,18 +336,44 @@ class UnixDomainSocketRpc(object): # FIXME: we open a new socket for every readobj call... sock = UnixSocket(self.socket_path) this_id = self.next_id - self._writeobj(sock, { + self.next_id += 0 + buf = b'' + + if self._notify is not None: + # Opt into the notifications support + self._writeobj(sock, { + "jsonrpc": "2.0", + "method": "notifications", + "id": 0, + "params": { + "enable": True + }, + }) + _, buf = self._readobj(sock, buf) + + request = { "jsonrpc": "2.0", "method": method, "params": payload, "id": this_id, - }) - self.next_id += 1 - buf = b'' + } + + self._writeobj(sock, request) while True: resp, buf = self._readobj(sock, buf) - # FIXME: We should offer a callback for notifications. - if 'method' not in resp or 'id' in resp: + id = resp.get("id", None) + meth = resp.get("method", None) + + if meth == 'message' and self._notify is not None: + n = resp['params'] + self._notify( + message=n.get('message', None), + progress=n.get('progress', None), + request=request + ) + continue + + if meth is None or id is None: break self.logger.debug("Received response for %s call: %r", method, resp) @@ -361,6 +389,31 @@ class UnixDomainSocketRpc(object): raise ValueError("Malformed response, \"result\" missing.") return resp["result"] + @contextmanager + def notify(self, fn): + """Register a notification callback to use for a set of RPC calls. + + This is a context manager and should be used like this: + + ```python + def fn(message, progress, request, **kwargs): + print(message) + + with rpc.notify(fn): + rpc.somemethod() + ``` + + The `fn` function will be called once for each notification + the is sent by `somemethod`. This is a context manager, + meaning that multiple commands can share the same context, and + the same notification function. + + """ + old = self._notify + self._notify = fn + yield + self._notify = old + class LightningRpc(UnixDomainSocketRpc): """