Created
March 25, 2020 03:00
-
-
Save cloudlakecho/e949a2770788e6a0c9c6900ef2dfcc43 to your computer and use it in GitHub Desktop.
Read USB Camera in Nvidia Jetson TX2 Development Board
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from PIL import Image | |
import sys | |
import os | |
import pdb | |
import urllib | |
from posix_ipc import * | |
from mmap import * | |
import subprocess | |
# Python modules | |
import mmap | |
import os | |
import sys | |
import hashlib | |
# 3rd party modules | |
import posix_ipc | |
# globals | |
ipc = {} | |
ipc['app2py'] = None | |
ipc['py2app'] = None | |
ipc['shm_app2py'] = None | |
ipc['shm_py2app'] = None | |
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- | |
def ipc_init(): | |
global ipc | |
# Semaphore(name, [flags = 0, [mode = 0600, [initial_value = 0]]]) | |
# Creates a new semaphore or opens an existing one. | |
ipc['app2py'] = Semaphore("/ipc_app2py", flags=O_CREAT) | |
ipc['py2app'] = Semaphore("/ipc_py2app", flags=O_CREAT) | |
# SharedMemory(name, [flags = 0, [mode = 0600, ]]]) | |
# Creates a new shared memory segment or opens an existing one. | |
ipc['shm_app2py'] = SharedMemory("/ipc_shm_app2py", flags=O_CREAT, | |
size=SHMEM_BUF_SIZE_BYTES) | |
ipc['shm_py2app'] = SharedMemory("/ipc_shm_py2app", flags=O_CREAT, | |
size=SHMEM_BUF_SIZE_BYTES) | |
# ----- ----- ----- | |
# spawn the camera server + rendering process | |
subprocess.Popen(['../tegra_mm_jetpack33/samples/12_camera_v4l2_cuda/camera_v4l2_cuda', | |
'-d', '/dev/video4']) | |
def ipc_pend(): | |
global ipc | |
print("ipc: waiting...") | |
# ex) acquire([timeout = None]) | |
# Waits (conditionally) until the semaphore's value is > 0 | |
# and then returns, decrementing the semaphore. | |
ipc['app2py'].acquire() | |
print("ipc: got event!") | |
# Read at most n bytes from file descriptor fd. | |
# Return a string containing the bytes read. | |
data = os.read(ipc['shm_app2py'].fd, 300 * 300 * 5 * 3) | |
# Set the current position of file descriptor fd to position pos, | |
# modified by how: SEEK_SET or 0 to set the position relative | |
os.lseek(ipc['shm_app2py'].fd, 0, os.SEEK_SET) | |
img = Image.frombytes('RGB', (1500,300), data) | |
return img | |
def ipc_post(buf): | |
global ipc | |
print("ipc: posting...") | |
os.lseek(ipc['shm_py2app'].fd, 0, os.SEEK_SET) | |
os.write(ipc['shm_py2app'].fd, buf) | |
os.fsync(ipc['shm_py2app'].fd) | |
ipc['py2app'].release() | |
# ----- ----- ----- ----- ----- ----- ----- ----- ----- ----- | |
def main(): | |
ipc_init() | |
imgs = [None] * 5 | |
while True: | |
image = ipc_pend(); | |
# Loading and Preprocessing Image | |
py2app_response_arr = [] | |
py2app_response_arr.append(float('nan')) | |
ipc_post(bytearray(np.array(py2app_response_arr, dtype=np.float32))) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment