Last active
August 29, 2015 14:20
-
-
Save masayuki5160/f72e089fb471708b9f8c to your computer and use it in GitHub Desktop.
Python + Kinesis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"date": "2015-05-03 10:36:43", "result": "106.179.89.159 - - [03/May/2015:10:36:43 +0000] \"GET / HTTP/1.1\" 403 3839 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36\"\n"} | |
{"date": "2015-05-03 10:36:43", "result": "106.179.89.159 - - [03/May/2015:10:36:43 +0000] \"GET /icons/apache_pb2.gif HTTP/1.1\" 304 - \"http://52.68.97.208/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36\"\n"} | |
{"date": "2015-05-03 10:36:44", "result": "106.179.89.159 - - [03/May/2015:10:36:44 +0000] \"GET / HTTP/1.1\" 403 3839 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36\"\n"} | |
{"date": "2015-05-03 10:36:44", "result": "106.179.89.159 - - [03/May/2015:10:36:44 +0000] \"GET /icons/apache_pb2.gif HTTP/1.1\" 304 - \"http://52.68.97.208/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36\"\n"} | |
{"date": "2015-05-03 10:36:46", "result": "106.179.89.159 - - [03/May/2015:10:36:46 +0000] \"GET / HTTP/1.1\" 403 3839 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36\"\n"} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
import boto.kinesis,datetime,time | |
import re | |
import json | |
import urllib2 | |
import os.path | |
import calendar | |
from urllib2 import Request, urlopen, URLError, HTTPError | |
from boto.s3.key import Key | |
# --- S3: destination for the processed log files (the bucket name is arbitrary) ---
s3 = boto.connect_s3()
bucket = s3.get_bucket("masayuki-emr")
k = Key(bucket)

# --- Kinesis: region and stream to consume ---
connection = boto.kinesis.connect_to_region('ap-northeast-1')
stream_name = 'masayuki'
stream = connection.describe_stream(stream_name)
shard_list = stream['StreamDescription']['Shards']
shards = shard_list[0]['ShardId']  # only the first shard is consumed
# 'LATEST': start after the newest record already in the shard.
kinesis_iterator = connection.get_shard_iterator(
    stream_name,
    shards,
    'LATEST',
)

# --- consumer-loop state ---
dt = ""               # minute stamp of the current log file; "" until the loop sets it
next_iterator = None  # shard iterator for the next get_records call; filled in by the loop
# Consumer loop: poll the shard and convert each Apache access-log record to a
# JSON line {"date": "YYYY-MM-DD HH:MM:SS", "result": <raw record>}.
# Lines are appended to a local file named access_log.YYYYMMDDHHMM (local
# clock, one file per minute). Each finished minute file is uploaded to the S3
# key of the same name.

# Month abbreviations exactly as Apache writes them in %t. A fixed table keeps
# the lookup independent of the consumer's locale.
_MONTHS = {name: number for number, name in enumerate(
    ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
     'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'), 1)}
# ' [03/May/2015:10:36:43 +0000]' -> day, month, year, hour, minute, second
_APACHE_TIME = re.compile(
    r' \[(\d{2})/([A-Za-z]{3})/(\d{4}):(\d{2}):(\d{2}):(\d{2}) ')
# GetRecords is rate-limited per shard (hence the sleep below). Fetching in
# batches keeps throughput from being capped at one record per call.
_RECORDS_PER_CALL = 100


def _apache_time(line):
    """Return the access-log timestamp in *line* as 'YYYY-MM-DD HH:MM:SS'.

    Returns None when *line* carries no Apache [dd/Mon/yyyy:HH:MM:SS zone]
    timestamp. Examples are the empty record putrecords.py sends at EOF, or
    dmesg / error_log lines.
    """
    match = _APACHE_TIME.search(line)
    if match is None:
        return None
    day, month_name, year, hour, minute, second = match.groups()
    month = _MONTHS.get(month_name)
    if month is None:
        return None
    return '%s-%02d-%s %s:%s:%s' % (year, month, day, hour, minute, second)


def _upload(logfile):
    """Copy *logfile* to the S3 key of the same name, if it exists locally."""
    if os.path.exists(logfile):
        print(logfile)
        k.key = logfile
        k.set_contents_from_filename(logfile)


next_iterator = kinesis_iterator['ShardIterator']
while next_iterator is not None:
    response = connection.get_records(next_iterator, limit=_RECORDS_PER_CALL)

    # Check for a minute rollover on every poll, not only when a record
    # arrives. Otherwise an idle stream leaves the last file un-uploaded.
    od = dt
    dt = datetime.datetime.today().strftime('%Y%m%d%H%M')
    if od and od != dt:
        _upload('access_log.' + od)

    lines = []
    for record in response['Records']:
        result = record['Data']
        eDatetime = _apache_time(result)
        if eDatetime is None:
            print('skipping record without an access-log timestamp: %r' % (result,))
            continue
        # Convert to JSON.
        lines.append(json.dumps({'date': eDatetime, 'result': result}) + "\n")
    if lines:
        with open('access_log.' + dt, 'a') as fi:
            fi.writelines(lines)

    # A missing/None NextShardIterator means the shard was closed.
    next_iterator = response.get('NextShardIterator')
    # Stay within the per-shard GetRecords call rate.
    time.sleep(0.2)

# The shard is closed: upload the file that was still being written.
_upload('access_log.' + dt)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
import boto.kinesis,datetime,time | |
import re | |
import json | |
import urllib2 | |
import os.path | |
import calendar | |
from urllib2 import Request, urlopen, URLError, HTTPError | |
from boto.s3.key import Key | |
# --- S3: destination for the processed log files (the bucket name is arbitrary) ---
s3 = boto.connect_s3()
bucket = s3.get_bucket("masayuki-emr")
k = Key(bucket)

# --- Kinesis: region and stream to consume ---
connection = boto.kinesis.connect_to_region('ap-northeast-1')
stream_name = 'masayuki'
stream = connection.describe_stream(stream_name)
shard_list = stream['StreamDescription']['Shards']
shards = shard_list[0]['ShardId']  # only the first shard is consumed
# 'LATEST': start after the newest record already in the shard.
kinesis_iterator = connection.get_shard_iterator(
    stream_name,
    shards,
    'LATEST',
)

# --- consumer-loop state ---
dt = ""               # minute stamp of the current SQL file; "" until the loop sets it
next_iterator = None  # shard iterator for the next get_records call; filled in by the loop
# Consumer loop: poll the shard and turn each Apache access-log record into
# an INSERT INTO LOG_TBL statement. Statements are appended to a local file
# named sql.YYYYMMDDHHMM.sql (local clock, one file per minute). Each
# finished minute file is uploaded to the S3 key of the same name.

# Month abbreviations exactly as Apache writes them in %t. A fixed table keeps
# the lookup independent of the consumer's locale.
_MONTHS = {name: number for number, name in enumerate(
    ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
     'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'), 1)}
# ' [03/May/2015:10:36:43 +0000]' -> day, month, year, hour, minute, second
_APACHE_TIME = re.compile(
    r' \[(\d{2})/([A-Za-z]{3})/(\d{4}):(\d{2}):(\d{2}):(\d{2}) ')
# GetRecords is rate-limited per shard (hence the sleep below). Fetching in
# batches keeps throughput from being capped at one record per call.
_RECORDS_PER_CALL = 100


def _apache_time(line):
    """Return the access-log timestamp in *line* as 'YYYY-MM-DD HH:MM:SS'.

    Returns None when *line* carries no Apache [dd/Mon/yyyy:HH:MM:SS zone]
    timestamp. Examples are the empty record putrecords.py sends at EOF, or
    dmesg / error_log lines.
    """
    match = _APACHE_TIME.search(line)
    if match is None:
        return None
    day, month_name, year, hour, minute, second = match.groups()
    month = _MONTHS.get(month_name)
    if month is None:
        return None
    return '%s-%02d-%s %s:%s:%s' % (year, month, day, hour, minute, second)


def _insert_statement(log_time, line):
    """Build the INSERT INTO LOG_TBL statement for one access-log *line*.

    Values follow the column list: ID=NULL, LOG_TEXT=line, UPD_DATE=log_time.
    The line's trailing newline is dropped so it does not end up inside the
    literal. Double quotes are removed, as before. Backslashes are doubled so
    that a trailing backslash cannot escape the closing quote.
    NOTE(review): assumes MySQL-style "..." string literals with default
    backslash escaping -- confirm against the target database.
    """
    text = line.rstrip('\r\n').replace('"', '').replace('\\', '\\\\')
    return ('INSERT INTO LOG_TBL (ID, LOG_TEXT, UPD_DATE) VALUES (NULL, "'
            + text + '","' + log_time + '");\n')


def _upload(logfile):
    """Copy *logfile* to the S3 key of the same name, if it exists locally."""
    if os.path.exists(logfile):
        print(logfile)
        k.key = logfile
        k.set_contents_from_filename(logfile)


next_iterator = kinesis_iterator['ShardIterator']
while next_iterator is not None:
    response = connection.get_records(next_iterator, limit=_RECORDS_PER_CALL)

    # Check for a minute rollover on every poll, not only when a record
    # arrives. Otherwise an idle stream leaves the last file un-uploaded.
    od = dt
    dt = datetime.datetime.today().strftime('%Y%m%d%H%M')
    if od and od != dt:
        _upload('sql.' + od + '.sql')

    statements = []
    for record in response['Records']:
        result = record['Data']
        eDatetime = _apache_time(result)
        if eDatetime is None:
            print('skipping record without an access-log timestamp: %r' % (result,))
            continue
        statements.append(_insert_statement(eDatetime, result))
    if statements:
        with open('sql.' + dt + '.sql', 'a') as fi:
            fi.writelines(statements)

    # A missing/None NextShardIterator means the shard was closed.
    next_iterator = response.get('NextShardIterator')
    # Stay within the per-shard GetRecords call rate.
    time.sleep(0.2)

# The shard is closed: upload the file that was still being written.
_upload('sql.' + dt + '.sql')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
import boto.kinesis,datetime,time | |
import threading, Queue, subprocess, sys | |
# Lines read from the followed file, handed from the reader thread to main().
# Bounded, so put() blocks (back-pressure) when main() falls behind.
tailq = Queue.Queue(maxsize=10)


def tail_forever(fn):
    """Follow *fn* with `tail -f` and push every output line onto tailq.

    When tail's output ends (tail exited or was killed), an empty line is
    pushed as an end-of-input marker and the function returns. The tail
    process is always reaped, and terminated first if it is still running,
    so no zombie or orphaned child is left behind.
    """
    p = subprocess.Popen(["tail", "-f", fn], stdout=subprocess.PIPE)
    try:
        while True:
            line = p.stdout.readline()
            tailq.put(line)
            if not line:
                break
    finally:
        p.stdout.close()
        if p.poll() is None:
            p.terminate()
        p.wait()
def main():
    """Tail the file named on the command line and put each line to Kinesis.

    Each line is sent as one record to the 'masayuki' stream in ap-northeast-1
    with a fixed partition key, and each put_record response is printed.
    Returns when tail_forever signals end of input with an empty line.
    """
    if len(sys.argv) < 2:
        sys.exit('usage: %s FILE' % sys.argv[0])
    fn = sys.argv[1]
    reader = threading.Thread(target=tail_forever, args=(fn,))
    # Daemon thread: if put_record raises, the process can still exit instead
    # of hanging on a reader that blocks on `tail -f` forever.
    reader.daemon = True
    reader.start()
    connection = boto.kinesis.connect_to_region('ap-northeast-1')
    stream_name = 'masayuki'
    # A constant partition key maps every record to the same shard.
    partition_key = 'kinesis-sample'
    while True:
        line = tailq.get()
        # Empty line = end of tail's output. Stop here instead of putting an
        # empty record and then blocking forever on the next get().
        if not line:
            break
        print(connection.put_record(stream_name, line, partition_key))
# Entry point when run as a script, e.g. `python putrecords.py /var/log/dmesg`.
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
INSERT INTO LOG_TBL (ID, LOG_TEXT, UPD_DATE) VALUES (NULL, "2015-05-03 11:06:02","106.179.89.159 - - [03/May/2015:11:06:02 +0000] GET / HTTP/1.1 403 3839 - Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 | |
"); | |
INSERT INTO LOG_TBL (ID, LOG_TEXT, UPD_DATE) VALUES (NULL, "2015-05-03 11:06:02","106.179.89.159 - - [03/May/2015:11:06:02 +0000] GET /icons/apache_pb2.gif HTTP/1.1 304 - http://52.68.97.208/ Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 | |
"); | |
INSERT INTO LOG_TBL (ID, LOG_TEXT, UPD_DATE) VALUES (NULL, "2015-05-03 11:06:04","106.179.89.159 - - [03/May/2015:11:06:04 +0000] GET / HTTP/1.1 403 3839 - Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 | |
"); | |
INSERT INTO LOG_TBL (ID, LOG_TEXT, UPD_DATE) VALUES (NULL, "2015-05-03 11:06:04","106.179.89.159 - - [03/May/2015:11:06:04 +0000] GET /icons/apache_pb2.gif HTTP/1.1 304 - http://52.68.97.208/ Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 | |
"); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[EC2上で作業実施] | |
1. クレデンシャルの設定まで. | |
# sudo yum update -y | |
# sudo easy_install boto | |
# vim ~/.boto(クレデンシャルを設定) | |
[Credentials] | |
aws_access_key_id = ********* | |
aws_secret_access_key = ********* | |
2. botoの動作テスト | |
# python | |
Python 2.7.9 (default, Apr 1 2015, 18:18:03) | |
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)] on linux2 | |
Type "help", "copyright", "credits" or "license" for more information. | |
>>> import boto | |
>>> boto.set_stream_logger('boto') | |
>>> s3 = boto.connect_s3() | |
2015-05-03 04:04:19,508 boto [DEBUG]:Using access key found in config file. | |
2015-05-03 04:04:19,508 boto [DEBUG]:Using secret key found in config file. | |
3. kinesisへのput検証まで(/var/log/dmesg をputしていく) | |
# vim putrecords.py | |
# python putrecords.py /var/log/dmesg | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257176699974018233308339503106'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257177908899837847937514209282'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257179117825657462566688915458'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257180326751477077195863621634'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257181535677296691825038327810'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257182744603116306454213033986'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257183953528935921083387740162'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257185162454755535712562446338'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257186371380575150341737152514'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257187580306394765039631335426'} | |
バックグラウンドでやるとき | |
# python putrecords.py /var/log/dmesg & | |
4. apacheのログをputしてgetする検証 | |
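# python putrecords.py /var/log/httpd/access_log | |
(putrecords.py は第1引数のファイルしか読まないため、/var/log/httpd/* のように複数指定しても先頭の1ファイルだけが対象になる。getRecords.py はApacheのアクセスログ形式を前提とする) | |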
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257188789232214572976694099970'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257189998158034187605868806146'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257191207083853802235043512322'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257192416009673426897261821954'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257193624935493041526436528130'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257194833861312656567928094722'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257196042787132271197102800898'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257197251712951885963716460546'} | |
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49549430257140065771124799257198460638771500799049596930'} | |
# python getRecords.py | |
access_log.201505030500がS3上の任意のバケット上に作成される |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Amazon Kinesis/Redshift編~アクセスログをkinesisで加工してTableauで表示してみよう1〜4 | |
http://recipe.kc-cloud.jp/archives/6289 | |
http://recipe.kc-cloud.jp/archives/6362 | |
http://recipe.kc-cloud.jp/archives/6364 | |
http://recipe.kc-cloud.jp/archives/6366 | |
AWS Python SDK(boto)の使い方 | |
http://recipe.kc-cloud.jp/archives/4296 | |
http://recipe.kc-cloud.jp/archives/4300 | |
Getting Started with Boto | |
http://docs.pythonboto.org/en/latest/getting_started.html#making-connections | |
AWSのリージョンとエンドポイント(SDKでよく参照する) | |
http://docs.aws.amazon.com/ja_jp/general/latest/gr/rande.html | |
Amazon Redshiftで高速にINSERT + UPDATEを行なう | |
http://analysis.blog.jp.klab.com/archives/30306912.html | |
pythonでJSONのエンコード、デコード | |
http://qiita.com/unchainendo/items/7865bdcdaadd62f2e435 | |
Pythonでの文字列置換をマスターする | |
http://orangain.hatenablog.com/entry/20100503/1272900555 | |
Logをs3とredshiftに格納する仕組み | |
http://www.slideshare.net/mokemokechicken/logs3redshift | |
AWS S3へのログの蓄積はとりあえずJSONにしましょう | |
http://librabuch.jp/2014/06/redshift_copy_from_json/ | |
Amazon S3 から MySQL へのデータのコピー | |
http://docs.aws.amazon.com/ja_jp/datapipeline/latest/DeveloperGuide/dp-copydata-tomysql.html | |
事例からAmazon Kinesisとは何なのかを学ぶ | |
http://dev.classmethod.jp/cloud/aws/what-is-kinesis/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment