Last active
August 7, 2023 07:31
-
-
Save trvswgnr/8e6857a35910bdbd0dab3663e22f1e97 to your computer and use it in GitHub Desktop.
daxxer challenge 2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { | |
UserId, | |
User, | |
Todo, | |
IBService, | |
IQueueService, | |
ICacheService, | |
ILogService, | |
IMonitorService, | |
} from './types'; | |
/**
 * In-memory implementation of {@link IBService}.
 *
 * Users and todos are kept in public arrays so callers (e.g. tests) can
 * inspect exactly what was created.
 */
export class BService implements IBService {
  /** Every user created through {@link createUser}, in insertion order. */
  public users: NonNullable<User>[] = [];
  /** Every todo accepted by {@link createTodo}, in insertion order. */
  public todos: Todo[] = [];

  /**
   * Looks up a user by id.
   *
   * @returns the first stored user with a matching id, or `null` when none exists.
   */
  async getUser(userId: UserId): Promise<User> {
    return this.users.find(user => user.id === userId) ?? null;
  }

  /**
   * Stores and returns a new user for `userId`.
   *
   * No uniqueness check is performed here: calling this twice with the same id
   * stores two entries, which lets callers detect duplicate creation by
   * inspecting `users`.
   */
  async createUser(userId: UserId): Promise<User> {
    const user = {
      id: userId,
      name: `travvy${userId}`,
      email: '[email protected]',
    };
    this.users.push(user);
    return user;
  }

  /**
   * Stores `todo` and returns it.
   *
   * @throws Error when the owning user does not exist.
   * @throws Error when a todo with the same id is already stored.
   */
  async createTodo(todo: Todo): Promise<Todo> {
    const { id, userId } = todo;

    // Goes through getUser (rather than reading `users`) so an overridden
    // getUser is honoured.
    const owner = await this.getUser(userId);
    if (!owner) {
      throw new Error(`user does not exist: ${userId}`);
    }

    if (this.todos.some(existing => existing.id === id)) {
      throw new Error(`todo already exists: ${id}`);
    }

    this.todos.push(todo);
    return todo;
  }
}
/**
 * {@link ILogService} implementation that records every message in memory.
 *
 * Each call appends one entry to `logs`, prefixed with `LOG: ` or `ERROR: `
 * and with the arguments joined by a single space. When `showOutput` is truthy
 * the raw arguments are also forwarded to the console.
 */
export class LogService implements ILogService {
  /** Formatted entries in call order. */
  public logs: string[] = [];

  /**
   * @param showOutput when truthy, messages are echoed to `console.log` / `console.error`.
   */
  constructor(private readonly showOutput?: boolean) {}

  /** Records an informational message. */
  log(...messages: unknown[]): void {
    if (this.showOutput) {
      console.log(...messages);
    }
    this.record('LOG', messages);
  }

  /** Records an error message. */
  error(...messages: unknown[]): void {
    if (this.showOutput) {
      console.error(...messages);
    }
    this.record('ERROR', messages);
  }

  /** Appends one `"<level>: <space-joined messages>"` entry to `logs`. */
  private record(level: 'LOG' | 'ERROR', messages: unknown[]): void {
    this.logs.push(`${level}: ${messages.join(' ')}`);
  }
}
/**
 * Set-backed implementation of {@link ICacheService}: remembers which keys
 * have been added, without storing any associated value.
 */
export class CacheService<T> implements ICacheService<T> {
  private readonly cache = new Set<T>();

  /** Marks `key` as present. Adding an existing key has no further effect. */
  add(key: T): void {
    this.cache.add(key);
  }

  /** Returns whether `key` is currently present. */
  has(key: T): boolean {
    return this.cache.has(key);
  }

  /** Removes `key` if present; does nothing otherwise. */
  delete(key: T): void {
    this.cache.delete(key);
  }

  /** Number of distinct keys currently present. */
  get size(): number {
    return this.cache.size;
  }
}
/**
 * Array-backed first-in, first-out implementation of {@link IQueueService}.
 */
export class QueueService<T> implements IQueueService<T> {
  private readonly queue: T[] = [];

  /** Appends `item` to the back of the queue. */
  push(item: T): void {
    this.queue.push(item);
  }

  /** Removes and returns the item at the front, or `undefined` when the queue is empty. */
  shift(): T | undefined {
    return this.queue.shift();
  }

  /** Number of items currently waiting. */
  get length(): number {
    return this.queue.length;
  }
}
/**
 * {@link IMonitorService} implementation that runs a callback on a fixed
 * interval and counts how many times it has completed.
 */
export class MonitorService implements IMonitorService {
  /** Number of completed callback invocations across all registered monitors. */
  public monitorCount = 0;

  /** Handles of every interval started by {@link monitor}, so {@link stop} can clear them. */
  private readonly timers: ReturnType<typeof setInterval>[] = [];

  /**
   * Starts invoking `callback` every `interval` milliseconds.
   *
   * `monitorCount` is incremented after each invocation that returns normally.
   * The timer handle is retained (previously it was discarded, so the interval
   * could never be cleared) and, where the runtime supports it, unref'd so a
   * forgotten monitor does not keep the process alive on its own.
   */
  monitor(callback: () => void, interval: number): void {
    const timer = setInterval(() => {
      callback();
      this.monitorCount++;
    }, interval);

    // In Node the handle is an object exposing unref(); in browsers it is a
    // plain number, so probe at runtime instead of assuming either shape.
    const handle: unknown = timer;
    if (
      typeof handle === 'object' &&
      handle !== null &&
      'unref' in handle &&
      typeof handle.unref === 'function'
    ) {
      handle.unref();
    }

    this.timers.push(timer);
  }

  /** Clears every interval started by {@link monitor}. Safe to call repeatedly. */
  stop(): void {
    for (const timer of this.timers.splice(0)) {
      clearInterval(timer);
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { jest, describe, it, expect, beforeEach } from '@jest/globals'; | |
import { SyncService } from './SyncService'; | |
import { EventFromA, ILogService, IQueueService } from './types'; | |
import { QueueService, LogService, BService, MonitorService, CacheService } from './mock-services'; | |
jest.mock('./services'); | |
// Verifies that a burst of events does not cause SyncService to report its
// "queue is empty" race-condition error.
describe('no race conditions', () => {
  let syncService: SyncService;
  let mockQueue: IQueueService<EventFromA>;
  let mockLogService: ILogService;

  beforeEach(() => {
    mockQueue = new QueueService();
    mockLogService = new LogService();
    syncService = new SyncService(
      new BService(),
      new MonitorService(),
      mockLogService,
      new CacheService(),
      new CacheService(),
      mockQueue
    );
  });

  it('should handle race condition when processing events', async () => {
    const events = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].map(id => ({
      id,
      userId: 1,
      type: 'todo.created' as const,
      todo: {
        id,
        userId: 1,
        title: `Todo ${id}`,
        description: `Description ${id}`,
      },
    }));

    // LogService is a real class, not a jest mock: `error` has to be wrapped
    // in a spy before call-based matchers can be used on it. The spy still
    // calls through to the real implementation.
    const errorSpy = jest.spyOn(mockLogService, 'error');

    // simulate a delay when processing events
    jest.spyOn(syncService, 'process' as any).mockImplementation(() => new Promise(resolve => setTimeout(resolve, 500)));

    const handleEvents = events.map(event => syncService.handleEvent(event));

    // wait for all events to be handled
    await Promise.all(handleEvents);

    // wait for the queue to be processed
    // NOTE(review): this 100ms wait is shorter than the 500ms simulated
    // processing delay, so the assertion only covers the start of the run.
    await new Promise(resolve => setTimeout(resolve, 100));

    // check that the 'processQueue' method was not called concurrently
    expect(errorSpy).not.toHaveBeenCalledWith('the queue is empty, this means we have a race condition');
  });
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { SyncService } from './SyncService'; | |
import type { | |
EventId, | |
UserId, | |
EventFromA, | |
} from './types'; | |
import { describe, beforeEach, it, expect } from '@jest/globals'; | |
import { QueueService, LogService, BService, MonitorService, CacheService } from './mock-services'; | |
// Behavioural tests for SyncService, run against the in-memory mock services.
// Processing is asynchronous and handleEvent does not wait for it, so each
// test sleeps for a fixed time before asserting.
describe('SyncService', () => {
  let bService: BService;
  let syncService: SyncService;
  let queue: QueueService<EventFromA>;
  let userCache: CacheService<UserId>;
  let logService: LogService;
  let monitorService: MonitorService;
  let idempotencyKeys: CacheService<EventId>;

  /** Resolves after `ms` milliseconds of real time. */
  const wait = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));

  /** Calls handleEvent(event) after a random delay of up to one second. */
  const handleAfterRandomDelay = (event: EventFromA) =>
    new Promise((resolve) => setTimeout(() => resolve(syncService.handleEvent(event)), Math.random() * 1000));

  beforeEach(() => {
    bService = new BService();
    queue = new QueueService();
    userCache = new CacheService<UserId>();
    logService = new LogService();
    monitorService = new MonitorService();
    idempotencyKeys = new CacheService<EventId>();
    syncService = new SyncService(
      bService,
      monitorService,
      logService,
      userCache,
      idempotencyKeys,
      queue,
    );
  });

  it('should create a user and cache it', async () => {
    const event: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    syncService.handleEvent(event);
    await wait(100); // wait for processing
    expect(userCache.has(1)).toBe(true);
  });

  it('should create a todo and cache it', async () => {
    const userEvent: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const todoEvent: EventFromA = { id: 2, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    syncService.handleEvent(userEvent);
    syncService.handleEvent(todoEvent);
    await wait(100); // wait for processing
    expect(userCache.has(1)).toBe(true);
    // The test is about the todo, so verify it actually reached Service B.
    expect(bService.todos.length).toBe(1);
  });

  it('should not create a user if it already exists', async () => {
    const event1: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const event2: EventFromA = { id: 2, userId: 1, type: 'user.created' };
    syncService.handleEvent(event1);
    syncService.handleEvent(event2);
    await wait(500); // wait for processing
    expect(userCache.size).toBe(1);
    expect(bService.users.length).toBe(1);
    expect(logService.logs).toContain("LOG: user already exists in cache: 1, skipping...");
  });

  it('should not create a todo if it already exists', async () => {
    const userEvent: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const todoEvent1: EventFromA = { id: 2, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    const todoEvent2: EventFromA = { id: 3, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    syncService.handleEvent(userEvent);
    syncService.handleEvent(todoEvent1);
    syncService.handleEvent(todoEvent2);
    await wait(100); // wait for processing
    expect(userCache.size).toBe(1);
    expect(bService.users.length).toBe(1);
    expect(logService.logs).toContain("ERROR: failed to process event: 3 - todo already exists: 1");
    expect(bService.todos.length).toBe(1);
  });

  it('should create a new user if it does not exist before creating a todo', async () => {
    const todoEvent: EventFromA = { id: 1, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    syncService.handleEvent(todoEvent);
    await wait(100); // wait for processing
    expect(userCache.has(1)).toBe(true);
    expect(bService.users.length).toBe(1);
    expect(bService.todos.length).toBe(1);
  });

  it('should handle multiple events concurrently', async () => {
    const userEvent1: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const userEvent2: EventFromA = { id: 2, userId: 2, type: 'user.created' };
    const todoEvent1: EventFromA = { id: 3, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    const todoEvent2: EventFromA = { id: 4, userId: 2, type: 'todo.created', todo: { id: 2, userId: 2, title: 'Todo 2', description: 'Description 2' } };
    // Each event is delivered after its own random delay, so arrival order varies per run.
    const eventPromises = [userEvent1, userEvent2, todoEvent1, todoEvent2].map(handleAfterRandomDelay);
    await Promise.race(eventPromises);
    await wait(5000); // wait for processing
    expect(userCache.size).toBe(2);
    expect(bService.users.length).toBe(2);
    expect(bService.todos.length).toBe(2);
  }, 10000);

  it('should retry if a network error occurs', async () => {
    // mock a network error: the first getUser call fails, later calls succeed
    let calls = 0;
    bService.getUser = async () => {
      calls++;
      await wait(500); // simulated latency
      if (calls === 1) {
        throw new Error('network error');
      }
      return { id: 1, name: 'travvy1', email: '[email protected]' }
    };
    const event: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    syncService.handleEvent(event);
    await wait(1000); // wait for processing
    expect(logService.logs).toContain("ERROR: failed to process event: 1 - network error - adding event back to queue...");
    await wait(500); // wait for processing
    expect(userCache.size).toBe(1);
  });

  it('should not retry if a non-network error occurs', async () => {
    // mock a non-network error
    bService.getUser = async () => {
      throw new Error('some other error');
    };
    const event: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    syncService.handleEvent(event);
    await wait(1000); // wait for processing
    expect(logService.logs).toContain("ERROR: failed to process event: 1 - some other error");
    expect(userCache.size).toBe(0);
  });

  it('should process all events even if they come in at different times', async () => {
    const user1: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const todo1: EventFromA = { id: 2, userId: 1, type: 'todo.created', todo: { id: 1, userId: 1, title: 'Todo 1', description: 'Description 1' } };
    const user2: EventFromA = { id: 3, userId: 2, type: 'user.created' };
    const todo2: EventFromA = { id: 4, userId: 2, type: 'todo.created', todo: { id: 2, userId: 2, title: 'Todo 2', description: 'Description 2' } };
    // second batch arrives two seconds after the first
    setTimeout(() => {
      syncService.handleEvent(user2);
      syncService.handleEvent(todo2);
    }, 2000);
    syncService.handleEvent(user1);
    syncService.handleEvent(todo1);
    await wait(3000); // wait for processing
    expect(queue.length).toBe(0); // all events should be processed
    expect(userCache.size).toBe(2); // both users should be created
  }, 10000);

  it('should process all events independently', async () => {
    const event1: EventFromA = { id: 1, userId: 1, type: 'user.created' };
    const event2: EventFromA = { id: 2, userId: 2, type: 'user.created' };
    syncService.handleEvent(event1);
    syncService.handleEvent(event2);
    // wait some time to simulate waiting for another event
    await wait(5000);
    expect(userCache.has(1)).toBe(true);
    expect(userCache.has(2)).toBe(true);
    const event3: EventFromA = { id: 3, userId: 3, type: 'user.created' };
    syncService.handleEvent(event3);
    // wait for the third event to be processed
    await wait(1000);
    expect(userCache.has(3)).toBe(true);
  }, 10000);
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { | |
EventFromA, | |
EventId, | |
UserId, | |
IBService, | |
IQueueService, | |
ICacheService, | |
ILogService, | |
IMonitorService, | |
} from './types'; | |
/**
 * Consumes events coming from "Service A" and applies them to Service B.
 *
 * Events are appended to `eventQueue` and processed strictly one at a time by
 * a single drain loop. Event ids are recorded in `idempotencyCache` so an
 * already-processed id is skipped, and user ids known to exist in Service B
 * are recorded in `userCache` to avoid repeated lookups.
 */
export class SyncService {
  /**
   * True while a drain loop owns the queue. Guarantees at most one consumer,
   * so events that arrive mid-processing are picked up by the running loop
   * instead of starting a second one.
   */
  private draining = false;

  constructor(
    private bService: IBService,
    private monitorService: IMonitorService,
    private logService: ILogService,
    private userCache: ICacheService<UserId>,
    private idempotencyCache: ICacheService<EventId>,
    private eventQueue: IQueueService<EventFromA>,
  ) {
    this.monitor();
  }

  /**
   * Enqueues `event` and makes sure a drain loop is running.
   *
   * The returned promise resolves as soon as the event is enqueued; it does
   * NOT wait for the event to be processed.
   */
  async handleEvent(event: EventFromA): Promise<void> {
    this.logService.log(`received event: ${event.type}`);
    this.eventQueue.push(event);

    if (this.draining) {
      // The active drain loop re-checks the queue length and will see this event.
      return;
    }

    this.draining = true;
    // Start in a microtask so a synchronous burst of handleEvent calls is
    // fully enqueued before the first event begins processing.
    void Promise.resolve().then(() => this.drainQueue());
  }

  /**
   * Runs `processQueue` until the queue is empty, then releases the
   * `draining` flag. `processQueue` is only invoked while the queue is
   * non-empty, so its "queue is empty" error is no longer logged merely
   * because several events arrived together.
   */
  private async drainQueue(): Promise<void> {
    try {
      while (this.eventQueue.length > 0) {
        const pendingBefore = this.eventQueue.length;
        try {
          await this.processQueue();
        } catch (error) {
          const msg = error instanceof Error ? error.message : 'unknown error';
          this.logService.error(`failed to process queue - ${msg}`);
          // Stop instead of spinning if the failed pass did not shrink the
          // queue; whatever is left is retried when the next event arrives.
          if (this.eventQueue.length >= pendingBefore) break;
        }
      }
    } finally {
      this.draining = false;
    }
  }

  /**
   * Processes queued events in order until the queue is empty.
   *
   * - An event whose id is already in the idempotency cache is skipped.
   * - A failure that matches `shouldRetry` re-enqueues the event and removes
   *   its idempotency key so the retry is not skipped.
   * - Any other `Error` is logged and the event is dropped.
   * - A thrown non-`Error` value aborts this pass and propagates to the caller.
   *
   * NOTE(review): retries are immediate and unbounded; a failure that keeps
   * matching the retry policy will be re-enqueued indefinitely.
   */
  private async processQueue(): Promise<void> {
    if (this.eventQueue.length === 0) {
      this.logService.error('the queue is empty, this means we have a race condition');
    }

    while (this.eventQueue.length > 0) {
      const event = this.eventQueue.shift();
      if (event === undefined) break; // queue reported items but yielded none

      if (this.idempotencyCache.has(event.id)) {
        this.logService.log(`event already processed: ${event.id}`);
        continue;
      }

      // Recorded before processing so the same id cannot be picked up twice.
      this.idempotencyCache.add(event.id);
      this.logService.log(`processing event: ${event.id}`);

      try {
        await this.process(event);
      } catch (error) {
        if (!(error instanceof Error)) throw error;

        let message = `failed to process event: ${event.id} - ${error.message}`;
        // retry if matches our retry policy
        if (this.shouldRetry(error)) {
          this.eventQueue.push(event);
          this.idempotencyCache.delete(event.id);
          message += ' - adding event back to queue...';
        }
        this.logService.error(message);
      }
    }
  }

  /**
   * Applies a single event to Service B.
   *
   * @throws Error when a `todo.created` event carries no todo, when the event
   *   type is not recognised, or when a Service B call fails.
   */
  private async process(event: EventFromA): Promise<void> {
    switch (event.type) {
      case 'user.created':
        await this.ensureUserExists(event.userId);
        break;
      case 'todo.created': {
        // Captured up front so the narrowed value stays usable after the awaits.
        const { todo } = event;
        if (!todo) throw new Error(`missing todo: ${event.id}`);
        await this.ensureUserExists(event.userId);
        await this.bService.createTodo(todo);
        this.logService.log(`todo created: ${todo.id}`);
        break;
      }
      default: {
        // Compile-time exhaustiveness check; still reachable at runtime if an
        // untyped caller passes an unexpected event.
        const invalidEvent: never = event;
        throw new Error(`unknown event type: ${String((invalidEvent as { type: unknown }).type)}`);
      }
    }
  }

  /**
   * Makes sure `userId` exists in Service B, creating the user if necessary,
   * and records the id in `userCache`.
   *
   * @throws whatever `getUser` or `createUser` throws. A `createUser` failure
   *   is logged here and then re-thrown so the caller's retry policy applies
   *   (it was previously swallowed, which let processing continue without the
   *   user).
   */
  private async ensureUserExists(userId: UserId): Promise<void> {
    if (this.userCache.has(userId)) {
      this.logService.log(`user already exists in cache: ${userId}, skipping...`);
      return;
    }

    const user = await this.bService.getUser(userId);
    if (user) {
      this.userCache.add(userId);
      this.logService.log(`user already exists in Service B: ${userId}, skipping...`);
      return;
    }

    this.logService.log(`user does not exist in Service B: ${userId}, creating...`);
    try {
      await this.bService.createUser(userId);
    } catch (error) {
      if (error instanceof Error) {
        this.logService.error(`failed to create user: ${userId} - ${error.message}`);
      }
      throw error;
    }

    this.userCache.add(userId);
    this.logService.log(`user created: ${userId}`);
  }

  /** Registers a periodic report of the queue length and both cache sizes. */
  private monitor(): void {
    this.monitorService.monitor(() => {
      this.logService.log(`queue length: ${this.eventQueue.length}`);
      this.logService.log(`user cache size: ${this.userCache.size}`);
      this.logService.log(`idempotency keys size: ${this.idempotencyCache.size}`);
    }, 60000); // monitor every minute
  }

  /** Retry policy: an error is retryable when its message contains "network error". */
  shouldRetry(error: Error): boolean {
    return error.message.includes('network error');
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Schedules a recurring callback, e.g. for periodic status reporting. */
export interface IMonitorService {
  /**
   * Registers `callback` to be invoked repeatedly.
   *
   * @param callback function to invoke on each tick.
   * @param interval delay between ticks (the in-file implementation passes it
   *   to `setInterval`, i.e. milliseconds).
   */
  monitor(callback: () => void, interval: number): void;
}
/** Sink for informational and error messages; implementations may be sync or async. */
export interface ILogService {
  /** Records an informational message made of the given parts. */
  log(...messages: unknown[]): Promise<void> | void;
  /** Records an error message made of the given parts. */
  error(...messages: unknown[]): Promise<void> | void;
}
/** Operations this module needs from "Service B". */
export interface IBService {
  /** Resolves with the user for `userId`; `User` is nullable, so `null` means not found. */
  getUser(userId: UserId): Promise<User>;
  /** Creates a user with the given id and resolves with it. */
  createUser(userId: UserId): Promise<User>;
  /** Creates `todo` and resolves with it. */
  createTodo(todo: Todo): Promise<Todo>;
}
/** Membership cache: tracks which keys have been added. */
export interface ICacheService<T> {
  /** Marks `key` as present. */
  add(key: T): void;
  /** Returns whether `key` is present. */
  has(key: T): boolean;
  /** Removes `key`. */
  delete(key: T): void;
  /** Number of keys currently present. Read-only: it is derived from the contents. */
  readonly size: number;
}
/** Queue with `Array`-like `push` / `shift` / `length` members. */
export interface IQueueService<T> {
  /** Adds `item` to the queue. */
  push(item: T): void;
  /** Removes and returns the next item, or `undefined` when there is none. */
  shift(): T | undefined;
  /** Number of items currently queued. Read-only: it is derived from the contents. */
  readonly length: number;
}
/** Numeric identifier of a user. */
export type UserId = number;
/** Numeric identifier of a todo. */
export type TodoId = number;
/** Numeric identifier of an incoming event. */
export type EventId = number;

/**
 * A user record, or `null`.
 *
 * The alias itself is nullable; use `NonNullable<User>` where a user is
 * guaranteed to be present.
 */
export type User = {
  id: UserId;
  name: string;
  email: string;
} | null;

/** A todo item belonging to the user identified by `userId`. */
export type Todo = {
  id: TodoId;
  userId: UserId;
  title: string;
  description: string;
};
/** Fields shared by every event; `T` is the literal value of the `type` tag. */
export type BaseEvent<T> = {
  id: EventId;
  userId: UserId;
  type: T;
};

/**
 * Events received from "Service A", discriminated by `type`.
 *
 * Only `todo.created` carries a `todo` payload. The parentheses make the
 * grouping explicit (`&` already binds tighter than `|`).
 */
export type EventFromA =
  | BaseEvent<'user.created'>
  | (BaseEvent<'todo.created'> & { todo: Todo });
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment