Skip to content

Instantly share code, notes, and snippets.

@hkwi
Last active September 6, 2016 10:12
Show Gist options
  • Save hkwi/e8307c7a3f1dcfbe2dbb6f9bc040c2a8 to your computer and use it in GitHub Desktop.
Save hkwi/e8307c7a3f1dcfbe2dbb6f9bc040c2a8 to your computer and use it in GitHub Desktop.
network traffic fuse
#! coding: UTF-8
#
# Example: if_fuse.py -i wwan0 -t 450M -c ip link set wwan0 down
#
import argparse
import atexit
import logging
import subprocess
import time

import pyroute2
def num_bytes(s):
    """Parse a byte count with an optional binary-unit suffix.

    A trailing ``K``, ``M``, ``G`` or ``T`` (case-insensitive) multiplies the
    leading integer by 1024, 1024**2, 1024**3 or 1024**4 respectively.  A
    string without one of those suffixes is parsed as a plain integer.

    Args:
        s: text such as ``"450M"``, ``"1k"`` or ``"1000"``.

    Returns:
        The number of bytes as an int.

    Raises:
        ValueError: if *s* is empty or its numeric part is not an integer.
            Raising ValueError (instead of the IndexError an empty string
            would otherwise cause at ``s[-1]``) lets argparse report a
            regular usage error when this is used as a ``type=`` converter.
    """
    s = s.strip()
    if not s:
        raise ValueError("empty byte count")
    # str.find() returns -1 when the last character is not a unit letter,
    # which makes the exponent 0 (i.e. no multiplier).
    exponent = "KMGT".find(s[-1].upper()) + 1
    if exponent:
        return int(s[:-1]) * 1024 ** exponent
    return int(s)
# Command-line interface.  The byte limits are converted by num_bytes();
# the default of -1 means "no limit" for that counter.
p = argparse.ArgumentParser(
    description="Run commands once an interface's traffic exceeds a limit.")
p.add_argument("-i", "--interface", dest="iface",
               help="interface to watch (default: every interface)")
p.add_argument("-s", "--send", dest="send", type=num_bytes, default=-1,
               help="transmitted-byte limit, e.g. 450M "
                    "(K/M/G/T suffixes are powers of 1024)")
p.add_argument("-r", "--recv", dest="recv", type=num_bytes, default=-1,
               help="received-byte limit, same format as --send")
p.add_argument("-t", "--total", dest="total", type=num_bytes, default=-1,
               help="limit on received plus transmitted bytes, "
                    "same format as --send")
# default=[] keeps opt.cmds iterable when -c is never given; with the
# implicit default of None, iterating over it raises TypeError.
p.add_argument("-c", "--cmd", dest="cmds", nargs="*", action="append",
               default=[],
               help="command (with its arguments) to run when a limit is "
                    "exceeded; may be given more than once")
opt = p.parse_args()
def diff64(prev, cur):
    """Return how far a 64-bit counter advanced from *prev* to *cur*.

    When *cur* is smaller than *prev* the counter is assumed to have wrapped
    around once, so the difference is taken modulo 2**64.

    Args:
        prev: earlier counter reading.
        cur: later counter reading.

    Returns:
        The non-negative increment between the two readings.
    """
    if cur < prev:
        # One full wrap spans 2**64 counts (0xffffffffffffffff + 1); adding
        # only 0xffffffffffffffff would under-count by one.
        return (1 << 64) + cur - prev
    return cur - prev
# Module-level polling state, read by the monitor loop and by log_bytes().
prev = None  # no counter sample has been recorded yet
rx_bytes = tx_bytes = 0  # running totals of received / transmitted bytes
loop = True  # the monitor loop keeps running while this is true
def log_bytes():
    """Print the module-level rx/tx byte totals as one line on stdout."""
    # A single parenthesised argument prints identically under Python 2 and
    # is valid syntax under Python 3, unlike the bare print statement.
    print("rx=%d tx=%d" % (rx_bytes, tx_bytes))
# Report the byte totals whenever the interpreter exits.
atexit.register(log_bytes)

# Poll the link counters once per second until one of the configured limits
# is exceeded, then run the commands given with -c.
ip = pyroute2.IPRoute()
try:
    first_poll = True
    while loop:
        # Sleep before every poll except the first.  Pacing must not depend
        # on a sample having been taken, otherwise an interface that never
        # shows up turns this into a busy loop.
        if not first_poll:
            time.sleep(1)
        first_poll = False

        for n in ip.link("dump"):
            attrs = dict(n["attrs"])
            ifname = attrs.get("IFLA_IFNAME")
            if opt.iface and ifname != opt.iface:
                continue

            s = attrs.get("IFLA_STATS64")
            if s is None:
                # No 64-bit statistics in this message; nothing to count.
                continue

            # Keep the previous sample per interface: without -i several
            # interfaces are polled and their counters must not be mixed.
            if prev is None:
                prev = {}
            last = prev.get(ifname)
            # Always advance the stored sample so each poll adds only the
            # bytes seen since the previous poll, not since the start.
            prev[ifname] = s
            if last is not None:
                tx_bytes += diff64(last["tx_bytes"], s["tx_bytes"])
                rx_bytes += diff64(last["rx_bytes"], s["rx_bytes"])

        # A limit of -1 means that limit is disabled.
        if opt.recv != -1 and opt.recv < rx_bytes:
            loop = False
        elif opt.send != -1 and opt.send < tx_bytes:
            loop = False
        elif opt.total != -1 and opt.total < rx_bytes + tx_bytes:
            loop = False
finally:
    # Release the netlink socket even if polling is interrupted.
    ip.close()

# opt.cmds is None when -c was not given at all.
for cmd in opt.cmds or []:
    try:
        status = subprocess.call(cmd)
    except Exception:
        # Best effort: one failing command must not prevent the others.
        logging.error("cmd failed", exc_info=True)
    else:
        if status != 0:
            logging.error("cmd %r exited with status %d", cmd, status)
# systemd unit that starts if_fuse.py for the wwan0 interface.
[Unit]
Description=Interface fuse command
After=network.target
[Service]
# -t 450M limits received plus transmitted bytes (M = 1024**2 in if_fuse.py);
# the words after -c are the command run once that limit is exceeded.
ExecStart=/home/pi/if_fuse_py/bin/python /home/pi/if_fuse/if_fuse.py -i wwan0 -t 450M -c ip link set wwan0 down
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment