글쓴이: 김정주([email protected])
이 글은 AWS 블로그를 참고하고 추가/보완하여 작성되었습니다.
Kinesis는 아마존 웹서비스(AWS)에서 제공하는 실시간 데이터 처리기이다.
여기에서는 AWS Kinesis를 사용하여 App에서 로그를 수집하는 방법을 살펴본다.
Kinesis는 다음과 같은 특징이 있다.
- 스트림만 생성하면 바로 데이터 수집이 가능
- 스케일 인/아웃이 자유롭다.
- 인증을 통한 네크워킹
- 스트림 당 25샤드(Shard)까지 가질 수 있다(별도 계약으로 더 쓸 수 있음)
- SDK를 사용할 수 없다면 HTTPS로 데이터를 보내야 함
- 쓰기: 한 샤드에 대해 초당 1000레코드, 1MB크기
- 읽기: 한 샤드에 대해 초당 5트랜잭션, 2MB크기
초당 쓰기 레코드 수를 늘리려면 샤드를 늘리거나(비용 증가), 뒤에서 설명할 모아(Collect) 보내기를 이용해야한다.
- 샤드의 Split/Merge를 통해 구현. 유저가 폭증할 경우는 AWS측과 최대 샤드 수 증가 협의 필요
- HA는 기본적으로 지원
다음과 같은 방법이 있다.
- AWS에서 제공하는 SDK(다양한 언어지원)
- AWS Service(REST) API
- 범용으로 쓸 수 있으나 인증 과정이 좀 복잡하다.
- KPL(Kinesis Producer Library와 KCL(Kinesis Client Library)을 사용
- KPL과 KCL을 사용하면 Auto (De)Aggregation을 통해 퍼포먼스 향상 효과가 있으나,
- KPL은 Java만 지원하고, KCL은 언어별 라이브러리를 제공하여,
- KPL은 제한적인 환경에서 사용할 수 있을 것 같다.
- 모바일 디바이스의 경우 AWS Mobile SDK을 사용하면 편리하게 Kinesis를 이용할 수 있다.
- IoT 디바이스
- mqtt-kinesis-bridge를 사용하면 MQTT를 받을 수 있다. (Front 서버 필요)
- AWS IoT 서비스를 사용하면 MQTT와 HTTP를 사용할 수 있는 것 같으나, 이 부분은 아직 레퍼런스가 충분하지 않음
검토해본 결과, 일반적으로 사용하기에 SDK가 무난한 것 같다.
Kinesis는 다양한 툴과 랭귀지를 지원하지만, 여기에서는 aws cli 와 Python SDK(boto)을 이용해 진행하겠다. AWS 콘솔페이지에서 KinesisFullAccess
권한이 있는 IAM 계정을 만들고, aws cli 설치 후 다음과 같이 정보를 설정한다.
$ aws configure
AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [None]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Default region name [None]: ap-northeast-1
Default output format [None]: json
다음과 같이 aws cli를 사용하여 새 스트림을 생성할 수 있다. (이름은 'KinesisTest', 샤드 수는 1로 한다.)
$ aws kinesis create-stream --stream-name=KinesisTest --shard-count=1
이제 KinesisTest
스트림이 생성되었다.
스트림의 정보는 다음과 같이 볼 수있다.
$ aws kinesis describe-stream --stream-name=KinesisTest
{
"StreamDescription": {
"RetentionPeriodHours": 24,
"StreamStatus": "ACTIVE",
"StreamName": "KinesisTest",
"StreamARN": "arn:aws:kinesis:ap-northeast-1:415742736303:stream/KinesisTest",
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"EndingHashKey": "340282366920938463463374607431768211455",
"StartingHashKey": "0"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49558153526886807271208233148040194081103188177553719298"
}
}
]
}
}
스트림의 상태(Status)가 ACTIVE
면 데이터를 전송할 수 있다. 샤드가 하나 있고 Id는 shardId-000000000000
인 것을 확인할 수 있다.
AWS콘솔 페이지에서도 다음과 같이 확인할 수 있다.
생성된 스트림으로 데이터를 보내겠다. Kinesis에서는 데이터를 '레코드'라고 칭한다. 데이터의 단위라고 이해하면 되겠다.
다음과 같이 put.py
파일을 생성한다. 여기에서 testdata
모듈은 임의의 유저 데이터를 생성해주기 위한 용도이다.
import json
import testdata
from boto import kinesis
kinesis = kinesis.connect_to_region('ap-northeast-1')
# Kinesis에 넣을 가짜 유저데이터 생성기
class Users(testdata.DictFactory):
firstname = testdata.FakeDataFactory('firstName')
lastname = testdata.FakeDataFactory('lastName')
age = testdata.RandomInteger(10, 30)
gender = testdata.RandomSelection(['female', 'male'])
for user in Users().generate(50):
print(user)
# 스트림에 레코드를 하나씩 PUT
kinesis.put_record('KinesisTest', json.dumps(user), 'partitionkey')
실행한다.
$ python put.py
{'lastname': 'Hansen', 'age': 19, 'firstname': 'Lou', 'gender': 'male'}
{'lastname': 'Hahn', 'age': 23, 'firstname': 'Justice', 'gender': 'female'}
{'lastname': 'Kozey', 'age': 29, 'firstname': 'Dalton', 'gender': 'male'}
{'lastname': 'Blick', 'age': 26, 'firstname': 'Missouri', 'gender': 'female'}
...
가상의 유저 데이터가 JSON 형태로 Kinesis 스트림에 보내지는 것을 확인할 수 있다.
put_record
에서 사용되는 파티션키는 샤드가 여럿일 경우 샤드를 결정하는 데 사용된다. 현재는 샤드가 하나 밖에 없어서 파티션키를 지정해도 의미가 없다.
스트림에 저장된 레코드는 유효 기간(기본 24시간)이 지나면 사라진다. 스트림에서 데이터를 얻어와 S3나 DynamoDB와 같은 영속적인 저장소에 저장해야 하는데, 이 과정은 사용자가 직접 구현하고, EC2 같은 서버에서 실행되어야 한다. 이런 프로그램을 Kinesis Consumer(또는 Stream) Application이라고 한다. (FireHose를 사용하면 필요가 없으나 아직 모든 리전에서 사용할 수 없다)
다음과 같이 get.py
를 생성한다.
import json
from boto import kinesis
import time
kinesis = kinesis.connect_to_region('ap-northeast-1')
shard_it = kinesis.get_shard_iterator('KinesisTest', 'shardId-000000000000', 'LATEST')['ShardIterator']
while True:
out = kinesis.get_records(shard_it, limit=2)
print(out['Records'])
shard_it = out['NextShardIterator']
time.sleep(0.2)
실행하면 다음과 같이 빈 Records만 나온다.
$ python get.py
[]
[]
[]
...
이것은 kinesis.get_shard_iterator
의 LATEST
옵션이 가장 최근의 레코드를 표시하는 옵션이어서 그렇다. (이외에도 다양한 옵션이 있다.) get.py
를 실행한 상태에서 새로운 터미널을 열고 put.py
를 다시 실행해보면 데이터가 받아지는 것을 확인할 수 있다.
$ python get.py
[]
[]
[{u'Data': u'{"lastname": "Heaney", "age": 11, "firstname": "Niko", "gender": "male"}', u'PartitionKey': u'partitionkey', u'ApproximateArrivalTimestamp': 1452576997.186, u'SequenceNumber': u'49558153526886807271208233148423423565921135851976261634'}]
...
Kinesis의 강점은 실시간으로 샤드를 나누거나 합쳐서, 변화하는 데이터 양에 대응할 수 있다는 점이다.
데이터가 몰리면 부하를 분산하기 위해서 더 많은 샤드가 필요하다. 새로운 샤드는 기존의 샤드를 두 개로 분리하여 생성하게 된다. 이때 기존 샤드를 부모 샤드, 새로이 생성된 샤드를 자식 샤드라고 한다. 샤드 나누기도 aws cli를 통해서 할 수 있으나, 자식 샤드의 시작 해쉬값을 부모 샤드에 할당된 범위에서 계산해야 하기에 스크립트를 만들어서 하겠다.
다음과 같이 split.py
를 생성한다.
from boto import kinesis
kinesis = kinesis.connect_to_region('ap-northeast-1')
sinfo = kinesis.describe_stream('KinesisTest')
hkey = int(sinfo['StreamDescription']['Shards'][0]['HashKeyRange']
['EndingHashKey'])
shard_id = 'shardId-000000000000'
kinesis.split_shard('KinesisTest', shard_id, str(hkey/2))
실행하면 첫 번째 샤드를 분리하게 된다.
$ python split.py
샤드 분리에는 시간이 좀 걸리는데, 이 동안에도 put.py
를 실행해 데이터를 보낼 수 있다.
부모 샤드는 자식 샤드가 완전히 초기화 될 때까지 계속 레코드를 받을 수 있다. 자식 샤드의 초기화가 끝난 후 새로운 레코드는 자식 샤드로 들어간다. 부모 샤드의 기존 레코드는 유효 기간이 남아있는 한 읽기가 가능하며, 자기가 가진 모든 레코드의 유효 기간이 끝날 때 부모 샤드는 사라지게 된다.
실재로 샤드 분리 명령을 내리고 바로 put.py
를 실행해보면(먼저 아래의 '파티션 키 지정해서 보내기' 작업 필요), 1번 샤드로 데이터가 가다가, 잠시 후 2번, 3번 샤드로 가는 것을 확인할 수 있었다.
샤드 분리 명령 잠시 후에 다시 스트림 정보를 살펴보면, 다음과 같이 2번 샤드( shardId-000000000001
)와 3번 샤드(shardId-000000000002
)샤드가 생성된 것을 확인할 수 있다.
$ aws describe-stream --stream-name=KinesisTest
...
"Shards": [
{
"ShardId": "shardId-000000000000",
...
},
{
"ShardId": "shardId-000000000001",
...
},
{
"ShardId": "shardId-000000000002",
...
}
]
...
이제 샤드가 2개가 되기에 레코드를 보낼 때 의미있는 파티션 키를 사용하겠다. 일단 유저의 성별을 파티션 정보로 사용하도록 한다. 기존의 put.py
파일에서 put_record
부분을 다음과 같이 고친다.
pinfo = kinesis.put_record('KinesisTest', json.dumps(user), str(hash(user['gender'])))
print(pinfo)
실행해보면,
$ python sput.py
{'lastname': 'Breitenberg', 'age': 17, 'firstname': 'Nickolas', 'gender': 'male'}
{u'ShardId': u'shardId-000000000002', u'SequenceNumber': u'49558150195735293985894932004291066377443943335107493922'}
{'lastname': "O'Keefe", 'age': 24, 'firstname': 'Joseph', 'gender': 'male'}
{u'ShardId': u'shardId-000000000002', u'SequenceNumber': u'49558150195735293985894932004292275303263557964282200098'}
{'lastname': 'Haley', 'age': 23, 'firstname': 'Blanca', 'gender': 'female'}
{u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49558150195712993240696401381151948510810524231950925842'}
유저의 성별에 따라서 다른 샤드로 보내지는 것을 알 수 있다.
비용 절감을 위해서 데이터의 양이 줄어들면 샤드를 같이 줄이는 것이 필요하다. 다음의 명령으로 2, 3번 째 샤드를 합친다.
$ aws kinesis merge-shards --stream-name=KinesisTest --shard-to-merge=shardId-000000000001 --adjacent-shard-to-merge=shardId-000000000002
다시 스트림의 상태를 보면 다음과 같이 나온다.
$ aws kinesis describe-stream --stream-name=KinesisTest
...
"Shards": [
{
"ShardId": "shardId-000000000000",
...
},
{
"ShardId": "shardId-000000000001",
...
},
{
"ShardId": "shardId-000000000002",
...
},
{
"ShardId": "shardId-000000000003",
...
}
]
}
}
2, 3번 샤드가 합쳐진 4번 샤드(shardId-000000000003
)가 새로 생성된 것을 알 수 있다.
여기에서는 Kinesis를 제대로 활용하기 위한 테크닉을 살펴보겠다.
실무에서 샤드 확장은 높은 확률로 발생하기에, 처음부터 파티션키를 샤드의 갯수보다 넉넉히 주는 것이 좋다. (Kinesis는 MD5 Hash함수를 사용해 파티션 키를 128bit 정수값으로 변환하여 샤드 지정에 사용한다)
예제에서 사용한 유저 데이터의 경우 age
항목이 적절한 파티션키가 될 것이다.
한 번에 하나씩 레코드를 보내는 것 보다, 다수의 레코드를 모아 보내면 퍼포먼스도 좋고 비용도 절감할 수 있다. AWS에서는 이것을 Collection이라고 한다. (비슷한 Batching개념으로 Aggregation이 있으나, 이는 KPL을 통해서 사용해야 할 것이다) put_records
를 사용하여 다음과 같이 put.py
를 개선할 수 있다.
import json
import testdata
from boto import kinesis
kinesis = kinesis.connect_to_region('ap-northeast-1')
class Users(testdata.DictFactory):
firstname = testdata.FakeDataFactory('firstName')
lastname = testdata.FakeDataFactory('lastName')
age = testdata.RandomInteger(10, 30)
gender = testdata.RandomSelection(['female', 'male'])
for i in range(5):
records = []
for user in Users().generate(10):
record = {'Data': json.dumps(user), 'PartitionKey':
str(hash(user['age']))}
records.append(record)
pinfo = kinesis.put_records(records, 'KinesisTest')
print pinfo
이렇게 하면 한 번 접속에 많은 레코드를 보내기에, 빠른 속도로 전송된다.
기존 샤드를 분리하면 자식 샤드가 생기는데, 데이터의 순서가 중요하다면 데이터를 꺼낼 때 다음과 같이 해야한다.
- 부모 샤드에 처리되지 않고 남아있는 레코드를 먼저 처리해 준다.
- 부모 샤드 처리 후 자식 샤드에 대해 처리한다.
또, 샤드 분리와 병합이 잦은 경우, 샤드Id로 이터레이터를 얻어와 처리하는 과정이 좀 번거롭다.
이런 이유들로 서비스에 적용할 때는 SDK가 아닌 KCL(Kinesis Client Library)을 사용하면 편리할 것이다. (여기에서 Client는 Client/Server의 그것과 다름에 주의하자) KCL은 다음과 같은 특징이 있다.
- 복수의 워커를 띄워 병렬 처리 (샤드에 변경이 있는 경우 그 상태를 DynamoDB에 저장하고 워커들과 공유한다)
- 부모 샤드의 레코드를 처리 후 자식 샤드로 넘어가 순서를 보장한다.
- KCL은 기본적으로 Java 라이브러리이고, 다른 언어(Python, .NET, Ruby 등)를 위한 랩퍼를 제공한다. 따라서 Java가 설치되어 있어야 한다.
(KCL을 사용하는 Consumer Application을 KCL Application이라고도 한다)
파이썬용 KCL 을 이용면 다음과 같은 형식의 코드가 된다.
from amazon_kclpy import kcl
import json, base64
class RecordProcessor(kcl.RecordProcessorBase):
def initialize(self, shard_id):
pass
def process_records(self, records, checkpointer):
pass
def shutdown(self, checkpointer, reason):
pass
if __name__ == "__main__":
kclprocess = kcl.KCLProcess(RecordProcessor())
kclprocess.run()
여기에서 process_records
함수를 구현하여, S3에 저장하는 등의 작업을 처리한다.
Kinesis를 사용하는 시스템은 일반적으로 다음과 같이 구성될 것이다. (DynamoDB부분은 KCL에서 자동으로 사용)
App에서 Kinesis 스트림으로 데이터를 보내기 위해서는 AWS Credential 정보가 필요하다. PC나 모바일 디바이스의 경우, 역 엔지니어링을 통해 App에 내장된 Credential이 누출되어 어뷰징될 가능성이 있기에 Cognito를 사용하여 임시 계정을 발급받아 사용하는 것이 바람직하다. AWS에서 제공하는 모바일 SDK들은 이런 방식을 채택하고 있다.
실재 서비스에서는 AWS 콘솔페이지에서 Kinesis 스트림의 상태를 모니터링 하면서 필요에 따라 샤딩을 이용하자.
필요없게된 스트림은 삭제한다.
$ aws kinesis delete-stream --stream-name=KinesisTest
http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html http://www.slideshare.net/zenos2408/aws-tajo?qid=426c3f19-d837-4619-823f-542306e60a31&v=qf1&b=&from_search=1 http://aws.amazon.com/ko/cognito/ https://dzone.com/articles/amazon-kinesis-is-20x-cheaper-when-used-with-the-k http://boto.cloudhackers.com/en/latest/ref/kinesis.html http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-after-resharding.html
감사합니다.
@channy 알려주신 유틸리티도 살펴보겠습니다. 감사합니다.