Created
August 1, 2023 06:19
-
-
Save Zia-/74d63771a529d690dbb56dea31fc86de to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import _thread | |
| import time | |
| # import os | |
| # import sys | |
| # import base64 | |
| # print(os.environ['SSL_CERT_FILE']) | |
| # from kafka.admin import KafkaAdminClient, NewTopic | |
| # from kafka import KafkaProducer | |
| # from datetime import timedelta | |
| import json, time | |
| # sys.path.insert(0, "/net/satvol001/innovation/repos/pysrs/1.3.5/") | |
| # from pySRS.utils.generic import connect_to_internet | |
| # os.environ['SSL_CERT_FILE']="/etc/ssl/certs/ca-certificates.crt" | |
| # import asyncio | |
| import websocket | |
| import json | |
| # import ssl | |
| # from datetime import datetime, timezone | |
| # from time import gmtime, strftime | |
| # def connect_to_internet(network="edn"): | |
| # """Creates a proxied, secure connection through CGG servers | |
| # Parameters | |
| # ---------- | |
| # network : str, optional | |
| # Network choice from edn,redhill,redresearch | |
| # Default: 'edn' | |
| # """ | |
| # if network == "redresearch": | |
| # os.environ["HTTP_PROXY"] = "https://165.225.80.40:443" | |
| # os.environ["HTTPS_PROXY"] = "https://165.225.80.40:443" | |
| # os.environ["http_proxy"] = "https://165.225.80.40:443" | |
| # os.environ["https_proxy"] = "https://165.225.80.40:443" | |
| # elif network == "redhill": | |
| # os.environ["HTTP_PROXY"] = "http://165.225.80.40:80" | |
| # os.environ["HTTPS_PROXY"] = "http://165.225.80.40:80" | |
| # os.environ["http_proxy"] = "http://165.225.80.40:80" | |
| # os.environ["https_proxy"] = "http://165.225.80.40:80" | |
| # elif network == "edn": | |
| # os.environ["HTTP_PROXY"] = "http://zproxy.edn.cgg.com:80" | |
| # os.environ["HTTPS_PROXY"] = "http://zproxy.edn.cgg.com:80" | |
| # os.environ["http_proxy"] = "http://zproxy.edn.cgg.com:80" | |
| # os.environ["https_proxy"] = "http://zproxy.edn.cgg.com:80" | |
| # else: | |
| # raise ValueError( | |
| # "network name not recognised, must be one of " | |
| # f"['redresearch', 'redhill', 'edn'] not {network}" | |
| # ) | |
| # os.environ["no_proxy"] = "localhost,127.0.0.1,127.0.1.1,edn.cgg.com,int.cgg.com,satellite" | |
| # os.environ["NO_PROXY"] = "localhost,127.0.0.1,127.0.1.1,edn.cgg.com,int.cgg.com,satellite" | |
| # # TODO why are we accessing a private function from ssl? | |
| # ssl._create_default_https_context = ssl._create_unverified_context | |
| # # TODO remove? | |
| # os.environ["REQUESTS_CA_BUNDLE"] = "/etc/ssl/certs/ca-certificates.crt" | |
| # connect_to_internet() | |
import os  # imported here because the file-level `import os` is commented out

# API key sent in the subscription message built by on_open().
# Prefer the AISSTREAM_API_KEY environment variable so the secret does not
# have to live in source control; the previously hard-coded value is kept
# only as a backward-compatible fallback.
# NOTE(review): the fallback key has been published with this file and
# should be rotated, after which the fallback can be removed.
api_key = os.environ.get(
    "AISSTREAM_API_KEY",
    "0aa8017c291bbebc2fbc4523760cf787ff6fa6bb",
)

# Seconds the worker started by on_open() waits before closing the socket
# (30 minutes).
sleep = 30 * 60

# Message type names; not referenced by any other code visible in this file.
types = ['ShipStaticData', 'StaticDataReport']

# on_message() resets the buffer once it holds more than this many messages.
dump_size = 1000

# hostname = 'redsat636v.int.cgg.com'
# port = 9092

# In-memory buffer of decoded messages, filled by on_message().
messages = []
| # producer = KafkaProducer( | |
| # bootstrap_servers=hostname+":"+str(port), | |
| # # bootstrap_servers = hostname, | |
| # # security_protocol="SSL", | |
| # # ssl_cafile=cert_folder+"/ca.pem", | |
| # # ssl_certfile=cert_folder+"/service.cert", | |
| # # ssl_keyfile=cert_folder+"/service.key", | |
| # value_serializer=lambda v: json.dumps(v).encode('ascii'), | |
| # key_serializer=lambda v: json.dumps(v).encode('ascii') | |
| # ) | |
def on_message(ws, message):
    """Decode one incoming JSON message and add it to the module buffer.

    The decoded payload is appended to the global ``messages`` list. Once
    the list holds more than ``dump_size`` entries, the current timestamp is
    printed and ``messages`` is rebound to a fresh empty list.

    Parameters
    ----------
    ws
        The websocket application object passed by the library (unused).
    message
        Raw JSON text of the received message.
    """
    global messages

    messages.append(json.loads(message))

    # Nothing to flush until the buffer has grown past the threshold.
    if len(messages) <= dump_size:
        return

    # Disabled Kafka hand-off, kept for reference:
    # result_string = "|".join(json.dumps(d) for d in messages)
    # producer.send(
    #     'test_ais',
    #     key={"id":time.time()},
    #     value={'message': result_string}
    # )
    # producer.flush()
    print(time.time())
    messages = []
def on_error(ws, error):
    """Print the reported error followed by an ``ERROR`` marker line.

    Parameters
    ----------
    ws
        The websocket application object passed by the library (unused).
    error
        The error value to report; printed via its string form.
    """
    # One call, two lines: the error itself, then the literal marker.
    print(error, "ERROR", sep="\n")
def on_close(w1, w2, w3):
    """Announce on stdout that the connection has been closed.

    All three positional arguments are accepted and ignored.
    """
    print("CLOSED CONNECTION")
def on_open(ws):
    """Start a background thread that subscribes, waits, then closes ``ws``.

    The worker sends a JSON subscription message containing the module-level
    ``api_key`` and a single bounding box, sleeps for ``sleep`` seconds and
    finally closes the websocket.

    Parameters
    ----------
    ws
        The websocket application object to send on and later close.
    """
    def _subscribe_then_close(*args):
        # Alternative, smaller bounding box kept from the original:
        # subscribe_message = {"APIKey": api_key, "BoundingBoxes": [[[-5, 50], [5, 60]]]}
        # NOTE(review): the active box presumably covers the whole globe —
        # confirm the coordinate order against the service documentation.
        subscription = {
            "APIKey": api_key,
            "BoundingBoxes": [[[-180, -90], [180, 90]]],
        }
        ws.send(json.dumps(subscription))

        # Keep the stream open for the configured duration, then shut down.
        time.sleep(sleep)
        ws.close()

    _thread.start_new_thread(_subscribe_then_close, ())
if __name__ == "__main__":
    # Script entry point: connect to the stream endpoint and run the
    # websocket event loop until it returns or the user interrupts it.
    # websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        "wss://stream.aisstream.io/v0/stream",
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    try:
        # t0 and errored are recorded but not used by any code visible here.
        t0 = time.time()
        ws.run_forever()
        errored = ws.has_errored
    except KeyboardInterrupt:
        # Ctrl-C: close the socket explicitly and report it.
        ws.close()
        print('Closed connection')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment