Skip to content

Instantly share code, notes, and snippets.

@rag594
Last active September 12, 2024 03:35
Show Gist options
  • Save rag594/d35550490a2af6412db7a3b1a062c71c to your computer and use it in GitHub Desktop.
Save rag594/d35550490a2af6412db7a3b1a062c71c to your computer and use it in GitHub Desktop.
from ksql import KSQLAPI
from influxdb import InfluxDBClient
import json
import requests
# Connection settings. Every value below is a redacted placeholder and must be
# replaced with a real address/port before the script can run.
HOST="XX.XX.XX.XX"  # InfluxDB host, passed to InfluxDBClient in get_influx_connection()
INFLUX_PORT=XXXX  # InfluxDB port, passed to InfluxDBClient in get_influx_connection()
KSQL_API="http://XX.XX.XX.XX:XXXX"  # base URL of the KSQL REST server
# NOTE(review): the name looks like a typo for INFLUX_API; it is not referenced
# anywhere in the visible code.
INLUX_API="http://XX.XX.XX.XX:XXXX"
def get_request(url, sql_string, endpoint):
    """POST a KSQL statement to ``<url>/<endpoint>`` and return the response.

    Args:
        url: Base URL of the KSQL REST server; a trailing slash is tolerated.
        sql_string: KSQL statement, sent as the ``"ksql"`` field of the JSON
            request body.
        endpoint: REST endpoint name. ``'query'`` enables response streaming
            so the caller can consume rows as they arrive.

    Returns:
        The ``requests.Response`` object. For the ``'query'`` endpoint the
        body has not been read yet and the caller should close the response
        when done.
    """
    # Normalise the slashes so joining never yields '//' in the final URL.
    endpoint = endpoint.lstrip('/')
    full_url = '{}/{}'.format(url.rstrip('/'), endpoint)
    payload = json.dumps({"ksql": sql_string})
    headers = {"Content-Type": "application/json"}
    return requests.post(
        full_url,
        data=payload,
        timeout=5,
        headers=headers,
        # Only the 'query' endpoint is consumed incrementally.
        stream=(endpoint == 'query'),
    )
def get_ksql_connection():
    """Build and return a ``KSQLAPI`` client pointed at ``KSQL_API``."""
    return KSQLAPI(KSQL_API)
# Placeholder, in the same style as HOST / INFLUX_PORT: replace with your datastore
# (the name of the InfluxDB database that points are written to).
INFLUX_DATABASE = "XXXX"


def get_influx_connection(database=INFLUX_DATABASE):
    """Create an ``InfluxDBClient`` for ``HOST``:``INFLUX_PORT``.

    The original call used a bare ``#####`` placeholder for the database,
    which started a comment that swallowed the closing parenthesis and made
    the statement a syntax error. The database is now an explicit argument.

    Args:
        database: Name of the database to use. Defaults to the
            ``INFLUX_DATABASE`` placeholder, which must be replaced with a
            real name.

    Returns:
        The client, created with an empty username and password as before.
    """
    return InfluxDBClient(HOST, INFLUX_PORT, '', '', database)
def prepare_json_payload(chunk):
    """Convert one JSON-encoded KSQL row into an InfluxDB points list.

    Args:
        chunk: JSON text of the form ``{"row": {"columns": [time, amount]}}``.
            ``bytes``/``bytearray`` input is decoded as UTF-8 first, because
            ``str(b"...")`` yields the ``b'...'`` repr, which is never valid
            JSON.

    Returns:
        A one-element list holding a point dict for measurement
        ``"table_name"``: ``columns[0]`` becomes ``"time"`` and
        ``columns[1]`` becomes the ``"amount"`` field.

    Raises:
        ValueError: If ``chunk`` is not valid JSON (``json.JSONDecodeError``).
        KeyError, TypeError, IndexError: If the decoded object lacks
            ``row.columns`` or it has fewer than two entries.
    """
    if isinstance(chunk, (bytes, bytearray)):
        chunk = chunk.decode('utf-8')
    live_dict = json.loads(str(chunk))
    columns = live_dict["row"]["columns"]
    return [
        {
            "measurement": "table_name",
            "tags": {"host": "server01", "region": "us-west"},
            "time": columns[0],
            "fields": {"amount": columns[1]},
        }
    ]
def get_stream_data_from_ksql(sql_string):
    """Run a streaming KSQL query and write each returned row to InfluxDB.

    The response is consumed line by line rather than in fixed 128-byte
    pieces. Fixed-size chunks cut the stream at arbitrary byte offsets, so a
    row longer than the chunk size, several rows in one chunk, or a split
    multi-byte UTF-8 character would make decoding or ``json.loads`` fail.

    NOTE(review): this assumes the ``query`` endpoint separates rows with
    newlines, as the original ``chunk != b'\\n'`` check suggests. Confirm
    against the KSQL server version in use.

    Args:
        sql_string: KSQL ``select`` statement to execute.
    """
    request_object = get_request(KSQL_API, sql_string, "query")
    client = get_influx_connection()
    try:
        for line in request_object.iter_lines(chunk_size=128):
            line = line.strip()
            if not line:
                # Blank separator lines carry no row.
                continue
            json_body = prepare_json_payload(line.decode('utf-8'))
            client.write_points(json_body)
    finally:
        # Release the streamed connection even if parsing or writing fails.
        request_object.close()
def main():
    """Forward ``time`` and ``amount`` from ``transaction_stream`` via the KSQL query."""
    get_stream_data_from_ksql("select time,amount from transaction_stream;")


# Run the pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment