Created
November 28, 2018 06:34
-
-
Save shiumachi/28e3167b50e643e95e79d315d4b07eb6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kafka import KafkaConsumer | |
from kafka.client import KafkaClient | |
import kudu | |
from kudu.client import Partitioning | |
import argparse | |
def init_argumentparser():
    """Build the command-line parser for the Kafka-to-Kudu bridge.

    Returns:
        argparse.ArgumentParser: parser for the Kudu master address/port,
        the Kudu table name, the Kafka broker address/port and the Kafka
        topic. Ports are kept as strings so callers can splice them into
        ``host:port`` addresses directly.
    """
    parser = argparse.ArgumentParser(
        description='Consume messages from a Kafka topic and insert them '
                    'into a Kudu table.')
    # Required options get no default: argparse never uses a default for an
    # option marked required=True, so one would only mislead readers.
    parser.add_argument('--kudu_master_address', type=str, required=True,
                        help='Kudu master host name or IP address')
    parser.add_argument('--kudu_master_port', default='7051', type=str,
                        help='Kudu master RPC port (default: %(default)s)')
    parser.add_argument('--kudu_table', default='test_table', type=str,
                        help='Kudu table to write into; created if missing '
                             '(default: %(default)s)')
    parser.add_argument('--kafka_broker_address', type=str, required=True,
                        help='Kafka broker host name or IP address')
    parser.add_argument('--kafka_broker_port', default='9092', type=str,
                        help='Kafka broker port (default: %(default)s)')
    parser.add_argument('--kafka_topic', default='test_topic', type=str,
                        help='Kafka topic to consume (default: %(default)s)')
    # Accepted for CLI backward compatibility; the script does not read it.
    parser.add_argument('--config', default='', type=str,
                        help='currently unused')
    return parser
def insert_msg(msg, table, session):
    """Insert one Kafka message into a Kudu table and flush it immediately.

    The message timestamp is written to the ``key`` column and the UTF-8
    decoded payload to the ``value`` column. Messages without a payload
    (``msg.value is None``) are skipped with a notice instead of crashing
    the consumer loop.

    Args:
        msg: consumed Kafka record exposing ``timestamp`` and ``value``
            (``value`` is bytes, or None for a null payload).
        table: open Kudu table providing ``new_insert``.
        session: Kudu session used to apply and flush the insert.
    """
    ts = msg.timestamp
    if msg.value is None:
        # A null payload cannot be decoded, and create_table() declares the
        # 'value' column non-nullable, so there is nothing valid to insert.
        print("key={}, skipped: message has no value".format(ts))
        return
    value = msg.value.decode('utf-8')
    # NOTE(review): the timestamp is used as the primary key, so two messages
    # carrying the same timestamp collide on insert — confirm this is intended.
    op = table.new_insert({'key': ts, 'value': value})
    session.apply(op)
    # Flush after every message: one round trip per record, but each insert
    # is sent right away rather than buffered.
    session.flush()
    print("key={}, value={}".format(ts, value))
def create_table(kudu_client, kudu_table):
    """Create the Kudu table that stores consumed Kafka messages.

    Columns:
        key   -- int64, non-null, primary key
        value -- string, non-null, LZ4-compressed
    Rows are hash-partitioned on ``key`` into 3 buckets.

    Args:
        kudu_client: connected Kudu client on which the table is created.
        kudu_table: name of the table to create.
    """
    schema_builder = kudu.schema_builder()
    schema_builder.add_column('key', type_=kudu.int64, nullable=False,
                              primary_key=True)
    schema_builder.add_column('value', type_=kudu.string, nullable=False,
                              compression='lz4')
    table_schema = schema_builder.build()

    # Spread rows across a fixed number of hash buckets keyed on the PK.
    hash_partitioning = Partitioning().add_hash_partitions(
        column_names=['key'], num_buckets=3)

    kudu_client.create_table(kudu_table, table_schema, hash_partitioning)
def create_kafka_topic(kafka_client, kafka_topic):
    """Register *kafka_topic* with the given Kafka client.

    NOTE(review): this only calls ``kafka_client.add_topic``. kafka-python
    documents that as adding the topic to the set tracked for metadata, so
    the topic is presumably created only if the broker auto-creates topics
    on metadata requests. Verify against the broker configuration.

    Args:
        kafka_client: client whose ``add_topic`` method is invoked.
        kafka_topic: name of the topic to register.
    """
    register_topic = kafka_client.add_topic
    register_topic(kafka_topic)
def main():
    """Run the Kafka-to-Kudu bridge until interrupted.

    Steps:
      1. Parse the command line with init_argumentparser().
      2. Connect to Kudu, create the table with create_table() if it is not
         listed yet, open it, and start a write session.
      3. Register the Kafka topic with create_kafka_topic() if the client
         does not already know it.
      4. Consume the topic forever, writing each message with insert_msg().

    The Kafka clients are closed on exit (including Ctrl-C or an error
    raised while inserting) instead of being leaked.
    """
    args = init_argumentparser().parse_args()

    # Kudu side: connect, create the table on first run, open a session.
    kudu_client = kudu.connect(host=args.kudu_master_address,
                               port=args.kudu_master_port)
    if args.kudu_table not in kudu_client.list_tables():
        create_table(kudu_client, args.kudu_table)
    table = kudu_client.table(args.kudu_table)
    # Session through which insert_msg() applies and flushes write operations.
    session = kudu_client.new_session()

    # Kafka side: this client is only used for the topic check, so close it
    # as soon as the check is done.
    kafka_bootstrap_servers = '{}:{}'.format(args.kafka_broker_address,
                                             args.kafka_broker_port)
    kafka_client = KafkaClient(bootstrap_servers=kafka_bootstrap_servers)
    try:
        if args.kafka_topic not in kafka_client.cluster.topics():
            create_kafka_topic(kafka_client, args.kafka_topic)
    finally:
        kafka_client.close()

    # Listen to the topic as a consumer; always release its connections.
    consumer = KafkaConsumer(args.kafka_topic,
                             bootstrap_servers=kafka_bootstrap_servers)
    try:
        for msg in consumer:
            insert_msg(msg, table, session)
    finally:
        consumer.close()
# Start the bridge only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment