""" A bitcoind proxy that allows instrumentation and canned responses
"""
import decimal
import json
import logging
import threading
import time

import flask
from bitcoin.rpc import JSONRPCError  # type: ignore
from bitcoin.rpc import RawProxy as BitcoinProxy  # type: ignore
from cheroot.wsgi import PathInfoDispatcher  # type: ignore
from cheroot.wsgi import Server  # type: ignore
from flask import Flask, request


class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that knows how to serialize ``decimal.Decimal``.

    By default json.dumps does not handle Decimals correctly, so we override
    its handling: Decimals are emitted as strings with 8 fractional digits.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for *o*.

        Decimals become a fixed-point string with 8 decimals; every other
        type is delegated to the base class (which raises ``TypeError``).
        """
        if isinstance(o, decimal.Decimal):
            # Format the Decimal itself rather than going through float(),
            # which silently loses precision past ~15 significant digits.
            return "{:.8f}".format(o)
        return super().default(o)


class BitcoinRpcProxy(object):
    """HTTP JSON-RPC proxy sitting in front of a bitcoind instance.

    Requests POSTed to ``/`` are forwarded to bitcoind unless a mock has
    been registered for the requested method via :meth:`mock_rpc`, in which
    case the canned response is returned instead.
    """

    def __init__(self, bitcoind, rpcport=0):
        """Set up the Flask app; the server is not started until ``start()``.

        *bitcoind* must expose ``conf_file`` and ``rpcport`` attributes.
        *rpcport* is the port the proxy binds to; ``start()`` replaces it
        with the port that was actually bound.
        """
        self.app = Flask("BitcoindProxy")
        self.app.add_url_rule("/", "API entrypoint", self.proxy, methods=['POST'])
        self.rpcport = rpcport
        self.mocks = {}
        self.mock_counts = {}
        self.bitcoind = bitcoind
        self.request_count = 0

    def _handle_request(self, r):
        """Answer a single decoded JSON-RPC request dict *r*.

        Returns the reply dict: either built from a registered mock, or from
        forwarding the call to bitcoind. Only forwarded calls increment
        ``request_count``; mocked calls increment ``mock_counts[method]``.
        """
        method = r['method']

        # If we have set a mock for this method reply with that instead of
        # forwarding the request.
        mock = self.mocks.get(method)
        if isinstance(mock, dict):
            # A dict mock is the `result` payload; wrap it in the envelope.
            self.mock_counts[method] += 1
            return {
                'id': r['id'],
                'error': None,
                'result': mock,
            }
        if callable(mock):
            # A callable mock receives the raw request and returns the
            # complete reply itself.
            self.mock_counts[method] += 1
            return mock(r)

        brpc = BitcoinProxy(btc_conf_file=self.bitcoind.conf_file)
        try:
            reply = {
                "result": brpc._call(method, *r['params']),
                "error": None,
                "id": r['id']
            }
        except JSONRPCError as e:
            reply = {
                "error": e.error,
                "code": -32603,
                "id": r['id']
            }
        self.request_count += 1
        return reply

    def proxy(self):
        """Flask view: handle a single or batched JSON-RPC POST request."""
        # UTF-8 is a superset of ASCII, so this accepts everything the old
        # ASCII decode did, plus non-ASCII parameters (labels, wallet names).
        r = json.loads(request.data.decode('utf-8'))

        if isinstance(r, list):
            reply = [self._handle_request(subreq) for subreq in r]
        else:
            reply = self._handle_request(r)

        response = flask.Response(json.dumps(reply, cls=DecimalEncoder))
        response.headers['Content-Type'] = 'application/json'
        return response

    def start(self):
        """Start serving in a daemon thread and record the bound port."""
        d = PathInfoDispatcher({'/': self.app})
        self.server = Server(('0.0.0.0', self.rpcport), d)
        self.proxy_thread = threading.Thread(target=self.server.start)
        self.proxy_thread.daemon = True
        self.proxy_thread.start()

        # Now that bitcoind is running on the real rpcport, let's tell all
        # future callers to talk to the proxyport. We use the bind_addr as a
        # signal that the port is bound and accepting connections.
        while self.server.bind_addr[1] == 0:
            # Yield instead of spinning, so the server thread gets the GIL
            # and can actually finish binding.
            time.sleep(0.01)
        self.rpcport = self.server.bind_addr[1]
        logging.debug(
            "BitcoinRpcProxy proxying incoming port %s to %s",
            self.rpcport, self.bitcoind.rpcport
        )

    def stop(self):
        """Stop the server and wait for the serving thread to exit."""
        self.server.stop()
        self.proxy_thread.join()
        logging.debug(
            "BitcoinRpcProxy shut down after processing %s requests",
            self.request_count
        )

    def mock_rpc(self, method, response=None):
        """Mock the response to a future RPC call of @method

        The response can either be a dict, which is returned as the
        `result` of the JSON-RPC reply, or a function that is called with
        the request dict and returns the full JSON-RPC reply. Registering a
        mock resets the call counter for @method. If the response is None
        the mock is removed and future calls will be passed through to
        bitcoind again.
        """
        if response is not None:
            self.mocks[method] = response
            self.mock_counts[method] = 0
        elif method in self.mocks:
            del self.mocks[method]