Skip to content

Instantly share code, notes, and snippets.

@adejorosam
Created December 19, 2024 17:01
Show Gist options
  • Save adejorosam/22ac4d3067c504bee33f50558c123ce9 to your computer and use it in GitHub Desktop.
Save adejorosam/22ac4d3067c504bee33f50558c123ce9 to your computer and use it in GitHub Desktop.
websocket.service.ts
import {
WebSocketGateway,
WebSocketServer,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import * as WebSocket from 'ws';
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { CommonHttpService } from 'src/common/common-http.service';
import { ZanibalService } from 'src/services/zanibal/zanibal.service';
import { Cron, CronExpression } from '@nestjs/schedule';
import { Redis } from 'ioredis';
enum MessageTypes {
SessionStatus = 'SessionStatus',
SecurityList = 'SecurityList',
SecurityStats = 'SecurityStats',
TradeEvent = 'TradeEvent',
MarketNews = 'MarketNews',
LogonRequest = 'LogonRequest',
LogonResponse = 'LogonResponse',
LogoffResponse = 'LogoffResponse',
SubscriptionRequest = 'SubscriptionRequest',
SubscriptionResponse = 'SubscriptionResponse',
MarketDefinitionRequest = 'MarketDefinitionRequest',
MarketDefinitionResponse = 'MarketDefinitionResponse',
TopGainers = 'TopGainers',
TopLosers = 'TopLosers',
}
@Injectable()
@WebSocketGateway()
export class ZanbibalWebsocketGateway
implements OnGatewayConnection, OnGatewayDisconnect
{
@WebSocketServer()
server: Server;
private externalWs: WebSocket;
private apiKey: string;
private tenantId: string;
private wsUrl: string;
private isLoggedOn: boolean = false;
private logger = new Logger(ZanbibalWebsocketGateway.name);
private maxRetries = 5;
private retryCount = 0;
private ZANIBAL_BASE_URL = process.env.ZANIBAL_BASE_URL;
private allSecurityStats: Map<string, any> = new Map();
private readonly TOP_N = 10;
private isLocalEnvironment: boolean;
private redis: Redis;
private lastSecurityStatsTime: number = Date.now();
private emitInterval: NodeJS.Timeout | null = null;
private readonly INACTIVITY_THRESHOLD = 60000; // 1 minute
private readonly EMIT_INTERVAL = 5000; // 5 seconds
private readonly REDIS_KEY = 'security-stats';
private isEmittingFromRedis: boolean = false;
constructor(
private configService: ConfigService,
private readonly commonhttpService: CommonHttpService,
private readonly zanibalService: ZanibalService,
) {
this.isLocalEnvironment = process.env.NODE_ENV === 'local';
this.setupRedis();
// Only set up WebSocket if not in local environment
if (!this.isLocalEnvironment) {
this.setupExternalWebSocket();
} else {
this.logger.log(
'Running in local environment. WebSocket connections disabled.',
);
}
}
private setupRedis() {
this.redis = new Redis({
host: this.configService.get<string>('REDIS_HOST'),
port: this.configService.get<number>('REDIS_PORT'),
password: this.configService.get<string>('REDIS_PASSWORD'),
});
this.redis.on('error', (err) => {
this.logger.error('Redis connection error:', err);
});
this.redis.on('connect', () => {
this.logger.log('Connected to Redis');
});
}
async onModuleInit() {
// Load saved data from Redis
await this.loadSavedData();
// Start emit interval
this.startEmitInterval();
}
private async setupExternalWebSocket() {
const env = this.ZANIBAL_BASE_URL.split('.').slice(1).join('.');
this.tenantId = this.configService.get<string>('ZANIBAL_TENANT_ID');
this.wsUrl = `wss://${this.tenantId}.mds.${env}/ws`;
await this.fetchApiKey();
this.connectToExternalWebSocket();
}
async handleConnection(client: Socket) {
this.logger.log('Client connected');
if (this.externalWs?.readyState === WebSocket.OPEN) {
this.sendLogonRequest();
this.handleSubscriptionRequest();
}
}
handleDisconnect(client: Socket) {
this.logger.log('Client disconnected');
}
private async loadSavedData() {
try {
const savedData = await this.redis.get(this.REDIS_KEY);
if (savedData) {
const savedStats = JSON.parse(savedData);
// Only load saved data if current map is empty
if (this.allSecurityStats.size === 0) {
savedStats.forEach((stat: any) => {
this.allSecurityStats.set(stat.secId, stat);
});
this.logger.log(
`Loaded ${this.allSecurityStats.size} security stats from Redis`,
);
// Emit loaded data immediately if available
if (this.allSecurityStats.size > 0) {
this.emitSecurityStats();
}
}
}
} catch (error) {
this.logger.error(
'Failed to load saved security stats from Redis:',
error,
);
}
}
private async saveSecurityStats() {
try {
if (this.allSecurityStats.size === 0) {
return;
}
const statsArray = Array.from(this.allSecurityStats.values());
await this.redis.set(this.REDIS_KEY, JSON.stringify(statsArray));
// Optional: Set expiry if needed (set to 0 for no expiry)
await this.redis.persist(this.REDIS_KEY);
} catch (error) {
this.logger.error('Failed to save security stats to Redis:', error);
}
}
onModuleDestroy() {
this.stopEmitInterval();
// Save data one last time before shutting down
this.saveSecurityStats().catch((err) =>
this.logger.error('Failed to save security stats during shutdown:', err),
);
// Close Redis connection
this.redis.disconnect();
}
private startEmitInterval() {
this.emitInterval = setInterval(() => {
const timeSinceLastMessage = Date.now() - this.lastSecurityStatsTime;
if (
(this.isEmittingFromRedis ||
timeSinceLastMessage >= this.INACTIVITY_THRESHOLD) &&
(this.isEmittingFromRedis || this.allSecurityStats.size > 0)
) {
this.emitSecurityStats();
}
}, this.EMIT_INTERVAL);
}
private stopEmitInterval() {
if (this.emitInterval) {
clearInterval(this.emitInterval);
}
}
private async emitSecurityStats() {
let statsToEmit: any[];
// Use Redis-saved data if currently in Redis emission mode
if (this.isEmittingFromRedis) {
try {
const savedStatsJson = await this.redis.get(this.REDIS_KEY);
if (savedStatsJson) {
statsToEmit = JSON.parse(savedStatsJson);
} else {
this.logger.warn('No saved stats found in Redis');
return;
}
} catch (error) {
this.logger.error('Error retrieving stats from Redis:', error);
return;
}
} else {
// Use live data from the Map
statsToEmit = Array.from(this.allSecurityStats.values());
}
const sortedStats = statsToEmit
.filter(
(stat) =>
stat.netChgPrevDayPerc !== undefined &&
stat.netChgPrevDayPerc !== null,
)
.filter((stat) => stat.secSubType === 'EQTY')
.sort((a, b) => b.netChgPrevDayPerc - a.netChgPrevDayPerc);
const topGainers = sortedStats.slice(0, this.TOP_N);
const topLosers = sortedStats.slice(-this.TOP_N).reverse();
const messages = [
{
success: true,
type: MessageTypes.SecurityStats,
data: statsToEmit,
},
{
success: true,
text: 'Top Gainers',
type: MessageTypes.TopGainers,
data: topGainers,
},
{
success: true,
text: 'Top Losers',
type: MessageTypes.TopLosers,
data: topLosers,
},
];
messages.forEach((message) => {
this.server.emit('zanibalMessage', message);
});
}
private async updateSecurityStats(data: any) {
this.allSecurityStats.set(data.secId, data);
await this.saveSecurityStats(); // Save to Redis immediately after update
const sortedStats = Array.from(this.allSecurityStats.values())
.filter(
(stat) =>
stat.netChgPrevDayPerc !== undefined &&
stat.netChgPrevDayPerc !== null,
)
.filter((stat) => stat.secSubType === 'EQTY')
.sort((a, b) => b.netChgPrevDayPerc - a.netChgPrevDayPerc);
const topGainers = sortedStats.slice(0, this.TOP_N);
const topGainersMessage = {
success: true,
text: 'Top Gainers',
type: MessageTypes.TopGainers,
data: topGainers,
};
const topLosers = sortedStats.slice(-this.TOP_N).reverse();
const topLosersMessage = {
success: true,
text: 'Top Losers',
type: MessageTypes.TopLosers,
data: topLosers,
};
const customMessage = {
success: true,
text: '',
type: MessageTypes.SecurityStats,
data: Array.from(this.allSecurityStats.values()),
};
this.server.emit('zanibalMessage', customMessage);
this.server.emit('zanibalMessage', topGainersMessage);
this.server.emit('zanibalMessage', topLosersMessage);
}
// Rest of the methods remain the same...
// Include all other methods from the original code that weren't modified
private async fetchApiKey() {
try {
const accessToken = await this.zanibalService.getAccessToken();
const response = await this.commonhttpService.get(
`${this.ZANIBAL_BASE_URL}/administration/api/v1/tenants/market-data/api-key/${this.tenantId}`,
{
Authorization: `Bearer ${accessToken}`,
'x-tenant-id': this.tenantId,
},
);
this.apiKey = response.apiKey;
return this.apiKey;
} catch (error) {
this.logger.error('Failed to fetch API key:', error);
}
}
private connectToExternalWebSocket() {
this.externalWs = new WebSocket(this.wsUrl);
this.externalWs.on('open', () => {
this.logger.log('Connected to external WebSocket');
this.retryCount = 0;
this.sendLogonRequest();
});
this.externalWs.on('message', (data: WebSocket.Data) => {
const message = JSON.parse(data.toString());
this.handleExternalMessage(message);
});
this.externalWs.on('close', () => {
this.logger.warn('Disconnected from external WebSocket. Reconnecting...');
this.isLoggedOn = false;
this.reconnect();
});
this.externalWs.on('error', (error) => {
this.logger.error('WebSocket error:', error);
});
}
private reconnect() {
if (this.retryCount < this.maxRetries) {
this.retryCount++;
const delay = Math.pow(2, this.retryCount) * 1000;
setTimeout(() => this.connectToExternalWebSocket(), delay);
} else {
this.logger.error(
'Max retry attempts reached. Please check the connection manually.',
);
}
}
private sendLogonRequest() {
const logonRequest = {
type: MessageTypes.LogonRequest,
text: 'Logon request',
data: {
tenant: this.tenantId,
apiKey: this.apiKey,
username: 'anonymous',
},
};
this.externalWs.send(JSON.stringify(logonRequest));
}
private handleExternalMessage(message: any) {
try {
switch (message.type) {
case MessageTypes.LogonResponse:
if (message.success) {
this.logger.log('Logon successful');
this.isLoggedOn = true;
this.handleMarketDefinitionRequest();
this.handleSubscriptionRequest();
} else {
this.logger.error('Logon failed:', message.text);
}
break;
case MessageTypes.SecurityStats:
if (message.data && message.success) {
this.updateSecurityStats(message.data);
}
break;
default:
this.server.emit('zanibalMessage', message);
}
} catch (error) {
this.logger.error('Error handling external message:', error, message);
}
}
private handleMarketDefinitionRequest() {
const marketDefinitionRequest = {
type: MessageTypes.MarketDefinitionRequest,
tenant: this.tenantId,
text: 'Market structure request',
data: {
tenant: this.tenantId,
username: 'anonymous',
},
};
this.externalWs.send(JSON.stringify(marketDefinitionRequest));
}
private handleSubscriptionRequest() {
if (this.externalWs.readyState === WebSocket.OPEN && this.isLoggedOn) {
const subscriptionRequest = {
type: MessageTypes.SubscriptionRequest,
tenant: this.tenantId,
text: 'Subscription update request',
data: {
tenant: this.tenantId,
username: 'anonymous',
marketCode: 'NGX',
board: '*',
secId: '*',
},
};
this.externalWs.send(JSON.stringify(subscriptionRequest));
} else {
this.logger.warn(
'Cannot send subscription request: WebSocket not ready or not logged on',
);
}
}
public async getSecurityStats() {
return Array.from(this.allSecurityStats.values());
}
@Cron(CronExpression.EVERY_HOUR)
private handleSubscriptionRequestCron() {
this.logger.log('Running hourly subscription request');
this.handleSubscriptionRequest();
}
@Cron('0 14 * * *') // 2 PM UTC = 3 PM WAT
private async disconnectFromServer() {
this.logger.log('Scheduled disconnection at 3 PM WAT');
// Stop live WebSocket connection
if (this.externalWs && this.externalWs.readyState === WebSocket.OPEN) {
const logoffRequest = {
type: MessageTypes.LogoffResponse,
text: 'Scheduled logoff',
data: {
tenant: this.tenantId,
username: 'anonymous',
},
};
try {
this.externalWs.send(JSON.stringify(logoffRequest));
this.externalWs.close();
} catch (error) {
this.logger.error('Error during scheduled disconnection:', error);
}
this.isLoggedOn = false;
}
// Switch to Redis-based emission
this.stopEmitInterval();
this.isEmittingFromRedis = true;
// Restart interval with Redis-based emission
this.startEmitInterval();
}
@Cron('0 9 * * *') // 9 AM UTC = 10 AM WAT
private async reconnectToServer() {
this.logger.log('Scheduled reconnection at 10 AM WAT');
// Stop Redis-based emission
this.stopEmitInterval();
this.isEmittingFromRedis = false;
// Reinitialize WebSocket connection if not in local environment
if (!this.isLocalEnvironment) {
// Ensure any existing connection is closed
if (this.externalWs) {
this.externalWs.close();
}
// Restart WebSocket setup
await this.setupExternalWebSocket();
// Restart live emit interval
this.startEmitInterval();
}
}
}
import { Injectable, OnModuleInit } from '@nestjs/common';
import { CommonHttpService } from 'src/common/common-http.service';
import { ZanibalService } from 'src/services/zanibal/zanibal.service';
import { EventEmitter } from 'stream';
import * as WebSocket from 'ws';
@Injectable()
export class ZanibalWebsocketService
extends EventEmitter
implements OnModuleInit
{
externalWs: WebSocket | null = null;
private apiKey: string;
private tenantId = process.env.ZANIBAL_TENANT_ID;
private ZANIBAL_BASE_URL = process.env.ZANIBAL_BASE_URL;
constructor(
private readonly commonhttpService: CommonHttpService,
private readonly zanibalService: ZanibalService,
) {
super();
}
async onModuleInit() {
// Preload the API key when the service initializes
await this.retrieveApiKey();
}
async connectToExternalWs() {
try {
if (!this.apiKey) {
await this.retrieveApiKey();
}
const env = this.ZANIBAL_BASE_URL.split('.').slice(2).join('.');
const WS_URL = `wss://${this.tenantId}.mds.${env}/ws`;
this.externalWs = new WebSocket(WS_URL);
this.externalWs.on('open', () => {
console.log('External WebSocket connected.');
this.logonToExternalWs();
});
this.externalWs.on('message', (message) => {
console.log('Message from external WebSocket:', message);
this.emit('externalMessage', message);
});
this.externalWs.on('error', (error) => {
console.error('External WebSocket error:', error.message);
});
this.externalWs.on('close', () => {
console.log('External WebSocket connection closed.');
this.reconnectToExternalWs();
});
} catch (error) {
console.error('Error connecting to external WebSocket:', error.message);
}
}
async retrieveApiKey(): Promise<string> {
const accessToken = await this.zanibalService.getAccessToken();
const response = await this.commonhttpService.get(
`${this.ZANIBAL_BASE_URL}/administration/api/v1/tenants/market-data/api-key/${this.tenantId}`,
{
Authorization: `Bearer ${accessToken}`,
'x-tenant-id': this.tenantId,
},
);
this.apiKey = response.apiKey;
return this.apiKey;
}
private logonToExternalWs() {
const logonMessage = {
type: 'LogonRequest',
text: 'Logon request',
data: {
tenant: this.tenantId,
apiKey: this.apiKey,
username: 'anonymous',
},
};
if (this.externalWs?.readyState === WebSocket.OPEN) {
this.externalWs.send(JSON.stringify(logonMessage));
console.log('Logon request sent to external WebSocket.');
}
}
async sendMessageToExternalWs(data: any) {
if (this.externalWs?.readyState === WebSocket.OPEN) {
this.externalWs.send(JSON.stringify(data));
} else {
console.error('External WebSocket is not open. Unable to send message.');
}
}
private reconnectToExternalWs() {
setTimeout(() => {
console.log('Reconnecting to external WebSocket...');
this.connectToExternalWs();
}, 5000);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment