From e8f72cb3907ffeb6c0c311a4434fc2c96488a78c Mon Sep 17 00:00:00 2001
From: Dmitri
Date: Fri, 20 Mar 2026 15:12:56 +0400
Subject: [PATCH] Improve asset uploading

---
 backend/src/index.js         |  22 +-
 backend/src/routes/file.js   |  14 ++
 backend/src/services/file.js | 401 ++++++++++++++++++++++++++++++-----
 3 files changed, 386 insertions(+), 51 deletions(-)

diff --git a/backend/src/index.js b/backend/src/index.js
index a274faa..1787c90 100644
--- a/backend/src/index.js
+++ b/backend/src/index.js
@@ -119,11 +119,20 @@ app.use(
 app.use(cors({origin: true}));
 require('./auth/auth');
 
+// Request logger applied early so all routes are logged
+app.use(requestLogger);
+
+// Initialize passport JWT auth early (before file routes)
+const jwtAuth = passport.authenticate('jwt', { session: false });
+
+// Mount file upload routes BEFORE body-parser to avoid JSON parsing on binary uploads
+// These routes handle their own body parsing (JSON for init/finalize, raw streams for chunks)
+app.use('/api/file', fileRoutes);
+
+// Body parser for all other routes
 app.use(bodyParser.json({ limit: '1mb' }));
 app.use(bodyParser.urlencoded({ extended: true, limit: '1mb' }));
-app.use(requestLogger);
 app.use(runtimeContextMiddleware);
-const jwtAuth = passport.authenticate('jwt', { session: false });
 
 const requireRuntimeReadOrAuth = (req, res, next) => {
   const runtimeMode = req.runtimeContext?.mode;
@@ -163,7 +172,6 @@ app.get('/api/health', async (req, res) => {
 });
 
 app.use('/api/auth', authRoutes);
-app.use('/api/file', fileRoutes);
 app.use('/api/pexels', pexelsRoutes);
 app.use('/api/runtime-context', runtimeContextRoutes);
 
@@ -249,6 +257,14 @@ if (fs.existsSync(publicDir)) {
   });
 }
 
+// Generic error handler
+app.use((err, req, res, next) => {
+  if (!res.headersSent) {
+    logger.error({ err, url: req.url, method: req.method }, 'Unhandled error');
+    res.status(500).json({ message: 'Internal server error' });
+  }
+});
+
 const PORT = process.env.NODE_ENV === 'dev_stage' ? 3000 : 8080;
 app.listen(PORT, () => {
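
The reordering above works because Express dispatches middleware strictly in
registration order: anything mounted before body-parser sees the raw request
stream. A minimal standalone sketch of the pattern (names are illustrative,
not from this repo):

    const express = require('express');
    const bodyParser = require('body-parser');

    const app = express();

    // Registered first: this handler receives the raw, unparsed stream,
    // which is exactly what a binary chunk upload needs.
    app.put('/raw', (req, res) => {
      let bytes = 0;
      req.on('data', (buf) => { bytes += buf.length; });
      req.on('end', () => res.json({ received: bytes }));
    });

    // Registered second: only routes added after this line get req.body,
    // so large uploads are never buffered through the JSON parser.
    app.use(bodyParser.json({ limit: '1mb' }));
    app.post('/json', (req, res) => res.json({ echoed: req.body }));

    app.listen(3000);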
diff --git a/backend/src/routes/file.js b/backend/src/routes/file.js
index 487fdd5..cb70536 100644
--- a/backend/src/routes/file.js
+++ b/backend/src/routes/file.js
@@ -1,8 +1,19 @@
 const express = require('express');
 const passport = require('passport');
+const bodyParser = require('body-parser');
 const services = require('../services/file');
 
 const router = express.Router();
 
+// JSON body parser that ONLY parses application/json content-type
+// This prevents errors when binary data is sent or no body is present
+const jsonParser = bodyParser.json({
+  limit: '1mb',
+  type: (req) => {
+    const contentType = req.headers['content-type'] || '';
+    return contentType.includes('application/json');
+  },
+});
+
 router.get('/download', (req, res) => {
   services.downloadFile(req, res);
 });
@@ -16,6 +27,7 @@ router.post('/upload/:table/:field', passport.authenticate('jwt', {session: fals
 router.post(
   '/upload-sessions/init',
   passport.authenticate('jwt', { session: false }),
+  jsonParser,
   (req, res) => {
     services.initUploadSession(req, res);
   },
@@ -29,16 +41,18 @@ router.get(
   (req, res) => {
     services.getUploadSession(req, res);
   },
 );
 
+// Chunk upload - NO body parser, raw stream is read directly by uploadChunk
 router.put(
   '/upload-sessions/:sessionId/chunks/:chunkIndex',
   passport.authenticate('jwt', { session: false }),
   (req, res) => {
     services.uploadChunk(req, res);
   },
 );
 
+// Finalize - NO body parser needed, only uses req.params.sessionId
 router.post(
   '/upload-sessions/:sessionId/finalize',
   passport.authenticate('jwt', { session: false }),
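
The three session routes form an init, chunks, finalize protocol. A sketch of
a browser client driving it end to end (the 5 MB chunk size, the folder
value, and the response handling are assumptions; the paths and field names
follow the routes and services in this patch):

    // Assumes `file` is a File/Blob and `token` a valid JWT.
    const CHUNK_SIZE = 5 * 1024 * 1024; // assumed client-side choice

    async function uploadInChunks(file, token) {
      const totalChunks = Math.ceil(file.size / CHUNK_SIZE);
      const auth = { Authorization: `Bearer ${token}` };

      // 1. Init: JSON body, handled by the route-level jsonParser.
      const init = await fetch('/api/file/upload-sessions/init', {
        method: 'POST',
        headers: { ...auth, 'Content-Type': 'application/json' },
        body: JSON.stringify({
          folder: 'assets', // illustrative folder
          filename: file.name,
          size: file.size,
          totalChunks,
          contentType: file.type,
        }),
      }).then((r) => r.json());

      // 2. Chunks: raw binary PUTs; no body parser runs on these.
      for (let i = 0; i < totalChunks; i += 1) {
        const blob = file.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE);
        await fetch(`/api/file/upload-sessions/${init.sessionId}/chunks/${i}`, {
          method: 'PUT',
          headers: auth,
          body: blob,
        });
      }

      // 3. Finalize: no request body; the sessionId in the path is enough.
      return fetch(`/api/file/upload-sessions/${init.sessionId}/finalize`, {
        method: 'POST',
        headers: auth,
      }).then((r) => r.json());
    }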
diff --git a/backend/src/services/file.js b/backend/src/services/file.js
index 791130c..cfe29b7 100644
--- a/backend/src/services/file.js
+++ b/backend/src/services/file.js
@@ -10,6 +10,8 @@ const {
   PutObjectCommand,
   GetObjectCommand,
   DeleteObjectCommand,
+  ListObjectsV2Command,
+  DeleteObjectsCommand,
 } = require('@aws-sdk/client-s3');
 
 const ensureDirectoryExistence = (filePath) => {
@@ -118,6 +120,190 @@ const streamAppendFile = async (targetPath, sourcePath) => {
   });
 }
 
+// S3 session storage helpers
+const S3_UPLOAD_SESSIONS_PREFIX = '_upload_sessions';
+
+const getS3SessionMetaKey = (prefix, sessionId) => {
+  const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
+  const sessionPath = `${S3_UPLOAD_SESSIONS_PREFIX}/${sessionId}/meta.json`;
+  return cleanPrefix ? `${cleanPrefix}/${sessionPath}` : sessionPath;
+};
+
+const getS3SessionChunkKey = (prefix, sessionId, chunkIndex) => {
+  const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
+  const chunkPath = `${S3_UPLOAD_SESSIONS_PREFIX}/${sessionId}/chunks/${chunkIndex}.part`;
+  return cleanPrefix ? `${cleanPrefix}/${chunkPath}` : chunkPath;
+};
+
+const getS3SessionPrefix = (prefix, sessionId) => {
+  const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
+  const sessionPath = `${S3_UPLOAD_SESSIONS_PREFIX}/${sessionId}/`;
+  return cleanPrefix ? `${cleanPrefix}/${sessionPath}` : sessionPath;
+};
+
+const readS3SessionMeta = async (client, bucket, prefix, sessionId) => {
+  try {
+    const key = getS3SessionMetaKey(prefix, sessionId);
+    const output = await client.send(
+      new GetObjectCommand({ Bucket: bucket, Key: key }),
+    );
+
+    if (!output || !output.Body) {
+      return null;
+    }
+
+    const bodyStr = await output.Body.transformToString();
+    return JSON.parse(bodyStr);
+  } catch (error) {
+    if (error.name === 'NoSuchKey') {
+      return null;
+    }
+    throw error;
+  }
+};
+
+const writeS3SessionMeta = async (client, bucket, prefix, sessionId, payload) => {
+  const key = getS3SessionMetaKey(prefix, sessionId);
+  await client.send(
+    new PutObjectCommand({
+      Bucket: bucket,
+      Key: key,
+      Body: JSON.stringify(payload, null, 2),
+      ContentType: 'application/json',
+    }),
+  );
+};
+
+const uploadS3Chunk = async (client, bucket, prefix, sessionId, chunkIndex, body) => {
+  const key = getS3SessionChunkKey(prefix, sessionId, chunkIndex);
+  await client.send(
+    new PutObjectCommand({
+      Bucket: bucket,
+      Key: key,
+      Body: body,
+    }),
+  );
+};
+
+const downloadS3Chunk = async (client, bucket, prefix, sessionId, chunkIndex) => {
+  const key = getS3SessionChunkKey(prefix, sessionId, chunkIndex);
+  try {
+    const output = await client.send(
+      new GetObjectCommand({ Bucket: bucket, Key: key }),
+    );
+
+    if (!output || !output.Body) {
+      return null;
+    }
+
+    return output.Body;
+  } catch (error) {
+    if (error.name === 'NoSuchKey') {
+      return null;
+    }
+    throw error;
+  }
+};
+
+const s3ChunkExists = async (client, bucket, prefix, sessionId, chunkIndex) => {
+  const key = getS3SessionChunkKey(prefix, sessionId, chunkIndex);
+  try {
+    await client.send(
+      new GetObjectCommand({ Bucket: bucket, Key: key, Range: 'bytes=0-0' }),
+    );
+    return true;
+  } catch (error) {
+    if (error.name === 'NoSuchKey') {
+      return false;
+    }
+    throw error;
+  }
+};
+
+const removeS3UploadSession = async (client, bucket, prefix, sessionId) => {
+  const sessionPrefix = getS3SessionPrefix(prefix, sessionId);
+
+  try {
+    const listResult = await client.send(
+      new ListObjectsV2Command({
+        Bucket: bucket,
+        Prefix: sessionPrefix,
+      }),
+    );
+
+    if (!listResult.Contents || listResult.Contents.length === 0) {
+      return;
+    }
+
+    const objectsToDelete = listResult.Contents.map((obj) => ({ Key: obj.Key }));
+
+    await client.send(
+      new DeleteObjectsCommand({
+        Bucket: bucket,
+        Delete: { Objects: objectsToDelete },
+      }),
+    );
+  } catch (error) {
+    console.error(`Failed to remove S3 upload session ${sessionId}`, error);
+  }
+};
+
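+// Illustrative key layout produced by the helpers above (assuming the
+// configured prefix is 'uploads' and a UUID session id):
+//
+//   uploads/_upload_sessions/<sessionId>/meta.json      <- session state
+//   uploads/_upload_sessions/<sessionId>/chunks/0.part  <- one object per chunk
+//   uploads/_upload_sessions/<sessionId>/chunks/1.part
+//
+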
+const cleanupExpiredS3UploadSessions = async () => {
+  const provider = getFileStorageProvider();
+  if (provider !== 's3') {
+    return;
+  }
+
+  try {
+    const { client, bucket, prefix } = initS3();
+    const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
+    const sessionsPrefix = cleanPrefix
+      ? `${cleanPrefix}/${S3_UPLOAD_SESSIONS_PREFIX}/`
+      : `${S3_UPLOAD_SESSIONS_PREFIX}/`;
+
+    const listResult = await client.send(
+      new ListObjectsV2Command({
+        Bucket: bucket,
+        Prefix: sessionsPrefix,
+        Delimiter: '/',
+      }),
+    );
+
+    if (!listResult.CommonPrefixes || listResult.CommonPrefixes.length === 0) {
+      return;
+    }
+
+    const now = Date.now();
+
+    for (const prefixObj of listResult.CommonPrefixes) {
+      const sessionPrefix = prefixObj.Prefix;
+      const sessionId = sessionPrefix
+        .replace(sessionsPrefix, '')
+        .replace(/\/$/, '');
+
+      if (!sessionId) continue;
+
+      try {
+        const meta = await readS3SessionMeta(client, bucket, prefix, sessionId);
+        if (!meta) {
+          await removeS3UploadSession(client, bucket, prefix, sessionId);
+          continue;
+        }
+
+        const updatedAt = new Date(meta.updatedAt || meta.createdAt || 0).getTime();
+        if (!updatedAt || now - updatedAt > UPLOAD_SESSION_TTL_MS) {
+          await removeS3UploadSession(client, bucket, prefix, sessionId);
+        }
+      } catch (error) {
+        console.error(`Failed to cleanup S3 upload session ${sessionId}`, error);
+        await removeS3UploadSession(client, bucket, prefix, sessionId);
+      }
+    }
+  } catch (error) {
+    console.error('Failed to cleanup expired S3 upload sessions', error);
+  }
+};
+
 const uploadLocal = (
   folder,
   validations = {
@@ -488,22 +674,6 @@ const deleteS3 = async (privateUrl) => {
   }
 }
 
-const uploadStreamToS3 = async (privateUrl, sourcePath, contentType) => {
-  const { client, bucket, region, prefix } = initS3();
-  const key = buildStoragePath(prefix, privateUrl);
-
-  await client.send(
-    new PutObjectCommand({
-      Bucket: bucket,
-      Key: key,
-      Body: fs.createReadStream(sourcePath),
-      ContentType: contentType || undefined,
-    }),
-  );
-
-  return `https://${bucket}.s3.${region}.amazonaws.com/${key}`;
-}
-
 const uploadStreamToGCloud = async (privateUrl, sourcePath) => {
   const { hash, bucket } = initGCloud();
   const fullPath = `${hash}/${privateUrl}`;
@@ -523,7 +693,16 @@ const initUploadSession = async (req, res) => {
     return res.sendStatus(403);
   }
 
-  cleanupExpiredUploadSessions();
+  const provider = getFileStorageProvider();
+
+  // Cleanup expired sessions (async for S3, sync for local)
+  if (provider === 's3') {
+    cleanupExpiredS3UploadSessions().catch((err) =>
+      console.error('S3 session cleanup failed', err),
+    );
+  } else {
+    cleanupExpiredUploadSessions();
+  }
 
   const folder = sanitizeFolder(req.body?.folder);
   const filename = sanitizeFilename(req.body?.filename);
@@ -544,9 +723,6 @@ const initUploadSession = async (req, res) => {
   }
 
   const sessionId = uuid();
-  const chunksDir = getSessionChunksDir(sessionId);
-  fs.mkdirSync(chunksDir, { recursive: true });
-
   const now = new Date().toISOString();
   const session = {
     id: sessionId,
@@ -562,7 +738,14 @@ const initUploadSession = async (req, res) => {
     updatedAt: now,
   };
 
-  writeSessionMeta(sessionId, session);
+  if (provider === 's3') {
+    const { client, bucket, prefix } = initS3();
+    await writeS3SessionMeta(client, bucket, prefix, sessionId, session);
+  } else {
+    const chunksDir = getSessionChunksDir(sessionId);
+    fs.mkdirSync(chunksDir, { recursive: true });
+    writeSessionMeta(sessionId, session);
+  }
 
   return res.status(200).send({
     sessionId,
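
For reference, the session document that writeSessionMeta / writeS3SessionMeta
persist looks roughly like this (field set inferred from the reads and writes
across this patch; values are illustrative):

    {
      "id": "<uuid>",
      "folder": "assets",
      "filename": "clip.mp4",
      "size": 10485760,
      "totalChunks": 2,
      "contentType": "video/mp4",
      "uploadedChunks": [0, 1],
      "createdAt": "2026-03-20T11:12:56.000Z",
      "updatedAt": "2026-03-20T11:13:10.000Z"
    }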
@@ -582,7 +765,15 @@ const getUploadSession = async (req, res) => {
   }
 
   const sessionId = String(req.params.sessionId || '');
-  const session = readSessionMeta(sessionId);
+  const provider = getFileStorageProvider();
+  let session;
+
+  if (provider === 's3') {
+    const { client, bucket, prefix } = initS3();
+    session = await readS3SessionMeta(client, bucket, prefix, sessionId);
+  } else {
+    session = readSessionMeta(sessionId);
+  }
 
   if (!session) {
     return res.status(404).send({ message: 'Upload session not found' });
@@ -617,7 +808,19 @@ const uploadChunk = async (req, res) => {
     return res.status(400).send({ message: 'Invalid chunk index' });
   }
 
-  const session = readSessionMeta(sessionId);
+  const provider = getFileStorageProvider();
+  let session;
+  let s3Client, s3Bucket, s3Prefix;
+
+  if (provider === 's3') {
+    const s3 = initS3();
+    s3Client = s3.client;
+    s3Bucket = s3.bucket;
+    s3Prefix = s3.prefix;
+    session = await readS3SessionMeta(s3Client, s3Bucket, s3Prefix, sessionId);
+  } else {
+    session = readSessionMeta(sessionId);
+  }
 
   if (!session) {
     return res.status(404).send({ message: 'Upload session not found' });
@@ -631,19 +834,32 @@ const uploadChunk = async (req, res) => {
     return res.status(400).send({ message: 'Chunk index is out of range' });
   }
 
-  const chunkDir = getSessionChunksDir(sessionId);
-  fs.mkdirSync(chunkDir, { recursive: true });
+  if (provider === 's3') {
+    // Collect chunk data from request stream
+    const chunks = [];
+    for await (const chunk of req) {
+      chunks.push(chunk);
+    }
+    const chunkBuffer = Buffer.concat(chunks);
 
-  const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
-  const tempChunkPath = `${chunkPath}.tmp`;
+    // Upload chunk directly to S3
+    await uploadS3Chunk(s3Client, s3Bucket, s3Prefix, sessionId, chunkIndex, chunkBuffer);
+  } else {
+    // Local storage - write to temp file then rename
+    const chunkDir = getSessionChunksDir(sessionId);
+    fs.mkdirSync(chunkDir, { recursive: true });
 
-  if (fs.existsSync(tempChunkPath)) {
-    fs.unlinkSync(tempChunkPath);
+    const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
+    const tempChunkPath = `${chunkPath}.tmp`;
+
+    if (fs.existsSync(tempChunkPath)) {
+      fs.unlinkSync(tempChunkPath);
+    }
+
+    await pipeline(req, fs.createWriteStream(tempChunkPath));
+    fs.renameSync(tempChunkPath, chunkPath);
   }
 
-  await pipeline(req, fs.createWriteStream(tempChunkPath));
-  fs.renameSync(tempChunkPath, chunkPath);
-
   const uploadedChunks = Array.from(
     new Set([...(session.uploadedChunks || []), chunkIndex]),
   ).sort((a, b) => a - b);
@@ -650,7 +866,11 @@ const uploadChunk = async (req, res) => {
   session.uploadedChunks = uploadedChunks;
   session.updatedAt = new Date().toISOString();
 
-  writeSessionMeta(sessionId, session);
+  if (provider === 's3') {
+    await writeS3SessionMeta(s3Client, s3Bucket, s3Prefix, sessionId, session);
+  } else {
+    writeSessionMeta(sessionId, session);
+  }
 
   return res.status(200).send({
     sessionId,
@@ -672,7 +892,20 @@ const finalizeUploadSession = async (req, res) => {
   }
 
   const sessionId = String(req.params.sessionId || '');
-  const session = readSessionMeta(sessionId);
+  const provider = getFileStorageProvider();
+  let session;
+  let s3Client, s3Bucket, s3Prefix, s3Region;
+
+  if (provider === 's3') {
+    const s3 = initS3();
+    s3Client = s3.client;
+    s3Bucket = s3.bucket;
+    s3Prefix = s3.prefix;
+    s3Region = s3.region;
+    session = await readS3SessionMeta(s3Client, s3Bucket, s3Prefix, sessionId);
+  } else {
+    session = readSessionMeta(sessionId);
+  }
 
   if (!session) {
     return res.status(404).send({ message: 'Upload session not found' });
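
Because uploadChunk records uploadedChunks idempotently (set-union, then
sort), an interrupted upload can be resumed without re-sending everything. A
sketch, assuming the status route is GET /api/file/upload-sessions/:sessionId
and returns the stored session object, with CHUNK_SIZE as in the client
sketch above:

    // Resume sketch: re-upload only the chunk indexes the server is missing.
    async function resumeUpload(file, sessionId, token) {
      const auth = { Authorization: `Bearer ${token}` };
      const session = await fetch(`/api/file/upload-sessions/${sessionId}`, {
        headers: auth,
      }).then((r) => r.json());

      const have = new Set(session.uploadedChunks || []);
      for (let i = 0; i < session.totalChunks; i += 1) {
        if (have.has(i)) continue; // already stored server-side
        const blob = file.slice(i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE);
        await fetch(`/api/file/upload-sessions/${sessionId}/chunks/${i}`, {
          method: 'PUT',
          headers: auth,
          body: blob,
        });
      }
    }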
@@ -684,50 +917,122 @@ const finalizeUploadSession = async (req, res) => {
   }
 
   const totalChunks = Number(session.totalChunks);
 
-  for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
-    const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
-    if (!fs.existsSync(chunkPath)) {
-      return res.status(400).send({
-        message: `Missing chunk ${chunkIndex}`,
-        missingChunk: chunkIndex,
-      });
+  // Verify all chunks exist
+  if (provider === 's3') {
+    for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
+      const exists = await s3ChunkExists(s3Client, s3Bucket, s3Prefix, sessionId, chunkIndex);
+      if (!exists) {
+        return res.status(400).send({
+          message: `Missing chunk ${chunkIndex}`,
+          missingChunk: chunkIndex,
+        });
+      }
+    }
+  } else {
+    for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
+      const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
+      if (!fs.existsSync(chunkPath)) {
+        return res.status(400).send({
+          message: `Missing chunk ${chunkIndex}`,
+          missingChunk: chunkIndex,
+        });
+      }
     }
   }
 
-  const assembledPath = path.join(getSessionDir(sessionId), 'assembled.bin');
+  // Create temp directory for assembly
+  const tempDir = path.join(config.uploadDir, '_temp_assembly');
+  fs.mkdirSync(tempDir, { recursive: true });
+  const assembledPath = path.join(tempDir, `${sessionId}.bin`);
 
   if (fs.existsSync(assembledPath)) {
     fs.unlinkSync(assembledPath);
   }
 
-  for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
-    const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
-    await streamAppendFile(assembledPath, chunkPath);
+  // Download and assemble chunks
+  if (provider === 's3') {
+    for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
+      const chunkStream = await downloadS3Chunk(s3Client, s3Bucket, s3Prefix, sessionId, chunkIndex);
+      if (!chunkStream) {
+        // Cleanup and return error
+        if (fs.existsSync(assembledPath)) fs.unlinkSync(assembledPath);
+        return res.status(400).send({
+          message: `Failed to download chunk ${chunkIndex}`,
+          missingChunk: chunkIndex,
+        });
+      }
+
+      // Write chunk to assembled file
+      await new Promise((resolve, reject) => {
+        const writeStream = fs.createWriteStream(assembledPath, { flags: 'a' });
+        writeStream.on('error', reject);
+        writeStream.on('finish', resolve);
+
+        if (typeof chunkStream.pipe === 'function') {
+          chunkStream.on('error', reject);
+          chunkStream.pipe(writeStream, { end: true });
+        } else if (typeof chunkStream.transformToByteArray === 'function') {
+          chunkStream.transformToByteArray()
+            .then((bytes) => {
+              writeStream.write(Buffer.from(bytes));
+              writeStream.end();
+            })
+            .catch(reject);
+        } else {
+          writeStream.write(chunkStream);
+          writeStream.end();
+        }
+      });
+    }
+  } else {
+    for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
+      const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
+      await streamAppendFile(assembledPath, chunkPath);
+    }
   }
 
   const assembledStats = fs.statSync(assembledPath);
 
   if (Number.isFinite(Number(session.size)) && Number(session.size) !== assembledStats.size) {
+    // Cleanup
+    if (fs.existsSync(assembledPath)) fs.unlinkSync(assembledPath);
     return res.status(400).send({
       message: 'Assembled file size mismatch',
     });
   }
 
   const privateUrl = `${session.folder}/${session.filename}`;
-  const provider = getFileStorageProvider();
   let publicUrl = '';
 
   if (provider === 's3') {
-    publicUrl = await uploadStreamToS3(privateUrl, assembledPath, session.contentType);
+    // Upload assembled file to final S3 location
+    const key = buildStoragePath(s3Prefix, privateUrl);
+    await s3Client.send(
+      new PutObjectCommand({
+        Bucket: s3Bucket,
+        Key: key,
+        Body: fs.createReadStream(assembledPath),
+        ContentType: session.contentType || undefined,
+      }),
+    );
+    publicUrl = `https://${s3Bucket}.s3.${s3Region}.amazonaws.com/${key}`;
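+    // Same virtual-hosted-style URL shape the removed uploadStreamToS3
+    // returned; assumes the bucket/prefix allows public reads for downloads.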
+
+    // Cleanup S3 session
+    await removeS3UploadSession(s3Client, s3Bucket, s3Prefix, sessionId);
   } else if (provider === 'gcloud') {
     publicUrl = await uploadStreamToGCloud(privateUrl, assembledPath);
+    removeUploadSession(sessionId);
   } else {
     const destinationPath = path.join(config.uploadDir, privateUrl);
     ensureDirectoryExistence(destinationPath);
     fs.renameSync(assembledPath, destinationPath);
     publicUrl = `/file/download?privateUrl=${encodeURIComponent(privateUrl)}`;
+    removeUploadSession(sessionId);
   }
 
-  removeUploadSession(sessionId);
+  // Cleanup temp assembled file (except for local where we renamed it)
+  if (provider !== 'local' && fs.existsSync(assembledPath)) {
+    fs.unlinkSync(assembledPath);
+  }
 
   return res.status(200).send({
     message: `Uploaded the file successfully: ${privateUrl}`,
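
A design note on the finalize step: it stages every chunk on local disk and
re-uploads the assembled file. For very large assets, S3's native multipart
upload could assemble the object server-side with no local temp file, at the
cost of its 5 MB minimum part size. A minimal sketch of that alternative
(using real @aws-sdk/client-s3 commands; not what this patch implements):

    const {
      CreateMultipartUploadCommand,
      UploadPartCommand,
      CompleteMultipartUploadCommand,
    } = require('@aws-sdk/client-s3');

    // Sketch: feed parts straight into a multipart upload instead of staging
    // them under _upload_sessions/. Every part except the last must be >= 5 MB.
    async function multipartAssemble(client, bucket, key, partBuffers) {
      const { UploadId } = await client.send(
        new CreateMultipartUploadCommand({ Bucket: bucket, Key: key }),
      );

      const parts = [];
      for (let i = 0; i < partBuffers.length; i += 1) {
        const { ETag } = await client.send(
          new UploadPartCommand({
            Bucket: bucket,
            Key: key,
            UploadId,
            PartNumber: i + 1, // part numbers are 1-based
            Body: partBuffers[i],
          }),
        );
        parts.push({ ETag, PartNumber: i + 1 });
      }

      return client.send(
        new CompleteMultipartUploadCommand({
          Bucket: bucket,
          Key: key,
          UploadId,
          MultipartUpload: { Parts: parts },
        }),
      );
    }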