rgb-cln/plugins/clnrest/utilities/rpc_routes.py

59 lines
2.1 KiB
Python
Raw Normal View History

2023-07-14 03:06:24 +01:00
import json5
from flask import request, make_response
2023-07-14 03:06:24 +01:00
from flask_restx import Namespace, Resource
from .shared import call_rpc_method, verify_rune, process_help_response
from .rpc_plugin import plugin
methods_list = []
rpcns = Namespace("RPCs")
payload_model = rpcns.model("Payload", {}, None, False)
@rpcns.route("/list-methods")
class ListMethodsResource(Resource):
@rpcns.response(200, "Success")
@rpcns.response(500, "Server error")
def get(self):
"""Get the list of all valid rpc methods, useful for Swagger to get human readable list without calling lightning-cli help"""
try:
help_response = call_rpc_method(plugin, "help", [])
html_content = process_help_response(help_response)
response = make_response(html_content)
response.headers["Content-Type"] = "text/html"
return response
except Exception as err:
plugin.log(f"Error: {err}", "error")
return json5.loads(str(err)), 500
@rpcns.route("/<rpc_method>")
class RpcMethodResource(Resource):
@rpcns.doc(security=[{"rune": [], "nodeid": []}])
@rpcns.doc(params={"rpc_method": (f"Name of the RPC method to be called")})
@rpcns.expect(payload_model, validate=False)
@rpcns.response(201, "Success")
@rpcns.response(500, "Server error")
def post(self, rpc_method):
"""Call any valid core lightning method (check list-methods response)"""
try:
is_valid_rune = verify_rune(plugin, request)
if "error" in is_valid_rune:
plugin.log(f"Error: {is_valid_rune}", "error")
raise Exception(is_valid_rune)
except Exception as err:
return json5.loads(str(err)), 403
try:
if request.is_json:
payload = request.get_json()
else:
payload = request.form.to_dict()
return call_rpc_method(plugin, rpc_method, payload), 201
except Exception as err:
plugin.log(f"Error: {err}", "error")
return json5.loads(str(err)), 500