Last active
January 10, 2022 23:16
-
-
Save ali5h/313795d59931c4d128d2 to your computer and use it in GitHub Desktop.
Creating IMDB Top 250 Collection in Plex Media Server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" | |
Downloads the IMDB Top 250 list and saves it as a CSV file
""" | |
from pyquery import PyQuery as pq | |
import csv | |
import datetime | |
# for UnicodeWriter | |
import codecs | |
import cStringIO | |
class UnicodeWriter: | |
""" | |
A CSV writer which will write rows to CSV file "f", | |
which is encoded in the given encoding. | |
""" | |
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds): | |
# Redirect output to a queue | |
self.queue = cStringIO.StringIO() | |
self.writer = csv.writer(self.queue, dialect=dialect, **kwds) | |
self.stream = f | |
self.encoder = codecs.getincrementalencoder(encoding)() | |
def writerow(self, row): | |
self.writer.writerow([s.encode("utf-8") for s in row]) | |
# Fetch UTF-8 output from the queue ... | |
data = self.queue.getvalue() | |
data = data.decode("utf-8") | |
# ... and reencode it into the target encoding | |
data = self.encoder.encode(data) | |
# write to the target stream | |
self.stream.write(data) | |
# empty queue | |
self.queue.truncate(0) | |
def writerows(self, rows): | |
for row in rows: | |
self.writerow(row) | |
# Select the title cells of the IMDB Top 250 chart page.
movies = pq('http://www.imdb.com/chart/top')(".chart .titleColumn")
anchors = list(movies("a").items())
urls = [h.attr("href") for h in anchors]
titles = [h.text() for h in anchors]
years = [h.text().strip('()') for h in movies(".secondaryInfo").items()]

# Header row followed by one [url, title, year] row per chart entry.
# zip() pairs the three columns positionally and stops at the shortest one.
imdb250 = [['imdburl', 'title', 'year']]
imdb250.extend([url, title, year]
               for url, title, year in zip(urls, titles, years))

filename = "imdb-top250-%s.csv" % str(datetime.date.today())
# The with-block closes the file even if writing a row fails.
with open(filename, 'wb') as f:
    w = UnicodeWriter(f)
    w.writerows(imdb250)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" | |
Creates a collection tag and applies it to all owned movies listed in a CSV file
v0.02
Back up the Plex database before running; the script seems to work relatively well
""" | |
import os | |
import platform | |
import sqlite3 | |
import re | |
import csv | |
import getopt | |
import sys | |
# Dry-run switch: while True, db_exec() skips its query so nothing is written.
DRYRUN = True #Don't UPDATE database
# Report switch: while True, fetch_film_ids() prints owned/unowned film lists.
REPORT = False #Print out unowned films
def db_exec(cursor, query):
    """Execute "query" on "cursor" unless the script is in dry-run mode.

    While the module-level DRYRUN flag is set the query is skipped
    entirely; nothing is executed and nothing is printed.
    """
    if DRYRUN:
        return
    cursor.execute(query)
def connect_db():
    """Open the Plex library database for the current platform.

    The database path is chosen from platform.system(): a path under the
    user's home directory on Darwin, a fixed /var/lib path on Linux and a
    path under %LOCALAPPDATA% on Windows.

    Returns:
        A (connection, cursor) tuple for the sqlite3 database.

    Raises:
        RuntimeError: if the platform is not Darwin, Linux or Windows, or
            if LOCALAPPDATA is not set on Windows.
    """
    pf = platform.system()
    if pf == 'Darwin':
        homedir = os.path.expanduser("~")
        db_path = ('%s/Library/Application Support/Plex Media Server/'
                   'Plug-in Support/Databases/'
                   'com.plexapp.plugins.library.db' % (homedir))
    elif pf == 'Linux':
        db_path = ('/var/lib/plexmediaserver/Library/Application Support/'
                   'Plex Media Server/Plug-in Support/Databases/'
                   'com.plexapp.plugins.library.db')
    elif pf == 'Windows':
        localappdata = os.getenv('LOCALAPPDATA')
        if not localappdata:
            # Without this check the path would start with the text "None".
            raise RuntimeError("LOCALAPPDATA is not set; cannot locate the "
                               "Plex database")
        db_path = ('%s\\Plex Media Server\\Plug-in Support\\'
                   'Databases\\com.plexapp.plugins.library.db'
                   % (localappdata))
    else:
        # Previously this fell through to an unbound "conn" variable.
        raise RuntimeError("Unsupported platform: %s" % pf)

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    return (conn, cursor)
def close_db(cursor):
    """Release the given database cursor (the connection is left as is)."""
    cursor.close()
def generate_field_string(user_fields):
    """Return a "lockedFields=..." string with fields 15 and 16 added.

    Args:
        user_fields: existing value from the database, expected to look
            like "lockedFields=1|2|3"; may be empty or None.

    Returns:
        "lockedFields=" followed by the existing numeric field ids plus 15
        and 16, de-duplicated, sorted numerically and joined with "|".

    Raises:
        ValueError: if an existing field id is not an integer.
    """
    # NOTE(review): 15 and 16 are presumably the field ids that lock the
    # collection tag -- confirm against the Plex schema.
    existing = ""
    if user_fields:
        pieces = user_fields.split('=')
        # A value without "=" used to raise IndexError; treat it as having
        # no locked fields yet.
        if len(pieces) > 1:
            existing = pieces[1]

    locked = set([15, 16])
    # Blank items (e.g. from "1||2") are skipped instead of failing int().
    locked.update(int(x) for x in existing.split('|') if x.strip())
    return "lockedFields=%s" % '|'.join(str(x) for x in sorted(locked))
def remove_tag(tags_collection, tag_to_remove):
    """Return the "|"-separated tag string without tag_to_remove.

    Every item equal to tag_to_remove is dropped. Empty items at the very
    start of the result are dropped as well; empty items elsewhere stay.
    """
    kept = [tag for tag in tags_collection.split('|') if tag != tag_to_remove]
    # Items never contain "|", so stripping leading pipes removes exactly the
    # leading empty items.
    return "|".join(kept).lstrip("|")
def update_metadata_items(cursor, metadata_id, tag_title, untag=False):
    """Add or remove tag_title in a metadata_items row and lock its fields.

    Args:
        cursor: database cursor used for the lookup and the update.
        metadata_id: id of the metadata_items row to change.
        tag_title: collection tag to add (or remove when untag is True).
        untag: when True, remove tag_title instead of adding it.

    Nothing is written when the row does not exist, or when the tag is
    already present and untag is False. The UPDATE goes through db_exec(),
    so it is skipped in dry-run mode.
    """
    cursor.execute('SELECT tags_collection, user_fields '
                   'FROM metadata_items WHERE id=?', (metadata_id,))
    item = cursor.fetchone()
    if item is None:
        # No such row (this used to fail with a TypeError on item[0]).
        return
    tags_collection, user_fields = item[0], item[1]
    new_field_string = generate_field_string(user_fields)

    if untag:
        # Unwind the tag set for this film.
        tags_collection = remove_tag(tags_collection or "", tag_title)
    elif not tags_collection:
        tags_collection = tag_title
    elif tag_title in tags_collection.split('|'):
        # Already tagged: leave the row untouched.
        return
    else:
        tags_collection = "%s|%s" % (tags_collection, tag_title)

    # db_exec() only accepts a finished SQL string, so double single quotes
    # to keep tag titles containing "'" from breaking the literals.
    db_exec(cursor, "UPDATE metadata_items SET tags_collection='%s',"
            "user_fields='%s' WHERE id='%s'"
            % (tags_collection.replace("'", "''"),
               new_field_string.replace("'", "''"),
               metadata_id))
def map_tag_to_metadata(cursor, tag_id, metadata_item_id, untag=False):
    """Create or delete the taggings row linking a tag to a metadata item.

    Args:
        cursor: database cursor used for the lookup and the write.
        tag_id: id of the tag.
        metadata_item_id: id of the metadata item.
        untag: when True, delete an existing link instead of creating one.

    With untag False a row is inserted only if none exists. With untag True
    an existing row is deleted and a missing row is left missing (the old
    code inserted one in that case). Writes go through db_exec(), so they
    are skipped in dry-run mode.
    """
    cursor.execute('SELECT id FROM taggings '
                   'WHERE tag_id=? and metadata_item_id=?',
                   (tag_id, metadata_item_id))
    already_mapped = cursor.fetchone() is not None

    if untag:
        if already_mapped:
            db_exec(cursor, 'DELETE from taggings WHERE '
                    'metadata_item_id=%s and tag_id=%s'
                    % (metadata_item_id, tag_id))
    elif not already_mapped:
        db_exec(cursor, 'INSERT into taggings (metadata_item_id, tag_id,'
                'created_at) VALUES ("%s", "%s", datetime("now"))'
                % (metadata_item_id, tag_id))
def insert_new_tag(cursor, title):
    """Insert a tag row of tag_type 2 named "title" and return its row id.

    The INSERT goes through db_exec(), so in dry-run mode nothing is
    inserted and the returned cursor.lastrowid does not refer to a new tag.
    """
    # db_exec() only accepts a finished SQL string, so double any single
    # quote to keep titles such as "Director's Cut" from breaking the literal.
    safe_title = title.replace("'", "''")
    db_exec(cursor, "INSERT into tags (tag, tag_type, created_at, updated_at) "
            "VALUES ('%s', '2', datetime('now'), datetime('now'))"
            % (safe_title))
    return cursor.lastrowid
def create_tag(cursor, title):
    """Return the id of the tag_type 2 tag named "title", creating it if needed.

    The lookup uses a bound parameter so titles containing quote characters
    are matched literally. When no tag exists, insert_new_tag() is called
    and its return value is passed on.
    """
    cursor.execute('SELECT id, tag, tag_type FROM tags '
                   'WHERE tag_type = 2 and tag = ?', (title,))
    item = cursor.fetchone()
    if item:
        # Tag already exists in the database.
        return item[0]
    # Tag needs to be inserted.
    return insert_new_tag(cursor, title)
def find_imdb_id(imdb_url):
    """Extract an IMDB-style id ("tt" + digits) from a URL or guid string.

    An explicit "tt<digits>" token is preferred. If there is none, the
    first run of digits anywhere in the string is used, as before.

    Args:
        imdb_url: string to search; may be empty or None.

    Returns:
        "tt" followed by the matched digits, or False when the input is
        empty or contains no digits.
    """
    if not imdb_url:
        # None used to raise TypeError inside the regex search.
        return False
    # Prefer the real id so digits earlier in the string (e.g. in a host
    # name) are not mistaken for it.
    match = (re.search(r'tt(\d+)', imdb_url, re.IGNORECASE)
             or re.search(r'(\d+)', imdb_url))
    if not match:
        return False
    return "tt" + match.group(1)
def fetch_film_ids(cursor, csv_filename):
    """Return metadata ids of Plex films whose IMDB id appears in the CSV.

    Args:
        cursor: database cursor used to read metadata_items.
        csv_filename: path of a CSV file with a header row containing the
            columns 'imdburl', 'title' and 'year'.

    Returns:
        A list of metadata_items ids (metadata_type=1 rows) whose guid
        yields an IMDB id that is also found in the CSV. When csv_filename
        is empty the string "Specify csv!" is returned instead.

    When the module-level REPORT flag is set, owned and unowned films are
    printed as well.
    """
    if not csv_filename:
        # NOTE(review): returning a string where callers expect a list is
        # kept for compatibility; main() never passes an empty filename.
        return "Specify csv!"

    csv_imdb_ids = []
    title_lookup = {}
    # The with-block closes the file even if a row is malformed.
    with open(csv_filename, 'rb') as fhandle:
        # DictReader takes its field names from the header row.
        for row in csv.DictReader(fhandle):
            imdb_id = find_imdb_id(row['imdburl'])
            if imdb_id:
                csv_imdb_ids.append(imdb_id)
                title_lookup[imdb_id] = row['title'] + " (" + row['year'] + ")"
            else:
                print("ERROR: Unable to find IMDB ID for %s" % (row['title']))

    # Set membership keeps the per-film check cheap for long lists.
    wanted_ids = set(csv_imdb_ids)

    cursor.execute('SELECT id, title, user_fields, guid '
                   'FROM metadata_items WHERE metadata_type=1')
    plex_imdb_ids = []
    metadata_ids = []
    # Go through all films in the database and try to match them to the CSV.
    for item in cursor.fetchall():
        guid = item[3]
        if not guid:
            # Rows without a guid cannot be matched.
            continue
        imdb_id = find_imdb_id(guid)
        if imdb_id in wanted_ids:
            # Keep the IMDB id for the optional report.
            plex_imdb_ids.append(imdb_id)
            # Matched movie from csv->db, so save its metadata id.
            metadata_ids.append(item[0])

    if REPORT:
        unowned_films = list(wanted_ids - set(plex_imdb_ids))
        if unowned_films:
            print("-------")
            print("%d of %d films owned" % (len(plex_imdb_ids),
                                            len(csv_imdb_ids)))
            print("%d unowned films:" % (len(unowned_films)))
            for imdb_id in unowned_films:
                print("\t" + title_lookup[imdb_id])
        print("-------")
        print("%d films owned:" % (len(plex_imdb_ids)))
        for imdb_id in plex_imdb_ids:
            print("\t" + title_lookup[imdb_id])
        print("-------")
    return metadata_ids
def display_usage():
    """Print the command-line help text, then exit with status 2."""
    script_name = os.path.basename(__file__)
    usage_template = """Usage: %s [options]
Options:
-f CSVFILENAME, --file=CSVFILENAME ICM top list CSV export [REQUIRED]
-t "TAG TITLE", --tag="TAG TITLE" Name of collection tag [REQUIRED]
-x, --execute Commit changes to database, else dry run
-r, --report Print list of films to tag and missing films
-u, --untag Remove "TAG TITLE" from all listed films, essentially an undo
-h, --help Show this help message and exit
Example:
%s -f 500+essential+cult+movies.csv -t "500 Essential Cult Movies" -x -r
The above will tag all owned movies from the .csv file with the tag "500 Essential Cult Movies", commit changes to database and print a report
This new collection can then be filtered in Plex
"""
    # The script name appears twice: in the usage line and in the example.
    print(usage_template % (script_name, script_name))
    sys.exit(2)
def main():
    """Parse the command line and tag (or untag) the films listed in the CSV.

    Options -f/--file and -t/--tag are required; -x turns off dry-run mode,
    -r enables the report, -u removes the tag instead of adding it and -h
    shows the usage text. Invalid or missing options print the usage text
    and exit with status 2.
    """
    # Declared once up front instead of inside the option loop.
    global DRYRUN, REPORT
    tag_title = ""
    csv_filename = ""
    untag = False
    try:
        opts, dummy = getopt.getopt(
            sys.argv[1:], "f:t:xruh",
            ["file=", "tag=", "execute", "report", "untag", "help"])
    except getopt.GetoptError as err:
        # Say what was wrong instead of exiting silently; display_usage()
        # exits with status 2, the same code as before.
        print(str(err))
        display_usage()
    for opt, arg in opts:
        if opt in ("-f", "--file"):
            csv_filename = arg
        if opt in ("-t", "--tag"):
            tag_title = arg
        if opt in ("-x", "--execute"):
            DRYRUN = False
        if opt in ("-r", "--report"):
            REPORT = True
        if opt in ("-u", "--untag"):
            untag = True
        if opt in ("-h", "--help"):
            display_usage()
    if not csv_filename or not tag_title:
        display_usage()

    conn, cursor = connect_db()
    try:
        metadata_ids = fetch_film_ids(cursor, csv_filename)
        tag_id = create_tag(cursor, tag_title)
        for meta_id in metadata_ids:
            map_tag_to_metadata(cursor, tag_id, meta_id, untag)
            update_metadata_items(cursor, meta_id, tag_title, untag)
        conn.commit()
    finally:
        # Release the cursor and the connection even when a step fails;
        # changes that were not committed are discarded with the connection.
        close_db(cursor)
        conn.close()


if __name__ == "__main__":
    main()
How do I install and run this on Ubuntu?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Collection creation code is from here