Skip to content

Instantly share code, notes, and snippets.

@nuno-andre
Created November 16, 2017 01:21
Show Gist options
  • Save nuno-andre/f06e3adc52fbdabe0e3f1eda35cc921a to your computer and use it in GitHub Desktop.
Save nuno-andre/f06e3adc52fbdabe0e3f1eda35cc921a to your computer and use it in GitHub Desktop.
Sanic + MessagePack
import requests
from json import dumps, loads
from msgpack import packb, unpackb
data = {'say': 'hello, world!'}
headers_msgpack = {'Content-Type': 'application/msgpack'}
headers_json = {'Content-Type': 'application/json'}
r = requests.post('http://localhost:8000/', data=packb(data), headers=headers_msgpack)
print(unpackb(r.content))
r = requests.post('http://localhost:8000/', data=dumps(data), headers=headers_json)
print(loads(r.content))
from sanic.response import HTTPResponse
from sanic.request import Request
from sanic.exceptions import InvalidUsage
from msgpack import packb, unpackb
MSGPACK_MIME_TYPE = 'application/msgpack'
# https://github.com/msgpack/msgpack/issues/194
class MsgPackRequest(Request):
__slots__ = ('parsed_msgpack',)
def __init__(self, url_bytes, headers, version, method, transport):
self.parsed_msgpack = None
super().__init__(url_bytes, headers, version, method, transport)
@property
def msgpack(self):
if self.parsed_msgpack is None:
try:
self.parsed_msgpack = unpackb(self.body)
except Exception:
raise InvalidUsage('Failed when parsing body as msgpack')
return self.parsed_msgpack
def msgpack(body, status=200, headers=None,
content_type=MSGPACK_MIME_TYPE, **kwargs):
"""
Returns response object with body in MessagePack format.
:param body: Response data.
:param status: Response code.
:param headers: Custom Headers.
:param content_type: the content type (string) of the response.
:param kwargs: Remaining arguments that are passed to the msgpack encoder.
"""
return HTTPResponse(body_bytes=packb(body, **kwargs), status=status,
headers=headers, content_type=content_type)
from sanic import Sanic
from sanic import response
from sanic_msgpack import MsgPackRequest, msgpack
response.msgpack = msgpack
app = Sanic(request_class=MsgPackRequest)
@app.route("/", methods=['POST'])
async def test(request):
if request.content_type == 'application/msgpack':
return response.msgpack({'say': 'hello, msgpack!'})
elif request.content_type == 'application/json':
return response.json({'say': 'hello, json!'})
if __name__ == "__main__":
app.run(host='localhost', port=8000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment