Created January 23, 2011 19:04
Gist: phillbaker/792327
Basic abstracted structure of this scraper.
test_structure.rb:
require 'rubygems'
require 'sqlite3'
require 'thread'
require 'digest/md5'
require 'lib/ThreadPool.rb'

# main thread that hits a sqlite db
@db = SQLite3::Database.new('test_thread.sqlite') # SQLite3::Database.new(":memory:")
# create simple table in db
@db.execute_batch('create table `table` (id integer primary key autoincrement, value text)')
@threads = ThreadPooling::ThreadPool.new(10)
#@threads.debug = true
# throw everything that needs to be written into this thread-safe queue
@work_queue = Queue.new
# store our results here
@results = {} # see http://stackoverflow.com/questions/1080993/pure-ruby-concurrent-hash/1081604#1081604 at the bottom; this might not need to be a concurrent hash
working = true

# centralize our reads/writes to the db
Thread.new do #@threads.dispatch do
  db = SQLite3::Database.open('test_thread.sqlite')
  while working do # keep this thread running until we're done; in the real implementation, this would be forever...
    # this works because even if we turn off working, we'll have stuff queued and we'll loop in the inner loop until the queue is cleared
    until @work_queue.empty? do
      val, key = @work_queue.pop
      db.execute('insert into `table` (value) values (?)', val) # bind parameter to avoid quoting/injection problems
      # the key to reference the written value at
      if key
        id = db.get_first_value('SELECT last_insert_rowid()').to_s
        @results[key] = id # only do this if we need to return it
      end
    end
  end
  db.close unless db.closed?
  #puts 'end'
end

start = Time.now.to_i
times = []
repeats = 1001
repeats.times do |i|
  beg = Time.now.to_i
  key = Digest::MD5.hexdigest(Time.now.to_i.to_s + i.to_s)
  @work_queue << [i.to_s, key] # the kick-starting value
  # we need the id that we inserted this at; maybe put it on the queue with a unique id (hash of unix time? + value to be written) and then wait until that id gets passed back?
  puts 'pushed work: ' + i.to_s if i % 100 == 0
  3.times do |j|
    @threads.dispatch do
      k = j
      begin
        #my_key = key.dup # keep a local, threaded copy of key
        # wait here for our id to come back
        # could do another queue, with the db worker pushing the ids on, and then just popping/pushing until we find the one we're looking for - performance would suck! we need something non-linear like a hash
        loop do
          break if @results[key]
        end
        sleep(rand(3)) # simulate a long-running task (0-2 seconds)
        raise Exception.new("in worker #{i} - #{j}") if rand(10) == 1
        @work_queue << ["#{i} - #{j}"]
        #puts 'finish worker'
      rescue Exception => e
        #puts e
        print '.'
      ensure
        print "\n" if j == 1000
      end
    end
  end
  # we can only do the below if all dependent threads on that id are done!
  #TODO @results[my_key] = nil # keep the hash from growing infinitely; put this in and we finish queuing much quicker, but never finish...
  times << Time.now.to_i - beg
end

puts 'queuing done in ' + (Time.now.to_i - start).to_s + 's'
loop do
  break if @work_queue.empty? #@threads.queue.empty?
end
puts 'writing done in ' + (Time.now.to_i - start).to_s + 's'
puts 'size: ' + @threads.queue.size.to_s
working = false
@threads.join
puts 'working done in ' + (Time.now.to_i - start).to_s + 's'
sleep 10 #TODO we're losing some values; it may not matter in the broader scheme of things since we're daemonizing...
puts 'count: ' + @db.get_first_value('SELECT count(*) from `table`').to_s + ' avg: ' + (times.inject(0) { |s, o| s + o }.to_f / repeats).to_s
@db.close unless @db.closed?
lib/ThreadPool.rb:
#
# This file is part of ThreadPool, a jruby or ruby-based
# thread pool manager.
# Copyright (C) 2009,2010 Daniel Bush
# This program is distributed under the terms of the MIT
# license.
# A copy of the license is
# enclosed with this project in the file LICENSE.
#

module ThreadPooling

  # A class containing an internal Queue and pool of threads.
  #
  # ThreadPool uses a 'dispatch' method with a block for putting jobs on
  # the queue to be processed asynchronously:
  #
  #   tp = ThreadPool.new(5) # Create 5 threads
  #   tp.dispatch do
  #     ... your task ...
  #   end
  #
  # Or lambdas:
  #
  #   func = lambda { ... your task ... }
  #   tp.dispatch func
  #
  # In fact, any object that responds to 'call' should be ok.

  class ThreadPool
    require 'thread'
    attr_reader :threads, :thread_count, :queue
    attr_writer :debug

    # Initialize a ThreadPool instance with 'num' number
    # of threads.
    def initialize num=1
      @thread_count = 0
      @threads = []
      # Other option is to use ThreadGroup.
      @queue = Queue.new
      @mutex = Mutex.new
      # Private mutex.
      self.increment(num)
      require 'logger'
      @logger = Logger.new('log/pool.log')
    end

    def debug msg
      @mutex.synchronize do
        puts msg
      end
    end

    # Add threads to the pool
    def increment num=1
      num.times do
        @mutex.synchronize do
          @threads.push(
            Thread.new do
              loop do
                item = @queue.pop
                # print out a bit mask of whether each thread in the pool is currently active/dead/etc.
                # sleep, run, aborting, false (terminated normally), nil (terminated with exception)
                # look at inspect() of each thread
                # bit mask of threads running
                puts @queue.size.to_s + ' ' + @threads.collect { |o| o.status.to_s[0..0] }.to_s
                # bit mask of threads sleeping
                # bit mask of threads dead
                #@logger.info 'calling - queue size: ' + @queue.size.to_s
                #begin
                case item
                when Array
                  item[0].call(*item[1])
                  # item[0] should be a lambda;
                  # item[1] should be its args.
                else
                  item.call
                end
                #rescue Exception => e
                #  puts 'Exception in caller: ' + e.to_s
                #  puts e.backtrace
                #end
              end
            end
          )
        end
      end
      @thread_count += num
    end

    # Remove threads from the pool
    def decrement num=1
      num = @thread_count if num > @thread_count
      num.times do
        debug "Dispatching termination command" if @debug
        self.dispatch do
          @mutex.synchronize do
            @threads.delete(Thread.current)
          end
          debug "Deleting thread #{Thread.current}" if @debug
          Thread.current.exit
        end
      end
      @thread_count -= num
    end

    # The thread that calls this will block until
    # the threads in @threads have finished.
    # These threads will be terminated and the thread
    # pool emptied.
    def join
      threads = @threads.dup
      # Taking a copy here is really important!
      self.decrement @thread_count
      # Stop the threads or else suffer a deadlock.
      threads.each do |t|
        debug "joining thread #{t}" if @debug
        t.join
      end
    end

    # Dispatch jobs asynchronously.
    def dispatch func=nil, args=nil, &block
      if func.nil?
        raise "Must be called with a block or lambda." unless block_given?
      else
        if args.nil?
          @queue << func
        else
          @queue << [func, args]
        end
      end
      if block_given?
        @queue << block
        #puts 'thread queue: ' + @queue.size.to_s
      end
    end
  end

  # A Queue that contains its own thread and which
  # dispatches jobs synchronously.
  #
  # Use it like:
  #
  #   sq = SyncQueue.new
  #   sq.dispatch do
  #     ... your task ...
  #   end
  #
  # Or:
  #
  #   sq.dispatch lambda { ... your task ... }
  #
  # Or:
  #
  #   sq.push lambda { ... your task ... }

  class SyncQueue < Queue
    def initialize
      @processing = false
      @stopping = false
      @running = false
      super
      start
    end

    # True if 'stop' has been called but we haven't
    # terminated yet.
    def stopping?
      @stopping
    end

    # True if the SyncQueue is no longer
    # running. The thread for this queue is
    # not in the middle of processing anything.
    # The queue should be empty.
    # See #terminate .
    def stopped?
      !@running && !@stopping && !@processing
    end

    # Don't process any more jobs but
    # the current one; then stop the thread.
    # Remaining jobs are removed from the queue
    # and returned.
    def terminate
      @running = false
      @stopping = false
      @left = []
      while self.size > 0
        @left.push self.pop
      end
      self << lambda {}
      # Pass a blank function to unblock
      # the thread so it can die.
      @left
    end

    # Stop the thread, but allow it to finish
    # processing the queue.
    # The queue goes into a special state
    # where it will throw an error if you try
    # to add to the queue.
    # The last job will terminate, allowing
    # the queue to be added to at a later time.
    # SyncQueue#stop is used by SyncQueue#join.
    def stop
      @stopping = true
      self << lambda { self.terminate }
      # Pass a terminate function as the final
      # function on the queue. Will unblock the
      # thread if it's not doing anything.
    end

    # True if the SyncQueue instance is not terminated
    # or in a stopping state.
    def running?
      @running && !@stopping
    end

    # Fires up a new thread to process the queue.
    #
    # This method is automatically called when you
    # instantiate.
    #
    # Using it to restart an existing SyncQueue instance
    # has not been fully tested yet. Currently, it
    # will call SyncQueue#join and go into a stopping
    # state before starting up a new thread.
    def start
      self.join if @running
      @running = true
      @thread = Thread.new do
        while @running
          block = self.pop
          @processing = true
          block.call
          @processing = false
        end
      end
    end

    # Dispatch jobs synchronously.
    def dispatch func=nil, &block
      if block_given?
        self << func unless func.nil?
        self << block
      else
        raise "Must be called with a block." if func.nil?
        self << func
      end
    end

    # Thread calling this will wait for @thread to
    # finish all queued jobs and terminate @thread.
    def join
      self.stop
      # Stop the thread or else suffer a deadlock.
      @thread.join
    end

    # Push blocks onto the queue.
    #
    # Raise an error if this queue is in a stopping
    # state caused by calling SyncQueue#stop.
    # Note that enq and << are aliases for 'push'.
    def push block
      if @stopping
        raise "This SyncQueue has been put into a stopping state using ThreadPool::SyncQueue#stop."
      end
      super
    end
  end
end
Okay, the above might work. To get the id that the initial value was inserted at: maybe put it on the queue with a unique key (hash of the unix time?) and then wait until that key gets passed back?
Okay... well, now it's hanging. Run it and check the print statements; the loop do busy-wait kills it...
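One way out of the busy-wait hang, as a sketch rather than a drop-in fix: have each worker block on a condition variable until the writer thread publishes the row id for its key. ResultBox and its method names are invented for illustration; the real scraper would use this in place of the bare loop-until-@results[key] spin.

```ruby
require 'thread'

# Hypothetical ResultBox: workers sleep until the writer thread publishes
# the row id for their key, instead of spinning in a bare `loop do`.
class ResultBox
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @results = {}
  end

  # The writer thread calls this after an insert completes.
  def publish(key, id)
    @mutex.synchronize do
      @results[key] = id
      @cond.broadcast # wake every waiting worker; each rechecks its own key
    end
  end

  # Workers call this; blocks until the id for `key` arrives, then returns it.
  # Entries are deliberately not pruned here -- same growth problem as the
  # TODO in the gist: pruning is only safe once all dependent workers finish.
  def wait_for(key)
    @mutex.synchronize do
      @cond.wait(@mutex) until @results.key?(key)
      @results[key]
    end
  end
end
```

Workers parked in wait_for don't burn a core, which also leaves the scheduler free to run the writer thread at all, instead of starving it the way the spin loop can.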
Two new problems:
- It's slow
- We're not getting everything written to the database
without dispatch()
$ rm test_thread.sqlite && ruby test_structure.rb
pushed work: 0
queuing done in 33s
working done in 33s
count: 400 avg: 0.33
with dispatch
$ rm test_thread.sqlite && ruby test_structure.rb
pushed work: 0
pushed work: 100
queuing done in 9s
working done in 9s
count: 400 avg: 0.0891089108910891
So...working better, I think. But now it won't complete...
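One guess at the won't-complete / lost-writes problem, sketched under assumptions rather than taken from the gist: the writer polls @work_queue.empty? and the main thread sleeps 10s hoping everything drains. A blocking pop plus a sentinel makes the handoff deterministic. The :done symbol is an invented convention, and the `written` array stands in for the sqlite insert:

```ruby
require 'thread'

work_queue = Queue.new
written = []

writer = Thread.new do
  loop do
    item = work_queue.pop    # blocks; no busy-polling of empty?
    break if item == :done   # sentinel marks the end of the stream
    written << item          # the real code would run the INSERT here
  end
end

10.times { |i| work_queue << i }
work_queue << :done # everything queued before the sentinel is processed
writer.join         # replaces `sleep 10` -- returns exactly when done
```

Because a Queue preserves order, nothing pushed before the sentinel can be dropped, and writer.join returns the moment the last insert finishes instead of after an arbitrary sleep.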
Tried calling close() on the handles passed to the workers, but that doesn't work either.
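Rather than passing handles to workers at all, one option is to confine the connection entirely to the writer thread and have each request carry its own private reply queue, so there's nothing for workers to close. A sketch with invented names; `next_id` stands in for the real connection's last_insert_rowid():

```ruby
require 'thread'

requests = Queue.new

# The only thread that would ever touch the sqlite handle.
db_thread = Thread.new do
  next_id = 0 # stand-in for last_insert_rowid() on the one real connection
  loop do
    value, reply = requests.pop
    break if value == :shutdown
    next_id += 1            # the real INSERT would run here
    reply << next_id if reply # reply only when the caller wants the row id
  end
end

# Worker-side helper (hypothetical): enqueue a value, block for its id.
def insert_and_wait(requests, value)
  reply = Queue.new
  requests << [value, reply]
  reply.pop
end

ids = (1..3).map { |v| insert_and_wait(requests, v) }
requests << [:shutdown, nil]
db_thread.join
```

The per-request reply queue sidesteps the shared @results hash (and its unbounded growth) entirely: each reply is consumed by exactly one waiter and then garbage-collected.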