Skip to content

Instantly share code, notes, and snippets.

@FrankGrimm
Created February 28, 2020 18:13
Show Gist options
  • Save FrankGrimm/b443c6eca76501b382356b03204904b1 to your computer and use it in GitHub Desktop.
Save FrankGrimm/b443c6eca76501b382356b03204904b1 to your computer and use it in GitHub Desktop.
dump unread items in the canto feed reader (for use in wtfutil)
#!/usr/bin/python3
# sudo /usr/bin/python3 -m pip install termcolor
from termcolor import colored
import shutil
import sys
import logging
from canto_next.client import CantoClient
from canto_next.plugins import PluginHandler, Plugin, try_plugins, set_program # TODO cleanup
import json
# Width of the attached terminal in columns; shutil falls back to 80x20
# when the size cannot be determined (e.g. output is not a terminal).
TERM_WIDTH = shutil.get_terminal_size(fallback=(80, 20)).columns

# Only errors are logged, each prefixed with a wall-clock timestamp and the
# name of the emitting logger.
logging.basicConfig(
    format="%(asctime)s : %(name)s -> %(message)s",
    datefmt="%H:%M:%S",
    level=logging.ERROR,
)
class MockPlugin(Plugin):
    """Empty Plugin subclass; FeedDump assigns it as its ``plugin_class``."""
class FeedDump(PluginHandler, CantoClient):
    """Canto client that prints every non-empty tag and its item titles.

    The instance connects to the canto daemon on construction; ``dump()``
    then lists all tags and prints, per tag, a coloured header with the item
    count followed by one (possibly truncated) title per line.

    NOTE(review): "canto-state" is requested from the daemon but never
    inspected here, so any restriction to unread items presumably happens on
    the daemon side -- confirm against the daemon configuration.
    """

    def __init__(self):
        """Parse the common arguments, load plugins and connect to the daemon.

        Exits the process with status -1 when ``common_args`` reports failure
        or when the connection cannot be established.
        """
        self.plugin_attrs = {}

        optl = self.common_args("h", ["help"], "cantodump.py")
        if optl == -1:
            print("optl-1")
            sys.exit(-1)

        set_program("canto-feeddump")
        try_plugins(self.conf_dir, self.plugin_default,
                    self.disabled_plugins, self.enabled_plugins)

        PluginHandler.__init__(self)
        self.plugin_class = MockPlugin
        self.update_plugin_lookups()

        try:
            if self.port < 0:
                # Presumably a negative port means "no TCP port configured":
                # start the daemon and talk over the socket path -- confirm.
                self.start_daemon()
                CantoClient.__init__(self, self.socket_path)
            else:
                CantoClient.__init__(self, None,
                                     port=self.port, address=self.addr)
        except Exception as e:
            # Top-level boundary of a command line tool: report and give up.
            print("Error: %s" % e)
            print(self.socket_path)
            sys.exit(-1)

    def dump(self):
        """Print each tag that has items, followed by its item titles.

        Returns without printing anything when the tag list cannot be
        obtained (``_wait_response`` has already reported the reason) or
        when it is empty.
        """
        self.write("LISTTAGS", "")
        tags = self._wait_response("LISTTAGS")
        if not tags:
            return

        indent = 2
        # Clamp to zero so a very narrow terminal cannot turn the slice bound
        # below into a negative index (which would cut from the wrong end).
        max_len = max(TERM_WIDTH - indent, 0)

        for tag in tags:
            tagname = tag
            if tagname.startswith("maintag:"):
                tagname = tagname[len("maintag:"):]

            tag_items = self.items(tag)
            if not tag_items:
                continue

            print(colored(tagname, "white", attrs=["bold"]) + " [" + colored("%s" % len(tag_items), "green") + "]")
            for item in tag_items:
                title = item['title'] or ''
                if len(title) > max_len:
                    title = title[:max(max_len - 3, 0)] + "..."
                print(" " * indent + title)

    def items(self, tag):
        """Return the items of *tag* as a list of dicts.

        Each dict has the keys ``"id"``, ``"title"`` and ``"link"``; title
        and link are ``None`` when the daemon did not supply them.  An empty
        list is returned when the tag has no items or when the daemon stops
        answering before the reply is complete.
        """
        self.write("AUTOATTR", ["canto-state", "title", "link"])
        self.write("ITEMS", [tag])

        item_ids = []
        while True:
            response = self._wait_response(None)
            if response is None:
                # Hang-up or error code; unpacking None would raise TypeError.
                return []
            cmd, payload = response
            if cmd == "ITEMSDONE":
                break
            if cmd == "ITEMS":
                item_ids.extend(payload.get(tag, []))

        if not item_ids:
            return []

        attrs = self._wait_response("ATTRIBUTES")
        if attrs is None:
            return []

        res = []
        for key, val in attrs.items():
            # Attribute keys are JSON-encoded objects carrying the item "ID".
            item_id = json.loads(key)["ID"]
            res.append({
                "id": item_id,
                "title": val.get("title"),
                "link": val.get("link"),
            })
        return res

    # from CantoRemote
    def _wait_response(self, cmd):
        """Read from the daemon until a reply for *cmd* arrives.

        When *cmd* is falsy the first tuple read is returned unchanged as
        ``(command, payload)``; otherwise only the payload of the first tuple
        whose command equals *cmd* is returned.  "ERRORS" replies seen while
        waiting are printed and skipped.  Returns ``None`` after an integer
        status code or an unrecognised truthy value has been read.
        """
        while True:
            r = self.read()
            if isinstance(r, int):
                # 16 presumably corresponds to a poll hang-up flag -- confirm.
                if r == 16:
                    print("Server hung up.")
                else:
                    print("Got code: %d" % r)
                    print("Please check daemon-log for exception.")
                return None
            if isinstance(r, tuple):
                if not cmd:
                    return r
                if r[0] == cmd:
                    return r[1]
                if r[0] == "ERRORS":
                    print("ERRORS!")
                    for key, errors in r[1].items():
                        for val, err in errors:
                            print("%s -> %s: %s" % (key, val, err))
            elif r:
                print("Unknown return: %s" % r)
                return None
            # Any other falsy read is ignored and the read is retried.
# Run the dump only when executed as a script, so that importing this module
# does not connect to the daemon or print anything.
if __name__ == "__main__":
    fd = FeedDump()
    fd.dump()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment