Skip to content

Instantly share code, notes, and snippets.

@treethought
Created June 8, 2024 04:34
Show Gist options
  • Save treethought/b51f168ae54fe911bc2865dde8026fa6 to your computer and use it in GitHub Desktop.
Save treethought/b51f168ae54fe911bc2865dde8026fa6 to your computer and use it in GitHub Desktop.
farcaster unique channel casters
import requests # Import the requests library for making HTTP requests
import argparse # Import the argparse library for parsing command-line arguments
import os # Import the os library for accessing environment variables
import json
# Function to make a GET request to the Neynar API
def get(endpoint, args, api_key=None):
    """Send a GET request to the Neynar API and return the decoded JSON body.

    Args:
        endpoint: Path appended to ``https://api.neynar.com/``.
        args: Query parameters. The mapping is copied, never modified.
        api_key: Neynar API key. When omitted, the ``NEYNAR_API_KEY``
            environment variable is used instead.

    Returns:
        The response body parsed as JSON.

    Raises:
        ValueError: If no (non-empty) API key is available, or if the
            response status code is not 200.

    Network-level errors raised by ``requests`` (including timeouts)
    propagate unchanged.
    """
    if api_key is None:
        api_key = os.getenv('NEYNAR_API_KEY')
    # An empty key can never authenticate, so reject it up front as well.
    if not api_key:
        raise ValueError("API key not provided")

    url = f"https://api.neynar.com/{endpoint}"
    params = dict(args)  # copy so the caller's dictionary is left untouched
    params["api_key"] = api_key
    # Keep the debug trace, but never echo the secret key to stdout.
    print({**params, "api_key": "<redacted>"})
    # Without a timeout a stalled connection would block the script forever.
    response = requests.get(url, params=params, timeout=30)
    if response.status_code != 200:
        raise ValueError(f"HTTP error {response.status_code}: {response.text}")
    return response.json()
# Function to get user information by Farcaster username
def get_user(fc_username, api_key=None):
    """Look up a Farcaster user by username.

    Args:
        fc_username: Farcaster username to search for.
        api_key: Optional API key forwarded to ``get``.

    Returns:
        The ``user`` entry found under ``result`` in the API response.

    Raises:
        ValueError: If the response carries no ``result`` key.
    """
    lookup = {"username": fc_username}
    payload = get("v1/farcaster/user-by-username", lookup, api_key=api_key)
    if "result" in payload:
        return payload["result"]["user"]
    raise ValueError("No user found.")
# Function to get casts (posts) by a user with their Farcaster ID (fid)
def get_user_casts(fid, cursor='', api_key=None):
    """Fetch one page of the feed filtered to a single Farcaster ID.

    Args:
        fid: Farcaster ID whose casts are requested.
        cursor: Pagination cursor; the empty string asks for the first page.
        api_key: Optional API key forwarded to ``get``.

    Returns:
        The decoded JSON response of the feed endpoint.
    """
    query = dict(
        feed_type="filter",
        filter_type="fids",
        fids=fid,
        with_recasts="false",  # recasts are left out
        cursor=cursor,
    )
    return get("v2/farcaster/feed", query, api_key=api_key)
def get_channel_feed(parent_url, channel_id, cursor='', api_key=None, include_recasts=False):
    """Fetch one page of a channel's feed.

    The channel is selected by ``channel_id`` when one is given, otherwise
    by ``parent_url``.

    Args:
        parent_url: Parent URL of the channel; used only when ``channel_id``
            is empty.
        channel_id: Channel id; takes precedence over ``parent_url``.
        cursor: Pagination cursor; the empty string asks for the first page.
        api_key: Optional API key forwarded to ``get``.
        include_recasts: Whether recasts should be part of the feed.

    Returns:
        The decoded JSON response of the feed endpoint.
    """
    params = {
        "feed_type": "filter",
        # Serialized explicitly: a Python bool would go on the wire as
        # "True"/"False", unlike the lowercase "false" get_user_casts sends.
        "with_recasts": "true" if include_recasts else "false",
        "cursor": cursor,
    }
    if channel_id:
        params["filter_type"] = "channel_id"
        params["channel_id"] = channel_id
    else:
        params["filter_type"] = "parent_url"
        params["parent_url"] = parent_url
    return get("v2/farcaster/feed", params, api_key=api_key)
def _collect_unique_casters(fetch_page, cursor='', n=50):
    """Page through a feed and collect distinct cast authors keyed by fid.

    Args:
        fetch_page: Callable taking a cursor and returning one feed response
            (a dict with ``casts`` and, optionally, ``next``).
        cursor: Cursor of the first page to request.
        n: Number of unique authors at which collection stops.

    Returns:
        Dict mapping each author's ``fid`` to the author object taken from
        the first of their casts that was seen.
    """
    users = {}
    count = 0
    cursor = cursor or ''
    while len(users) < n:
        casts_response = fetch_page(cursor)
        casts = casts_response.get("casts") or []
        if not casts:
            break
        # Consume the page BEFORE looking at the cursor, so the casts of the
        # final page are not thrown away.
        for cast in casts:
            count += 1
            author = cast["author"]
            users.setdefault(author["fid"], author)
            if len(users) >= n:
                break
        print(f'found {len(users)} unique users over {count} casts, continuing...')
        next_cursor = (casts_response.get("next") or {}).get("cursor")
        # A missing, empty or unchanged cursor means no further page can be
        # requested; stopping here also rules out an endless loop.
        if not next_cursor or next_cursor == cursor:
            break
        cursor = next_cursor
    return users


def get_channel_unique_users_by_parent_url(parent_url, cursor='', api_key=None, n=50, include_recasts=False):
    """Collect up to ``n`` unique cast authors of a channel given its parent URL.

    Args:
        parent_url: Parent URL identifying the channel.
        cursor: Cursor of the first page to request (empty for the start).
        api_key: Optional API key forwarded to ``get_channel_feed``.
        n: Number of unique authors at which collection stops.
        include_recasts: Whether recasts are included in the feed.

    Returns:
        Dict mapping fid to author object; empty when the feed has no casts.
    """
    def fetch_page(page_cursor):
        return get_channel_feed(parent_url, "", page_cursor, api_key, include_recasts)

    return _collect_unique_casters(fetch_page, cursor, n)


def get_channel_unique_users_by_channel_id(channel_id, cursor='', api_key=None, n=50, include_recasts=False):
    """Collect up to ``n`` unique cast authors of a channel given its channel id.

    Args:
        channel_id: Id identifying the channel.
        cursor: Cursor of the first page to request (empty for the start).
        api_key: Optional API key forwarded to ``get_channel_feed``.
        n: Number of unique authors at which collection stops.
        include_recasts: Whether recasts are included in the feed.

    Returns:
        Dict mapping fid to author object; empty when the feed has no casts.
    """
    def fetch_page(page_cursor):
        return get_channel_feed("", channel_id, page_cursor, api_key, include_recasts)

    return _collect_unique_casters(fetch_page, cursor, n)
# Function to search for channels
def get_channels(query, api_key=None):
    """Search channels matching ``query``.

    Args:
        query: Search text sent as the ``q`` parameter.
        api_key: Optional API key forwarded to ``get``.

    Returns:
        The decoded JSON response of the channel search endpoint.
    """
    search_params = {"q": query}
    return get("v2/farcaster/channel/search", search_params, api_key=api_key)
# Function to get a specific channel by query
def get_channel(query, api_key=None):
    """Return the first channel the search yields for ``query``.

    Args:
        query: Search text handed to ``get_channels``.
        api_key: Optional API key forwarded to ``get_channels``.

    Returns:
        The first entry of the ``channels`` list in the search response.

    Raises:
        ValueError: If the search response lists no channels.
    """
    search_result = get_channels(query, api_key)
    if len(search_result["channels"]) == 0:
        raise ValueError("No channel found.")
    best_match = search_result["channels"][0]
    return best_match
def display_unique_users(fc_channel_name, n=50):
    """Print the unique cast authors found in a channel.

    The channel is resolved via ``get_channel``. Authors are collected by
    the channel's parent URL first; if that yields nobody (or the channel
    has no parent URL), the channel id is used instead.

    Args:
        fc_channel_name: Channel name used as the search query.
        n: Number of unique authors to collect (default 50).
    """
    channel = get_channel(fc_channel_name)
    # Tolerate channels without a parent URL instead of raising KeyError.
    channel_parent_url = channel.get("parent_url")
    print(f'Getting unique users for channel {fc_channel_name}, parent_url: {channel_parent_url}, id: {channel["id"]}')

    unique_users = {}
    if channel_parent_url:
        unique_users = get_channel_unique_users_by_parent_url(channel_parent_url, n=n, include_recasts=True)
    if not unique_users:
        print("failed to get unique users by parent_url, trying by channel_id")
        unique_users = get_channel_unique_users_by_channel_id(channel["id"], n=n, include_recasts=True)

    print(f'\nFound {len(unique_users)} unique casters for channel: {fc_channel_name}')
    for user in unique_users.values():
        # The "@" prefix belongs on the username (the handle), not on the
        # free-form display name.
        print(f'@{user["username"]} ({user["display_name"]}, fid: {user["fid"]})')
# Main function to get the last cast by a user in a specific channel
def main(fc_username, fc_channel_name):
    """Print a user's most recent cast in a channel, or the channel's casters.

    With an empty ``fc_username`` the unique cast authors of the channel
    are listed instead.

    Args:
        fc_username: Farcaster username, or "" to list the channel's casters.
        fc_channel_name: Channel name used as the search query.
    """
    if fc_username == "":
        display_unique_users(fc_channel_name)
        return

    user = get_user(fc_username)
    fid = user["fid"]
    channel = get_channel(fc_channel_name)
    channel_parent_url = channel["parent_url"]

    found_cast = None
    cursor = ''
    while found_cast is None:
        casts_response = get_user_casts(fid, cursor=cursor)
        # Take the FIRST match on the page and stop scanning.
        # NOTE(review): assumes the feed is returned newest-first, so the
        # first match is the most recent cast — confirm against the API docs.
        found_cast = next(
            (
                cast
                for cast in casts_response["casts"]
                if cast.get("parent_url") == channel_parent_url
            ),
            None,
        )
        if found_cast is not None:
            break
        next_cursor = (casts_response.get("next") or {}).get("cursor")
        # No further page (or a cursor that does not advance): give up
        # rather than request the same page forever.
        if not next_cursor or next_cursor == cursor:
            break
        cursor = next_cursor

    if found_cast is None:
        print(f'{fc_username} has no casts in /{fc_channel_name}')
        return

    print(f'{fc_username} last casted in /{fc_channel_name} at {found_cast["timestamp"]}:')
    print()
    print(found_cast['text'])
    print()
    print('Hash:', found_cast['hash'])
    print('Warpcast URL:', f'https://warpcast.com/{fc_username}/{found_cast["hash"][:8]}')
# Entry point of the script
if __name__ == "__main__":
    # Command-line entry point: parse the two optional flags and hand them
    # to main(). Leaving out --fc_username selects the "unique casters" mode.
    parser = argparse.ArgumentParser(description='Get last cast by user in channel')
    parser.add_argument('--fc_username', type=str, help='Farcaster username', default="", required=False)
    # nargs='?' lets the flag appear without a value; const makes that case
    # fall back to the default channel instead of producing None.
    parser.add_argument('--fc_channel_name', type=str, nargs='?', const='plan', default='plan', help='Channel name (default: plan)')
    args = parser.parse_args()
    main(args.fc_username, args.fc_channel_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment