# imdb_methods
# Source: GitHub gist henryjfry/86ee7d2a09143b6c5b86d283cf6b76f0 by @henryjfry
# (last active August 12, 2025 10:19).
def get_imdb_list_ids_api(list_str=None):
    """Return the IMDb title ids contained in an IMDb list.

    Posts the ``VideoPlaylistWidgetList`` query to the IMDb GraphQL endpoint,
    asking for 1000 items per page, and follows ``pageInfo.endCursor`` until
    ``hasNextPage`` is false.

    Args:
        list_str: Value sent as the query's ``$id`` variable (the list id).

    Returns:
        list: The ``id`` of every non-empty ``listItem``, in response order.

    Raises:
        Exception: If a request returns an HTTP status other than 200.
    """
    import json
    import requests

    url = "https://api.graphql.imdb.com/"
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0",
        "x-imdb-client-name": "imdb-web-next",
    }
    base_title_card = """
    fragment BaseTitleCard on Title {
        id
        titleText { text }
        primaryImage { url }
        releaseYear { year }
        titleType { text }
    }
    """
    query = f"""
    query VideoPlaylistWidgetList($id: ID!, $first: Int!, $after: ID) {{
        list(id: $id) {{
            items(first: $first, after: $after) {{
                total
                pageInfo {{
                    endCursor
                    hasNextPage
                }}
                edges {{
                    node {{
                        listItem {{
                            ... on Title {{
                                ...BaseTitleCard
                            }}
                        }}
                    }}
                }}
            }}
        }}
    }}
    {base_title_card}
    """

    movies = []
    after_cursor = None
    while True:
        payload = {
            "query": query,
            "variables": {"id": list_str, "first": 1000, "after": after_cursor},
        }
        response = requests.post(url, headers=headers, data=json.dumps(payload))
        if response.status_code != 200:
            raise Exception(f"Query failed with status code {response.status_code}: {response.text}")

        body = response.json()
        # A key present with a JSON null makes dict.get(key, {}) return None,
        # so every level falls back with `or {}` instead of a .get() default.
        items_data = ((body.get("data") or {}).get("list") or {}).get("items") or {}

        for edge in items_data.get("edges") or []:
            title = ((edge or {}).get("node") or {}).get("listItem")
            if title:
                # Only the id is returned to the caller, so the other card
                # fields (which may be null) are never dereferenced.
                movies.append(title.get("id"))

        page_info = items_data.get("pageInfo") or {}
        after_cursor = page_info.get("endCursor")
        # Stop as well when no cursor is supplied; otherwise the same page
        # would be requested forever.
        if not page_info.get("hasNextPage") or not after_cursor:
            break
    return movies
def get_imdb_recommendations_api(imdb_id=None):
    """Return the ids of the "more like this" titles for an IMDb title.

    Posts the ``MoreLikeThis`` query (first 12 ``moreLikeThisTitles``) to the
    IMDb GraphQL endpoint.

    Args:
        imdb_id: Value sent as the query's ``$id`` variable (the title id).

    Returns:
        list: The ``id`` of each returned node; empty when the response has
        no title or no recommendations.

    Raises:
        Exception: If the request returns an HTTP status other than 200.
    """
    import json
    import requests

    url = "https://api.graphql.imdb.com/"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": "Mozilla/5.0",
    }
    base_title_card = """
    fragment BaseTitleCard on Title {
        id
        titleText {
            text
        }
        primaryImage {
            url
        }
        releaseYear {
            year
        }
        titleType {
            text
        }
    }
    """
    query = f"""
    query MoreLikeThis($id: ID!) {{
        title(id: $id) {{
            ...TMD_MoreLikeThis
        }}
    }}
    fragment TMD_MoreLikeThis on Title {{
        id
        isAdult
        moreLikeThisTitles(first: 12) {{
            edges {{
                node {{
                    ...BaseTitleCard
                }}
            }}
        }}
    }}
    {base_title_card}
    """
    payload = {"query": query, "variables": {"id": imdb_id}}

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    if response.status_code != 200:
        raise Exception(f"Query failed with status code {response.status_code}: {response.text}")

    data = response.json()
    # `or {}` rather than .get(key, {}): a key present with a JSON null
    # (e.g. an unknown title) would otherwise yield None and crash the chain.
    title = (data.get("data") or {}).get("title") or {}
    edges = (title.get("moreLikeThisTitles") or {}).get("edges") or []

    movies = []
    for edge in edges:
        node = (edge or {}).get("node") or {}
        if "id" in node:
            movies.append(node["id"])
    return movies
def get_imdb_season_episodes(imdb_id, season, first=99):
    """Return episode details for one season of an IMDb series.

    Posts the ``SeasonEpisodes`` query to the IMDb GraphQL endpoint, filtered
    to the given season and sorted by episode then release date, ascending.

    Args:
        imdb_id: Value sent as the query's ``$id`` variable (the series id).
        season: Season to include; converted with ``str()`` before sending.
        first: Maximum number of episodes requested (default 99).

    Returns:
        list[dict]: One dict per returned edge with the keys ``id``,
        ``title``, ``plot``, ``release_date``, ``rating``, ``vote_count``,
        ``image_url``, ``episode_number`` and ``season_number``. A value is
        ``None`` when the response omits it or sends null.

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    import json
    import requests

    def _dig(mapping, *keys):
        """Walk *keys* through nested dicts; None once a level is missing or null."""
        for key in keys:
            if not isinstance(mapping, dict):
                return None
            mapping = mapping.get(key)
        return mapping

    url = "https://api.graphql.imdb.com/"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": "Mozilla/5.0",
    }
    query = """
    query SeasonEpisodes($id: ID!, $season: String!, $first: Int!, $originalTitleText: Boolean!) {
        title(id: $id) {
            episodes {
                episodes(
                    first: $first
                    filter: { includeSeasons: [$season] }
                    sort: { by: EPISODE_THEN_RELEASE, order: ASC }
                ) {
                    edges {
                        node {
                            id
                            titleText {
                                text
                            }
                            plot {
                                plotText {
                                    plaidHtml(showOriginalTitleText: $originalTitleText)
                                }
                            }
                            releaseDate {
                                year
                                month
                                day
                            }
                            ratingsSummary {
                                aggregateRating
                                voteCount
                            }
                            primaryImage {
                                url
                            }
                            series {
                                displayableEpisodeNumber {
                                    episodeNumber {
                                        displayableProperty {
                                            value {
                                                plainText
                                            }
                                        }
                                    }
                                    displayableSeason {
                                        displayableProperty {
                                            value {
                                                plainText
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    """
    variables = {
        "id": imdb_id,
        "season": str(season),
        "first": first,
        "originalTitleText": True,
    }
    payload = {"query": query, "variables": variables}

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    response.raise_for_status()
    data = response.json()

    edges = _dig(data, "data", "title", "episodes", "episodes", "edges") or []
    episodes = []
    for edge in edges:
        # Null-safe lookups replace the former bare `except: continue`, which
        # silently dropped any episode that had a null plot, image or rating.
        node = _dig(edge, "node") or {}
        numbering = _dig(node, "series", "displayableEpisodeNumber")
        episodes.append({
            "id": node.get("id"),
            "title": _dig(node, "titleText", "text"),
            "plot": _dig(node, "plot", "plotText", "plaidHtml"),
            "release_date": node.get("releaseDate"),
            "rating": _dig(node, "ratingsSummary", "aggregateRating"),
            "vote_count": _dig(node, "ratingsSummary", "voteCount"),
            "image_url": _dig(node, "primaryImage", "url"),
            "episode_number": _dig(numbering, "episodeNumber", "displayableProperty", "value", "plainText"),
            "season_number": _dig(numbering, "displayableSeason", "displayableProperty", "value", "plainText"),
        })
    return episodes
def get_imdb_userlists(ur_id=None):
    """Return the title lists owned by an IMDb user.

    Posts the ``ListsPage`` query (list type ``TITLES``, visibilities
    ``PUBLIC`` and ``PRIVATE``, sorted by modification date descending) and
    follows the pagination cursor, pausing 0.3 s between pages.

    Args:
        ur_id: Value sent as the query's ``$urConst`` variable (the user id).

    Returns:
        dict: ``{"imdb_list": [{list_id: "IMDB - <name> - <item total>"}, ...]}``
        with one single-key dict per list, in response order.

    Raises:
        requests.HTTPError: If a response status indicates an error.
    """
    import time
    import requests

    API_URL = "https://graphql.prod.api.imdb.a2z.com/"
    HEADERS = {
        'Referer': 'https://www.imdb.com/',
        'Origin': 'https://www.imdb.com',
        'User-Agent': 'Mozilla/5.0',
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }
    query = '''
    query ListsPage(
        $first: Int!,
        $after: ID,
        $anyListTypes: [ListTypeId!],
        $anyVisibilities: [ListVisibilityId!],
        $sort: ListSearchSort,
        $urConst: ID
    ) {
        userProfile(input: { userId: $urConst }) {
            username { text }
        }
        userListSearch(
            after: $after,
            filter: {
                anyClassTypes: [LIST],
                anyListTypes: $anyListTypes,
                anyVisibilities: $anyVisibilities
            },
            first: $first,
            listOwnerUserId: $urConst,
            sort: $sort
        ) {
            edges {
                node {
                    ...UserListListItemMetadata
                }
            }
            pageInfo {
                endCursor
                hasNextPage
                hasPreviousPage
            }
            total
        }
    }
    fragment UserListListItemMetadata on List {
        author {
            nickName
            userId
        }
        id
        name {
            originalText
        }
        listType {
            id
        }
        listClass {
            id
            name { text }
        }
        description {
            originalText {
                plainText
            }
        }
        items(first: 0) {
            total
        }
        createdDate
        lastModifiedDate
        primaryImage {
            image {
                id
                caption { plainText }
                height
                width
                url
            }
        }
        visibility { id }
    }
    '''
    variables = {
        "first": 1000,
        "after": None,
        "anyListTypes": ["TITLES"],
        "anyVisibilities": ["PUBLIC", "PRIVATE"],
        "sort": {"by": "DATE_MODIFIED", "order": "DESC"},
        "urConst": ur_id,
    }

    list_edges = []
    while True:
        payload = {
            "operationName": "ListsPage",
            "query": query,
            "variables": variables,
        }
        response = requests.post(API_URL, headers=HEADERS, json=payload)
        response.raise_for_status()

        # `or {}` guards against keys that are present but null, for which
        # dict.get(key, {}) would return None.
        search = (response.json().get("data") or {}).get("userListSearch") or {}
        list_edges.extend(search.get("edges") or [])

        page_info = search.get("pageInfo") or {}
        cursor = page_info.get("endCursor")
        if not page_info.get("hasNextPage") or not cursor:
            break
        variables["after"] = cursor
        # Pause only when another page is about to be requested.
        time.sleep(0.3)

    entries = []
    for edge in list_edges:
        node = (edge or {}).get("node") or {}
        list_name = (node.get("name") or {}).get("originalText", "")
        list_count = (node.get("items") or {}).get("total", 0)
        entries.append({node.get("id"): f"IMDB - {list_name} - {list_count}"})
    return {"imdb_list": entries}
def get_imdb_watchlist_ids_api(ur_list_str=None):
    """Return the IMDb title ids on a user's watchlist.

    Resolves the user's watchlist list id with ``get_user_watchlist_id`` and
    then fetches that list's items with ``get_imdb_list_ids_api`` (both
    defined in this file). The lookup request used to be duplicated here
    verbatim; delegating keeps the two code paths from drifting apart.

    Args:
        ur_list_str: The user id, passed through to ``get_user_watchlist_id``.

    Returns:
        list: Whatever ``get_imdb_list_ids_api`` returns for the watchlist id.
    """
    list_id = get_user_watchlist_id(ur_list_str=ur_list_str)
    return get_imdb_list_ids_api(list_str=list_id)
def get_imdb_list_ids_api(list_str=None):
    """Return the IMDb title ids contained in an IMDb list.

    NOTE: a function with this same name is also defined earlier in this
    file; being the later definition, this is the one in effect after import.

    Posts the ``VideoPlaylistWidgetList`` query to the IMDb GraphQL endpoint,
    asking for 1000 items per page, and follows ``pageInfo.endCursor`` until
    ``hasNextPage`` is false.

    Args:
        list_str: Value sent as the query's ``$id`` variable (the list id).

    Returns:
        list: The ``id`` of every non-empty ``listItem``, in response order.

    Raises:
        Exception: If a request returns an HTTP status other than 200.
    """
    import json
    import requests

    url = "https://api.graphql.imdb.com/"
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0",
        "x-imdb-client-name": "imdb-web-next",
    }
    base_title_card = """
    fragment BaseTitleCard on Title {
        id
        titleText { text }
        primaryImage { url }
        releaseYear { year }
        titleType { text }
    }
    """
    query = f"""
    query VideoPlaylistWidgetList($id: ID!, $first: Int!, $after: ID) {{
        list(id: $id) {{
            items(first: $first, after: $after) {{
                total
                pageInfo {{
                    endCursor
                    hasNextPage
                }}
                edges {{
                    node {{
                        listItem {{
                            ... on Title {{
                                ...BaseTitleCard
                            }}
                        }}
                    }}
                }}
            }}
        }}
    }}
    {base_title_card}
    """

    movies = []
    after_cursor = None
    while True:
        payload = {
            "query": query,
            "variables": {"id": list_str, "first": 1000, "after": after_cursor},
        }
        response = requests.post(url, headers=headers, data=json.dumps(payload))
        if response.status_code != 200:
            raise Exception(f"Query failed with status code {response.status_code}: {response.text}")

        body = response.json()
        # A key present with a JSON null makes dict.get(key, {}) return None,
        # so every level falls back with `or {}` instead of a .get() default.
        items_data = ((body.get("data") or {}).get("list") or {}).get("items") or {}

        for edge in items_data.get("edges") or []:
            title = ((edge or {}).get("node") or {}).get("listItem")
            if title:
                # Only the id is returned to the caller, so the other card
                # fields (which may be null) are never dereferenced.
                movies.append(title.get("id"))

        page_info = items_data.get("pageInfo") or {}
        after_cursor = page_info.get("endCursor")
        # Stop as well when no cursor is supplied; otherwise the same page
        # would be requested forever.
        if not page_info.get("hasNextPage") or not after_cursor:
            break
    return movies
def get_user_watchlist_id(ur_list_str=None):
    """Return the list id of a user's IMDb watchlist.

    Posts the ``YourPredefinedListsSidebar`` query, which asks for the
    predefined list of class ``WATCH_LIST`` belonging to the given user.

    Args:
        ur_list_str: The user id; sent as ``$userId`` and also embedded in
            the ``Referer`` header.

    Returns:
        The ``id`` field of the ``watchlist`` object in the response.

    Raises:
        requests.HTTPError: If the response status indicates an error.
        KeyError, TypeError: If the response has no ``data.watchlist.id``
            (for example when ``watchlist`` is null).
    """
    import requests

    API_URL = "https://graphql.prod.api.imdb.a2z.com/"
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Referer": f"https://www.imdb.com/user/{ur_list_str}/",
        "Origin": "https://www.imdb.com",
        "x-imdb-client-name": "imdb-web-next",
        "x-imdb-user-language": "en-GB",
        "x-imdb-user-country": "GB",
    }
    GRAPHQL_QUERY = """
    query YourPredefinedListsSidebar($userId: ID!) {
        watchlist: predefinedList(classType: WATCH_LIST, userId: $userId) {
            ...ListPreviewCard
        }
    }
    fragment ListPreviewCard on List {
        id
        lastModifiedDate
        createdDate
        name { originalText }
        listType { id }
        primaryImage {
            image {
                url height width
                caption { plainText }
            }
        }
        items(first: 0) { total }
    }
    """
    response = requests.post(
        API_URL,
        headers=HEADERS,
        json={"query": GRAPHQL_QUERY, "variables": {"userId": ur_list_str}},
    )
    # Surface HTTP failures directly rather than as an opaque JSON/KeyError
    # further down, matching the other helpers in this file that do the same.
    response.raise_for_status()
    data = response.json()
    return data['data']['watchlist']['id']
def get_imdb_videos(imdb_id):
    """Return every trailer video listed for an IMDb title.

    Requests the title's ``videoStrip`` (type ``TRAILER``, 50 per page, newest
    first) with the ``TitleVideoGallerySubPage`` query, then keeps calling
    ``TitleVideoGalleryPagination`` while the response reports another page,
    pausing 0.3 s after each follow-up request.

    Args:
        imdb_id: Value sent as the queries' ``$const`` variable (the title id).

    Returns:
        list[dict]: The video nodes from all pages. Each node additionally
        gets ``plot`` (title plot text), ``total`` (video total reported by
        the first page) and ``item_title`` (title text).

    Raises:
        requests.HTTPError: If a response status indicates an error.
    """
    import re
    import time  # needed for the pagination pause; it was never imported here
    import requests

    API_URL = "https://graphql.prod.api.imdb.a2z.com/"
    HEADERS = {
        'Referer': 'https://www.imdb.com/',
        'Origin': 'https://www.imdb.com',
        'User-Agent': 'Mozilla/5.0'
    }

    def gqlmin(q):
        # Strip runs of four spaces, i.e. the indentation of the queries below.
        return re.sub(' {4}', '', q)

    def _nodes(strip):
        """Return the non-null video nodes of one videoStrip page."""
        return [
            edge['node']
            for edge in strip.get('edges') or []
            if edge and edge.get('node')
        ]

    query_subpage = '''
    query TitleVideoGallerySubPage(
        $const: ID!,
        $first: Int!,
        $filter: VideosQueryFilter,
        $sort: VideoSort
    ) {
        title(id: $const) {
            titleText { text }
            plot { plotText { plainText } }
            videoStrip(first: $first, filter: $filter, sort: $sort) {
                ...VideoGalleryItems
            }
        }
    }
    '''
    query_pagination = '''
    query TitleVideoGalleryPagination(
        $const: ID!,
        $first: Int!,
        $after: ID!,
        $filter: VideosQueryFilter,
        $sort: VideoSort
    ) {
        title(id: $const) {
            videoStrip(first: $first, after: $after, filter: $filter, sort: $sort) {
                ...VideoGalleryItems
            }
        }
    }
    '''
    fragment = '''
    fragment VideoGalleryItems on VideoConnection {
        pageInfo {
            endCursor
            hasNextPage
        }
        total
        edges {
            node {
                id
                contentType { id }
                name { value }
                runtime { value }
                thumbnail { url }
                primaryTitle {
                    series {
                        displayableEpisodeNumber {
                            displayableSeason {
                                season
                            }
                        }
                        series {
                            titleText { text }
                        }
                    }
                }
            }
        }
    }
    '''
    variables = {
        "const": imdb_id,
        "first": 50,
        "filter": {"maturityLevel": "INCLUDE_MATURE","nameConstraints":{},"titleConstraints":{},"types":["TRAILER"]},
        "sort": {"by": "DATE", "order": "DESC"}
    }

    # First page: also carries the title text and plot.
    pdata = {
        'operationName': "TitleVideoGallerySubPage",
        'query': gqlmin(query_subpage + fragment),
        'variables': variables
    }
    r = requests.post(API_URL, headers=HEADERS, json=pdata)
    r.raise_for_status()
    # `or {}` guards against keys present with a JSON null, for which
    # dict.get(key, {}) would return None.
    title_data = (r.json().get('data') or {}).get('title') or {}
    plot_text = ((title_data.get('plot') or {}).get('plotText') or {}).get('plainText', "")
    item_title = (title_data.get('titleText') or {}).get('text', "")
    video_data = title_data.get('videoStrip') or {}
    total_videos = video_data.get('total')
    videos = _nodes(video_data)
    page_info = video_data.get('pageInfo') or {}
    cursor = page_info.get('endCursor')
    has_next = page_info.get('hasNextPage', False)

    # Pagination loop
    while has_next and cursor:
        variables["after"] = cursor
        pdata = {
            'operationName': "TitleVideoGalleryPagination",
            'query': gqlmin(query_pagination + fragment),
            'variables': variables
        }
        r = requests.post(API_URL, headers=HEADERS, json=pdata)
        r.raise_for_status()
        title_data = (r.json().get('data') or {}).get('title') or {}
        video_data = title_data.get('videoStrip') or {}
        videos.extend(_nodes(video_data))
        page_info = video_data.get('pageInfo') or {}
        cursor = page_info.get('endCursor')
        has_next = page_info.get('hasNextPage', False)
        time.sleep(0.3)

    # Match old output: inject plot, total, and item_title
    for video in videos:
        video["plot"] = plot_text
        video["total"] = total_videos
        video["item_title"] = item_title
    return videos
def extract_imdb_mp4_url(video_id, best_trailer):
    """Look up a video via ``get_video_info`` and return it with *best_trailer*.

    Args:
        video_id: Passed unchanged to ``get_video_info``.
        best_trailer: Returned unchanged as the first element of the result.

    Returns:
        tuple: ``(best_trailer, url, video)`` where ``url`` and ``video`` are
        the two values returned by ``get_video_info``.
    """
    playback = get_video_info(video_id)
    url, video = playback
    return best_trailer, url, video
def get_video_info(viconst):
    """Return the MP4 playback URL for an IMDb video.

    Posts the ``VideoPlayback`` query and scans the returned ``playbackURLs``
    for entries whose ``videoMimeType`` is ``'MP4'``. If several match, the
    last one wins (the scan does not stop at the first hit).

    Args:
        viconst: Value sent as the query's ``$viconst`` variable (the video id).

    Returns:
        tuple: ``(url, video)`` where ``video`` is the matching
        ``playbackURLs`` entry and ``url`` its ``url`` field, or
        ``(None, None)`` when the response contains no MP4 entry.

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    # Imported locally, like the other helpers in this file; this function
    # previously used `requests` without importing it.
    import requests

    API_URL = "https://graphql.prod.api.imdb.a2z.com/"
    HEADERS = {
        'Referer': 'https://www.imdb.com/',
        'Origin': 'https://www.imdb.com',
        'User-Agent': 'Mozilla/5.0',
        'Content-Type': 'application/json'
    }
    query = '''
    query VideoPlayback(
        $viconst: ID!
    ) {
        video(id: $viconst) {
            contentType {
                displayName {
                    value
                }
            }
            videoDimensions {
                aspectRatio
            }
            ...VideoInfo
            ...SharedVideoAllPlaybackUrls
        }
    }
    fragment VideoInfo on Video {
        name {
            value
            language
        }
        description {
            value
            language
        }
        primaryTitle {
            genres {
                genres {
                    text
                }
            }
            ...BaseTitleCard
        }
    }
    fragment BaseTitleCard on Title {
        id
        titleText {
            text
        }
        titleType {
            id
            text
            canHaveEpisodes
            displayableProperty {
                value {
                    plainText
                }
            }
        }
        originalTitleText {
            text
        }
        primaryImage {
            id
            width
            height
            url
            caption {
                plainText
            }
        }
        releaseYear {
            year
            endYear
        }
        ratingsSummary {
            aggregateRating
            voteCount
        }
        runtime {
            seconds
        }
        certificate {
            rating
        }
        canRate {
            isRatable
        }
        titleGenres {
            genres(limit: 3) {
                genre {
                    text
                }
            }
        }
    }
    fragment SharedVideoAllPlaybackUrls on Video {
        playbackURLs {
            displayName {
                value
                language
            }
            videoMimeType
            videoDefinition
            url
        }
    }
    '''
    # Only $viconst is declared by the query above; the remaining entries are
    # sent along unchanged from the original payload.
    variables = {
        "viconst": viconst,
        "userAgent": "Mozilla/5.0",
        "pageType": "VIDEO",
        "subPageType": "VIDEO",
        "viewportSize": {
            "width": 1920,
            "height": 1080
        },
        "autoStartVideo": False
    }
    response = requests.post(API_URL, headers=HEADERS, json={
        "operationName": "VideoPlayback",
        "query": query,
        "variables": variables
    })
    response.raise_for_status()
    data = response.json()

    # `or` fallbacks cover keys that are present but null (e.g. unknown video).
    video_data = (data.get('data') or {}).get('video') or {}
    # Pre-bind the results so a response without an MP4 entry yields
    # (None, None) instead of an UnboundLocalError at the return.
    url = None
    video = None
    for candidate in video_data.get('playbackURLs') or []:
        if candidate.get('videoMimeType') == 'MP4':
            url = candidate.get('url')
            video = candidate
    return url, video
def imdb_base_title_card(imdb_id: str):
    """Fetch the ``BaseTitleCard`` fields for an IMDb title.

    Posts the ``Title_Summary_Prompt_From_Base`` query to the IMDb GraphQL
    endpoint.

    Args:
        imdb_id: Value sent as the query's ``$id`` variable (the title id).

    Returns:
        The decoded JSON body of the response, unmodified.

    Raises:
        Exception: If the request returns an HTTP status other than 200.
    """
    # Imported locally, like the other helpers in this file; this function
    # previously used `requests` without importing it.
    import requests

    url = "https://api.graphql.imdb.com/"
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0"
    }
    query = """
    query Title_Summary_Prompt_From_Base($id: ID!) {
        title(id: $id) {
            ...BaseTitleCard
        }
    }
    fragment BaseTitleCard on Title {
        id
        titleText { text }
        titleType {
            id
            text
            canHaveEpisodes
            displayableProperty { value { plainText } }
        }
        originalTitleText { text }
        primaryImage {
            id
            width
            height
            url
            caption { plainText }
        }
        releaseYear { year endYear }
        ratingsSummary { aggregateRating voteCount }
        runtime { seconds }
        certificate { rating }
        canRate { isRatable }
        titleGenres {
            genres(limit: 3) {
                genre { text }
            }
        }
    }
    """
    variables = {"id": imdb_id}
    response = requests.post(url, json={"query": query, "variables": variables}, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Query failed with status code {response.status_code}: {response.text}")
    return response.json()
def imdb_coming_soon(country: str, coming_soon_type: str):
    """Return the ids of titles IMDb lists as coming soon in a country.

    Posts the ``CalendarPage`` query for releases from today (UTC) through
    360 days ahead, 250 per page, and follows the pagination cursor until
    ``hasNextPage`` is false.

    Args:
        country: Sent as both ``$regionOverride`` and ``$countryOverride``.
        coming_soon_type: Sent as ``$comingSoonType``.

    Returns:
        list: The ``id`` of every returned node, in response order.

    Raises:
        requests.HTTPError: If a response status indicates an error.
        KeyError, TypeError: If a response lacks ``data.comingSoon`` or its
            ``edges`` / ``pageInfo`` fields.
    """
    # `datetime` and `timedelta` were used below without ever being imported.
    from datetime import datetime, timedelta, timezone
    import requests

    API_URL = "https://graphql.prod.api.imdb.a2z.com/"
    HEADERS = {
        'Referer': 'https://www.imdb.com/',
        'Origin': 'https://www.imdb.com',
        'User-Agent': 'Mozilla/5.0',
        'Content-Type': 'application/json'
    }
    fragment = """
    fragment CalendarCard on Title {
        id
        titleText { text }
        originalTitleText { text }
        titleType { id text canHaveEpisodes }
        isAdult
        releaseDate {
            year
            month
            day
            country { id }
            displayableProperty {
                qualifiersInMarkdownList { plaidHtml }
            }
        }
        releaseDates(
            first: 10,
            filter: {
                countries: [$countryOverride],
                wideRelease: WIDE_RELEASE_ONLY
            }
        ) {
            edges {
                node {
                    year
                    month
                    day
                    country { id }
                    displayableProperty {
                        qualifiersInMarkdownList { plaidHtml }
                    }
                }
            }
        }
        primaryImage {
            url
            height
            width
            caption { plainText }
        }
        titleGenres {
            genres(limit: 3) {
                genre {
                    displayableProperty {
                        value { plainText }
                    }
                }
            }
        }
        principalCredits(filter: { categories: ["cast"] }) {
            credits {
                name {
                    id
                    nameText { text }
                }
            }
        }
        releaseYear { endYear year }
        series {
            displayableEpisodeNumber {
                episodeNumber { text }
                displayableSeason { season }
            }
            series {
                titleText { text }
                originalTitleText { text }
            }
        }
    }
    """
    query = f"""
    query CalendarPage(
        $comingSoonType: ComingSoonType!,
        $regionOverride: String!,
        $countryOverride: ID!,
        $releasingOnOrAfter: Date!,
        $releasingOnOrBefore: Date!,
        $disablePopularityFilter: Boolean!,
        $after: ID
    ) {{
        comingSoon(
            comingSoonType: $comingSoonType,
            first: 250,
            regionOverride: $regionOverride,
            releasingOnOrAfter: $releasingOnOrAfter,
            releasingOnOrBefore: $releasingOnOrBefore,
            disablePopularityFilter: $disablePopularityFilter,
            after: $after,
            sort: [
                {{ sortBy: RELEASE_DATE, sortOrder: ASC }},
                {{ sortBy: POPULARITY, sortOrder: ASC }}
            ]
        ) {{
            pageInfo {{
                endCursor
                hasNextPage
            }}
            edges {{
                node {{
                    ...CalendarCard
                }}
            }}
        }}
    }}
    {fragment}
    """
    # Same calendar date as the former datetime.utcnow().date(), without the
    # deprecated naive-UTC constructor.
    today = datetime.now(timezone.utc).date()
    end_date = today + timedelta(days=360)
    # "isInPace" is not declared by the query above; it is sent along
    # unchanged from the original payload.
    variables = {
        "comingSoonType": coming_soon_type,
        "regionOverride": country,
        "countryOverride": country,
        "releasingOnOrAfter": today.isoformat(),
        "releasingOnOrBefore": end_date.isoformat(),
        "disablePopularityFilter": False,
        "isInPace": False,
        "after": None
    }

    all_titles = []
    while True:
        response = requests.post(API_URL, headers=HEADERS, json={
            "operationName": "CalendarPage",
            "query": query,
            "variables": variables
        })
        response.raise_for_status()
        data = response.json()["data"]["comingSoon"]
        all_titles.extend(edge["node"]["id"] for edge in data["edges"])
        if not data["pageInfo"]["hasNextPage"]:
            break
        variables["after"] = data["pageInfo"]["endCursor"]
    return all_titles
# Example usage. Guarded so that importing this module does not fire a
# network request and print; running the file as a script behaves as before.
if __name__ == "__main__":
    titles = imdb_coming_soon(country="GB", coming_soon_type="MOVIE")
    print(titles)
def get_imdb_language_api(imdb_id=None):
    """Return the spoken languages IMDb lists for a title.

    Queries ``spokenLanguages`` and the first ``countriesOfOrigin`` entry.
    When "English" is among the languages and the origin country id is in a
    fixed set of English-speaking countries, "English" is moved to the front.

    Args:
        imdb_id: Value sent as the query's ``$id`` variable (the title id).

    Returns:
        list: Language display names; empty when the request does not return
        status 200 or the response carries no languages.
    """
    import json
    import requests

    url = "https://api.graphql.imdb.com/"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": "Mozilla/5.0",
    }
    query = """
    query ($id: ID!) {
        title(id: $id) {
            spokenLanguages {
                spokenLanguages {
                    id
                    text
                }
            }
            countriesOfOrigin {
                countries(limit: 1) {
                    id
                }
            }
        }
    }
    """
    payload = {"query": query, "variables": {"id": imdb_id}}

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    if response.status_code != 200:
        return []

    data = response.json()
    # `or` fallbacks cover keys that are present but null (e.g. unknown
    # title), for which dict.get(key, {}) would return None.
    title = (data.get("data") or {}).get("title") or {}
    spoken = (title.get("spokenLanguages") or {}).get("spokenLanguages") or []
    countries = (title.get("countriesOfOrigin") or {}).get("countries") or []

    language_list = [lang.get("text") for lang in spoken if lang and "text" in lang]
    english_speaking_countries = {"US", "UK", "GB", "CA", "AU", "NZ", "IE", "ZA"}
    country_id = (countries[0] or {}).get("id") if countries else ""
    if "English" in language_list and country_id in english_speaking_countries:
        language_list = ["English"] + [l for l in language_list if l != "English"]
    return language_list
def imdb_movies_near_you(latitude,longitude,radius_meters,start_date,end_date,first=100,sort_by="POPULARITY",sort_order="ASC"):
    """Return titles IMDb reports as in theaters around a location.

    Posts the ``MoviesNearYou`` advanced title search with an
    ``inTheatersConstraint`` built from the arguments.

    Args:
        latitude: Latitude; sent as ``str(latitude)``.
        longitude: Longitude; sent as ``str(longitude)``.
        radius_meters: Sent as ``radiusInMeters``.
        start_date: Start of the date range; ``T00:00:00.000Z`` is appended.
        end_date: End of the date range; ``T00:00:00.000Z`` is appended.
        first: Maximum number of results requested (default 100).
        sort_by: Sent as ``sort.sortBy`` (default ``"POPULARITY"``).
        sort_order: Sent as ``sort.sortOrder`` (default ``"ASC"``).

    Returns:
        list[dict]: One dict per result with the keys ``id``, ``title``,
        ``rating``, ``vote_count``, ``release_year`` and ``image_url``
        (``''`` when the title has no primary image).

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    import requests

    API_URL = "https://graphql.prod.api.imdb.a2z.com/"
    HEADERS = {
        'Referer': 'https://www.imdb.com/',
        'Origin': 'https://www.imdb.com',
        'User-Agent': 'Mozilla/5.0',
        'Content-Type': 'application/json'
    }
    query = '''
    query MoviesNearYou($first: Int!, $sort: AdvancedTitleSearchSort!, $constraints: AdvancedTitleSearchConstraints!) {
        advancedTitleSearch(first: $first, sort: $sort, constraints: $constraints) {
            edges {
                node {
                    title {
                        id
                        titleText { text }
                        ratingsSummary { aggregateRating voteCount }
                        releaseYear { year }
                        primaryImage { url }
                    }
                }
            }
            total
        }
    }
    '''
    variables = {
        "first": first,
        "sort": {
            "sortBy": sort_by,
            "sortOrder": sort_order
        },
        "constraints": {
            "inTheatersConstraint": {
                "dateTimeRange": {
                    "start": f"{start_date}T00:00:00.000Z",
                    "end": f"{end_date}T00:00:00.000Z"
                },
                "location": {
                    "latLong": {
                        "lat": str(latitude),
                        "long": str(longitude)
                    },
                    "radiusInMeters": radius_meters
                }
            }
        }
    }
    response = requests.post(API_URL, headers=HEADERS, json={
        "operationName": "MoviesNearYou",
        "query": query,
        "variables": variables
    })
    response.raise_for_status()

    # `or` fallbacks cover keys that are present but null, for which
    # dict.get(key, {}) would return None.
    data = (response.json().get("data") or {}).get("advancedTitleSearch") or {}
    movies = []
    for edge in data.get("edges") or []:
        title_info = ((edge or {}).get("node") or {}).get("title")
        if not title_info:
            continue
        ratings = title_info.get("ratingsSummary") or {}
        movies.append({
            "id": title_info.get("id"),
            "title": (title_info.get("titleText") or {}).get("text"),
            "rating": ratings.get("aggregateRating"),
            "vote_count": ratings.get("voteCount"),
            "release_year": (title_info.get("releaseYear") or {}).get("year"),
            # '' when there is no image; replaces a helper that relied on a
            # bare `except` to cope with a null primaryImage.
            "image_url": (title_info.get("primaryImage") or {}).get("url", ''),
        })
    return movies
# Example usage. Guarded so that importing this module does not fire a
# network request and print; running the file as a script behaves as before.
if __name__ == "__main__":
    results = imdb_movies_near_you(
        latitude=53.5,
        longitude=-2.22,
        radius_meters=80467,
        start_date="2025-08-10",
        end_date="2026-08-11"
    )
    for movie in results:
        print(movie)
def imdb_top_1000():
    """Return the ids of IMDb's top-rated movies, ranks up to 1000.

    Posts the ``AdvancedTitleSearch`` query constrained to the
    ``TOP_RATED_MOVIES`` ranked list (max rank 1000), excluding origin
    country ``IN``, sorted by user rating descending, and follows the
    pagination cursor until ``hasNextPage`` is false.

    Returns:
        list: The ``id`` of every returned title, in response order.

    Raises:
        requests.HTTPError: If a response status indicates an error.
    """
    import requests

    API_URL = "https://graphql.prod.api.imdb.a2z.com/"
    HEADERS = {
        'Referer': 'https://www.imdb.com/',
        'Origin': 'https://www.imdb.com',
        'User-Agent': 'Mozilla/5.0',
        'Content-Type': 'application/json'
    }
    query = '''
    query AdvancedTitleSearch(
        $first: Int!,
        $after: String,
        $originCountryConstraint: OriginCountrySearchConstraint,
        $rankedTitleListConstraint: RankedTitleListSearchConstraint,
        $sortBy: AdvancedTitleSearchSortBy!,
        $sortOrder: SortOrder!
    ) {
        advancedTitleSearch(
            first: $first,
            after: $after,
            constraints: {
                originCountryConstraint: $originCountryConstraint,
                rankedTitleListConstraint: $rankedTitleListConstraint
            },
            sort: {
                sortBy: $sortBy,
                sortOrder: $sortOrder
            }
        ) {
            ...AdvancedTitleSearchConnection
        }
    }
    fragment AdvancedTitleSearchConnection on AdvancedTitleSearchConnection {
        total
        pageInfo {
            endCursor
            hasNextPage
        }
        edges {
            node {
                title {
                    id
                    titleText { text }
                    originalTitleText { text }
                    ratingsSummary { aggregateRating voteCount }
                    releaseYear { year }
                    primaryImage { url }
                }
            }
        }
    }
    '''
    # "locale" is not declared by the query above; it is sent along unchanged
    # from the original payload.
    variables = {
        "first": 1000,
        "after": None,
        "locale": "en-US",
        "originCountryConstraint": {
            "excludeCountries": ["IN"]
        },
        "rankedTitleListConstraint": {
            "allRankedTitleLists": [{
                "rankRange": {"max": 1000},
                "rankedTitleListType": "TOP_RATED_MOVIES"
            }],
            "excludeRankedTitleLists": []
        },
        "sortBy": "USER_RATING",
        "sortOrder": "DESC"
    }

    all_titles = []
    while True:
        response = requests.post(API_URL, headers=HEADERS, json={
            "operationName": "AdvancedTitleSearch",
            "query": query,
            "variables": variables
        })
        response.raise_for_status()
        # `or` fallbacks cover keys that are present but null, for which
        # dict.get(key, {}) would return None.
        data = (response.json().get("data") or {}).get("advancedTitleSearch") or {}
        for edge in data.get("edges") or []:
            title = ((edge or {}).get("node") or {}).get("title") or {}
            if "id" in title:
                all_titles.append(title["id"])

        page_info = data.get("pageInfo") or {}
        cursor = page_info.get("endCursor")
        # Stop as well when no cursor is supplied; otherwise the same page
        # would be requested forever.
        if not page_info.get("hasNextPage") or not cursor:
            break
        variables["after"] = cursor
    return all_titles
# (End of gist. The GitHub page footer that followed here is not part of the code.)