// VIP Matting Queue Manager
// Uses BiRefNet for high-quality matting via WebSocket with matting-server

const fs = require('fs');
const path = require('path');
const { Readable } = require('stream');
const archiver = require('archiver');
const unzipper = require('unzipper');
const { getDatabase } = require('./sql');
const { isMattingServerConnected, sendMattingTask } = require('./socket-connecting');

const QUEUE_FILE = path.join(__dirname, 'vip-matting-queue.json');
const USERS_DIR = path.join(__dirname, 'users');

// Timeout config (ms)
const TASK_TIMEOUT = 5 * 60 * 1000; // 5 minutes
const TIMEOUT_CHECK_INTERVAL = 30000; // how often timed-out tasks are swept
const SERVER_RETRY_DELAY = 5000; // grace period before giving up on a disconnected server

// Entry names accepted as the result image inside the ZIP returned by the server
const RESULT_IMAGE_NAMES = ['image_matted.png', 'image.png'];

// Queue state
let queue = [];
let isProcessing = false;
let dbInstance = null;
let timeoutCheckInterval = null;

/**
 * Resolve after the given number of milliseconds.
 * @param {number} ms
 * @returns {Promise<void>}
 */
function delay(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Write a JSON response, unless a response has already been started.
 * The guard keeps late error handlers from throwing on an already-sent response.
 * @param {object} res - HTTP response object
 * @param {number} statusCode
 * @param {object} payload - serialized with JSON.stringify
 */
function sendJson(res, statusCode, payload) {
  if (res.headersSent) {
    return;
  }
  res.writeHead(statusCode, { 'Content-Type': 'application/json' });
  res.end(JSON.stringify(payload));
}

/**
 * Check that a username is a single path segment, so it can be used as a
 * directory name under USERS_DIR without escaping it.
 * @param {*} username
 * @returns {boolean}
 */
function isSafeUsername(username) {
  if (typeof username !== 'string' || username.length === 0) {
    return false;
  }
  if (username === '.' || username === '..' || username.includes('\0')) {
    return false;
  }
  return !username.includes('/') && !username.includes('\\');
}

/**
 * Get the database instance, creating it on first use and caching it afterwards.
 * @returns {Promise<object>}
 */
async function getDB() {
  if (!dbInstance) {
    dbInstance = await getDatabase();
  }
  return dbInstance;
}

/**
 * Initialize: load the persisted queue from QUEUE_FILE if it exists.
 * Any read/parse problem (including a file that is not a JSON array) is logged
 * and leaves the queue empty.
 */
function initQueue() {
  try {
    if (!fs.existsSync(QUEUE_FILE)) {
      return;
    }
    const parsed = JSON.parse(fs.readFileSync(QUEUE_FILE, 'utf-8'));
    if (!Array.isArray(parsed)) {
      // push()/shift() below require an array; treat anything else as corrupt.
      throw new Error('Queue file does not contain a JSON array');
    }
    queue = parsed;
    console.log(`[VIPMatting] Queue loaded, ${queue.length} tasks`);
  } catch (error) {
    console.error('[VIPMatting] Failed to load queue:', error);
    queue = [];
  }
}

/**
 * Persist the in-memory queue to QUEUE_FILE. Failures are logged, not thrown.
 */
function saveQueue() {
  try {
    fs.writeFileSync(QUEUE_FILE, JSON.stringify(queue, null, 2), 'utf-8');
  } catch (error) {
    console.error('[VIPMatting] Failed to save queue:', error);
  }
}

/**
 * Generate a task ID of the form `vip_<ms timestamp>_<random base36 string>`.
 * @returns {string}
 */
function generateTaskId() {
  const timestamp = Date.now();
  // slice(2, 11) drops the leading "0." and keeps up to 9 characters.
  const random = Math.random().toString(36).slice(2, 11);
  return `vip_${timestamp}_${random}`;
}

/**
 * Start queue processing in the background and log (rather than leak) any
 * unexpected rejection.
 */
function kickQueue() {
  processQueue().catch((error) => {
    console.error('[VIPMatting] Queue processing crashed:', error);
  });
}

/**
 * Add a task to the queue, persist the queue, record it in the AI history and
 * start processing if it is the only task and nothing is running.
 * @param {string} username - stored lowercased on the task
 * @param {object} taskData - extra fields merged into the task (may override defaults)
 * @returns {Promise<string>} the generated task ID
 */
async function addToQueue(username, taskData) {
  const taskId = generateTaskId();
  // NOTE(review): a task created directly as 'rendering' skips the
  // 'queued' -> 'rendering' transition in processQueue, so no renderStartTime is
  // recorded for it - confirm that is what the timeout check expects.
  const startsImmediately = queue.length === 0 && !isProcessing;
  const task = {
    id: taskId,
    username: username.toLowerCase(),
    type: 'vip-matting',
    status: startsImmediately ? 'rendering' : 'queued',
    createdAt: new Date().toISOString(),
    ...taskData
  };

  queue.push(task);
  saveQueue();

  // Add to history
  try {
    const db = await getDB();
    db.addAIHistory(taskId, username, task.status, null);
  } catch (error) {
    console.error('[VIPMatting] Failed to add history:', error);
  }

  // If queue has only one task and not processing, start immediately
  if (queue.length === 1 && !isProcessing) {
    kickQueue();
  }

  return taskId;
}

/**
 * Ask the database to mark tasks older than TASK_TIMEOUT as timed out and log
 * how many were affected. Failures are logged, not thrown.
 */
async function checkTimeoutTasks() {
  try {
    const db = await getDB();
    const timedOutTasks = db.checkAndMarkTimeoutTasks(TASK_TIMEOUT);
    if (timedOutTasks && timedOutTasks.length > 0) {
      console.log(`[VIPMatting] Found ${timedOutTasks.length} timeout tasks, marked as failed`);
    }
  } catch (error) {
    console.error('[VIPMatting] Failed to check timeout tasks:', error);
  }
}

/**
 * (Re)start the periodic timeout check, replacing any timer already running.
 */
function startTimeoutChecker() {
  if (timeoutCheckInterval) {
    clearInterval(timeoutCheckInterval);
  }
  timeoutCheckInterval = setInterval(checkTimeoutTasks, TIMEOUT_CHECK_INTERVAL);
  console.log('[VIPMatting] Timeout checker started');
}

/**
 * Build an in-memory ZIP containing `image.png` and, when jsonData is given,
 * `spritesheet.json`.
 * @param {string} imageBase64 - base64-encoded image bytes
 * @param {object|null} jsonData - optional data stored as pretty-printed JSON
 * @param {string} taskId - unused; kept for interface compatibility
 * @returns {Promise<Buffer>} the ZIP archive
 */
async function createZipBuffer(imageBase64, jsonData, taskId) {
  return new Promise((resolve, reject) => {
    const buffers = [];
    const archive = archiver('zip', { zlib: { level: 5 } });

    archive.on('data', (chunk) => buffers.push(chunk));
    archive.on('end', () => resolve(Buffer.concat(buffers)));
    archive.on('error', (err) => reject(err));

    // Add image file
    archive.append(Buffer.from(imageBase64, 'base64'), { name: 'image.png' });

    // Add JSON data if exists
    if (jsonData) {
      const jsonBuffer = Buffer.from(JSON.stringify(jsonData, null, 2), 'utf-8');
      archive.append(jsonBuffer, { name: 'spritesheet.json' });
    }

    archive.finalize();
  });
}

/**
 * Extract the first entry named `image_matted.png` or `image.png` from a ZIP.
 * @param {Buffer} zipBuffer - ZIP archive bytes
 * @returns {Promise<Buffer>} the image bytes
 * @throws {Error} (as a rejection) when the archive contains no such entry or
 *   cannot be parsed
 */
async function extractImageFromZip(zipBuffer) {
  return new Promise((resolve, reject) => {
    // Tracks whether a matching entry was seen, so 'close' only rejects when
    // the archive really had no image (and never races a pending buffer()).
    let imageFound = false;
    const parser = unzipper.Parse();

    parser.on('entry', (entry) => {
      if (!imageFound && RESULT_IMAGE_NAMES.includes(entry.path)) {
        imageFound = true;
        entry.buffer().then(resolve, reject);
      } else {
        entry.autodrain();
      }
    });

    parser.on('close', () => {
      if (!imageFound) {
        reject(new Error('No processed image found in ZIP'));
      }
    });

    parser.on('error', reject);

    // Feed the in-memory archive to the parser as a stream.
    const source = new Readable({ read() {} });
    source.push(zipBuffer);
    source.push(null);
    source.pipe(parser);
  });
}

/**
 * Send a task's ZIP to the matting server and resolve with its reply.
 * Adapts the callback-style sendMattingTask to a Promise.
 * @param {string} taskId
 * @param {Buffer} zipBuffer
 * @returns {Promise<Buffer>} data passed to the callback by sendMattingTask
 */
function sendToMattingServer(taskId, zipBuffer) {
  return new Promise((resolve, reject) => {
    sendMattingTask(taskId, zipBuffer, (error, data) => {
      if (error) {
        reject(error);
      } else {
        resolve(data);
      }
    }, TASK_TIMEOUT);
  });
}

/**
 * Run one task end to end: zip the input, send it to the matting server,
 * extract and store the result, then record 'completed' or 'failed' on the
 * task and in the database. Never rejects for task-level failures.
 * @param {object} task - queue entry; mutated with status/result fields
 */
async function runTask(task) {
  try {
    console.log(`[VIPMatting] Processing task: ${task.id}`);

    const zipBuffer = await createZipBuffer(task.imageBase64, task.jsonData, task.id);
    console.log(`[VIPMatting] ZIP created, size: ${zipBuffer.length} bytes`);

    const resultZipBuffer = await sendToMattingServer(task.id, zipBuffer);
    console.log(`[VIPMatting] Result received, size: ${resultZipBuffer.length} bytes`);

    const processedImageBuffer = await extractImageFromZip(resultZipBuffer);
    const imageUrl = await saveVIPMattingImage(
      task.username,
      task.id,
      processedImageBuffer.toString('base64')
    );

    task.status = 'completed';
    task.imageUrl = imageUrl;
    task.completedAt = new Date().toISOString();
    await updateTaskStatus(task.id, 'completed', imageUrl);
    console.log(`[VIPMatting] Task completed: ${task.id}`);
  } catch (error) {
    console.error(`[VIPMatting] Task failed: ${task.id}`, error);
    // The server callback may reject with a non-Error value.
    const message = error && error.message ? error.message : String(error);
    task.status = 'failed';
    task.error = message;
    task.completedAt = new Date().toISOString();
    await updateTaskStatus(task.id, 'failed', null, message);
  }
}

/**
 * Process queued tasks one at a time, in order, until the queue is empty.
 * Returns immediately if a run is already in progress. If the matting server
 * is still disconnected after one retry delay, the remaining tasks stay in the
 * queue and the run stops.
 * @returns {Promise<void>}
 */
async function processQueue() {
  if (isProcessing || queue.length === 0) {
    return;
  }

  isProcessing = true;
  try {
    while (queue.length > 0) {
      const task = queue[0];

      if (!isMattingServerConnected()) {
        console.log('[VIPMatting] matting-server not connected, waiting...');
        await delay(SERVER_RETRY_DELAY);
        if (!isMattingServerConnected()) {
          console.error('[VIPMatting] matting-server still not connected, keeping tasks in queue');
          return;
        }
      }

      if (task.status === 'queued') {
        task.status = 'rendering';
        task.renderStartTime = Date.now();
        await updateTaskStatus(task.id, 'rendering', null, null, task.renderStartTime);
      }

      await runTask(task);

      queue.shift();
      saveQueue();
    }
  } finally {
    // Always release the flag, otherwise an unexpected throw would block
    // every later processQueue() call.
    isProcessing = false;
  }
}

/**
 * Write a result image to `users/<lowercased username>/ai-images/<taskId>.png`
 * next to this module and return the URL it is served from.
 * @param {string} username
 * @param {string} taskId
 * @param {string} imageBase64 - base64-encoded PNG bytes
 * @returns {Promise<string>} URL of the form `/api/ai/image?username=...&id=...`
 * @throws {Error} when username is not a single safe path segment
 */
async function saveVIPMattingImage(username, taskId, imageBase64) {
  // username originates from request input; refuse anything that could
  // resolve outside USERS_DIR.
  if (!isSafeUsername(username)) {
    throw new Error('Invalid username');
  }

  const aiDir = path.join(USERS_DIR, username.toLowerCase(), 'ai-images');
  fs.mkdirSync(aiDir, { recursive: true }); // no-op when it already exists

  const imagePath = path.join(aiDir, `${taskId}.png`);
  fs.writeFileSync(imagePath, Buffer.from(imageBase64, 'base64'));

  return `/api/ai/image?username=${encodeURIComponent(username)}&id=${encodeURIComponent(taskId)}`;
}

/**
 * Update a task's status in the database. Missing optional values are passed
 * as null. Failures are logged, not thrown.
 * @param {string} taskId
 * @param {string} status
 * @param {string} [imageUrl]
 * @param {string} [error]
 * @param {number} [renderStartTime]
 */
async function updateTaskStatus(taskId, status, imageUrl, error, renderStartTime) {
  try {
    const db = await getDB();
    db.updateAITaskStatus(taskId, status, imageUrl || null, error || null, renderStartTime || null);
  } catch (err) {
    console.error('[VIPMatting] Failed to update task status:', err);
  }
}

/**
 * Handle a VIP matting queue request.
 * Accepts POST with a JSON body `{ username, imageBase64, fileName?, jsonData? }`,
 * enqueues a task and responds with `{ success, taskId, message }`.
 * Responds 405 for other methods, 400 for missing/invalid fields and 500 for
 * anything else (including an unparsable body).
 * @param {object} req - HTTP request
 * @param {object} res - HTTP response
 */
function handleQueueRequest(req, res) {
  if (req.method !== 'POST') {
    sendJson(res, 405, { error: 'Method not allowed' });
    return;
  }

  // Collect raw chunks and decode once: decoding chunk-by-chunk corrupts
  // multi-byte UTF-8 characters that straddle a chunk boundary.
  const chunks = [];
  req.on('data', (chunk) => {
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  });

  req.on('end', async () => {
    try {
      const data = JSON.parse(Buffer.concat(chunks).toString('utf-8'));
      const { username, imageBase64, fileName, jsonData } = data;

      if (!username) {
        sendJson(res, 400, { success: false, error: 'Missing username' });
        return;
      }

      if (!isSafeUsername(username)) {
        sendJson(res, 400, { success: false, error: 'Invalid username' });
        return;
      }

      if (!imageBase64 || typeof imageBase64 !== 'string') {
        sendJson(res, 400, { success: false, error: 'Missing image data' });
        return;
      }

      const taskId = await addToQueue(username, {
        imageBase64,
        fileName: fileName || 'vip-matting',
        jsonData: jsonData || null
      });

      sendJson(res, 200, {
        success: true,
        taskId: taskId,
        message: 'VIP matting task added to queue'
      });

      // Also resumes tasks left over from an earlier run or a server outage.
      kickQueue();
    } catch (error) {
      console.error('[VIPMatting] Request failed:', error);
      sendJson(res, 500, { success: false, error: 'Processing failed', details: error.message });
    }
  });

  req.on('error', (error) => {
    console.error('[VIPMatting] Request error:', error);
    sendJson(res, 500, { success: false, error: 'Request error', details: error.message });
  });
}

// Initialize
initQueue();
startTimeoutChecker();

module.exports = { handleQueueRequest, processQueue, TASK_TIMEOUT };