
@vinodkc
Last active August 18, 2020 12:25

Kafka Hive integration

Prerequisites

  1. Hive 3

  2. Download the Hive Kafka storage handler JAR (kafka-handler), matching your Hive version, from https://mvnrepository.com/artifact/org.apache.hive/kafka-handler and add it to the Hive lib directory.

  3. Restart Hive.

JSON format Example

  1. Create the topic hive-topic2
./bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic hive-topic2 --replication-factor 3 --partitions 3
  2. Produce some JSON records
./bin/kafka-console-producer.sh --broker-list kafka2:9092 --topic hive-topic2
{"message":"Hello World"}
{"message":"This is a test"}
{"message":"These are messages"}
  3. Connect to HiveServer2 (HS2) using Beeline and create an external table over the topic
0: jdbc:hive2://localhost:10000/default>
    CREATE EXTERNAL TABLE test_kafka
    (`message` string)
    STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
    TBLPROPERTIES
    ("kafka.topic" = "hive-topic2",
     "kafka.bootstrap.servers"="kafka1:9092,kafka2:9092,kafka3:9092"
     );

  
0: jdbc:hive2://localhost:10000/default> select message from test_kafka;
  
+-----------------------------+
|           message           |
+-----------------------------+
| Hello World                 |
| This is a test              |
| These are messages          |
+-----------------------------+
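The three records typed into the console producer come back as three rows. Each producer line becomes one Kafka record, and the table's default SerDe (assumed here to be JSON) matches top-level JSON keys to column names. The Python sketch below models that mapping in simplified form. `json_record_to_row` and `COLUMNS` are hypothetical names used for illustration, not Hive APIs, and a missing key is assumed to surface as NULL (`None`).

```python
import json

# Columns declared in the CREATE EXTERNAL TABLE test_kafka statement.
COLUMNS = ["message"]


def json_record_to_row(line, columns=COLUMNS):
    """Hypothetical helper: map one JSON Kafka record to a table row.

    Simplified model of a JSON SerDe: top-level keys are matched to column
    names, and keys absent from the record become None (NULL in Beeline).
    """
    obj = json.loads(line)
    return tuple(obj.get(col) for col in columns)


# The three lines typed into kafka-console-producer.sh in step 2.
records = [
    '{"message":"Hello World"}',
    '{"message":"This is a test"}',
    '{"message":"These are messages"}',
]

rows = [json_record_to_row(r) for r in records]
for row in rows:
    print(row[0])
```

A record without a `message` key would produce a row whose only value is `None`. This is what makes the column mapping name-based rather than positional.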

Avro format Example

  1. Create the topic hive-avro-topic2
./bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic hive-avro-topic2 --replication-factor 3 --partitions 3
  2. Produce Avro records using the demo code in https://github.com/vinodkc/KafkaExperiments/tree/master/KafkaCore/KafkaClientDemo
  mvn package
  java -cp ./target/KafkaClientDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.vkc.ProducerWithAvroSchemaDemo2 hive-avro-topic2 kafka1:9092 100 1000
  3. Connect to HS2 using Beeline and create an Avro-backed external table over the topic
0: jdbc:hive2://localhost:10000/default>
CREATE EXTERNAL TABLE kafka_avro_table
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "hive-avro-topic2",
"kafka.bootstrap.servers"="kafka1:9092,kafka2:9092,kafka3:9092",
"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe",
'avro.schema.literal'='{"namespace": "com.vkc.avro",
  "type": "record",
  "name": "LogRecord",
  "fields": [
    {"name": "ip", "type": "string"},
    {"name": "timestamp",  "type": "long"},
    {"name": "url",  "type": "string"},
    {"name": "referrer",  "type": "string"},
    {"name": "useragent",  "type": "string"},
    {"name": "sessionid",  "type": ["null","int"], "default": null}
  ]
}'
);

0: jdbc:hive2://localhost:10000/default> select ip, from_unixtime(cast(`timestamp`/1000 as bigint)) as ts, url, referrer, useragent, sessionid from kafka_avro_table where url = 'help.html' limit 5;

+--------------+----------------------+------------+------------------+----------------------------------------------------+------------+
|      ip      |          ts          |    url     |     referrer     |                     useragent                      | sessionid  |
+--------------+----------------------+------------+------------------+----------------------------------------------------+------------+
| 66.249.1.7   | 2020-07-15 16:00:59  | help.html  | www.example.com  | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36 | NULL       |
| 66.249.1.31  | 2020-07-15 16:00:59  | help.html  | www.example.com  | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36 | NULL       |
| 66.249.1.37  | 2020-07-15 16:00:59  | help.html  | www.example.com  | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36 | NULL       |
| 66.249.1.86  | 2020-07-15 16:00:59  | help.html  | www.example.com  | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36 | NULL       |
| 66.249.1.98  | 2020-07-15 16:00:59  | help.html  | www.example.com  | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36 | NULL       |
+--------------+----------------------+------------+------------------+----------------------------------------------------+------------+
5 rows selected (7.954 seconds)
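The query turns the Avro `timestamp` field into a readable time. Judging by the division by 1000, that field holds epoch milliseconds. The conversion works in three steps:

- `/` yields a double.
- `cast(... as bigint)` drops the fractional seconds.
- `from_unixtime` formats the remaining seconds.

The Python sketch below reproduces that arithmetic under a few assumptions. `hive_style_ts` is a hypothetical helper and the input value is made up. The sketch also formats in UTC, whereas `from_unixtime` uses the time zone configured for the Hive session, so real output may be shifted.

```python
from datetime import datetime, timezone


def hive_style_ts(epoch_millis, tz=timezone.utc):
    """Hypothetical helper mimicking from_unixtime(cast(epoch_millis/1000 as bigint)).

    Hive's `/` returns a double and the cast to bigint truncates toward zero;
    int() on a Python float truncates the same way.
    """
    seconds = int(epoch_millis / 1000)
    return datetime.fromtimestamp(seconds, tz=tz).strftime("%Y-%m-%d %H:%M:%S")


# Made-up millisecond timestamp, for illustration only.
print(hive_style_ts(1594828859123))  # 2020-07-15 16:00:59 (UTC)
```

Casting before formatting means the millisecond part (`123` above) is discarded rather than rounded. Two events less than a second apart can therefore show the same `ts`, as in the five rows above.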

Notes:

  1. The Avro schema used by the Kafka producer above and in the Hive table definition:
{"namespace": "com.vkc.avro",
 "type": "record",
 "name": "LogRecord",
 "fields": [
   {"name": "ip", "type": "string"},
   {"name": "timestamp", "type": "long"},
   {"name": "url", "type": "string"},
   {"name": "referrer", "type": "string"},
   {"name": "useragent", "type": "string"},
   {"name": "sessionid", "type": ["null","int"], "default": null}
 ]
}
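  2. These examples use a non-Kerberized cluster. On a secure cluster, add the required security settings to the Kafka clients above and to the Hive table definition. The storage handler passes table properties prefixed with `kafka.consumer.` (for example "kafka.consumer.security.protocol") through to its Kafka consumer.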
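The `kafka_avro_table` DDL declares no columns because it sets `avro.schema.literal`, so the AvroSerDe derives the columns from the schema in note 1. The storage handler also adds its own Kafka metadata columns (such as `__offset`), which this sketch leaves out. The Python sketch below approximates the derivation for flat record schemas. It assumes the usual AvroSerDe type mapping:

- `string` maps to `string`.
- `long` maps to `bigint`.
- `int` maps to `int`.
- A `["null", T]` union maps to a nullable `T`.

`avro_to_hive_columns` is a hypothetical helper covering only these simple cases, not Hive's implementation.

```python
import json

# Assumed subset of the AvroSerDe mapping from Avro primitives to Hive types.
AVRO_TO_HIVE = {
    "string": "string",
    "long": "bigint",
    "int": "int",
    "boolean": "boolean",
    "float": "float",
    "double": "double",
}


def avro_to_hive_columns(schema_json):
    """Hypothetical helper: (column, hive_type) pairs for a flat record schema."""
    schema = json.loads(schema_json)
    columns = []
    for field in schema["fields"]:
        avro_type = field["type"]
        if isinstance(avro_type, list):
            # A ["null", T] union is exposed as a nullable column of type T.
            non_null = [t for t in avro_type if t != "null"]
            if len(non_null) != 1:
                raise ValueError(f"unsupported union for {field['name']}: {avro_type}")
            avro_type = non_null[0]
        columns.append((field["name"], AVRO_TO_HIVE[avro_type]))
    return columns


# Same LogRecord schema as in note 1.
LOG_RECORD_SCHEMA = """{"namespace": "com.vkc.avro", "type": "record", "name": "LogRecord",
 "fields": [
   {"name": "ip", "type": "string"},
   {"name": "timestamp", "type": "long"},
   {"name": "url", "type": "string"},
   {"name": "referrer", "type": "string"},
   {"name": "useragent", "type": "string"},
   {"name": "sessionid", "type": ["null", "int"], "default": null}
 ]}"""

for name, hive_type in avro_to_hive_columns(LOG_RECORD_SCHEMA):
    print(f"{name}\t{hive_type}")
```

The `["null","int"]` union with `"default": null` is why `sessionid` can be NULL in the query output while the other columns cannot.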