Created
March 28, 2021 18:25
-
-
Save duhanebel/68063b43a2548ec6716262a01cb036a3 to your computer and use it in GitHub Desktop.
Proxy service that forwards everything to and from the Fanju weather station, intercepting the local data and uploading it to InfluxDB.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import signal | |
import logging | |
import select | |
import socket | |
import datetime | |
from time import gmtime, strftime | |
from influxdb import InfluxDBClient | |
from influxdb import SeriesHelper | |
# InfluxDB connection settings.
# NOTE(review): these values look like placeholders -- confirm they are
# replaced with the real connection details before deployment.
dbname = 'fanju_weather'
password = 'password'
user = 'user'
port = 8086
host = 'URL'
class Measurement:
    """One reading reported by a sensor.

    Attributes:
    sensorID -- Identifier of the sensor.
    humidity -- Humidity value, stored as given by the caller.
    battery  -- Battery value, stored as given by the caller.
    """

    # Class-level defaults, visible whenever an instance does not override them.
    sensorID = "000"
    humidity = 0
    battery = 0

    def __init__(self, id, humidity, battery):
        """Store the sensor id, humidity and battery values on the instance.

        The parameter name ``id`` is kept for backward compatibility even
        though it shadows the builtin.
        """
        # Must be assigned through ``self``: a bare ``sensorID = id`` only
        # creates a local variable and the id would be silently discarded.
        self.sensorID = id
        self.humidity = humidity
        self.battery = battery
# Shared InfluxDB client built from the module-level connection settings.
dbclient = InfluxDBClient(
    host=host,
    port=port,
    username=user,
    password=password,
    database=dbname,
)
class FanjuSeries(SeriesHelper):
    """Series helper describing the 'sensordata' points sent through dbclient."""

    class Meta:
        # Client the helper uses to reach the database.
        client = dbclient

        # Name of the series; it must be a string. Dependent fields/tags can
        # be interpolated with curly brackets.
        series_name = 'sensordata'

        # All the tags of the series.
        tags = ['device']

        # All the fields of this time series.
        fields = ['temperature', 'humidity']

        # autocommit has to be True whenever bulk_size is used.
        autocommit = True

        # How many data points are stored before they are written on the wire.
        bulk_size = 4
# Log line layout: timestamp (left-aligned, at least 15 characters wide)
# followed by the message.
FORMAT = '%(asctime)-15s %(message)s'
# LOGGER is the root logger; basicConfig configures it with FORMAT.
LOGGER = logging.getLogger()
logging.basicConfig(format=FORMAT)
def remove_header_footer(str):
    """Strip the fixed frame header and footer from a hex-encoded packet.

    Arguments:
    str -- Hex string of a whole packet. It must start with the 20-character
           header 'aa3c5701009569f05180' and end with the 4-character
           footer 'cc3e'.

    Returns the hex string found between the header and the footer.

    Raises ValueError when the header or the footer is missing.
    """
    header = "aa3c5701009569f05180"
    footer = "cc3e"
    # Explicit checks rather than ``assert``: assertions are stripped under
    # ``python -O``, which would let badly framed packets through unnoticed.
    if not str.startswith(header):
        raise ValueError(
            "packet does not start with header {}: {!r}".format(header, str))
    if not str.endswith(footer):
        raise ValueError(
            "packet does not end with footer {}: {!r}".format(footer, str))
    return str[len(header):-len(footer)]
def is_local_data_request(str):
    """Tell whether a hex packet begins with the '53300100' marker.

    Arguments:
    str -- Hex string of the packet (presumably with the frame header and
           footer already removed -- confirm against the caller).
    """
    marker = "53300100"
    return str.startswith(marker)
def is_weather_request(str):
    """Tell whether a hex packet begins with the '5231010000008104' marker.

    Arguments:
    str -- Hex string of the packet (presumably with the frame header and
           footer already removed -- confirm against the caller).
    """
    marker = "5231010000008104"
    return str.startswith(marker)
def is_stats_request(str):
    """Tell whether a hex packet begins with the '5230010000008004' marker.

    Arguments:
    str -- Hex string of the packet (presumably with the frame header and
           footer already removed -- confirm against the caller).
    """
    marker = "5230010000008004"
    return str.startswith(marker)
# Receive buffer size in bytes (2 ** 10 == 1024). Keep it a power of 2.
BUFFER_SIZE = 1 << 10

# Hooks applied to a payload before it is forwarded; both currently hand
# back their argument untouched.
LOCAL_DATA_HANDLER = lambda req: req
REMOTE_DATA_HANDLER = lambda x: x
def ftoc(f):
    """Convert a Fahrenheit temperature to Celsius, rounded to one decimal.

    Arguments:
    f -- Temperature in degrees Fahrenheit.
    """
    celsius = (f - 32) * 5 / 9
    return round(celsius, 1)
def convert_num(num):
    """Remove the 900 offset from a raw value and scale it down by ten.

    Arguments:
    num -- Raw number to convert.

    NOTE(review): presumably the raw value is a temperature in tenths of a
    degree with a +900 bias -- confirm against the device protocol.
    """
    without_offset = num - 900
    return without_offset / 10
# (device tag, offset of the first hex digit of the reading) for every sensor
# carried by a local-data packet.
_SENSOR_LAYOUT = (
    ("inside", 30),
    ("outside1", 48),
    ("outside2", 66),
    ("outside3", 84),
)

# Each reading spans six hex digits: four for temperature, two for humidity.
_READING_HEX_LEN = 6


def _decode_reading(hex_str, offset):
    """Return the (temperature, humidity) pair encoded at *offset*."""
    # The two temperature bytes are swapped before parsing, i.e. the value is
    # stored low byte first.
    raw_temp = int(
        hex_str[offset + 2:offset + 4] + hex_str[offset:offset + 2], 16)
    humidity = int(hex_str[offset + 4:offset + 6], 16)
    return ftoc(convert_num(raw_temp)), humidity


def process_local_request(str):
    """Decode the readings of a local-data packet and send them to InfluxDB.

    Arguments:
    str -- Hex string of the packet. One reading (temperature and humidity)
           is read for each of the devices 'inside', 'outside1', 'outside2'
           and 'outside3'.

    Raises ValueError when the packet is too short to hold all four readings
    or when a reading is not valid hexadecimal.
    """
    needed = max(offset for _, offset in _SENSOR_LAYOUT) + _READING_HEX_LEN
    if len(str) < needed:
        # Without this check a packet cut mid-field would be decoded from a
        # partial slice and stored as a wrong value.
        raise ValueError(
            "local data packet too short: {} hex digits, need {}".format(
                len(str), needed))

    # Decode everything first so that nothing is queued when a field is bad.
    readings = [
        (device, _decode_reading(str, offset))
        for device, offset in _SENSOR_LAYOUT
    ]

    (tempi, humi), (temp1, hum1), (temp2, hum2), (temp3, hum3) = (
        values for _, values in readings)
    LOGGER.debug('Processing request: i: {}C {}%, 1: {}C {}%, 2: {}C {}%, 3: {}C {}%'.format(
        tempi, humi, temp1, hum1, temp2, hum2, temp3, hum3))

    for device, (temperature, humidity) in readings:
        FanjuSeries(device=device, temperature=temperature, humidity=humidity)
    FanjuSeries.commit()
def udp_proxy(src, dst):
    """Run UDP proxy.

    Datagrams from the client are forwarded to dst and datagrams from dst
    are forwarded back to the client. The sender of the first datagram seen
    while no client is known becomes the client; it is forgotten again once
    a response has been relayed to it. Local-data requests are additionally
    handed to process_local_request before being forwarded.

    Arguments:
    src -- Source IP address and port string. I.e.: '127.0.0.1:8000'
    dst -- Destination IP address and port. I.e.: '127.0.0.1:8888'
    """
    LOGGER.debug('Starting UDP proxy...')
    LOGGER.debug('Src: {}'.format(src))
    LOGGER.debug('Dst: {}'.format(dst))

    server_address = ip_to_tuple(dst)
    client_address = None

    # The context manager closes the socket when the loop is left through an
    # exception (e.g. KeyboardInterrupt).
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as proxy_socket:
        proxy_socket.bind(ip_to_tuple(src))

        LOGGER.debug('Looping proxy (press Ctrl-Break to stop)...')
        while True:
            data, address = proxy_socket.recvfrom(BUFFER_SIZE)

            raw_hex = data.hex()
            try:
                hex_data_trim = remove_header_footer(raw_hex)
            except (AssertionError, ValueError):
                # An unexpectedly framed packet must not take the proxy
                # down: forward it untouched, just skip the inspection.
                LOGGER.warning(
                    'Unexpected packet framing from {}: {}'.format(address, raw_hex))
                hex_data_trim = None
            shown = raw_hex if hex_data_trim is None else hex_data_trim

            if client_address is None:
                client_address = address

            if address == client_address:
                LOGGER.debug("")
                LOGGER.debug("REQUEST: {}".format(shown))
                data = LOCAL_DATA_HANDLER(data)
                if hex_data_trim is not None and is_local_data_request(hex_data_trim):
                    LOGGER.debug("Received local data request")
                    try:
                        process_local_request(hex_data_trim)
                    except Exception:
                        # Forwarding is the primary job; a decoding or
                        # database failure is logged and the packet is
                        # still relayed.
                        LOGGER.exception('Could not process local data request')
                proxy_socket.sendto(data, server_address)
            elif address == server_address:
                LOGGER.debug("RESPONSE: Received from server: {}".format(shown))
                data = REMOTE_DATA_HANDLER(data)
                proxy_socket.sendto(data, client_address)
                client_address = None
            else:
                LOGGER.warning('Unknown address: {}'.format(str(address)))
# end-of-function udp_proxy
def tcp_proxy(src, dst):
    """Run TCP proxy.

    Accepts a single connection on src, connects to dst and relays data in
    both directions until either side closes its connection. All sockets are
    closed when the function returns or raises.

    Arguments:
    src -- Source IP address and port string. I.e.: '127.0.0.1:8000'
    dst -- Destination IP address and port. I.e.: '127.0.0.1:8888'
    """
    LOGGER.debug('Starting TCP proxy...')
    LOGGER.debug('Src: {}'.format(src))
    LOGGER.debug('Dst: {}'.format(dst))

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(ip_to_tuple(src))
        listener.listen(1)
        s_src, _ = listener.accept()

        with s_src, socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s_dst:
            s_dst.connect(ip_to_tuple(dst))
            sockets = [s_src, s_dst]

            while True:
                s_read, _, _ = select.select(sockets, [], [])
                for sock in s_read:
                    data = sock.recv(BUFFER_SIZE)
                    if not data:
                        # recv() returns b'' once the peer has closed. The
                        # socket then stays readable forever, so carrying on
                        # would spin in a busy loop.
                        LOGGER.debug('Connection closed by peer, stopping TCP proxy')
                        return
                    if sock is s_src:
                        s_dst.sendall(LOCAL_DATA_HANDLER(data))
                    elif sock is s_dst:
                        s_src.sendall(REMOTE_DATA_HANDLER(data))
# end-of-function tcp_proxy
def ip_to_tuple(ip):
    """Split an 'address:port' string into an (address, port) tuple.

    Arguments:
    ip -- IP address:port string. I.e.: '127.0.0.1:8000'.

    The port is returned as an int; the address is returned as given.
    """
    pieces = ip.split(':')
    address, port_text = pieces
    return (address, int(port_text))
# end-of-function ip_to_tuple
def main():
    """Main method.

    Parses the command line, adjusts the log level and runs the selected
    proxy (UDP or TCP) between the given source and destination.
    """
    parser = argparse.ArgumentParser(description='TCP/UDP proxy.')

    # TCP / UDP group: exactly one protocol has to be selected.
    proto_group = parser.add_mutually_exclusive_group(required=True)
    proto_group.add_argument('--tcp', action='store_true', help='TCP proxy')
    proto_group.add_argument('--udp', action='store_true', help='UDP proxy')

    parser.add_argument('-s', '--src', required=True, help='Source IP and port, i.e.: 127.0.0.1:8000')
    parser.add_argument('-d', '--dst', required=True, help='Destination IP and port, i.e.: 127.0.0.1:8888')

    output_group = parser.add_mutually_exclusive_group()
    output_group.add_argument('-q', '--quiet', action='store_true', help='Be quiet')
    output_group.add_argument('-v', '--verbose', action='store_true', help='Be loud')

    args = parser.parse_args()

    # -q and -v are mutually exclusive, so at most one branch applies.
    if args.quiet:
        LOGGER.setLevel(logging.CRITICAL)
    elif args.verbose:
        # NOTSET on the root logger lets every message through.
        LOGGER.setLevel(logging.NOTSET)

    if args.udp:
        udp_proxy(args.src, args.dst)
    elif args.tcp:
        tcp_proxy(args.src, args.dst)
# end-of-function main


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment