Skip to content

Instantly share code, notes, and snippets.

@perpen
Created July 27, 2022 19:28
Show Gist options
  • Save perpen/7d4f4223429b59190314fc63977d9235 to your computer and use it in GitHub Desktop.
Save perpen/7d4f4223429b59190314fc63977d9235 to your computer and use it in GitHub Desktop.
A textual status bar for use with dwm, tmux, etc.
#!/usr/bin/env ruby
# frozen_string_literal: true
## TODO
# - use ~/bin/laptop for more things
# Program name; it appears in the /var/tmp output file names and in the
# pgrep pattern used to find previously started instances.
PROG = 'status-bar'

# Integer parsed from the output of `nproc`.
NPROC = `nproc`.to_i

# Foreground/background colours. Set `dark_theme` to true to use the
# solarized dark palette instead of the light one.
dark_theme = false
if dark_theme
  # solarized dark
  MYFG = '#839496'
  MYBG = '#002b36'
else
  # solarized light
  # MYFG = '#657b83'
  MYFG = '#111111'
  MYBG = '#fdf6e3'
end
# Base class for every monitor shown in the status bar.
#
# Creating a monitor starts a background thread which, once per second,
# calls the subclass's #compute if the cached value is older than `delay`
# seconds and then refreshes #display. Subclasses implement #compute and
# return the text to show, or nil to hide the monitor.
class AbstractMonitor
  attr_reader :title, :display

  # @param title [String] prefix prepended to the computed value in #display
  # @param delay [Numeric] minimum number of seconds between #compute calls
  def initialize(title, delay: 60)
    @title = title
    @delay = delay
    @value = nil
    @default_color = MYFG
    @display = nil
    Thread.new do
      loop do
        refresh_value if is_out_of_date?
        @display = @value.nil? ? nil : "#{@title}#{@value}"
        sleep(1)
      end
    end
  end

  # Runs a command and returns its chomped output.
  #
  # @param cmd [String] command line to execute
  # @return [String] the output, or the string 'error' (after a warning on
  #   stderr) when the exit status is not 0
  def sh(cmd)
    out = `#{cmd}`.chomp
    estatus = $?.exitstatus
    return out if estatus == 0

    warn "error code #{estatus} running: #{cmd}: #{out}"
    'error'
  end

  # Wraps x in tmux colour directives when $colorised is set, switching back
  # to the default foreground colour afterwards; otherwise returns x as is.
  #
  # @param color [String] tmux colour name or '#rrggbb' value
  # @param x [Object] value to colour
  def with_color(color, x)
    return x unless $colorised

    # The trailing directive must be a complete `#[fg=...]` style, otherwise
    # the colour is never reset for the text that follows.
    "#[fg=#{color}]#{x}#[fg=#{@default_color}]"
  end

  def white(x)
    with_color MYFG, x
  end

  def magenta(x)
    with_color 'magenta', x
  end

  def green(x)
    with_color 'green', x
  end

  def blue(x)
    with_color 'blue', x
  end

  def red(x)
    with_color 'red', x
  end

  # Formats a percentage right-aligned in three columns followed by '%'.
  # nil is rendered as 0.
  def pad_percentage(perc)
    format('%3d%%', perc || 0)
  end

  # Left-pads the string form of s with spaces up to `width` characters.
  # nil is treated as the empty string.
  def pad_string(s, width)
    s.to_s.rjust(width)
  end

  private

  # True when no value has been computed yet or the last attempt is at
  # least @delay seconds old.
  def is_out_of_date?
    @compute_time.nil? || (Time.now - @compute_time >= @delay)
  end

  # Recomputes the cached value. A failing #compute is reported on stderr
  # and retried after the usual delay (the previous value is kept) instead
  # of terminating the polling thread for good.
  def refresh_value
    @value = compute
  rescue StandardError => e
    warn "#{self.class.name}: #{e.class}: #{e.message}"
  ensure
    @compute_time = Time.now
  end
end
# Monitor backed by the `repos status --behind` command.
class Repos < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two runs of the command
  def initialize(title, delay)
    super(title, delay: delay)
  end

  # @return [String, nil] the command output in red, or nil when the
  #   output is exactly "ok"
  def compute
    status = sh('repos status --behind')
    return if status == 'ok'

    red(status)
  end
end
# Monitor backed by the `secrets` command.
class Secrets < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two runs of the command
  def initialize(title, delay)
    super(title, delay: delay)
  end

  # @return [String, nil] the command output in red, or nil when the
  #   output is exactly "ok"
  def compute
    status = sh('secrets')
    status == 'ok' ? nil : red(status)
  end
end
# Monitor backed by the `security -q` command.
class Security < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two runs of the command
  def initialize(title, delay)
    super(title, delay: delay)
  end

  # @return [String, nil] the command output in red, or nil when the
  #   output is exactly "ok"
  def compute
    report = sh 'security -q'
    return nil if report == 'ok'

    red report
  end
end
# Monitor backed by the `gcloud2 status` command.
class GoogleCloud < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two runs of the command
  def initialize(title, delay)
    super(title, delay: delay)
  end

  # @return [String, nil] the command output in red, or nil when the
  #   output is exactly "ok"
  def compute
    state = sh('gcloud2 status')
    red(state) if state != 'ok'
  end
end
# Shows the current date (magenta) followed by the current time.
class TimeMonitor < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two refreshes
  def initialize(title, delay: 10)
    super(title, delay: delay)
  end

  # @return [String] e.g. "Wed 27 Jul 19:28 " (with colour directives when
  #   colours are enabled)
  def compute
    # Take a single timestamp so that the date and the time parts can never
    # come from two different instants (e.g. either side of midnight).
    now = Time.now
    magenta(now.strftime('%a %d %b ')) + white(now.strftime('%H:%M '))
    # magenta Time.now.strftime('%a %d %b')
  end
end
# Shows the percentage of RAM in use, derived from the "total" and
# "available" columns of the `Mem:` line printed by `free -m`.
class MemoryMonitor < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two readings
  # @param visibility [Numeric] the value is shown only above this percentage
  # @param critical [Numeric] above this percentage the value is red, else green
  def initialize(title, delay: 30, visibility: 0, critical: 50)
    # Set before `super`: the base class starts the polling thread, which
    # may call #compute straight away.
    @visibility = visibility
    @critical = critical
    super(title, delay: delay)
  end

  # @return [String, nil] coloured "NN%", or nil when usage is not above
  #   the visibility threshold or the output of `free` cannot be used
  def compute
    columns = `free -m`[/Mem:(.+)/, 1]&.split
    return if columns.nil?

    # Column order printed by free: total used free shared buff/cache available
    total = columns[0].to_f
    available = columns[5].to_f
    return unless total.positive?

    percent_used = (100 * (total - available) / total).floor
    return unless percent_used > @visibility

    text = "#{percent_used}%"
    percent_used > @critical ? red(text) : green(text)
  end
end
# Shows the usage percentage that `df /home` reports for the filesystem
# holding /home.
class DiskMonitor < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two readings
  # @param visibility [Numeric] the value is shown only above this percentage
  # @param critical [Numeric] above this percentage the value is red, else green
  def initialize(title, delay: 30, visibility: 0, critical: 50)
    # Set before `super`: the base class starts the polling thread, which
    # may call #compute straight away.
    @visibility = visibility
    @critical = critical
    super(title, delay: delay)
  end

  # @return [String, nil] coloured "NN%", or nil when usage is not above
  #   the visibility threshold or df printed no "/dev..." line
  def compute
    device_line = `df /home`[%r{/dev.*}]
    return if device_line.nil?

    # Column order printed by df: filesystem blocks used available use% mount
    percent_used = device_line.split[4].to_s[/\d+/].to_i
    return unless percent_used > @visibility

    text = "#{percent_used}%"
    percent_used > @critical ? red(text) : green(text)
  end
end
# Shows a red "vpn: on" marker while the output of `expressvpn status`
# contains the word "Connected".
class VpnMonitor < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two checks
  def initialize(title, delay: 30)
    super(title, delay: delay)
  end

  # @return [String, nil] the marker, or nil when "Connected" is absent
  def compute
    status = `expressvpn status`
    return unless status.include?('Connected')

    red('vpn: on')
  end
end
# Shows the busiest command and the overall CPU usage, both parsed from a
# long-running `top -b` subprocess read by a dedicated thread.
class Top < AbstractMonitor
  # Matches the "%Cpu(s):" summary line printed by top.
  CPU_LINE = /%Cpu\(s\): *([\d.]+) *us, *([\d.]+) *sy, *([\d.]+) *ni, *([\d.]+) *id, *([\d.]+) *wa, *([\d.]+) *hi, *([\d.]+) *si, *([\d.]+) *st/

  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two refreshes of the display
  # @param visibility [Numeric] the value is shown only when total CPU
  #   usage reaches this percentage
  def initialize(title, delay: 10, visibility: 0)
    # Everything #compute and #run_top read is set up before `super`,
    # because the base class starts the polling thread immediately.
    @visibility = visibility
    @cpus = [NPROC, 1].max # never divide by zero if nproc gave nothing usable
    @delay = delay
    @t = Thread.new { run_top }
    super(title, delay: delay)
  end

  # @return [String, nil] "<command> <cpu%> / <total cpu%>", or nil when
  #   total CPU usage is below the visibility threshold
  def compute
    total_cpu = @t['total_cpu'] || 0
    return if total_cpu < @visibility

    name = @t['top_command_name'] || '?'
    cpu = @t['top_command_cpu'] || 0
    "#{blue pad_string(name, 14)} #{blue pad_percentage(cpu)} / #{red pad_percentage(total_cpu)}"
  end

  # Body of the reader thread: parses the output of top and publishes the
  # latest figures as thread-local values of the current thread.
  def run_top
    stats = Thread.current
    stats['top_command_name'] = '?'
    stats['top_command_cpu'] = 0
    stats['total_cpu'] = 0
    # Sample twice per display period when the period is long enough.
    top_delay = @delay > 5 ? (@delay.to_f / 2).round : @delay
    cmd = "top -b -i -d #{top_delay}"
    IO.popen(cmd) do |subprocess|
      cpu_by_command = Hash.new(0)
      count_by_command = Hash.new(0)
      subprocess.each_line do |line|
        case line
        when /^ *[0-9]+ /
          # Process line; columns are:
          # pid user pr ni virt res shr s %cpu %mem time command
          fields = line.split
          command = fields[11]
          cpu_by_command[command] += fields[8].to_f
          count_by_command[command] += 1
        when /^$/
          # A blank line ends the current section: publish the busiest command.
          unless cpu_by_command.empty?
            command, cpu = cpu_by_command.max_by { |_, total| total }
            instances = count_by_command[command]
            command += "[#{instances}]" if instances > 1
            stats['top_command_name'] = command
            stats['top_command_cpu'] = (cpu.to_f / @cpus).round
          end
          cpu_by_command = Hash.new(0)
          count_by_command = Hash.new(0)
        when /^.Cpu/
          stats['total_cpu'] = parse_total_cpu(line.chomp)
        end
      end
    end
  end

  private

  # Returns user + system + nice CPU percentage from a "%Cpu(s):" line, or
  # 99 (after a warning) when the line has an unexpected format.
  def parse_total_cpu(cpu_line)
    match = CPU_LINE.match(cpu_line)
    if match.nil?
      warn "unable to match on line: '#{cpu_line}'"
      return 99
    end
    (match[1].to_f + match[2].to_f + match[3].to_f).round
  end
end
# Shows charge level, charging state and remaining time of "Battery 0" as
# reported by `acpi -b`. Disabled when /usr/bin/acpi does not exist.
class BatteryMonitor < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two readings
  def initialize(title, delay: 120)
    # Set before `super`: the base class starts the polling thread, which
    # may call #compute straight away.
    @enabled = File.exist? '/usr/bin/acpi'
    @state = '?'
    super(title, delay: delay)
  end

  # Also runs `laptop adjust-profile` whenever the charging state differs
  # from the one seen by the previous call.
  #
  # @return [String, nil] e.g. "85% D 02:30"; nil when disabled, when the
  #   acpi output has no "state, level, details" triple for Battery 0, or
  #   when the details mention "zero rate"
  def compute
    return unless @enabled

    matches = sh('acpi -b').match(/Battery 0: ([^,]+), (\d+%), (.*)$/)
    return if matches.nil?

    state_s, level_s, duration_s = matches.captures
    level_s = level_s[/\d+/]
    state = case state_s
            when 'Charging' then 'C '
            when 'Discharging' then 'D '
            else ''
            end
    if state != @state
      @state = state
      system 'laptop adjust-profile >/dev/null'
    end
    # "zero rate" in the details is treated as a full battery: show nothing.
    return if duration_s =~ /zero rate/

    # Keep only HH:MM of e.g. "02:30:15 remaining" / "01:10:00 until charged".
    duration = duration_s.sub(' remaining', '').sub(' until charged', '')
    duration = duration.sub(/(..:..):../, '\1')
    # if state == "D" and level_s.to_i > 0 and level_s.to_i < 5
    #   system "logger 'status-bar: powering off as battery at #{level_s}%'"
    #   system "sudo poweroff < /dev/null > /dev/null"
    # end
    "#{level_s}% #{state}#{duration}"
  end
end
# Shows wifi SSID and signal quality (from `iw dev <interface> link`), and
# flags lost connectivity detected by a background `ping`.
class WifiMonitor < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two readings; also the ping interval
  # @param visibility [Numeric] while connected, the value is shown only
  #   when the quality is below this percentage
  # @param interface [String] wireless interface name
  def initialize(title, delay: 10, visibility: 0, interface: 'wlan0')
    # Everything #compute and #run_ping read is set up before `super`,
    # because the base class starts the polling thread immediately.
    @visibility = visibility
    @interface = interface
    @site = 'google.com'
    @delay = delay
    @ping_thread = Thread.new { run_ping }
    super(title, delay: delay)
  end

  # @return [String, nil] "<interface>: off" when no SSID is reported;
  #   "<ssid>: <quality>%" (with a "!" when ping fails) when the quality is
  #   below the visibility threshold or ping fails; nil otherwise
  def compute
    reading = `iw dev #{@interface} link 2>&1`
    return "#{@interface}: off" unless reading.match?(/SSID:/)

    # One reading serves for both fields. A missing signal line counts as
    # quality 0 so that the problem stays visible.
    signal = reading[/signal: -(\d+)/, 1]
    @quality = signal ? ((90 - signal.to_i) * 100 / 60).clamp(0, 100) : 0
    # The SSID runs to the end of the line and may contain spaces.
    @ssid = reading[/SSID: (.+)$/, 1]
    connected = @ping_thread['connected']
    return unless @quality < @visibility || !connected

    "#{@ssid}: #{connected ? ' ' : '!'}#{@quality}%"
  end

  # Body of the ping thread: keeps a `ping` subprocess running and
  # publishes, as the thread-local 'connected', whether its latest output
  # line was a reply ("bytes from").
  def run_ping
    Thread.current['connected'] = true
    cmd = "ping -i#{@delay} #{@site} 2>&1"
    loop do
      begin
        IO.popen(cmd) do |subprocess|
          subprocess.each_line do |line|
            Thread.current['connected'] = line.include?('bytes from')
          end
        end
      rescue StandardError
        # Nothing: ping is simply restarted below.
      end
      Thread.current['connected'] = false
      sleep(@delay)
    end
  end
end
# Shows the temperature printed by `laptop temp` when it exceeds a
# threshold. Disabled when that command fails at start-up.
class ThermalMonitor < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two readings
  # @param visibility [Numeric] the value is shown only above this reading
  def initialize(title, delay: 30, visibility: 50)
    # Probe before `super`: the base class starts the polling thread, and a
    # first poll made while this command is still running would see
    # @enabled unset and skip the reading for a whole period.
    @visibility = visibility
    @enabled = system 'laptop temp >/dev/null'
    super(title, delay: delay)
  end

  # @return [String, nil] "<reading>C", or nil when disabled or not above
  #   the threshold
  def compute
    return unless @enabled

    reading = `laptop temp`.to_i
    "#{reading}C" if reading > @visibility
  end
end
# Shows the audio volume from `laptop volume level`, prefixed with "-"
# when `laptop volume unmuted` prints "n". Disabled when the level command
# fails at start-up.
class VolumeMonitor < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two readings
  def initialize(title, delay: 10)
    # Probe before `super`: the base class starts the polling thread, and a
    # first poll made while this command is still running would see
    # @enabled unset and skip the reading for a whole period.
    @enabled = system 'laptop volume level >/dev/null 2>&1'
    super(title, delay: delay)
  end

  # @return [String, nil] mute marker followed by the padded percentage,
  #   or nil when disabled
  def compute
    return unless @enabled

    vol = sh('laptop volume level').to_i
    mute = sh('laptop volume unmuted') == 'n' ? '-' : ' '
    "#{mute}#{pad_percentage vol}"
  end
end
# Shows the screen brightness as a percentage of the maximum level, both
# obtained from the `laptop brightness` command.
class BrightnessMonitor < AbstractMonitor
  SCRIPT = 'laptop brightness'

  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two readings
  def initialize(title, delay: 10)
    # Read the maximum before `super`: the base class starts the polling
    # thread, which must not see @max_level unset.
    @max_level = sh("#{SCRIPT} max").to_i
    super(title, delay: delay)
  end

  # @return [String, nil] padded percentage, or nil when no usable maximum
  #   level could be read (which would otherwise divide by zero)
  def compute
    return unless @max_level.positive?

    reading = sh("#{SCRIPT} value").to_i
    pad_percentage(100 * reading / @max_level)
  end
end
# Shows the number found on the "Packages (N)" line printed by
# `sudo pacman -Syu` run with stdin redirected from /dev/null. Disabled
# when /usr/bin/pacman does not exist.
class ArchMonitor < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two checks
  # @param visibility [Numeric] the value is shown only when the package
  #   count reaches this number
  def initialize(title, delay: 600, visibility: 0)
    super(title, delay: delay)
    @visibility = visibility
    @enabled = File.exist? '/usr/bin/pacman'
  end

  # @return [String, nil] "<count> updates" with the count in red, or nil
  #   when disabled or below the threshold
  def compute
    return nil unless @enabled

    output = `sudo pacman -Syu < /dev/null 2> /dev/null`
    # A missing "Packages (N)" line counts as 0.
    count = output[/^Packages \((\d+)\)/, 1].to_i
    return nil if count < @visibility

    "#{red(count)} updates"
  end
end
# Shows how many ids `virsh list --state-running --id` prints. Disabled
# when /usr/bin/virsh does not exist.
class VmMonitor < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two checks
  # @param visibility [Numeric] the value is shown only when the count
  #   reaches this number
  def initialize(title, delay: 600, visibility: 1)
    super(title, delay: delay)
    @enabled = File.exist? '/usr/bin/virsh'
    @visibility = visibility
  end

  # @return [String, nil] the count in red, or nil when disabled or below
  #   the threshold
  def compute
    return nil unless @enabled

    ids = `virsh list --state-running --id 2>/dev/null`.chomp.split
    running = ids.count
    return nil if running < @visibility

    red(running).to_s
  end
end
# Watches the CPU usage of processes whose command line matches
# "dnsmasq -k" and kills them when the usage stays high for too long.
class DnsmasqCpu < AbstractMonitor
  # Pattern handed to `pkill -f`. The bracketed first letter keeps the
  # regex from matching the command line of the sudo/pkill invocation
  # itself, which contains this very text.
  KILL_PATTERN = '[d]nsmasq -k'

  # @param title [String] prefix shown before the value
  def initialize(title)
    delay = 60
    minutes = 5
    # Set before `super`: the base class starts the polling thread, which
    # may call #compute straight away.
    @count = 0
    @max = minutes * 60 / delay
    super(title, delay: delay)
  end

  # @return [String, nil] "dnsmasq: N% cpu" while the usage is above @max,
  #   nil otherwise
  def compute
    rx = 'dnsmasq -k'
    reading = `(pgrep -f '#{rx}' | xargs ps -o %cpu,%mem,cmd -p | grep -v '%CPU') | sort -rn | head -1 | awk '{print $1}'`.chomp
    cpu = reading.to_i
    # NOTE(review): @max is both the CPU % threshold and the number of
    # consecutive high readings tolerated -- confirm this is intended.
    if cpu <= @max
      @count = 0
      return nil
    end

    @count += 1
    if @count > @max
      puts 'killing dnsmasq'
      # -f is required: without it pkill only compares the pattern with the
      # process name, which can never contain " -k". Passing the arguments
      # separately avoids an intermediate shell.
      system 'sudo', 'pkill', '-9', '-f', KILL_PATTERN
    end
    "dnsmasq: #{cpu}% cpu"
  end
end
# Shows "mounted" while `gd status` prints exactly that word.
class GoogleDrive < AbstractMonitor
  # @param title [String] prefix shown before the value
  # @param delay [Numeric] seconds between two checks
  def initialize(title, delay: 30)
    super(title, delay: delay)
  end

  # @return [String, nil] "mounted", or nil for any other output
  def compute
    status = `gd status`.chomp
    status == 'mounted' ? status : nil
  end
end
# Output modes selectable on the command line. Each entry provides:
#   'colorised' - whether monitors should emit tmux colour directives
#   'formatter' - lambda receiving the array of monitor strings to publish
$MODES = {
  # Prints the values on stdout, separated by " | ".
  'text' => {
    'colorised' => false,
    'formatter' => lambda { |values|
      puts values.join(' | ')
    }
  },
  # Sets the X root window name, which dwm shows as its status text.
  'dwm' => {
    'colorised' => false,
    'formatter' => lambda { |values|
      all = values.join(' | ')
      # Arguments are passed separately so that no shell is involved: the
      # text (SSIDs, process names, ...) may contain quotes or other shell
      # metacharacters.
      unless system('xsetroot', '-name', all)
        puts 'call to xsetroot failed, we probably left x, exiting'
        exit 1
      end
    }
  },
  # Writes a tmux-formatted line to /var/tmp/<PROG>-tmux.out.
  'tmux' => {
    'colorised' => true,
    'formatter' => lambda { |values|
      File.write("/var/tmp/#{PROG}-tmux.out",
                 "#[fg=#{MYFG}]" + values.join("#[fg=#{MYFG}] | "))
    }
  },
  # Writes one value per line to /var/tmp/<PROG>-statuswin.out.
  'statuswin' => {
    'colorised' => false,
    'formatter' => lambda { |values|
      File.write("/var/tmp/#{PROG}-statuswin.out", values.join("\n") + "\n")
    }
  }
}
# --- Main script ---------------------------------------------------------
# Usage: status-bar (text|dwm|tmux|statuswin)

# Never run as root.
user = `whoami`.chomp
if user == 'root'
  puts 'refusing to run as root, aborting'
  exit 2
end

# Select the output mode named by the first command-line argument.
mode = ARGV[0]
config = $MODES[mode]
unless config
  puts "Usage: #{$PROGRAM_NAME} (#{$MODES.keys.join('|')})"
  exit 2
end
$colorised = config['colorised']
formatter = config['formatter']

# Monitors that only make sense on a particular host.
host_monitors =
  case `hostname`.chomp
  when 'ptoseis'
    [
      DnsmasqCpu.new(''),
      GoogleDrive.new('google-drive: ', delay: 45),
      VmMonitor.new('mach: ', delay: 60),
      VpnMonitor.new('', delay: 15),
      VolumeMonitor.new('vol: ', delay: 3),
      # BrightnessMonitor.new("lcd: ", delay: 10),
      WifiMonitor.new('', delay: 5, visibility: 30, interface: 'wlan0'),
      # ThermalMonitor.new("temp: ", delay: 10, visibility: 55),
      BatteryMonitor.new('batt: ', delay: 60),
      ArchMonitor.new('arch: ', delay: 3600, visibility: 100)
    ]
  when 'dell'
    [
      VolumeMonitor.new('vol: ', delay: 5),
      # BrightnessMonitor.new("lcd: ", delay: 10),
      WifiMonitor.new('', delay: 5, visibility: 32, interface: 'wlo1'),
      # ThermalMonitor.new("temp: ", delay: 10, visibility: 55),
      BatteryMonitor.new('batt: ', delay: 60),
      ArchMonitor.new('arch: ', delay: 3600, visibility: 100)
    ]
  else
    []
  end

monitors = host_monitors + [
  Top.new('', delay: 2, visibility: 30),
  Security.new('sec: ', 60 * 60),
  Secrets.new('', 120),
  Repos.new('git: ', 120),
  GoogleCloud.new('gcp: ', 180),
  DiskMonitor.new('disk: ', delay: 120, visibility: 70, critical: 80),
  MemoryMonitor.new('ram: ', delay: 30, visibility: 80, critical: 80),
  TimeMonitor.new('', delay: 10)
]

# Kill any other instance already running in the same mode.
own_pid = Process.pid.to_s
pids = `pgrep -f "ruby .*#{PROG} +#{mode}"`.split.reject { |pid| pid == own_pid }
pids.each { |pid| system "kill #{pid}" }

# Publish the non-empty monitor values once per second.
loop do
  values = monitors.map do |monitor|
    monitor.display
  rescue StandardError => e
    puts "#{monitor} error: #{e}"
    "#{monitor} ERROR"
  end
  formatter.call(values.compact)
  sleep(1)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment