Created
May 19, 2011 11:58
-
-
Save hogelog/980587 to your computer and use it in GitHub Desktop.
camera
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys, time | |
import pygame | |
import Image | |
import VideoCapture | |
WAIT_TIME = 3 | |
FONT_SIZE = 64 | |
def image2surface(image): | |
mode = image.mode | |
size = image.size | |
data = image.tostring() | |
return pygame.image.fromstring(data, size, mode) | |
class Camera: | |
PREVIEW_WIDTH = 640 | |
PREVIEW_HEIGHT = 480 | |
PHOTO_WIDTH = 1600 | |
PHOTO_HEIGHT = 1200 | |
def __init__(self): | |
self.preview_size = Camera.PREVIEW_WIDTH, Camera.PREVIEW_HEIGHT | |
pygame.init() | |
pygame.font.init() | |
self.screen = pygame.display.set_mode(self.preview_size) | |
pygame.display.set_caption("Camera Application") | |
pygame.display.set_icon(pygame.image.load("camera.png")) | |
self.ORIGIN = (0, 0) | |
self.update_time = time.time() | |
self.fps = 0.0 | |
self.camera = VideoCapture.Device() | |
self.camera.setResolution(Camera.PHOTO_WIDTH, Camera.PHOTO_HEIGHT) | |
fontname = pygame.font.get_default_font() | |
self.font = pygame.font.Font(fontname, FONT_SIZE) | |
def retrieve(self): | |
image = self.camera.getImage() | |
if image: | |
if (image.size == self.preview_size): | |
return image2surface(image) | |
newimage = image.resize(self.preview_size) | |
return image2surface(newimage) | |
def event(self): | |
return pygame.event.get() | |
def update(self, str=None): | |
surface = self.retrieve() | |
self.screen.blit(surface, self.ORIGIN) | |
if str: | |
text = self.font.render(str, False, (255, 255, 255)) | |
x = (Camera.PREVIEW_WIDTH - text.get_width()) / 2 | |
y = (Camera.PREVIEW_HEIGHT - text.get_height()) / 2 | |
self.screen.blit(text, (x, y)) | |
pygame.display.flip() | |
now = time.time() | |
self.fps = 1 / (now - self.update_time) | |
self.update_time = now | |
return pygame.event.get() | |
def save(self, path): | |
start = time.time() | |
while True: | |
pygame.event.get() | |
t = time.time() | |
diff = t - start | |
if diff > WAIT_TIME: | |
break | |
self.update(str(int(0.99 + WAIT_TIME - diff))) | |
self.update() | |
self.camera.saveSnapshot(path) | |
print("saved: %s" % path) | |
if __name__ == "__main__": | |
camera = Camera() | |
while True: | |
for event in pygame.event.get(): | |
if event.type == pygame.QUIT: | |
sys.exit() | |
elif event.type == pygame.KEYDOWN: | |
if (event.key == pygame.K_p): | |
camera.save("photo.jpg") | |
camera.update() | |
print("fps: %.2f" % camera.fps) | |
# vim: sw=2 ts=2 et: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
from sys import argv | |
argv.pop(0) | |
arg = " ".join(argv) | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.settimeout(1) | |
try: | |
r = sock.connect(('127.0.0.1', 12345)) | |
print('%s\n' % arg) | |
sock.send('%s\n' % arg) | |
except socket.error, e: | |
print 'Error: %s' % e |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import socket | |
import re | |
import time | |
from threading import Thread, Event | |
from camera import Camera | |
import pygame | |
CMD_RE = re.compile("^/(\w+)(?: (.+))?$") | |
WAIT = 0.2 | |
class CameraServer(Thread): | |
def __init__(self, port): | |
self.events = { | |
"photo": Event(), | |
"save": Event(), | |
"quit": Event() | |
} | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
self.sock.bind(("localhost", port)) | |
self.sock.listen(1) | |
print 'listen port %s' % port | |
Thread.__init__(self) | |
def event_set(self, name, arg): | |
if name in self.events and self.events[name]: | |
self.events[name].arg = arg | |
self.events[name].set() | |
def event_clear(self, name): | |
if name in self.events and self.events[name]: | |
self.events[name].clear() | |
def event(self, name): | |
if name in self.events and self.events[name].isSet(): | |
return self.events[name] | |
def run(self): | |
while not self.event("quit"): | |
time.sleep(WAIT) | |
connect, address = self.sock.accept() | |
line = connect.recv(8192).rstrip() | |
match = CMD_RE.search(line) | |
if match: | |
name = match.group(1) | |
arg = match.group(2) | |
self.event_set(name, arg) | |
connect.close() | |
def main(port): | |
server = CameraServer(port) | |
server.start() | |
camera = Camera() | |
while server.isAlive(): | |
time.sleep(WAIT) | |
for event in camera.update(): | |
if event.type == pygame.KEYDOWN: | |
if event.key == pygame.K_ESCAPE: | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.connect(("localhost", port)) | |
sock.send("/quit") | |
server.join() | |
sys.exit() | |
elif event.key == pygame.K_p: | |
path = "%s.jpg" % time.strftime("%Y%m%d-%H%M%S") | |
server.event_set("save", path) | |
if server.event("save"): | |
camera.save(server.event("save").arg) | |
server.event_clear("save") | |
if __name__ == "__main__": | |
if len(sys.argv) >= 2: | |
main(int(sys.argv[1])) | |
else: | |
main(12345) | |
# vim: sw=2 ts=2 et: |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment