Skip to content

Instantly share code, notes, and snippets.

@FrankC01
Created July 31, 2018 21:02
Show Gist options
  • Save FrankC01/caaded1b5c503e95e372cc2eec3f3c9c to your computer and use it in GitHub Desktop.
Save FrankC01/caaded1b5c503e95e372cc2eec3f3c9c to your computer and use it in GitHub Desktop.
import asyncio
import logging
import errors
from zmq.asyncio import ZMQEventLoop
from messaging import (
Connection, DisconnectError, SendBackoffTimeoutError)
from sawtooth_sdk.protobuf import client_state_pb2
from sawtooth_sdk.protobuf.validator_pb2 import Message
from google.protobuf.json_format import MessageToDict
from google.protobuf.message import DecodeError
# ZMQ endpoint of the validator; main() passes it to Connection().
URL = 'tcp://validator:4004'
# Module-level logger used by the request/response helpers in this file.
LOGGER = logging.getLogger(__name__)
def _check_status_errors(proto, content, etraps=None):
    """Placeholder for response status checking; currently a no-op.

    All three arguments are accepted and ignored, and None is returned,
    so every validator response is treated as successful.
    """
    return None
def _message_to_dict(message):
    """Render a Protobuf message as a plain python dict.

    Default-valued fields are included in the output and the field names
    from the .proto definition are preserved.
    """
    conversion_options = {
        'including_default_value_fields': True,
        'preserving_proto_field_name': True,
    }
    return MessageToDict(message, **conversion_options)
def _parse_response(proto, response):
    """Decode ``response.content`` into a fresh instance of ``proto``.

    Raises:
        errors.ValidatorResponseInvalid: if decoding raises DecodeError
            or AttributeError (e.g. ``response`` has no ``content``).
    """
    try:
        parsed = proto()
        parsed.ParseFromString(response.content)
    except (DecodeError, AttributeError):
        LOGGER.error('Validator response was not parsable: %s', response)
        raise errors.ValidatorResponseInvalid()
    else:
        return parsed
def _get_paging_controls(start=None, limit=300):
    """Build the paging-controls dict for a list request.

    ``limit`` (unless None) is converted with int() and must be positive;
    ``start`` (unless None) is copied through unchanged.

    Returns:
        A dict holding a 'limit' and/or 'start' key; empty when both
        arguments are None.

    Raises:
        errors.CountInvalid: if int(limit) raises ValueError or the
            converted limit is less than one.
    """
    if limit is None:
        paging = {}
    else:
        try:
            count = int(limit)
        except ValueError:
            LOGGER.debug('Request query had an invalid limit: %s', limit)
            raise errors.CountInvalid()

        if count < 1:
            LOGGER.debug('Request query had an invalid limit: %s', limit)
            raise errors.CountInvalid()

        paging = {'limit': count}

    if start is not None:
        paging['start'] = start

    return paging
async def _query_validator(_connection, request_type, response_proto, payload, error_traps=None):
    """Send a request to the validator and return the parsed response.

    Args:
        _connection: connection object handed to _send_request.
        request_type: message type value sent with the request.
        response_proto: protobuf class used to decode the reply.
        payload: protobuf request message; serialized before sending.
        error_traps: forwarded unchanged to _check_status_errors.

    Returns:
        The decoded response converted to a dict by _message_to_dict.

    Raises:
        Whatever _send_request and _parse_response raise for transport
        or decoding failures.
    """
    # Use a single placeholder style so the request type lands where the
    # message intends it (mixing '%s' and '{}' printed a literal '%s').
    print('Sending {} request to validator'.format(request_type))

    payload_bytes = payload.SerializeToString()
    response = await _send_request(
        _connection, request_type, payload_bytes)
    content = _parse_response(response_proto, response)

    _check_status_errors(response_proto, content, error_traps)
    return _message_to_dict(content)
async def _send_request(_connection, request_type, payload):
    """Await ``_connection.send`` and return the validator's reply.

    Transport failures are logged and re-raised as the matching
    exception from the ``errors`` module:

    - DisconnectError         -> errors.ValidatorDisconnected
    - asyncio.TimeoutError    -> errors.ValidatorTimedOut
    - SendBackoffTimeoutError -> errors.SendBackoffTimeout
    """
    print("In _send_request")
    try:
        reply = await _connection.send(
            message_type=request_type,
            message_content=payload,
            timeout=100)
    except DisconnectError:
        LOGGER.warning('Validator disconnected while waiting for response')
        raise errors.ValidatorDisconnected()
    except asyncio.TimeoutError:
        LOGGER.warning('Timed out while waiting for validator response')
        raise errors.ValidatorTimedOut()
    except SendBackoffTimeoutError:
        LOGGER.warning('Failed sending message - Backoff timed out')
        raise errors.SendBackoffTimeout()
    return reply
# async def _head_to_root(block_id):
# error_traps = [error_handlers.BlockNotFoundTrap]
# if block_id:
# response = await self._query_validator(
# Message.CLIENT_BLOCK_GET_BY_ID_REQUEST,
# client_block_pb2.ClientBlockGetResponse,
# client_block_pb2.ClientBlockGetByIdRequest(block_id=block_id),
# error_traps)
# block = self._expand_block(response['block'])
# else:
# response = await self._query_validator(
# Message.CLIENT_BLOCK_LIST_REQUEST,
# client_block_pb2.ClientBlockListResponse,
# client_block_pb2.ClientBlockListRequest(
# paging=client_list_control_pb2.ClientPagingControls(
# limit=1)),
# error_traps)
# block = self._expand_block(response['blocks'][0])
# return (
# block['header_signature'],
# block['header']['state_root_hash'],
# )
async def state_leaf(_connection, addy):
    """Fetch the data stored at one address in the validator's state.

    Request:
        - _connection: connection handed to _query_validator
        - addy: address of the entry to fetch
    Response:
        entries: A one-element list [{address:, data:}]
    """
    get_request = client_state_pb2.ClientStateGetRequest(
        state_root=None, address=addy)
    reply = await _query_validator(
        _connection,
        Message.CLIENT_STATE_GET_REQUEST,
        client_state_pb2.ClientStateGetResponse,
        get_request)
    # Wrap the single value so the result has the same shape as a list query.
    return [{'address': addy, 'data': reply['value']}]
async def _get_next(_connection, root=None, addy=None, paging=None):
    """Issue one state-list request and return the response as a dict.

    ``root``, ``addy`` and ``paging`` become the ``state_root``,
    ``address`` and ``paging`` fields of the ClientStateListRequest.
    """
    request_fields = dict(state_root=root, address=addy, paging=paging)
    list_request = client_state_pb2.ClientStateListRequest(**request_fields)
    reply = await _query_validator(
        _connection,
        Message.CLIENT_STATE_LIST_REQUEST,
        client_state_pb2.ClientStateListResponse,
        list_request)
    return reply
async def state_list(_connection, addy=None):
    """Fetch all state entries, optionally filtered by address prefix.

    Pages through the validator's results (using the default page size
    from _get_paging_controls) until no further page is reported.

    Request:
        - _connection: connection handed to _get_next
        - addy: Return entries whose addresses begin with this prefix
    Response:
        entries: A list of maps [{data:, address:},...]
    """
    entries = []
    paging_controls = _get_paging_controls()

    while True:
        response = await _get_next(
            _connection, addy=addy, paging=paging_controls)
        entries.extend(response.get('entries', []))

        # 'paging' may be missing from the response dict (e.g. when the
        # validator did not populate it), so avoid a bare KeyError here.
        next_start = (response.get('paging') or {}).get('next')
        if not next_start:
            break
        paging_controls = _get_paging_controls(next_start)

    # Return the list itself; a trailing comma here would wrap it in a tuple.
    return entries
def main():
    """Open a validator connection, run one state_list query, then close.

    The connection is closed in a ``finally`` block so it is released
    even if running the query raises.
    """
    loop = ZMQEventLoop()
    asyncio.set_event_loop(loop)

    connection = Connection(URL)
    print('Opening connection')
    connection.open()
    try:
        # Additional queries (e.g. state_leaf(connection, <full address>))
        # can be appended to this list to run in the same loop.
        tasks = [
            asyncio.ensure_future(state_list(connection, '95768292a8fd'))]
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        print('Closing connection')
        connection.close()
# Run the demo query only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment