Skip to content

Instantly share code, notes, and snippets.

@kidGodzilla
Created July 6, 2025 00:07
Show Gist options
  • Save kidGodzilla/e444140d9ee85037475be22c82be9046 to your computer and use it in GitHub Desktop.
A Bluesky Discover Feed
#!/usr/bin/env node
require('dotenv').config();
const express = require('express');
const cors = require('cors');
const postgres = require('postgres');
const WebSocket = require('ws');
const fs = require('fs');
const path = require('path');
const { exec } = require('child_process');
const schedule = require('node-schedule');
const { DidResolver, MemoryCache } = require('@atproto/identity');
const xrpc = require('@atproto/xrpc-server');
const { verifyJwt, AuthRequiredError } = require('@atproto/xrpc-server');
const fetch = require('node-fetch');
// Configuration (all values overridable via environment variables)
const OUTPUT_DIR = process.env.OUTPUT_DIR || path.join(__dirname, 'csv_output');
// Coerce numeric env vars: process.env values are strings, and MAX_ROWS is
// compared against a numeric row counter (and PORT is passed to listen).
const MAX_ROWS = Number(process.env.MAX_ROWS) || 10000;
const JETSTREAM_HOST = process.env.JETSTREAM_HOST || 'jetstream2.us-east.bsky.network';
const JETSTREAM_PARAMS = process.env.JETSTREAM_PARAMS || '?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.graph.follow';
const PORT = Number(process.env.PORT) || 5000;
const RECONNECT_DELAY_MS = 20_000;
// Database connection
const sql = postgres(process.env.DATABASE_URL || 'postgres://localhost:5432/bluesky_feed', {
onnotice: () => {}, // Suppress notices
});
// Cache and DID resolver (used by JWT verification on feed requests)
const didCache = new MemoryCache();
const didResolver = new DidResolver({
plcUrl: 'https://plc.directory',
didCache,
});
// Global variables
const csvWriters = []; // all BufferedCsvWriter instances, for /stats + shutdown
let lastCreatorJobId = null; // jobId of the most recent creator snapshot build
let isRecreatingPostLikeLookup = false; // re-entrancy guard for the scheduled rebuild
let lastDedupe = null; // hour-of-day when dedupe last ran
let lastDelete = null; // hour-of-day when batch deletes last ran
// Simple in-memory cache
// Minimal TTL cache backed by a Map; expired entries are evicted lazily on read.
class SimpleCache {
  constructor() {
    // Kept as a public `cache` property so callers can inspect the Map directly.
    this.cache = new Map();
  }
  // Build a deterministic key from a path plus a params object.
  getKey(path, params = {}) {
    return `${path}_${JSON.stringify(params)}`;
  }
  // Return the cached value, or null when absent or past its expiry.
  get(key) {
    const entry = this.cache.get(key);
    if (!entry) return null;
    if (Date.now() > entry.expires) {
      // Expired: evict and report a miss.
      this.cache.delete(key);
      return null;
    }
    return entry.value;
  }
  // Store a value with a TTL in milliseconds (defaults to one hour).
  set(key, value, ttlMs = 3600000) {
    const expires = Date.now() + ttlMs;
    this.cache.set(key, { value, expires });
  }
  delete(key) {
    this.cache.delete(key);
  }
  clear() {
    this.cache.clear();
  }
}
const Cache = new SimpleCache();
// Utility functions
// Render a timestamp as ISO-8601 without milliseconds, e.g. "2024-01-02T03:04:05Z".
function formatTimestamp(ts) {
  const iso = new Date(ts).toISOString();
  // toISOString always ends in ".sssZ" — drop the 5-character millisecond suffix.
  return iso.slice(0, -5) + 'Z';
}
// Format a timestamp as a UTC calendar date, "YYYY-MM-DD".
function formatTimestampAsDate(ts) {
  const d = new Date(ts);
  const pad = (n) => String(n).padStart(2, '0');
  return [d.getUTCFullYear(), pad(d.getUTCMonth() + 1), pad(d.getUTCDate())].join('-');
}
// Build the internal post id ("<did-without-plc-prefix>/p/<rkey>") for a post.
function getPostId(did, rkey) {
  return `${did.replace('did:plc:', '')}/p/${rkey}`;
}
// Convert a subject AT-URI (e.g. from a like/repost record) into the
// internal compact post id form.
function getPostIdFromSubject(uri) {
  return uri
    .replace('at://', '')
    .replace('did:plc:', '')
    .replace('/app.bsky.feed.post/', '/p/');
}
// Expand an internal compact post id back into a full AT-URI.
// Non-string / empty inputs are returned unchanged.
function normalizePostIdToAtUri(postId) {
  if (!postId || typeof postId !== 'string') return postId;
  let id = postId.replace('/p/', '/app.bsky.feed.post/');
  // Strip any scheme that is already present.
  if (id.includes('://')) id = id.split('://')[1];
  // Re-attach the did:plc: prefix if the id lacks a DID method.
  if (!id.includes('did:')) id = `did:plc:${id}`;
  return `at://${id}`;
}
// Normalize a language tag: drop any ";q=…" quality suffix, lowercase, and
// strip a known region suffix (e.g. "en-US" -> "en"). Returns '' for
// non-string / empty input.
function normalizeLang(s) {
  if (!s || typeof s !== 'string') return '';
  s = s.split(';')[0].toLowerCase();
  const removables = ['-us', '-gb', '-jp', '-au', '-ca', '-cn', '-fr', '-de', '-in', '-it', '-es', '-ru', '-kr', '-mx', '-nl', '-se', '-ch', '-za', '-nz'];
  // Strip by slicing the suffix off the end; the original used
  // String.replace, which removes the FIRST occurrence and could clip the
  // wrong part of a tag that contains the region string mid-tag.
  const suffix = removables.find((rm) => s.endsWith(rm));
  return suffix ? s.slice(0, -suffix.length) : s;
}
// Collapse a post's label array to a single CSV-safe label string.
// When several labels are present, the highest-priority moderation label
// wins; otherwise the first label. Commas are stripped so the value cannot
// break the CSV row format.
function formatLabels(labels) {
  if (labels.length === 0) return '';
  if (labels.length === 1) return (labels[0] || '').replace(/,/g, '');
  const priorities = ['porn', 'sexual', 'nudity', 'graphic-media'];
  // `find` returns the first priority label present — replaces the original
  // forEach-with-flag idiom, which kept iterating after a match.
  const label = priorities.find((p) => labels.includes(p));
  return (label || labels[0] || '').replace(/,/g, '');
}
// CSV Writer class
// Buffers rows in memory and appends them to a ".tmp" file on a timer.
// Once a file accumulates MAX_ROWS rows it is renamed to ".csv" (presumably
// picked up by an external loader — TODO confirm) and a fresh .tmp file is
// started. NOTE(review): writeRow does no CSV quoting/escaping, so any field
// containing a comma or newline would corrupt the row; labels have commas
// stripped upstream — confirm other fields are comma-free.
class BufferedCsvWriter {
constructor(filenameBase, headers) {
this.filenameBase = filenameBase;
this.headers = headers;
this.buffer = []; // pending rows, already comma-joined strings
this.rowCount = 0; // rows written to the current file since the last rotation
this.flushInterval = 5000; // ms between automatic flushes
this.created = Date.now();
this.writeCount = 0; // lifetime rows accepted (reported by /stats)
// Ensure the output directory exists before the first write.
if (!fs.existsSync(OUTPUT_DIR)) {
fs.mkdirSync(OUTPUT_DIR, { recursive: true });
}
// Start a fresh temp file seeded with the header row.
this.currentFilename = this.getTempFilename();
fs.writeFileSync(this.currentFilename, this.headers.join(',') + '\n');
this.intervalId = this.startFlushTimer();
}
// Timestamped .tmp path so rotated files never collide.
getTempFilename() {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
return path.join(OUTPUT_DIR, `${this.filenameBase}_${timestamp}.tmp`);
}
// Final name for a completed file: same path with .tmp -> .csv.
getFinalFilename(tempFilename) {
return tempFilename.replace(/\.tmp$/, '.csv');
}
// Drain the buffer to disk every flushInterval ms.
startFlushTimer() {
return setInterval(() => {
if (this.buffer.length > 0) {
this.flush();
}
}, this.flushInterval);
}
// Append all buffered rows; rotate the file once MAX_ROWS is reached.
flush() {
try {
const data = this.buffer.join('\n') + '\n';
fs.appendFileSync(this.currentFilename, data);
// Count flushed rows by newline; equals buffer.length when rows contain no newlines.
const numRowsFlushed = data.split('\n').length - 1;
this.buffer = [];
this.rowCount += numRowsFlushed;
if (this.rowCount >= MAX_ROWS) {
// Promote the full temp file to .csv and start a fresh one.
fs.renameSync(this.currentFilename, this.getFinalFilename(this.currentFilename));
this.rowCount = 0;
this.currentFilename = this.getTempFilename();
fs.writeFileSync(this.currentFilename, this.headers.join(',') + '\n');
}
} catch (error) {
// Log and keep running; if the append itself failed, the rows remain buffered.
console.error('Error flushing data:', error);
}
}
// Queue one row (array of field values; joined with commas, no escaping).
writeRow(row) {
this.buffer.push(row.join(','));
this.writeCount++;
}
// Stop the timer, flush remaining rows, and promote the in-progress file.
shutdown() {
clearInterval(this.intervalId);
if (this.buffer.length > 0) {
this.flush();
}
if (this.currentFilename.endsWith('.tmp')) {
fs.renameSync(this.currentFilename, this.getFinalFilename(this.currentFilename));
}
}
}
// Create CSV writers
// One writer per event stream; the headers double as the CSV schema and are
// expected to match the corresponding Postgres tables created in initDatabase.
const postWriter = new BufferedCsvWriter('posts', ['post_id', 'uid', 'ts', 'lang', 'type', 'labels']);
const likeWriter = new BufferedCsvWriter('likes', ['post_id', 'uid', 'date', 'processed_at']);
const repostWriter = new BufferedCsvWriter('reposts', ['post_id', 'uid', 'date', 'processed_at']);
const replyWriter = new BufferedCsvWriter('replies', ['post_id', 'uid', 'date', 'processed_at']);
const followWriter = new BufferedCsvWriter('follows', ['uid', 'subject', 'timestamp', 'is_create']);
// Registered so /stats can report on them (and a shutdown hook could flush them).
csvWriters.push(postWriter, likeWriter, repostWriter, replyWriter, followWriter);
// Database initialization
// Idempotent schema setup: enables the roaringbitmap extension and creates
// every table/index with IF NOT EXISTS, so it is safe to run on each boot.
// Errors are logged but not rethrown — NOTE(review): the app continues even
// if the schema failed to initialize; confirm that is intentional.
const initDatabase = async () => {
try {
// Create roaring bitmap extension if it doesn't exist
await sql`CREATE EXTENSION IF NOT EXISTS roaringbitmap;`;
console.log('Roaring bitmap extension enabled');
// Create users table
// `id SERIAL` gives each uid a dense integer, used as the bit index in
// follows_bitmap.
await sql`
CREATE TABLE IF NOT EXISTS users (
uid TEXT PRIMARY KEY,
id SERIAL UNIQUE
);
`;
// Create posts table (no primary key - allows duplicates, deduplication happens later)
await sql`
CREATE TABLE IF NOT EXISTS posts (
post_id TEXT NOT NULL,
uid TEXT NOT NULL,
ts TIMESTAMPTZ NOT NULL,
lang TEXT,
type TEXT,
labels TEXT
);
`;
// Create likes table
await sql`
CREATE TABLE IF NOT EXISTS likes (
id SERIAL PRIMARY KEY,
post_id TEXT NOT NULL,
uid TEXT NOT NULL,
date DATE NOT NULL,
processed_at TIMESTAMPTZ DEFAULT now()
);
`;
// Create reposts table
await sql`
CREATE TABLE IF NOT EXISTS reposts (
id SERIAL PRIMARY KEY,
post_id TEXT NOT NULL,
uid TEXT NOT NULL,
date DATE NOT NULL,
processed_at TIMESTAMPTZ DEFAULT now()
);
`;
// Create replies table
await sql`
CREATE TABLE IF NOT EXISTS replies (
id SERIAL PRIMARY KEY,
post_id TEXT NOT NULL,
uid TEXT NOT NULL,
date DATE NOT NULL,
processed_at TIMESTAMPTZ DEFAULT now()
);
`;
// Create follows table
await sql`
CREATE TABLE IF NOT EXISTS follows (
id SERIAL PRIMARY KEY,
uid TEXT NOT NULL,
subject TEXT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
is_create BOOLEAN NOT NULL
);
`;
// Create affinity table
// Stores per-user creator scores built by the collaborative filter;
// `type` distinguishes feed variants, '' is the default.
await sql`
CREATE TABLE IF NOT EXISTS user_creator_affinity (
user_id TEXT NOT NULL,
creator_id TEXT NOT NULL,
type TEXT NOT NULL DEFAULT '',
inserted_at TIMESTAMPTZ DEFAULT now(),
last_interaction_at TIMESTAMPTZ DEFAULT now(),
last_touched TIMESTAMPTZ DEFAULT now(),
score NUMERIC,
action TEXT,
PRIMARY KEY (user_id, creator_id, type)
);
`;
// Create follows bitmap table for roaring bitmap storage
await sql`
CREATE TABLE IF NOT EXISTS follows_bitmap (
uid TEXT PRIMARY KEY,
bitmap BYTEA,
count INTEGER,
updated_date DATE
);
`;
// Create indexes
await sql`CREATE INDEX IF NOT EXISTS idx_likes_uid_date ON likes (uid, date);`;
await sql`CREATE INDEX IF NOT EXISTS idx_likes_post_id_date ON likes (post_id, date);`;
await sql`CREATE INDEX IF NOT EXISTS idx_posts_uid_ts ON posts (uid, ts);`;
await sql`CREATE INDEX IF NOT EXISTS idx_posts_type_ts ON posts (type, ts);`;
await sql`CREATE INDEX IF NOT EXISTS idx_user_creator_affinity_user_id ON user_creator_affinity (user_id);`;
await sql`CREATE INDEX IF NOT EXISTS idx_follows_uid ON follows (uid);`;
console.log('Database initialized successfully');
} catch (error) {
console.error('Error initializing database:', error);
}
};
// Post like totals recreation
// Rebuild the post_like_totals lookup (like counts per post over the last
// 3 days) with minimal downtime: build a *_new table plus its indexes, then
// swap it in. Guarded by a module flag so overlapping scheduled runs no-op.
async function recreatePostLikeLookup() {
  if (isRecreatingPostLikeLookup) return;
  isRecreatingPostLikeLookup = true;
  console.log('Recreating post_like_totals');
  const _startTime = Date.now();
  try {
    // Drop any leftovers from a previously failed rebuild.
    await sql`DROP TABLE IF EXISTS post_like_totals_new CASCADE;`;
    await sql`DROP INDEX IF EXISTS idx_post_like_totals_creator_uid_new;`;
    await sql`DROP INDEX IF EXISTS idx_plt_post_id_likes10_new;`;
    await sql`
CREATE TABLE post_like_totals_new AS
SELECT
post_id,
split_part(post_id, '/p/', 1) AS creator_uid,
COUNT(*) AS total_likes
FROM likes
WHERE date >= CURRENT_DATE - INTERVAL '3 days'
GROUP BY post_id;
`;
    await sql`ALTER TABLE post_like_totals_new ADD PRIMARY KEY (post_id);`;
    await sql`CREATE INDEX idx_post_like_totals_creator_uid_new ON post_like_totals_new (creator_uid);`;
    await sql`CREATE INDEX idx_plt_post_id_likes10_new ON post_like_totals_new (post_id) WHERE total_likes >= 10;`;
    // Swap: drop the old table and rename the freshly built one into place.
    await sql`DROP TABLE IF EXISTS post_like_totals CASCADE;`;
    await sql`ALTER TABLE post_like_totals_new RENAME TO post_like_totals;`;
    const queryTime = ((Date.now() - _startTime) / 1000).toFixed(2);
    console.log(`Successfully recreated post_like_totals in ${queryTime}s`);
  } catch(e) {
    console.log('Error recreating post_like_totals:', e);
  } finally {
    // Release the re-entrancy guard even on unexpected throws, so a single
    // failure can never permanently block future rebuilds.
    isRecreatingPostLikeLookup = false;
  }
}
// Dedupe function to remove duplicate posts/likes/reposts/replies
// The ingest path allows duplicate rows (tables have no unique constraints),
// so this deletes extras by physical row id (ctid). `table` is only ever an
// in-code table name — sql.unsafe is not exposed to user input here.
async function dedupe(table) {
console.log(`Deduplicating ${table}...`);
if (table === 'posts') {
// For posts, dedupe by post_id only (keep the most recent by timestamp)
await sql.unsafe(`
DELETE FROM ${table}
WHERE ctid IN (
SELECT ctid FROM (
SELECT ctid,
ROW_NUMBER() OVER (PARTITION BY post_id ORDER BY ts DESC) AS rn
FROM ${table}
) sub
WHERE rn > 1
);
`);
} else {
// For likes/reposts/replies, dedupe by post_id and uid
// (keeps the highest serial id, i.e. the most recently inserted row)
await sql.unsafe(`
DELETE FROM ${table}
WHERE ctid IN (
SELECT ctid FROM (
SELECT ctid,
ROW_NUMBER() OVER (PARTITION BY post_id, uid ORDER BY id DESC) AS rn
FROM ${table}
) sub
WHERE rn > 1
);
`);
}
console.log(`Finished deduplicating ${table}`);
}
// Batch delete function to maintain table sizes
// Deletes rows older than `interval` (measured on `column`) in batches of
// `batchSize`, looping until fewer than `minThreshold` rows were removed or
// `maxIterations` is reached. Batching by ctid keeps each DELETE's lock and
// WAL footprint small. NOTE(review): all arguments are interpolated into
// sql.unsafe — callers must pass only trusted, in-code values.
async function batchDelete(table, column, interval, batchSize = 10000, whereExtra = '', maxIterations = 20, minThreshold = 500) {
let rowsDeleted;
let iterations = 0;
const extraClause = whereExtra ? `AND (${whereExtra})` : '';
do {
const result = await sql.unsafe(`
WITH del AS (
DELETE FROM ${table}
WHERE ctid IN (
SELECT ctid FROM ${table}
WHERE ${column} < now() - interval '${interval}'
${extraClause}
LIMIT ${batchSize}
)
RETURNING 1
)
SELECT count(*) FROM del;
`);
rowsDeleted = Number(result[0]?.count || 0);
console.log(`${rowsDeleted} rows deleted from ${table}`);
iterations++;
} while (rowsDeleted > minThreshold && iterations < maxIterations);
}
// Populate users table from activity data
// Upserts every uid seen in any activity table into `users`, assigning each a
// dense SERIAL id (used as the bit index for roaring-bitmap follow storage).
async function populateUsersTable() {
try {
console.log('Populating users table from activity data...');
// Insert unique users from posts, likes, reposts, replies, and follows
// (both sides of a follow edge: follower `uid` and followee `subject`).
await sql`
INSERT INTO users (uid)
SELECT DISTINCT uid FROM (
SELECT uid FROM posts
UNION
SELECT uid FROM likes
UNION
SELECT uid FROM reposts
UNION
SELECT uid FROM replies
UNION
SELECT uid FROM follows
UNION
SELECT subject as uid FROM follows
) AS all_users
ON CONFLICT (uid) DO NOTHING;
`;
const result = await sql`SELECT COUNT(*) as count FROM users;`;
console.log(`Users table now has ${result[0].count} users`);
} catch (error) {
console.error('Error populating users table:', error);
}
}
// Generate catchup feed for popular posts
// Returns popular posts (>= 10 likes) from the last 72 hours, optionally
// filtered by language, post type, and excluded labels. Results are cached
// for 30 minutes, keyed on the full options object.
async function generateCatchupFeed(options = {}) {
  const cacheKey = Cache.getKey('/catch-up', options);
  // Serve from cache when possible (flattened from the original's nested
  // getFeed() closure into a straightforward early return).
  const cachedResult = Cache.get(cacheKey);
  if (cachedResult) return cachedResult;
  let { lang, type, labelFilter, limit = 100 } = options;
  if (lang) lang = normalizeLang(lang);
  limit = parseInt(limit, 10); // options may arrive as query-string values
  // labelFilter is a comma-separated list of labels to exclude.
  const filterLabels = labelFilter
    ? labelFilter.split(',').map(label => label.trim())
    : null;
  // Get popular posts from the last 72 hours.
  // NOTE(review): `NOT (p.labels = ANY(...))` evaluates to NULL (row filtered
  // out) when p.labels is NULL — confirm labels are always non-NULL ('').
  const posts = await sql`
SELECT
p.post_id,
p.uid,
p.ts,
p.lang,
p.type,
p.labels,
plt.total_likes as like_count
FROM posts p
JOIN post_like_totals plt ON p.post_id = plt.post_id
WHERE p.ts >= now() - interval '72 hours'
AND plt.total_likes >= 10
${lang ? sql`AND p.lang = ${lang}` : sql``}
${type ? sql`AND p.type = ${type}` : sql``}
${filterLabels ? sql`AND NOT (p.labels = ANY(${filterLabels}))` : sql``}
ORDER BY plt.total_likes DESC, p.ts DESC
LIMIT ${limit};
`;
  Cache.set(cacheKey, posts, 30 * 60 * 1000); // 30 minutes
  return posts;
}
// Get follows for a user
// Page through the public Bluesky getFollows endpoint (100 per page, up to
// `maxPages` pages) and return the accumulated follow records. Any HTTP or
// network failure ends pagination early with whatever was collected.
async function getFollows(did, maxPages = 5) {
  const baseUrl = 'https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows';
  let collected = [];
  let cursor = null;
  for (let page = 0; page < maxPages; page++) {
    let url = `${baseUrl}?actor=${encodeURIComponent(did)}&limit=100`;
    if (cursor) url += `&cursor=${encodeURIComponent(cursor)}`;
    try {
      const response = await fetch(url);
      if (!response.ok) break;
      const data = await response.json();
      collected = collected.concat(data.follows || []);
      cursor = data.cursor;
    } catch (e) {
      console.error('Error fetching follows:', e);
      break;
    }
    // No cursor means the API has no further pages.
    if (!cursor) break;
  }
  return collected;
}
// Find likes for a user (used to build user preferences)
// Aggregates the user's likes over the last 2 days into per-creator like
// counts (creator extracted from the post_id prefix), most-liked first.
// Returns [] on any query error.
async function findLikesForUser(userId) {
try {
const result = await sql`
SELECT
split_part(post_id, '/p/', 1) AS creator_id,
COUNT(*) AS like_count
FROM likes
WHERE uid = ${ userId }
AND date >= CURRENT_DATE - INTERVAL '2 days'
GROUP BY creator_id
ORDER BY like_count DESC
LIMIT 1000;
`;
return result;
} catch(e) {
console.log('Error finding likes for user:', e);
return [];
}
}
// CORE COLLABORATIVE FILTERING FUNCTIONS
// These are the essential functions that make the collaborative filtering work
// Builds the two per-job snapshot tables the recommender needs:
//   creator_index_<jobId>: creator uid -> dense integer index (by like volume)
//   user_bitmap_<jobId>:   user uid -> roaring bitmap of liked creator indexes
// `whereClause` / `relCountFilter` are raw SQL fragments supplied by in-code
// callers (see buildCreatorIndex) — not user input. `entityA` is currently
// unused; rows are always grouped by `uid`.
async function prepareCreatorSnapshot({ actionTable, entityA, jobId, whereClause, relCountFilter, isVideo = false }) {
// Sanitize jobId for safe table naming.
const safeJobId = jobId.replace(/[^a-zA-Z0-9_]/g, '');
// Build table names.
const creatorIndexTable = `"creator_index_${safeJobId}"`;
const userBitmapTable = `"user_bitmap_${safeJobId}"`;
// Conditional SQL for video feed vs. general.
let videoJoinClause = '';
if (isVideo) {
// Add join to the posts table with a video type filter.
videoJoinClause = `JOIN posts p ON l.post_id = p.post_id AND p.type = 'video'`;
}
// 1. Create a table mapping each creator to a dense index.
// We extract the creator uid from the post_id using split_part(post_id, '/p/', 1).
// Indexes are assigned in descending like-count order, starting at 0.
const createCreatorIndexSQL = `
CREATE TABLE ${creatorIndexTable} AS
SELECT creator_uid,
CAST(ROW_NUMBER() OVER (ORDER BY rel_count DESC) - 1 AS int4) AS creator_idx
FROM (
SELECT split_part(l.post_id, '/p/', 1) AS creator_uid, COUNT(*) AS rel_count
FROM ${actionTable} l
${videoJoinClause}
WHERE ${whereClause}
GROUP BY split_part(l.post_id, '/p/', 1)
) sub
WHERE ${relCountFilter};
`;
await sql.unsafe(createCreatorIndexSQL);
// 2. Create the user-to-bitmap table mapping each user (entityA) to a bitmap of the creators they have liked.
// Users with fewer than 10 or more than 500 qualifying likes are excluded
// (too little signal / likely bots).
const createUserBitmapSQL = `
CREATE TABLE ${userBitmapTable} AS
WITH bitmap_cte AS (
SELECT uid,
rb_build(array_agg(creator_idx)::int4[]) AS bitmap
FROM (
SELECT l.uid, ci.creator_idx
FROM ${actionTable} l
JOIN ${creatorIndexTable} ci
ON split_part(l.post_id, '/p/', 1) = ci.creator_uid
${videoJoinClause}
WHERE ${whereClause}
) joined
GROUP BY uid
HAVING COUNT(*) BETWEEN 10 AND 500
)
SELECT uid, bitmap
FROM bitmap_cte;
`;
await sql.unsafe(createUserBitmapSQL);
console.log(`Creator snapshot tables for job ${jobId} have been created.`);
}
// Cleanup old bitmap tables
// Drop both per-job snapshot tables for the given jobId; failures are logged
// and swallowed so cleanup never aborts the caller.
async function cleanup(jobId) {
  const safeJobId = jobId.replace(/[^a-zA-Z0-9_]/g, '');
  const tables = {
    userBitmap: `"user_bitmap_${safeJobId}"`,
    creatorIndex: `"creator_index_${safeJobId}"`,
  };
  try {
    await sql.unsafe(`DROP TABLE IF EXISTS ${tables.userBitmap}, ${tables.creatorIndex};`);
    console.log(`Cleaned up temporary snapshot tables for job ${jobId}`);
  } catch (error) {
    console.log(`Error cleaning up tables for job ${jobId}:`, error);
  }
}
// Collaborative filtering core: find creators the target user is likely to
// enjoy, by (1) locating users whose liked-creator bitmaps are closest to the
// target's (Jaccard distance over roaring bitmaps), then (2) ranking the
// creators those similar users like by a z-scored like count.
// `entityA` is currently unused. Returns rows { uid, like_count, similarity }.
async function findRecommendedCreatorsByUserLikers(targetUserId, jobId, entityA = 'uid', limit = 100, userLimit = 200) {
const safeJobId = jobId.replace(/[^a-zA-Z0-9_]/g, '');
const userBitmapTable = `"user_bitmap_${safeJobId}"`;
const creatorIndexTable = `"creator_index_${safeJobId}"`;
// Get target user's bitmap
let targetBitmap;
const existing = await sql`
SELECT bitmap FROM ${sql.unsafe(userBitmapTable)}
WHERE uid = ${targetUserId}
LIMIT 1
`;
if (existing.length > 0) {
targetBitmap = existing[0].bitmap;
} else {
// Not in the snapshot (user had <10 or >500 likes at build time):
// build their bitmap from current likes and cache it into the table.
const build = await sql`
WITH user_creators AS (
SELECT ci.creator_idx
FROM likes l
JOIN ${sql.unsafe(creatorIndexTable)} ci
ON split_part(l.post_id, '/p/', 1) = ci.creator_uid
WHERE l.uid = ${targetUserId}
),
new_bitmap AS (
SELECT
COALESCE(rb_build(array_agg(creator_idx)::int4[]), rb_build('{}'::int4[])) AS bitmap
FROM user_creators
)
INSERT INTO ${sql.unsafe(userBitmapTable)} (uid, bitmap)
SELECT ${targetUserId}, bitmap
FROM new_bitmap
RETURNING bitmap
`;
if (build.length === 0) throw new Error('Failed to build bitmap');
targetBitmap = build[0].bitmap;
}
// Step 1: Get similar users using roaring bitmap operations
// `&&` requires at least one shared creator; rb_jaccard_dist orders by
// closest overall taste first.
const similarUsers = await sql`
SELECT uid
FROM ${sql.unsafe(userBitmapTable)}
WHERE uid != ${targetUserId}
AND bitmap && ${targetBitmap}::roaringbitmap
ORDER BY rb_jaccard_dist(bitmap, ${targetBitmap}::roaringbitmap) ASC
LIMIT ${userLimit}
`;
if (similarUsers.length === 0) return [];
const similarUserIds = similarUsers.map(row => row.uid);
// Step 2: Get top recommended creators by z-scored like count
// z-score is divided by 100 to keep `similarity` in a small range for the
// affinity table's score column.
const recommended = await sql`
WITH creator_likes AS (
SELECT ci.creator_uid AS uid,
COUNT(*) AS like_count
FROM likes l
JOIN ${sql.unsafe(creatorIndexTable)} ci
ON split_part(l.post_id, '/p/', 1) = ci.creator_uid
WHERE l.uid = ANY(${sql.array(similarUserIds)})
GROUP BY ci.creator_uid
),
stats AS (
SELECT AVG(like_count) AS mean, STDDEV_POP(like_count) AS stddev FROM creator_likes
)
SELECT cl.uid,
cl.like_count,
CASE
WHEN stats.stddev > 0 THEN (cl.like_count - stats.mean) / stats.stddev / 100
ELSE 0
END AS similarity
FROM creator_likes cl, stats
ORDER BY similarity DESC
LIMIT ${limit}
`;
return recommended;
}
// Ensure today's creator snapshot tables exist, building them if missing.
// Snapshot tables are named per-UTC-day via the jobId, so the first call of
// a new day triggers a rebuild; if the table already exists (e.g. after a
// restart) we just adopt today's jobId.
async function ensureCreatorIndex() {
  const jobId = `creator_snapshot_${formatTimestampAsDate(Date.now())}`;
  const safeJobId = jobId.replace(/[^a-zA-Z0-9_]/g, '');
  // Check if today's user-bitmap snapshot table exists.
  const tableExists = await sql.unsafe(`
SELECT EXISTS (
SELECT 1
FROM pg_tables
WHERE schemaname = 'public'
AND tablename = 'user_bitmap_${safeJobId}'
);
`);
  if (!tableExists[0].exists) {
    // Missing: build it now (buildCreatorIndex updates lastCreatorJobId on success).
    return buildCreatorIndex();
  }
  // Table present: record today's jobId if we don't have one yet.
  // (Dead code after the original if/else — both branches returned — removed.)
  if (!lastCreatorJobId) lastCreatorJobId = jobId;
}
// Build the daily likes-based creator snapshot used by the recommender and
// record its jobId in lastCreatorJobId. Errors are logged and swallowed.
async function buildCreatorIndex() {
  try {
    const jobId = `creator_snapshot_${formatTimestampAsDate(Date.now())}`;
    // Snapshot the last 3 days of likes, keeping creators with 10-500 likes.
    await prepareCreatorSnapshot({
      jobId,
      actionTable: 'likes',
      entityA: 'uid',
      entityB: 'post_id',
      whereClause: "date >= now() - interval '3 days'",
      relCountFilter: "rel_count BETWEEN 10 AND 500",
    });
    lastCreatorJobId = jobId;
    console.log('Finished preparing snapshot for likes:', jobId);
  } catch (err) {
    console.log('Error preparing snapshot:', err);
  }
}
// Update follows bitmap with roaring bitmap operations
// Store the set of accounts `uid` follows as a roaring bitmap over the dense
// integer ids from the `users` table. On conflict the new bitmap is OR-ed
// into the existing one. NOTE(review): `count` is incremented by the batch
// size on conflict, so it can double-count already-present follows — the
// bitmap itself stays correct.
async function updateFollowsBitmap(uid, follows) {
const normalizedDids = follows
.map(f => f.did?.replace('did:plc:', ''))
.filter(Boolean);
if (normalizedDids.length === 0) return;
// Lookup internal IDs from the users table
// (followed accounts not yet in `users` are silently skipped)
const rows = await sql`
SELECT id FROM users WHERE uid IN (${sql.array(normalizedDids, 'text')})
`;
const followedIds = rows.map(r => r.id);
if (followedIds.length === 0) return;
await sql`
INSERT INTO follows_bitmap (uid, bitmap, count, updated_date)
VALUES (
${uid},
rb_build(${sql.array(followedIds)}::int[])::BYTEA,
${followedIds.length},
CURRENT_DATE
)
ON CONFLICT (uid) DO UPDATE SET
bitmap = rb_or(follows_bitmap.bitmap::roaringbitmap, EXCLUDED.bitmap::roaringbitmap)::BYTEA,
count = follows_bitmap.count + ${followedIds.length},
updated_date = CURRENT_DATE;
`;
}
// Get user affinities
// Fetch the user's active creator affinities for a feed `type`: rows not
// 'suppressed' and either touched/interacted-with in the last 7 days or
// pinned via action 'forever'/'following'. Results are cached for 1 hour and
// returned with a `uid` alias for creator_id.
async function getUserAffinities(userId, type = '') {
const cacheKey = Cache.getKey('/affinities', { userId, type });
let affinities = Cache.get(cacheKey);
if (!affinities) {
try {
affinities = await sql`
SELECT *
FROM user_creator_affinity
WHERE user_id = ${userId}
AND type = ${type}
AND (action IS DISTINCT FROM 'suppressed')
AND (
last_interaction_at >= now() - interval '7 days' OR
last_touched >= now() - interval '7 days' OR
action = 'forever' OR
action = 'following'
);
`;
Cache.set(cacheKey, affinities, 3600000);
} catch(e) {
console.log('Error getting user affinities:', e);
affinities = [];
}
}
// Alias creator_id as uid for downstream consumers.
return affinities.map(user => ({...user, uid: user.creator_id}));
}
// Main recommendation function with proper collaborative filtering
// Pipeline: ensure the daily creator snapshot -> read affinities -> find
// similar-user creator recommendations -> upsert both direct-like and
// recommended creators into user_creator_affinity -> return recent posts from
// the affinity creators (or fall back to globally popular posts).
// NOTE(review): `acceptLanguages` is destructured but unused here; language
// filtering happens in the 'discover' algo instead.
async function getRecommendedPostsForUser(userId, params = {}, queryParams = {}) {
const { post_limit = 2500, acceptLanguages = ['en'] } = params;
const { type } = queryParams;
// NOTE(review): jobId may be captured before ensureCreatorIndex() updates
// lastCreatorJobId (first run of the day), in which case it can be null.
let jobId = params.jobId || lastCreatorJobId;
try {
await ensureCreatorIndex();
// Get user's current affinities
let affinities = await getUserAffinities(userId, type);
// Find similar users and get recommended creators using collaborative filtering
let recommendedCreators = await findRecommendedCreatorsByUserLikers(userId, jobId, 'uid', 100);
// Get user's direct likes for affinity updates
let likedCreators = await findLikesForUser(userId);
// Update affinities based on collaborative filtering results
// (only creators the user liked at least 5 times count as direct signal)
const likedCreatorsForInsert = likedCreators.filter(creator => Number(creator.like_count) >= 5);
try {
await Promise.all(likedCreatorsForInsert.map(creator =>
sql`
INSERT INTO user_creator_affinity (user_id, creator_id, type, last_interaction_at, last_touched, score)
VALUES (${userId}, ${creator.creator_id}, COALESCE(${type}, ''::text), now(), now(), 0.1)
ON CONFLICT (user_id, creator_id, type) DO UPDATE
SET last_interaction_at = now();
`
));
} catch (e) {
console.log('Error inserting/updating liked creators:', e);
}
// Process recommended creators from collaborative filtering
// (keep the best score seen so far for each creator)
try {
await Promise.all(recommendedCreators.map(user =>
sql`
INSERT INTO user_creator_affinity (user_id, creator_id, type, last_interaction_at, last_touched, score)
VALUES (${userId}, ${user.uid}, COALESCE(${type}, ''::text), now(), now(), ${user.similarity})
ON CONFLICT (user_id, creator_id, type) DO UPDATE
SET last_touched = now(),
score = GREATEST(user_creator_affinity.score, ${user.similarity});
`
));
} catch (e) {
console.log('Error processing recommended creators:', e);
}
// Get final list of creators from updated affinities
// NOTE(review): getUserAffinities caches for 1h, so this re-read may return
// the pre-update affinity set on a cache hit.
affinities = await getUserAffinities(userId, type);
const creatorIds = affinities.map(a => a.creator_id);
if (creatorIds.length === 0) {
// Fallback to popular posts
const posts = await sql`
SELECT p.post_id, p.ts, p.lang, p.type, p.labels, plt.total_likes as like_count
FROM posts p
JOIN post_like_totals plt ON p.post_id = plt.post_id
WHERE p.ts >= now() - interval '1 day'
AND plt.total_likes >= 10
ORDER BY plt.total_likes DESC, p.ts DESC
LIMIT ${post_limit};
`;
return { posts };
}
// Get posts from recommended creators (non-replies from the last 2 days)
const posts = await sql`
SELECT p.post_id, p.ts, p.lang, p.type, p.labels, COALESCE(plt.total_likes, 0) as like_count
FROM posts p
LEFT JOIN post_like_totals plt ON p.post_id = plt.post_id
WHERE p.uid = ANY(${sql.array(creatorIds)})
AND p.ts >= now() - interval '2 days'
AND p.type != 'reply'
ORDER BY COALESCE(plt.total_likes, 0) DESC, p.ts DESC
LIMIT ${post_limit};
`;
return { posts };
} catch(e) {
console.log('Error getting recommended posts:', e);
return { posts: [] };
}
}
// Discover algorithm
// Shared cursor pagination over a list of posts with numeric `ts` fields:
// sorts descending by ts (in place), starts after the cursor timestamp (the
// cursor is the ts of the last item of the previous page), and returns a feed
// page plus the next cursor (undefined when exhausted). Extracted because
// 'discover' and 'whats-hot' previously duplicated this logic verbatim.
function paginateByTimestamp(posts, cursor, limit) {
  const sortedPosts = posts.sort((a, b) => b.ts - a.ts);
  let startIndex = 0;
  if (cursor) {
    const cursorNum = parseInt(cursor, 10);
    startIndex = sortedPosts.findIndex(p => p.ts < cursorNum);
    if (startIndex === -1) startIndex = sortedPosts.length;
  }
  const paginatedPosts = sortedPosts.slice(startIndex, startIndex + limit);
  let newCursor;
  if (startIndex + limit < sortedPosts.length) {
    newCursor = paginatedPosts[paginatedPosts.length - 1].ts.toString();
  }
  return {
    cursor: newCursor,
    feed: paginatedPosts.map(p => ({ post: p.post_id })),
  };
}
const algos = {
  // Personalized feed backed by the collaborative filter; falls back to an
  // empty feed on any error so the endpoint never 500s.
  'discover': async (ctx, params = {}, requesterDid) => {
    try {
      const { limit = 50, cursor, acceptLanguages = ['en'] } = params;
      const userId = (requesterDid || '').replace('did:plc:', '');
      let { posts } = await getRecommendedPostsForUser(userId, params);
      posts = posts.map(p => ({
        post_id: normalizePostIdToAtUri(p.post_id),
        ts: (+ new Date(p.ts)),
        lang: p.lang,
        type: p.type,
        like_count: p.like_count,
        label: p.labels
      }));
      // Filter by language; posts without a lang always pass, and 'en' plus
      // the requester's Accept-Language values are allowed.
      const langSet = new Set([...acceptLanguages, '', 'en']);
      posts = posts.filter(p => !p.lang || langSet.has(p.lang));
      return paginateByTimestamp(posts, cursor, limit);
    } catch(e) {
      console.log('Error in discover algorithm:', e);
      return { feed: [] };
    }
  },
  // Non-personalized popular-posts feed from the cached catchup query.
  'whats-hot': async (ctx, params = {}, requesterDid) => {
    try {
      const { limit = 50, cursor, acceptLanguages = ['en'] } = params;
      let posts = await generateCatchupFeed({ limit: 10000, lang: acceptLanguages[0] || 'en' });
      posts = posts.map(p => ({
        post_id: normalizePostIdToAtUri(p.post_id),
        ts: (+ new Date(p.ts))
      }));
      return paginateByTimestamp(posts, cursor, limit);
    } catch(e) {
      console.log('Error in whats-hot algorithm:', e);
      return { feed: [] };
    }
  }
};
// Jetstream connection
// Subscribe to the Bluesky Jetstream firehose and stream post/like/repost/
// follow "create" events into the buffered CSV writers. Reconnects itself
// after RECONNECT_DELAY_MS whenever the socket closes.
function connectToJetstream() {
  const wsUrl = `wss://${JETSTREAM_HOST}/subscribe${JETSTREAM_PARAMS}`;
  const ws = new WebSocket(wsUrl);
  ws.on('open', () => {
    console.log('Connected to Jetstream.');
  });
  ws.on('message', (data) => {
    let msg;
    try {
      msg = JSON.parse(data);
    } catch (e) {
      console.error('Error parsing message:', data);
      return;
    }
    // Only record-creation commits are ingested; deletes/updates are ignored.
    if (msg.kind !== 'commit' || !msg.commit || msg.commit.operation !== 'create') {
      return;
    }
    const collection = msg.commit.collection;
    const commit = msg.commit;
    const record = commit.record;
    // Guard against malformed commits: an uncaught throw inside this handler
    // would crash the process (previously record.subject.uri could throw).
    if (!record) return;
    const uid = (msg.did || '').replace('did:plc:', '');
    // Process posts
    if (collection === 'app.bsky.feed.post') {
      const postId = getPostId(msg.did, commit.rkey);
      const ts = formatTimestamp(record.createdAt || Date.now());
      const d = formatTimestampAsDate(record.createdAt || Date.now());
      const lang = normalizeLang((record.langs && record.langs[0]) || '');
      let type = 'text', labels = [], isReply = false;
      if (record.reply && record.reply.parent && record.reply.root) {
        isReply = true;
        type = 'reply';
      }
      // Determine post type from the embed (e.g. images/video/external).
      // Quote embeds ('record'/'recordWithMedia') keep the text/reply type.
      if (record.embed && record.embed.$type) {
        let inferredType = (record.embed.$type || '').replace('app.bsky.embed.', '');
        if (!['record', 'recordWithMedia'].includes(inferredType)) type = inferredType;
      }
      // Determine labels (self-applied moderation labels, lowercased)
      if (record.labels && record.labels.values) {
        labels = record.labels.values.map(x => (x.val || '').trim().toLowerCase()).filter(x => !!x);
      }
      postWriter.writeRow([postId, uid, ts, lang, type, formatLabels(labels)]);
      if (isReply) {
        replyWriter.writeRow([postId, uid, d, null]);
      }
    } else if (collection === 'app.bsky.feed.like') {
      // Optional chaining: skip likes with a missing subject instead of throwing.
      const subjectUri = record.subject?.uri;
      if (!subjectUri) return;
      const postId = getPostIdFromSubject(subjectUri);
      const d = formatTimestampAsDate(record.createdAt);
      if (postId && uid && d) likeWriter.writeRow([postId, uid, d, null]);
    } else if (collection === 'app.bsky.feed.repost') {
      const subjectUri = record.subject?.uri;
      if (!subjectUri) return;
      const postId = getPostIdFromSubject(subjectUri);
      const d = formatTimestampAsDate(record.createdAt);
      if (postId && uid && d) repostWriter.writeRow([postId, uid, d, null]);
    } else if (collection === 'app.bsky.graph.follow') {
      let subject = (record.subject || '').replace('did:plc:', '');
      const ts = formatTimestamp(record.createdAt || Date.now());
      // NOTE(review): the early return above filters to operation === 'create',
      // so isCreate is always true here — unfollows are never recorded.
      let isCreate = msg.commit.operation === 'create';
      if (uid && subject && ts) followWriter.writeRow([uid, subject, ts, isCreate]);
    }
  });
  ws.on('close', () => {
    console.warn('Jetstream WebSocket closed. Reconnecting momentarily...');
    setTimeout(connectToJetstream, RECONNECT_DELAY_MS);
  });
  ws.on('error', (err) => {
    console.error('Jetstream WebSocket error:', err);
    ws.close();
  });
}
// AT Protocol server setup
const app = express();
app.use(cors());
app.use(express.json());
// DID document endpoint
// Serves the did:web document that identifies this feed generator service;
// Bluesky resolves this to verify the service endpoint.
app.get('/.well-known/did.json', (req, res) => {
const serviceDid = process.env.FEEDGEN_SERVICE_DID || 'did:web:example.com';
const hostname = process.env.FEEDGEN_HOSTNAME || 'localhost';
res.json({
'@context': ['https://www.w3.org/ns/did/v1'],
id: serviceDid,
service: [
{
id: '#bsky_fg',
type: 'BskyFeedGenerator',
serviceEndpoint: `https://${hostname}`,
},
],
});
});
// Create AT Protocol server
// Registers the app.bsky.feed.getFeedSkeleton lexicon and a JWT validator.
// With authOptional, unauthenticated requests still reach the handler (no
// requester DID -> no personalization).
const server = xrpc.createServer([
{
lexicon: 1,
id: 'app.bsky.feed.getFeedSkeleton',
defs: {
main: {
type: 'query',
description: 'Get a skeleton of a feed provided by a feed generator',
parameters: {
type: 'params',
required: ['feed'],
properties: {
feed: {
type: 'string',
format: 'at-uri',
description: 'Reference (AT-URI) to feed generator record'
},
limit: {
type: 'integer',
minimum: 1,
maximum: 100,
default: 50
},
cursor: {
type: 'string'
}
}
},
output: {
encoding: 'application/json',
schema: {
type: 'object',
required: ['feed'],
properties: {
cursor: {
type: 'string'
},
feed: {
type: 'array',
items: {
type: 'object',
required: ['post'],
properties: {
post: {
type: 'string',
format: 'at-uri'
}
}
}
}
}
}
}
}
}
}
], {
// Verify the caller's inter-service JWT; the verified subject (their DID)
// becomes auth.credentials.did in method handlers.
validateAuth: async (req, serviceDid, didResolver) => {
const auth = req.headers.authorization;
if (!auth) throw new AuthRequiredError();
const jwt = auth.replace('Bearer ', '');
const result = await verifyJwt(jwt, serviceDid, didResolver);
return { credentials: { did: result.sub } };
},
authOptional: true,
});
// Feed skeleton endpoint
// Dispatch to the requested algorithm; unknown feed ids throw, and handler
// errors degrade to an empty feed rather than a 5xx.
server.method('app.bsky.feed.getFeedSkeleton', async ({ auth, params }) => {
  const requesterDid = auth?.credentials?.did;
  const algo = algos[params.feed];
  if (!algo) {
    throw new Error(`Algorithm ${params.feed} not found`);
  }
  try {
    const body = await algo(null, params, requesterDid);
    return { encoding: 'application/json', body };
  } catch (error) {
    console.error('Error in getFeedSkeleton:', error);
    return { encoding: 'application/json', body: { feed: [] } };
  }
});
// Mount AT Protocol server
app.use(server.router);
// Health check
app.get('/health', (req, res) => {
res.json({ status: 'ok', timestamp: new Date().toISOString() });
});
// Statistics endpoint: row counts for the core tables plus per-writer CSV
// buffering state and the id of the last creator-index job.
app.get('/stats', async (req, res) => {
  try {
    const [dbCounts] = await sql`
      SELECT
        (SELECT COUNT(*) FROM posts) as posts_count,
        (SELECT COUNT(*) FROM likes) as likes_count,
        (SELECT COUNT(*) FROM users) as users_count,
        (SELECT COUNT(*) FROM user_creator_affinity) as affinities_count;
    `;

    const csvStats = csvWriters.map(({ filenameBase, writeCount, created }) => ({
      filename: filenameBase,
      writeCount,
      timeElapsed: Math.round((Date.now() - created) / 1000),
    }));

    res.json({
      database: dbCounts,
      csvWriters: csvStats,
      lastCreatorJobId,
    });
  } catch (error) {
    console.error('Error getting stats:', error);
    res.status(500).json({ error: 'Failed to get stats' });
  }
});
// Rebuild creator index endpoint (for debugging/maintenance).
// Clearing lastCreatorJobId forces ensureCreatorIndex() to start a fresh job.
app.get('/creators/rebuild', async (req, res) => {
  const startTime = Date.now();
  try {
    lastCreatorJobId = null;
    await ensureCreatorIndex();
    const queryTime = ((Date.now() - startTime) / 1000).toFixed(2);
    res.send(`Finished rebuilding creator index in ${queryTime} seconds.`);
  } catch (error) {
    // Express 4 does not catch rejected async handlers; without this, a
    // failed rebuild would hang the request and emit an unhandled rejection.
    console.error('Error rebuilding creator index:', error);
    res.status(500).json({ error: error.message });
  }
});
// Catchup feed endpoint (for debugging/testing): runs the catchup feed
// generator with the raw query-string params and reports the elapsed time.
app.get('/catch-up', async (req, res) => {
  const startedAt = Date.now();
  try {
    const posts = await generateCatchupFeed(req.query);
    const queryTime = ((Date.now() - startedAt) / 1000).toFixed(2);
    res.json({ queryTime, posts });
  } catch (err) {
    console.error('Error fetching catchup feed:', err);
    res.status(500).json({ error: err.message });
  }
});
// Scheduled jobs
// Refresh the post like totals lookup every 15 minutes.
schedule.scheduleJob('*/15 * * * *', async function() {
  try {
    console.log('Running scheduled post like totals recreation...');
    await recreatePostLikeLookup();
  } catch (error) {
    // node-schedule does not handle rejected async jobs; without this wrapper
    // a failure would surface as an unhandled promise rejection. Matches the
    // error handling style of the 5-minute maintenance job below.
    console.error('Error recreating post like totals:', error);
  }
});

// Full creator index rebuild daily at 02:00 (server time).
schedule.scheduleJob('0 2 * * *', async function() {
  try {
    console.log('Running daily creator index rebuild...');
    lastCreatorJobId = null;
    await ensureCreatorIndex();
  } catch (error) {
    console.error('Error in daily creator index rebuild:', error);
  }
});
// Data maintenance job - runs every 5 minutes
schedule.scheduleJob('*/5 * * * *', async function() {
  try {
    // Do deduplication once per hour, max
    // lastDedupe latches the hour-of-day (0-23) of the last successful pass,
    // so the dedupe block fires at most once per wall-clock hour.
    if (lastDedupe !== new Date().getHours()) {
      console.log("Executing dedupe queries...");
      await dedupe('posts');
      await dedupe('likes');
      await dedupe('replies');
      await dedupe('reposts');
      lastDedupe = new Date().getHours();
    }
    // Batch delete once per day
    // lastDelete latches the day-of-month of the last run, same idea as above.
    if (lastDelete !== new Date().getDate()) {
      console.log("Running batch deletes to maintain table sizes...");
      // Per-table retention: (table, timestamp column, age cutoff).
      await batchDelete('likes', 'date', '4 days');
      await batchDelete('replies', 'date', '7 days');
      await batchDelete('reposts', 'date', '7 days');
      await batchDelete('posts', 'ts', '4 days');
      await batchDelete('follows', 'timestamp', '4 days');
      // Affinity rows whose action is 'forever' or 'following' are exempt
      // from aging out (IS DISTINCT FROM also keeps NULL actions deletable).
      await batchDelete('user_creator_affinity', 'last_interaction_at', '30 days', 10000, `action IS DISTINCT FROM 'forever' AND action IS DISTINCT FROM 'following'`);
      console.log('Batch deletes completed.');
      lastDelete = new Date().getDate();
      // Cleanup old bitmap tables
      // Drops the creator snapshot created three days ago, by job id.
      const threeDaysAgoTimestamp = Date.now() - 3 * 24 * 60 * 60 * 1000;
      let oldJobId = `creator_snapshot_${formatTimestampAsDate(threeDaysAgoTimestamp)}`;
      await cleanup(oldJobId);
    }
    // Populate users table from activity data
    await populateUsersTable();
    // Warm up catchup feed
    // Pre-computes the English catchup feed so user-facing requests hit cache.
    await generateCatchupFeed({ limit: 1000, lang: 'en' });
  } catch (error) {
    console.error('Error in maintenance job:', error);
  }
});
// CSV Import functionality
/**
 * Bulk-load a CSV file into a Postgres table via psql's \copy.
 *
 * @param {string} tableName - Destination table name.
 * @param {string} filePath - Path to the CSV file (expects a HEADER row).
 * @param {string} [cols=''] - Optional column list, e.g. '(a, b, c)'.
 * @returns {Promise<string>} psql stdout on success; rejects on psql error
 *   or when the import exceeds the 2-minute timeout.
 */
function copyToPostgres(tableName, filePath, cols = '') {
  return new Promise((resolve, reject) => {
    // NOTE(review): tableName/filePath are interpolated into a shell command.
    // They currently come from our own readdir/naming, but must stay
    // trusted-only input — never feed user-supplied values here.
    const cmd = `psql ${process.env.DATABASE_URL} -c "\\copy ${tableName} ${cols} FROM '${filePath}' WITH (FORMAT csv, HEADER true, NULL '');"`;
    const child = exec(cmd, (error, stdout, stderr) => {
      clearTimeout(timeout);
      if (error) {
        console.error(`Error importing ${filePath}:`, error);
        reject(error);
      } else {
        resolve(stdout);
      }
    });
    // 2 minute timeout. Fix: the previous version only rejected the promise
    // and left the psql child process running; now the child is killed too.
    const timeout = setTimeout(() => {
      child.kill();
      reject(new Error(`psql import timed out: ${filePath}`));
    }, 120000);
  });
}
// Import job variables
let importJob;
let runningImport = false;

/**
 * Scan OUTPUT_DIR for CSV files and bulk-load each one into Postgres.
 * The destination table is derived from the filename prefix (text before the
 * first '_'). Successfully imported files are moved to ./archive; failed
 * files are moved to ./failed_imports for manual inspection.
 * Re-entrant calls are skipped while a previous run is still in flight.
 */
async function importJobFunction() {
  if (!sql) return;
  if (runningImport) return;

  console.log('Starting import job...');
  runningImport = true;

  // Known column lists per table; tables not listed here are imported
  // without an explicit column specification.
  const columnsByTable = {
    keywords: '(post_id, keyword, date)',
    follows: '(uid, subject, timestamp, is_create)',
    posts: '(post_id, uid, ts, lang, type, labels)',
    likes: '(post_id, uid, date, processed_at)',
    reposts: '(post_id, uid, date, processed_at)',
    replies: '(post_id, uid, date, processed_at)',
  };

  // Move a processed file into a sibling directory, creating it if needed.
  const moveTo = (dirName, file) => {
    const dir = path.join(__dirname, dirName);
    if (!fs.existsSync(dir)) {
      fs.mkdirSync(dir, { recursive: true });
    }
    fs.renameSync(path.join(OUTPUT_DIR, file), path.join(dir, file));
  };

  try {
    const csvFiles = fs.readdirSync(OUTPUT_DIR).filter((f) => f.endsWith('.csv'));

    if (csvFiles.length === 0) {
      console.log('No new files to import.');
      runningImport = false;
      return;
    }

    for (const file of csvFiles) {
      const filePath = path.join(OUTPUT_DIR, file);
      const tableName = file.split('_')[0]; // filename prefix names the table

      try {
        console.log(`Importing ${file} to ${tableName}...`);
        const cols = columnsByTable[tableName];
        if (cols !== undefined) {
          await copyToPostgres(tableName, filePath, cols);
        } else {
          // Fallback - try without column specification
          await copyToPostgres(tableName, filePath);
        }

        // Archive the file after successful import.
        moveTo('archive', file);
        console.log(`Successfully imported and archived ${file}`);
      } catch (error) {
        console.error(`Error importing ${file}:`, error);
        // Keep the failed file around for manual inspection.
        moveTo('failed_imports', file);
      }
    }

    console.log("Import job completed.");
  } catch (error) {
    console.error("Error during import job:", error);
  } finally {
    runningImport = false;
  }
}
// Start import job (runs every 5 minutes)
if (process.env.HALT_IMPORT !== 'true') {
  importJob = setInterval(importJobFunction, 5 * 60 * 1000);
  console.log('CSV import job scheduled to run every 5 minutes');
}

// Graceful shutdown
// Fix: handle SIGTERM (sent by Docker/systemd/process managers) as well as
// SIGINT, so buffered CSV rows are flushed on any normal termination — not
// only on Ctrl-C.
function shutdown() {
  console.log('Shutting down gracefully...');
  if (importJob) clearInterval(importJob);
  csvWriters.forEach(writer => writer.shutdown());
  // Give the writers up to 5 seconds to flush before forcing exit.
  setTimeout(() => {
    process.exit(0);
  }, 5000);
}
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
// Start the application
/**
 * Boot sequence: initialize the database, attach the Jetstream firehose
 * listener, then bring up the HTTP server. Exits the process with a
 * non-zero code if any startup step fails.
 */
async function bootstrap() {
  try {
    await initDatabase();

    // Begin consuming the Jetstream firehose.
    connectToJetstream();

    // Bring up the HTTP API.
    app.listen(PORT, () => {
      console.log(`🚀 Server running on port ${PORT}`);
      console.log(`📊 Stats available at http://localhost:${PORT}/stats`);
      console.log(`🔍 Health check at http://localhost:${PORT}/health`);
      console.log(`🔧 Rebuild index at http://localhost:${PORT}/creators/rebuild`);
    });
  } catch (err) {
    console.error('Failed to start application:', err);
    process.exit(1);
  }
}

bootstrap();
// NOTE: trailing GitHub Gist page boilerplate commented out — it is scrape
// residue, not part of the program, and would be a syntax error if left bare:
// "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment"