Prerequisites
- Hive 3
- Download the Hive Kafka Storage Handler jar from https://mvnrepository.com/artifact/org.apache.hive/kafka-handler and copy it to the Hive lib directory
- Restart Hive
- Create topic hive-topic2
./bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic hive-topic2 --replication-factor 3 --partitions 3
- Produce some JSON records
./bin/kafka-console-producer.sh --broker-list kafka2:9092 --topic hive-topic2
{"message":"Hello World"}
{"message":"This is a test"}
{"message":"These are messages"}
- Connect to HS2 using Beeline
0: jdbc:hive2://localhost:10000/default>
CREATE EXTERNAL TABLE test_kafka
(`message` string)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "hive-topic2",
"kafka.bootstrap.servers"="kafka1:9092,kafka2:9092,kafka3:9092"
);
0: jdbc:hive2://localhost:10000/default> select message from test_kafka;
+-----------------------------+
| message |
+-----------------------------+
| Hello World |
| This is a test |
| These are messages |
+-----------------------------+
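The table declares a single `message` column, and the storage handler's JSON SerDe parses each Kafka record's payload and pulls out the matching field. A rough stdlib-Python sketch of that per-record mapping (illustration only, not the handler's actual code path), using the three records produced above:

```python
import json

# The three JSON payloads produced to hive-topic2 above.
records = [
    '{"message":"Hello World"}',
    '{"message":"This is a test"}',
    '{"message":"These are messages"}',
]

# Roughly what happens per record: parse the JSON payload and
# extract the field that matches the declared column name.
messages = [json.loads(r)["message"] for r in records]
print(messages)  # ['Hello World', 'This is a test', 'These are messages']
```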
- Create topic hive-avro-topic2
./bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic hive-avro-topic2 --replication-factor 3 --partitions 3
- Build the demo code from https://github.com/vinodkc/KafkaExperiments/tree/master/KafkaCore/KafkaClientDemo and run the Avro producer
mvn package
java -cp ./target/KafkaClientDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.vkc.ProducerWithAvroSchemaDemo2 hive-avro-topic2 kafka1:9092 100 1000
- Connect to HS2 using Beeline
0: jdbc:hive2://localhost:10000/default>
CREATE EXTERNAL TABLE kafka_avro_table
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "hive-avro-topic2",
"kafka.bootstrap.servers"="kafka1:9092,kafka2:9092,kafka3:9092",
"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe",
'avro.schema.literal'='{"namespace": "com.vkc.avro",
"type": "record",
"name": "LogRecord",
"fields": [
{"name": "ip", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "url", "type": "string"},
{"name": "referrer", "type": "string"},
{"name": "useragent", "type": "string"},
{"name": "sessionid", "type": ["null","int"], "default": null}
]
}'
);
0: jdbc:hive2://localhost:10000/default> select ip,from_unixtime(cast(`timestamp`/1000 as bigint)) as ts ,url,referrer,useragent,sessionid from kafka_avro_table where url = 'help.html' limit 5 ;
+--------------+----------------------+------------+------------------+----------------------------------------------------+------------+
| ip | ts | url | referrer | useragent | sessionid |
+--------------+----------------------+------------+------------------+----------------------------------------------------+------------+
| 66.249.1.7 | 2020-07-15 16:00:59 | help.html | www.example.com | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36 | NULL |
| 66.249.1.31 | 2020-07-15 16:00:59 | help.html | www.example.com | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36 | NULL |
| 66.249.1.37 | 2020-07-15 16:00:59 | help.html | www.example.com | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36 | NULL |
| 66.249.1.86 | 2020-07-15 16:00:59 | help.html | www.example.com | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36 | NULL |
| 66.249.1.98 | 2020-07-15 16:00:59 | help.html | www.example.com | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36 | NULL |
+--------------+----------------------+------------+------------------+----------------------------------------------------+------------+
5 rows selected (7.954 seconds)
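The Avro `timestamp` field is written as epoch milliseconds, while Hive's `from_unixtime()` expects seconds — hence the `cast(`timestamp`/1000 as bigint)` in the query above. The same conversion sketched in Python (the sample epoch value below is hypothetical, and a UTC session timezone is assumed):

```python
from datetime import datetime, timezone

# Epoch milliseconds, as the producer writes the `timestamp` field.
epoch_millis = 1594828859000  # hypothetical sample value

# from_unixtime() takes seconds, so divide by 1000 first,
# mirroring the cast(`timestamp`/1000 as bigint) in the query.
ts = datetime.fromtimestamp(epoch_millis // 1000, tz=timezone.utc)
print(ts.strftime("%Y-%m-%d %H:%M:%S"))  # 2020-07-15 16:00:59
```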
- Avro schema used by the Kafka producer above and in the Hive table definition
{"namespace": "com.vkc.avro",
"type": "record",
"name": "LogRecord",
"fields": [
{"name": "ip", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "url", "type": "string"},
{"name": "referrer", "type": "string"},
{"name": "useragent", "type": "string"},
{"name": "sessionid", "type": ["null","int"], "default": null}
]
}
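A long `avro.schema.literal` is easy to mistype inside TBLPROPERTIES. A quick stdlib-Python sanity check that the literal parses as valid JSON and that its field names line up with the columns queried above:

```python
import json

# The avro.schema.literal from the table definition, verbatim.
schema_literal = '''{"namespace": "com.vkc.avro",
 "type": "record",
 "name": "LogRecord",
 "fields": [
     {"name": "ip", "type": "string"},
     {"name": "timestamp", "type": "long"},
     {"name": "url", "type": "string"},
     {"name": "referrer", "type": "string"},
     {"name": "useragent", "type": "string"},
     {"name": "sessionid", "type": ["null","int"], "default": null}
 ]
}'''

# json.loads raises on malformed JSON, catching typos before they
# surface as obscure SerDe errors at query time.
schema = json.loads(schema_literal)
field_names = [f["name"] for f in schema["fields"]]
print(field_names)  # ['ip', 'timestamp', 'url', 'referrer', 'useragent', 'sessionid']
```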
- This example assumes a non-Kerberized cluster; on a secure cluster, add the required security settings to both the Kafka client and the Hive table definition above.