Skip to content

Instantly share code, notes, and snippets.

@geggleto
Created October 21, 2024 03:52
Show Gist options
  • Save geggleto/bd6bbc17d16477c4994cab6a24a9d891 to your computer and use it in GitHub Desktop.
Save geggleto/bd6bbc17d16477c4994cab6a24a9d891 to your computer and use it in GitHub Desktop.

Query Bus Pattern in CQRS

Overview

CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates the responsibilities of commands (which change the state of the system) and queries (which retrieve data). The Query Bus pattern is a part of CQRS and specifically handles queries.

How It Works

The Query Bus pattern consists of:

  • Query: A request for information that does not alter the state.
  • Query Handler: A component designed to handle a specific type of query. Each query type has a corresponding handler.
  • Query Bus: The mediator that routes queries to their appropriate handlers.

Benefits

Modularity:

By decoupling query handling from the main application logic, the responsibilities are clearly separated. Each query and query handler can evolve independently. This results in a modular codebase where adding new queries or modifying existing ones doesn't affect other parts of the system.

Easy Code Reading:

The Query Bus pattern leads to a more organized code structure. Each query handler is dedicated to a specific type of query, making it easier to understand what each part of the code is responsible for. Developers can quickly locate the logic for handling specific queries, which improves maintainability.

Scalability:

As the number of queries grows, the system can handle complexity better because each query type has a dedicated handler. This separation also enhances testing since individual query handlers can be tested in isolation.

Decoupling:

The query bus decouples the sender of a query from its handler. This allows the application to change the way queries are handled without impacting the rest of the system.

Drawbacks

Complexity:

For simple applications, introducing the Query Bus pattern might add unnecessary complexity. The benefits really show in larger, more complex systems. Developers need to understand the pattern well to avoid misusing it and creating inefficiencies.

Performance Overhead:

The indirection caused by the query bus can introduce a slight performance overhead compared to direct method calls. However, this is usually minimal and can be mitigated with optimizations.

Infrastructure Requirement:

Proper infrastructure must be in place to manage the registration and resolution of query handlers, especially in highly dynamic or distributed environments.

Example Scenario

Let's consider an example. Suppose we have a system that needs to fetch user details. Using the Query Bus pattern:

  • UserDetailsQuery: A query object specifically requesting user details.
  • UserDetailsQueryHandler: A handler for the UserDetailsQuery that contains the logic for fetching and returning user details.
  • QueryBus: The bus that takes the UserDetailsQuery, identifies the UserDetailsQueryHandler, and dispatches the query to it. The separation makes it clear where the logic for fetching user details resides, simplifies changes, and ensures that other parts of the system remain unaffected by changes in query handling.

Conclusion

The Query Bus pattern is invaluable in a CQRS architecture, particularly for applications needing clear separation of concerns, modularity, and scalability. While it introduces some complexity and potential performance overhead, the benefits of maintainability, readability, and decoupling typically outweigh these drawbacks in medium to large applications

import RedisClient from "../persistence/clients/RedisClient";
import { EntityManager } from '@mikro-orm/core';
import { FriendModel } from '../models/Friend';
import { QueryBus } from '../infrastructure/QueryBus';
import { ListFriendsQuery } from './Queries/ListFriendsQuery';
import { ListFriendsHandler } from './Handlers/ListFriendsHandler';
import {ListIncomingRequestsQuery} from "./Queries/ListIncomingRequestsQuery";
import {ListOutgoingRequestsQuery} from "./Queries/ListOutgoingRequestsQuery";
import {ListIncomingRequestsHandler} from "./Handlers/ListIncomingRequestHandler";
import {ListOutgoingRequestsHandler} from "./Handlers/ListOutgoingRequestHandler";
const OUTGOING_REQUEST_LIST = 'friend:request:outgoingRequestList';
const INCOMING_REQUEST_LIST = 'friend:request:incomingRequestList';
const FRIEND_LIST = 'friend:list:';
export class FriendService {
constructor(
private redisClient: RedisClient,
private em: EntityManager,
private queryBus: QueryBus
) {
// Register query handlers
this.queryBus.registerHandler(ListFriendsQuery.name, new ListFriendsHandler(this.redisClient, this.em));
this.queryBus.registerHandler(ListIncomingRequestsQuery.name, new ListIncomingRequestsHandler(this.redisClient));
this.queryBus.registerHandler(ListOutgoingRequestsQuery.name, new ListOutgoingRequestsHandler(this.redisClient));
}
async sendFriendRequest(senderUid: string, receiverUid: string): Promise<void> {
await this.redisClient.client.lpush(`${OUTGOING_REQUEST_LIST}:${senderUid}`, receiverUid);
await this.redisClient.client.lpush(`${INCOMING_REQUEST_LIST}:${receiverUid}`, senderUid);
// Optional: Persist the outgoing friend request to the database
const request = this.em.create(FriendModel, {
uid: `${senderUid}-${receiverUid}`, // Generate a composite key or a unique identifier
playerUid: senderUid,
friendUid: receiverUid,
});
await this.em.persistAndFlush(request);
}
async acceptFriendRequest(receiverUid: string, senderUid: string): Promise<void> {
await this.redisClient.client.lrem(`${INCOMING_REQUEST_LIST}:${receiverUid}`, 0, senderUid);
await this.redisClient.client.lrem(`${OUTGOING_REQUEST_LIST}:${senderUid}`, 0, receiverUid);
// Add to each other's friend list
await this.redisClient.client.lpush(`${FRIEND_LIST}${receiverUid}`, senderUid);
await this.redisClient.client.lpush(`${FRIEND_LIST}${senderUid}`, receiverUid);
// Persist the accepted friend relationship to the database
const friendRelation = this.em.create(FriendModel, {
uid: `${receiverUid}-${senderUid}`, // Generate a composite key or a unique identifier
playerUid: receiverUid,
friendUid: senderUid,
});
await this.em.persistAndFlush(friendRelation);
}
async rejectFriendRequest(receiverUid: string, senderUid: string): Promise<void> {
await this.redisClient.client.lrem(`${INCOMING_REQUEST_LIST}:${receiverUid}`, 0, senderUid);
await this.redisClient.client.lrem(`${OUTGOING_REQUEST_LIST}:${senderUid}`, 0, receiverUid);
// Optional: Remove the stored outgoing friend request from the database
const request = await this.em.findOne(FriendModel, {
playerUid: senderUid,
friendUid: receiverUid,
});
if (request) {
await this.em.removeAndFlush(request);
}
}
async removeFriend(uid: string, friendUid: string): Promise<void> {
await this.redisClient.client.lrem(`${FRIEND_LIST}${uid}`, 0, friendUid);
await this.redisClient.client.lrem(`${FRIEND_LIST}${friendUid}`, 0, uid);
// Remove the friend relationship from the database
const friendRelation = await this.em.findOne(FriendModel, {
playerUid: uid,
friendUid: friendUid,
}) || await this.em.findOne(FriendModel, {
playerUid: friendUid,
friendUid: uid,
});
if (friendRelation) {
await this.em.removeAndFlush(friendRelation);
}
}
async listFriends(uid: string): Promise<string[]> {
return this.queryBus.execute(new ListFriendsQuery(uid));
}
// Update these methods to use the Query Bus
async listIncomingRequests(uid: string): Promise<string[]> {
return this.queryBus.execute(new ListIncomingRequestsQuery(uid));
}
async listOutgoingRequests(uid: string): Promise<string[]> {
return this.queryBus.execute(new ListOutgoingRequestsQuery(uid));
}
}
export interface IQuery {
// Marker interface, can include common properties if needed
}
import {IQuery} from "./IQuery";
export interface IQueryHandler<TQuery extends IQuery, TResult> {
handle(query: TQuery): Promise<TResult>;
}
import { IQueryHandler } from '../../infrastructure/IQueryHandler';
import { ListFriendsQuery } from '../Queries/ListFriendsQuery';
import RedisClient from '../../persistence/clients/RedisClient';
import { EntityManager } from '@mikro-orm/core';
import {FriendModel} from '../../models/Friend';
const FRIEND_LIST = 'friend:list:';
export class ListFriendsHandler implements IQueryHandler<ListFriendsQuery, string[]> {
constructor(
private redisClient: RedisClient,
private em: EntityManager
) {}
async handle(query: ListFriendsQuery): Promise<string[]> {
const cacheKey = `${FRIEND_LIST}${query.uid}`;
const cachedFriends = await this.redisClient.client.lrange(cacheKey, 0, -1);
if (cachedFriends.length > 0) {
// Cache hit
return cachedFriends;
} else {
// Cache miss, retrieve from database
const friends = await this.em.find(FriendModel, { playerUid: query.uid });
const friendIds = friends.map(f => f.friendUid);
// Store friends in cache
if (friendIds.length > 0) {
await this.redisClient.client.rpush(cacheKey, ...friendIds);
}
return friendIds;
}
}
}
// ListFriendsQuery.ts
import { IQuery } from '../../infrastructure/IQuery';
export class ListFriendsQuery implements IQuery {
constructor(public uid: string) {}
}
// ListIncomingRequestsHandler.ts
import { IQueryHandler } from '../../infrastructure/IQueryHandler';
import { ListIncomingRequestsQuery } from '../Queries/ListIncomingRequestsQuery';
import RedisClient from '../../persistence/clients/RedisClient';
const INCOMING_REQUEST_LIST = 'friend:request:incomingRequestList';
export class ListIncomingRequestsHandler implements IQueryHandler<ListIncomingRequestsQuery, string[]> {
constructor(private redisClient: RedisClient) {}
async handle(query: ListIncomingRequestsQuery): Promise<string[]> {
return this.redisClient.client.lrange(`${INCOMING_REQUEST_LIST}:${query.uid}`, 0, -1);
}
}
import { IQuery } from '../../infrastructure/IQuery';
export class ListIncomingRequestsQuery implements IQuery {
constructor(public uid: string) {}
}
// ListOutgoingRequestsHandler.ts
import { IQueryHandler } from '../../infrastructure/IQueryHandler';
import { ListOutgoingRequestsQuery } from '../Queries/ListOutgoingRequestsQuery';
import RedisClient from '../../persistence/clients/RedisClient';
const OUTGOING_REQUEST_LIST = 'friend:request:outgoingRequestList';
export class ListOutgoingRequestsHandler implements IQueryHandler<ListOutgoingRequestsQuery, string[]> {
constructor(private redisClient: RedisClient) {}
async handle(query: ListOutgoingRequestsQuery): Promise<string[]> {
return this.redisClient.client.lrange(`${OUTGOING_REQUEST_LIST}:${query.uid}`, 0, -1);
}
}
import { IQuery } from '../../infrastructure/IQuery';
export class ListOutgoingRequestsQuery implements IQuery {
constructor(public uid: string) {}
}
// QueryBus.ts
import { IQuery } from './IQuery';
import { IQueryHandler } from './IQueryHandler';
export class QueryBus {
private handlers = new Map<string, IQueryHandler<IQuery, any>>();
registerHandler<TQuery extends IQuery, TResult>(queryType: string, handler: IQueryHandler<TQuery, TResult>): void {
this.handlers.set(queryType, handler);
}
async execute<TQuery extends IQuery, TResult>(query: TQuery): Promise<TResult> {
const handler = this.handlers.get(query.constructor.name);
if (!handler) {
throw new Error(`No handler registered for query type: ${query.constructor.name}`);
}
return handler.handle(query);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment