A Bluesky Discover Feed
#!/usr/bin/env node
require('dotenv').config();
const express = require('express');
const cors = require('cors');
const postgres = require('postgres');
const WebSocket = require('ws');
const fs = require('fs');
const path = require('path');
const { exec } = require('child_process');
const schedule = require('node-schedule');
const { DidResolver, MemoryCache } = require('@atproto/identity');
const xrpc = require('@atproto/xrpc-server');
const { verifyJwt, AuthRequiredError } = require('@atproto/xrpc-server');
const fetch = require('node-fetch');

// Configuration
const OUTPUT_DIR = process.env.OUTPUT_DIR || path.join(__dirname, 'csv_output');
const MAX_ROWS = process.env.MAX_ROWS || 10000;
const JETSTREAM_HOST = process.env.JETSTREAM_HOST || 'jetstream2.us-east.bsky.network';
const JETSTREAM_PARAMS = process.env.JETSTREAM_PARAMS || '?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.graph.follow';
const PORT = process.env.PORT || 5000;
const RECONNECT_DELAY_MS = 20_000;

// Database connection
const sql = postgres(process.env.DATABASE_URL || 'postgres://localhost:5432/bluesky_feed', {
  onnotice: () => {}, // Suppress notices
});

// Cache and DID resolver
const didCache = new MemoryCache();
const didResolver = new DidResolver({
  plcUrl: 'https://plc.directory',
  didCache,
});

// Global variables
const csvWriters = [];
let lastCreatorJobId = null;
let isRecreatingPostLikeLookup = false;
let lastDedupe = null;
let lastDelete = null;

// Simple in-memory cache
class SimpleCache {
  constructor() {
    this.cache = new Map();
  }
  getKey(path, params = {}) {
    return path + '_' + JSON.stringify(params);
  }
  get(key) {
    const item = this.cache.get(key);
    if (!item) return null;
    if (Date.now() > item.expires) {
      this.cache.delete(key);
      return null;
    }
    return item.value;
  }
  set(key, value, ttlMs = 3600000) {
    this.cache.set(key, {
      value,
      expires: Date.now() + ttlMs
    });
  }
  delete(key) {
    this.cache.delete(key);
  }
  clear() {
    this.cache.clear();
  }
}
const Cache = new SimpleCache();
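
// Illustrative usage of SimpleCache (example values, not called in this form):
//   const key = Cache.getKey('/catch-up', { lang: 'en' }); // => '/catch-up_{"lang":"en"}'
//   Cache.set(key, [{ post_id: '...' }], 60_000);          // cache for one minute
//   Cache.get(key); // => the cached array, or null once the TTL has lapsed
// Expired entries are evicted lazily on read, so the Map only grows with key variety.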

// Utility functions
function formatTimestamp(ts) {
  return new Date(ts).toISOString().replace(/\.\d{3}Z$/, 'Z');
}

function formatTimestampAsDate(ts) {
  const d = new Date(ts);
  return `${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}-${String(d.getUTCDate()).padStart(2, '0')}`;
}

function getPostId(did, rkey) {
  const normalizedDid = did.replace('did:plc:', '');
  return `${normalizedDid}/p/${rkey}`;
}

function getPostIdFromSubject(uri) {
  let normalized = uri.replace('at://', '');
  normalized = normalized.replace('did:plc:', '');
  normalized = normalized.replace('/app.bsky.feed.post/', '/p/');
  return normalized;
}

function normalizePostIdToAtUri(postId) {
  if (!postId || typeof postId !== 'string') return postId;
  postId = postId.replace('/p/', '/app.bsky.feed.post/');
  if (postId.includes('://')) postId = postId.split('://')[1];
  if (!postId.includes('did:')) postId = `did:plc:${postId}`;
  postId = `at://${postId}`;
  return postId;
}
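
// Illustrative round-trip of the compact post-id scheme (example values):
//   getPostId('did:plc:abc123', '3xyz')            => 'abc123/p/3xyz'
//   getPostIdFromSubject('at://did:plc:abc123/app.bsky.feed.post/3xyz')
//                                                  => 'abc123/p/3xyz'
//   normalizePostIdToAtUri('abc123/p/3xyz')        => 'at://did:plc:abc123/app.bsky.feed.post/3xyz'
// The compact '/p/' form keeps CSV rows and index keys small; the at:// form is
// what feed clients expect back in the skeleton.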

function normalizeLang(s) {
  if (!s || typeof s !== 'string') return '';
  s = s.split(';')[0].toLowerCase();
  const removables = ['-us', '-gb', '-jp', '-au', '-ca', '-cn', '-fr', '-de', '-in', '-it', '-es', '-ru', '-kr', '-mx', '-nl', '-se', '-ch', '-za', '-nz'];
  for (const rm of removables) {
    if (s.endsWith(rm)) {
      s = s.replace(rm, '');
      break;
    }
  }
  return s;
}
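
// Examples: normalizeLang('en-US;q=0.9') => 'en', normalizeLang('pt-BR') => 'pt-br'
// (only the region suffixes listed above are stripped; anything else passes through).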

function formatLabels(labels) {
  if (labels.length === 1) return (labels[0] || '').replace(/,/g, '');
  if (labels.length === 0) return '';
  const priorities = ['porn', 'sexual', 'nudity', 'graphic-media'];
  let label;
  priorities.forEach(priorityLabel => {
    if (labels.includes(priorityLabel) && !label) label = priorityLabel;
  });
  return (label || labels[0] || '').replace(/,/g, '');
}

// CSV Writer class
class BufferedCsvWriter {
  constructor(filenameBase, headers) {
    this.filenameBase = filenameBase;
    this.headers = headers;
    this.buffer = [];
    this.rowCount = 0;
    this.flushInterval = 5000;
    this.created = Date.now();
    this.writeCount = 0;
    if (!fs.existsSync(OUTPUT_DIR)) {
      fs.mkdirSync(OUTPUT_DIR, { recursive: true });
    }
    this.currentFilename = this.getTempFilename();
    fs.writeFileSync(this.currentFilename, this.headers.join(',') + '\n');
    this.intervalId = this.startFlushTimer();
  }
  getTempFilename() {
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    return path.join(OUTPUT_DIR, `${this.filenameBase}_${timestamp}.tmp`);
  }
  getFinalFilename(tempFilename) {
    return tempFilename.replace(/\.tmp$/, '.csv');
  }
  startFlushTimer() {
    return setInterval(() => {
      if (this.buffer.length > 0) {
        this.flush();
      }
    }, this.flushInterval);
  }
  flush() {
    try {
      const data = this.buffer.join('\n') + '\n';
      fs.appendFileSync(this.currentFilename, data);
      const numRowsFlushed = data.split('\n').length - 1;
      this.buffer = [];
      this.rowCount += numRowsFlushed;
      if (this.rowCount >= MAX_ROWS) {
        fs.renameSync(this.currentFilename, this.getFinalFilename(this.currentFilename));
        this.rowCount = 0;
        this.currentFilename = this.getTempFilename();
        fs.writeFileSync(this.currentFilename, this.headers.join(',') + '\n');
      }
    } catch (error) {
      console.error('Error flushing data:', error);
    }
  }
  writeRow(row) {
    this.buffer.push(row.join(','));
    this.writeCount++;
  }
  shutdown() {
    clearInterval(this.intervalId);
    if (this.buffer.length > 0) {
      this.flush();
    }
    if (this.currentFilename.endsWith('.tmp')) {
      fs.renameSync(this.currentFilename, this.getFinalFilename(this.currentFilename));
    }
  }
}
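
// Lifecycle note: rows buffer in memory, flush to a .tmp file every 5 seconds,
// and the file is renamed to .csv once it accumulates MAX_ROWS rows. The import
// job further down only picks up finished .csv files, so the importer never
// reads a file that is still being written.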

// Create CSV writers
const postWriter = new BufferedCsvWriter('posts', ['post_id', 'uid', 'ts', 'lang', 'type', 'labels']);
const likeWriter = new BufferedCsvWriter('likes', ['post_id', 'uid', 'date', 'processed_at']);
const repostWriter = new BufferedCsvWriter('reposts', ['post_id', 'uid', 'date', 'processed_at']);
const replyWriter = new BufferedCsvWriter('replies', ['post_id', 'uid', 'date', 'processed_at']);
const followWriter = new BufferedCsvWriter('follows', ['uid', 'subject', 'timestamp', 'is_create']);
csvWriters.push(postWriter, likeWriter, repostWriter, replyWriter, followWriter);

// Database initialization
const initDatabase = async () => {
  try {
    // Create roaring bitmap extension if it doesn't exist
    await sql`CREATE EXTENSION IF NOT EXISTS roaringbitmap;`;
    console.log('Roaring bitmap extension enabled');
    // Create users table
    await sql`
      CREATE TABLE IF NOT EXISTS users (
        uid TEXT PRIMARY KEY,
        id SERIAL UNIQUE
      );
    `;
    // Create posts table (no primary key - allows duplicates, deduplication happens later)
    await sql`
      CREATE TABLE IF NOT EXISTS posts (
        post_id TEXT NOT NULL,
        uid TEXT NOT NULL,
        ts TIMESTAMPTZ NOT NULL,
        lang TEXT,
        type TEXT,
        labels TEXT
      );
    `;
    // Create likes table
    await sql`
      CREATE TABLE IF NOT EXISTS likes (
        id SERIAL PRIMARY KEY,
        post_id TEXT NOT NULL,
        uid TEXT NOT NULL,
        date DATE NOT NULL,
        processed_at TIMESTAMPTZ DEFAULT now()
      );
    `;
    // Create reposts table
    await sql`
      CREATE TABLE IF NOT EXISTS reposts (
        id SERIAL PRIMARY KEY,
        post_id TEXT NOT NULL,
        uid TEXT NOT NULL,
        date DATE NOT NULL,
        processed_at TIMESTAMPTZ DEFAULT now()
      );
    `;
    // Create replies table
    await sql`
      CREATE TABLE IF NOT EXISTS replies (
        id SERIAL PRIMARY KEY,
        post_id TEXT NOT NULL,
        uid TEXT NOT NULL,
        date DATE NOT NULL,
        processed_at TIMESTAMPTZ DEFAULT now()
      );
    `;
    // Create follows table
    await sql`
      CREATE TABLE IF NOT EXISTS follows (
        id SERIAL PRIMARY KEY,
        uid TEXT NOT NULL,
        subject TEXT NOT NULL,
        timestamp TIMESTAMPTZ NOT NULL,
        is_create BOOLEAN NOT NULL
      );
    `;
    // Create affinity table
    await sql`
      CREATE TABLE IF NOT EXISTS user_creator_affinity (
        user_id TEXT NOT NULL,
        creator_id TEXT NOT NULL,
        type TEXT NOT NULL DEFAULT '',
        inserted_at TIMESTAMPTZ DEFAULT now(),
        last_interaction_at TIMESTAMPTZ DEFAULT now(),
        last_touched TIMESTAMPTZ DEFAULT now(),
        score NUMERIC,
        action TEXT,
        PRIMARY KEY (user_id, creator_id, type)
      );
    `;
    // Create follows bitmap table for roaring bitmap storage
    await sql`
      CREATE TABLE IF NOT EXISTS follows_bitmap (
        uid TEXT PRIMARY KEY,
        bitmap BYTEA,
        count INTEGER,
        updated_date DATE
      );
    `;
    // Create indexes
    await sql`CREATE INDEX IF NOT EXISTS idx_likes_uid_date ON likes (uid, date);`;
    await sql`CREATE INDEX IF NOT EXISTS idx_likes_post_id_date ON likes (post_id, date);`;
    await sql`CREATE INDEX IF NOT EXISTS idx_posts_uid_ts ON posts (uid, ts);`;
    await sql`CREATE INDEX IF NOT EXISTS idx_posts_type_ts ON posts (type, ts);`;
    await sql`CREATE INDEX IF NOT EXISTS idx_user_creator_affinity_user_id ON user_creator_affinity (user_id);`;
    await sql`CREATE INDEX IF NOT EXISTS idx_follows_uid ON follows (uid);`;
    console.log('Database initialized successfully');
  } catch (error) {
    console.error('Error initializing database:', error);
  }
};
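
// Note: CREATE EXTENSION roaringbitmap assumes the pg_roaringbitmap extension
// is installed on the Postgres server. It provides the rb_build / rb_or /
// rb_cardinality / rb_jaccard_dist functions and the && intersection operator
// used below, e.g. (illustrative):
//   SELECT rb_cardinality(rb_build('{1,2,3}'::int4[])); -- => 3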

// Post like totals recreation
async function recreatePostLikeLookup() {
  if (isRecreatingPostLikeLookup) return;
  isRecreatingPostLikeLookup = true;
  console.log('Recreating post_like_totals');
  let _startTime = Date.now();
  try {
    await sql`DROP TABLE IF EXISTS post_like_totals_new CASCADE;`;
    await sql`DROP INDEX IF EXISTS idx_post_like_totals_creator_uid_new;`;
    await sql`DROP INDEX IF EXISTS idx_plt_post_id_likes10_new;`;
    await sql`
      CREATE TABLE post_like_totals_new AS
      SELECT
        post_id,
        split_part(post_id, '/p/', 1) AS creator_uid,
        COUNT(*) AS total_likes
      FROM likes
      WHERE date >= CURRENT_DATE - INTERVAL '3 days'
      GROUP BY post_id;
    `;
    await sql`ALTER TABLE post_like_totals_new ADD PRIMARY KEY (post_id);`;
    await sql`CREATE INDEX idx_post_like_totals_creator_uid_new ON post_like_totals_new (creator_uid);`;
    await sql`CREATE INDEX idx_plt_post_id_likes10_new ON post_like_totals_new (post_id) WHERE total_likes >= 10;`;
    await sql`DROP TABLE IF EXISTS post_like_totals CASCADE;`;
    await sql`ALTER TABLE post_like_totals_new RENAME TO post_like_totals;`;
    let queryTime = ((Date.now() - _startTime) / 1000).toFixed(2);
    console.log(`Successfully recreated post_like_totals in ${queryTime}s`);
  } catch(e) {
    console.log('Error recreating post_like_totals:', e);
  }
  isRecreatingPostLikeLookup = false;
}

// Dedupe function to remove duplicate posts/likes/reposts/replies
async function dedupe(table) {
  console.log(`Deduplicating ${table}...`);
  if (table === 'posts') {
    // For posts, dedupe by post_id only (keep the most recent by timestamp)
    await sql.unsafe(`
      DELETE FROM ${table}
      WHERE ctid IN (
        SELECT ctid FROM (
          SELECT ctid,
                 ROW_NUMBER() OVER (PARTITION BY post_id ORDER BY ts DESC) AS rn
          FROM ${table}
        ) sub
        WHERE rn > 1
      );
    `);
  } else {
    // For likes/reposts/replies, dedupe by post_id and uid
    await sql.unsafe(`
      DELETE FROM ${table}
      WHERE ctid IN (
        SELECT ctid FROM (
          SELECT ctid,
                 ROW_NUMBER() OVER (PARTITION BY post_id, uid ORDER BY id DESC) AS rn
          FROM ${table}
        ) sub
        WHERE rn > 1
      );
    `);
  }
  console.log(`Finished deduplicating ${table}`);
}
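
// ctid is Postgres's physical row address, which makes it possible to delete
// duplicates from tables that have no primary key (posts). For example, two
// rows sharing a post_id get rn = 1 and rn = 2 from ROW_NUMBER(); only the
// rn > 1 copies are removed, keeping exactly one row per partition.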

// Batch delete function to maintain table sizes
async function batchDelete(table, column, interval, batchSize = 10000, whereExtra = '', maxIterations = 20, minThreshold = 500) {
  let rowsDeleted;
  let iterations = 0;
  const extraClause = whereExtra ? `AND (${whereExtra})` : '';
  do {
    const result = await sql.unsafe(`
      WITH del AS (
        DELETE FROM ${table}
        WHERE ctid IN (
          SELECT ctid FROM ${table}
          WHERE ${column} < now() - interval '${interval}'
          ${extraClause}
          LIMIT ${batchSize}
        )
        RETURNING 1
      )
      SELECT count(*) FROM del;
    `);
    rowsDeleted = Number(result[0]?.count || 0);
    console.log(`${rowsDeleted} rows deleted from ${table}`);
    iterations++;
  } while (rowsDeleted > minThreshold && iterations < maxIterations);
}
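
// Illustrative call: batchDelete('likes', 'date', '4 days') repeatedly deletes
// up to 10,000 likes older than four days, stopping once a pass removes 500
// rows or fewer, or after 20 passes. Chunking the DELETE this way keeps each
// transaction short so the table is never locked for long.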

// Populate users table from activity data
async function populateUsersTable() {
  try {
    console.log('Populating users table from activity data...');
    // Insert unique users from posts, likes, reposts, replies, and follows
    await sql`
      INSERT INTO users (uid)
      SELECT DISTINCT uid FROM (
        SELECT uid FROM posts
        UNION
        SELECT uid FROM likes
        UNION
        SELECT uid FROM reposts
        UNION
        SELECT uid FROM replies
        UNION
        SELECT uid FROM follows
        UNION
        SELECT subject as uid FROM follows
      ) AS all_users
      ON CONFLICT (uid) DO NOTHING;
    `;
    const result = await sql`SELECT COUNT(*) as count FROM users;`;
    console.log(`Users table now has ${result[0].count} users`);
  } catch (error) {
    console.error('Error populating users table:', error);
  }
}

// Generate catchup feed for popular posts
async function generateCatchupFeed(options = {}) {
  let cacheKey = Cache.getKey('/catch-up', options);
  // Check cache first
  const cachedResult = Cache.get(cacheKey);
  async function getFeed(options) {
    let { lang, type, labelFilter, limit = 100 } = options;
    if (lang) lang = normalizeLang(lang);
    limit = parseInt(limit);
    const filterLabels = labelFilter
      ? labelFilter.split(',').map(label => label.trim())
      : null;
    // Get popular posts from the last 72 hours
    let posts = await sql`
      SELECT
        p.post_id,
        p.uid,
        p.ts,
        p.lang,
        p.type,
        p.labels,
        plt.total_likes as like_count
      FROM posts p
      JOIN post_like_totals plt ON p.post_id = plt.post_id
      WHERE p.ts >= now() - interval '72 hours'
        AND plt.total_likes >= 10
        ${lang ? sql`AND p.lang = ${lang}` : sql``}
        ${type ? sql`AND p.type = ${type}` : sql``}
        ${filterLabels ? sql`AND NOT (p.labels = ANY(${filterLabels}))` : sql``}
      ORDER BY plt.total_likes DESC, p.ts DESC
      LIMIT ${limit};
    `;
    Cache.set(cacheKey, posts, 30 * 60 * 1000); // 30 minutes
    return posts;
  }
  if (cachedResult) {
    return cachedResult;
  } else {
    return await getFeed(options);
  }
}

// Get follows for a user
async function getFollows(did, maxPages = 5) {
  const baseUrl = 'https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows';
  let follows = [];
  let cursor = null;
  let pageCount = 0;
  do {
    if (pageCount >= maxPages) break;
    let url = `${baseUrl}?actor=${encodeURIComponent(did)}&limit=100`;
    if (cursor) url += `&cursor=${encodeURIComponent(cursor)}`;
    try {
      const response = await fetch(url);
      if (!response.ok) break;
      const data = await response.json();
      follows = follows.concat(data.follows || []);
      cursor = data.cursor;
      pageCount++;
    } catch (e) {
      console.error('Error fetching follows:', e);
      break;
    }
  } while (cursor);
  return follows;
}

// Find likes for a user (used to build user preferences)
async function findLikesForUser(userId) {
  try {
    const result = await sql`
      SELECT
        split_part(post_id, '/p/', 1) AS creator_id,
        COUNT(*) AS like_count
      FROM likes
      WHERE uid = ${userId}
        AND date >= CURRENT_DATE - INTERVAL '2 days'
      GROUP BY creator_id
      ORDER BY like_count DESC
      LIMIT 1000;
    `;
    return result;
  } catch(e) {
    console.log('Error finding likes for user:', e);
    return [];
  }
}

// CORE COLLABORATIVE FILTERING FUNCTIONS
// These are the essential functions that make the collaborative filtering work
async function prepareCreatorSnapshot({ actionTable, entityA, jobId, whereClause, relCountFilter, isVideo = false }) {
  // Sanitize jobId for safe table naming.
  const safeJobId = jobId.replace(/[^a-zA-Z0-9_]/g, '');
  // Build table names.
  const creatorIndexTable = `"creator_index_${safeJobId}"`;
  const userBitmapTable = `"user_bitmap_${safeJobId}"`;
  // Conditional SQL for video feed vs. general.
  let videoJoinClause = '';
  if (isVideo) {
    // Add join to the posts table with a video type filter.
    videoJoinClause = `JOIN posts p ON l.post_id = p.post_id AND p.type = 'video'`;
  }
  // 1. Create a table mapping each creator to a dense index.
  // We extract the creator uid from the post_id using split_part(post_id, '/p/', 1).
  const createCreatorIndexSQL = `
    CREATE TABLE ${creatorIndexTable} AS
    SELECT creator_uid,
           CAST(ROW_NUMBER() OVER (ORDER BY rel_count DESC) - 1 AS int4) AS creator_idx
    FROM (
      SELECT split_part(l.post_id, '/p/', 1) AS creator_uid, COUNT(*) AS rel_count
      FROM ${actionTable} l
      ${videoJoinClause}
      WHERE ${whereClause}
      GROUP BY split_part(l.post_id, '/p/', 1)
    ) sub
    WHERE ${relCountFilter};
  `;
  await sql.unsafe(createCreatorIndexSQL);
  // 2. Create the user-to-bitmap table mapping each user (entityA) to a bitmap of the creators they have liked.
  const createUserBitmapSQL = `
    CREATE TABLE ${userBitmapTable} AS
    WITH bitmap_cte AS (
      SELECT uid,
             rb_build(array_agg(creator_idx)::int4[]) AS bitmap
      FROM (
        SELECT l.uid, ci.creator_idx
        FROM ${actionTable} l
        JOIN ${creatorIndexTable} ci
          ON split_part(l.post_id, '/p/', 1) = ci.creator_uid
        ${videoJoinClause}
        WHERE ${whereClause}
      ) joined
      GROUP BY uid
      HAVING COUNT(*) BETWEEN 10 AND 500
    )
    SELECT uid, bitmap
    FROM bitmap_cte;
  `;
  await sql.unsafe(createUserBitmapSQL);
  console.log(`Creator snapshot tables for job ${jobId} have been created.`);
}
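
// Shape of the snapshot, with made-up values: if creators A, B, C get dense
// indexes 0, 1, 2 (most-liked first), a user who liked posts by A and C is
// stored as a single row { uid, bitmap: rb_build('{0,2}') }. Comparing two
// users' tastes then becomes a cheap bitmap intersection instead of a join
// over the raw likes table.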

// Cleanup old bitmap tables
async function cleanup(jobId) {
  const safeJobId = jobId.replace(/[^a-zA-Z0-9_]/g, '');
  const userBitmapTable = `"user_bitmap_${safeJobId}"`;
  const creatorIndexTable = `"creator_index_${safeJobId}"`;
  try {
    await sql.unsafe(`DROP TABLE IF EXISTS ${userBitmapTable}, ${creatorIndexTable};`);
    console.log(`Cleaned up temporary snapshot tables for job ${jobId}`);
  } catch (error) {
    console.log(`Error cleaning up tables for job ${jobId}:`, error);
  }
}

async function findRecommendedCreatorsByUserLikers(targetUserId, jobId, entityA = 'uid', limit = 100, userLimit = 200) {
  const safeJobId = jobId.replace(/[^a-zA-Z0-9_]/g, '');
  const userBitmapTable = `"user_bitmap_${safeJobId}"`;
  const creatorIndexTable = `"creator_index_${safeJobId}"`;
  // Get target user's bitmap
  let targetBitmap;
  const existing = await sql`
    SELECT bitmap FROM ${sql.unsafe(userBitmapTable)}
    WHERE uid = ${targetUserId}
    LIMIT 1
  `;
  if (existing.length > 0) {
    targetBitmap = existing[0].bitmap;
  } else {
    const build = await sql`
      WITH user_creators AS (
        SELECT ci.creator_idx
        FROM likes l
        JOIN ${sql.unsafe(creatorIndexTable)} ci
          ON split_part(l.post_id, '/p/', 1) = ci.creator_uid
        WHERE l.uid = ${targetUserId}
      ),
      new_bitmap AS (
        SELECT
          COALESCE(rb_build(array_agg(creator_idx)::int4[]), rb_build('{}'::int4[])) AS bitmap
        FROM user_creators
      )
      INSERT INTO ${sql.unsafe(userBitmapTable)} (uid, bitmap)
      SELECT ${targetUserId}, bitmap
      FROM new_bitmap
      RETURNING bitmap
    `;
    if (build.length === 0) throw new Error('Failed to build bitmap');
    targetBitmap = build[0].bitmap;
  }
  // Step 1: Get similar users using roaring bitmap operations
  const similarUsers = await sql`
    SELECT uid
    FROM ${sql.unsafe(userBitmapTable)}
    WHERE uid != ${targetUserId}
      AND bitmap && ${targetBitmap}::roaringbitmap
    ORDER BY rb_jaccard_dist(bitmap, ${targetBitmap}::roaringbitmap) ASC
    LIMIT ${userLimit}
  `;
  if (similarUsers.length === 0) return [];
  const similarUserIds = similarUsers.map(row => row.uid);
  // Step 2: Get top recommended creators by z-scored like count
  const recommended = await sql`
    WITH creator_likes AS (
      SELECT ci.creator_uid AS uid,
             COUNT(*) AS like_count
      FROM likes l
      JOIN ${sql.unsafe(creatorIndexTable)} ci
        ON split_part(l.post_id, '/p/', 1) = ci.creator_uid
      WHERE l.uid = ANY(${sql.array(similarUserIds)})
      GROUP BY ci.creator_uid
    ),
    stats AS (
      SELECT AVG(like_count) AS mean, STDDEV_POP(like_count) AS stddev FROM creator_likes
    )
    SELECT cl.uid,
           cl.like_count,
           CASE
             WHEN stats.stddev > 0 THEN (cl.like_count - stats.mean) / stats.stddev / 100
             ELSE 0
           END AS similarity
    FROM creator_likes cl, stats
    ORDER BY similarity DESC
    LIMIT ${limit}
  `;
  return recommended;
}
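
// Worked example with illustrative numbers: if the target's bitmap is {0,2}
// and a candidate's is {0,1,2}, they intersect (bitmap && target), so the
// candidate qualifies as a neighbour; ordering by rb_jaccard_dist then ranks
// neighbours by how much their liked-creator sets overlap. Step 2 z-scores
// each creator's like count among those neighbours ((count - mean) / stddev,
// scaled down by 100) so one hyperactive creator can't dominate the output.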

async function ensureCreatorIndex() {
  const jobId = `creator_snapshot_${formatTimestampAsDate(Date.now())}`;
  const safeJobId = jobId.replace(/[^a-zA-Z0-9_]/g, '');
  // Check if today's snapshot tables already exist
  const tableExists = await sql.unsafe(`
    SELECT EXISTS (
      SELECT 1
      FROM pg_tables
      WHERE schemaname = 'public'
      AND tablename = 'user_bitmap_${safeJobId}'
    );
  `);
  if (!tableExists[0].exists) {
    // Today's snapshot is missing; (re)build it
    return buildCreatorIndex();
  }
  // Snapshot already exists; just make sure its job id is recorded
  if (!lastCreatorJobId) lastCreatorJobId = jobId;
}

async function buildCreatorIndex() {
  try {
    const jobId = `creator_snapshot_${formatTimestampAsDate(Date.now())}`;
    await prepareCreatorSnapshot({
      actionTable: 'likes',
      entityA: 'uid',
      whereClause: "date >= now() - interval '3 days'",
      relCountFilter: "rel_count BETWEEN 10 AND 500",
      jobId,
    });
    lastCreatorJobId = jobId;
    console.log('Finished preparing snapshot for likes:', jobId);
  } catch (err) {
    console.log('Error preparing snapshot:', err);
  }
}

// Update follows bitmap with roaring bitmap operations
async function updateFollowsBitmap(uid, follows) {
  const normalizedDids = follows
    .map(f => f.did?.replace('did:plc:', ''))
    .filter(Boolean);
  if (normalizedDids.length === 0) return;
  // Lookup internal IDs from the users table (= ANY accepts an array
  // parameter, where IN (...) would not)
  const rows = await sql`
    SELECT id FROM users WHERE uid = ANY(${sql.array(normalizedDids, 'text')})
  `;
  const followedIds = rows.map(r => r.id);
  if (followedIds.length === 0) return;
  await sql`
    INSERT INTO follows_bitmap (uid, bitmap, count, updated_date)
    VALUES (
      ${uid},
      rb_build(${sql.array(followedIds)}::int[])::BYTEA,
      ${followedIds.length},
      CURRENT_DATE
    )
    ON CONFLICT (uid) DO UPDATE SET
      bitmap = rb_or(follows_bitmap.bitmap::roaringbitmap, EXCLUDED.bitmap::roaringbitmap)::BYTEA,
      -- rb_cardinality counts the merged set exactly; adding the new batch size
      -- would double-count follows that were already in the bitmap
      count = rb_cardinality(rb_or(follows_bitmap.bitmap::roaringbitmap, EXCLUDED.bitmap::roaringbitmap)),
      updated_date = CURRENT_DATE;
  `;
}
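
// Note: the bitmap positions are the dense integer users.id values (not DIDs),
// which is why populateUsersTable() must have run before follow bitmaps are useful.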

// Get user affinities
async function getUserAffinities(userId, type = '') {
  const cacheKey = Cache.getKey('/affinities', { userId, type });
  let affinities = Cache.get(cacheKey);
  if (!affinities) {
    try {
      affinities = await sql`
        SELECT *
        FROM user_creator_affinity
        WHERE user_id = ${userId}
          AND type = ${type}
          AND (action IS DISTINCT FROM 'suppressed')
          AND (
            last_interaction_at >= now() - interval '7 days' OR
            last_touched >= now() - interval '7 days' OR
            action = 'forever' OR
            action = 'following'
          );
      `;
      Cache.set(cacheKey, affinities, 3600000);
    } catch(e) {
      console.log('Error getting user affinities:', e);
      affinities = [];
    }
  }
  return affinities.map(user => ({...user, uid: user.creator_id}));
}

// Main recommendation function with proper collaborative filtering
async function getRecommendedPostsForUser(userId, params = {}, queryParams = {}) {
  const { post_limit = 2500, acceptLanguages = ['en'] } = params;
  const { type } = queryParams;
  let jobId = params.jobId || lastCreatorJobId;
  try {
    await ensureCreatorIndex();
    // Get user's current affinities
    let affinities = await getUserAffinities(userId, type);
    // Find similar users and get recommended creators using collaborative filtering
    let recommendedCreators = await findRecommendedCreatorsByUserLikers(userId, jobId, 'uid', 100);
    // Get user's direct likes for affinity updates
    let likedCreators = await findLikesForUser(userId);
    // Update affinities based on collaborative filtering results
    const likedCreatorsForInsert = likedCreators.filter(creator => Number(creator.like_count) >= 5);
    try {
      await Promise.all(likedCreatorsForInsert.map(creator =>
        sql`
          INSERT INTO user_creator_affinity (user_id, creator_id, type, last_interaction_at, last_touched, score)
          VALUES (${userId}, ${creator.creator_id}, COALESCE(${type}, ''::text), now(), now(), 0.1)
          ON CONFLICT (user_id, creator_id, type) DO UPDATE
          SET last_interaction_at = now();
        `
      ));
    } catch (e) {
      console.log('Error inserting/updating liked creators:', e);
    }
    // Process recommended creators from collaborative filtering
    try {
      await Promise.all(recommendedCreators.map(user =>
        sql`
          INSERT INTO user_creator_affinity (user_id, creator_id, type, last_interaction_at, last_touched, score)
          VALUES (${userId}, ${user.uid}, COALESCE(${type}, ''::text), now(), now(), ${user.similarity})
          ON CONFLICT (user_id, creator_id, type) DO UPDATE
          SET last_touched = now(),
              score = GREATEST(user_creator_affinity.score, ${user.similarity});
        `
      ));
    } catch (e) {
      console.log('Error processing recommended creators:', e);
    }
    // Get final list of creators from updated affinities
    affinities = await getUserAffinities(userId, type);
    const creatorIds = affinities.map(a => a.creator_id);
    if (creatorIds.length === 0) {
      // Fallback to popular posts
      const posts = await sql`
        SELECT p.post_id, p.ts, p.lang, p.type, p.labels, plt.total_likes as like_count
        FROM posts p
        JOIN post_like_totals plt ON p.post_id = plt.post_id
        WHERE p.ts >= now() - interval '1 day'
          AND plt.total_likes >= 10
        ORDER BY plt.total_likes DESC, p.ts DESC
        LIMIT ${post_limit};
      `;
      return { posts };
    }
    // Get posts from recommended creators
    const posts = await sql`
      SELECT p.post_id, p.ts, p.lang, p.type, p.labels, COALESCE(plt.total_likes, 0) as like_count
      FROM posts p
      LEFT JOIN post_like_totals plt ON p.post_id = plt.post_id
      WHERE p.uid = ANY(${sql.array(creatorIds)})
        AND p.ts >= now() - interval '2 days'
        AND p.type != 'reply'
      ORDER BY COALESCE(plt.total_likes, 0) DESC, p.ts DESC
      LIMIT ${post_limit};
    `;
    return { posts };
  } catch(e) {
    console.log('Error getting recommended posts:', e);
    return { posts: [] };
  }
}

// Discover algorithm
const algos = {
  'discover': async (ctx, params = {}, requesterDid) => {
    try {
      const { limit = 50, cursor, acceptLanguages = ['en'] } = params;
      let userId = (requesterDid || '').replace('did:plc:', '');
      let { posts } = await getRecommendedPostsForUser(userId, params);
      posts = posts.map(p => ({
        post_id: normalizePostIdToAtUri(p.post_id),
        ts: (+ new Date(p.ts)),
        lang: p.lang,
        type: p.type,
        like_count: p.like_count,
        label: p.labels
      }));
      // Filter by language
      const langSet = new Set([...acceptLanguages, '', 'en']);
      posts = posts.filter(p => !p.lang || langSet.has(p.lang));
      // Sort by timestamp descending
      const sortedPosts = posts.sort((a, b) => b.ts - a.ts);
      let startIndex = 0;
      if (cursor) {
        const cursorNum = parseInt(cursor, 10);
        startIndex = sortedPosts.findIndex(p => p.ts < cursorNum);
        if (startIndex === -1) startIndex = sortedPosts.length;
      }
      const paginatedPosts = sortedPosts.slice(startIndex, startIndex + limit);
      let newCursor;
      if (startIndex + limit < sortedPosts.length) {
        newCursor = paginatedPosts[paginatedPosts.length - 1].ts.toString();
      }
      const feedItems = paginatedPosts.map(p => ({ post: p.post_id }));
      return {
        cursor: newCursor,
        feed: feedItems,
      };
    } catch(e) {
      console.log('Error in discover algorithm:', e);
      return { feed: [] };
    }
  },
  'whats-hot': async (ctx, params = {}, requesterDid) => {
    try {
      const { limit = 50, cursor, acceptLanguages = ['en'] } = params;
      let posts = await generateCatchupFeed({ limit: 10000, lang: acceptLanguages[0] || 'en' });
      posts = posts.map(p => ({
        post_id: normalizePostIdToAtUri(p.post_id),
        ts: (+ new Date(p.ts))
      }));
      // Sort posts descending by timestamp (most recent first)
      const sortedPosts = posts.sort((a, b) => b.ts - a.ts);
      let startIndex = 0;
      if (cursor) {
        const cursorNum = parseInt(cursor, 10);
        startIndex = sortedPosts.findIndex(p => p.ts < cursorNum);
        if (startIndex === -1) startIndex = sortedPosts.length;
      }
      const paginatedPosts = sortedPosts.slice(startIndex, startIndex + limit);
      let newCursor;
      if (startIndex + limit < sortedPosts.length) {
        newCursor = paginatedPosts[paginatedPosts.length - 1].ts.toString();
      }
      const feedItems = paginatedPosts.map(p => ({ post: p.post_id }));
      return {
        cursor: newCursor,
        feed: feedItems,
      };
    } catch(e) {
      console.log('Error in whats-hot algorithm:', e);
      return { feed: [] };
    }
  }
};

// Jetstream connection
function connectToJetstream() {
  const wsUrl = `wss://${JETSTREAM_HOST}/subscribe${JETSTREAM_PARAMS}`;
  const ws = new WebSocket(wsUrl);
  ws.on('open', () => {
    console.log('Connected to Jetstream.');
  });
  ws.on('message', (data) => {
    let msg;
    try {
      msg = JSON.parse(data);
    } catch (e) {
      console.error('Error parsing message:', data);
      return;
    }
    if (msg.kind !== 'commit' || !msg.commit || msg.commit.operation !== 'create') {
      return;
    }
    const collection = msg.commit.collection;
    const commit = msg.commit;
    const record = commit.record;
    const uid = (msg.did || '').replace('did:plc:', '');
    // Process posts
    if (collection === 'app.bsky.feed.post') {
      const postId = getPostId(msg.did, commit.rkey);
      const ts = formatTimestamp(record.createdAt || Date.now());
      const d = formatTimestampAsDate(record.createdAt || Date.now());
      const lang = normalizeLang((record.langs && record.langs[0]) || '');
      let type = 'text', labels = [], isReply = false;
      if (record.reply && record.reply.parent && record.reply.root) {
        isReply = true;
        type = 'reply';
      }
      // Determine post type
      if (record.embed && record.embed.$type) {
        let inferredType = (record.embed.$type || '').replace('app.bsky.embed.', '');
        if (!['record', 'recordWithMedia'].includes(inferredType)) type = inferredType;
      }
      // Determine labels
      if (record.labels && record.labels.values) {
        labels = record.labels.values.map(x => (x.val || '').trim().toLowerCase()).filter(x => !!x);
      }
      postWriter.writeRow([postId, uid, ts, lang, type, formatLabels(labels)]);
      if (isReply) {
        replyWriter.writeRow([postId, uid, d, null]);
      }
    } else if (collection === 'app.bsky.feed.like') {
      const subjectUri = record.subject.uri;
      const postId = getPostIdFromSubject(subjectUri);
      const d = formatTimestampAsDate(record.createdAt);
      if (postId && uid && d) likeWriter.writeRow([postId, uid, d, null]);
    } else if (collection === 'app.bsky.feed.repost') {
      const subjectUri = record.subject.uri;
      const postId = getPostIdFromSubject(subjectUri);
      const d = formatTimestampAsDate(record.createdAt);
      if (postId && uid && d) repostWriter.writeRow([postId, uid, d, null]);
    } else if (collection === 'app.bsky.graph.follow') {
      let subject = (record.subject || '').replace('did:plc:', '');
      const ts = formatTimestamp(record.createdAt || Date.now());
      let isCreate = msg.commit.operation === 'create';
      if (uid && subject && ts) followWriter.writeRow([uid, subject, ts, isCreate]);
    }
  });
  ws.on('close', () => {
    console.warn('Jetstream WebSocket closed. Reconnecting momentarily...');
    setTimeout(connectToJetstream, RECONNECT_DELAY_MS);
  });
  ws.on('error', (err) => {
    console.error('Jetstream WebSocket error:', err);
    ws.close();
  });
}
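
// For reference, a Jetstream commit event looks roughly like this (abridged):
//   { "did": "did:plc:abc123", "kind": "commit",
//     "commit": { "operation": "create", "collection": "app.bsky.feed.post",
//                 "rkey": "3xyz", "record": { "createdAt": "...", "langs": ["en"], ... } } }
// Only 'create' commits for the four wanted collections reach the writers above;
// deletes and updates are filtered out early.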

// AT Protocol server setup
const app = express();
app.use(cors());
app.use(express.json());

// DID document endpoint
app.get('/.well-known/did.json', (req, res) => {
  const serviceDid = process.env.FEEDGEN_SERVICE_DID || 'did:web:example.com';
  const hostname = process.env.FEEDGEN_HOSTNAME || 'localhost';
  res.json({
    '@context': ['https://www.w3.org/ns/did/v1'],
    id: serviceDid,
    service: [
      {
        id: '#bsky_fg',
        type: 'BskyFeedGenerator',
        serviceEndpoint: `https://${hostname}`,
      },
    ],
  });
});

// Create AT Protocol server
const server = xrpc.createServer([
  {
    lexicon: 1,
    id: 'app.bsky.feed.getFeedSkeleton',
    defs: {
      main: {
        type: 'query',
        description: 'Get a skeleton of a feed provided by a feed generator',
        parameters: {
          type: 'params',
          required: ['feed'],
          properties: {
            feed: {
              type: 'string',
              format: 'at-uri',
              description: 'Reference (AT-URI) to feed generator record'
            },
            limit: {
              type: 'integer',
              minimum: 1,
              maximum: 100,
              default: 50
            },
            cursor: {
              type: 'string'
            }
          }
        },
        output: {
          encoding: 'application/json',
          schema: {
            type: 'object',
            required: ['feed'],
            properties: {
              cursor: {
                type: 'string'
              },
              feed: {
                type: 'array',
                items: {
                  type: 'object',
                  required: ['post'],
                  properties: {
                    post: {
                      type: 'string',
                      format: 'at-uri'
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
], {
  validateAuth: async (req, serviceDid, didResolver) => {
    const auth = req.headers.authorization;
    if (!auth) throw new AuthRequiredError();
    const jwt = auth.replace('Bearer ', '');
    const result = await verifyJwt(jwt, serviceDid, didResolver);
    return { credentials: { did: result.sub } };
  },
  authOptional: true,
});

// Feed skeleton endpoint
server.method('app.bsky.feed.getFeedSkeleton', async ({ auth, params }) => {
  const requesterDid = auth?.credentials?.did;
  // params.feed arrives as a full AT-URI (at://did:.../app.bsky.feed.generator/<rkey>);
  // use its record key to select the algorithm
  const feedId = (params.feed || '').split('/').pop();
  const algo = algos[feedId];
  if (!algo) {
    throw new Error(`Algorithm ${params.feed} not found`);
  }
  try {
    const result = await algo(null, params, requesterDid);
    return {
      encoding: 'application/json',
      body: result,
    };
  } catch (error) {
    console.error('Error in getFeedSkeleton:', error);
    return {
      encoding: 'application/json',
      body: { feed: [] },
    };
  }
});

// Mount AT Protocol server
app.use(server.router);

// Health check
app.get('/health', (req, res) => {
  res.json({ status: 'ok', timestamp: new Date().toISOString() });
});

// Statistics endpoint
app.get('/stats', async (req, res) => {
  try {
    const stats = await sql`
      SELECT
        (SELECT COUNT(*) FROM posts) as posts_count,
        (SELECT COUNT(*) FROM likes) as likes_count,
        (SELECT COUNT(*) FROM users) as users_count,
        (SELECT COUNT(*) FROM user_creator_affinity) as affinities_count;
    `;
    const csvStats = csvWriters.map(writer => ({
      filename: writer.filenameBase,
      writeCount: writer.writeCount,
      timeElapsed: Math.round((Date.now() - writer.created) / 1000)
    }));
    res.json({
      database: stats[0],
      csvWriters: csvStats,
      lastCreatorJobId
    });
  } catch (error) {
    console.error('Error getting stats:', error);
    res.status(500).json({ error: 'Failed to get stats' });
  }
});

// Rebuild creator index endpoint (for debugging/maintenance)
app.get('/creators/rebuild', async (req, res) => {
  let _startTime = Date.now();
  lastCreatorJobId = null;
  await ensureCreatorIndex();
  let queryTime = ((Date.now() - _startTime) / 1000).toFixed(2);
  res.send(`Finished rebuilding creator index in ${queryTime} seconds.`);
});

// Catchup feed endpoint (for debugging/testing)
app.get('/catch-up', async (req, res) => {
  try {
    let _startTime = Date.now();
    let posts = await generateCatchupFeed(req.query);
    let queryTime = ((Date.now() - _startTime) / 1000).toFixed(2);
    res.json({ queryTime, posts });
  } catch (err) {
    console.error('Error fetching catchup feed:', err);
    res.status(500).json({ error: err.message });
  }
});

// Scheduled jobs
schedule.scheduleJob('*/15 * * * *', async function() {
  console.log('Running scheduled post like totals recreation...');
  await recreatePostLikeLookup();
});

schedule.scheduleJob('0 2 * * *', async function() {
  console.log('Running daily creator index rebuild...');
  lastCreatorJobId = null;
  await ensureCreatorIndex();
});

// Data maintenance job - runs every 5 minutes
schedule.scheduleJob('*/5 * * * *', async function() {
  try {
    // Do deduplication once per hour, max
    if (lastDedupe !== new Date().getHours()) {
      console.log("Executing dedupe queries...");
      await dedupe('posts');
      await dedupe('likes');
      await dedupe('replies');
      await dedupe('reposts');
      lastDedupe = new Date().getHours();
    }
    // Batch delete once per day
    if (lastDelete !== new Date().getDate()) {
      console.log("Running batch deletes to maintain table sizes...");
      await batchDelete('likes', 'date', '4 days');
      await batchDelete('replies', 'date', '7 days');
      await batchDelete('reposts', 'date', '7 days');
      await batchDelete('posts', 'ts', '4 days');
      await batchDelete('follows', 'timestamp', '4 days');
      await batchDelete('user_creator_affinity', 'last_interaction_at', '30 days', 10000, `action IS DISTINCT FROM 'forever' AND action IS DISTINCT FROM 'following'`);
      console.log('Batch deletes completed.');
      lastDelete = new Date().getDate();
      // Cleanup old bitmap tables
      const threeDaysAgoTimestamp = Date.now() - 3 * 24 * 60 * 60 * 1000;
      let oldJobId = `creator_snapshot_${formatTimestampAsDate(threeDaysAgoTimestamp)}`;
      await cleanup(oldJobId);
    }
    // Populate users table from activity data
    await populateUsersTable();
    // Warm up catchup feed
    await generateCatchupFeed({ limit: 1000, lang: 'en' });
  } catch (error) {
    console.error('Error in maintenance job:', error);
  }
});

// CSV Import functionality
function copyToPostgres(tableName, filePath, cols = '') {
  return new Promise((resolve, reject) => {
    const cmd = `psql ${process.env.DATABASE_URL} -c "\\copy ${tableName} ${cols} FROM '${filePath}' WITH (FORMAT csv, HEADER true, NULL '');"`;
    const timeout = setTimeout(() => {
      reject(new Error(`psql import timed out: ${filePath}`));
    }, 120000); // 2 minute timeout
    exec(cmd, (error, stdout, stderr) => {
      clearTimeout(timeout);
      if (error) {
        console.error(`Error importing ${filePath}:`, error);
        reject(error);
      } else {
        resolve(stdout);
      }
    });
  });
}
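
// Illustrative command this produces (example path, wrapped here for readability):
//   psql $DATABASE_URL -c "\copy likes (post_id, uid, date, processed_at)
//     FROM '/app/csv_output/likes_2025-07-06T00-07-00-000Z.csv'
//     WITH (FORMAT csv, HEADER true, NULL '');"
// Shelling out to psql's \copy keeps bulk loads off the Node event loop.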

// Import job variables
let importJob;
let runningImport = false;

async function importJobFunction() {
  if (!sql) return;
  if (runningImport) return;
  console.log('Starting import job...');
  runningImport = true;
  try {
    // Get CSV files in OUTPUT_DIR
    const files = fs.readdirSync(OUTPUT_DIR);
    const csvFiles = files.filter(file => file.endsWith('.csv'));
    if (csvFiles.length === 0) {
      console.log('No new files to import.');
      runningImport = false;
      return;
    }
    for (const file of csvFiles) {
      const filePath = path.join(OUTPUT_DIR, file);
      const tableName = file.split('_')[0]; // Extract table name from filename
      try {
        console.log(`Importing ${file} to ${tableName}...`);
        // Handle different table types with appropriate columns
        if (tableName === 'keywords') {
          await copyToPostgres(tableName, filePath, '(post_id, keyword, date)');
        } else if (tableName === 'follows') {
          await copyToPostgres(tableName, filePath, '(uid, subject, timestamp, is_create)');
        } else if (tableName === 'posts') {
          await copyToPostgres(tableName, filePath, '(post_id, uid, ts, lang, type, labels)');
        } else if (tableName === 'likes' || tableName === 'reposts' || tableName === 'replies') {
          await copyToPostgres(tableName, filePath, '(post_id, uid, date, processed_at)');
        } else {
          // Fallback - try without column specification
          await copyToPostgres(tableName, filePath);
        }
        // Archive the file after successful import
        const archiveDir = path.join(__dirname, 'archive');
        if (!fs.existsSync(archiveDir)) {
          fs.mkdirSync(archiveDir, { recursive: true });
        }
        const archivePath = path.join(archiveDir, file);
        fs.renameSync(filePath, archivePath);
        console.log(`Successfully imported and archived ${file}`);
      } catch (error) {
        console.error(`Error importing ${file}:`, error);
        // Move failed file to a failed directory for manual inspection
        const failedDir = path.join(__dirname, 'failed_imports');
        if (!fs.existsSync(failedDir)) {
          fs.mkdirSync(failedDir, { recursive: true });
        }
        const failedPath = path.join(failedDir, file);
        fs.renameSync(filePath, failedPath);
      }
    }
    console.log("Import job completed.");
  } catch (error) {
    console.error("Error during import job:", error);
  } finally {
    runningImport = false;
  }
}

// Start import job (runs every 5 minutes)
if (process.env.HALT_IMPORT !== 'true') {
  importJob = setInterval(importJobFunction, 5 * 60 * 1000);
  console.log('CSV import job scheduled to run every 5 minutes');
}

// Graceful shutdown
process.on('SIGINT', () => {
  console.log('Shutting down gracefully...');
  if (importJob) clearInterval(importJob);
  csvWriters.forEach(writer => writer.shutdown());
  setTimeout(() => {
    process.exit(0);
  }, 5000);
});

// Start the application
async function start() {
  try {
    await initDatabase();
    // Start Jetstream listener
    connectToJetstream();
    // Start web server
    app.listen(PORT, () => {
      console.log(`Server running on port ${PORT}`);
      console.log(`Stats available at http://localhost:${PORT}/stats`);
      console.log(`Health check at http://localhost:${PORT}/health`);
      console.log(`Rebuild index at http://localhost:${PORT}/creators/rebuild`);
    });
  } catch (error) {
    console.error('Failed to start application:', error);
    process.exit(1);
  }
}

start();