add event emitter to rtmt

This commit is contained in:
Halit Aksoy 2022-09-02 00:01:55 +03:00
parent 1dd2f352f3
commit 0b251367e1
2 changed files with 65 additions and 65 deletions

View File

@ -1,8 +1,16 @@
import EventEmitter2, { Listener } from "eventemitter2"
export type Bytes = Uint8Array export type Bytes = Uint8Array
export type OnMessage = (message : Object) => any export type OnMessage = (message : Object) => any
export interface RTMT{
sendMessage : (channel : string, message : Object) => any export type RtmtEventTypes = "open" | "close" | "connected" | "error" | "disconnected" | "message";
listenMessage : (channel : string, callback : OnMessage) => any
unlistenMessage : (channel : string, callback : OnMessage) => any export interface RTMT extends EventEmitter2 {
sendMessage: (channel: string, message: Object) => void;
addMessageListener(channel: string, callback: (message: any) => void);
removeMessageListener(channel: string, callback: (message: any) => void);
on(event: RtmtEventTypes, callback: (...value: any[]) => void): Listener | this;
off(event: RtmtEventTypes, callback: (...value: any[]) => void): this;
dispose();
} }

View File

@ -1,101 +1,93 @@
import { decode, encode } from "./encode_decode_message"; import { decode, encode } from "./encode_decode_message";
import { Bytes, OnMessage, RTMT } from "./rtmt";
import { channel_ping, channel_pong } from "../const/channel_names"; import { channel_ping, channel_pong } from "../const/channel_names";
import { server } from "../const/config"; import { server } from "../const/config";
import EventEmitter2, { Listener } from "eventemitter2";
import { Bytes, RTMT, RtmtEventTypes } from "./rtmt";
const PING_INTERVAL = 15000, PING_INTERVAL_BUFFER_TIME = 1000; const PING_INTERVAL = 15000, PING_INTERVAL_BUFFER_TIME = 1000;
const MESSAGE_CHANNEL_PREFIX = "message_channel";
export class RTMTWS implements RTMT { export class RTMTWS extends EventEmitter2 implements RTMT {
private messageChannels: Map<String, OnMessage | undefined>; private webSocket: WebSocket;
private ws: WebSocket;
private pingTimeout?: number = undefined; private pingTimeout?: number = undefined;
constructor() { constructor() {
this.messageChannels = new Map<String, OnMessage>(); super();
} }
initWebSocket( public initWebSocket(userKey: string) {
userKey: string,
onopen: () => any,
onClose: () => any,
onError: (event: Event) => any
) {
const url = server.wsServerAdress + "?userKey=" + userKey; const url = server.wsServerAdress + "?userKey=" + userKey;
const ws = new WebSocket(url); const webSocket = new WebSocket(url);
ws.binaryType = "arraybuffer"; //for firefox webSocket.onopen = () => {
ws.onopen = () => { console.info("(RTMT) WebSocket has opened");
console.info("(RTMT) ws has opened"); this.webSocket = webSocket;
this.ws = ws;
this.heartbeat(); this.heartbeat();
onopen(); this.emit("open");
}; };
ws.onclose = () => { webSocket.onclose = () => {
console.info("(RTMT) ws has closed"); console.info("(RTMT) WebSocket has closed");
//this.ws = undefined //this.WebSocket = undefined
clearTimeout(this.pingTimeout); clearTimeout(this.pingTimeout);
onClose(); this.emit("close");
}; };
webSocket.onmessage = (event: MessageEvent) => {
ws.onmessage = (event: MessageEvent) => { const { channel, message } = decode(event.data);
this.onWebSocketMessage(this, event); this.onMessage(channel, message);
}; };
webSocket.onerror = (error) => {
ws.addEventListener("error", (ev) => { console.error(error);
console.error({ ws_error: ev }); this.emit("error", error);
onError(ev); }
});
} }
heartbeat() { private heartbeat() {
clearTimeout(this.pingTimeout); clearTimeout(this.pingTimeout);
// Use `WebSocket#terminate()`, which immediately destroys the connection, // Use `WebSocket#terminate()`, which immediately destroys the connection,
// instead of `WebSocket#close()`, which waits for the close timer. // instead of `WebSocket#close()`, which waits for the close timer.
// Delay should be equal to the interval at which your server // Delay should be equal to the interval at which your server
// sends out pings plus a conservative assumption of the latency. // sends out pings plus a conservative assumption of the latency.
this.pingTimeout = setTimeout(() => { this.pingTimeout = setTimeout(() => {
this.ws.close(); this.webSocket.close();
}, PING_INTERVAL + PING_INTERVAL_BUFFER_TIME); }, PING_INTERVAL + PING_INTERVAL_BUFFER_TIME);
} }
public sendMessage(channel: string, message: Object) {
sendMessage(channel: string, message: Object) { if (this.webSocket === undefined) {
if (this.ws === undefined) { console.error("(RTMT) WebSocket is undefined");
console.error("(RTMT) ws is undefined");
return; return;
} }
const data = encode(channel, message); const data = encode(channel, message);
this.ws.send(data); this.webSocket.send(data);
} }
// todo: support multible listeners private onMessage(channel: string, message: Bytes) {
listenMessage(channel: string, callback: OnMessage) { if (channel === channel_ping) {
this.messageChannels.set(channel, callback);
}
// todo: support multible listeners
unlistenMessage(channel : string, callback : OnMessage) {
this.messageChannels.set(channel, undefined);
}
onWebSocketMessage(rtmt: RTMTWS, event: MessageEvent) {
const { channel, message } = decode(event.data);
rtmt.onMessage(channel, message);
}
onMessage(channel: string, message: Bytes) {
if(channel === channel_ping) {
this.heartbeat(); this.heartbeat();
this.sendMessage(channel_pong, {}); this.sendMessage(channel_pong, {});
return; return;
} }
const callback = this.messageChannels.get(channel); // TODO: Maybe we should warn if there is not any listener for channel
this.emit(MESSAGE_CHANNEL_PREFIX + channel, message);
if (callback) {
callback(message);
} else {
console.warn("(RTMT) Channel callback not found!" + channel);
} }
public on(event: RtmtEventTypes, callback: (...value: any[]) => void): Listener | this {
return super.on(event, callback);
}
public off(event: RtmtEventTypes, callback: (...value: any[]) => void): this {
return super.off(event, callback);
}
public addMessageListener(channel: string, callback: (message: any) => void) {
super.on(MESSAGE_CHANNEL_PREFIX + channel, callback);
}
public removeMessageListener(channel: string, callback: (message: any) => void) {
super.off(MESSAGE_CHANNEL_PREFIX + channel, callback);
}
public dispose() {
this.webSocket.close();
this.removeAllListeners();
} }
} }