/**
 * This file is responsible for
 * 1. storing activities in Postgres/Redis
 * 2. fetching the user's feed from Redis
 * 3. storing following/unfollowing in Redis
 */
import {
  ActivityTargetType,
  ActivityVerb,
  Activity,
  Profile,
  Listing,
} from 'entities'
import { groupBy, isEmpty } from 'lodash'
import { redis } from 'store/redis' // ioredis
import { ger } from 'store/ger' // look up NPM package ger
import { In } from 'typeorm'
import batch from 'batch-promises'

/** Optional parent reference attached to an activity when it is created. */
interface CreateActivityConfig {
  parent?: number
  parentType?: ActivityTargetType
}

// Upper bound on the number of Redis writes issued concurrently during a
// fan-out. If a user with 1,000,000 followers makes a post, the writes are
// performed in batches of 1,000 instead of all at once.
const MAX_BATCH = 1000

export class FeedService {
  /**
   * Safely parses data read from Redis.
   *
   * @param str Either a list of JSON strings (one per sorted-set member) or a
   *   single JSON string that encodes an array.
   * @returns The parsed array. An empty array is returned when any payload is
   *   not valid JSON or when the decoded value is not an array, so callers can
   *   always iterate over the result.
   */
  parseArray(str: string | string[]): any[] {
    try {
      const parsed = Array.isArray(str)
        ? str.map(item => JSON.parse(item))
        : JSON.parse(str)
      return Array.isArray(parsed) ? parsed : []
    } catch (error) {
      // Malformed payload: treat it as "no entries" instead of returning null,
      // which would make the downstream `.map` calls throw.
      return []
    }
  }

  /**
   * Users can turn on post notifications for other users.
   * Adds `current` to the `post_not:<other>` sorted set, scored by the
   * current time in milliseconds.
   */
  async enablePostNotifications(other: number, current: number) {
    const follower = String(current)
    const timestamp = String(Date.now())
    return redis.zadd(`post_not:${other}`, timestamp, follower)
  }

  /**
   * Records that `current` follows `other`.
   * The follower id is stored in the `followers:<other>` sorted set so it can
   * be read later without touching the DB; a `follows` event is also sent to
   * `ger` first.
   */
  async follow(other: number, current: number) {
    const follower = String(current)
    const timestamp = String(Date.now())
    await ger.event('users', current, 'follows', other)
    return redis.zadd(`followers:${other}`, timestamp, follower)
  }

  /**
   * Reverses `follow`: deletes the `follows` event from `ger` and removes
   * `current` from the `followers:<other>` sorted set.
   */
  async unfollow(other: number, current: number) {
    const follower = String(current)
    await ger.delete_events('users', current, 'follows', other)
    return redis.zrem(`followers:${other}`, follower)
  }

  /**
   * Takes the activities stored in Redis and attaches the associated target
   * object from the database to each one as `item`.
   *
   * Only the `LISTING` target type is handled right now, but more cases can be
   * added to the switches below.
   *
   * There are two kinds of feeds: one for the home page (`'feed'`) and one
   * called "activity" (think the "Following" tab in Instagram):
   * - `'feed'` returns at most one entry per target object (no duplicates),
   *   in the order the activities were stored, and drops activities whose
   *   target could not be loaded.
   * - `'activity'` returns every activity as is; `item` is `null` for
   *   unsupported target types and whatever `findOne` yields otherwise.
   */
  private extractTargetObjects = (type: 'feed' | 'activity' = 'feed') => async <
    T extends Activity
  >(
    feed: T[]
  ) => {
    if (type !== 'feed') {
      const items = await Promise.all(
        feed.map(activity => {
          const id = activity.targetId
          switch (activity.targetType) {
            case 'LISTING':
              return Listing.findOne({ where: { id } }) // TypeORM query
            default:
              return null
          }
        })
      )
      // Promise.all preserves input order, so items[index] belongs to feed[index].
      return feed.map((activity, index) => ({
        ...activity, // the stored redis activity
        item: items[index], // the item from the database
      }))
    }

    // String keys make the lookup insensitive to whether ids arrive as
    // numbers or as strings.
    const targetKey = (targetType: unknown, targetId: unknown) =>
      `${targetType}:${targetId}`

    // One batch query per target type; results are indexed by target so they
    // can be matched back to their activity regardless of the order in which
    // the database returns rows.
    const targets = new Map<string, Listing>()
    const grouped = groupBy(feed, activity => activity.targetType)
    await Promise.all(
      Object.entries(grouped).map(async ([targetType, activities]) => {
        const ids = [...new Set(activities.map(activity => activity.targetId))]
        switch (targetType) {
          case 'LISTING': {
            const listings = await Listing.find({ where: { id: In(ids) } }) // batch query
            for (const listing of listings) {
              targets.set(targetKey(targetType, listing.id), listing)
            }
            break
          }
          default:
            // No loader for this target type yet: its activities are dropped.
            break
        }
      })
    )

    const seen = new Set<string>()
    const result: Array<T & { item: Listing }> = []
    for (const activity of feed) {
      const key = targetKey(activity.targetType, activity.targetId)
      const item = targets.get(key)
      if (item === undefined || seen.has(key)) {
        continue
      }
      seen.add(key)
      result.push({
        ...activity, // the stored redis activity
        item, // the item from the database
      })
    }
    return result
  }

  /**
   * Reads one page of raw members from a sorted set.
   * ZRANGE takes inclusive start/stop indices, so the page
   * `[offset, offset + limit)` maps to `start = offset`,
   * `stop = offset + limit - 1`.
   *
   * NOTE(review): ZRANGE yields members in ascending score order, i.e. oldest
   * first for timestamp scores — confirm whether newest-first was intended.
   */
  private async readRange(
    key: string,
    limit: number,
    offset: number
  ): Promise<string[]> {
    if (limit <= 0) {
      // A stop index of -1 would mean "through the last member" to Redis.
      return []
    }
    const start = Math.max(0, offset)
    return redis.zrange(key, start, start + limit - 1)
  }

  /**
   * Gets the user's home feed.
   *
   * @param profile Id of the profile whose `feed:<profile>` set is read.
   * @param limit Maximum number of stored activities to read (default 6).
   * @param offset Index of the first stored activity to read (default 0).
   * @returns Activities with their target attached as `item`, one per target.
   */
  async getFeed(profile: number, limit: number = 6, offset: number = 0) {
    const raw = await this.readRange(`feed:${profile}`, limit, offset) // query the stored feed in redis
    const activities = this.parseArray(raw) // parse the redis strings
    return this.extractTargetObjects('feed')(activities) // extract the target objects
  }

  /**
   * Gets the user's activity feed.
   *
   * @param profile Id of the profile whose `activity:<profile>` set is read.
   * @param limit Maximum number of stored activities to read (default 6).
   * @param offset Index of the first stored activity to read (default 0).
   * @returns Every stored activity in the page with its target attached as `item`.
   */
  async getActivityFeed(
    profile: number,
    limit: number = 6,
    offset: number = 0
  ) {
    const raw = await this.readRange(`activity:${profile}`, limit, offset) // query the stored feed in redis
    const activities = this.parseArray(raw) // parse the redis strings
    return this.extractTargetObjects('activity')(activities) // extract the target objects
  }

  /**
   * Saves the activity in Postgres and returns it together with a JSON string
   * of it. In the JSON form the `actor` field holds only the actor's id.
   */
  private async generateActivity(
    actor: Profile,
    verb: ActivityVerb,
    targetType: ActivityTargetType,
    target: number,
    config: CreateActivityConfig = {}
  ): Promise<[Activity, string]> {
    let activity = new Activity()
    activity.actor = actor
    activity.targetType = targetType
    activity.targetId = target
    activity.verb = verb
    activity.parentId = config.parent
    activity.parentType = config.parentType
    activity = await activity.save()

    const serializedActivity = JSON.stringify({
      ...activity,
      actor: actor.id,
    })
    return [activity, serializedActivity]
  }

  /**
   * Stores the activity in Postgres and in each of the actor's followers'
   * feeds / activity feeds. It also stores the activity as a notification for
   * every user who has enabled post notifications for the actor.
   *
   * All Redis writes are awaited before the method resolves, so a failed
   * fan-out rejects the returned promise instead of surfacing as an unhandled
   * rejection.
   *
   * @returns The persisted activity.
   */
  async createActivity(
    actor: Profile,
    verb: ActivityVerb,
    targetType: ActivityTargetType,
    target: number,
    config: CreateActivityConfig = {}
  ) {
    // The two reads are independent, so issue them together.
    const [followerIds, subscriberIds] = await Promise.all([
      redis.zrange(`followers:${actor.id}`, 0, -1), // get followers
      redis.zrange(`post_not:${actor.id}`, 0, -1), // get those who have enabled post notifications
    ])
    const followers: string[] = followerIds || []
    const subscribers: string[] = subscriberIds || []

    const timestamp = String(Date.now())
    const isListing =
      targetType === ActivityTargetType.LISTING && verb === ActivityVerb.ADD // can omit this check

    const [activity, serializedActivity] = await this.generateActivity(
      actor,
      verb,
      targetType,
      target,
      config
    )

    const writes: Array<Promise<unknown>> = []

    if (!isEmpty(subscribers)) {
      // make a notification for people who have enabled post notifications
      writes.push(
        batch(MAX_BATCH, subscribers, subscriber =>
          redis.zadd(`notifications:${subscriber}`, timestamp, serializedActivity)
        )
      )
    }

    if (!isEmpty(followers)) {
      // Publish to followers' feed if it is a listing, else to their activity feed
      const prefix = isListing ? 'feed' : 'activity'
      writes.push(
        batch(MAX_BATCH, followers, follower =>
          redis.zadd(`${prefix}:${follower}`, timestamp, serializedActivity)
        )
      )
    }

    if (isListing) {
      // Add to the actor's own feed
      writes.push(redis.zadd(`feed:${actor.id}`, timestamp, serializedActivity))
    }

    await Promise.all(writes)
    return activity
  }
}

export const feedService = new FeedService()