Skip to content

Instantly share code, notes, and snippets.

@kaecy
Last active October 30, 2021 08:30
Show Gist options
  • Save kaecy/a61f74a92a50e37dc40aa83a882c6e6b to your computer and use it in GitHub Desktop.
Save kaecy/a61f74a92a50e37dc40aa83a882c6e6b to your computer and use it in GitHub Desktop.
yt channel feed stuff.
import requests
import json
import yaml
basePath = "https://www.googleapis.com/youtube/v3"
class YTChannelFeed:
def __init__(self, channelId):
self.config = yaml.safe_load(open("feeds/config.yaml"))
self.channelId = channelId
try:
self.feed = json.load(open("feeds/" + channelId + ".json"))
self.feedIndex = self.config[channelId]['feedIndex']
except FileNotFoundError:
ytd = YTData(self.config['key'])
playlist = ytd.getUploadsPlaylist(self.channelId)
self.feed = playlist.get()['items']
self.feed.reverse()
self.feedIndex = 0
# save to file
feedFile = open("feeds/" + self.channelId + ".json", "w")
json.dump(self.feed, feedFile, indent=4)
def getNextItem(self):
if self.feedIndex < len(self.feed):
entry = self.feed[self.feedIndex]
self.feedIndex += 1
return entry
else:
print("Feed %s is finished." % self.channelId)
print("Getting more data.")
if self.update():
return self.getNextItem()
else:
print("No more data.")
def update(self):
ytd = YTData(self.config['key'])
playlist = ytd.getUploadsPlaylist(self.channelId)
playlist.update(self.feed)
feedFile = open("feeds/" + self.channelId + ".json", "w")
json.dump(self.feed, feedFile, indent=4)
class Playlist:
def __init__(self, ytdata, playlistId):
self.ytdata = ytdata
self.playlistId = playlistId
self.pageToken = ""
def get(self):
res = self.ytdata._appRequest("/playlistItems", {
"part": "snippet",
"maxResults": 50,
"playlistId": self.playlistId,
"pageToken": self.pageToken
})
self.pageToken = res['nextPageToken']
return res
def getNew(self, videoId):
items = []
finished = False
while True:
newUploadItems = self.get()['items']
for i in range(len(newUploadItems)):
if newUploadItems[i]['snippet']['resourceId']['videoId'] == videoId:
items.extend(newUploadItems[0:i])
finished = True
if not finished:
items.extend(newUploadItems)
else:
break
items.reverse()
return items
def update(self, videoList):
lastVideoId = videoList[-1]['snippet']['resourceId']['videoId']
newVideos = self.getNew(lastVideoId)
videoList.extend(newVideos)
if len(newVideos) > 0:
return True
else:
return False
class YTData:
def __init__(self, key):
self.key = key
def getUploadsId(self, channelId):
content = self.channels(channelId)
return content['items'][0]['contentDetails']['relatedPlaylists']['uploads']
def channels(self, channelId):
return self._appRequest("/channels", {
"part": "contentDetails",
"id": channelId
})
def getUploadsPlaylist(self, channelId):
uploadsPlaylistId = self.getUploadsId(channelId)
return Playlist(self, uploadsPlaylistId)
def _appRequest(self, method, payload):
payload['key'] = self.key
res = requests.get(basePath + method, params=payload)
return res.json()
class YTConfig:
def __init__(self):
self.config = yaml.safe_load(open("feeds/config.yaml"))
def saveFeedState(self, channel):
if channel.channelId not in self.config:
config[self.channelId] = {}
self.config[channel.channelId]['feedIndex'] = channel.feedIndex
self.config[channel.channelId]['feed'] = len(channel.feed)
def save(self):
yaml.dump(self.config, open("feeds/config.yaml", "w"))
if __name__ == "__main__":
# Create a YTChannelFeed which allows for a continuous stream of content.
channelId = "UCSPEjw8F2nQDtmUKPFNF7_A" # NHK Japan channel
feed = YTChannelFeed(channelId)
# Get metadata of video in the feed.
entry = feed.getNextItem()
# If the feed is at the end it performs an update for more content.
# If there is any retrives a new entry from the new content otherwise it will return None.
title = entry['snippet']['title']
videoId = entry['snippet']['resourceId']['videoId']
post = "[NHKJPN] " + title + "\n" + \
" - [Channel] www.youtube.com/channel/%s\n" % feed.channelId + \
" - [Video] http://www.youtube.com/watch?v=%s" % videoId
print(post);
# This is how we save the config data so we can resume where we left.
#config = YTConfig()
#config.saveFeedState(feed)
#config.save()
# The YTChannelFeed then uses the config when you recreate the feeds,
# restoring the previous state.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment