pylightning: Add notification subscription handlers
Just like we added the RPC methods, the notification handlers can also be registered using a function decorator, and we auto-subscribe when asked for a manifest. Signed-off-by: Christian Decker <decker.christian@gmail.com>
This commit is contained in:
parent
69953bcb2a
commit
5338d3115f
|
@ -18,6 +18,9 @@ class Plugin(object):
|
|||
self.methods = {}
|
||||
self.options = {}
|
||||
|
||||
# A dict from topics to handler functions
|
||||
self.subscriptions = {}
|
||||
|
||||
if not stdout:
|
||||
self.stdout = sys.stdout
|
||||
if not stdin:
|
||||
|
@ -60,6 +63,33 @@ class Plugin(object):
|
|||
# Register the function with the name
|
||||
self.methods[name] = func
|
||||
|
||||
def add_subscription(self, topic, func):
|
||||
"""Add a subscription to our list of subscriptions.
|
||||
|
||||
A subscription is an association between a topic and a handler
|
||||
function. Adding a subscription means that we will
|
||||
automatically subscribe to events from that topic with
|
||||
`lightningd` and, upon receiving a matching notification, we
|
||||
will call the associated handler. Notice that in order for the
|
||||
automatic subscriptions to work, the handlers need to be
|
||||
registered before we send our manifest, hence before
|
||||
`Plugin.run` is called.
|
||||
|
||||
"""
|
||||
if topic in self.subscriptions:
|
||||
raise ValueError(
|
||||
"Topic {} already has a handler".format(topic)
|
||||
)
|
||||
self.subscriptions[topic] = func
|
||||
|
||||
def subscribe(self, topic):
|
||||
"""Function decorator to register a notification handler.
|
||||
"""
|
||||
def decorator(f):
|
||||
self.add_subscription(topic, f)
|
||||
return f
|
||||
return decorator
|
||||
|
||||
def add_option(self, name, default, description):
|
||||
"""Add an option that we'd like to register with lightningd.
|
||||
|
||||
|
@ -98,17 +128,11 @@ class Plugin(object):
|
|||
return f
|
||||
return decorator
|
||||
|
||||
def _dispatch(self, request):
|
||||
name = request['method']
|
||||
def _exec_func(self, func, request):
|
||||
params = request['params']
|
||||
|
||||
if name not in self.methods:
|
||||
raise ValueError("No method {} found.".format(name))
|
||||
|
||||
args = params.copy() if isinstance(params, list) else []
|
||||
kwargs = params.copy() if isinstance(params, dict) else {}
|
||||
|
||||
func = self.methods[name]
|
||||
sig = inspect.signature(func)
|
||||
|
||||
if 'plugin' in sig.parameters:
|
||||
|
@ -121,6 +145,43 @@ class Plugin(object):
|
|||
ba.apply_defaults()
|
||||
return func(*ba.args, **ba.kwargs)
|
||||
|
||||
def _dispatch_request(self, request):
|
||||
name = request['method']
|
||||
|
||||
if name not in self.methods:
|
||||
raise ValueError("No method {} found.".format(name))
|
||||
func = self.methods[name]
|
||||
|
||||
try:
|
||||
result = {
|
||||
'jsonrpc': '2.0',
|
||||
'id': request['id'],
|
||||
'result': self._exec_func(func, request)
|
||||
}
|
||||
except Exception as e:
|
||||
result = {
|
||||
'jsonrpc': '2.0',
|
||||
'id': request['id'],
|
||||
"error": "Error while processing {}: {}".format(
|
||||
request['method'], repr(e)
|
||||
),
|
||||
}
|
||||
self.log(traceback.format_exc())
|
||||
json.dump(result, fp=self.stdout)
|
||||
self.stdout.write('\n\n')
|
||||
self.stdout.flush()
|
||||
|
||||
def _dispatch_notification(self, request):
|
||||
name = request['method']
|
||||
if name not in self.subscriptions:
|
||||
raise ValueError("No subscription for {} found.".format(name))
|
||||
func = self.subscriptions[name]
|
||||
|
||||
try:
|
||||
self._exec_func(func, request)
|
||||
except Exception as _:
|
||||
self.log(traceback.format_exc())
|
||||
|
||||
def notify(self, method, params):
|
||||
payload = {
|
||||
'jsonrpc': '2.0',
|
||||
|
@ -145,24 +206,14 @@ class Plugin(object):
|
|||
for payload in msgs[:-1]:
|
||||
request = json.loads(payload)
|
||||
|
||||
try:
|
||||
result = {
|
||||
"jsonrpc": "2.0",
|
||||
"result": self._dispatch(request),
|
||||
"id": request['id']
|
||||
}
|
||||
except Exception as e:
|
||||
result = {
|
||||
"jsonrpc": "2.0",
|
||||
"error": "Error while processing {}".format(
|
||||
request['method']
|
||||
),
|
||||
"id": request['id']
|
||||
}
|
||||
self.log(traceback.format_exc())
|
||||
json.dump(result, fp=self.stdout)
|
||||
self.stdout.write('\n\n')
|
||||
self.stdout.flush()
|
||||
# If this has an 'id'-field, it's a request and returns a
|
||||
# result. Otherwise it's a notification and it doesn't
|
||||
# return anything.
|
||||
if 'id' in request:
|
||||
self._dispatch_request(request)
|
||||
else:
|
||||
self._dispatch_notification(request)
|
||||
|
||||
return msgs[-1]
|
||||
|
||||
def run(self):
|
||||
|
@ -204,6 +255,7 @@ class Plugin(object):
|
|||
return {
|
||||
'options': list(self.options.values()),
|
||||
'rpcmethods': methods,
|
||||
'subscriptions': list(self.subscriptions.keys()),
|
||||
}
|
||||
|
||||
def _init(self, options, configuration, request):
|
||||
|
@ -217,7 +269,7 @@ class Plugin(object):
|
|||
if self.init:
|
||||
self.methods['init'] = self.init
|
||||
self.init = None
|
||||
return self._dispatch(request)
|
||||
return self._exec_func(self.methods['init'], request)
|
||||
return None
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue