Created
June 8, 2024 04:34
-
-
Save treethought/b51f168ae54fe911bc2865dde8026fa6 to your computer and use it in GitHub Desktop.
farcaster unique channel casters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests # Import the requests library for making HTTP requests | |
import argparse # Import the argparse library for parsing command-line arguments | |
import os # Import the os library for accessing environment variables | |
import json | |
# Seconds to wait for the Neynar API before giving up on a request.
_REQUEST_TIMEOUT_SECONDS = 30


def get(endpoint, args, api_key=None):
    """Send a GET request to the Neynar API and return the decoded JSON body.

    Args:
        endpoint: Path appended to ``https://api.neynar.com/``.
        args: Query parameters. The dict is copied, so the caller's
            dictionary is never modified.
        api_key: API key sent as the ``api_key`` query parameter. When
            ``None``, the ``NEYNAR_API_KEY`` environment variable is used.

    Returns:
        The response body parsed as JSON.

    Raises:
        ValueError: If no API key is available, or if the response status
            code is not 200.
    """
    params = args.copy()
    if api_key is None:
        api_key = os.getenv("NEYNAR_API_KEY")
    if api_key is None:
        raise ValueError("API key not provided")
    params["api_key"] = api_key

    # The parameters are intentionally not printed: they contain the API key.
    url = f"https://api.neynar.com/{endpoint}"
    # Without a timeout a stalled connection would block the script forever.
    response = requests.get(url, params=params, timeout=_REQUEST_TIMEOUT_SECONDS)
    if response.status_code != 200:
        raise ValueError(f"HTTP error {response.status_code}: {response.text}")
    return response.json()
# Look up a Farcaster user by username
def get_user(fc_username, api_key=None):
    """Return the user record for a Farcaster username.

    Calls the ``v1/farcaster/user-by-username`` endpoint through ``get``.

    Args:
        fc_username: Farcaster username to look up.
        api_key: Optional API key forwarded to ``get``.

    Returns:
        The ``user`` entry found under the response's ``result`` key.

    Raises:
        ValueError: If the response has no ``result`` key.
    """
    query = {"username": fc_username}
    payload = get("v1/farcaster/user-by-username", query, api_key=api_key)
    if "result" in payload:
        return payload["result"]["user"]
    raise ValueError("No user found.")
# Fetch one page of a user's casts (posts), excluding recasts
def get_user_casts(fid, cursor='', api_key=None):
    """Return one page of the feed filtered to a single Farcaster ID.

    Args:
        fid: Farcaster ID whose casts are requested.
        cursor: Pagination cursor; the default empty string is sent as-is.
        api_key: Optional API key forwarded to ``get``.

    Returns:
        The JSON response of the ``v2/farcaster/feed`` endpoint.
    """
    feed_query = dict(
        feed_type="filter",
        filter_type="fids",
        fids=fid,
        with_recasts="false",
        cursor=cursor,
    )
    return get("v2/farcaster/feed", feed_query, api_key=api_key)
def get_channel_feed(parent_url, channel_id, cursor='', api_key=None, include_recasts=False):
    """Return one page of a channel's feed.

    The feed is filtered by ``channel_id`` when one is given; otherwise it is
    filtered by ``parent_url``.

    Args:
        parent_url: Channel parent URL, used only when ``channel_id`` is "".
        channel_id: Channel id; pass "" to filter by ``parent_url`` instead.
        cursor: Pagination cursor; the default empty string is sent as-is.
        api_key: Optional API key forwarded to ``get``.
        include_recasts: Whether recasts should be included in the feed.

    Returns:
        The JSON response of the ``v2/farcaster/feed`` endpoint.
    """
    if channel_id != "":
        channel_filter = {"filter_type": "channel_id", "channel_id": channel_id}
    else:
        channel_filter = {"filter_type": "parent_url", "parent_url": parent_url}

    params = {
        "feed_type": "filter",
        **channel_filter,
        # A Python bool would be serialised as "True"/"False" in the query
        # string; send the lowercase form used by get_user_casts instead.
        "with_recasts": "true" if include_recasts else "false",
        "cursor": cursor,
    }
    return get("v2/farcaster/feed", params, api_key=api_key)
def _collect_unique_users(fetch_page, cursor, n):
    """Page through a feed until ``n`` distinct cast authors are collected.

    Args:
        fetch_page: Callable taking a cursor and returning one feed response
            (a dict with a ``casts`` list and, optionally, ``next.cursor``).
        cursor: Cursor of the first page to request.
        n: Maximum number of distinct authors to collect.

    Returns:
        A dict mapping each author's ``fid`` to the author record of the
        first cast seen from that author.
    """
    users = {}
    count = 0
    while len(users) < n:
        casts_response = fetch_page(cursor)
        casts = casts_response["casts"]
        if not casts:
            break

        # Process the page before looking at pagination so that the authors
        # on the final page are not lost.
        for cast in casts:
            count += 1
            user = cast["author"]
            users.setdefault(user["fid"], user)
            if len(users) >= n:
                break
        print(f'found {len(users)} unique users over {count} casts, continuing...')

        # Stop when there is no further page or the cursor does not advance;
        # otherwise the same page would be requested forever.
        next_cursor = (casts_response.get("next") or {}).get("cursor")
        if not next_cursor or next_cursor == cursor:
            break
        cursor = next_cursor
    return users


def get_channel_unique_users_by_parent_url(parent_url, cursor='', api_key=None, n=50, include_recasts=False):
    """Collect up to ``n`` unique cast authors of a channel, by parent URL.

    Args:
        parent_url: Parent URL identifying the channel.
        cursor: Cursor of the first page to request ('' starts at the top).
        api_key: Optional API key forwarded to ``get_channel_feed``.
        n: Maximum number of unique authors to return.
        include_recasts: Whether recasts are included in the feed.

    Returns:
        A dict mapping author ``fid`` to the author record.
    """
    return _collect_unique_users(
        lambda page_cursor: get_channel_feed(parent_url, "", page_cursor, api_key, include_recasts),
        cursor,
        n,
    )


def get_channel_unique_users_by_channel_id(channel_id, cursor='', api_key=None, n=50, include_recasts=False):
    """Collect up to ``n`` unique cast authors of a channel, by channel id.

    Args:
        channel_id: Id identifying the channel.
        cursor: Cursor of the first page to request ('' starts at the top).
        api_key: Optional API key forwarded to ``get_channel_feed``.
        n: Maximum number of unique authors to return.
        include_recasts: Whether recasts are included in the feed.

    Returns:
        A dict mapping author ``fid`` to the author record.
    """
    return _collect_unique_users(
        lambda page_cursor: get_channel_feed("", channel_id, page_cursor, api_key, include_recasts),
        cursor,
        n,
    )
# Search for channels matching a query string
def get_channels(query, api_key=None):
    """Return the channel search response for ``query``.

    Args:
        query: Search text, sent as the ``q`` parameter.
        api_key: Optional API key forwarded to ``get``.

    Returns:
        The JSON response of the ``v2/farcaster/channel/search`` endpoint.
    """
    search_params = dict(q=query)
    return get("v2/farcaster/channel/search", search_params, api_key=api_key)
# Resolve a query to a single channel (the first search hit)
def get_channel(query, api_key=None):
    """Return the first channel that matches ``query``.

    Args:
        query: Search text passed to ``get_channels``.
        api_key: Optional API key passed to ``get_channels``.

    Returns:
        The first entry of the response's ``channels`` list.

    Raises:
        ValueError: If the ``channels`` list is empty.
    """
    matches = get_channels(query, api_key)["channels"]
    if len(matches) < 1:
        raise ValueError("No channel found.")
    first_match = matches[0]
    return first_match
def display_unique_users(fc_channel_name, n=50, include_recasts=True):
    """Print up to ``n`` unique cast authors of a channel.

    The channel is resolved with ``get_channel``. Authors are collected by the
    channel's parent URL first; if that yields nothing, the channel id is
    tried instead.

    Args:
        fc_channel_name: Channel name used as the search query.
        n: Maximum number of unique authors to list.
        include_recasts: Whether recasts count when collecting authors.
    """
    channel = get_channel(fc_channel_name)
    channel_parent_url = channel["parent_url"]
    print(f'Getting unique users for channel {fc_channel_name}, parent_url: {channel_parent_url}, id: {channel["id"]}')

    unique_users = get_channel_unique_users_by_parent_url(
        channel_parent_url, n=n, include_recasts=include_recasts
    )
    if not unique_users:
        print("failed to get unique users by parent_url, trying by channel_id")
        unique_users = get_channel_unique_users_by_channel_id(
            channel["id"], n=n, include_recasts=include_recasts
        )

    print(f'\nFound {len(unique_users)} unique casters for channel: {fc_channel_name}')
    for user in unique_users.values():
        # The "@" prefix belongs on the username (handle), not the display name.
        print(f'{user["display_name"]} (@{user["username"]}, fid: {user["fid"]})')
def main(fc_username, fc_channel_name):
    """Print a user's most recent cast in a channel, or list channel casters.

    When ``fc_username`` is empty, the channel's unique casters are printed
    via ``display_unique_users``. Otherwise the user's feed is paged through
    until a cast whose ``parent_url`` equals the channel's parent URL is found.

    Args:
        fc_username: Farcaster username, or "" to list unique casters.
        fc_channel_name: Channel name used to look the channel up.
    """
    if fc_username == "":
        display_unique_users(fc_channel_name)
        return

    fid = get_user(fc_username)["fid"]
    channel_parent_url = get_channel(fc_channel_name)["parent_url"]

    found_cast = None
    cursor = ''
    while found_cast is None:
        casts_response = get_user_casts(fid, cursor=cursor)
        for cast in casts_response["casts"]:
            if cast.get("parent_url") == channel_parent_url:
                # Stop at the first match so a later cast in the same page
                # cannot overwrite it.
                # NOTE(review): assumes the feed is ordered newest-first, so
                # the first match is the most recent cast; confirm.
                found_cast = cast
                break
        else:
            # No match on this page: advance, or give up when the feed is
            # exhausted instead of looping forever.
            next_cursor = (casts_response.get("next") or {}).get("cursor")
            if not next_cursor or next_cursor == cursor:
                break
            cursor = next_cursor

    if found_cast is None:
        print(f'{fc_username} has no casts in /{fc_channel_name}')
        return

    print(f'{fc_username} last casted in /{fc_channel_name} at {found_cast["timestamp"]}:')
    print()
    print(found_cast['text'])
    print()
    print('Hash:', found_cast['hash'])
    print('Warpcast URL:', f'https://warpcast.com/{fc_username}/{found_cast["hash"][:8]}')
# Entry point of the script
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Get last cast by user in channel')
    # An empty username makes main() list the channel's unique casters instead.
    parser.add_argument('--fc_username', type=str, help='Farcaster username', default="", required=False)
    # const mirrors default so that a bare "--fc_channel_name" (allowed by
    # nargs='?') selects the default channel instead of passing None to main().
    parser.add_argument(
        '--fc_channel_name',
        type=str,
        nargs='?',
        const='plan',
        default='plan',
        help='Channel name (default: plan)',
    )
    args = parser.parse_args()
    main(args.fc_username, args.fc_channel_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment