Skip to content

Instantly share code, notes, and snippets.

@lmazuel
Last active August 14, 2020 23:29
Show Gist options
  • Save lmazuel/51543e3d9919e8c36a023088ec5532c3 to your computer and use it in GitHub Desktop.
Save lmazuel/51543e3d9919e8c36a023088ec5532c3 to your computer and use it in GitHub Desktop.
SR brainstorm
# Brainstorm, option 1: the caller looks the schema up itself and then runs a
# plain Avro deserializer on the raw bytes.
#
# Neither the registry client nor the deserializer takes anything from the
# message, so both are built once instead of once per received message. That
# also lets any caching done behind getSchema carry over between messages
# (assumes the cache lives on the SRClientCache instance -- TODO confirm).
sr_client = SRClientCache(credentials, endpoint)
avro_deserializer = AvroDeserializer()

with ServiceBusClient.from_connection_string(connstr) as client:
    # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
    # Default is None; to receive forever.
    with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
        for msg in receiver:  # ServiceBusReceiver instance is a generator
            bytes_data = msg.body
            # The schema type is detected from the raw payload, then resolved
            # to a schema through the registry client.
            schema_type: str = SRClientCache.detect_type(bytes_data)
            schema = sr_client.getSchema(schema_type)  # this call might be cached
            dict_data = avro_deserializer.deserialize(bytes_data, schema)
from azure.sr import SRAvroSerializer
from azure.sr import SRClient # Should work
# Brainstorm, option 2: a single SRAvroSerializer object hides the registry
# client and the schema lookup; the caller only hands over the raw bytes.
#
# The serializer is created once, outside the receive loop: it is built with
# cache=True and takes nothing from the message, so re-creating it for every
# message would presumably discard that cache each time (TODO confirm where
# the cache is stored).
avro_serializer = SRAvroSerializer(credentials, cache=True)  # includes the client inside

with ServiceBusClient.from_connection_string(connstr) as client:
    # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
    # Default is None; to receive forever.
    with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
        for msg in receiver:  # ServiceBusReceiver instance is a generator
            bytes_data = msg.body
            dict_data = avro_serializer.deserialize(bytes_data)
            # This call will:
            # - split the message into schema id and payload
            # - do an SR call with the schema id
            # - build the avro schema from the received schema
            # - decode the payload using that schema (might raise an AvroSchemaValidationError)
            # - return the dict, if there was no exception
import avro
# Brainstorm, sending side: serialize a dict with SRAvroSerializer and send the
# resulting bytes through a Service Bus queue sender.
avro_schema = AvroSchema(...)  # placeholder: the real schema definition goes here
my_dict_data = {...}  # that matches the avro_schema

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_sender(queue_name) as sender:
        # Sending a single message
        single_message = Message("Single message")
        sender.send_messages(single_message)

        avro_serializer = SRAvroSerializer(credentials, cache=True)  # includes the client inside
        my_bytes_data = avro_serializer.serialize(my_dict_data, avro_schema)
        # This call will:
        # - auto-register the schema if necessary using the SR client
        # - get bytes from the dict using the avro package
        # - concat schema id + payload

        # Sending a list of messages
        messages = [Message(my_bytes_data)]
        sender.send_messages(messages)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment