Created
October 10, 2021 18:27
-
-
Save kapcod/b7bd7c3c0c3c120321a9ed6c4cefb6a9 to your computer and use it in GitHub Desktop.
Airflow DAG log cleaner. Assumes the log structure /var/log/airflow/<dag>/<operator>/<date>. Run `cleanup_airflow_logs.rb 7` to retain only the last week of logs.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
# frozen_string_literal: true | |
require 'fileutils' | |
require 'time' | |
# Removes log folders that are older than a retention window.
#
# Folders are discovered three levels below the base directory
# (<base_dir>/<dag>/<operator>/<date>, per the script description). The last
# path component of each folder is parsed with Time.parse and compared with
# "now minus retention_days".
class AirflowLogCleaner
  DEFAULT_BASE_DIR = '/var/log/airflow'
  SECONDS_PER_DAY = 24 * 3600
  # Pause for one second after this many removals (limits disk I/O).
  THROTTLE_EVERY = 1000
  # Print a progress line after this many removals.
  PROGRESS_EVERY = 10_000

  # list      - folders found by #list_folders (nil until it has been called).
  # to_delete - folders picked by #select_to_delete (nil until it has been called).
  attr_reader :list, :to_delete

  # @param base_dir [String] root directory that holds the per-DAG folders;
  #   defaults to the location the script always used.
  def initialize(base_dir = DEFAULT_BASE_DIR)
    @base_dir = base_dir
  end

  # Collects every entry exactly three levels below the base directory.
  # The result is memoized, so the file system is globbed only once.
  #
  # @return [Array<String>] full paths of the matched entries
  def list_folders
    # `base:` keeps glob metacharacters in the base directory itself literal.
    @list ||= Dir.glob('*/*/*', base: @base_dir).map { |rel| File.join(@base_dir, rel) }
  end

  # Picks the folders whose name parses to a time older than the threshold.
  # Folders whose name cannot be parsed are never selected; they are counted
  # and reported instead of aborting the whole scan.
  #
  # NOTE(review): names without a UTC offset are interpreted by Time.parse in
  # the local time zone - confirm this matches how the folders are named.
  #
  # @param retention_days [Numeric] age in days beyond which a folder is deleted
  # @return [Array<String>] the selected folders (also available as #to_delete)
  def select_to_delete(retention_days)
    folders = list_folders
    puts "Scanning #{folders.size} folders..."
    threshold = Time.now - retention_days * SECONDS_PER_DAY
    skipped = 0
    @to_delete = folders.select do |dir|
      Time.parse(File.basename(dir)) < threshold
    rescue ArgumentError
      skipped += 1
      false
    end
    puts "Skipped #{skipped} folder(s) whose name is not a timestamp" if skipped.positive?
    @to_delete
  end

  # Recursively removes every selected folder. A failure on one folder is
  # printed and does not stop the run; throttling and progress reporting
  # happen for every processed folder, whether or not its removal succeeded.
  #
  # @return [Array<String>] the folders that were processed
  def delete!
    targets = to_delete || []
    puts "Deleting #{targets.size} folders..."
    targets.each.with_index(1) do |dir, done|
      begin
        FileUtils.rm_r(dir)
      rescue StandardError => e
        puts "Error for #{dir}: #{e}"
      end
      sleep 1 if (done % THROTTLE_EVERY).zero? # limit disk io below 1000/s
      puts "Deleted #{done}/#{targets.size}" if (done % PROGRESS_EVERY).zero?
    end
  end

  # Runs the whole clean-up: list, select, delete, and report the duration.
  #
  # @param retention_days [Numeric] age in days beyond which a folder is deleted
  # @return [nil]
  def perform(retention_days)
    # Monotonic clock: the measured duration is immune to wall-clock jumps.
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    puts 'Pulling folders list'
    list_folders
    select_to_delete(retention_days)
    delete!
    elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started).round
    puts "Done. Took #{elapsed}s"
  end

  # Counts listed folders per second-level folder within each top-level folder.
  #
  # @return [Hash{String => Hash{String => Integer}}] dag => { operator => count }
  def count_by_op
    list_folders.each_with_object({}) do |dir, counts|
      # The last three components are <dag>/<operator>/<date>, whatever the base is.
      dag, operator, _date = dir.split(File::SEPARATOR).last(3)
      per_dag = (counts[dag] ||= {})
      per_dag[operator] = per_dag.fetch(operator, 0) + 1
    end
  end

  # Counts listed folders per top-level folder.
  #
  # @return [Hash{String => Integer}] dag => count
  def count_by_dag
    count_by_op.transform_values { |per_operator| per_operator.values.sum }
  end

  # Compact representation showing only the sizes of the two lists, so a
  # console does not dump every path.
  def inspect
    "\#<#{self.class.name}: #{object_id}, #{{list: @list&.size, to_delete: @to_delete&.size}.inspect}>"
  end
end
# Command-line entry point. The first argument is the number of days of logs
# to keep. It must be a positive base-10 integer; anything else (missing,
# zero, negative, or trailing garbage such as "7abc") prints a usage message
# to stderr and exits with a non-zero status instead of silently doing nothing.
retention_days = Integer(ARGV[0].to_s, 10, exception: false)

unless retention_days&.positive?
  abort "Usage: #{File.basename($PROGRAM_NAME)} RETENTION_DAYS (positive integer, e.g. 7)"
end

AirflowLogCleaner.new.perform(retention_days)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment