Merge pull request #7 from jhalitaksoy/feature/ping-pong

Feature/ping pong
This commit is contained in:
Halit Aksoy 2022-07-16 18:49:38 +03:00 committed by GitHub
commit dab02a247f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 55 additions and 9 deletions

View File

@ -5,4 +5,6 @@ export const channel_on_game_update = "on_game_update"
export const channel_leave_game = "leave_game" export const channel_leave_game = "leave_game"
export const channel_on_game_end = "on_game_end" export const channel_on_game_end = "on_game_end"
export const channel_on_game_crashed = "on_game_crashed" export const channel_on_game_crashed = "on_game_crashed"
export const channel_on_game_user_leave = "on_game_user_leave" export const channel_on_game_user_leave = "on_game_user_leave"
export const channel_ping = "ping"
export const channel_pong = "pong"

View File

@ -3,6 +3,9 @@ import { Bytes, OnMessage, RTMT } from "./rtmt"
import WebSocket from "ws" import WebSocket from "ws"
import * as http from 'http'; import * as http from 'http';
import { channel_ping, channel_pong } from "../consts/channel_names";
const PING_INTERVAL = 1000;
export class RTMTWS implements RTMT { export class RTMTWS implements RTMT {
@ -12,23 +15,24 @@ export class RTMTWS implements RTMT {
public clients = new Map<string, WebSocket>() public clients = new Map<string, WebSocket>()
private pingInterval?: NodeJS.Timeout = undefined;
constructor() { constructor() {
this.messageChannels = new Map<String, OnMessage>() this.messageChannels = new Map<String, OnMessage>()
this.wsServer = null this.wsServer = null
} }
initWebSocket(server: http.Server, onopen: (userKey : string) => any) { initWebSocket(server: http.Server, onopen: (userKey: string) => any) {
const wsServer = new WebSocket.Server({ server }) const wsServer = new WebSocket.Server({ server })
this.wsServer = wsServer this.wsServer = wsServer
this.clients = new Map<string, WebSocket>() this.clients = new Map<string, WebSocket>()
wsServer.on("connection", (ws: WebSocket, req: Request) => { wsServer.on("connection", (ws: WebSocket, req: Request) => {
console.log("Client Connected");
const regexResult = req.url.split(RegExp("\/\?userKey=")); const regexResult = req.url.split(RegExp("\/\?userKey="));
const clientID = regexResult[1] const clientID = regexResult[1]
this.clients.set(clientID, ws) this.clients.set(clientID, ws)
ws.on("message", (messageBytes: string) => { ws.on("message", (messageBytes: string) => {
console.log('received: %s', messageBytes);
this.onWebSocketMessage(clientID, messageBytes) this.onWebSocketMessage(clientID, messageBytes)
}) })
@ -42,18 +46,21 @@ export class RTMTWS implements RTMT {
this.clients.delete(clientID) this.clients.delete(clientID)
}) })
this.setWebSocketAsAlive(ws);
onopen(clientID) onopen(clientID)
}) })
this.startPingInterval(wsServer);
}
dispose() {
this.stopPingInterval();
} }
sendMessage(clientID: string, channel: string, message: Object) { sendMessage(clientID: string, channel: string, message: Object) {
if (this.wsServer) { if (this.wsServer) {
const client = this.clients.get(clientID) const client = this.clients.get(clientID)
if (client) { if (client) {
const data = encode({ channel: channel, message: message }) this.sendMessageInternal(client, channel, message);
console.log("(RTMT) Sending message to channel " + channel);
client.send(data)
} else { } else {
console.log("Client not connected!"); console.log("Client not connected!");
} }
@ -62,19 +69,29 @@ export class RTMTWS implements RTMT {
} }
} }
sendMessageInternal(ws: WebSocket, channel: string, message: Object) {
const data = encode({ channel: channel, message: message })
ws.send(data)
}
listenMessage(channel: string, callback: OnMessage) { listenMessage(channel: string, callback: OnMessage) {
this.messageChannels.set(channel, callback) this.messageChannels.set(channel, callback)
} }
onWebSocketMessage(clientID: string, messageBytes: string) { onWebSocketMessage(clientID: string, messageBytes: string) {
const message = decode(messageBytes) const message = decode(messageBytes)
console.log(message);
this.onMessage(clientID, message.channel, message.message) this.onMessage(clientID, message.channel, message.message)
} }
onMessage(clientID: string, channel: string, message: Object) { onMessage(clientID: string, channel: string, message: Object) {
if (channel === channel_pong) {
const ws = this.clients.get(clientID);
if (ws) {
this.heartbeat(ws);
}
return;
}
const callback = this.messageChannels.get(channel) const callback = this.messageChannels.get(channel)
if (callback) { if (callback) {
callback(clientID, message) callback(clientID, message)
@ -82,4 +99,31 @@ export class RTMTWS implements RTMT {
console.log("Channel callback not found! " + channel); console.log("Channel callback not found! " + channel);
} }
} }
startPingInterval(wsServer: WebSocket.Server) {
this.stopPingInterval();
this.pingInterval = setInterval(() => {
const sendMessageInternal = this.sendMessageInternal;
wsServer.clients.forEach(function each(ws) {
//@ts-ignore
if (ws.isAlive === false) return ws.terminate();
//@ts-ignore
ws.isAlive = false;
sendMessageInternal(ws, channel_ping, {});
});
}, PING_INTERVAL);
}
stopPingInterval() {
if (this.pingInterval) clearInterval(this.pingInterval);
}
heartbeat(ws: WebSocket) {
this.setWebSocketAsAlive(ws);
}
setWebSocketAsAlive(ws: WebSocket) {
//@ts-ignore
ws.isAlive = true;
}
} }