Created
December 29, 2009 07:50
-
-
Save eric/265204 to your computer and use it in GitHub Desktop.
The important changes that fixed the memory leaks in god
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/lib/god.rb b/lib/god.rb | |
index 388179a..df39592 100644 | |
--- a/lib/god.rb | |
+++ b/lib/god.rb | |
@@ -534,6 +531,9 @@ module God | |
watches = self.pending_watches.dup | |
self.pending_watches.clear | |
self.pending_watch_states.clear | |
+ | |
+ # make sure we quit capturing when we're done | |
+ LOG.finish_capture | |
rescue Exception => e | |
# don't ever let running_load take down god | |
errors << LOG.finish_capture | |
diff --git a/lib/god/task.rb b/lib/god/task.rb | |
index f9e3a44..7985cc6 100644 | |
--- a/lib/god/task.rb | |
+++ b/lib/god/task.rb | |
@@ -299,7 +299,7 @@ module God | |
end | |
def unregister! | |
- # override if necessary | |
+ driver.shutdown | |
end | |
########################################################################### | |
diff --git a/lib/god/watch.rb b/lib/god/watch.rb | |
index 8476c8b..8b9ef38 100644 | |
--- a/lib/god/watch.rb | |
+++ b/lib/god/watch.rb | |
@@ -177,6 +177,7 @@ module God | |
def unregister! | |
God.registry.remove(@process) | |
+ super | |
end | |
end | |
diff --git a/lib/god/event_handler.rb b/lib/god/event_handler.rb | |
index c46965e..777c153 100644 | |
--- a/lib/god/event_handler.rb | |
+++ b/lib/god/event_handler.rb | |
@@ -38,16 +38,12 @@ module God | |
@@handler.register_process(pid, @@actions[pid].keys) | |
end | |
- def self.deregister(pid, event=nil) | |
+ def self.deregister(pid, event) | |
if watching_pid? pid | |
running = ::Process.kill(0, pid.to_i) rescue false | |
- if event.nil? | |
- @@actions.delete(pid) | |
- @@handler.register_process(pid, []) if running | |
- else | |
- @@actions[pid].delete(event) | |
- @@handler.register_process(pid, @@actions[pid].keys) if running | |
- end | |
+ @@actions[pid].delete(event) | |
+ @@handler.register_process(pid, @@actions[pid].keys) if running | |
+ @@actions.delete(pid) if @@actions[pid].empty? | |
end | |
end | |
diff --git a/lib/god/driver.rb b/lib/god/driver.rb | |
index e15bf56..434643b 100644 | |
--- a/lib/god/driver.rb | |
+++ b/lib/god/driver.rb | |
@@ -1,3 +1,5 @@ | |
+require 'monitor' | |
+ | |
module God | |
class TimedEvent | |
include Comparable | |
@@ -52,29 +54,24 @@ module God | |
@task.send(@name, *@args) | |
end | |
end | |
- | |
- class DriverEventQueue | |
+ | |
+ class DriverEventQueue | |
def initialize | |
@shutdown = false | |
- @waiting = [] | |
@events = [] | |
- @waiting.taint | |
+ @monitor = Monitor.new | |
+ @resource = @monitor.new_cond | |
@events.taint | |
self.taint | |
end | |
- # | |
+ # | |
# Wake any sleeping threads after setting the sentinel | |
- # | |
+ # | |
def shutdown | |
@shutdown = true | |
- begin | |
- Thread.critical = true | |
- @waiting.each do |t| | |
- t.run | |
- end | |
- ensure | |
- Thread.critical = false | |
+ @monitor.synchronize do | |
+ @resource.broadcast | |
end | |
end | |
@@ -82,47 +79,34 @@ module God | |
# Sleep until the queue has something due | |
# | |
def pop | |
- begin | |
- while (Thread.critical = true; @events.empty? or !@events.first.due?) | |
- @waiting.push Thread.current | |
- if @events.empty? | |
- raise ThreadError, "queue empty" if @shutdown | |
- Thread.stop | |
- else | |
- Thread.critical = false | |
- delay = @events.first.at - Time.now | |
- sleep delay if delay > 0 | |
- Thread.critical = true | |
- end | |
+ @monitor.synchronize do | |
+ if @events.empty? | |
+ raise ThreadError, "queue empty" if @shutdown | |
+ @resource.wait | |
+ else !@events.first.due? | |
+ delay = @events.first.at - Time.now | |
+ @resource.wait(delay) if delay > 0 | |
end | |
+ | |
@events.shift | |
- ensure | |
- Thread.critical = false | |
end | |
end | |
alias shift pop | |
alias deq pop | |
- # | |
- # Add an event to the queue, wake any waiters if what we added needs to | |
+ # | |
+ # Add an event to the queue, wake any waiters if what we added needs to | |
# happen sooner than the next pending event | |
# | |
def push(event) | |
- Thread.critical = true | |
- @events << event | |
- @events.sort! | |
- begin | |
- t = @waiting.shift if @events.first == event | |
- t.wakeup if t | |
- rescue ThreadError | |
- retry | |
- ensure | |
- Thread.critical = false | |
- end | |
- begin | |
- t.run if t | |
- rescue ThreadError | |
+ @monitor.synchronize do | |
+ @events << event | |
+ @events.sort! | |
+ | |
+ # If we've sorted the events and found the one we're adding is at | |
+ # the front, it will likely need to run before the next due date | |
+ @resource.signal if @events.first == event | |
end | |
end | |
@@ -130,7 +114,7 @@ module God | |
alias enq push | |
def empty? | |
- @que.empty? | |
+ @events.empty? | |
end | |
def clear | |
@@ -142,16 +126,12 @@ module God | |
end | |
alias size length | |
- | |
- def num_waiting | |
- @waiting.size | |
- end | |
end | |
class Driver | |
attr_reader :thread | |
- | |
+ | |
# Instantiate a new Driver and start the scheduler loop to handle events | |
# +task+ is the Task this Driver belongs to | |
# | |
@@ -176,12 +156,26 @@ module God | |
end | |
end | |
+ # Check if we're in the driver context | |
+ # | |
+ # Returns true if in driver thread | |
+ def in_driver_context? | |
+ Thread.current == @thread | |
+ end | |
+ | |
# Clear all events for this Driver | |
# | |
# Returns nothing | |
def clear_events | |
@events.clear | |
end | |
+ | |
+ # Shutdown the DriverEventQueue threads | |
+ # | |
+ # Returns nothing | |
+ def shutdown | |
+ @events.shutdown | |
+ end | |
# Queue an asynchronous message | |
# +name+ is the Symbol name of the operation |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment