Last active
August 14, 2020 23:29
-
-
Save lmazuel/51543e3d9919e8c36a023088ec5532c3 to your computer and use it in GitHub Desktop.
SR brainstorm
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build the Schema Registry client and the Avro deserializer once, before
# receiving. Re-creating them for every message would throw away whatever the
# client caches between getSchema() calls.
sr_client = SRClientCache(credentials, endpoint)
avro_deserializer = AvroDeserializer()

with ServiceBusClient.from_connection_string(connstr) as client:
    # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
    # Default is None; to receive forever.
    with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
        for msg in receiver:  # ServiceBusReceiver instance is a generator
            bytes_data = msg.body
            # detect_type is invoked on the class itself, not on sr_client.
            schema_type: str = SRClientCache.detect_type(bytes_data)
            schema = sr_client.getSchema(schema_type)  # this call might be cached
            dict_data = avro_deserializer.deserialize(bytes_data, schema)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from azure.sr import SRAvroSerializer
from azure.sr import SRClient  # Should work

# Create the serializer once, outside the receive loop: with cache=True a
# fresh instance per message would never get to reuse anything it cached.
avro_deserializer = SRAvroSerializer(credentials, cache=True)  # includes the client inside

with ServiceBusClient.from_connection_string(connstr) as client:
    # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
    # Default is None; to receive forever.
    with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
        for msg in receiver:  # ServiceBusReceiver instance is a generator
            bytes_data = msg.body
            dict_data = avro_deserializer.deserialize(bytes_data)
            # This call will:
            # - cut schema id and payload
            # - do a SR call with the schema id
            # - build the avro schema from the received schema
            # - decode the payload using that schema (might raise an AvroSchemaValidationError)
            # - return the dict, if there was no exception
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import avro | |
avro_schema = AvroSchema(......) | |
my_dict_data = {...} # that matches the avro_schema | |
with ServiceBusClient.from_connection_string(connstr) as client: | |
with client.get_queue_sender(queue_name) as sender: | |
# Sending a single message | |
single_message = Message("Single message") | |
sender.send_messages(single_message) | |
avro_deserializer = SRAvroSerializer(credentials, cache=True) # includes the client inside | |
my_bytes_data = avro_deserializer.serialize(my_dict_data, avro_schema) | |
# This calls will | |
# - auto-register the schema if necessayr using SR client | |
# - get bytes from ht dict using the avro package | |
# - concat schema id + payload | |
# Sending a list of messages | |
messages = [Message(my_bytes_data)] | |
sender.send_messages(messages) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment