-
-
Save naviat/7cf3f136f8e96b5de0961a95e6ebc38c to your computer and use it in GitHub Desktop.
Elasticsearch: calculating user sessions with Map/Reduce (Ruby)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Calculates per-day user session durations from Elasticsearch using a
# Map/Reduce (scripted_metric) aggregation.
#
# Sessions expire with the same algorithm used in Google Analytics
# (https://support.google.com/analytics/answer/2731565?hl=en):
#   Time-based expiry (including end of day):
#   - After 30 minutes of inactivity
#   - At midnight
#
# Requires dynamic scripting for Groovy to be enabled
# (https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting.html#_enabling_dynamic_scripting)
# ! WARNING: please read about security first
#
# Usage:
#
#   > Elastic::CalcUserSessionsService.new(from_date: 1.day.ago, to_date: Time.current, user_ids: [1]).execute
#
#   {
#     1 => [
#       {
#         day: Thu, 28 May 2015,
#         sum: 125,     # total session minutes for the day
#         average: 25,  # mean session length in minutes
#         median: 20    # median session length in minutes
#       }
#     ]
#   }
class Elastic::CalcUserSessionsService
  INDEX_NAME = "your_index".freeze
  TIME_FIELD = "read_at".freeze

  MILLISECONDS_IN_SEC = 1_000
  MILLISECONDS_IN_MIN = 60 * MILLISECONDS_IN_SEC
  INACTIVITY_DURATION_IN_MILLISECONDS = 30 * MILLISECONDS_IN_MIN

  attr_reader :params

  # @param params [Hash] expects :from_date and :to_date (Time-like, passed
  #   straight into the range filter) and :user_ids (Array<Integer>, may be
  #   nil or empty — see #should_filters)
  def initialize(params)
    @params = params
  end

  # Runs the aggregation and reshapes the raw response into
  # { user_id => [{ day:, sum:, average:, median: }, ...] }.
  # Day buckets whose scripted_metric produced no stats (fewer than two
  # reads, or only zero-length sessions) are skipped.
  def execute
    response = elastic_adapter.search(index: INDEX_NAME, body: query)

    response["aggregations"]["user_ids"]["buckets"].each_with_object({}) do |user_bucket, result|
      daily_stats = user_bucket["by_days"]["buckets"].map do |session_bucket|
        stats = session_bucket["sessions"]["value"]
        next if stats.blank?

        {
          # histogram bucket key is the day's start in epoch milliseconds
          day: Time.at(session_bucket["key"] / MILLISECONDS_IN_SEC).to_date,
          sum: stats["sum"].to_i,
          average: stats["average"].to_i,
          median: stats["median"].to_i
        }
      end

      result[user_bucket["key"]] = daily_stats.compact
    end
  end

  # The full search request body: filter documents to the requested users and
  # date range, bucket by user, then by day, then compute session statistics
  # inside each day bucket with a scripted_metric (map/combine/reduce Groovy).
  def query
    {
      size: 0,
      query: {
        filtered: {
          filter: {
            bool: {
              must: must_filters,
              should: should_filters
            }
          }
        }
      },
      aggs: {
        user_ids: {
          terms: {
            field: "user_id"
          },
          aggs: {
            by_days: {
              date_histogram: {
                field: TIME_FIELD,
                interval: "1d"
              },
              aggs: {
                sessions: {
                  scripted_metric: {
                    # map phase: collect every read timestamp per shard
                    init_script: "_agg['read_ats'] = []",
                    map_script: "_agg.read_ats.add(doc['#{ TIME_FIELD }'].value)",
                    # combine phase: split each shard's sorted timestamps
                    # into sessions at gaps longer than the inactivity limit;
                    # zero-length sessions are discarded
                    combine_script: oneliner(%Q{
                      sessions = []
                      if (_agg.read_ats.size() < 2) {
                        return sessions
                      }
                      _agg.read_ats.sort()
                      session_started_at = _agg.read_ats[0]
                      previous_read_at = session_started_at
                      last_read_at = _agg.read_ats[-1]
                      for (read_at in _agg.read_ats[1..-1]) {
                        if (read_at - previous_read_at > #{ INACTIVITY_DURATION_IN_MILLISECONDS }) {
                          if (previous_read_at - session_started_at != 0) {
                            sessions << (previous_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN }
                          }
                          session_started_at = read_at
                        } else if (read_at == last_read_at && read_at - session_started_at != 0) {
                          sessions << (last_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN }
                        }
                        previous_read_at = read_at
                      }
                      return sessions
                    }),
                    # reduce phase: merge all shards' session lists and
                    # compute sum / average / median (empty map when no
                    # sessions survived the combine phase)
                    reduce_script: oneliner(%Q{
                      sessions = []
                      stats = [:]
                      for (shard_sessions in _aggs) { sessions.addAll(shard_sessions) }
                      session_count = sessions.size()
                      if (session_count == 0) { return stats }
                      sessions.sort()
                      median_session_position1 = (int)((session_count - 1) / 2)
                      median_session_position2 = (int)(session_count / 2)
                      stats.median = (sessions[median_session_position1] + sessions[median_session_position2]) / 2
                      stats.sum = sessions.sum()
                      stats.average = stats.sum / session_count
                      return stats
                    })
                  }
                }
              }
            }
          }
        }
      }
    }
  end

  private

  # Collapses a multi-line Groovy snippet into the single line Elasticsearch
  # expects: strips surrounding whitespace, turns newlines into "; ", then
  # drops the separator right after "{" and before "}".
  def oneliner(code)
    code.gsub(/\A\s*|\s*\z/, "").gsub(/\n[ \n]*/, "; ").squish.gsub("{;", "{").gsub("; }", " }")
  end

  # Restricts documents to the requested date range (inclusive).
  def must_filters
    [
      { range: { read_at: { gte: params[:from_date], lte: params[:to_date] } } }
    ]
  end

  # One term filter per requested user. Array() guards against a nil
  # :user_ids param (the previous inject-based version raised NoMethodError).
  def should_filters
    Array(params[:user_ids]).map { |user_id| { term: { user_id: user_id } } }
  end

  # Memoized so repeated #execute calls on one service instance reuse a
  # single client instead of building a new connection each time.
  def elastic_adapter
    @elastic_adapter ||= Elasticsearch::Client.new(hosts: ElasticSettings.hosts, log: ElasticSettings.log)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment