@takeru
Created April 5, 2010 08:12
GAE/JRuby has a spin-up problem.
The first request to a new instance takes 10 seconds or more.
(The second request then has to load Rails, which takes another 10+ seconds.)
ForwardSpinupFilter is deployed in front of RackFilter (= the Ruby GAE app).
Every request is processed by ForwardSpinupFilter,
which routes it to A or B:
A. Pass it to the next filter, RackFilter.
B. Call URLFetch to forward the request to the same URL, like an HTTP proxy server.
Without ForwardSpinupFilter, every request takes route A.
For a Rails app, the first and second requests handled by an instance each take 10+ seconds.
Therefore user requests that arrive before the instance has spun up are routed to B.
Requests that do not come from a user (= spin-up requests, path /spinup) can afford to wait a long time.
They are routed to A. Such a request takes 10+ seconds, but that is not a problem,
because a spin-up request is not generated by a user. No one is kept waiting.
After ForwardSpinupFilter has routed 2 requests to A, the Ruby app is spun up.
From that point on, ForwardSpinupFilter routes all requests to A.
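
The routing rule described above can be sketched in a few lines of Ruby. This is only an illustrative model, not the filter itself: the class name SpinupRouter, the constant names, and the :local / :forward symbols are assumptions made for this example, and the counter is bumped as soon as route A is chosen rather than after the downstream filter returns.

```ruby
# Hypothetical model of the A/B routing decision (names are illustrative).
class SpinupRouter
  SPINUP_PATH    = "/spinup"  # path of the spin-up request
  WARM_THRESHOLD = 2          # route-A requests needed before the app counts as warm

  attr_reader :load_counter

  def initialize
    @load_counter = 0
  end

  # Returns :local for route A (next filter) or :forward for route B (URLFetch proxy).
  def route(path)
    if @load_counter >= WARM_THRESHOLD || path == SPINUP_PATH
      @load_counter += 1
      :local
    else
      :forward
    end
  end
end

router = SpinupRouter.new
p router.route("/users")   # :forward (instance is cold)
p router.route("/spinup")  # :local   (loads JRuby)
p router.route("/users")   # :forward (still cold)
p router.route("/spinup")  # :local   (loads Rails)
p router.route("/users")   # :local   (instance is warm)
```

In this model, user requests keep taking route B until two /spinup requests have gone through route A.
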
The spin-up request is generated by ForwardSpinupFilter itself.
When a request is routed to B, the instance still needs to spin up, so a spin-up request is
issued as an asynchronous URLFetch.
I want this spin-up request to reach the cold instance, but it may reach a warm
instance instead. As long as the cold instance keeps receiving requests, it keeps generating spin-up requests.
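
As a rough illustration of the fire-and-forget spin-up request, the sketch below builds the spin-up URL the same way kick_spinup_url does and hands it to a background thread. It is plain Ruby, not App Engine code: the helper names spinup_url and kick_spinup and the fetcher callable (a stand-in for the asynchronous URLFetch call) are assumptions made for this example.

```ruby
require "thread"

# Builds http://host[:port]/spinup, omitting the port when it is 80.
def spinup_url(server_name, server_port, spinup_path = "/spinup")
  url = "http://#{server_name}"
  url += ":#{server_port}" unless server_port == 80
  url + spinup_path
end

# Fire-and-forget: the fetch runs on a background thread and the caller
# returns immediately. `fetcher` is any callable that takes a URL string
# (hypothetical stand-in for URLFetch's asynchronous fetch).
def kick_spinup(server_name, server_port, fetcher)
  url = spinup_url(server_name, server_port)
  Thread.new { fetcher.call(url) }
end

kicked = Queue.new
thread = kick_spinup("fwspin.appspot.com", 80, ->(url) { kicked << url })
thread.join
puts kicked.pop  # http://fwspin.appspot.com/spinup
```

Nothing in this sketch controls which instance receives the request, which matches the caveat above: the spin-up request may land on a warm instance.
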
This code (ForwardSpinupFilter.duby) does not work very well,
but it should help you understand the idea.
require 'appengine-rack'
require 'appengine-rack/java'
AppEngine::Rack.configure_app(
:application => 'fwspin',
:precompilation_enabled => true,
:sessions_enabled => true,
:version => "1")
skip_rack_servlet
AppEngine::Rack.app.resource_files.exclude :rails_excludes
ENV['RAILS_ENV'] = AppEngine::Rack.environment
#require ::File.expand_path('../config/environment', __FILE__)
#use Rails::Rack::LogTailer if ENV['RAILS_ENV'].eql? 'development'
class DeferredDispatcher
def initialize args
@args = args
end
def call env
if @runtime.nil?
@runtime = true
# 1: redirect with runtime and jruby-rack loaded
redirect_or_error(env)
elsif @rack_app.nil?
require @args[:require]
@rack_app = Object.module_eval(@args[:dispatch]).new
# 2: redirect with framework required & dispatched
redirect_or_error(env)
else
# 3: process all other requests
@rack_app.call(env)
end
end
def redirect_or_error(env)
if env['REQUEST_METHOD'].eql?('GET')
redir_url = env['REQUEST_URI'] +
(env['QUERY_STRING'].eql?('') ? '?' : '&') +
"_spinup_=#{Time.now.to_i}"
res = ::Rack::Response.new('*', 302)
res['Location'] = redir_url
res.finish
else
::Rack::Response.new('Service Unavailable', 503).finish
end
end
end
deferred_dispatcher = DeferredDispatcher.new(
:require => File.expand_path('../config/environment', __FILE__),
:dispatch => 'ActionController::Dispatcher')
map '/' do
use(JavaServletFilter, 'fwspin.ForwardSpinupFilter', {:name=>'ForwardSpinupFilter', :wildcard=>true})
use(JavaServletFilter, 'org.jruby.rack.RackFilter', {:name=>'RackFilter', :wildcard=>true})
#run ActionController::Dispatcher.new
run deferred_dispatcher
end
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import javax.servlet.Filter
import java.util.Date
import java.util.ArrayList
import java.net.URL
import java.net.MalformedURLException
import java.io.IOException
import com.google.appengine.api.urlfetch.URLFetchService
import com.google.appengine.api.urlfetch.FetchOptions
import "FetchOptionsBuilder", "com.google.appengine.api.urlfetch.FetchOptions$Builder"
import com.google.appengine.api.urlfetch.HTTPHeader
import com.google.appengine.api.urlfetch.HTTPRequest
import com.google.appengine.api.urlfetch.HTTPResponse
import com.google.appengine.api.urlfetch.URLFetchServiceFactory
import com.google.appengine.api.urlfetch.HTTPMethod
class ForwardSpinupFilter; implements Filter
def initialize
end
def init(conf)
{:return=>void}
@counter = 0
@load_counter = 0
@spinup_path = "/spinup"
log("ForwardSpinupFilter:init")
end
def doFilter(_request, _response, chain)
{:return=>void}
request = HttpServletRequest(_request)
response = HttpServletResponse(_response)
@counter += 1
log("@counter=#{@counter} @load_counter=#{@load_counter} : "+request.getRequestURL.toString)
if 2<=@load_counter || is_spinup_url(request)
# @load_counter == 0 => load jruby
# @load_counter == 1 => load rails
# @load_counter == 2 => load app and run!
chain.doFilter(request, response)
# if success
@load_counter += 1
# else
# timeout etc. instance is broken?
# end
nil
else
kick_spinup_url(request)
kick_spinup_url(request)
# forward request to active instance
fetched = url_fetch(request)
# copy code
response.setStatus(fetched.getResponseCode)
# copy headers
fetched.getHeaders.each do |_header|
header = HTTPHeader(_header)
response.setHeader(header.getName, header.getValue)
end
# copy body
if fetched.getContent
os = response.getOutputStream
os.write(fetched.getContent)
os.flush
end
nil
end
end
def destroy
{:return=>void}
log("destroy")
end
# http://code.google.com/intl/en/appengine/docs/java/javadoc/com/google/appengine/api/urlfetch/package-summary.html
def url_fetch(request:HttpServletRequest)
throws MalformedURLException, IOException
url = request.getRequestURL
q = request.getQueryString
if q
url.append("?").append(q)
end
fs = URLFetchServiceFactory.getURLFetchService
opt = FetchOptionsBuilder.disallowTruncate.doNotFollowRedirects.setDeadline(double(5.0))
req = HTTPRequest.new(URL.new(url.toString), HTTPMethod.valueOf(request.getMethod), opt)
copy_headers(request, req)
copy_payload(request, req)
fs.fetch(req) # HTTPResponse
end
def is_spinup_url(request:HttpServletRequest)
# log "@spinup_path: #{@spinup_path}"
# log "PathInfo : #{request.getPathInfo || '(null)'}"
# log "ContextPath : #{request.getContextPath || '(null)'}"
log "is_spinup_url:ServletPath=#{request.getServletPath || '(null)'}"
return @spinup_path.equals(request.getServletPath)
end
def kick_spinup_url(request:HttpServletRequest)
throws MalformedURLException
fs = URLFetchServiceFactory.getURLFetchService
url = "http://"+request.getServerName
url += ":#{request.getServerPort}" if 80 != request.getServerPort
url += @spinup_path
log "kick_spinup_url: #{url}"
fs.fetchAsync(URL.new(url))
end
def copy_headers(request:HttpServletRequest, req:HTTPRequest)
{:return=>void}
headers = ArrayList.new
names = request.getHeaderNames
while(names.hasMoreElements) do
name = String(names.nextElement)
values = request.getHeaders(name)
while(values.hasMoreElements) do
value = String(values.nextElement)
req.addHeader(HTTPHeader.new(name, value))
end
end
end
def copy_payload(request:HttpServletRequest, req:HTTPRequest)
throws IOException
{:return=>void}
if 1 <= request.getContentLength
is = request.getInputStream
data = byte[request.getContentLength]
off = 0
while off < data.length
size = is.read(data, off, data.length-off)
if size==-1
raise IOException.new("payload is short")
end
off += size
end
req.setPayload(data)
end
end
def log(s:String)
{:return=>void}
puts(Date.new.toString + " : " + s)
end
end
// Generated from fwspin/ForwardSpinupFilter.duby
package fwspin;
import com.google.appengine.api.urlfetch.FetchOptions;
public class ForwardSpinupFilter extends java.lang.Object implements javax.servlet.Filter {
private int counter;
private int load_counter;
private java.lang.String spinup_path;
public ForwardSpinupFilter() {
}
public void init(javax.servlet.FilterConfig conf) throws javax.servlet.ServletException {
this.counter = 0;
this.load_counter = 0;
this.spinup_path = "/spinup";
this.log("ForwardSpinupFilter:init");
}
public void doFilter(javax.servlet.ServletRequest _request, javax.servlet.ServletResponse _response, javax.servlet.FilterChain chain) throws java.io.IOException, javax.servlet.ServletException {
javax.servlet.http.HttpServletRequest request = ((javax.servlet.http.HttpServletRequest)(_request));
javax.servlet.http.HttpServletResponse response = ((javax.servlet.http.HttpServletResponse)(_response));
this.counter = (this.counter + 1);
this.log(("@counter=" + this.counter + " @load_counter=" + this.load_counter + " : " + request.getRequestURL().toString()));
boolean temp$1 = false;
boolean __xform_tmp_1 = (2 <= this.load_counter);
temp$1 = __xform_tmp_1 ? (__xform_tmp_1) : (this.is_spinup_url(request));
if (temp$1) {
chain.doFilter(request, response);
this.load_counter = (this.load_counter + 1);
}
else {
this.kick_spinup_url(request);
this.kick_spinup_url(request);
com.google.appengine.api.urlfetch.HTTPResponse fetched = this.url_fetch(request);
response.setStatus(fetched.getResponseCode());
java.util.Iterator __xform_tmp_3 = fetched.getHeaders().iterator();
label2:
while (__xform_tmp_3.hasNext()) {
java.lang.Object _header = __xform_tmp_3.next();
label3:
{
com.google.appengine.api.urlfetch.HTTPHeader header = ((com.google.appengine.api.urlfetch.HTTPHeader)(_header));
response.setHeader(header.getName(), header.getValue());
}
}
if ((fetched.getContent() != null)) {
javax.servlet.ServletOutputStream os = response.getOutputStream();
os.write(fetched.getContent());
os.flush();
}
}
}
public void destroy() {
this.log("destroy");
}
public com.google.appengine.api.urlfetch.HTTPResponse url_fetch(javax.servlet.http.HttpServletRequest request) throws java.net.MalformedURLException, java.io.IOException {
java.lang.StringBuffer url = request.getRequestURL();
java.lang.String q = request.getQueryString();
if ((q != null)) {
url.append("?").append(q);
}
com.google.appengine.api.urlfetch.URLFetchService fs = com.google.appengine.api.urlfetch.URLFetchServiceFactory.getURLFetchService();
com.google.appengine.api.urlfetch.FetchOptions opt = com.google.appengine.api.urlfetch.FetchOptions.Builder.disallowTruncate().doNotFollowRedirects().setDeadline(((double)((float)5.0)));
com.google.appengine.api.urlfetch.HTTPRequest req = new com.google.appengine.api.urlfetch.HTTPRequest(new java.net.URL(url.toString()), com.google.appengine.api.urlfetch.HTTPMethod.valueOf(request.getMethod()), opt);
this.copy_headers(request, req);
this.copy_payload(request, req);
return fs.fetch(req);
}
public boolean is_spinup_url(javax.servlet.http.HttpServletRequest request) {
java.lang.String temp$1 = null;
java.lang.String temp$2 = null;
java.lang.String __xform_tmp_2 = request.getServletPath();
temp$2 = (__xform_tmp_2 != null) ? (__xform_tmp_2) : ("(null)");
temp$1 = "is_spinup_url:ServletPath=" + temp$2;
this.log(temp$1);
return this.spinup_path.equals(request.getServletPath());
}
public java.util.concurrent.Future kick_spinup_url(javax.servlet.http.HttpServletRequest request) throws java.net.MalformedURLException {
com.google.appengine.api.urlfetch.URLFetchService fs = com.google.appengine.api.urlfetch.URLFetchServiceFactory.getURLFetchService();
java.lang.String url = ("http://" + request.getServerName());
if ((80 != request.getServerPort())) {
url = (url + ":" + request.getServerPort());
}
url = (url + this.spinup_path);
this.log("kick_spinup_url: " + url);
return fs.fetchAsync(new java.net.URL(url));
}
public void copy_headers(javax.servlet.http.HttpServletRequest request, com.google.appengine.api.urlfetch.HTTPRequest req) {
java.util.ArrayList headers = new java.util.ArrayList();
java.util.Enumeration names = request.getHeaderNames();
label1:
while (names.hasMoreElements()) {
label2:
{
java.lang.String name = ((java.lang.String)(names.nextElement()));
java.util.Enumeration values = request.getHeaders(name);
label3:
while (values.hasMoreElements()) {
label4:
{
java.lang.String value = ((java.lang.String)(values.nextElement()));
req.addHeader(new com.google.appengine.api.urlfetch.HTTPHeader(name, value));
}
}
}
}
}
public void copy_payload(javax.servlet.http.HttpServletRequest request, com.google.appengine.api.urlfetch.HTTPRequest req) throws java.io.IOException {
if ((1 <= request.getContentLength())) {
javax.servlet.ServletInputStream is = request.getInputStream();
byte[] data = new byte[request.getContentLength()];
int off = 0;
label1:
while ((off < data.length)) {
label2:
{
int size = is.read(data, off, (data.length - off));
if ((size == -1)) {
throw new java.io.IOException("payload is short");
}
off = (off + size);
}
}
req.setPayload(data);
}
}
public void log(java.lang.String s) {
java.lang.System.out.println(((new java.util.Date().toString() + " : ") + s));
}
}
#
# This script puts load on the instances and measures
# - time spent loading a page
# - throughput
# It works well with JRuby. CRuby's threads do not perform as well.
#
require "open-uri"
require "thread"
require "monitor"
$stdout.sync = true
class SpinupTest
def initialize
@monitor = Monitor.new
@thread_start_interval = 2.0
@url = "http://fwspin.appspot.com/sleep"
@server_sleep_sec = 0.0
@window_sec = 10.0
@results = {}
end
def sync_puts(s)
@monitor.synchronize do
puts(s)
end
end
def current_time
Time.now - @start_time
end
def record_result(status, time)
@monitor.synchronize do
window_no = (current_time / @window_sec).floor
unless @results[window_no]
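# Stamp the thread count on the most recently recorded window
# (window_no-1 may not exist if no request finished in it).
prev_no = @results.keys.max
@results[prev_no][:threads] = @threads.size if prev_no
@results.keys.sort.each do |n|
rec = @results[n]
s = "%3d: threads=%3d" % [n, rec[:threads]]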
status_list = ((rec.keys-[:threads])+[200,500]).uniq.sort
status_list.each do |st|
rec[st] ||= {}
rec[st][:count] ||= 0
rec[st][:time ] ||= 0.0
time_per_req = rec[st][:count]==0 ? 0 : rec[st][:time]/rec[st][:count]
s << " | %3d=%4d(%4.1f)" % [st, rec[st][:count], time_per_req]
end
sync_puts(s)
end
sync_puts("--------")
end
@results[window_no] ||= {}
@results[window_no][status] ||= {}
@results[window_no][status][:count] ||= 0
@results[window_no][status][:count] += 1
@results[window_no][status][:time ] ||= 0.0
@results[window_no][status][:time ] += time
end
nil
end
def send_request(thread_no, counter)
start_time = Time.now
begin
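# URI.open is the open-uri entry point on Ruby 2.5+; Kernel#open no longer accepts URLs on Ruby 3.
URI.open("#{@url}?sleep_sec=#{@server_sleep_sec}") do |f|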
f.read
record_result(f.status[0].to_i, Time.now-start_time)
end
rescue OpenURI::HTTPError => e
record_result(e.io.status[0].to_i, Time.now-start_time)
end
end
def start_thread(_thread_no)
Thread.new(_thread_no) do |thread_no|
begin
counter = 0
loop{
counter += 1
send_request(thread_no, counter)
}
rescue => e
puts "#{e.inspect}\n #{e.backtrace.join("\n ")}"
end
end
end
def start
@start_time = Time.now
@threads = []
loop_counter = 0
loop{
loop_counter += 1
th = start_thread(loop_counter)
@threads << th
sleep(@thread_start_interval)
}
@threads.each{|th| th.join }
end
end
SpinupTest.new.start