Last active
September 12, 2024 03:35
-
-
Save rag594/d35550490a2af6412db7a3b1a062c71c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ksql import KSQLAPI | |
from influxdb import InfluxDBClient | |
import json | |
import requests | |
# Connection settings. Every XX / XXXX value below is a placeholder that must
# be replaced with a real address or port before the script can run.
# HOST and INFLUX_PORT are passed to InfluxDBClient.
HOST = "XX.XX.XX.XX"
INFLUX_PORT = XXXX
# Base URL of the KSQL REST server (used for KSQLAPI and for raw requests).
KSQL_API = "http://XX.XX.XX.XX:XXXX"
# NOTE(review): name looks like a typo for INFLUX_API; it is not referenced in
# the visible code and is kept as-is in case something else imports it.
INLUX_API = "http://XX.XX.XX.XX:XXXX"
def get_request(url, sql_string, endpoint):
    """POST a KSQL statement to ``<url>/<endpoint>`` and return the response.

    Args:
        url: Base URL of the KSQL REST server.
        sql_string: Statement sent under the ``"ksql"`` key of the JSON body.
        endpoint: Path segment appended to ``url``. The response body is
            streamed only when this equals ``'query'``.

    Returns:
        The response object produced by ``requests.request``.
    """
    target = '{}/{}'.format(url, endpoint)
    payload = json.dumps({"ksql": sql_string})
    # Stream the body only for the 'query' endpoint; other endpoints are
    # downloaded in full before returning.
    is_streaming = endpoint == 'query'
    return requests.request(
        method="POST",
        url=target,
        data=payload,
        timeout=5,
        headers={"Content-Type": "application/json"},
        stream=is_streaming,
    )
def get_ksql_connection():
    """Return a ``KSQLAPI`` client constructed with the ``KSQL_API`` URL."""
    return KSQLAPI(KSQL_API)
def get_influx_connection(database="XXXX"):  # Replace XXXX with your datastore
    """Return an ``InfluxDBClient`` connected to ``HOST``:``INFLUX_PORT``.

    Args:
        database: Name of the InfluxDB database (datastore) the client
            should use. The default is a placeholder and must be replaced
            with, or overridden by, a real database name.

    Returns:
        An ``InfluxDBClient`` instance created with an empty username and
        password.
    """
    # The database name is a real argument rather than an inline "#####"
    # marker: a '#' inside the call starts a comment and swallows the
    # closing parenthesis, which makes the whole file unparseable.
    return InfluxDBClient(HOST, INFLUX_PORT, '', '', database)
def prepare_json_payload(chunk):
    """Convert one KSQL row message into an InfluxDB points list.

    Args:
        chunk: JSON text (``str``, or UTF-8 encoded ``bytes``) shaped like
            ``{"row": {"columns": [<time>, <amount>, ...]}}``.

    Returns:
        A one-element list holding a point dict for measurement
        ``"table_name"``: the first column becomes ``"time"`` and the second
        becomes the ``"amount"`` field. The tags are fixed.

    Raises:
        ValueError: If ``chunk`` is not valid JSON.
        KeyError, IndexError, TypeError: If the decoded message lacks the
            ``row``/``columns`` structure or has fewer than two columns.
    """
    # json.loads accepts str and bytes directly. Wrapping bytes in str()
    # would yield the "b'...'" repr, which is not valid JSON.
    columns = json.loads(chunk)["row"]["columns"]
    return [
        {
            "measurement": "table_name",
            "tags": {"host": "server01", "region": "us-west"},
            "time": columns[0],
            "fields": {"amount": columns[1]},
        }
    ]
def get_stream_data_from_ksql(sql_string):
    """Stream the rows of a KSQL query into InfluxDB.

    Posts ``sql_string`` to the ``query`` endpoint of ``KSQL_API``, converts
    every non-blank line of the streamed response with
    ``prepare_json_payload`` and writes the result with
    ``client.write_points``.

    Args:
        sql_string: KSQL query to execute.

    Raises:
        Whatever ``get_request``, ``prepare_json_payload`` or
        ``write_points`` raise; the response is closed in every case.
    """
    request_object = get_request(KSQL_API, sql_string, "query")
    try:
        client = get_influx_connection()
        # Read line by line instead of in fixed 128-byte chunks: a fixed-size
        # chunk can end in the middle of a JSON record (or of a multi-byte
        # UTF-8 character) or contain several records, none of which parse.
        for line in request_object.iter_lines():
            # Blank lines carry no record; skip them.
            if not line.strip():
                continue
            json_body = prepare_json_payload(line.decode('utf-8'))
            client.write_points(json_body)
    finally:
        # Release the streamed connection even if parsing or writing fails.
        request_object.close()
def main():
    """Run the pipeline for the ``time`` and ``amount`` columns of ``transaction_stream``."""
    query = "select time,amount from transaction_stream;"
    get_stream_data_from_ksql(query)
# Start the pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment