Created
March 4, 2020 18:10
-
-
Save denisb411/d19a178b0643db1eade2c124efe360d4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import os | |
import sys | |
from threading import Thread | |
from argparse import ArgumentParser | |
import io | |
from flask import Flask, render_template, Response | |
from flask import send_file | |
from pypylon import pylon | |
from pypylon import genicam | |
import cv2 | |
app = Flask(__name__) | |
__author__ = "[email protected]" | |
__date__ = "03/02/2019" | |
__version__ = "0.1" | |
__description__ = "" | |
global cam | |
class Camera(Thread): | |
def __init__(self, fps): | |
Thread.__init__(self) | |
os.environ["PYLON_CAMEMU"] = "1" | |
try: | |
cam = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice()) | |
cam.AcquisitionFrameRateEnable = True | |
cam.AcquisitionFrameRateAbs = fps | |
cam.Open() | |
cam.StartGrabbing(pylon.GrabStrategy_LatestImageOnly) | |
print("Using device ", cam.GetDeviceInfo().GetModelName()) | |
except genicam.GenericException as e: | |
# Error handling | |
print("An exception occurred.", e) | |
self.cam = cam | |
self.buffered_photo = None | |
self.daemon = True | |
self.start() | |
def run(self): | |
while True: | |
try: | |
grabResult = self.cam.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException) | |
frame = grabResult.Array | |
except genicam.GenericException as e: | |
print("An exception occurred.", e) | |
continue | |
ret, buffer = cv2.imencode('.jpg', frame) | |
self.buffered_photo = buffer.tobytes() | |
def frames_generator(self): | |
while True: | |
yield (b'--frame\r\n' | |
b'Content-Type: image/jpeg\r\n\r\n' + self.buffered_photo + b'\r\n') # concat frame one by one and show result | |
@app.route('/video_feed') | |
def video_feed(): | |
global cam | |
"""Video streaming route. Put this in the src attribute of an img tag.""" | |
return Response(cam.frames_generator(), | |
mimetype='multipart/x-mixed-replace; boundary=frame') | |
@app.route('/capture_photo') | |
def capture_photo(): | |
global cam | |
return send_file( | |
io.BytesIO(cam.buffered_photo), | |
mimetype='image/jpeg') | |
if __name__ == '__main__': | |
argParser = ArgumentParser(description=__description__, | |
epilog='Developed by ' + __author__ + ' in ' + __date__) | |
argParser.add_argument('--fps', dest="fps", type=str, default='8') | |
argParser.add_argument('--port', dest="port", type=str, default='8080') | |
args = argParser.parse_args() | |
global cam | |
cam = Camera(int(args.fps)) | |
app.run(host='0.0.0.0', port=args.port, debug=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment