Created
February 15, 2017 13:04
-
-
Save kawashirov/c95c16588b9ee5b0ff9904ca219ccc5d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gzip | |
import struct | |
from tornado.gen import coroutine | |
from tornado.iostream import BaseIOStream | |
from mgc2api.types import IntType, BeanType | |
from mgc2api.util import assert_isinstance, struct_read_from, struct_unpack_from | |
_MSG_BYTE_ARRAY_HEADER = b'\x05ur\x00\x00\x02[B\xac\xf3\x17\xf8\x06\x08T\xe0\x02\x00\x00xp' | |
class Codec(object): | |
""" | |
Codec is the thing that can read/write beans from/to BaseIOStream as bytes | |
""" | |
@coroutine | |
def read_stream(self, io_stream): | |
raise Exception('ABSTRACT') | |
@coroutine | |
def write_stream(self, io_stream, msg): | |
raise Exception('ABSTRACT') | |
class NettyCodec(Codec): | |
""" | |
This thing excepts java bytes[] object encoded using netty object codecs. | |
""" | |
@coroutine | |
def read_stream(self, io_stream): | |
assert_isinstance(io_stream, BaseIOStream) | |
print('Reading netty message...') | |
# Stage 1: reading java object data block | |
(msg_length,) = yield struct_read_from(io_stream, '>i') | |
print('It should be {} bytes length, reading...'.format(msg_length)) | |
msg = yield io_stream.read_bytes(msg_length) | |
print('Got message: ', msg) | |
# Stage 2: decoding java byte[] | |
if _MSG_BYTE_ARRAY_HEADER != msg[:len(_MSG_BYTE_ARRAY_HEADER)]: | |
raise IOError('Message header differs from _MSG_BYTE_ARRAY_HEADER:', msg[:len(_MSG_BYTE_ARRAY_HEADER)]) | |
# cutting of the header | |
msg = msg[len(_MSG_BYTE_ARRAY_HEADER):] | |
(msg, (array_length,)) = struct_unpack_from(msg, '>i') | |
if len(msg) != array_length: | |
raise IOError('Size of leftover data mismatch: excepted {}, got {}: {}'.format( | |
array_length, len(msg), repr(msg) | |
)) | |
# Stage 3: decompressing | |
msg = gzip.decompress(msg) | |
# Stage 4: reading bean itself | |
msg_type = IntType() | |
msg = msg_type.read_as_javabytes(msg) | |
print('Decoded MSG_TYPE:', msg_type.value) | |
bean = BeanType() | |
msg = bean.read_as_javabytes_wo_prefix(msg) | |
print('Decoded Bean:', repr(bean)) | |
if len(msg) > 0: | |
raise IOError('Left {} bytes after bean reading: {}'.format(len(msg), repr(msg))) | |
return bean | |
@coroutine | |
def write_stream(self, io_stream, msg): | |
assert_isinstance(io_stream, BaseIOStream), (type(io_stream), io_stream) | |
assert_isinstance(msg, BeanType) | |
# Stage 4: writing bean itself | |
buff_a = bytearray() | |
msg_type = msg.value['msgType'] | |
assert_isinstance(msg_type, IntType) | |
msg_type.write_as_javabytes(buff_a) | |
msg.write_as_javabytes_wo_prefix(buff_a) | |
# Stage 3: compressing | |
buff_a = gzip.compress(buff_a) | |
# Stage 2 + 1: encoding java byte[], writing java object data block | |
buff_b = bytearray() | |
buff_b.extend(struct.pack('>i', len(buff_a))) | |
buff_b.extend(buff_a) | |
buff_c = bytearray() | |
buff_c.extend(struct.pack('>i', len(_MSG_BYTE_ARRAY_HEADER) + len(buff_b))) | |
buff_c.extend(_MSG_BYTE_ARRAY_HEADER) | |
buff_c.extend(buff_b) | |
yield io_stream.write(bytes(buff_c)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment