Compare commits

..

3 commits

Author SHA1 Message Date
ClaraCrazy
51d608c2c0
[Fix]: Fix Stats-store for multi-instance DB support 2026-01-08 12:59:13 +01:00
ClaraCrazy
8a5c85c8fb
[Fix]: Chunk everything
I hate IMAP, respectfully.
2026-01-08 12:45:53 +01:00
ClaraCrazy
b8b198125d
[Fix]: Large purge failures
I swear writing a tempmail service is hard
2026-01-08 11:06:45 +01:00
2 changed files with 147 additions and 40 deletions

View file

@ -253,17 +253,46 @@ class ImapService extends EventEmitter {
// IMAP date filters are unreliable - some servers search internal date, not Date header // IMAP date filters are unreliable - some servers search internal date, not Date header
// Always fetch all UIDs and filter by date header in JavaScript instead // Always fetch all UIDs and filter by date header in JavaScript instead
const deleteOlderThan = helper.purgeTimeStamp();
const searchQuery = [ const searchQuery = [
['!DELETED'] ['!DELETED']
]; ];
uids = await this._searchWithoutFetch(searchQuery); uids = await this._searchWithoutFetch(searchQuery);
if (uids.length === 0) return; if (uids.length === 0) return;
const deleteOlderThan = helper.purgeTimeStamp(); debug(`Starting deleteOldMails. Total UIDs: ${uids.length}`);
const exampleUids = this.config.email.examples.uids.map(x => parseInt(x)); const HEADER_BATCH_SIZE = this.config.imap.fetchChunkSize
const headers = await this._getMailHeaders(uids); const concurrency = this.config.imap.fetchConcurrency || 4;
// Chunk UIDs
const uidChunks = [];
for (let i = 0; i < uids.length; i += HEADER_BATCH_SIZE) {
uidChunks.push(uids.slice(i, i + HEADER_BATCH_SIZE));
}
let headers = [];
let workerId = 0;
const runNext = async() => {
if (workerId >= uidChunks.length) return;
const chunkId = workerId++;
const batch = uidChunks[chunkId];
try {
const batchHeaders = await Promise.race([
this._getMailHeaders(batch),
new Promise((_, reject) => setTimeout(() => reject(new Error('Header fetch timeout')), 30000))
]);
headers = headers.concat(batchHeaders);
} catch (err) {
debug(`ERROR in batch ${chunkId+1}: ${err.message}`);
}
await runNext();
};
// Start workers
const pool = [];
const workers = Math.min(concurrency, uidChunks.length);
for (let i = 0; i < workers; i++) {
pool.push(runNext());
}
await Promise.all(pool);
// Get locked inboxes if available // Get locked inboxes if available
let lockedAddresses = []; let lockedAddresses = [];
@ -276,19 +305,26 @@ class ImapService extends EventEmitter {
} }
} }
// Filter out mails that are too new, whitelisted, or belong to locked inboxes // Ensure exampleUids is defined before filtering
const toDelete = headers const exampleUids = this.config.email.examples && this.config.email.examples.uids ?
.filter(mail => { this.config.email.examples.uids.map(x => parseInt(x)) : [];
const date = mail.attributes.date;
const uid = parseInt(mail.attributes.uid);
const toAddresses = Array.isArray(mail.parts[0].body.to) ?
mail.parts[0].body.to.map(a => a.toLowerCase()) : [String(mail.parts[0].body.to).toLowerCase()];
if (exampleUids.includes(uid)) return false; // Filter out mails that are too new, whitelisted, or belong to locked inboxes
if (toAddresses.some(addr => lockedAddresses.includes(addr))) return false; let filtered = headers;
return date <= deleteOlderThan;
}) // Exclude exampleUids
.map(mail => parseInt(mail.attributes.uid)); filtered = filtered.filter(mail => !exampleUids.includes(parseInt(mail.attributes.uid)));
// Exclude locked inboxes
filtered = filtered.filter(mail => {
const toAddresses = Array.isArray(mail.parts[0].body.to) ?
mail.parts[0].body.to.map(a => a.toLowerCase()) : [String(mail.parts[0].body.to).toLowerCase()];
return !toAddresses.some(addr => lockedAddresses.includes(addr));
});
// Exclude too new (compare timestamps for Date objects)
filtered = filtered.filter(mail => mail.attributes.date && mail.attributes.date.getTime() <= deleteOlderThan.getTime());
const toDelete = filtered.map(mail => parseInt(mail.attributes.uid));
if (toDelete.length === 0) { if (toDelete.length === 0) {
debug('No mails to delete. (after locked inbox exclusion)'); debug('No mails to delete. (after locked inbox exclusion)');
@ -296,11 +332,15 @@ class ImapService extends EventEmitter {
} }
debug(`Deleting mails ${toDelete}`); debug(`Deleting mails ${toDelete}`);
await this.connection.deleteMessage(toDelete); // Batch deletes to avoid IMAP argument limits
const BATCH_SIZE = this.config.imap.fetchChunkSize;
toDelete.forEach(uid => { for (let i = 0; i < toDelete.length; i += BATCH_SIZE) {
this.emit(ImapService.EVENT_DELETED_MAIL, uid); const batch = toDelete.slice(i, i + BATCH_SIZE);
}); await this.connection.deleteMessage(batch);
batch.forEach(uid => {
this.emit(ImapService.EVENT_DELETED_MAIL, uid);
});
}
} }
@ -338,17 +378,39 @@ class ImapService extends EventEmitter {
* @private * @private
*/ */
async _searchWithoutFetch(searchCriteria) { async _searchWithoutFetch(searchCriteria) {
const imapUnderlying = this.connection.imap const imapUnderlying = this.connection.imap;
// If searching by UID and the list is too long, batch it
return new Promise((resolve, reject) => { const UID_BATCH_SIZE = 500;
imapUnderlying.search(searchCriteria, (err, uids) => { // Detect UID search: ['UID', [array]] or ['UID', '1:1000']
if (err) { if (Array.isArray(searchCriteria) && searchCriteria.length === 1 && Array.isArray(searchCriteria[0]) && searchCriteria[0][0] === 'UID' && Array.isArray(searchCriteria[0][1]) && searchCriteria[0][1].length > UID_BATCH_SIZE) {
reject(err) const allUids = searchCriteria[0][1];
} else { let allResults = [];
resolve(uids || []) for (let i = 0; i < allUids.length; i += UID_BATCH_SIZE) {
} const batch = allUids.slice(i, i + UID_BATCH_SIZE);
}) const batchCriteria = [
}) ['UID', batch]
];
// eslint-disable-next-line no-await-in-loop
const batchResult = await new Promise((resolve, reject) => {
imapUnderlying.search(batchCriteria, (err, uids) => {
if (err) reject(err);
else resolve(uids || []);
});
});
allResults = allResults.concat(batchResult);
}
return allResults;
} else {
return new Promise((resolve, reject) => {
imapUnderlying.search(searchCriteria, (err, uids) => {
if (err) {
reject(err);
} else {
resolve(uids || []);
}
});
});
}
} }
_createMailSummary(message) { _createMailSummary(message) {
@ -487,4 +549,4 @@ ImapService.EVENT_DELETED_MAIL = 'mailDeleted'
ImapService.EVENT_INITIAL_LOAD_DONE = 'initial load done' ImapService.EVENT_INITIAL_LOAD_DONE = 'initial load done'
ImapService.EVENT_ERROR = 'error' ImapService.EVENT_ERROR = 'error'
module.exports = ImapService module.exports = ImapService

View file

@ -26,6 +26,7 @@ class StatisticsStore {
this.imapHash = this._computeImapHash(); this.imapHash = this._computeImapHash();
if (this.db) { if (this.db) {
this._autoMigrateRemoveIdCheck();
this._autoMigrateInstanceId(); this._autoMigrateInstanceId();
this._autoMigrateImapHash(); this._autoMigrateImapHash();
this._loadFromDatabase(); this._loadFromDatabase();
@ -33,6 +34,48 @@ class StatisticsStore {
debug('Statistics store initialized'); debug('Statistics store initialized');
} }
_autoMigrateRemoveIdCheck() {
debug('Starting CHECK(id = 1) migration check...');
// Remove CHECK(id = 1) constraint from statistics table if present
try {
const pragma = this.db.prepare("PRAGMA table_info(statistics)").all();
// Check if id column exists and if table has only one row (singleton)
const hasId = pragma.some(col => col.name === 'id');
if (hasId) {
// Check for CHECK constraint by inspecting sqlite_master
const tableSql = this.db.prepare("SELECT sql FROM sqlite_master WHERE type='table' AND name='statistics'").get();
debug('Table SQL:', tableSql && tableSql.sql);
if (tableSql && tableSql.sql && tableSql.sql.includes('CHECK(id = 1)')) {
debug('Detected CHECK(id = 1) constraint on statistics table, migrating...');
// Backup data
const rows = this.db.prepare('SELECT * FROM statistics').all();
// Drop and recreate table without CHECK constraint
this.db.prepare('DROP TABLE IF EXISTS statistics').run();
this.db.prepare(`CREATE TABLE IF NOT EXISTS statistics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
instance_id TEXT NOT NULL,
largest_uid INTEGER NOT NULL DEFAULT 0,
hourly_data TEXT,
last_updated INTEGER NOT NULL,
imap_hash TEXT NULL
)`).run();
// Restore data
const insert = this.db.prepare('INSERT INTO statistics (id, instance_id, largest_uid, hourly_data, last_updated, imap_hash) VALUES (?, ?, ?, ?, ?, ?)');
rows.forEach(row => {
insert.run(row.id, row.instance_id, row.largest_uid, row.hourly_data, row.last_updated, row.imap_hash);
});
debug('Migration complete: CHECK(id = 1) removed from statistics table.');
} else {
debug('No CHECK(id = 1) constraint found in statistics table.');
}
} else {
debug('No id column found in statistics table.');
}
} catch (e) {
debug('Auto-migration for CHECK(id = 1) failed:', e.message);
}
}
_autoMigrateInstanceId() { _autoMigrateInstanceId() {
// Add and backfill instance_id for statistics table // Add and backfill instance_id for statistics table
try { try {
@ -113,8 +156,9 @@ class StatisticsStore {
debug(`Loaded from database: largestUid=${this.largestUid}, hourlyData=${this.hourlyData.length} entries`); debug(`Loaded from database: largestUid=${this.largestUid}, hourlyData=${this.hourlyData.length} entries`);
} else { } else {
// No row for this hash, insert new row // No row for this hash, insert new row
const insert = this.db.prepare('INSERT INTO statistics (imap_hash, largest_uid, hourly_data, last_updated) VALUES (?, ?, ?, ?)'); const instanceId = config.instanceId || this.imapHash;
insert.run(this.imapHash, 0, JSON.stringify([]), Date.now()); const insert = this.db.prepare('INSERT OR REPLACE INTO statistics (instance_id, imap_hash, largest_uid, hourly_data, last_updated) VALUES (?, ?, ?, ?, ?)');
insert.run(instanceId, this.imapHash, 0, JSON.stringify([]), Date.now());
this.largestUid = 0; this.largestUid = 0;
this.hourlyData = []; this.hourlyData = [];
debug('Created new statistics row for imap_hash'); debug('Created new statistics row for imap_hash');
@ -127,17 +171,18 @@ class StatisticsStore {
_saveToDatabase() { _saveToDatabase() {
if (!this.db) return; if (!this.db) return;
try { try {
const instanceId = config.instanceId || this.imapHash;
const stmt = this.db.prepare(` const stmt = this.db.prepare(`
UPDATE statistics UPDATE statistics
SET largest_uid = ?, hourly_data = ?, last_updated = ? SET largest_uid = ?, hourly_data = ?, last_updated = ?
WHERE imap_hash = ? WHERE instance_id = ? AND imap_hash = ?
`); `);
const result = stmt.run(this.largestUid, JSON.stringify(this.hourlyData), Date.now(), this.imapHash); const result = stmt.run(this.largestUid, JSON.stringify(this.hourlyData), Date.now(), instanceId, this.imapHash);
// If no row was updated, insert new row // If no row was updated, insert new row
if (result.changes === 0) { if (result.changes === 0) {
const insert = this.db.prepare('INSERT INTO statistics (imap_hash, largest_uid, hourly_data, last_updated) VALUES (?, ?, ?, ?)'); const insert = this.db.prepare('INSERT INTO statistics (instance_id, imap_hash, largest_uid, hourly_data, last_updated) VALUES (?, ?, ?, ?, ?)');
insert.run(this.imapHash, this.largestUid, JSON.stringify(this.hourlyData), Date.now()); insert.run(instanceId, this.imapHash, this.largestUid, JSON.stringify(this.hourlyData), Date.now());
debug('Inserted new statistics row for imap_hash'); debug('Inserted new statistics row for instance_id and imap_hash');
} else { } else {
debug('Statistics saved to database'); debug('Statistics saved to database');
} }