Last active
August 18, 2024 09:21
-
-
Save laanwj/1d5414fa1499e0d766cac87377e08266 to your computer and use it in GitHub Desktop.
Bitcoind RPC example from Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Convenience utility for connecting to a bitcoind instance through RPC. | |
''' | |
# W.J. van der Laan 2021 :: SPDX-License-Identifier: MIT | |
import base64 | |
import decimal | |
from http import HTTPStatus | |
import http.client | |
import json | |
import logging | |
import os | |
import socket | |
import time | |
import urllib | |
import urllib.parse | |
HTTP_TIMEOUT = 30 | |
USER_AGENT = "AuthServiceProxy/0.1" | |
log = logging.getLogger("BitcoinRPC") | |
class JSONRPCException(Exception): | |
def __init__(self, rpc_error, http_status=None): | |
try: | |
errmsg = '%(message)s (%(code)i)' % rpc_error | |
except (KeyError, TypeError): | |
errmsg = '' | |
super().__init__(errmsg) | |
self.error = rpc_error | |
self.http_status = http_status | |
def EncodeDecimal(o): | |
if isinstance(o, decimal.Decimal): | |
return str(o) | |
raise TypeError(repr(o) + " is not JSON serializable") | |
class AuthServiceProxy(): | |
__id_count = 0 | |
# ensure_ascii: escape unicode as \uXXXX, passed to json.dumps | |
def __init__(self, service_url, service_name=None, timeout=HTTP_TIMEOUT, connection=None, ensure_ascii=True): | |
self.__service_url = service_url | |
self._service_name = service_name | |
self.ensure_ascii = ensure_ascii # can be toggled on the fly by tests | |
self.__url = urllib.parse.urlparse(service_url) | |
user = None if self.__url.username is None else self.__url.username.encode('utf8') | |
passwd = None if self.__url.password is None else self.__url.password.encode('utf8') | |
authpair = user + b':' + passwd | |
self.__auth_header = b'Basic ' + base64.b64encode(authpair) | |
self.timeout = timeout | |
self._set_conn(connection) | |
def __getattr__(self, name): | |
if name.startswith('__') and name.endswith('__'): | |
# Python internal stuff | |
raise AttributeError | |
if self._service_name is not None: | |
name = "%s.%s" % (self._service_name, name) | |
return AuthServiceProxy(self.__service_url, name, connection=self.__conn) | |
def _request(self, method, path, postdata): | |
''' | |
Do a HTTP request, with retry if we get disconnected (e.g. due to a timeout). | |
This is a workaround for https://bugs.python.org/issue3566 which is fixed in Python 3.5. | |
''' | |
headers = {'Host': self.__url.hostname, | |
'User-Agent': USER_AGENT, | |
'Authorization': self.__auth_header, | |
'Content-type': 'application/json'} | |
if os.name == 'nt': | |
# Windows somehow does not like to re-use connections | |
# TODO: Find out why the connection would disconnect occasionally and make it reusable on Windows | |
# Avoid "ConnectionAbortedError: [WinError 10053] An established connection was aborted by the software in your host machine" | |
self._set_conn() | |
try: | |
self.__conn.request(method, path, postdata, headers) | |
return self._get_response() | |
except (BrokenPipeError, ConnectionResetError): | |
# Python 3.5+ raises BrokenPipeError when the connection was reset | |
# ConnectionResetError happens on FreeBSD | |
self.__conn.close() | |
self.__conn.request(method, path, postdata, headers) | |
return self._get_response() | |
except OSError as e: | |
retry = ( | |
'[WinError 10053] An established connection was aborted by the software in your host machine' in str(e)) | |
# Workaround for a bug on macOS. See https://bugs.python.org/issue33450 | |
retry = retry or ('[Errno 41] Protocol wrong type for socket' in str(e)) | |
if retry: | |
self.__conn.close() | |
self.__conn.request(method, path, postdata, headers) | |
return self._get_response() | |
else: | |
raise | |
def get_request(self, *args, **argsn): | |
AuthServiceProxy.__id_count += 1 | |
log.debug("-{}-> {} {}".format( | |
AuthServiceProxy.__id_count, | |
self._service_name, | |
json.dumps(args or argsn, default=EncodeDecimal, ensure_ascii=self.ensure_ascii), | |
)) | |
if args and argsn: | |
raise ValueError('Cannot handle both named and positional arguments') | |
return {'version': '1.1', | |
'method': self._service_name, | |
'params': args or argsn, | |
'id': AuthServiceProxy.__id_count} | |
def __call__(self, *args, **argsn): | |
postdata = json.dumps(self.get_request(*args, **argsn), default=EncodeDecimal, ensure_ascii=self.ensure_ascii) | |
response, status = self._request('POST', self.__url.path, postdata.encode('utf-8')) | |
if response['error'] is not None: | |
raise JSONRPCException(response['error'], status) | |
elif 'result' not in response: | |
raise JSONRPCException({ | |
'code': -343, 'message': 'missing JSON-RPC result'}, status) | |
elif status != HTTPStatus.OK: | |
raise JSONRPCException({ | |
'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status) | |
else: | |
return response['result'] | |
def batch(self, rpc_call_list): | |
postdata = json.dumps(list(rpc_call_list), default=EncodeDecimal, ensure_ascii=self.ensure_ascii) | |
log.debug("--> " + postdata) | |
response, status = self._request('POST', self.__url.path, postdata.encode('utf-8')) | |
if status != HTTPStatus.OK: | |
raise JSONRPCException({ | |
'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status) | |
return response | |
def _get_response(self): | |
req_start_time = time.time() | |
try: | |
http_response = self.__conn.getresponse() | |
except socket.timeout: | |
raise JSONRPCException({ | |
'code': -344, | |
'message': '%r RPC took longer than %f seconds. Consider ' | |
'using larger timeout for calls that take ' | |
'longer to return.' % (self._service_name, | |
self.__conn.timeout)}) | |
if http_response is None: | |
raise JSONRPCException({ | |
'code': -342, 'message': 'missing HTTP response from server'}) | |
content_type = http_response.getheader('Content-Type') | |
if content_type != 'application/json': | |
raise JSONRPCException( | |
{'code': -342, 'message': 'non-JSON HTTP response with \'%i %s\' from server' % (http_response.status, http_response.reason)}, | |
http_response.status) | |
responsedata = http_response.read().decode('utf8') | |
response = json.loads(responsedata, parse_float=decimal.Decimal) | |
elapsed = time.time() - req_start_time | |
if "error" in response and response["error"] is None: | |
log.debug("<-%s- [%.6f] %s" % (response["id"], elapsed, json.dumps(response["result"], default=EncodeDecimal, ensure_ascii=self.ensure_ascii))) | |
else: | |
log.debug("<-- [%.6f] %s" % (elapsed, responsedata)) | |
return response, http_response.status | |
def __truediv__(self, relative_uri): | |
return AuthServiceProxy("{}/{}".format(self.__service_url, relative_uri), self._service_name, connection=self.__conn) | |
def _set_conn(self, connection=None): | |
port = 80 if self.__url.port is None else self.__url.port | |
if connection: | |
self.__conn = connection | |
self.timeout = connection.timeout | |
elif self.__url.scheme == 'https': | |
self.__conn = http.client.HTTPSConnection(self.__url.hostname, port, timeout=self.timeout) | |
else: | |
self.__conn = http.client.HTTPConnection(self.__url.hostname, port, timeout=self.timeout) | |
# Datadir suffix and RPC port per bitcoind chain. | |
CHAIN_PARAMS = { | |
'main': ("", 8332), | |
'test': ("testnet3", 18332), | |
'signet': ("signet", 38332), | |
'regtest': ("regtest", 18443), | |
} | |
def get_rpc_proxy(*, network=None, host=None, port=None, user=None, passwd=None, wallet=None, datadir=None): | |
''' | |
Create a bitcoind RPC proxy. | |
If you specify `network` (main, test, signet, regtest) it will try to establish cookie authentication | |
to a local bitcoind based on the `datadir` (defaults to `~/.bitcoin`). | |
It is also possible to specify `host`, `port`, `user` and `passwd` (or a subset of them) manually. | |
A specific `wallet` can be specified as well (defaults to no wallet). | |
''' | |
if port is None: | |
if network is None: | |
raise ValueError('A network must be provided if no RPC port is provided') | |
port = CHAIN_PARAMS[network][1] | |
if host is None: | |
host = '127.0.0.1' | |
if user is None or passwd is None: # Cookie auth | |
if network is None: | |
raise ValueError('A network must be provided for cookie authentication') | |
if datadir is None: | |
datadir = os.getenv("DATADIR", os.path.join(os.getenv('HOME'), '.bitcoin')) | |
datadir = os.path.join(datadir, CHAIN_PARAMS[network][0]) | |
with open(os.path.join(datadir, '.cookie'),'r') as f: | |
auth = f.read() | |
else: # Explicit username and password | |
auth = urllib.parse.quote(user) + ':' + urllib.parse.quote(passwd) | |
tgt = f"http://{auth}@{host}:{port}/" | |
if wallet is not None: | |
tgt += 'wallet/' + urllib.parse.quote(wallet) | |
return AuthServiceProxy(tgt) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import easyrpc | |
import sys | |
p = easyrpc.get_rpc_proxy(network='main') | |
for blkid in range(0, 1000): | |
blockhash = p.getblockhash(blkid) | |
data = p.getblock(blockhash, True) | |
merkleroot = data['merkleroot'] | |
num_tx = len(data['tx']) | |
print(f'Merkleroot for block {blkid}: {merkleroot}, {num_tx} transactions') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment