Skip to content

Instantly share code, notes, and snippets.

@kawashirov
Created February 15, 2017 13:04
Show Gist options
  • Save kawashirov/c95c16588b9ee5b0ff9904ca219ccc5d to your computer and use it in GitHub Desktop.
Save kawashirov/c95c16588b9ee5b0ff9904ca219ccc5d to your computer and use it in GitHub Desktop.
import gzip
import struct
from tornado.gen import coroutine
from tornado.iostream import BaseIOStream
from mgc2api.types import IntType, BeanType
from mgc2api.util import assert_isinstance, struct_read_from, struct_unpack_from
_MSG_BYTE_ARRAY_HEADER = b'\x05ur\x00\x00\x02[B\xac\xf3\x17\xf8\x06\x08T\xe0\x02\x00\x00xp'
class Codec(object):
"""
Codec is the thing that can read/write beans from/to BaseIOStream as bytes
"""
@coroutine
def read_stream(self, io_stream):
raise Exception('ABSTRACT')
@coroutine
def write_stream(self, io_stream, msg):
raise Exception('ABSTRACT')
class NettyCodec(Codec):
"""
This thing excepts java bytes[] object encoded using netty object codecs.
"""
@coroutine
def read_stream(self, io_stream):
assert_isinstance(io_stream, BaseIOStream)
print('Reading netty message...')
# Stage 1: reading java object data block
(msg_length,) = yield struct_read_from(io_stream, '>i')
print('It should be {} bytes length, reading...'.format(msg_length))
msg = yield io_stream.read_bytes(msg_length)
print('Got message: ', msg)
# Stage 2: decoding java byte[]
if _MSG_BYTE_ARRAY_HEADER != msg[:len(_MSG_BYTE_ARRAY_HEADER)]:
raise IOError('Message header differs from _MSG_BYTE_ARRAY_HEADER:', msg[:len(_MSG_BYTE_ARRAY_HEADER)])
# cutting of the header
msg = msg[len(_MSG_BYTE_ARRAY_HEADER):]
(msg, (array_length,)) = struct_unpack_from(msg, '>i')
if len(msg) != array_length:
raise IOError('Size of leftover data mismatch: excepted {}, got {}: {}'.format(
array_length, len(msg), repr(msg)
))
# Stage 3: decompressing
msg = gzip.decompress(msg)
# Stage 4: reading bean itself
msg_type = IntType()
msg = msg_type.read_as_javabytes(msg)
print('Decoded MSG_TYPE:', msg_type.value)
bean = BeanType()
msg = bean.read_as_javabytes_wo_prefix(msg)
print('Decoded Bean:', repr(bean))
if len(msg) > 0:
raise IOError('Left {} bytes after bean reading: {}'.format(len(msg), repr(msg)))
return bean
@coroutine
def write_stream(self, io_stream, msg):
assert_isinstance(io_stream, BaseIOStream), (type(io_stream), io_stream)
assert_isinstance(msg, BeanType)
# Stage 4: writing bean itself
buff_a = bytearray()
msg_type = msg.value['msgType']
assert_isinstance(msg_type, IntType)
msg_type.write_as_javabytes(buff_a)
msg.write_as_javabytes_wo_prefix(buff_a)
# Stage 3: compressing
buff_a = gzip.compress(buff_a)
# Stage 2 + 1: encoding java byte[], writing java object data block
buff_b = bytearray()
buff_b.extend(struct.pack('>i', len(buff_a)))
buff_b.extend(buff_a)
buff_c = bytearray()
buff_c.extend(struct.pack('>i', len(_MSG_BYTE_ARRAY_HEADER) + len(buff_b)))
buff_c.extend(_MSG_BYTE_ARRAY_HEADER)
buff_c.extend(buff_b)
yield io_stream.write(bytes(buff_c))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment