fix write coalescer

This commit is contained in:
Nystik 2026-05-14 21:46:24 +02:00
parent 8ebc7dac5a
commit ad8252b216

View file

@ -1,21 +1,18 @@
// Write coalescer for slow filesystems (rclone, FUSE, NFS, SMB). // Write coalescer for slow filesystems (rclone, FUSE, NFS, SMB).
// //
// First write to a path goes to disk immediately. Subsequent writes within // First write to a path goes to disk immediately. Subsequent writes within the coalesce window are buffered and flushed when the debounce timer fires; the timer resets on each write.
// the coalesce window are buffered; the timer resets on each write. After
// the window elapses with no new writes, the buffered data is flushed.
// //
// This prevents rapid-fire writes (e.g. workspace.json saved 20x/min) // Buffered writes respond to the HTTP client right away with synthetic mtime/size. Otherwise the browser's per-host connection cap blocks unrelated reads while writes sit in the buffer.
// from overwhelming network-mounted filesystems.
const fs = require("fs"); const fs = require("fs");
const config = require("./config"); const config = require("./config");
const FLUSH_TIMEOUT_MS = 10000; const FLUSH_TIMEOUT_MS = 10000;
// absPath -> timestamp of last completed write // absPath -> timestamp of last completed (or scheduled) write
const lastWriteTime = new Map(); const lastWriteTime = new Map();
// absPath -> { data, encoding, timer, resolvers: [{ resolve, reject }] } // absPath -> { data, encoding, timer }
const pending = new Map(); const pending = new Map();
async function writeToDisk(absPath, data, encoding) { async function writeToDisk(absPath, data, encoding) {
@ -41,18 +38,9 @@ function flushEntry(absPath) {
clearTimeout(entry.timer); clearTimeout(entry.timer);
pending.delete(absPath); pending.delete(absPath);
writeToDisk(absPath, entry.data, entry.encoding).then( writeToDisk(absPath, entry.data, entry.encoding).catch((err) => {
(result) => { console.error(`[write-coalesce] Flush failed for ${absPath}:`, err);
for (const r of entry.resolvers) { });
r.resolve(result);
}
},
(err) => {
for (const r of entry.resolvers) {
r.reject(err);
}
},
);
} }
function scheduleFlush(absPath) { function scheduleFlush(absPath) {
@ -66,18 +54,24 @@ function scheduleFlush(absPath) {
entry.timer = setTimeout(() => flushEntry(absPath), config.writeCoalesceMs); entry.timer = setTimeout(() => flushEntry(absPath), config.writeCoalesceMs);
} }
function estimateSize(data, encoding) {
if (typeof data === "string") {
return Buffer.byteLength(data, encoding === "binary" ? "utf-8" : encoding);
}
return data.length || data.byteLength || 0;
}
/** /**
* Write file content, coalescing rapid writes. * Write file content, coalescing rapid writes.
* The returned promise resolves with { mtime, size } once data hits disk. * Fresh writes resolve with real mtime/size once data is on disk. Buffered writes resolve immediately with synthetic values; the disk flush happens later when the debounce timer fires.
*/ */
async function writeCoalesced(absPath, data, encoding) { async function writeCoalesced(absPath, data, encoding) {
const windowMs = config.writeCoalesceMs; const windowMs = config.writeCoalesceMs;
// Coalescing disabled or first write to this path
const last = lastWriteTime.get(absPath); const last = lastWriteTime.get(absPath);
// Fast path: coalescing disabled or far enough from the last write.
if (windowMs <= 0 || !last || Date.now() - last >= windowMs) { if (windowMs <= 0 || !last || Date.now() - last >= windowMs) {
// Resolve any pending write for this path first
if (pending.has(absPath)) { if (pending.has(absPath)) {
clearTimeout(pending.get(absPath).timer); clearTimeout(pending.get(absPath).timer);
pending.delete(absPath); pending.delete(absPath);
@ -86,28 +80,23 @@ async function writeCoalesced(absPath, data, encoding) {
return writeToDisk(absPath, data, encoding); return writeToDisk(absPath, data, encoding);
} }
// Within the coalesce window: buffer the write // Within the coalesce window: buffer the write and respond immediately.
return new Promise((resolve, reject) => {
const existing = pending.get(absPath); const existing = pending.get(absPath);
if (existing) { if (existing) {
// Update data and add another resolver
existing.data = data; existing.data = data;
existing.encoding = encoding; existing.encoding = encoding;
existing.resolvers.push({ resolve, reject });
scheduleFlush(absPath); scheduleFlush(absPath);
} else { } else {
const entry = { pending.set(absPath, {
data, data,
encoding, encoding,
timer: null, timer: null,
resolvers: [{ resolve, reject }], });
};
pending.set(absPath, entry);
scheduleFlush(absPath); scheduleFlush(absPath);
} }
});
return { mtime: Date.now(), size: estimateSize(data, encoding) };
} }
/** /**
@ -136,7 +125,6 @@ async function flushAll() {
console.log(`[write-coalesce] Flushing ${paths.length} pending write(s)...`); console.log(`[write-coalesce] Flushing ${paths.length} pending write(s)...`);
// Clear all timers
for (const entry of pending.values()) { for (const entry of pending.values()) {
clearTimeout(entry.timer); clearTimeout(entry.timer);
} }
@ -145,20 +133,9 @@ async function flushAll() {
const entry = pending.get(absPath); const entry = pending.get(absPath);
pending.delete(absPath); pending.delete(absPath);
return writeToDisk(absPath, entry.data, entry.encoding).then( return writeToDisk(absPath, entry.data, entry.encoding).catch((err) => {
(result) => {
for (const r of entry.resolvers) {
r.resolve(result);
}
},
(err) => {
console.error(`[write-coalesce] Failed to flush ${absPath}:`, err); console.error(`[write-coalesce] Failed to flush ${absPath}:`, err);
});
for (const r of entry.resolvers) {
r.reject(err);
}
},
);
}); });
const timeout = new Promise((resolve) => { const timeout = new Promise((resolve) => {