243 lines
6.2 KiB
TypeScript
243 lines
6.2 KiB
TypeScript
import { ManagedSocketErrorType, type SocketDisconnectCode } from "./constants";
|
|
import { ManagedSocketError } from "./errors";
|
|
import { SocketSpeak } from "./messages";
|
|
|
|
/** Number of milliseconds in one second; base unit for the timings below. */
const ONE_SECOND = 1000;

/** Delay, in milliseconds, between two heartbeat messages sent on an open socket. */
const HEARTBEAT_INTERVAL = 30 * ONE_SECOND;

/** Upper bound, in milliseconds, for the exponential reconnect back-off delay. */
const MAX_RECONNECT_DELAY = 30 * ONE_SECOND;
|
|
|
|
/** Callback receiving the `data` payload of an incoming message; may be async. */
type SocketDataHandler = (data: unknown) => void | Promise<void>;

/** Callback receiving errors raised by the underlying WebSocket. */
type SocketErrorHandler = (error: ManagedSocketError) => void;

/** Callback invoked when the underlying WebSocket reports `open`. */
type SocketOpenHandler = () => void;

/** Callback invoked when the underlying WebSocket reports `close`. */
type SocketCloseHandler = () => void;

/** Callback receiving the error produced when the socket is closed with a non-reconnectable code. */
type SocketCatostrophicErrorHandler = (error: ManagedSocketError) => void;
|
|
|
|
/**
 * Lifecycle states reported by `ManagedSocket.state`.
 *
 * - `connecting`: initial state, before the first `open` event.
 * - `open`: the underlying socket reported `open`.
 * - `closing`: `close()` was requested and the close event is pending.
 * - `closed`: the underlying socket reported `close`.
 * - `reconnecting`: a reconnect attempt has been scheduled.
 * - `terminated`: the socket was closed with a non-reconnectable close code.
 */
export type ManagedSocketConnectionState =
  | "connecting"
  | "open"
  | "closing"
  | "closed"
  | "reconnecting"
  | "terminated";
|
|
|
|
/**
 * A WebSocket wrapper that adds heartbeats, message queuing while the socket
 * is not open, and automatic reconnection with exponential back-off.
 *
 * Handlers are registered through `onMessage`, `onError`, `onOpen`, `onClose`
 * and `onCatostrophicError`; each registration replaces the previous handler.
 */
export class ManagedSocket {
  /** URL passed to every underlying `WebSocket` this instance creates. */
  public readonly url: string;
  /** Current lifecycle state; updated by the socket event listeners. */
  public state: ManagedSocketConnectionState = "connecting";
  private ws: WebSocket;

  private onMessageHandler: SocketDataHandler | null = null;
  private onErrorHandler: SocketErrorHandler | null = null;
  private onOpenHandler: SocketOpenHandler | null = null;
  private onCloseHandler: SocketCloseHandler | null = null;
  private onCatostrophicErrorHandler: SocketCatostrophicErrorHandler | null =
    null;

  /** Number of reconnect attempts since the last successful `open`. */
  private reconnectAttempts = 0;
  private maxReconnectDelay: number;
  /** Handle of the running heartbeat interval, or `null` when none is running. */
  private heartbeatInterval: ReturnType<typeof setInterval> | null = null;
  /** Handle of the scheduled reconnect attempt, or `null` when none is pending. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** Messages passed to `send` while the socket was not open. */
  private messageQueue: unknown[] = [];
  private attemptReconnect = true;

  private debugMode = false;

  /**
   * Creates the wrapper and immediately starts connecting.
   *
   * @param url - Address handed to the `WebSocket` constructor.
   */
  constructor(url: string) {
    this.url = url;
    this.maxReconnectDelay = MAX_RECONNECT_DELAY;
    this.ws = this.connect();
  }

  /** `true` while the underlying socket's `readyState` is `OPEN`. */
  get connected() {
    return this.ws.readyState === WebSocket.OPEN;
  }

  /** The current underlying `WebSocket`; replaced on every reconnect. */
  get native() {
    return this.ws;
  }

  /** Enables or disables `debugMessage` console output. */
  set debug(value: boolean) {
    this.debugMode = value;
  }

  /** Logs `args` to the console with a fixed prefix when debug mode is on. */
  debugMessage(...args: unknown[]) {
    if (this.debugMode) {
      const output = ["[ManagedSocketDebug]", ...args];
      console.log(...output);
    }
  }

  /** Re-sends every queued message through `send`. */
  private sendQueuedMessages() {
    // Make a copy of the queue and clear it first.
    // This way, if the messages fail to send again,
    // they just get requeued.
    const messagesToSend = [...this.messageQueue];
    this.messageQueue = [];
    for (const message of messagesToSend) {
      this.send(message);
    }
  }

  /**
   * Creates a new underlying `WebSocket`, stores it on the instance and wires
   * the open/close/error/message listeners.
   *
   * @returns The newly created socket.
   */
  private connect() {
    this.ws = new WebSocket(this.url);

    this.ws.addEventListener("open", () => {
      this.reconnectAttempts = 0;

      // Announce successful reconnects
      if (this.state === "reconnecting") {
        this.debugMessage("Reconnected to socket server");
      }

      // The open handler runs before the state flips to "open", so it still
      // observes "connecting" or "reconnecting".
      this.onOpenHandler?.();
      this.state = "open";
      this.startHeartbeat();
      this.sendQueuedMessages();
    });

    this.ws.addEventListener("close", (event) => {
      const { code } = event;
      this.debugMessage(code);

      // Always release the heartbeat timer: leaving it running after an
      // explicit close() or a terminal close code would leak the interval.
      this.stopHeartbeat();

      this.onCloseHandler?.();

      // If the code received was catastrophic, terminate the connection.
      // handleCatostophicError marks the socket "terminated" and disables
      // reconnection, so return here to keep that state intact.
      if (SocketSpeak.isCatatrophicCloseCode(code)) {
        this.handleCatostophicError(code);
        return;
      }

      this.state = "closed";

      if (this.attemptReconnect) {
        this.debugMessage(
          "Socket connection closed. Attempting reconnection...",
        );
        this.reconnect();
      }
    });

    this.ws.addEventListener("error", (error) => {
      this.debugMessage("WebSocket error", error);

      const errorType =
        this.state === "connecting"
          ? ManagedSocketErrorType.CONNECTION_REJECTED
          : ManagedSocketErrorType.SOCKET_ERROR;

      const socketError = new ManagedSocketError(
        errorType,
        new Error(error.currentTarget?.toString()),
        errorType,
      );

      if (this.onErrorHandler !== null) {
        this.onErrorHandler(socketError);
        return;
      }

      // Surface a real Error (not the raw event object) when nobody listens.
      throw socketError;
    });

    this.ws.addEventListener("message", (event) => {
      const message = SocketSpeak.deserialize(event.data.toString());

      // Ignore heartbeats
      if (message.type === "heartbeat") {
        this.debugMessage("Received heartbeat");
        return;
      }

      if (message.data === undefined || message.data === null) {
        this.debugMessage("Received message with no data");
        return;
      }

      this.debugMessage("Received message", message.data);
      this.onMessageHandler?.(message.data);
    });

    return this.ws;
  }

  /**
   * Marks the socket as terminated, disables reconnection and reports the
   * failure to the catastrophic-error handler.
   *
   * @param code - Close code that triggered the termination.
   * @throws ManagedSocketError when no catastrophic-error handler is registered.
   */
  handleCatostophicError(code: SocketDisconnectCode) {
    this.state = "terminated";
    this.attemptReconnect = false;
    this.cancelPendingReconnect();
    this.debugMessage(
      "Socket connection terminated due to non-reconnect close code",
    );

    const socketError = new ManagedSocketError(
      "Socket connection terminated due to non-reconnect close code",
      new Error(code.toString()),
      ManagedSocketErrorType.CATOSTROPHIC_CLOSE,
    );

    if (this.onCatostrophicErrorHandler !== null) {
      return this.onCatostrophicErrorHandler(socketError);
    }

    throw socketError;
  }

  /** Registers the handler that receives the `data` of incoming messages. */
  onMessage(handler: SocketDataHandler) {
    this.onMessageHandler = handler;
  }

  /** Registers the handler that receives socket errors. */
  onError(handler: (error: ManagedSocketError) => void) {
    this.onErrorHandler = handler;
  }

  /** Registers the handler invoked on every `open` event. */
  onOpen(handler: () => void) {
    this.onOpenHandler = handler;
  }

  /** Registers the handler invoked on every `close` event. */
  onClose(handler: () => void) {
    this.onCloseHandler = handler;
  }

  /**
   * Registers the handler invoked when the socket is terminated by a
   * non-reconnectable close code. Without one, `handleCatostophicError` throws.
   */
  onCatostrophicError(handler: (error: ManagedSocketError) => void) {
    this.onCatostrophicErrorHandler = handler;
  }

  /**
   * Schedules a new connection attempt. The delay doubles with every attempt
   * since the last successful open and is capped at the maximum reconnect
   * delay. Any previously scheduled attempt is replaced.
   */
  reconnect() {
    this.attemptReconnect = true;
    this.state = "reconnecting";
    // Never stack two pending attempts on top of each other.
    this.cancelPendingReconnect();
    const delay = Math.min(
      1000 * 2 ** this.reconnectAttempts,
      this.maxReconnectDelay,
    );
    this.debugMessage(`Attempting to reconnect in ${delay}ms`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.reconnectAttempts++;
      this.connect();
    }, delay);
  }

  /**
   * Closes the connection and disables automatic reconnection, cancelling any
   * reconnect attempt that is still waiting for its back-off delay.
   */
  close() {
    this.attemptReconnect = false;
    this.cancelPendingReconnect();

    // An already-closed socket emits no further close event, so settle the
    // state here instead of leaving it stuck at "closing".
    if (this.ws.readyState === WebSocket.CLOSED) {
      this.stopHeartbeat();
      if (this.state !== "terminated") {
        this.state = "closed";
      }
      return;
    }

    this.state = "closing";
    this.ws.close();
  }

  /** Clears the scheduled reconnect attempt, if any. */
  private cancelPendingReconnect() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /** Starts sending a heartbeat message every `HEARTBEAT_INTERVAL` ms. */
  private startHeartbeat() {
    // Guard against two intervals running at once.
    this.stopHeartbeat();
    this.heartbeatInterval = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send(
          SocketSpeak.serialize(SocketSpeak.createHeartbeatMessage()),
        );
      }
    }, HEARTBEAT_INTERVAL);
  }

  /** Stops the heartbeat interval, if one is running. */
  private stopHeartbeat() {
    if (this.heartbeatInterval !== null) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  /**
   * Sends `data` through the socket, or queues it when the socket is not open.
   * Queued messages are flushed on the next `open` event.
   *
   * @param data - Payload handed to `SocketSpeak.prepare` before sending.
   */
  send<T>(data: T) {
    if (this.ws.readyState !== WebSocket.OPEN) {
      this.debugMessage("WebSocket is not open. Queuing message.");
      this.messageQueue.push(data);
      return;
    }

    this.ws.send(SocketSpeak.prepare(data));
  }
}
|