Created
August 11, 2011 18:01
-
-
Save maleadt/1140303 to your computer and use it in GitHub Desktop.
Webcam-based power monitor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2 | |
# | |
# Configuration | |
# | |
# System modules | |
import sys | |
import argparse # python-argparse | |
import logging | |
import time | |
# Image processing | |
import cv | |
# Calibration data | |
segment = [ | |
215, 78, # x, y | |
150, 47 ] | |
segment_offsets = [ | |
[1, -10], # bottom left | |
[10, -1], # bottom | |
[19, -10], # bottom right | |
[11, -20], # center | |
[3, -30], # top left | |
[12, -37], # top | |
[21, -28] ] # top right | |
digit_positions = [ | |
[115, 41], | |
[79, 41], | |
[43, 41] ] | |
segment_threshold = 5 | |
number_segments = [ | |
[1, 1, 1, 0, 1, 1, 1], | |
[0, 0, 1, 0, 0, 0, 1], | |
[1, 1, 0, 1, 0, 1, 1], | |
[0, 1, 1, 1, 0, 1, 1], | |
[0, 0, 1, 1, 1, 0, 1], | |
[0, 1, 1, 1, 1, 1, 0], | |
[1, 1, 1, 1, 1, 1, 0], | |
[0, 0, 1, 0, 0, 1, 1], | |
[1, 1, 1, 1, 1, 1, 1], | |
[0, 1, 1, 1, 1, 1, 1] ] | |
# | |
# Auxiliary | |
# | |
class PowerMonitor: | |
# Member data | |
logger = logging.getLogger('PowerMonitor') | |
# Constructor | |
def __init__(self): | |
# Open and configure device | |
self.capture = cv.CaptureFromCAM(-1) | |
if not self.capture: | |
raise Exception("could not open device") | |
cv.SetCaptureProperty(self.capture, cv.CV_CAP_PROP_FRAME_HEIGHT, 480) | |
cv.SetCaptureProperty(self.capture, cv.CV_CAP_PROP_FRAME_WIDTH, 640) | |
# Auxiliary | |
def _getFrame(self): | |
frame = cv.QueryFrame(self.capture) | |
if not frame: | |
raise Exception("could not get a frame") | |
cv.SaveImage("/tmp/webcam_input.jpg", frame) | |
return frame | |
def _preprocess(self, frame): | |
# Convert to grayscale | |
grayscale = cv.CreateImage(cv.GetSize(frame), cv.IPL_DEPTH_8U, 1) | |
cv.CvtColor(frame, grayscale, cv.CV_RGB2GRAY) | |
# Crop to relevant region | |
crop = cv.GetSubRect(grayscale, (segment[0], segment[1], segment[2], segment[3])) | |
# Perform a binary adaptive threshold | |
cv.AdaptiveThreshold(crop, crop, 255, cv.CV_THRESH_BINARY, cv.CV_ADAPTIVE_THRESH_MEAN_C, 53, 20) | |
#cv.SaveImage("threshold.jpg", crop) | |
return crop | |
def _recognize(self, image): | |
final_number = 0 | |
order = 0.1 | |
debug = cv.CreateImage(cv.GetSize(image), cv.IPL_DEPTH_8U, 3) | |
cv.CvtColor(image, debug, cv.CV_GRAY2RGB) | |
for digit in range (0, 3): | |
# Get segments | |
segments = [] | |
for segment in range(0, 7): | |
x_origin = digit_positions[digit][0] + segment_offsets[segment][0] | |
y_origin = digit_positions[digit][1] + segment_offsets[segment][1] | |
# Calculate | |
count = 0 | |
for x in range(x_origin-1, x_origin+2): | |
for y in range(y_origin-1, y_origin+2): | |
pixel = cv.Get2D(image, y, x) | |
if pixel[0] == 0: | |
count = count + 1 | |
# Save | |
segments.append(count >= segment_threshold) | |
# Debug output | |
for x in range(x_origin-1, x_origin+2): | |
for y in range(y_origin-1, y_origin+2): | |
if (count >= segment_threshold): | |
cv.Set2D(debug, y, x, cv.RGB(0, 255, 0)) | |
else: | |
cv.Set2D(debug, y, x, cv.RGB(255, 0, 0)) | |
# Calculate number | |
guess = -1 | |
for number in range(0, 10): | |
matches = 0 | |
for segment in range(0, 7): | |
if segments[segment] == number_segments[number][segment]: | |
matches = matches + 1 | |
if matches == 7: | |
guess = number | |
break; | |
if guess == -1: | |
cv.SaveImage("/tmp/webcam_segments_failure.jpg", debug) | |
raise Exception("could not guess digit %i" % digit) | |
final_number = final_number + guess * order | |
order = order * 10 | |
cv.SaveImage("/tmp/webcam_segments.jpg", debug) | |
return final_number | |
# Getters | |
def getConsumption(self): | |
# Acquire and preprocess a frame | |
frame = self._getFrame() | |
preprocessed = self._preprocess(frame) | |
# Perform the recognition | |
number = self._recognize(preprocessed) | |
return number | |
def getConsumptionRobustly(self): | |
consumption = None | |
tries = 0 | |
while consumption is None and tries < 5: | |
try: | |
consumption = self.getConsumption() | |
except Exception, err: | |
tries = tries + 1 | |
time.sleep(0.1) | |
if consumption is None: | |
raise Exception("could not fetch power consumption") | |
return consumption | |
# | |
# Main | |
# | |
def main(args): | |
# Configure logging | |
rootlogger = logging.getLogger() | |
if args.verbose: | |
rootlogger.setLevel(logging.DEBUG) | |
else: | |
rootlogger.setLevel(logging.INFO) | |
monitor = PowerMonitor() | |
# Different execution modi | |
if args.mode == "single": | |
print monitor.getConsumptionRobustly() | |
sys.exit(0) | |
elif args.mode == "watch": | |
while True: | |
print monitor.getConsumptionRobustly() | |
time.sleep(args.sleep) | |
parser = argparse.ArgumentParser(description='Power monitor.') | |
parser.add_argument('--verbose', '-v', help='print more information', action='store_true') | |
parser.add_argument('--mode', '-m', help='mode of execution', choices=['single', 'watch', 'fetch'], default='single') | |
parser.add_argument('--sleep', '-s', help='amount of time between checks', type=int, default=5) | |
main(parser.parse_args()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment