Created
August 1, 2017 15:42
-
-
Save dlech/6877e0d3f96ba563b2ca5dfded651d48 to your computer and use it in GitHub Desktop.
NXTCam5 streaming (uses unreleased GRX library, so won't work unless you compile GRX yourself)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import fcntl | |
import os | |
import gi | |
gi.require_version('GLib', '2.0') | |
from gi.repository import GLib | |
gi.require_version('Grx', '3.0') | |
from gi.repository import Grx | |
PORT = '/dev/ttyACM0' | |
class App(Grx.Application):
    """GRX application that shows frames streamed from the NXTCam5.

    Every 100 ms the app writes ``b's'`` to the serial device at ``PORT``,
    reads back a decimal byte count terminated by a newline followed by
    that many bytes of JPEG data, and loads the JPEG into the screen
    context. A touch-down event quits the application.
    """

    # Unbuffered binary file objects for PORT (read side / write side).
    # Both stay None until do_activate() opens them.
    _r = None
    _w = None

    def __init__(self):
        # NOTE(review): super() is anchored at Grx.Application rather than
        # App, exactly as in the original - confirm this is intentional
        # before changing it.
        super(Grx.Application, self).__init__()
        self.init()
        self.hold()

    def do_activate(self):
        """Open the serial port, discard stale input and start polling."""
        # super important! The frames are binary, so the tty has to be put
        # into raw mode before any data is exchanged.
        os.system('stty --file={} raw'.format(PORT))
        # using 'rb+' gives an error because file is not seekable
        # so using separate read and write file handles
        self._r = open(PORT, 'rb', buffering=0)
        self._w = open(PORT, 'wb', buffering=0)
        # drain any old data: switch the read side to non-blocking so that
        # read() returns a falsy value (None or b'') once nothing is pending
        fd = self._r.fileno()
        orig_fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)
        try:
            while self._r.read(4096):
                pass
        finally:
            # always go back to the original (blocking) flags, even if the
            # drain itself failed
            fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl)
        GLib.timeout_add(100, self._update)

    def _update(self):
        """Request one frame from the camera and draw it.

        Returns:
            ``GLib.SOURCE_CONTINUE`` so the timeout keeps firing. A frame
            with an unreadable header, a truncated payload or undecodable
            JPEG data is skipped.
        """
        self._w.write(b's')
        try:
            size = int(self._r.readline())
        except ValueError:
            # empty or garbled size line - skip this frame instead of
            # letting the exception escape the timeout callback
            return GLib.SOURCE_CONTINUE
        # doesn't seem to be able to read the data all at once
        # so we have to keep reading until we get all of the data
        # e.g. `data = self._r.read(size)` should work but doesn't
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self._r.read(remaining)
            if not chunk:
                # read() returned nothing (end of file), so `remaining`
                # would never shrink - bail out rather than spin forever
                return GLib.SOURCE_CONTINUE
            chunks.append(chunk)
            remaining -= len(chunk)
        data = b''.join(chunks)
        # print(size, len(data))
        try:
            # second argument is passed through unchanged from the original
            Grx.get_screen_context().load_from_jpeg_data(data, 1)
        except Exception:
            # best effort: a frame that fails to load is simply dropped
            pass
        return GLib.SOURCE_CONTINUE

    def do_event(self, event):
        """Handle an input event; quit the application on touch-down.

        Returns:
            True when the event was consumed (by the base class or here),
            False otherwise.
        """
        if Grx.Application.do_event(self, event):
            return True
        if event.type == Grx.EventType.TOUCH_DOWN:
            self.quit()
            return True
        return False
def main():
    """Set the GLib program/application names, then run the streaming app."""
    GLib.set_prgname('camstream.py')
    GLib.set_application_name('NXTCam5 streaming test')
    App().run()


# Only start the application when executed as a script, not on import.
if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Camera-side script: waits for an 's' byte on the USB virtual COM port and
# answers each request with "<size>\n" followed by one compressed frame.
import pyb
import sensor
import time

# Sensor setup: RGB565 pixels at QVGA frame size, then skip 30 frames
# (presumably to let the sensor settle - confirm against the sensor docs).
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(30)

fps_clock = time.clock()
clock_started = False
vcp = pyb.USB_VCP()

while True:
    # Anything other than the request byte is ignored.
    if vcp.recv(1) != b's':
        continue

    # Start the FPS clock on the very first request only.
    if not clock_started:
        fps_clock.tick()
        clock_started = True

    frame = sensor.snapshot()
    fps_label = '{} FPS'.format(fps_clock.fps())
    frame.draw_string(10, 10, fps_label, color=(0, 0, 0))
    fps_clock.tick()

    # Compress the frame, then send its size on its own line ahead of the
    # image data so the receiver knows how many bytes to read.
    frame.compress()
    vcp.write(str(frame.size()))
    vcp.write('\n')
    vcp.send(frame)
Would it be possible to communicate using the regular cables that connect the NXTCam-v5 to one of the input ports of the EV3 instead of communication via USB and if so, how would I have to change the code above to achieve this?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
camstream.py runs on the host system.
main.py runs on the NXTCam.