Created
December 19, 2024 17:01
-
-
Save adejorosam/22ac4d3067c504bee33f50558c123ce9 to your computer and use it in GitHub Desktop.
websocket.service.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
WebSocketGateway, | |
WebSocketServer, | |
OnGatewayConnection, | |
OnGatewayDisconnect, | |
} from '@nestjs/websockets'; | |
import { Server, Socket } from 'socket.io'; | |
import * as WebSocket from 'ws'; | |
import { Injectable, Logger } from '@nestjs/common'; | |
import { ConfigService } from '@nestjs/config'; | |
import { CommonHttpService } from 'src/common/common-http.service'; | |
import { ZanibalService } from 'src/services/zanibal/zanibal.service'; | |
import { Cron, CronExpression } from '@nestjs/schedule'; | |
import { Redis } from 'ioredis'; | |
/**
 * Values used in the `type` field of messages exchanged with the external
 * market-data WebSocket and re-broadcast to Socket.IO clients.
 *
 * Every key maps to the identical string, so `MessageTypes.X === 'X'`.
 */
const MessageTypes = {
  SessionStatus: 'SessionStatus',
  SecurityList: 'SecurityList',
  SecurityStats: 'SecurityStats',
  TradeEvent: 'TradeEvent',
  MarketNews: 'MarketNews',
  LogonRequest: 'LogonRequest',
  LogonResponse: 'LogonResponse',
  LogoffResponse: 'LogoffResponse',
  SubscriptionRequest: 'SubscriptionRequest',
  SubscriptionResponse: 'SubscriptionResponse',
  MarketDefinitionRequest: 'MarketDefinitionRequest',
  MarketDefinitionResponse: 'MarketDefinitionResponse',
  TopGainers: 'TopGainers',
  TopLosers: 'TopLosers',
} as const;

/** Union of every message type string declared above. */
// eslint-disable-next-line @typescript-eslint/no-redeclare
type MessageTypes = (typeof MessageTypes)[keyof typeof MessageTypes];
/**
 * Socket.IO gateway that relays market data from the external Zanibal
 * WebSocket feed to connected clients on the `zanibalMessage` event.
 *
 * - Outside the `local` environment it opens one upstream `ws` connection,
 *   logs on, requests the market definition and subscribes to the feed.
 * - `SecurityStats` messages are cached per `secId` in memory, mirrored to
 *   Redis, and re-broadcast together with top gainers / top losers lists.
 * - A 5-second interval re-broadcasts the cached data when the feed has been
 *   silent for `INACTIVITY_THRESHOLD`, or always while in "Redis mode".
 * - Two cron jobs switch between the live feed and Redis mode.
 */
@Injectable()
@WebSocketGateway()
export class ZanbibalWebsocketGateway
  implements OnGatewayConnection, OnGatewayDisconnect
{
  @WebSocketServer()
  server: Server;

  /** Current upstream socket; `undefined` when none is wanted or created yet. */
  private externalWs?: WebSocket;
  private apiKey: string;
  private tenantId: string;
  private wsUrl: string;
  private isLoggedOn = false;
  private logger = new Logger(ZanbibalWebsocketGateway.name);
  private maxRetries = 5;
  private retryCount = 0;
  private ZANIBAL_BASE_URL = process.env.ZANIBAL_BASE_URL;
  /** Latest stats payload per security, keyed by `secId`. */
  private allSecurityStats: Map<string, any> = new Map();
  private readonly TOP_N = 10;
  private isLocalEnvironment: boolean;
  private redis: Redis;
  /** Timestamp (ms) of the most recent live SecurityStats update. */
  private lastSecurityStatsTime: number = Date.now();
  private emitInterval: NodeJS.Timeout | null = null;
  /** Pending back-off timer created by `reconnect()`, if any. */
  private reconnectTimer: NodeJS.Timeout | null = null;
  private readonly INACTIVITY_THRESHOLD = 60000; // 1 minute
  private readonly EMIT_INTERVAL = 5000; // 5 seconds
  private readonly REDIS_KEY = 'security-stats';
  private isEmittingFromRedis = false;

  constructor(
    private configService: ConfigService,
    private readonly commonhttpService: CommonHttpService,
    private readonly zanibalService: ZanibalService,
  ) {
    this.isLocalEnvironment = process.env.NODE_ENV === 'local';
    this.setupRedis();
    // Only set up WebSocket if not in local environment
    if (!this.isLocalEnvironment) {
      // A constructor cannot await, so attach a handler instead of leaving a
      // floating promise whose rejection would be unhandled.
      this.setupExternalWebSocket().catch((error) =>
        this.logger.error('Failed to set up external WebSocket:', error),
      );
    } else {
      this.logger.log(
        'Running in local environment. WebSocket connections disabled.',
      );
    }
  }

  /** Creates the Redis client from REDIS_HOST / REDIS_PORT / REDIS_PASSWORD. */
  private setupRedis() {
    this.redis = new Redis({
      host: this.configService.get<string>('REDIS_HOST'),
      port: this.configService.get<number>('REDIS_PORT'),
      password: this.configService.get<string>('REDIS_PASSWORD'),
    });
    this.redis.on('error', (err) => {
      this.logger.error('Redis connection error:', err);
    });
    this.redis.on('connect', () => {
      this.logger.log('Connected to Redis');
    });
  }

  /** Restores cached stats from Redis, then starts the periodic re-broadcast. */
  async onModuleInit() {
    await this.loadSavedData();
    this.startEmitInterval();
  }

  /**
   * Derives the upstream URL, fetches the API key and opens the connection.
   * Logs and returns without connecting when the base URL or API key is
   * unavailable.
   */
  private async setupExternalWebSocket() {
    if (!this.ZANIBAL_BASE_URL) {
      this.logger.error(
        'ZANIBAL_BASE_URL is not set. Cannot connect to external WebSocket.',
      );
      return;
    }
    // Everything after the first dot-separated label of the base URL.
    const env = this.ZANIBAL_BASE_URL.split('.').slice(1).join('.');
    this.tenantId = this.configService.get<string>('ZANIBAL_TENANT_ID');
    this.wsUrl = `wss://${this.tenantId}.mds.${env}/ws`;
    await this.fetchApiKey();
    if (!this.apiKey) {
      // fetchApiKey already logged the failure; a logon without a key is useless.
      this.logger.error(
        'No API key available. Skipping external WebSocket connection.',
      );
      return;
    }
    this.connectToExternalWebSocket();
  }

  /**
   * Called for every Socket.IO client that connects. If the upstream socket
   * is open, a logon and a subscription request are sent again.
   * NOTE(review): re-sending the logon per client looks like a way to trigger
   * a fresh snapshot from upstream — confirm this is intended.
   */
  async handleConnection(client: Socket) {
    this.logger.log('Client connected');
    if (this.externalWs?.readyState === WebSocket.OPEN) {
      this.sendLogonRequest();
      this.handleSubscriptionRequest();
    }
  }

  handleDisconnect(client: Socket) {
    this.logger.log('Client disconnected');
  }

  /**
   * Seeds the in-memory map from the Redis snapshot (only when the map is
   * still empty) and broadcasts it once. Failures are logged, never thrown.
   */
  private async loadSavedData() {
    try {
      const savedData = await this.redis.get(this.REDIS_KEY);
      if (!savedData) {
        return;
      }
      const savedStats: unknown = JSON.parse(savedData);
      if (!Array.isArray(savedStats)) {
        this.logger.error('Saved security stats in Redis are not an array');
        return;
      }
      // Only load saved data if current map is empty
      if (this.allSecurityStats.size !== 0) {
        return;
      }
      savedStats.forEach((stat: any) => {
        this.allSecurityStats.set(stat.secId, stat);
      });
      this.logger.log(
        `Loaded ${this.allSecurityStats.size} security stats from Redis`,
      );
      // Emit loaded data immediately if available
      if (this.allSecurityStats.size > 0) {
        await this.emitSecurityStats();
      }
    } catch (error) {
      this.logger.error(
        'Failed to load saved security stats from Redis:',
        error,
      );
    }
  }

  /**
   * Writes the whole stats map to Redis as a JSON array. Does nothing when
   * the map is empty. Failures are logged, never thrown.
   */
  private async saveSecurityStats() {
    try {
      if (this.allSecurityStats.size === 0) {
        return;
      }
      const statsArray = Array.from(this.allSecurityStats.values());
      // A plain SET already discards any existing TTL, so no PERSIST is needed.
      await this.redis.set(this.REDIS_KEY, JSON.stringify(statsArray));
    } catch (error) {
      this.logger.error('Failed to save security stats to Redis:', error);
    }
  }

  /**
   * Stops timers, closes the upstream socket without triggering a reconnect,
   * and finishes the last Redis save before the Redis connection is closed.
   */
  async onModuleDestroy() {
    this.stopEmitInterval();
    this.closeExternalWebSocket();
    // Awaited so the write completes before disconnecting.
    await this.saveSecurityStats();
    this.redis.disconnect();
  }

  /** Starts the periodic re-broadcast; any previous interval is replaced. */
  private startEmitInterval() {
    this.stopEmitInterval();
    this.emitInterval = setInterval(() => {
      const timeSinceLastMessage = Date.now() - this.lastSecurityStatsTime;
      const liveFeedIsSilent =
        timeSinceLastMessage >= this.INACTIVITY_THRESHOLD &&
        this.allSecurityStats.size > 0;
      if (this.isEmittingFromRedis || liveFeedIsSilent) {
        this.emitSecurityStats().catch((error) =>
          this.logger.error('Failed to emit security stats:', error),
        );
      }
    }, this.EMIT_INTERVAL);
  }

  private stopEmitInterval() {
    if (this.emitInterval) {
      clearInterval(this.emitInterval);
      this.emitInterval = null;
    }
  }

  /**
   * Broadcasts the full stats list plus top gainers / top losers. The data
   * comes from Redis while in Redis mode, otherwise from the in-memory map.
   */
  private async emitSecurityStats() {
    let statsToEmit: any[];
    // Use Redis-saved data if currently in Redis emission mode
    if (this.isEmittingFromRedis) {
      try {
        const savedStatsJson = await this.redis.get(this.REDIS_KEY);
        if (!savedStatsJson) {
          this.logger.warn('No saved stats found in Redis');
          return;
        }
        const parsed: unknown = JSON.parse(savedStatsJson);
        if (!Array.isArray(parsed)) {
          this.logger.error('Saved stats in Redis are not an array');
          return;
        }
        statsToEmit = parsed;
      } catch (error) {
        this.logger.error('Error retrieving stats from Redis:', error);
        return;
      }
    } else {
      // Use live data from the Map
      statsToEmit = Array.from(this.allSecurityStats.values());
    }

    // Build everything before emitting so a bad record aborts the whole batch.
    const messages = [
      {
        success: true,
        type: MessageTypes.SecurityStats,
        data: statsToEmit,
      },
      ...this.buildRankingMessages(statsToEmit),
    ];
    messages.forEach((message) => this.broadcast(message));
  }

  /**
   * Stores one live stats payload under `data.secId`, mirrors the map to
   * Redis and broadcasts the refreshed lists.
   */
  private async updateSecurityStats(data: any) {
    // Mark the feed as active so the interval does not duplicate live updates.
    this.lastSecurityStatsTime = Date.now();
    this.allSecurityStats.set(data.secId, data);
    await this.saveSecurityStats(); // Save to Redis immediately after update

    const allStats = Array.from(this.allSecurityStats.values());
    const messages = [
      {
        success: true,
        text: '',
        type: MessageTypes.SecurityStats,
        data: allStats,
      },
      ...this.buildRankingMessages(allStats),
    ];
    messages.forEach((message) => this.broadcast(message));
  }

  /**
   * Builds the "Top Gainers" and "Top Losers" messages: records with
   * `secSubType === 'EQTY'` and a non-null `netChgPrevDayPerc`, sorted by that
   * value descending; first / last `TOP_N` entries respectively.
   */
  private buildRankingMessages(stats: any[]) {
    const ranked = stats
      .filter(
        (stat) =>
          stat.netChgPrevDayPerc !== undefined &&
          stat.netChgPrevDayPerc !== null &&
          stat.secSubType === 'EQTY',
      )
      .sort((a, b) => b.netChgPrevDayPerc - a.netChgPrevDayPerc);
    return [
      {
        success: true,
        text: 'Top Gainers',
        type: MessageTypes.TopGainers,
        data: ranked.slice(0, this.TOP_N),
      },
      {
        success: true,
        text: 'Top Losers',
        type: MessageTypes.TopLosers,
        data: ranked.slice(-this.TOP_N).reverse(),
      },
    ];
  }

  /** Emits `message` to all Socket.IO clients; no-op until `server` is set. */
  private broadcast(message: unknown) {
    if (!this.server) {
      return;
    }
    this.server.emit('zanibalMessage', message);
  }

  /**
   * Fetches the market-data API key for the tenant and stores it.
   * @returns the key, or `undefined` when the request failed (error is logged).
   */
  private async fetchApiKey() {
    try {
      const accessToken = await this.zanibalService.getAccessToken();
      const response = await this.commonhttpService.get(
        `${this.ZANIBAL_BASE_URL}/administration/api/v1/tenants/market-data/api-key/${this.tenantId}`,
        {
          Authorization: `Bearer ${accessToken}`,
          'x-tenant-id': this.tenantId,
        },
      );
      this.apiKey = response.apiKey;
      return this.apiKey;
    } catch (error) {
      this.logger.error('Failed to fetch API key:', error);
      return undefined;
    }
  }

  /**
   * Opens a new upstream socket and wires its handlers. Handlers ignore
   * events from a socket that is no longer `this.externalWs`, so a socket we
   * closed on purpose (or replaced) can never trigger a reconnect.
   */
  private connectToExternalWebSocket() {
    this.clearReconnectTimer();
    const socket = new WebSocket(this.wsUrl);
    this.externalWs = socket;

    socket.on('open', () => {
      if (this.externalWs !== socket) {
        return;
      }
      this.logger.log('Connected to external WebSocket');
      this.retryCount = 0;
      this.sendLogonRequest();
    });

    socket.on('message', (data: WebSocket.Data) => {
      if (this.externalWs !== socket) {
        return;
      }
      let message: any;
      try {
        message = JSON.parse(data.toString());
      } catch (error) {
        // A malformed frame must not crash the process from an event handler.
        this.logger.error('Failed to parse external WebSocket message:', error);
        return;
      }
      this.handleExternalMessage(message);
    });

    socket.on('close', () => {
      if (this.externalWs !== socket) {
        // Closed intentionally or superseded: do not reconnect.
        return;
      }
      this.logger.warn('Disconnected from external WebSocket. Reconnecting...');
      this.isLoggedOn = false;
      this.reconnect();
    });

    socket.on('error', (error) => {
      this.logger.error('WebSocket error:', error);
    });
  }

  /** Schedules a reconnect with exponential back-off, up to `maxRetries`. */
  private reconnect() {
    if (this.retryCount < this.maxRetries) {
      this.retryCount++;
      const delay = Math.pow(2, this.retryCount) * 1000;
      this.clearReconnectTimer();
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        this.connectToExternalWebSocket();
      }, delay);
    } else {
      this.logger.error(
        'Max retry attempts reached. Please check the connection manually.',
      );
    }
  }

  private clearReconnectTimer() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Intentionally drops the upstream connection: cancels any pending
   * reconnect and detaches the socket from `this.externalWs` before closing,
   * which makes its 'close' handler a no-op.
   */
  private closeExternalWebSocket() {
    this.clearReconnectTimer();
    const socket = this.externalWs;
    this.externalWs = undefined;
    this.isLoggedOn = false;
    if (!socket) {
      return;
    }
    try {
      socket.close();
    } catch (error) {
      this.logger.error('Error closing external WebSocket:', error);
    }
  }

  /**
   * Sends `payload` as JSON on the upstream socket.
   * @returns `false` (nothing sent) when there is no open socket.
   */
  private sendToExternal(payload: unknown): boolean {
    const socket = this.externalWs;
    if (!socket || socket.readyState !== WebSocket.OPEN) {
      return false;
    }
    socket.send(JSON.stringify(payload));
    return true;
  }

  private sendLogonRequest() {
    const logonRequest = {
      type: MessageTypes.LogonRequest,
      text: 'Logon request',
      data: {
        tenant: this.tenantId,
        apiKey: this.apiKey,
        username: 'anonymous',
      },
    };
    if (!this.sendToExternal(logonRequest)) {
      this.logger.warn('Cannot send logon request: WebSocket not ready');
    }
  }

  /**
   * Routes one parsed upstream message: a successful logon triggers the
   * market definition and subscription requests, SecurityStats updates the
   * cache, and every other type is forwarded to clients unchanged.
   */
  private handleExternalMessage(message: any) {
    try {
      switch (message.type) {
        case MessageTypes.LogonResponse:
          if (message.success) {
            this.logger.log('Logon successful');
            this.isLoggedOn = true;
            this.handleMarketDefinitionRequest();
            this.handleSubscriptionRequest();
          } else {
            this.logger.error('Logon failed:', message.text);
          }
          break;
        case MessageTypes.SecurityStats:
          if (message.data && message.success) {
            // Async: the surrounding try/catch cannot see its rejection.
            this.updateSecurityStats(message.data).catch((error) =>
              this.logger.error('Failed to update security stats:', error),
            );
          }
          break;
        default:
          this.broadcast(message);
      }
    } catch (error) {
      this.logger.error('Error handling external message:', error, message);
    }
  }

  private handleMarketDefinitionRequest() {
    const marketDefinitionRequest = {
      type: MessageTypes.MarketDefinitionRequest,
      tenant: this.tenantId,
      text: 'Market structure request',
      data: {
        tenant: this.tenantId,
        username: 'anonymous',
      },
    };
    if (!this.sendToExternal(marketDefinitionRequest)) {
      this.logger.warn(
        'Cannot send market definition request: WebSocket not ready',
      );
    }
  }

  /** Subscribes to all boards and securities of market `NGX` once logged on. */
  private handleSubscriptionRequest() {
    const subscriptionRequest = {
      type: MessageTypes.SubscriptionRequest,
      tenant: this.tenantId,
      text: 'Subscription update request',
      data: {
        tenant: this.tenantId,
        username: 'anonymous',
        marketCode: 'NGX',
        board: '*',
        secId: '*',
      },
    };
    // Safe even when no socket was ever created (e.g. local environment).
    if (this.isLoggedOn && this.sendToExternal(subscriptionRequest)) {
      return;
    }
    this.logger.warn(
      'Cannot send subscription request: WebSocket not ready or not logged on',
    );
  }

  /** Returns the currently cached stats payloads, one per security. */
  public async getSecurityStats() {
    return Array.from(this.allSecurityStats.values());
  }

  @Cron(CronExpression.EVERY_HOUR)
  private handleSubscriptionRequestCron() {
    this.logger.log('Running hourly subscription request');
    this.handleSubscriptionRequest();
  }

  /**
   * Daily switch to Redis mode: logs off, closes the upstream socket (without
   * auto-reconnect) and lets the interval serve the Redis snapshot.
   * NOTE(review): no `timeZone` option is passed to @Cron — confirm the
   * scheduler really runs in UTC as the schedule comments assume.
   */
  @Cron('0 14 * * *') // 2 PM UTC = 3 PM WAT
  private async disconnectFromServer() {
    this.logger.log('Scheduled disconnection at 3 PM WAT');
    // Stop live WebSocket connection
    const logoffRequest = {
      // NOTE(review): tagged LogoffResponse because the enum has no
      // LogoffRequest member — confirm against the upstream protocol.
      type: MessageTypes.LogoffResponse,
      text: 'Scheduled logoff',
      data: {
        tenant: this.tenantId,
        username: 'anonymous',
      },
    };
    try {
      this.sendToExternal(logoffRequest);
    } catch (error) {
      this.logger.error('Error during scheduled disconnection:', error);
    }
    this.closeExternalWebSocket();

    // Switch to Redis-based emission
    this.stopEmitInterval();
    this.isEmittingFromRedis = true;
    // Restart interval with Redis-based emission
    this.startEmitInterval();
  }

  /**
   * Daily switch back to the live feed: leaves Redis mode, restarts the
   * interval and (outside the local environment) opens a fresh upstream
   * connection with a reset retry budget.
   */
  @Cron('0 9 * * *') // 9 AM UTC = 10 AM WAT
  private async reconnectToServer() {
    this.logger.log('Scheduled reconnection at 10 AM WAT');
    // Stop Redis-based emission
    this.stopEmitInterval();
    this.isEmittingFromRedis = false;
    // Restart the interval in every environment so it is never left stopped.
    this.startEmitInterval();

    // Reinitialize WebSocket connection if not in local environment
    if (this.isLocalEnvironment) {
      return;
    }
    try {
      // Ensure any existing connection is closed without scheduling a reconnect
      this.closeExternalWebSocket();
      this.retryCount = 0;
      // Restart WebSocket setup
      await this.setupExternalWebSocket();
    } catch (error) {
      this.logger.error('Error during scheduled reconnection:', error);
    }
  }
}
import { Injectable, OnModuleInit } from '@nestjs/common'; | |
import { CommonHttpService } from 'src/common/common-http.service'; | |
import { ZanibalService } from 'src/services/zanibal/zanibal.service'; | |
import { EventEmitter } from 'stream'; | |
import * as WebSocket from 'ws'; | |
/**
 * Thin client for the external Zanibal market-data WebSocket.
 *
 * Every upstream frame is re-emitted on this object as an `externalMessage`
 * event. When the current connection closes, a new connection attempt is made
 * after 5 seconds.
 */
@Injectable()
export class ZanibalWebsocketService
  extends EventEmitter
  implements OnModuleInit
{
  /** Current upstream socket, or `null` before the first connection attempt. */
  externalWs: WebSocket | null = null;
  private apiKey: string;
  private tenantId = process.env.ZANIBAL_TENANT_ID;
  private ZANIBAL_BASE_URL = process.env.ZANIBAL_BASE_URL;

  constructor(
    private readonly commonhttpService: CommonHttpService,
    private readonly zanibalService: ZanibalService,
  ) {
    super();
  }

  /**
   * Preloads the API key when the service initializes. A failure is only
   * logged: `connectToExternalWs` fetches the key again when it is missing.
   */
  async onModuleInit() {
    try {
      await this.retrieveApiKey();
    } catch (error) {
      console.error('Failed to preload API key:', this.describeError(error));
    }
  }

  /**
   * Opens the upstream connection and sends the logon request once it is
   * open. Does nothing when a socket is already open or connecting. Errors
   * are logged, never thrown.
   */
  async connectToExternalWs() {
    try {
      if (!this.apiKey) {
        await this.retrieveApiKey();
      }
      // Checked after the await so two overlapping callers cannot both
      // create a socket and leak one of them.
      if (this.hasLiveSocket()) {
        return;
      }
      if (!this.ZANIBAL_BASE_URL) {
        throw new Error('ZANIBAL_BASE_URL is not set');
      }
      // Everything after the first two dot-separated labels of the base URL.
      const env = this.ZANIBAL_BASE_URL.split('.').slice(2).join('.');
      const WS_URL = `wss://${this.tenantId}.mds.${env}/ws`;
      const socket = new WebSocket(WS_URL);
      this.externalWs = socket;

      socket.on('open', () => {
        console.log('External WebSocket connected.');
        this.logonToExternalWs();
      });
      socket.on('message', (message) => {
        console.log('Message from external WebSocket:', message);
        this.emit('externalMessage', message);
      });
      socket.on('error', (error) => {
        console.error('External WebSocket error:', error.message);
      });
      socket.on('close', () => {
        console.log('External WebSocket connection closed.');
        // Only the current socket may schedule a reconnect; a superseded one
        // would otherwise start a second reconnect loop.
        if (this.externalWs === socket) {
          this.reconnectToExternalWs();
        }
      });
    } catch (error) {
      console.error(
        'Error connecting to external WebSocket:',
        this.describeError(error),
      );
    }
  }

  /**
   * Fetches the tenant's market-data API key and caches it on the instance.
   * @returns the API key from the response.
   * @throws whatever `getAccessToken` or the HTTP call rejects with.
   */
  async retrieveApiKey(): Promise<string> {
    const accessToken = await this.zanibalService.getAccessToken();
    const response = await this.commonhttpService.get(
      `${this.ZANIBAL_BASE_URL}/administration/api/v1/tenants/market-data/api-key/${this.tenantId}`,
      {
        Authorization: `Bearer ${accessToken}`,
        'x-tenant-id': this.tenantId,
      },
    );
    this.apiKey = response.apiKey;
    return this.apiKey;
  }

  /** Sends the logon request if the socket is open; otherwise does nothing. */
  private logonToExternalWs() {
    const logonMessage = {
      type: 'LogonRequest',
      text: 'Logon request',
      data: {
        tenant: this.tenantId,
        apiKey: this.apiKey,
        username: 'anonymous',
      },
    };
    if (this.externalWs?.readyState === WebSocket.OPEN) {
      this.externalWs.send(JSON.stringify(logonMessage));
      console.log('Logon request sent to external WebSocket.');
    }
  }

  /**
   * Sends `data` as JSON on the upstream socket. Logs an error and drops the
   * message when the socket is not open.
   */
  async sendMessageToExternalWs(data: any) {
    if (this.externalWs?.readyState === WebSocket.OPEN) {
      this.externalWs.send(JSON.stringify(data));
    } else {
      console.error('External WebSocket is not open. Unable to send message.');
    }
  }

  /** Schedules a new connection attempt in 5 seconds. */
  private reconnectToExternalWs() {
    setTimeout(() => {
      console.log('Reconnecting to external WebSocket...');
      this.connectToExternalWs();
    }, 5000);
  }

  /** True while the current socket is open or still connecting. */
  private hasLiveSocket(): boolean {
    const state = this.externalWs?.readyState;
    return state === WebSocket.OPEN || state === WebSocket.CONNECTING;
  }

  /** Extracts a printable message from a caught value of unknown type. */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment