Created
April 2, 2011 18:02
-
-
Save raggi/899708 to your computer and use it in GitHub Desktop.
An example to assist someone asking for help on the EM mailinglist. An eventmachine connection class that writes periodically to file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DiskWriter < EM::Connection | |
# A random stab in the dark at somethign that's loosely efficient. Assuming | |
# you have a larger page size than 4kb, you'll probably still want ruby to | |
# write a ton of pages at a time to anything resembling a spindle. On most | |
# systems, this will default to 64kb. It's possible you may get better | |
# performance going much higher. If you end up GC bound, or other operations | |
# are causing leak like behavior, then you may find higher tunings become | |
# less efficient in production over time. It is also worth noting that some | |
# versions ::Queue do not free their buffer pages. | |
DEFAULT_THRESHOLD = 16384 * `getconf PAGESIZE`.to_i | |
def initialize(path, threshold = DEFAULT_THRESHOLD) | |
@path = path | |
@threshold = threshold | |
@queue = Queue.new | |
@writer = Thread.new { writer } | |
# Whilst simpler, the following is not always a good idea, instead you may | |
# want to poll the thread with a periodic timer, and call .join(0.0001), | |
# in order to bubble any errors into the reactor. If you want to prevent | |
# them from killing the reactor, handle them there. | |
# You could alternatively, check in debuff or something. | |
# @writer.abort_on_exception = true | |
@buffer = [] | |
@bytesize = 0 | |
end | |
def recieve_data(data) | |
@buffer << data | |
@bytesize += data.size | |
if @bytesize >= @threshold | |
@queue << debuff | |
end | |
end | |
def unbind | |
@queue << debuff | |
@queue << nil # Stops the writer thread | |
@writer.join # Bubbles errors back into main thread | |
end | |
# runs in a background thread | |
def writer | |
@file = File.new(path) | |
while s = @queue.pop | |
@file.write s | |
end | |
ensure | |
@file.close | |
end | |
def debuff | |
s = @buffer.join | |
@buffer.clear | |
@bytesize = 0 | |
s | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment