Created
August 3, 2016 14:48
-
-
Save affo/c543dccdbc84f7cc85c6f91f251c8c09 to your computer and use it in GitHub Desktop.
Python scripts for producing inputs and consuming outputs for an Azure Streaming Analytics Job
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from azure.storage.blob import BlockBlobService | |
from utils import get_storage_credentials | |
if __name__ == '__main__':
    # Command-line entry point: delete every blob found in one container.
    #
    # Usage: <script> CONTAINER_NAME
    import sys

    # The container name is the only, mandatory, argument.
    if len(sys.argv) < 2:
        # Single-argument print(...) gives the same output whether it is
        # parsed as a Python 2 statement or as a Python 3 function call.
        print('Specify container name, please.')
        sys.exit(1)

    C_NAME = sys.argv[1]

    account, key = get_storage_credentials()
    blob_service = BlockBlobService(account, key)

    # Delete the blobs one at a time, reporting each one as it goes.
    for blob in blob_service.list_blobs(C_NAME):
        blob_service.delete_blob(C_NAME, blob.name)
        print('Blob deleted: {}/{}'.format(C_NAME, blob.name))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Consume only from service bus queues.

Usage: <script> QUEUE_NAME

Polls the queue forever and prints one line per received message.
"""

if __name__ == '__main__':
    import json
    import sys

    from utils import get_servicebus

    if len(sys.argv) < 2:
        print('Specify the queue name, please.')
        sys.exit(1)

    Q_NAME = sys.argv[1]
    bus_service = get_servicebus()

    def window_repr(data):
        """Render the 'value' of every entry under 'collect' as '[v1, v2]'.

        Raises TypeError/AttributeError when *data* does not have that
        layout; the polling loop below handles those.
        """
        values = [t.get('value') for t in data.get('collect')]
        return '[' + ', '.join(values) + ']'

    while True:
        # peek_lock=False is passed through to the service bus client as in
        # the original call.
        msg = bus_service.receive_queue_message(Q_NAME, peek_lock=False)
        if not msg.body:
            continue

        raw = msg.body
        # Keep only the text between the first '{' and the last '}'.
        # NOTE(review): presumably the payload arrives wrapped in some
        # serialization framing around the JSON document -- confirm.
        start = raw.find('{')
        end = raw.rfind('}')
        if start == -1 or end < start:
            # No JSON object in the payload: show the message untouched
            # instead of the empty slice the indices would produce.
            print(raw)
            continue

        payload = raw[start:end + 1]
        try:
            parsed = json.loads(payload)
            window_end = parsed.get('ts')
            line = window_repr(parsed)
        except (ValueError, TypeError, AttributeError):
            # Not valid JSON, or JSON without the expected
            # 'collect'/'value' layout: fall back to the extracted text
            # rather than terminating the consumer.
            print(payload)
            continue

        print(u'{} {}'.format(line, window_end))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from utils import * | |
# Upper bounds handed to unbalanced_randint() when running in sparse mode.
MAX_SIM_TUPLES = 5  # tuples sharing one timestamp
MAX_GAP = 5  # seconds between two consecutive timestamps

if __name__ == '__main__':
    # Command-line entry point: publish synthetic tuples forever.
    #
    # Usage: <script> RESOURCE_NAME [DENSE]
    #   DENSE  integer; non-zero (the default) publishes exactly one tuple
    #          per second, zero draws the burst size and the gap at random.
    import sys
    import time

    # Start-up wall-clock time in milliseconds; every application timestamp
    # is an offset from this instant.
    STARTUP_TIME = int(round(time.time() * 1000))

    if len(sys.argv) < 2:
        # Single-argument print(...) gives the same output whether it is
        # parsed as a Python 2 statement or as a Python 3 function call.
        print('Specify resource name, please.')
        sys.exit(1)

    RES_NAME = sys.argv[1]  # could be hub, queue or container
    dense = int(sys.argv[2]) if len(sys.argv) >= 3 else 1

    # get your production method
    prod = get_eventhub_prod_fn(RES_NAME)
    # or prod = get_blobstorage_prod_fn(RES_NAME)
    # or prod = get_queue_prod_fn(RES_NAME)

    def pub(ts, tid):
        """Publish tuple number *tid*, stamped *ts* seconds after start-up."""
        tapp = STARTUP_TIME + ts * 1000
        value = '({},t-{})'.format(ts, tid)
        data = dict(tapp=tapp, value=value)
        # The Python-2-only ``encoding`` keyword is omitted: 'utf-8' is its
        # default there, and Python 3 rejects the argument altogether.
        msg = json.dumps(data)
        prod(msg)
        # or, in the case of blobs
        # prod(msg, 'blob' + str(tid))
        print('>>> {}'.format(msg))

    last_ts = -1
    last_id = 0
    while True:
        if dense:
            # one tuple every second
            no_sim_tuples = 1
            gap = 1
        else:
            # choose number of simultaneous tuples, then a gap in seconds
            no_sim_tuples = unbalanced_randint(MAX_SIM_TUPLES)
            gap = unbalanced_randint(MAX_GAP)

        # sleep for gap
        time.sleep(gap)

        # update timestamps
        last_ts += gap
        # ``range`` exists on both Python 2 and 3 (``xrange`` does not) and
        # the count never exceeds MAX_SIM_TUPLES.
        for _ in range(no_sim_tuples):
            pub(last_ts, last_id)
            last_id += 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
azure-servicebus
azure-storage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Endpoint=[...];SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[...]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "account": "foo",
  "key": "..."
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
from azure.servicebus import ServiceBusService | |
from azure.storage.blob import BlockBlobService | |
# Default credential files, opened relative to the current working directory.
STORAGE_CREDS = 'storage_credentials.json'
SERVICEBUS_CREDS = 'servicebus_credentials.txt'


def get_storage_credentials(path=None):
    """Return the ``(account, key)`` pair read from a JSON credentials file.

    Args:
        path: file to read; defaults to ``STORAGE_CREDS`` when omitted.

    Returns:
        A tuple holding the values of the ``account`` and ``key`` entries.

    Raises:
        IOError: the file cannot be opened.
        ValueError: the file does not contain valid JSON.
        KeyError: ``account`` or ``key`` is missing from the document.
    """
    # Imported locally because the module's import header does not bring in
    # ``json``; without it the ``json.load`` call fails with a NameError.
    import json

    with open(STORAGE_CREDS if path is None else path, 'r') as fp:
        creds = json.load(fp)
    return creds['account'], creds['key']
def get_servicebus_credentials(path=None):
    """Parse a ``key=value;key=value`` connection string into a dict.

    Only the first line of the file is read. Each ``;``-separated segment is
    split on its first ``=``, so values may themselves contain ``=``.

    Args:
        path: file to read; defaults to ``SERVICEBUS_CREDS`` when omitted.

    Returns:
        A dict mapping every key found on the line to its value; empty when
        the line holds no segments.

    Raises:
        IOError: the file cannot be opened.
        ValueError: a non-empty segment contains no ``=``.
    """
    with open(SERVICEBUS_CREDS if path is None else path, 'r') as f:
        line = f.readline()

    creds = {}
    for segment in line.split(';'):
        segment = segment.strip()
        if not segment:
            # A trailing ';', doubled separators or an empty line yield
            # blank segments; skip them instead of failing to unpack.
            continue
        k, v = segment.split('=', 1)
        creds[k] = v
    return creds
def get_servicebus():
    """Create a ``ServiceBusService`` from the stored connection settings.

    The settings come from ``get_servicebus_credentials()``. The namespace
    is the part of ``Endpoint`` before its first ``.``, minus its first five
    characters (presumably the ``sb://`` scheme prefix -- confirm against
    the credentials file).

    Raises:
        KeyError: ``Endpoint``, ``SharedAccessKeyName`` or
            ``SharedAccessKey`` is missing from the settings.
    """
    settings = get_servicebus_credentials()
    first_label = settings['Endpoint'].split('.')[0]
    return ServiceBusService(
        service_namespace=first_label[5:],
        shared_access_key_name=settings['SharedAccessKeyName'],
        shared_access_key_value=settings['SharedAccessKey']
    )
def get_eventhub_prod_fn(hub_name):
    """Return a one-argument function that sends a message to *hub_name*.

    The service bus client is created once, when this factory is called,
    and reused by every call of the returned function.
    """
    service = get_servicebus()

    def produce(msg):
        # Every event is sent with the same fixed device id.
        return service.send_event(hub_name, msg, device_id="mylaptop")

    return produce
def get_queue_prod_fn(q_name):
    """Return a one-argument function that enqueues a message on *q_name*.

    The returned function accepts either a ready-made ``Message`` or a plain
    payload, which is wrapped in a ``Message`` before sending. The service
    bus client is created once, when this factory is called.
    """
    # Imported locally: the module header only imports ServiceBusService
    # from this package.
    from azure.servicebus import Message

    service = get_servicebus()

    def produce(msg):
        # The legacy client's send_queue_message() works on a Message object
        # (it uses the message's headers and body), so a bare string payload
        # has to be wrapped first.
        if not isinstance(msg, Message):
            msg = Message(msg)
        return service.send_queue_message(q_name, msg)

    return produce
def get_blobstorage_prod_fn(c_name):
    """Return a function ``(msg, blob_name)`` that stores *msg* as a text blob.

    Blobs are created in container *c_name*. The blob service client is
    built once, from ``get_storage_credentials()``, when this factory is
    called.
    """
    account, key = get_storage_credentials()
    service = BlockBlobService(account, key)

    def produce(msg, blob_name):
        return service.create_blob_from_text(c_name, blob_name, msg)

    return produce
def unbalanced_randint(upper):
    """Return a random integer in ``[1, upper]`` that is biased towards 1.

    A first draw from ``random.random()`` selects 1 when it falls below 0.7;
    otherwise the result is ``random.randint(2, upper)``.

    Raises:
        AssertionError: *upper* is smaller than 2.
    """
    assert upper >= 2
    draw = random.random()
    return random.randint(2, upper) if draw >= 0.7 else 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment