Last active
July 22, 2024 19:54
-
-
Save arpruss/4059c36b0e45ddd66b36a2bab57bd426 to your computer and use it in GitHub Desktop.
Snap photos with Sony API when there are significant differences in the preview
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from libsonyapi.camera import Camera | |
from libsonyapi.actions import Actions | |
import urllib.request | |
import cv2 | |
import numpy as np | |
import time | |
import sys | |
# ---------------------------------------------------------------------------
# Tunables
# ---------------------------------------------------------------------------
# Show the thresholded difference image in an OpenCV window while running.
DISPLAY = False
# Seconds: the reference frame is the oldest one no older than this.
COMPARISON_DELAY = 3
# Seconds: minimum interval between two analysed liveview frames.
TIME_STEP = 0.25
# Seconds: minimum interval between two triggered shots.
SHOT_SPACING = 1.0
# Fraction of the image height used to derive the Gaussian blur kernel size.
KERNEL_SIZE = 0.042
WINDOW_NAME = "autophoto"
# Per-channel absolute difference above which a pixel counts as changed.
THRESHOLD = 25
# Value cv2.threshold writes into pixels that exceed THRESHOLD.
COMPARISON_VALUE = 255
# Minimum mean of the thresholded difference image that triggers a shot.
TRIGGER_THRESHOLD = 0.1

# ---------------------------------------------------------------------------
# Mutable state shared by the functions below
# ---------------------------------------------------------------------------
history = []  # (timestamp, blurred frame) tuples, oldest first
lastTime = 0  # time.time() of the most recently analysed frame
lastShot = 0  # time.time() of the most recently triggered shot
frame = 1  # NOTE(review): not referenced anywhere else in the visible code
buffer = bytearray()  # stream bytes received while waiting for a JPEG start
lastJPEG = bytearray()  # JPEG currently being assembled; falsy between frames

# ---------------------------------------------------------------------------
# Window, camera and liveview stream
# ---------------------------------------------------------------------------
if DISPLAY:
    cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_AUTOSIZE)

camera = Camera()  # create camera instance
#camera_info = camera.info() # get camera camera_info
#print(camera_info)
print(camera.name)  # print name of camera
#print(camera.api_version) # print api version of camera
#print(camera.services)
#print(camera.do("getAvailableApiList")['result'])
#camera.do(Actions.actTakePicture) # take a picture

# startLiveview answers with the URL of the liveview stream as its first result.
liveURL = camera.do(Actions.startLiveview)['result'][0]
stream = urllib.request.urlopen(liveURL)
def snap():
    """Log "snap" to stdout, then ask the camera to take a picture."""
    print("snap")
    camera.do(Actions.actTakePicture)
def getHistoric(t):
    """Return the reference frame to compare against at time ``t``.

    ``history`` holds ``(timestamp, frame)`` tuples in insertion order. The
    result is the frame of the first entry whose timestamp is no older than
    ``t - COMPARISON_DELAY``. If every stored entry is older than that, the
    most recent entry's frame is returned instead. With an empty history the
    result is ``None``.
    """
    cutoff = t - COMPARISON_DELAY
    for timestamp, image in history:
        if timestamp >= cutoff:
            return image
    # Nothing inside the comparison window: fall back to the newest frame.
    return history[-1][1] if history else None
def haveJPEG(jpeg):
    """Analyse one liveview JPEG and take a picture if the scene changed.

    Frames arriving less than ``TIME_STEP`` seconds after the previously
    analysed one are ignored. Otherwise the frame is decoded, blurred and
    compared with the reference frame returned by ``getHistoric``. The
    absolute difference is thresholded, and if its mean reaches
    ``TRIGGER_THRESHOLD`` a shot is triggered via ``snap``. No shot is
    evaluated within ``SHOT_SPACING`` seconds of the previous one.

    Args:
        jpeg: bytes-like object holding one complete JPEG image.

    Side effects:
        Updates the module-level ``lastTime``, ``lastShot`` and ``history``.
        When ``DISPLAY`` is set, shows the difference image and exits the
        process if Escape is pressed or the window has been closed.
    """
    global lastTime, lastShot

    curTime = time.time()
    if curTime - lastTime < TIME_STEP:
        return

    img = cv2.imdecode(np.frombuffer(jpeg, np.uint8), cv2.IMREAD_COLOR)
    if img is None:
        # imdecode returns None for data it cannot decode (e.g. a truncated
        # frame). Skip it instead of crashing on img.shape; lastTime is left
        # untouched so the next frame is tried immediately.
        return
    lastTime = curTime

    # Kernel dimensions must be odd, hence 1 + 2 * n.
    kernelSize = 1 + 2 * int(img.shape[0] * KERNEL_SIZE)
    blurred = cv2.GaussianBlur(img, (kernelSize, kernelSize), 0)

    # Pick the reference frame before pruning, then drop every entry that
    # precedes the first one still inside the comparison window.
    prev = getHistoric(curTime)
    cutoff = curTime - COMPARISON_DELAY
    for i, (timestamp, _) in enumerate(history):
        if timestamp >= cutoff:
            del history[:i]
            break
    history.append((curTime, blurred))

    if curTime < lastShot + SHOT_SPACING:
        return
    if prev is None:
        return

    delta = cv2.threshold(cv2.absdiff(prev, blurred), THRESHOLD,
                          COMPARISON_VALUE, cv2.THRESH_BINARY)[1]
    # Mean over all pixels and channels of the thresholded difference.
    avg_difference = np.mean(delta)
    print("difference", avg_difference)
    if avg_difference >= TRIGGER_THRESHOLD:
        lastShot = curTime
        snap()

    if DISPLAY:
        cv2.resizeWindow(WINDOW_NAME, img.shape[1], img.shape[0])
        cv2.imshow(WINDOW_NAME, delta)
        # Quit on Escape (key code 27) or once the window has been closed.
        if (cv2.waitKey(1) & 0xFF) == 27 or cv2.getWindowProperty(WINDOW_NAME, 0) == -1:
            cv2.destroyAllWindows()
            sys.exit(0)
def processStream(data):
    """Feed one chunk of liveview bytes into the JPEG extractor.

    The stream is scanned for the JPEG start marker (FF D8) and end marker
    (FF D9). Bytes are accumulated in the module-level ``buffer`` until a
    start marker is seen, then in ``lastJPEG`` until an end marker is seen.
    Each complete image is passed to ``haveJPEG``; bytes after the end marker
    are kept in ``buffer`` for the next call. At most one image is dispatched
    per call.

    Args:
        data: bytearray with the next bytes read from the stream.

    Exceptions raised by ``haveJPEG`` propagate to the caller. The extractor
    state is reset before dispatching, so a failing frame is not re-detected
    on later chunks.
    """
    global buffer, lastJPEG

    if not lastJPEG:
        # Between frames: look for the start-of-image marker.
        buffer += data
        start = buffer.find(b"\xff\xd8")
        if start < 0:
            return
        lastJPEG = buffer[start:]
        buffer = bytearray()
    else:
        lastJPEG += data

    end = lastJPEG.find(b"\xff\xd9")
    if end < 0:
        return

    jpeg = lastJPEG[:end + 2]
    buffer = lastJPEG[end + 2:]
    lastJPEG = bytearray()
    haveJPEG(jpeg)
# Pump the liveview stream into the JPEG extractor until the stream ends.
while True:
    data = stream.read(4096)
    if not data:
        # An empty read means the connection was closed; without this check
        # the loop would spin forever on empty chunks.
        break
    processStream(bytearray(data))
#fNumber = camera.do(Actions.getFNumber)
#print(fNumber) # prints fnumber
#camera.do(Actions.setFNumber, "5") # set aperture to 5
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment