Last active
December 28, 2015 04:59
-
-
Save tejasbubane/7446106 to your computer and use it in GitHub Desktop.
Parser for all log files in my directory; it puts the parsed data into MongoDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'date' # Time#to_date, used when building the per-day record key
require 'time' # Time.parse
require 'mongo'
include Mongo
# Client for the MongoDB server at localhost:27017, using the "logs" database.
mongo_client = MongoClient.new("localhost", 27017)
db = mongo_client.db("logs")

# Collections used by the import loop.
requests     = db.collection("requests")     # master database
date_logs    = db.collection("date_logs")    # One record per day
user_details = db.collection("user_details")
items        = db.collection("items")        # Item specific information
filenames    = db.collection("filenames")    # stores all processed filenames
# Only unprocessed files will be processed

cnt = 0      # number of log lines stored so far
file_cnt = 0 # number of log files seen so far (including already-processed ones)
# Parses one raw log line into a Hash of field name => value.
#
# The line is first stripped of byte sequences that are not valid UTF-8 (by
# round-tripping it through UTF-16), then trimmed with two String#chop calls,
# and finally split on "," into "key=>value" fields. Dots in keys are replaced
# with underscores because MongoDB doesn't allow a dot (.) in a key. A
# "trans_date" field, when present, is converted to a UTC Time.
#
# NOTE(review): the two chops remove the line terminator plus one more trailing
# character (String#chop drops "\r\n" as a single unit) -- presumably every log
# line ends with a delimiter before the newline; confirm against real data.
#
# @param line [String] one raw line read from a log file (not modified)
# @return [Hash] parsed fields; an empty field yields the key "" with a nil
#   value, and a field without "=>" uses its whole text as both key and value
# @raise [ArgumentError] if "trans_date" is present but Time.parse rejects it
def parse_log_line(line)
  clean = line.encode('UTF-16', 'UTF-8', :invalid => :replace, :replace => '')
  clean = clean.encode('UTF-8', 'UTF-16')

  fields = {}
  clean.chop.chop.split(',').each do |pair|
    parts = pair.split('=>')
    key = parts.first || "" # found some nil keys, replacing them with ""
    fields[key.tr('.', '_')] = parts.last
  end

  fields["trans_date"] = Time.parse(fields["trans_date"]).utc if fields["trans_date"]
  fields
end

# Adds one to counters[key], starting at 1 when the key has no count yet.
#
# @param counters [Hash] counter hash, updated in place
# @param key [Object] entry to increment
# @return [Integer] the new count
def bump_count(counters, key)
  counters[key] = (counters[key] || 0) + 1
end

# Import every *.log file that is not yet listed in the filenames collection.
Dir.glob('/media/MediA/test_logs/*.log') do |f|
  file_cnt += 1
  next if filenames.find_one("_id" => f) # do nothing if the file is already processed

  # NOTE(review): the file is recorded as processed before its lines are read,
  # so a run that aborts part-way will skip this file next time -- confirm
  # that this is intended.
  filenames.insert({ "_id" => f, "timestamp" => Time.now.utc })
  puts "Processing file ##{file_cnt}"
  puts f

  # File.foreach closes the file when the block finishes or raises.
  File.foreach(f) do |line|
    h = nil
    record = nil
    begin
      h = parse_log_line(line)
      # do nothing if device_id, log_format or trans_date is absent
      next unless h["device_id"] && h["log_format"] && h["trans_date"]

      # One date_logs record per (day, partner, channel).
      # NOTE(review): to_date.to_time gives midnight in the process's local
      # time zone, so the bucket depends on the machine's TZ -- confirm.
      day_id = {
        "trans_date" => h["trans_date"].to_date.to_time,
        "partner_id" => h["partner_id"],
        "channel_id" => h["channel_id"]
      }
      record = date_logs.find_one("_id" => day_id)
      # create new record if record does not exist
      record ||= { "_id" => day_id, "download" => {}, "streaming" => {}, "activation" => {}, "access" => {} }

      case h["log_format"]
      when "access"
        user_id = {
          "device_id" => h["device_id"],
          "channel_id" => h["channel_id"],
          "partner_id" => h["partner_id"]
        }
        if user_details.find_one("_id" => user_id)
          # user exists (Access): update or initialize the access count
          bump_count(record["access"], h["device_id"])
        else
          # user does not exist (Activation / first access)
          user_details.insert({
            "_id" => user_id,
            "trans_date" => h["trans_date"],
            "ip_address" => h["ip_address"],
            "mmn" => h["mmn"]
          })
          # no need to check for an existing activation entry, it will always be unique
          record["activation"][h["device_id"]] = h["ip_address"]
        end
      when "download", "streaming"
        # per-item count, stored under the sub-hash named after the log format
        bump_count(record[h["log_format"]], h["open_id"])
      end

      # all done now store the records
      requests.insert(h) # master record contains everything
      date_logs.update({ :_id => record["_id"] }, record, { :upsert => true })

      item_id = {
        :open_id => h["open_id"],
        :partner_id => h["partner_id"],
        :channel_id => h["channel_id"]
      }
      unless items.find_one(:_id => item_id)
        # create and save the new item
        items.insert({
          :_id => item_id,
          :count_type => h["count_type"],
          :item_title => h["item_title"],
          :track_Artist => h["track_artist"],
          :product_uri => h["product_uri"],
          :image_uri => h["image_uri"]
        })
      end

      cnt += 1
    rescue StandardError => e
      # Report the failing line's state on stderr and stop with a failure status.
      $stderr.puts e
      $stderr.puts h
      $stderr.puts record
      exit 1
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment