Created
December 16, 2014 07:25
-
-
Save a-hisame/c94170975c3d8e1db0c1 to your computer and use it in GitHub Desktop.
QueueからデータをPollingして、処理をし、処理結果をDynamoDBに保存するサンプル
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding: utf-8 | |
import time | |
import datetime | |
import json | |
import logging | |
import boto.sqs | |
# ----------------------- | |
# 設定によって変更するところ | |
# ----------------------- | |
REGION = 'ap-northeast-1' | |
AWS_ACCESS_KEY = '' | |
AWS_SECRET_ACCESS_KEY = '' | |
FROM_RAW_MESSAGE = True | |
QUEUE_NAME = '' | |
TABLE_NAME = '' | |
# ----------------------- | |
def heavy_procedure(id): | |
''' とても重い処理を行ってそれらしい結果を返します ''' | |
import uuid | |
ids = [] | |
for i in xrange(0,5): | |
time.sleep(1) | |
ids.append(str(uuid.uuid4())) | |
return ':'.join(ids) | |
def put_with_longtime(id): | |
''' | |
idに対応する処理を行ってDynamoDBにデータを保存します | |
''' | |
value = heavy_procedure(id) | |
import boto.dynamodb2 | |
from boto.dynamodb2.table import Table | |
conn = boto.dynamodb2.connect_to_region(REGION, | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_ACCESS_KEY) | |
table = Table(TABLE_NAME, connection=conn) | |
# テーブルに保存 (上書き) | |
table.put_item(data={ | |
'id': id, | |
'value': value | |
}, overwrite=True) | |
def get_messages(else_value=None): | |
''' | |
Queueにメッセージがあればそのメッセージを、 | |
メッセージが無ければelse_valueを返す | |
''' | |
# connは使いまわしてもよく、解放の必要もない | |
conn = boto.sqs.connect_to_region(REGION, | |
aws_access_key_id=AWS_ACCESS_KEY, | |
aws_secret_access_key=AWS_SECRET_ACCESS_KEY) | |
# AWS-CLIやSNSからのメッセージを受け取る場合は | |
# RawMessageとして受ける必要がある | |
queue = conn.get_queue(QUEUE_NAME) | |
if FROM_RAW_MESSAGE: | |
from boto.sqs.message import RawMessage | |
queue.set_message_class(RawMessage) | |
msgs = queue.get_messages(num_messages=1) | |
if len(msgs) == 0: | |
return else_value | |
# 取得できた1つだけを返す | |
return msgs[0] | |
def main(): | |
msg = get_messages() | |
if msg == None: | |
print 'No Message' | |
return | |
try: | |
body = json.loads(msg.get_body()) | |
id = body['Subject'] | |
put_with_longtime(id) | |
# 正確に処理が終わったので、Queueにメッセージの削除を依頼する | |
msg.delete() | |
except Exception as e: | |
logging.exception(e) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment