From daa53208d0684fac12c88a8b655c14be896f2867 Mon Sep 17 00:00:00 2001 From: Halit Aksoy Date: Sat, 16 Jul 2022 18:46:04 +0300 Subject: [PATCH] add ping pong messaging --- src/rtmt/rtmt_websocket.ts | 60 +++++++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/src/rtmt/rtmt_websocket.ts b/src/rtmt/rtmt_websocket.ts index d72749c..46e58fd 100644 --- a/src/rtmt/rtmt_websocket.ts +++ b/src/rtmt/rtmt_websocket.ts @@ -3,6 +3,9 @@ import { Bytes, OnMessage, RTMT } from "./rtmt" import WebSocket from "ws" import * as http from 'http'; +import { channel_ping, channel_pong } from "../consts/channel_names"; + +const PING_INTERVAL = 1000; export class RTMTWS implements RTMT { @@ -12,23 +15,24 @@ export class RTMTWS implements RTMT { public clients = new Map() + private pingInterval?: NodeJS.Timeout = undefined; + constructor() { this.messageChannels = new Map() this.wsServer = null } - initWebSocket(server: http.Server, onopen: (userKey : string) => any) { + initWebSocket(server: http.Server, onopen: (userKey: string) => any) { const wsServer = new WebSocket.Server({ server }) this.wsServer = wsServer this.clients = new Map() wsServer.on("connection", (ws: WebSocket, req: Request) => { - + console.log("Client Connected"); const regexResult = req.url.split(RegExp("\/\?userKey=")); const clientID = regexResult[1] this.clients.set(clientID, ws) ws.on("message", (messageBytes: string) => { - console.log('received: %s', messageBytes); this.onWebSocketMessage(clientID, messageBytes) }) @@ -42,18 +46,21 @@ export class RTMTWS implements RTMT { this.clients.delete(clientID) }) + this.setWebSocketAsAlive(ws); onopen(clientID) }) + this.startPingInterval(wsServer); + } + dispose() { + this.stopPingInterval(); } sendMessage(clientID: string, channel: string, message: Object) { if (this.wsServer) { const client = this.clients.get(clientID) if (client) { - const data = encode({ channel: channel, message: message }) - console.log("(RTMT) Sending message to channel " + channel); - client.send(data) + this.sendMessageInternal(client, channel, message); } else { console.log("Client not connected!"); } @@ -62,19 +69,29 @@ export class RTMTWS implements RTMT { } } + sendMessageInternal(ws: WebSocket, channel: string, message: Object) { + const data = encode({ channel: channel, message: message }) + ws.send(data) + } + listenMessage(channel: string, callback: OnMessage) { this.messageChannels.set(channel, callback) } onWebSocketMessage(clientID: string, messageBytes: string) { const message = decode(messageBytes) - console.log(message); - this.onMessage(clientID, message.channel, message.message) } onMessage(clientID: string, channel: string, message: Object) { + if (channel === channel_pong) { + const ws = this.clients.get(clientID); + if (ws) { + this.heartbeat(ws); + } + return; + } const callback = this.messageChannels.get(channel) if (callback) { callback(clientID, message) @@ -82,4 +99,31 @@ export class RTMTWS implements RTMT { console.log("Channel callback not found! " + channel); } } + + startPingInterval(wsServer: WebSocket.Server) { + this.stopPingInterval(); + this.pingInterval = setInterval(() => { + const sendMessageInternal = this.sendMessageInternal; + wsServer.clients.forEach(function each(ws) { + //@ts-ignore + if (ws.isAlive === false) return ws.terminate(); + //@ts-ignore + ws.isAlive = false; + sendMessageInternal(ws, channel_ping, {}); + }); + }, PING_INTERVAL); + } + + stopPingInterval() { + if (this.pingInterval) clearInterval(this.pingInterval); + } + + heartbeat(ws: WebSocket) { + this.setWebSocketAsAlive(ws); + } + + setWebSocketAsAlive(ws: WebSocket) { + //@ts-ignore + ws.isAlive = true; + } } \ No newline at end of file