Skip to content

Instantly share code, notes, and snippets.

@wmalarski
Created October 8, 2019 19:23
Show Gist options
  • Save wmalarski/f55f376c6a0153de5cc0004a805f1e72 to your computer and use it in GitHub Desktop.
Save wmalarski/f55f376c6a0153de5cc0004a805f1e72 to your computer and use it in GitHub Desktop.
import base64
from aiohttp import web
import socketio
import cv2
# aiohttp application that hosts the HTTP side of the server.
app = web.Application()
# Async Socket.IO server.
# NOTE(review): cors_allowed_origins=[] is believed to disable CORS handling
# in python-socketio (i.e. any origin may connect) - confirm this is intended.
sio = socketio.AsyncServer(cors_allowed_origins=[])
# Attach the Socket.IO server to the aiohttp application.
sio.attach(app)
@sio.event
async def connect(sid, environ):
    """Socket.IO ``connect`` handler: print the session id.

    ``environ`` is part of the handler signature but is not used here.
    """
    print('connect ', sid)
@sio.event
async def disconnect(sid):
    """Socket.IO ``disconnect`` handler: print the session id."""
    print('disconnect ', sid)
class Controller:
    """Serve individual frames of a video file over Socket.IO.

    On construction the video file is opened with OpenCV and
    ``print_message`` is registered on the module-level ``sio`` server as the
    handler for ``'message'`` events.
    """

    def __init__(self):
        """Open the video capture and register the ``'message'`` handler."""
        self._cap = cv2.VideoCapture('Peek 2019-05-20 20-19.mp4')
        # Only used for the diagnostic print below.
        self._total_frames = self._cap.get(cv2.CAP_PROP_FRAME_COUNT)
        print('self._total_frames', self._total_frames)
        sio.on('message', self.print_message)

    async def print_message(self, sid, message):
        """Handle a ``'message'`` event by emitting the requested frame.

        Args:
            sid: Socket.IO session id of the sender.
            message: Frame index to seek to; anything ``int()`` accepts.

        The socket id and the raw message are printed. If ``message`` cannot
        be converted to a non-negative integer the request is ignored instead
        of raising inside the handler. Otherwise the capture seeks to that
        frame, reads it, and emits the base64 encoding of the frame's raw
        buffer as a ``'message'`` event.
        """
        print("Socket ID: ", sid)
        print(message)

        # The message comes from a client, so validate before using it.
        try:
            frame_index = int(message)
        except (TypeError, ValueError):
            print('ignoring invalid frame index:', repr(message))
            return
        if frame_index < 0:
            print('ignoring negative frame index:', frame_index)
            return

        # Positions past the end of the video are left to read() to report
        # through its success flag.
        self._cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
        ret, frame = self._cap.read()
        if not ret:
            print('could not read frame', frame_index)
            return

        # Encode once and reuse the result for both the preview and the emit.
        encoded = base64.b64encode(frame)
        print(encoded[:100])
        # NOTE(review): emit() without a recipient is believed to broadcast
        # to every connected client - confirm whether only `sid` should
        # receive the frame.
        await sio.emit('message', encoded)

    def release(self):
        """Release the underlying video capture."""
        self._cap.release()
if __name__ == '__main__':
    controller = Controller()
    try:
        # Runs the aiohttp application; returns once the server has stopped.
        web.run_app(app)
    finally:
        # Release the video capture even if run_app() raises, so the handle
        # is not leaked on an abnormal exit.
        controller.release()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment