diff --git a/src/rtmt/rtmt.ts b/src/rtmt/rtmt.ts index bc04659..4e5e010 100644 --- a/src/rtmt/rtmt.ts +++ b/src/rtmt/rtmt.ts @@ -1,8 +1,16 @@ +import EventEmitter2, { Listener } from "eventemitter2" + export type Bytes = Uint8Array export type OnMessage = (message : Object) => any -export interface RTMT{ - sendMessage : (channel : string, message : Object) => any - listenMessage : (channel : string, callback : OnMessage) => any - unlistenMessage : (channel : string, callback : OnMessage) => any + +export type RtmtEventTypes = "open" | "close" | "connected" | "error" | "disconnected" | "message"; + +export interface RTMT extends EventEmitter2 { + sendMessage: (channel: string, message: Object) => void; + addMessageListener(channel: string, callback: (message: any) => void); + removeMessageListener(channel: string, callback: (message: any) => void); + on(event: RtmtEventTypes, callback: (...value: any[]) => void): Listener | this; + off(event: RtmtEventTypes, callback: (...value: any[]) => void): this; + dispose(); } \ No newline at end of file diff --git a/src/rtmt/rtmt_websocket.ts b/src/rtmt/rtmt_websocket.ts index 180d3c2..a60d2e4 100644 --- a/src/rtmt/rtmt_websocket.ts +++ b/src/rtmt/rtmt_websocket.ts @@ -1,101 +1,93 @@ import { decode, encode } from "./encode_decode_message"; -import { Bytes, OnMessage, RTMT } from "./rtmt"; import { channel_ping, channel_pong } from "../const/channel_names"; import { server } from "../const/config"; +import EventEmitter2, { Listener } from "eventemitter2"; +import { Bytes, RTMT, RtmtEventTypes } from "./rtmt"; const PING_INTERVAL = 15000, PING_INTERVAL_BUFFER_TIME = 1000; +const MESSAGE_CHANNEL_PREFIX = "message_channel"; -export class RTMTWS implements RTMT { - private messageChannels: Map; - private ws: WebSocket; - +export class RTMTWS extends EventEmitter2 implements RTMT { + private webSocket: WebSocket; private pingTimeout?: number = undefined; constructor() { - this.messageChannels = new Map(); + super(); } - initWebSocket( - userKey: string, - onopen: () => any, - onClose: () => any, - onError: (event: Event) => any - ) { + public initWebSocket(userKey: string) { const url = server.wsServerAdress + "?userKey=" + userKey; - const ws = new WebSocket(url); - ws.binaryType = "arraybuffer"; //for firefox - ws.onopen = () => { - console.info("(RTMT) ws has opened"); - this.ws = ws; + const webSocket = new WebSocket(url); + webSocket.onopen = () => { + console.info("(RTMT) WebSocket has opened"); + this.webSocket = webSocket; this.heartbeat(); - onopen(); + this.emit("open"); }; - ws.onclose = () => { - console.info("(RTMT) ws has closed"); - //this.ws = undefined + webSocket.onclose = () => { + console.info("(RTMT) WebSocket has closed"); + //this.WebSocket = undefined clearTimeout(this.pingTimeout); - onClose(); + this.emit("close"); }; - - ws.onmessage = (event: MessageEvent) => { - this.onWebSocketMessage(this, event); + webSocket.onmessage = (event: MessageEvent) => { + const { channel, message } = decode(event.data); + this.onMessage(channel, message); }; - - ws.addEventListener("error", (ev) => { - console.error({ ws_error: ev }); - onError(ev); - }); + webSocket.onerror = (error) => { + console.error(error); + this.emit("error", error); + } } - heartbeat() { + private heartbeat() { clearTimeout(this.pingTimeout); - // Use `WebSocket#terminate()`, which immediately destroys the connection, // instead of `WebSocket#close()`, which waits for the close timer. // Delay should be equal to the interval at which your server // sends out pings plus a conservative assumption of the latency. this.pingTimeout = setTimeout(() => { - this.ws.close(); + this.webSocket.close(); }, PING_INTERVAL + PING_INTERVAL_BUFFER_TIME); } - - sendMessage(channel: string, message: Object) { - if (this.ws === undefined) { - console.error("(RTMT) ws is undefined"); + public sendMessage(channel: string, message: Object) { + if (this.webSocket === undefined) { + console.error("(RTMT) WebSocket is undefined"); return; } const data = encode(channel, message); - this.ws.send(data); + this.webSocket.send(data); } - // todo: support multible listeners - listenMessage(channel: string, callback: OnMessage) { - this.messageChannels.set(channel, callback); - } - - // todo: support multible listeners - unlistenMessage(channel : string, callback : OnMessage) { - this.messageChannels.set(channel, undefined); - } - - onWebSocketMessage(rtmt: RTMTWS, event: MessageEvent) { - const { channel, message } = decode(event.data); - rtmt.onMessage(channel, message); - } - - onMessage(channel: string, message: Bytes) { - if(channel === channel_ping) { + private onMessage(channel: string, message: Bytes) { + if (channel === channel_ping) { this.heartbeat(); this.sendMessage(channel_pong, {}); return; } - const callback = this.messageChannels.get(channel); + // TODO: Maybe we should warn if there is not any listener for channel + this.emit(MESSAGE_CHANNEL_PREFIX + channel, message); + } - if (callback) { - callback(message); - } else { - console.warn("(RTMT) Channel callback not found!" + channel); - } + public on(event: RtmtEventTypes, callback: (...value: any[]) => void): Listener | this { + return super.on(event, callback); + } + + public off(event: RtmtEventTypes, callback: (...value: any[]) => void): this { + return super.off(event, callback); + } + + public addMessageListener(channel: string, callback: (message: any) => void) { + super.on(MESSAGE_CHANNEL_PREFIX + channel, callback); + } + + public removeMessageListener(channel: string, callback: (message: any) => void) { + super.off(MESSAGE_CHANNEL_PREFIX + channel, callback); + } + + public dispose() { + this.webSocket.close(); + this.removeAllListeners(); } }