Created
April 20, 2016 21:03
-
-
Save dexterous/516a55f47bf22bf36e020f0024f98222 to your computer and use it in GitHub Desktop.
TurkPipe, a CLI to batch MTurk jobs; copied from https://code.google.com/archive/p/turkpipe/
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" | |
Copyright (c) 2009-2010 Voxilate, Inc. | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in | |
all copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
THE SOFTWARE. | |
""" | |
from boto.mturk.connection import * | |
from boto.mturk.question import * | |
from boto.mturk.price import * | |
from boto.mturk.qualification import * | |
from boto.s3 import * | |
from os.path import * | |
import sys,getopt,re,cPickle,urllib,string,time,os,uuid,mimetypes,codecs | |
import gdbm | |
from BeautifulSoup import BeautifulSoup,Tag | |
from xml.sax.saxutils import escape | |
bucketname = 'com.voxilate.mturk' | |
# redirect stdout | |
real_stdout = codecs.getwriter('utf8')(sys.stdout) | |
sys.stdout = sys.stderr | |
frame_height = 1000 | |
testmode = True | |
title = None | |
description = None | |
s3conn = Connection() | |
# Bucket try/except contributed by idm 6/22/2010 | |
try: | |
bucket = s3conn.get_bucket(bucketname) | |
except boto.exception.S3ResponseError: | |
print "The S3 bucket '%s' has been created." % (bucketname) | |
bucket = s3conn.create_bucket(bucketname) | |
# JavaScript snippet that makeHTMLQuestion() inserts into the <head> of every
# uploaded HTML page. It defines two helpers:
#   populateAssignmentID(field_id) - copies the `assignmentId` URL parameter
#       into the given form field (returns null while the HIT is only being
#       previewed);
#   verifyTurkSubmit(field_id)     - blocks form submission when that field
#       is still empty, i.e. the worker has not accepted the HIT.
# The text is emitted verbatim at runtime, so it must not be edited casually.
SUBMIT_JS = """<script type="text/javascript" language="JavaScript"><!--
// Given the ID of the assignmentId form field element, populate it
// with the assignmentId parameter from the URL. If no assignment ID
// is present, inform the worker that the HIT is being previewed.
function populateAssignmentID(field_id) {
var assignment_id_field = document.getElementById(field_id);
var paramstr = window.location.search.substring(1);
var parampairs = paramstr.split("&");
for (i in parampairs) {
var pair = parampairs[i].split("=");
if (pair[0] == "assignmentId") {
if (pair[1] == "ASSIGNMENT_ID_NOT_AVAILABLE") {
return null;
} else {
assignment_id_field.value = pair[1];
return pair[1];
}
}
}
return null;
}
function verifyTurkSubmit(field_id)
{
var assignment_id_field = document.getElementById(field_id);
if (assignment_id_field.value == '')
{
alert("You must accept this HIT to work on it and submit results.");
return false;
} else
return true;
}
// --></script>"""
### | |
def usage(): | |
print 'Usage: \n' + sys.argv[0] + ' -j <# jobs> -t "<jobtitle>" -D "<job description>" -o <output> -w <wait seconds> filename.ext filename.ext' | |
print """ | |
-h, --help Print this help | |
-l, --live Send HITs to live Mturk site (if not specified, | |
use sandbox) | |
-j, --jobs= Specifies the number of concurrent jobs to send | |
for each file. | |
-o, --output= Specifies the output file for returned results. | |
Default is stdout. | |
-p, --price= Specifies the amount to pay per HIT. Default | |
is .01 USD. | |
-t, --title= Title to post for HITs. | |
-D, --description= Description (instructions) to post for HITs. | |
-k, --keywords= Keywords to use, separated by spaces. | |
-d, --duration= Number of seconds workers have to complete | |
the HIT. | |
-e, --expiration= Number of seconds before expiring unaccepted | |
HITs. Default is 300 seconds. | |
-A, --approve Approve all HITs automatically as they're completed. | |
-a, --autoapprove= Automatically approve HITs after the specified | |
number of seconds. Default is 86400. | |
-w, --wait= Wait the specified number of seconds for all HITs | |
to be completed. | |
-X, --panic Cancel all outstanding HITs. | |
""" | |
def safefn(fn):
    """Return *fn* without its first character, for use as an S3 key name.

    Presumably callers pass absolute POSIX paths, so this drops the leading
    '/'; nothing here checks what the first character actually is.
    """
    # TODO: strip off first char if "C:"?
    remainder = fn[1:]
    return remainder
def unpickle(s): | |
return cPickle.loads(s) | |
def pickle(o): | |
return cPickle.dumps(o,2) | |
def uploadfile(fn, data=None): | |
key = Key(bucket,safefn(fn)) | |
if data: | |
key.path = fn # for content type | |
key.set_contents_from_string(data, policy='public-read') | |
else: | |
key.set_contents_from_filename(fn, policy='public-read') | |
url = key.generate_url(86400, force_http=True, query_auth=False).replace(':443','') | |
print 'Uploaded to',url | |
return url | |
def makeHTMLQuestion(fn, htmldata):
    """Build an ExternalQuestion from an HTML page, hosting it on S3.

    fn       -- local path of the HTML file; used for the upload key and as
                the base directory for the page's images.
    htmldata -- the HTML source text.

    The page is rewritten before upload: the <body> gets an onload hook,
    SUBMIT_JS is inserted into <head>, and every <form> receives default
    method/action/onSubmit attributes (only where missing) plus a hidden
    `assignmentId` input. Each <img src> is then uploaded from the
    directory containing *fn*.

    Returns an ExternalQuestion pointing at the uploaded page.
    """
    soup = BeautifulSoup(htmldata)

    # add JS
    soup.find('body')['onload'] = "populateAssignmentID('myAssignmentId')"
    soup.find('head').insert(0, SUBMIT_JS)

    # replace forms: answers are posted to the sandbox or the live endpoint
    if testmode:
        submit_url = 'http://workersandbox.mturk.com/mturk/externalSubmit'
    else:
        submit_url = 'http://www.mturk.com/mturk/externalSubmit'
    for form in soup.findAll('form'):
        if not form.has_key('method'):
            form['method'] = 'POST'
        if not form.has_key('action'):
            form['action'] = submit_url
        if not form.has_key('onSubmit'):
            form['onSubmit'] = "return verifyTurkSubmit('myAssignmentId');"
        # hidden field that populateAssignmentID() fills in on page load
        inputtag = Tag(soup, 'input')
        inputtag['type'] = 'hidden'
        inputtag['name'] = 'assignmentId'
        inputtag['id'] = 'myAssignmentId'
        inputtag['value'] = ''
        form.insert(0, inputtag)

    mainurl = uploadfile(fn, str(soup))

    # Resolve every image against the HTML file's own directory. The base is
    # computed once: the previous code overwrote `fn` on each iteration, so
    # later images were resolved against the preceding image's directory.
    basedir = dirname(fn)
    for sub in soup.findAll('img'):
        # TODO: `src` is assumed to be a path relative to the HTML file
        uploadfile(basedir + '/' + sub['src'])

    return ExternalQuestion(escape(mainurl), frame_height)
def makeSimpleQuestion(fn):
    """Build a one-question, free-text QuestionForm from a text file.

    The question text is the module-level `description`, a blank line, then
    the full contents of *fn*; the module-level `title` is the question
    title and *fn* is the question identifier.
    """
    # `with` closes the handle even if read() fails
    with open(fn, 'r') as infile:
        text = infile.read()
    qn_content = QuestionContent(title=title, text=description + "\n\n" + text)
    qn = Question(content=qn_content, identifier=fn,
                  answer_spec=AnswerSpecification(FreeTextAnswer()))
    return QuestionForm([qn])
def makeBinaryContentQuestion(fn, ctype):
    """Build a free-text QuestionForm around an image, video or audio file.

    fn    -- local path of the media file; it is uploaded via uploadfile().
    ctype -- MIME type string such as 'image/png', or None/'' when unknown
             (mimetypes.guess_type() yields None for unknown extensions).

    Returns the QuestionForm, or None when *ctype* is missing or is not an
    image/video/audio type, so the caller can fall back to another kind of
    question.
    """
    if not ctype:
        return None
    # partition() tolerates a type without a '/' (subtype becomes '')
    maintype, _, subtype = ctype.partition('/')
    if maintype not in ('image', 'video', 'audio'):
        return None
    binary_url = uploadfile(fn)
    qn_content = QuestionContent(title=title, text=description,
                                 binary=binary_url, binary_type=maintype,
                                 binary_subtype=subtype, binary_alttext=title)
    qn = Question(content=qn_content, identifier=fn,
                  answer_spec=AnswerSpecification(FreeTextAnswer()))
    return QuestionForm([qn])
def getQuestionForFile(fn):
    """Choose and build the question object that fits the file *fn*.

    '.html' / '.htm' files become an HTML question; files whose guessed MIME
    type yields a binary-content question use that; everything else is
    posted as a plain-text question built from the file's contents.
    """
    ext = splitext(fn)[1]
    if ext in ('.html', '.htm'):
        # `with` closes the handle once the page has been read
        with open(fn, 'r') as infile:
            htmldata = infile.read()
        return makeHTMLQuestion(fn, htmldata)

    # guess_type() returns a (type, encoding) pair that is always truthy as
    # a tuple; only the type element says whether the extension is known.
    mime_type = mimetypes.guess_type(fn)[0]
    question = None
    if mime_type:
        question = makeBinaryContentQuestion(fn, mime_type)
    if question is None:
        question = makeSimpleQuestion(fn)
    return question
class Job:
    """Record of one submitted input file; pickled into the jobs database."""

    def __init__(self, key):
        # key: identifier of the job (the script uses the input file path)
        self.key = key
        # nothing has been posted yet: no HIT id, no URL
        self.hitid = self.url = None
        # number of assignments requested for the HIT so far
        self.nassignments = 0
        # each instance gets its own list
        self.uploads = list()
def makeKeywords(s):
    """Return the distinct runs of six or more ASCII letters found in *s*.

    The result is a list in no particular order.
    """
    unique_words = set()
    for word in re.findall(r'[A-Za-z]{6,}', s):
        unique_words.add(word)
    return list(unique_words)
def parseDuration(s):
    """Convert the duration argument *s* to an integer number of seconds.

    Raises ValueError when *s* is not a valid integer literal.
    """
    seconds = int(s)
    return seconds
## | |
if __name__=='__main__': | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], | |
"hlj:o:p:t:D:k:e:a:w:XAd:P", | |
["help", "live", "jobs=", "output=", "price=", "title=", "description=", "keywords=", "panic", "test", "expiration=", "autoapprove=", "wait=", "approve", "duration=", "partial"]) | |
except getopt.GetoptError: | |
usage() | |
sys.exit(2) | |
infiles = [] | |
outfile = None | |
nassignments = 1 | |
price = Price(0.01) | |
keywords = None | |
panic = 0 | |
expiration = 300 | |
duration = 300 | |
autoApprove = 86400 | |
timeout = 0 | |
sleepinterval = 15 | |
approve = None | |
approvalPercentage = 80 | |
turkmessage = "Thanks!" | |
partial = False | |
for opt,arg in opts: | |
if opt in ('-h','--help'): | |
usage() | |
sys.exit(2) | |
elif opt in ('--test',): | |
import doctest | |
sys.exit(doctest.testmod(verbose=True)) | |
elif opt in ('-o','--output'): | |
outfile = arg | |
elif opt in ('-j','--jobs'): | |
nassignments = int(arg) | |
elif opt in ('-l','--live'): | |
testmode = False | |
elif opt in ('-p','--price'): | |
price = Price(float(arg)) | |
elif opt in ('-t','--title'): | |
title = arg | |
elif opt in ('-D','--description'): | |
description = arg | |
elif opt in ('-k','--keywords'): | |
keywords = string.split(arg) | |
elif opt in ('-e','--expiration'): | |
expiration = parseDuration(arg) | |
elif opt in ('-a','--autoapprove'): | |
autoApprove = parseDuration(arg) | |
elif opt in ('-d','--duration'): | |
duration = parseDuration(arg) | |
elif opt in ('-w','--wait'): | |
timeout = parseDuration(arg) | |
elif opt in ('-X','--panic'): | |
panic += 1 | |
elif opt in ('-A','--approve'): | |
approve = True | |
elif opt in ('-P','--partial'): | |
partial = True | |
infiles.extend([abspath(fn) for fn in args]) | |
qualifications = Qualifications() | |
if not testmode: | |
qualifications.add(PercentAssignmentsApprovedRequirement('GreaterThan', approvalPercentage)) | |
if testmode: | |
print "You are in test mode." | |
jobsfn = expanduser('~/.turkpipe.sandbox.jobs') | |
else: | |
print "You are in LIVE MODE." | |
jobsfn = expanduser('~/.turkpipe.live.jobs') | |
jobs = None | |
def opendb(ro=False): | |
global jobs | |
while jobs == None: | |
try: | |
if ro and exists(jobsfn): | |
jobs = gdbm.open(jobsfn,'r') | |
else: | |
jobs = gdbm.open(jobsfn,'cs') | |
except gdbm.error: | |
time.sleep(1) | |
def closedb(): | |
global jobs | |
if jobs: | |
jobs.close() | |
jobs = None | |
# connect to turk | |
if testmode: | |
conn = MTurkConnection(host='mechanicalturk.sandbox.amazonaws.com') | |
else: | |
conn = MTurkConnection() | |
# PANIC! | |
if panic: | |
print "PANIC" + string.join(["!"]*panic,'') | |
opendb() | |
n = 0 | |
# iterate through all known jobs, and reject | |
key = jobs.firstkey() | |
while key: | |
job = unpickle(jobs[key]) | |
n += 1 | |
print "Cancelling HIT",job.hitid | |
conn.disable_hit(job.hitid) | |
conn.dispose_hit(job.hitid) | |
if panic>=2: | |
rs = conn.get_assignments(job.hitid) | |
for ass in rs: | |
conn.reject_assignment(ass.AssignmentId, "Sorry, I panicked.") | |
print "Rejected assignment",ass.AssignmentId | |
oldkey = key | |
key = jobs.nextkey(key) | |
del jobs[oldkey] | |
# if -XX, iterate through all HITs that aren't in the list | |
if panic>=2: | |
for hit in conn.search_hits(page_size=100): | |
n += 1 | |
print "Disabling HIT",hit.HITId | |
# disable | |
conn.disable_hit(hit.HITId) | |
conn.dispose_hit(hit.HITId) | |
# reject? | |
if panic>=2: | |
rs = conn.get_assignments(hit.HITId) | |
for ass in rs: | |
conn.reject_assignment(ass.AssignmentId, "Sorry, I panicked.") | |
print "Rejected assignment",ass.AssignmentId | |
try: | |
del jobs[hit.RequesterAnnotation] | |
except KeyError: | |
print "-- was not in database" | |
if n==0: | |
print "Huh, there was nothing to panic about." | |
elif panic<2: | |
print "Run with -XX to super-panic and reject *all* HITs." | |
closedb() | |
sys.exit(0) | |
# if no input files, show some info | |
if len(infiles)==0: | |
opendb(True) | |
print 'Funds remaining:',conn.get_account_balance() | |
print 'There are',len(jobs),'jobs active.' | |
key = jobs.firstkey() | |
while key: | |
job = unpickle(jobs[key]) | |
print job.hitid+'\t'+job.key | |
key = jobs.nextkey(key) | |
sys.exit(3) | |
# create HITs | |
opendb() | |
for fn in infiles: | |
key = fn | |
if not jobs.has_key(key): | |
if not exists(key): | |
print "%s: No file was found." % key | |
sys.exit(2) | |
else: | |
job = Job(key) | |
if not title: | |
print "To create a HIT, you really should have a title (-t)." | |
sys.exit(1) | |
if not description: | |
print "To create a HIT, you really should have a description (-D)." | |
sys.exit(1) | |
#if not keywords: | |
# keywords = makeKeywords(title + " " + description) | |
question = getQuestionForFile(key) | |
rs = conn.create_hit( | |
question=question, | |
title=title, | |
description=description, | |
keywords=keywords, | |
reward=price, | |
max_assignments=nassignments, | |
annotation=key, | |
duration=duration, | |
qualifications=qualifications | |
) | |
for hit in rs: | |
# TODO: if not created properly? | |
if not hasattr(hit,'HITId'): | |
print "%s: Could not create HIT." % key | |
sys.exit(5) | |
print '%s: Created HIT %s.' % (key,hit.HITId) | |
job.hitid = hit.HITId | |
job.nassignments = nassignments | |
# TODO: http://code.google.com/p/boto/issues/detail?id=275 | |
jobs[key] = pickle(job) | |
closedb() | |
# check on status of previously created HITs | |
time0 = time.time() | |
done = False | |
while not done: | |
# parse input files, create jobs if necc | |
completedHits = [] | |
for fn in infiles: | |
# get lock | |
opendb() | |
key = fn | |
job = unpickle(jobs[key]) | |
## extend hit? | |
if nassignments > job.nassignments: | |
conn.extend_hit(job.hitid, | |
assignments_increment=nassignments-job.nassignments, | |
expiration_increment=None) | |
job.nassignments = nassignments | |
jobs[job.key] = pickle(job) | |
## return assignments? | |
rs = conn.get_assignments(hit_id=job.hitid) | |
hit = conn.get_hit(hit_id=job.hitid) | |
hit = hit[0] | |
hitstatus = hit.HITStatus | |
print '%s: %d/%d assignments completed.' % (job.key, len(rs), job.nassignments) | |
if len(rs) >= job.nassignments: # TODO: or (hitstatus == 'Unassignable' and partial): | |
completedHits.append((hit,rs)) | |
# give up lock | |
closedb() | |
# completed? | |
if len(completedHits) == len(infiles): | |
break | |
# result == 0 when we are successful | |
td = timeout - (time.time() - time0) | |
if td < 0: | |
result = 1 | |
if timeout > 0: | |
print 'Timed out.' | |
else: | |
print 'Use -w <timeout> to wait for this assignment to complete.' | |
break | |
else: | |
time.sleep(min(sleepinterval, td)) | |
# successful if all assignments completed | |
if outfile: | |
real_stdout = codecs.getwriter('utf8')(open(outfile,'w')) | |
if len(completedHits) == len(infiles): | |
opendb() | |
for hit,rs in completedHits: | |
key = hit.RequesterAnnotation | |
# print assignments | |
for ass in rs: | |
for a in ass.answers: | |
for b in a: | |
i = 0 | |
# TODO: unicode | |
for k,v in b.fields: | |
if i>0: | |
real_stdout.write(',') | |
i += 1 | |
real_stdout.write(unicode(v)) | |
real_stdout.write('\n') | |
# approve assignments? | |
if approve: | |
for ass in rs: | |
conn.approve_assignment(ass.AssignmentId,turkmessage) | |
# remove it from our DB | |
del jobs[key] | |
print '%s: All assignments approved.' % key | |
closedb() | |
# flush jobs and exit | |
real_stdout.close() | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment