import { decode, encode } from "./encode_decode_message" import { Bytes, OnMessage, RTMT } from "./rtmt" import WebSocket from "ws" import * as http from 'http'; import { channel_ping, channel_pong } from "../consts/channel_names"; const PING_INTERVAL = 1000; export class RTMTWS implements RTMT { private messageChannels: Map private wsServer: WebSocket.Server | null public clients = new Map() private pingInterval?: NodeJS.Timeout = undefined; constructor() { this.messageChannels = new Map() this.wsServer = null } initWebSocket(server: http.Server, onopen: (userKey: string) => any) { const wsServer = new WebSocket.Server({ server }) this.wsServer = wsServer this.clients = new Map() wsServer.on("connection", (ws: WebSocket, req: Request) => { console.log("Client Connected"); const regexResult = req.url.split(RegExp("\/\?userKey=")); const clientID = regexResult[1] this.clients.set(clientID, ws) ws.on("message", (messageBytes: string) => { this.onWebSocketMessage(clientID, messageBytes) }) ws.on("close", (code: number, reason: string) => { console.log("WS Closed! code : " + code + " reason : " + reason); this.clients.delete(clientID) }) ws.on("error", (err: Error) => { console.log("WS Closed with error! error : " + err.message); this.clients.delete(clientID) }) this.setWebSocketAsAlive(ws); onopen(clientID) }) this.startPingInterval(wsServer); } dispose() { this.stopPingInterval(); } sendMessage(clientID: string, channel: string, message: Object) { if (this.wsServer) { const client = this.clients.get(clientID) if (client) { this.sendMessageInternal(client, channel, message); } else { console.log("Client not connected!"); } } else { console.log('ws is undefined') } } sendMessageInternal(ws: WebSocket, channel: string, message: Object) { const data = encode({ channel: channel, message: message }) ws.send(data) } listenMessage(channel: string, callback: OnMessage) { this.messageChannels.set(channel, callback) } onWebSocketMessage(clientID: string, messageBytes: string) { const message = decode(messageBytes) this.onMessage(clientID, message.channel, message.message) } onMessage(clientID: string, channel: string, message: Object) { if (channel === channel_pong) { const ws = this.clients.get(clientID); if (ws) { this.heartbeat(ws); } return; } const callback = this.messageChannels.get(channel) if (callback) { callback(clientID, message) } else { console.log("Channel callback not found! " + channel); } } startPingInterval(wsServer: WebSocket.Server) { this.stopPingInterval(); this.pingInterval = setInterval(() => { const sendMessageInternal = this.sendMessageInternal; wsServer.clients.forEach(function each(ws) { //@ts-ignore if (ws.isAlive === false) return ws.terminate(); //@ts-ignore ws.isAlive = false; sendMessageInternal(ws, channel_ping, {}); }); }, PING_INTERVAL); } stopPingInterval() { if (this.pingInterval) clearInterval(this.pingInterval); } heartbeat(ws: WebSocket) { this.setWebSocketAsAlive(ws); } setWebSocketAsAlive(ws: WebSocket) { //@ts-ignore ws.isAlive = true; } isClientOnline(clientID: string): boolean { const ws = this.clients.has(clientID); //@ts-ignore return ws && ws.isAlive; } }