ignis/packages/shim/src/ws-client.js

267 lines
5.6 KiB
JavaScript

// Vault-scoped WebSocket client.Single connection per shim instance.
// Multiple consumers attach via subscribe/channel.
const RECONNECT_DELAY_MS = 2000;
export function createWsClient() {
let ws = null;
let vaultId = null;
let reconnectTimer = null;
let manuallyClosed = false;
let state = "closed"; // "closed" | "connecting" | "open"
const globalSubs = new Map(); // type -> Set<handler>
const channelSubs = new Map(); // channelName -> Map<type, Set<handler>>
const channelSubCount = new Map(); // channelName -> integer
const stateSubs = new Set(); // handler(state)
function setState(next) {
if (state === next) {
return;
}
state = next;
for (const fn of stateSubs) {
try {
fn(state);
} catch (e) {
console.error("[ws] state subscriber threw:", e);
}
}
}
function postRaw(message) {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
}
function sendSubscribeChannel(name) {
postRaw({ type: "subscribe-channel", channel: name });
}
function sendUnsubscribeChannel(name) {
postRaw({ type: "unsubscribe-channel", channel: name });
}
function dispatch(msg) {
if (msg.channel) {
const types = channelSubs.get(msg.channel);
const handlers = types && types.get(msg.type);
if (handlers) {
for (const fn of handlers) {
try {
fn(msg);
} catch (e) {
console.error(
`[ws] channel subscriber for ${msg.channel}:${msg.type} threw:`,
e,
);
}
}
}
return;
}
const handlers = globalSubs.get(msg.type);
if (handlers) {
for (const fn of handlers) {
try {
fn(msg);
} catch (e) {
console.error(`[ws] subscriber for ${msg.type} threw:`, e);
}
}
}
}
function openSocket() {
if (ws) {
return;
}
setState("connecting");
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const url = `${protocol}//${window.location.host}/ws?vault=${encodeURIComponent(vaultId)}`;
try {
ws = new WebSocket(url);
} catch (e) {
console.error("[ws] failed to create WebSocket:", e);
ws = null;
setState("closed");
scheduleReconnect();
return;
}
ws.onopen = () => {
console.log("[ws] connected");
setState("open");
// Re-establish channel subscriptions on the new connection.
for (const name of channelSubCount.keys()) {
sendSubscribeChannel(name);
}
};
ws.onmessage = (event) => {
let msg;
try {
msg = JSON.parse(event.data);
} catch (e) {
console.error("[ws] failed to parse message:", e);
return;
}
dispatch(msg);
};
ws.onclose = () => {
ws = null;
setState("closed");
if (!manuallyClosed) {
scheduleReconnect();
}
};
ws.onerror = (e) => {
console.error("[ws] error:", e);
};
}
function scheduleReconnect() {
if (reconnectTimer || manuallyClosed) {
return;
}
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
console.log("[ws] reconnecting...");
openSocket();
}, RECONNECT_DELAY_MS);
}
function connect(id) {
if (!id) {
console.warn("[ws] no vault id; skipping connect");
return;
}
vaultId = id;
manuallyClosed = false;
openSocket();
}
function disconnect() {
manuallyClosed = true;
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
if (ws) {
ws.close();
ws = null;
}
setState("closed");
}
function subscribe(type, handler) {
if (!globalSubs.has(type)) {
globalSubs.set(type, new Set());
}
globalSubs.get(type).add(handler);
return () => {
globalSubs.get(type)?.delete(handler);
};
}
function send(type, payload) {
postRaw({ type, ...(payload || {}) });
}
function channel(name) {
return {
subscribe(type, handler) {
if (!channelSubs.has(name)) {
channelSubs.set(name, new Map());
}
const types = channelSubs.get(name);
if (!types.has(type)) {
types.set(type, new Set());
}
types.get(type).add(handler);
// First subscriber for this channel: upgrade the server-side gate.
const prevCount = channelSubCount.get(name) || 0;
channelSubCount.set(name, prevCount + 1);
if (prevCount === 0) {
sendSubscribeChannel(name);
}
return () => {
const set = types.get(type);
if (!set || !set.has(handler)) {
return;
}
set.delete(handler);
const newCount = (channelSubCount.get(name) || 0) - 1;
if (newCount <= 0) {
channelSubCount.delete(name);
sendUnsubscribeChannel(name);
} else {
channelSubCount.set(name, newCount);
}
};
},
send(type, payload) {
postRaw({ channel: name, type, ...(payload || {}) });
},
};
}
function isOpen() {
return state === "open";
}
function onStateChange(handler) {
stateSubs.add(handler);
return () => {
stateSubs.delete(handler);
};
}
return {
connect,
disconnect,
subscribe,
send,
channel,
isOpen,
onStateChange,
};
}
// Singleton instance. The shim has one WebSocket per page; consumers all share it.
export const wsClient = createWsClient();