Created
April 21, 2024 02:44
-
-
Save franzwong/fd47388d0262334be91a95f57f11f65d to your computer and use it in GitHub Desktop.
Mock kdb+/q ticker plant
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import struct | |
import sys | |
import threading | |
import numpy as np | |
from qpython import qconnection, MetaData, CONVERSION_OPTIONS | |
from qpython._pandas import PandasQReader, PandasQWriter | |
from qpython.qcollection import QTable, qlist, qtable | |
from qpython.qconnection import MessageType | |
from qpython.qreader import QReader, QMessage | |
from qpython.qtype import QException, QSYMBOL_LIST, QFLOAT_LIST | |
from qpython.qwriter import QWriter | |
class ClientConnection:
    """Server-side wrapper around one accepted client socket.

    Provides raw byte access (used for the login handshake) and, once
    :meth:`initialize` has been called, message-level access through
    qpython's ``PandasQReader`` / ``PandasQWriter``.
    """

    def __init__(self, encoding: str, connection: socket.socket, address):
        """Remember the socket; the reader and writer are created later.

        Args:
            encoding: Text encoding handed to the qpython reader and writer.
            connection: The connected client socket.
            address: Peer address of the client. Stored, not used by this class.
        """
        self._encoding = encoding
        self._conn = connection
        self._addr = address
        # Buffered file view of the socket; kept so close() can release it.
        self._conn_file = None
        self._reader: QReader | None = None
        self._writer: QWriter | None = None

    def initialize(self, ipc_version):
        """Create the message reader and writer for this connection.

        Args:
            ipc_version: Protocol version passed to the writer as
                ``protocol_version``.
        """
        if self._conn_file is not None:
            # Re-initialising: drop the previous file view instead of leaking it.
            self._conn_file.close()
        self._conn_file = self._conn.makefile('rb')
        self._reader = PandasQReader(self._conn_file, encoding=self._encoding)
        self._writer = PandasQWriter(self._conn, protocol_version=ipc_version, encoding=self._encoding)

    def close(self):
        """Close the connection and its buffered file view. Safe to call twice."""
        try:
            if self._conn_file is not None:
                # socket.close() alone does not release the descriptor while a
                # makefile() object is still open, so close that first.
                self._conn_file.close()
                self._conn_file = None
        finally:
            self._conn.close()

    def raw_recv(self, buffer_size, flags=0):
        """Receive up to ``buffer_size`` raw bytes from the socket."""
        return self._conn.recv(buffer_size, flags)

    def raw_send(self, data, flags=0):
        """Send all of ``data`` as raw bytes."""
        # sendall() keeps going until every byte is written; send() may stop early.
        self._conn.sendall(data, flags)

    def write(self, data, msg_type, **options):
        """Serialise ``data`` and send it as a message of type ``msg_type``.

        Raises:
            RuntimeError: If :meth:`initialize` has not been called yet.
        """
        if self._writer is None:
            raise RuntimeError("ClientConnection.initialize() must be called before write()")
        self._writer.write(data, msg_type, **options)

    def read(self, source=None, **options) -> "QMessage":
        """Read and return the next message from the client.

        Raises:
            RuntimeError: If :meth:`initialize` has not been called yet.
        """
        if self._reader is None:
            raise RuntimeError("ClientConnection.initialize() must be called before read()")
        return self._reader.read(source, **options)
class MockServer:
    """Single-client mock of a kdb+ tickerplant.

    ``run`` accepts one client, completes the login handshake, waits for a
    ``.u.sub`` request, answers it with the table name and a sample table,
    and then sends an ``upd`` message at a fixed interval until the stop
    event is set.
    """

    # How often the accept loop wakes up to re-check the stop event.
    _ACCEPT_POLL_SECONDS = 0.2

    def __init__(self, host: str, port: int, stopper: threading.Event, encoding: str = "latin-1", tick_interval_seconds: int = 1):
        """Configure the server; no socket is opened until :meth:`run`.

        Args:
            host: Interface to bind to.
            port: TCP port to bind to.
            stopper: Event that ends every loop in :meth:`run` once set.
            encoding: Encoding used for the connection and for decoding requests.
            tick_interval_seconds: Delay between two published ticks.
        """
        self._host = host
        self._port = port
        self._stopper = stopper
        self._encoding = encoding
        self._tick_interval_seconds = tick_interval_seconds
        self._options = MetaData(**CONVERSION_OPTIONS.union_dict())

    def set_encoding(self, encoding: str):
        """Replace the encoding used for subsequently accepted connections."""
        self._encoding = encoding

    def set_tick_interval_seconds(self, interval: int):
        """Replace the delay between published ticks."""
        self._tick_interval_seconds = interval

    def get_table_name(self):
        """Return the name of the published table, as bytes."""
        return b"trade"

    def get_mock_tick(self):
        """Build the one-row sample table (columns ``sym`` and ``ask``)."""
        columns = qlist(['sym', 'ask'], qtype=QSYMBOL_LIST)
        return qtable(columns, [qlist(["USDJPY"], qtype=QSYMBOL_LIST), qlist([154.9], qtype=QFLOAT_LIST)])

    def stop(self):
        """Ask :meth:`run` to finish by setting the stop event."""
        self._stopper.set()

    def run(self):
        """Serve a single client until the stop event is set.

        Returns without serving anyone if the stop event is set before a
        client connects. A dropped client connection is reported and ends
        the session instead of propagating out of the server thread.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
            listener.bind((self._host, self._port))
            listener.listen()
            accepted = self._accept_client(listener)
            if accepted is None:
                return
            conn, addr = accepted
            client_conn = ClientConnection(self._encoding, conn, addr)
            try:
                print("Server: Connected to a client")
                self._handshake(client_conn)
                self._await_subscription(client_conn)
                self._publish_ticks(client_conn)
            except ConnectionError as exc:
                print(f"Server: Client connection lost: {exc}")
            finally:
                client_conn.close()

    def _accept_client(self, listener):
        """Wait for a client, polling the stop event; return ``None`` if stopped."""
        # A timeout on the listening socket lets stop() interrupt the wait.
        # The accepted socket itself stays in blocking mode.
        listener.settimeout(self._ACCEPT_POLL_SECONDS)
        while not self._stopper.is_set():
            try:
                return listener.accept()
            except socket.timeout:
                continue
        return None

    def _handshake(self, client_conn):
        """Echo the client's IPC version byte and set up message framing."""
        login = client_conn.raw_recv(99)
        # The version is taken from the second-to-last byte of the login
        # message; anything shorter than two bytes is treated as version 0.
        ipc_version = login[-2] if len(login) > 1 else 0
        client_conn.raw_send(struct.pack("B", ipc_version))
        client_conn.initialize(ipc_version)

    def _await_subscription(self, client_conn):
        """Read requests until a ``.u.sub`` arrives, then answer it once."""
        options = self._options.union_dict()
        while not self._stopper.is_set():
            request = client_conn.read(**options)
            if request.data[0].decode(self._encoding) == ".u.sub":
                print(f"Client subscribed to table {request.data[1].decode(self._encoding)}")
                table = self.get_mock_tick()
                client_conn.write([self.get_table_name(), table], MessageType.RESPONSE, **options)
                break

    def _publish_ticks(self, client_conn):
        """Send an async ``upd`` message every tick interval until stopped."""
        options = self._options.union_dict()
        while not self._stopper.is_set():
            table = self.get_mock_tick()
            client_conn.write([b"upd", self.get_table_name(), table], MessageType.ASYNC, **options)
            # wait() doubles as the sleep and returns early when stop() is called.
            self._stopper.wait(self._tick_interval_seconds)
class Listener:
    """Receives messages from a q connection and prints pushed table rows."""

    def __init__(self, stopper: threading.Event, q: qconnection.QConnection):
        """Keep the stop event and the connection to read from."""
        self._q = q
        self._stopper = stopper

    def run(self):
        """Receive messages until the stop event is set.

        A message whose type is not ASYNC is reported but still inspected.
        When the payload is a three-element list whose first item is
        ``b'upd'`` and whose third item is a ``QTable``, every row of that
        table is printed. A ``QException`` raised while handling a message is
        printed and the loop carries on.
        """
        while True:
            if self._stopper.is_set():
                break
            try:
                incoming = self._q.receive(data_only=False, raw=False)
                if incoming.type != MessageType.ASYNC:
                    print('Unexpected message, expected message of type: ASYNC')
                payload = incoming.data
                if not isinstance(payload, list):
                    continue
                is_update = (
                    len(payload) == 3
                    and payload[0] == b'upd'
                    and isinstance(payload[2], QTable)
                )
                if not is_update:
                    continue
                for record in payload[2]:
                    print(record)
            except QException as error:
                print(error)
def get_free_port() -> int:
    """Return a TCP port number the OS reports as free on localhost.

    A temporary socket is bound to port 0 so the OS picks the port. The
    socket is closed again before the number is returned.
    """
    probe = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM)
    with probe:
        probe.bind(('localhost', 0))
        # getsockname() yields (address, port); only the port is needed.
        return probe.getsockname()[1]
def main():
    """Run the mock server and a subscribing client until Enter is pressed.

    Starts ``MockServer`` on a free local port in a background thread,
    connects a qpython client, subscribes to the ``trade`` table, prints the
    table's data model, and starts a ``Listener`` thread that prints the
    published rows. Reading a line from stdin ends the demo.
    """
    host = 'localhost'
    port = get_free_port()
    user_name = "kdbuser"
    password = "kdbpassword"
    stopper = threading.Event()
    mock_server = MockServer(host, port, stopper)
    server_thread = threading.Thread(target=mock_server.run)
    server_thread.start()
    # NOTE(review): the client connects right after the thread starts, so the
    # server may not be listening yet - confirm whether a retry is wanted.
    try:
        with qconnection.QConnection(host=host, port=port, username=user_name, password=password) as q:
            try:
                # np.bytes_ is the same type the removed np.string_ alias
                # pointed to, and exists in both NumPy 1.x and 2.x.
                response = q.sendSync(".u.sub", np.bytes_('trade'), np.bytes_(''))
                if isinstance(response[1], QTable):
                    print('%s table data model: %s' % (response[0], response[1].dtype))
                listener = Listener(stopper, q)
                listener_thread = threading.Thread(target=listener.run)
                listener_thread.start()
                sys.stdin.readline()
            finally:
                # Signal the worker loops before the connection is torn down,
                # so they see the stop request rather than a closed socket.
                stopper.set()
    finally:
        # Also covers the case where the connection could not be opened.
        stopper.set()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment