Created
July 31, 2018 21:02
-
-
Save FrankC01/caaded1b5c503e95e372cc2eec3f3c9c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import errors | |
from zmq.asyncio import ZMQEventLoop | |
from messaging import ( | |
Connection, DisconnectError, SendBackoffTimeoutError) | |
from sawtooth_sdk.protobuf import client_state_pb2 | |
from sawtooth_sdk.protobuf.validator_pb2 import Message | |
from google.protobuf.json_format import MessageToDict | |
from google.protobuf.message import DecodeError | |
URL = 'tcp://validator:4004' | |
LOGGER = logging.getLogger(__name__) | |
def _check_status_errors(proto, content, etraps=None): | |
pass | |
def _message_to_dict(message): | |
"""Converts a Protobuf object to a python dict with desired settings. | |
""" | |
return MessageToDict( | |
message, | |
including_default_value_fields=True, | |
preserving_proto_field_name=True) | |
def _parse_response(proto, response): | |
"""Parses the content from a validator response Message.""" | |
try: | |
content = proto() | |
content.ParseFromString(response.content) | |
return content | |
except (DecodeError, AttributeError): | |
LOGGER.error('Validator response was not parsable: %s', response) | |
raise errors.ValidatorResponseInvalid() | |
def _get_paging_controls(start=None, limit=300): | |
"""Parses start and/or limit queries into a paging controls dict.""" | |
controls = {} | |
if limit is not None: | |
try: | |
controls['limit'] = int(limit) | |
except ValueError: | |
LOGGER.debug('Request query had an invalid limit: %s', limit) | |
raise errors.CountInvalid() | |
if controls['limit'] <= 0: | |
LOGGER.debug('Request query had an invalid limit: %s', limit) | |
raise errors.CountInvalid() | |
if start is not None: | |
controls['start'] = start | |
return controls | |
async def _query_validator(_connection, request_type, response_proto, payload, error_traps=None): | |
"""Sends a request to the validator and parses the response. | |
""" | |
print('Sending %s request to validator {}'.format(request_type)) | |
payload_bytes = payload.SerializeToString() | |
response = await _send_request( | |
_connection, request_type, payload_bytes) | |
content = _parse_response(response_proto, response) | |
# LOGGER.debug( | |
# 'Received %s response from validator with status %s', | |
# self._get_type_name(response.message_type), | |
# self._get_status_name(response_proto, content.status)) | |
_check_status_errors(response_proto, content, error_traps) | |
return _message_to_dict(content) | |
async def _send_request(_connection, request_type, payload): | |
"""Uses an executor to send an asynchronous ZMQ request to the | |
validator with the handler's Connection | |
""" | |
print("In _send_request") | |
try: | |
return await _connection.send( | |
message_type=request_type, | |
message_content=payload, | |
timeout=100) | |
except DisconnectError: | |
LOGGER.warning('Validator disconnected while waiting for response') | |
raise errors.ValidatorDisconnected() | |
except asyncio.TimeoutError: | |
LOGGER.warning('Timed out while waiting for validator response') | |
raise errors.ValidatorTimedOut() | |
except SendBackoffTimeoutError: | |
LOGGER.warning('Failed sending message - Backoff timed out') | |
raise errors.SendBackoffTimeout() | |
# async def _head_to_root(block_id): | |
# error_traps = [error_handlers.BlockNotFoundTrap] | |
# if block_id: | |
# response = await self._query_validator( | |
# Message.CLIENT_BLOCK_GET_BY_ID_REQUEST, | |
# client_block_pb2.ClientBlockGetResponse, | |
# client_block_pb2.ClientBlockGetByIdRequest(block_id=block_id), | |
# error_traps) | |
# block = self._expand_block(response['block']) | |
# else: | |
# response = await self._query_validator( | |
# Message.CLIENT_BLOCK_LIST_REQUEST, | |
# client_block_pb2.ClientBlockListResponse, | |
# client_block_pb2.ClientBlockListRequest( | |
# paging=client_list_control_pb2.ClientPagingControls( | |
# limit=1)), | |
# error_traps) | |
# block = self._expand_block(response['blocks'][0]) | |
# return ( | |
# block['header_signature'], | |
# block['header']['state_root_hash'], | |
# ) | |
async def state_leaf(_connection, addy): | |
"""Fetches data from a specific address in the validator's state tree. | |
Request: | |
query: | |
- head: The id of the block to use as the head of the chain | |
- address: The 70 character address of the data to be fetched | |
Response: | |
entries: An array of 1 map [{data:, address:}] | |
""" | |
validator_query = client_state_pb2.ClientStateGetRequest( | |
state_root=None, | |
address=addy) | |
response = await _query_validator( | |
_connection, | |
Message.CLIENT_STATE_GET_REQUEST, | |
client_state_pb2.ClientStateGetResponse, | |
validator_query) | |
entries = [{'address': addy, 'data': response['value']}] | |
# print("Leaf response = {}".format(entries)) | |
return entries | |
async def _get_next(_connection, root=None, addy=None, paging=None): | |
validator_query = client_state_pb2.ClientStateListRequest( | |
state_root=root, | |
address=addy, | |
paging=paging) | |
return await _query_validator( | |
_connection, | |
Message.CLIENT_STATE_LIST_REQUEST, | |
client_state_pb2.ClientStateListResponse, | |
validator_query) | |
async def state_list(_connection, addy=None): | |
"""Fetches list of data entries, optionally filtered by address prefix. | |
Request: | |
- _connection: Sawtooth messaging.Connection | |
- address: Return entries whose addresses begin with this prefix | |
Response: | |
entries: An array of maps [{data:, address:},...] | |
""" | |
fetch_more = True | |
entries = [] | |
paging_controls = _get_paging_controls() | |
while fetch_more: | |
response = await _get_next( | |
_connection, addy=addy, paging=paging_controls) | |
entries.extend(response.get('entries', [])) | |
if response['paging']['next']: | |
paging_controls = _get_paging_controls(response['paging']['next']) | |
else: | |
fetch_more = False | |
# validator_query = client_state_pb2.ClientStateListRequest( | |
# state_root=None, | |
# address=addy if addy else None, | |
# paging=paging_controls) | |
# response = await _query_validator( | |
# _connection, | |
# Message.CLIENT_STATE_LIST_REQUEST, | |
# client_state_pb2.ClientStateListResponse, | |
# validator_query) | |
# entries = response.get('entries', []) | |
# print("list entries = {}".format(entries)) | |
# print("list entries = {}".format(len(entries))) | |
# print("First entry = {}".format(entries[0])) | |
# print("list entries = {}".format(len(response.get('entries', [])))) | |
return entries, | |
def main(): | |
loop = ZMQEventLoop() | |
asyncio.set_event_loop(loop) | |
connection = Connection(URL) | |
print('Opening connection') | |
connection.open() | |
# Do stuff | |
tasks = [ | |
# asyncio.ensure_future( | |
# state_leaf( | |
# connection, | |
# '95768292a8fd1de17daaff72210fc39fa3af911d56468fd2a6fa101ec72b6622850c03')), | |
asyncio.ensure_future(state_list(connection, '95768292a8fd'))] | |
loop.run_until_complete(asyncio.wait(tasks)) | |
# asyncio.ensure_future(list_state(connection)) | |
# fetch_state(connection) | |
# Close connection | |
print('Closing connection') | |
connection.close() | |
if __name__ == '__main__': | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment