Skip to content

Instantly share code, notes, and snippets.

@shtrom
Last active December 9, 2020 04:30
Show Gist options
  • Save shtrom/6216d3ac75fef2ea9749999e9c473186 to your computer and use it in GitHub Desktop.
Save shtrom/6216d3ac75fef2ea9749999e9c473186 to your computer and use it in GitHub Desktop.
Simple script to read new records from an AWS Kinesis stream
#!/bin/bash
#
# Test script to read from a Kinesis Data Stream
#
# Olivier Mehani <[email protected]>
#
# Example usage:
#
# REGION=ap-southeast-2 AWS_PROFILE=playground ./read-stream.sh kinesis-cross-account-playground
#
# Write data into the stream directly
#
# aws --profile playground --region ap-southeast-2 kinesis put-records --stream-name kinesis-cross-account-playground --records '{ "PartitionKey": "foo", "Data": "bar" }'
#

# Strict mode is enabled here rather than on the shebang line so that it
# still applies when the script is started as `bash read-stream.sh`:
#   -e  exit on an unhandled non-zero status
#   -u  treat expansion of an unset variable as an error
set -eu

# External commands used by the functions below. They are expanded unquoted
# at the call sites, so a value may carry extra arguments.
AWS='aws'
SED='sed'
UNBASE64='base64 --decode'
#######################################
# Entry point: validate the arguments, check that the requested stream is
# listed, then start one background read_shard per shard and wait for them.
# Globals:
#   REGION       read; defaulted to us-east-1 when unset/empty; exported
#   AWS_PROFILE  read; defaulted to "default" when unset/empty; exported
#   STREAM_NAME  written from $1
#   MAX_READS    written from $2 (default 1000); read by read_shard
# Arguments:
#   $1 - name of the stream to read (required)
#   $2 - maximum number of reads per shard (optional, default 1000)
# Outputs:
#   Usage and progress messages on stderr.
# Returns:
#   Exits 1 when no stream name is given or the stream is not listed.
#######################################
function main() {
  if [ ${#} -lt 1 ]; then
    # Diagnostics go to stderr, like every other message in this script.
    echo "usage: AWS_PROFILE=... REGION=... ${0} STREAM_NAME [MAX_READS=1000]" >&2
    exit 1
  fi

  REGION=${REGION:-us-east-1}
  AWS_PROFILE=${AWS_PROFILE:-default}
  export AWS_PROFILE REGION
  STREAM_NAME=${1}
  MAX_READS=${2:-1000}

  info "Searching for stream; profile=${AWS_PROFILE} region=${REGION} stream_name=${STREAM_NAME}"

  local streams_list
  streams_list="$(aws_run_parse 'kinesis list-streams' STREAMNAMES)"

  # Compare whole names: a substring/regex match would accept "foo" when only
  # "foobar" exists, and the later per-stream calls would then fail.
  local found=0
  local name
  # Word splitting is intended here: the list is whitespace-separated names.
  # shellcheck disable=SC2086
  for name in ${streams_list}; do
    if [[ "${name}" == "${STREAM_NAME}" ]]; then
      found=1
      break
    fi
  done
  if [ "${found}" -ne 1 ]; then
    # The unquoted echo flattens the list onto a single line for the message.
    # shellcheck disable=SC2116 disable=SC2086
    die "stream name '${STREAM_NAME}' not found in $(echo ${streams_list})"
  fi

  local shard_ids
  shard_ids="$(aws_run_parse "kinesis list-shards --stream-name ${STREAM_NAME}" SHARDS)"
  info "Getting records from stream; stream_name=${STREAM_NAME} nshards=$(echo "${shard_ids}" | count)"

  # On exit, send TERM to the process group numbered after this shell's PID so
  # that the background readers do not outlive the script. ${$} is expanded
  # now, on purpose, hence the double quotes.
  # shellcheck disable=SC2064
  trap "kill -TERM -${$}" EXIT

  local shard_id
  for shard_id in ${shard_ids}; do
    read_shard "${STREAM_NAME}" "${shard_id}" &
  done
  wait
}
#######################################
# Report a fatal error and terminate the script.
# Arguments:
#   $1 - message to print after the "error: " prefix
# Outputs:
#   The prefixed message on stderr.
# Returns:
#   Never returns; exits with status 1.
#######################################
function die() {
  printf 'error: %s\n' "${1}" >&2
  exit 1
}
#######################################
# Count the whitespace-separated words arriving on stdin.
# Globals:
#   SED (read) - sed command to run
# Outputs:
#   The word count on stdout, with any whitespace padding removed.
#######################################
function count() {
  # Some wc implementations pad the number with blanks. Strip them with the
  # POSIX class [[:space:]]; '\s' is a GNU extension not every sed supports.
  wc -w | ${SED} 's/[[:space:]]//g'
}
#######################################
# Poll one shard for new records and print each of them.
# Obtains a LATEST shard iterator, then calls get-records up to MAX_READS
# times, one second apart, following the iterator returned by each call.
# Globals:
#   MAX_READS (read) - number of get-records calls to make
#   UNBASE64  (read) - command used to decode the record payload
# Arguments:
#   $1 - stream name
#   $2 - shard id
# Outputs:
#   One line per record on stdout; progress and warnings on stderr.
#######################################
function read_shard() {
  local stream_name=${1}
  local shard_id=${2}
  local shard_iterator records i
  local rec_type ts record event_id offset event

  shard_iterator="$(aws_run_parse "kinesis get-shard-iterator --stream-name ${stream_name} --shard-id ${shard_id} --shard-iterator-type LATEST")"
  info "Reading shard; shard_id=${shard_id} shard_iterator=${shard_iterator}"

  for ((i = 1; i <= MAX_READS; i++)); do
    records=$(aws_run_parse "kinesis get-records --shard-iterator ${shard_iterator}")

    # The CLI text output is tab-delimited; splitting on tabs only keeps
    # fields that contain spaces (e.g. the partition key) in one piece.
    while IFS=$'\t' read -r rec_type ts record event_id offset; do
      if [ -z "${rec_type}" ]; then
        continue
      fi
      case "${rec_type}" in
        "RECORDS")
          event=$(printf '%s\n' "${record}" | ${UNBASE64})
          echo "$(date) ${shard_id} [${i}/${MAX_READS}] offset=${offset} ts=${ts} eventId=${event_id} eventData=${event}"
          ;;
        *[!0-9]*)
          warning "Unknown type from Kinesis; record=${rec_type}"
          ;;
        *)
          # Line carrying the next iterator. Its first field is a number
          # (presumably MillisBehindLatest), which is not always 0, so any
          # all-digit value is accepted rather than only the literal "0".
          shard_iterator="${ts}"
          ;;
      esac
    done <<< "${records}"

    sleep 1
  done
}
#######################################
# Run an AWS CLI command with text output and filter its lines with sed.
# Globals:
#   AWS, SED             (read) - commands to run
#   AWS_PROFILE, REGION  (read) - passed as --profile and --region
# Arguments:
#   $1 - AWS CLI sub-command and its arguments, as one string
#   $2 - sed pattern; only lines matching it are printed, with the first
#        match removed. Defaults to '^', which prints every line unchanged.
# Outputs:
#   The filtered CLI output on stdout.
# Returns:
#   The exit status of sed (the last stage of the pipeline).
#######################################
function aws_run_parse() {
  # Locals, so repeated calls do not leak state into the caller's scope.
  local aws_command=${1}
  local key=${2:-^}

  # ${aws_command} is deliberately unquoted: it holds several CLI arguments
  # that must be split into words. Profile and region are single values.
  # shellcheck disable=SC2086
  ${AWS} --output text --profile "${AWS_PROFILE}" --region "${REGION}" ${aws_command} \
    | ${SED} -n "s/${key}//p"
}
#######################################
# Print an informational message.
# Arguments:
#   $1 - message to print after the "info: " prefix
# Outputs:
#   The prefixed message on stderr.
#######################################
function info() {
  printf 'info: %s\n' "${1}" >&2
}
#######################################
# Print a warning message.
# Arguments:
#   $1 - message to print after the "warning: " prefix
# Outputs:
#   The prefixed message on stderr.
#######################################
function warning() {
  printf 'warning: %s\n' "${1}" >&2
}
# Run the script: hand every command-line argument to main unchanged.
main "${@}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment