Skip to content

Instantly share code, notes, and snippets.

@cloudlakecho
Created March 25, 2020 03:00
Show Gist options
  • Save cloudlakecho/e949a2770788e6a0c9c6900ef2dfcc43 to your computer and use it in GitHub Desktop.
Save cloudlakecho/e949a2770788e6a0c9c6900ef2dfcc43 to your computer and use it in GitHub Desktop.
Read a USB camera on the NVIDIA Jetson TX2 development board
from PIL import Image
import sys
import os
import pdb
import urllib
from posix_ipc import *
from mmap import *
import subprocess
# Python modules
import mmap
import os
import sys
import hashlib
# 3rd party modules
import posix_ipc
# globals

# Size in bytes requested for each shared-memory segment in ipc_init().
# It was referenced there but never defined, which raised NameError.
# NOTE(review): 300 * 300 * 5 * 3 is the number of bytes ipc_pend() reads per
# frame; presumably this must match the buffer size used by the camera
# application on the other end -- TODO confirm.
SHMEM_BUF_SIZE_BYTES = 300 * 300 * 5 * 3

# Handles to the IPC objects; every entry stays None until ipc_init() runs.
ipc = {
    'app2py': None,      # semaphore acquired in ipc_pend()
    'py2app': None,      # semaphore released in ipc_post()
    'shm_app2py': None,  # shared memory read in ipc_pend()
    'shm_py2app': None,  # shared memory written in ipc_post()
}
# ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
def ipc_init():
    """Create (or open) the IPC objects and launch the camera process.

    Fills the module-level ``ipc`` dict with two named semaphores and two
    named shared-memory segments, then starts the ``camera_v4l2_cuda``
    sample as a child process on ``/dev/video4``.
    """
    # Semaphore(name, [flags = 0, [mode = 0600, [initial_value = 0]]])
    # creates a new semaphore or opens an existing one.
    semaphore_names = (
        ('app2py', "/ipc_app2py"),
        ('py2app', "/ipc_py2app"),
    )
    for key, sem_name in semaphore_names:
        ipc[key] = Semaphore(sem_name, flags=O_CREAT)

    # SharedMemory(name, flags, ..., size) creates a new shared memory
    # segment or opens an existing one.
    segment_names = (
        ('shm_app2py', "/ipc_shm_app2py"),
        ('shm_py2app', "/ipc_shm_py2app"),
    )
    for key, shm_name in segment_names:
        ipc[key] = SharedMemory(shm_name, flags=O_CREAT,
                                size=SHMEM_BUF_SIZE_BYTES)

    # ----- ----- -----
    # Spawn the camera server + rendering process. The Popen handle is not
    # kept, so the child is neither waited on nor terminated from here.
    camera_cmd = [
        '../tegra_mm_jetpack33/samples/12_camera_v4l2_cuda/camera_v4l2_cuda',
        '-d', '/dev/video4',
    ]
    subprocess.Popen(camera_cmd)
def ipc_pend():
    """Block until the ``app2py`` semaphore is posted, then return a frame.

    Returns:
        A 1500x300 RGB ``Image`` built from the bytes read out of the
        ``shm_app2py`` shared-memory file descriptor.
    """
    print("ipc: waiting...")
    # acquire() without a timeout waits until the semaphore's value is > 0,
    # then decrements it and returns.
    ipc['app2py'].acquire()
    print("ipc: got event!")

    frame_fd = ipc['shm_app2py'].fd
    frame_bytes = 300 * 300 * 5 * 3  # same byte count as 1500 x 300 RGB

    # os.read returns at most frame_bytes bytes from the descriptor.
    raw = os.read(frame_fd, frame_bytes)
    # Rewind to the start of the segment so the next call reads from
    # offset 0 again.
    os.lseek(frame_fd, 0, os.SEEK_SET)

    return Image.frombytes('RGB', (1500, 300), raw)
def ipc_post(buf):
    """Write *buf* at the start of ``shm_py2app`` and post ``py2app``.

    Args:
        buf: Bytes-like payload passed straight to ``os.write``.
    """
    print("ipc: posting...")
    reply_fd = ipc['shm_py2app'].fd

    # Always overwrite from offset 0 of the segment.
    os.lseek(reply_fd, 0, os.SEEK_SET)
    os.write(reply_fd, buf)
    os.fsync(reply_fd)

    # Signal only after the payload has been written and synced.
    ipc['py2app'].release()
# ----- ----- ----- ----- ----- ----- ----- ----- ----- -----
def main():
    """Run the frame loop: wait for a frame, then post a response.

    Initialises the IPC objects (which also launches the camera process),
    then loops forever. Each iteration blocks in ``ipc_pend()`` for a frame
    and answers through ``ipc_post()`` with a single float32 NaN.
    """
    # The file never imports numpy at module level, so `np` was an undefined
    # name here; import it locally so the response can be built.
    import numpy as np

    ipc_init()
    while True:
        image = ipc_pend()
        # Loading and Preprocessing Image
        # (placeholder: `image` is not processed yet; NaN is always posted)
        py2app_response_arr = [float('nan')]
        response = np.array(py2app_response_arr, dtype=np.float32)
        # bytearray() copies the array's raw bytes via the buffer protocol.
        ipc_post(bytearray(response))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment