Created
April 27, 2018 07:31
-
-
Save shiumachi/83e89c231940d8497007d07ca967efa9 to your computer and use it in GitHub Desktop.
Kafka Kudu Demo (WIP)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh
#
# Instance bootstrap script: marks this host so that later post-create
# scripts can recognise it by the presence of a flag file.

# Trace every command and append stdout/stderr to a log file under /root.
set -x
exec >> /root/bootstrap-master-init.log 2>&1
date

# Edge-node identifier: the post-create script checks for this flag file and
# exits early on hosts that do not have it.
touch /root/kafka-kudu-demo_edge-node.flag
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Cloudera Director cluster definition for the Kafka + Kudu demo.
# AWS credentials, network settings, AMI, SSH user/key and naming prefixes are
# pulled in from the included file below.
include file("your-aws-info.conf")

## Instance Configurations
INSTANCE_TYPE_CM: t2.xlarge       #vCPU 4, RAM 16G
INSTANCE_TYPE_MASTER: t2.large    #vCPU 2, RAM 8G
INSTANCE_TYPE_WORKER: r3.xlarge   #vCPU 4, RAM 30.5G, SSD 80Gx1
WORKER_NODE_NUM: 3                #Number of Worker Nodes
##

name: kafka-kudu-demo-cluster

provider {
    type: aws
    accessKeyId: ${?AWS_ACCESS_KEY_ID}
    secretAccessKey: ${?AWS_SECRET_ACCESS_KEY}
    region: ${?AWS_REGION}
    subnetId: ${?AWS_SUBNET_ID}
    securityGroupsIds: ${?AWS_SECURITY_GROUP}
}

ssh {
    username: ${?OS_USERNAME}
    privateKey: ${?KEY_PAIR}
}

# Properties shared by every instance template below.
common-instance-properties {
    image: ${?AWS_AMI}
    tags {
        owner: ${?INSTANCE_OWNER_TAG}
    }
}

cloudera-manager {
    instance: ${common-instance-properties} {
        type: ${INSTANCE_TYPE_CM}
        instanceNamePrefix: ${?INSTANCE_NAME_PREFIX}"-cm"
        tags {
            application: "Cloudera Manager 5"
        }
        bootstrapScriptsPaths: [
            "common/bootstrap-common.sh"
        ]
    }

    enableEnterpriseTrial: true
    javaInstallationStrategy: NONE

    repository: "https://archive.cloudera.com/cm5/redhat/7/x86_64/cm/5.14.1/"
    repositoryKeyUrl: "https://archive.cloudera.com/cm5/redhat/7/x86_64/cm/RPM-GPG-KEY-cloudera"

    configs {
        CLOUDERA_MANAGER {
            custom_banner_html: "Demo cluster managed by Cloudera Director"
        }
        NAVIGATORMETASERVER {
            # for Navigator Demo
            navigator_safety_valve: "nav.extractor.poll_period=10"
        }
    }
}

cluster {
    products {
        CDH: 5
        KAFKA: 3
    }

    parcelRepositories: [
        "http://archive.cloudera.com/cdh5/parcels/5.14.0/",
        "https://archive.cloudera.com/kafka/parcels/3.0/"
    ]

    services: [
        ZOOKEEPER,
        HDFS,
        YARN,
        SPARK_ON_YARN,
        HIVE,
        IMPALA,
        OOZIE,
        HUE,
        KUDU,
        KAFKA
    ]

    configs {
        KAFKA {
            # Only one broker is deployed (on the master), so the offsets
            # topic cannot be replicated more than once.
            "offsets.topic.replication.factor": 1
        }
    }

    master {
        count: 1

        instance: ${common-instance-properties} {
            type: ${INSTANCE_TYPE_MASTER}
            instanceNamePrefix: ${?INSTANCE_NAME_PREFIX}"-master"
            tags {
                group: master
            }
            bootstrapScriptsPaths: [
                "common/bootstrap-common.sh",
                "impala-demo-cluster/bootstrap-master.sh"
            ]
        }

        roles {
            ZOOKEEPER: [SERVER]
            HDFS: [NAMENODE,SECONDARYNAMENODE]
            YARN: [RESOURCEMANAGER,JOBHISTORY]
            SPARK_ON_YARN: [SPARK_YARN_HISTORY_SERVER]
            HIVE: [HIVEMETASTORE,HIVESERVER2]
            IMPALA: [STATESTORE,CATALOGSERVER]
            OOZIE: [OOZIE_SERVER]
            HUE: [HUE_SERVER]
            KUDU: [KUDU_MASTER]
            KAFKA: [KAFKA_BROKER]
        }

        configs {
            HDFS {
                NAMENODE {
                    dfs_name_dir_list: "/data0/nn"
                }
                SECONDARYNAMENODE {
                    fs_checkpoint_dir_list: "/data0/snn"
                }
            }
            YARN {
                RESOURCEMANAGER {
                    yarn_scheduler_maximum_allocation_mb: 8192
                    yarn_scheduler_maximum_allocation_vcores: 4
                }
            }
            KUDU {
                KUDU_MASTER {
                    fs_wal_dir: "/data0/kudu/masterwal"
                    fs_data_dirs: "/data0/kudu/master"
                }
            }
            KAFKA {
                # The KAFKA_BROKER role is assigned to this (master) group, so
                # its role-level settings belong here rather than under the
                # worker group, which runs no broker.
                KAFKA_BROKER {
                    # 256 MiB heap. NOTE(review): this property is understood
                    # to be measured in megabytes; the previous value
                    # 268435456 looks like the same size in bytes - confirm
                    # against the Cloudera Manager Kafka properties reference.
                    broker_max_heap_size: 256
                }
            }
        }
    }

    worker {
        count: ${?WORKER_NODE_NUM}
        minCount: ${?WORKER_NODE_NUM}

        instance: ${common-instance-properties} {
            type: ${INSTANCE_TYPE_WORKER}
            instanceNamePrefix: ${?INSTANCE_NAME_PREFIX}"-worker"
            tags {
                group: worker
            }
            bootstrapScriptsPaths: [
                "common/bootstrap-common.sh"
            ]
        }

        roles {
            HDFS: [DATANODE]
            YARN: [NODEMANAGER]
            IMPALA: [IMPALAD]
            KUDU: [KUDU_TSERVER]
            HIVE: [GATEWAY]
        }

        configs {
            HDFS {
                DATANODE {
                    dfs_data_dir_list: "/data0/dn"
                }
            }
            YARN {
                NODEMANAGER {
                    yarn_nodemanager_resource_memory_mb: 4096
                    yarn_nodemanager_resource_cpu_vcores: 2
                }
            }
            KUDU {
                KUDU_TSERVER {
                    fs_wal_dir: "/data0/kudu/tabletwal"
                    fs_data_dirs: "/data0/kudu/tablet"
                    #memory_limit_hard_bytes: 17179869184 #16GiB
                    #block_cache_capacity_mb: 2048 #2GiB
                }
            }
            IMPALA {
                IMPALAD {
                    impalad_memory_limit: 17179869184 #16GiB
                }
            }
        }
    }

    postCreateScriptsPaths: [
        "impala-demo-cluster/postcreate-dataload.sh"
    ]

    instancePostCreateScriptsPaths: [
        "impala-demo-cluster/postcreate-kafka-kudu-demo.sh"
    ]
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kafka import KafkaConsumer | |
from kafka.client import KafkaClient | |
import kudu | |
from kudu.client import Partitioning | |
import argparse | |
def init_argumentparser():
    """Build the command-line parser for the Kafka-to-Kudu demo.

    All options are optional strings. Ports are kept as strings because they
    are only ever joined into "host:port" style addresses.

    Returns:
        argparse.ArgumentParser: parser exposing ``--kudu_master_address``,
        ``--kudu_master_port``, ``--kudu_table``, ``--kafka_broker_address``,
        ``--kafka_broker_port``, ``--kafka_topic`` and ``--config``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--kudu_master_address', default='', type=str,
                        help='host name or address of the Kudu master')
    parser.add_argument('--kudu_master_port', default='7051', type=str,
                        help='RPC port of the Kudu master')
    parser.add_argument('--kudu_table', default='test_table', type=str,
                        help='Kudu table that receives the messages')
    parser.add_argument('--kafka_broker_address', default='', type=str,
                        help='host name or address of the Kafka broker')
    parser.add_argument('--kafka_broker_port', default='9092', type=str,
                        help='port of the Kafka broker')
    parser.add_argument('--kafka_topic', default='test_topic', type=str,
                        help='Kafka topic to consume from')
    parser.add_argument('--config', default='', type=str,
                        help='configuration file path')
    return parser
def insert_msg(msg, table, session):
    """Insert one consumed Kafka message into the Kudu table.

    The message timestamp becomes the row's ``key`` and the UTF-8 decoded
    payload becomes its ``value``. The session is flushed after every message,
    and the inserted pair is echoed to stdout.

    Args:
        msg: consumed record exposing ``timestamp`` and a bytes ``value``.
        table: table object providing ``new_insert(row_dict)``.
        session: write session providing ``apply(op)`` and ``flush()``.

    NOTE(review): two messages carrying the same timestamp would map to the
    same ``key``; how the session reports that is not visible here - confirm.
    """
    ts = msg.timestamp
    value = msg.value.decode('utf-8')
    op = table.new_insert({'key': ts, 'value': value})
    session.apply(op)
    session.flush()
    print("key={}, value={}".format(ts, value))
def create_table(kudu_client, kudu_table):
    """Create the demo table in Kudu.

    The table has a non-nullable int64 primary key column ``key`` and a
    non-nullable, lz4-compressed string column ``value``, and is hash
    partitioned on ``key`` into 3 buckets.

    Args:
        kudu_client: connected Kudu client providing ``create_table``.
        kudu_table: name of the table to create.
    """
    # Define a schema for a new table
    builder = kudu.schema_builder()
    builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
    builder.add_column('value', type_=kudu.string, nullable=False, compression='lz4')
    schema = builder.build()

    # Define partitioning schema
    partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)

    # Create new table
    kudu_client.create_table(kudu_table, schema, partitioning)
def create_kafka_topic(kafka_client, kafka_topic):
    """Register ``kafka_topic`` with the Kafka client via ``add_topic``.

    Args:
        kafka_client: client object providing ``add_topic(topic)``.
        kafka_topic: name of the topic.

    NOTE(review): presumably the topic is only physically created if the
    broker has automatic topic creation enabled - confirm against the
    kafka-python documentation.
    """
    kafka_client.add_topic(kafka_topic)
def main():
    """Consume a Kafka topic forever and write every message into Kudu.

    Steps: parse the command line, connect to the Kudu master, create the
    target table if it is missing, make sure the Kafka topic is registered,
    then loop over the consumer and insert each message.
    """
    # Keep the argparse Namespace as-is: wrapping it in vars() yields a dict,
    # on which attribute access (args.kudu_table, ...) raises AttributeError.
    args = init_argumentparser().parse_args()

    kudu_master_address = args.kudu_master_address
    kudu_master_port = args.kudu_master_port
    kudu_table = args.kudu_table
    kafka_broker_address = args.kafka_broker_address
    kafka_broker_port = args.kafka_broker_port
    kafka_topic = args.kafka_topic

    kudu_client = kudu.connect(host=kudu_master_address, port=kudu_master_port)

    # Create the table on first run only
    tables = kudu_client.list_tables()
    if kudu_table not in tables:
        create_table(kudu_client, kudu_table)

    # Open a table
    table = kudu_client.table(kudu_table)

    # Create a new session so that we can apply write operations
    session = kudu_client.new_session()

    kafka_bootstrap_servers = ':'.join([kafka_broker_address, str(kafka_broker_port)])
    kafka_client = KafkaClient(bootstrap_servers=kafka_bootstrap_servers)

    # Register the topic if the cluster metadata does not list it yet
    topics = kafka_client.cluster.topics()
    if kafka_topic not in topics:
        create_kafka_topic(kafka_client, kafka_topic)

    # Listen to the topic as a consumer; this loop runs until interrupted
    consumer = KafkaConsumer(kafka_topic, bootstrap_servers=kafka_bootstrap_servers)
    for msg in consumer:
        insert_msg(msg, table, session)
# Run the consumer only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
#
# Post Creation Script for Kafka Kudu Demo
#
# Prepares a Python 3.6 virtualenv with the Kafka and Kudu Python clients on
# the host marked by the edge-node flag file.
#
# NOTE: bash (not plain sh) is required because the script uses `source`
# and virtualenvwrapper.

# Trace every command and append stdout/stderr to a log file under /root.
set -x
exec >> /root/instance-postcreate-base 2>&1
date

# exit this script if the target host doesn't have a flag file
if [ ! -e /root/kafka-kudu-demo_edge-node.flag ]; then
    echo "INFO: This host is not an edge node. Retry to run in another host ..."
    exit 0
fi

# install the IUS release package
# NOTE(review): the original comment called this the EPEL repository;
# presumably this package also brings in EPEL - confirm.
yum install -y https://centos7.iuscommunity.org/ius-release.rpm

# install setuptools, pip, virtualenv, virtualenvwrapper
easy_install setuptools
easy_install pip
pip install virtualenv virtualenvwrapper
source /usr/bin/virtualenvwrapper.sh

# install python 3.6
yum -y install python36 python36-devel

# create virtualenv (the pip calls below are meant to run inside it)
mkvirtualenv -p /usr/bin/python3.6 kafka-kudu-demo

# install Kafka python client
pip install kafka-python

# install Kudu repository
# Write to an explicit path so a re-run overwrites the repo file instead of
# leaving a duplicate "cloudera-kudu.repo.1" next to it.
wget -O /etc/yum.repos.d/cloudera-kudu.repo \
    http://archive.cloudera.com/kudu/redhat/7/x86_64/kudu/cloudera-kudu.repo

# install the Kudu client headers and the compilers needed to build kudu-python
yum -y install kudu-client-devel gcc gcc-c++

# install kudu-python
# The latest version of Kudu Client 1.7.0 doesn't support 1.6.0 (Cloudera 5.14),
# so please specify the previous version 1.2.0
# install Cython before kudu-python. Do not merge into one command. kudu-python cannot resolve dependency.
pip install Cython
pip install kudu-python==1.2.0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment