improved asset uploading

Dmitri 2026-03-20 15:12:56 +04:00
parent 0d1676f942
commit e8f72cb390
3 changed files with 386 additions and 51 deletions

View File

@@ -119,11 +119,20 @@ app.use(
app.use(cors({origin: true}));
require('./auth/auth');
// Request logger applied early so all routes are logged
app.use(requestLogger);
// Initialize passport JWT auth early (before file routes)
const jwtAuth = passport.authenticate('jwt', { session: false });
// Mount file upload routes BEFORE body-parser to avoid JSON parsing on binary uploads
// These routes handle their own body parsing (JSON for init, raw streams for chunks)
app.use('/api/file', fileRoutes);
// Body parser for all other routes
app.use(bodyParser.json({ limit: '1mb' }));
app.use(bodyParser.urlencoded({ extended: true, limit: '1mb' }));
app.use(runtimeContextMiddleware);
const requireRuntimeReadOrAuth = (req, res, next) => {
const runtimeMode = req.runtimeContext?.mode;
@@ -163,7 +172,6 @@ app.get('/api/health', async (req, res) => {
});
app.use('/api/auth', authRoutes);
app.use('/api/pexels', pexelsRoutes);
app.use('/api/runtime-context', runtimeContextRoutes);
@@ -249,6 +257,14 @@ if (fs.existsSync(publicDir)) {
});
}
// Generic error handler
app.use((err, req, res, next) => {
if (!res.headersSent) {
logger.error({ err, url: req.url, method: req.method }, 'Unhandled error');
res.status(500).json({ message: 'Internal server error' });
}
});
const PORT = process.env.NODE_ENV === 'dev_stage' ? 3000 : 8080;
app.listen(PORT, () => {

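A note on the middleware ordering above: any body-consuming middleware registered ahead of the file routes drains the request stream before uploadChunk can read it. A minimal sketch of the failure mode (hypothetical, not part of this commit):

const express = require('express');
const app = express();
// Greedy parser that buffers every body regardless of content type:
app.use(express.raw({ type: () => true, limit: '100mb' }));
app.put('/chunk', async (req, res) => {
  let bytes = 0;
  for await (const part of req) bytes += part.length;
  // Always responds { bytes: 0 }: the parser already consumed the stream
  // and left the payload in req.body, defeating streaming entirely.
  res.send({ bytes });
});
app.listen(3000);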
View File

@@ -1,8 +1,19 @@
const express = require('express');
const passport = require('passport');
const bodyParser = require('body-parser');
const services = require('../services/file');
const router = express.Router();
// JSON body parser that ONLY parses application/json content-type
// This prevents errors when binary data is sent or no body is present
const jsonParser = bodyParser.json({
limit: '1mb',
type: (req) => {
const contentType = req.headers['content-type'] || '';
return contentType.includes('application/json');
},
});
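// Usage note (sketch, not part of this commit): with the type() gate above,
// an init request such as
//   fetch('/api/file/upload-sessions/init', {
//     method: 'POST',
//     headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` },
//     body: JSON.stringify({ folder: 'assets', filename: 'video.mp4' }),
//   })
// is parsed into req.body, while a binary chunk PUT sent as
// application/octet-stream passes through untouched (req.body stays undefined).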
router.get('/download', (req, res) => {
services.downloadFile(req, res);
});
@@ -16,6 +27,7 @@ router.post('/upload/:table/:field', passport.authenticate('jwt', {session: fals
router.post(
'/upload-sessions/init',
passport.authenticate('jwt', { session: false }),
jsonParser,
(req, res) => {
services.initUploadSession(req, res);
},
@@ -29,6 +41,7 @@ router.get(
},
);
// Chunk upload - NO body parser, raw stream is read directly by uploadChunk
router.put(
'/upload-sessions/:sessionId/chunks/:chunkIndex',
passport.authenticate('jwt', { session: false }),
@@ -37,6 +50,7 @@ router.put(
},
);
// Finalize - NO body parser needed, only uses req.params.sessionId
router.post(
'/upload-sessions/:sessionId/finalize',
passport.authenticate('jwt', { session: false }),

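The routes above compose into a three-step client flow: init a session, PUT each chunk as a raw body, then finalize. A hypothetical browser-side sketch (the chunk size, the size/totalChunks payload fields, and the bearer-token scheme are assumptions, not confirmed by this diff):

async function uploadInChunks(file, token, chunkSize = 5 * 1024 * 1024) {
  const totalChunks = Math.ceil(file.size / chunkSize);
  const init = await fetch('/api/file/upload-sessions/init', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` },
    body: JSON.stringify({ folder: 'assets', filename: file.name, size: file.size, totalChunks }),
  }).then((r) => r.json());
  for (let i = 0; i < totalChunks; i += 1) {
    // Raw body with no JSON content type, so the server streams it straight through
    await fetch(`/api/file/upload-sessions/${init.sessionId}/chunks/${i}`, {
      method: 'PUT',
      headers: { Authorization: `Bearer ${token}` },
      body: file.slice(i * chunkSize, (i + 1) * chunkSize),
    });
  }
  return fetch(`/api/file/upload-sessions/${init.sessionId}/finalize`, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}` },
  }).then((r) => r.json());
}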
View File

@@ -10,6 +10,8 @@ const {
PutObjectCommand,
GetObjectCommand,
DeleteObjectCommand,
ListObjectsV2Command,
DeleteObjectsCommand,
} = require('@aws-sdk/client-s3');
const ensureDirectoryExistence = (filePath) => {
@@ -118,6 +120,190 @@ const streamAppendFile = async (targetPath, sourcePath) => {
});
}
// S3 session storage helpers
const S3_UPLOAD_SESSIONS_PREFIX = '_upload_sessions';
const getS3SessionMetaKey = (prefix, sessionId) => {
const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
const sessionPath = `${S3_UPLOAD_SESSIONS_PREFIX}/${sessionId}/meta.json`;
return cleanPrefix ? `${cleanPrefix}/${sessionPath}` : sessionPath;
};
const getS3SessionChunkKey = (prefix, sessionId, chunkIndex) => {
const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
const chunkPath = `${S3_UPLOAD_SESSIONS_PREFIX}/${sessionId}/chunks/${chunkIndex}.part`;
return cleanPrefix ? `${cleanPrefix}/${chunkPath}` : chunkPath;
};
const getS3SessionPrefix = (prefix, sessionId) => {
const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
const sessionPath = `${S3_UPLOAD_SESSIONS_PREFIX}/${sessionId}/`;
return cleanPrefix ? `${cleanPrefix}/${sessionPath}` : sessionPath;
};
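// Example keys these helpers produce (sketch; the 'media' prefix is hypothetical):
//   getS3SessionMetaKey('media', 'abc123')     -> 'media/_upload_sessions/abc123/meta.json'
//   getS3SessionChunkKey('media', 'abc123', 4) -> 'media/_upload_sessions/abc123/chunks/4.part'
//   getS3SessionPrefix('', 'abc123')           -> '_upload_sessions/abc123/'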
const readS3SessionMeta = async (client, bucket, prefix, sessionId) => {
try {
const key = getS3SessionMetaKey(prefix, sessionId);
const output = await client.send(
new GetObjectCommand({ Bucket: bucket, Key: key }),
);
if (!output || !output.Body) {
return null;
}
const bodyStr = await output.Body.transformToString();
return JSON.parse(bodyStr);
} catch (error) {
if (error.name === 'NoSuchKey') {
return null;
}
throw error;
}
};
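// Note: Body.transformToString() is the SDK v3 stream helper; on older v3
// releases that lack it, the body would have to be drained manually, e.g.:
//   const chunks = [];
//   for await (const c of output.Body) chunks.push(c);
//   const bodyStr = Buffer.concat(chunks).toString('utf8');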
const writeS3SessionMeta = async (client, bucket, prefix, sessionId, payload) => {
const key = getS3SessionMetaKey(prefix, sessionId);
await client.send(
new PutObjectCommand({
Bucket: bucket,
Key: key,
Body: JSON.stringify(payload, null, 2),
ContentType: 'application/json',
}),
);
};
const uploadS3Chunk = async (client, bucket, prefix, sessionId, chunkIndex, body) => {
const key = getS3SessionChunkKey(prefix, sessionId, chunkIndex);
await client.send(
new PutObjectCommand({
Bucket: bucket,
Key: key,
Body: body,
}),
);
};
const downloadS3Chunk = async (client, bucket, prefix, sessionId, chunkIndex) => {
const key = getS3SessionChunkKey(prefix, sessionId, chunkIndex);
try {
const output = await client.send(
new GetObjectCommand({ Bucket: bucket, Key: key }),
);
if (!output || !output.Body) {
return null;
}
return output.Body;
} catch (error) {
if (error.name === 'NoSuchKey') {
return null;
}
throw error;
}
};
const s3ChunkExists = async (client, bucket, prefix, sessionId, chunkIndex) => {
const key = getS3SessionChunkKey(prefix, sessionId, chunkIndex);
try {
await client.send(
new GetObjectCommand({ Bucket: bucket, Key: key, Range: 'bytes=0-0' }),
);
return true;
} catch (error) {
if (error.name === 'NoSuchKey') {
return false;
}
throw error;
}
};
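// An alternative existence check (sketch, not part of this commit):
// HeadObjectCommand fetches no object bytes at all, but note that its
// missing-key error surfaces as 'NotFound' rather than 'NoSuchKey'.
const { HeadObjectCommand } = require('@aws-sdk/client-s3');
const s3ChunkExistsViaHead = async (client, bucket, prefix, sessionId, chunkIndex) => {
  try {
    await client.send(
      new HeadObjectCommand({
        Bucket: bucket,
        Key: getS3SessionChunkKey(prefix, sessionId, chunkIndex),
      }),
    );
    return true;
  } catch (error) {
    if (error.name === 'NotFound') {
      return false;
    }
    throw error;
  }
};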
const removeS3UploadSession = async (client, bucket, prefix, sessionId) => {
const sessionPrefix = getS3SessionPrefix(prefix, sessionId);
try {
const listResult = await client.send(
new ListObjectsV2Command({
Bucket: bucket,
Prefix: sessionPrefix,
}),
);
if (!listResult.Contents || listResult.Contents.length === 0) {
return;
}
const objectsToDelete = listResult.Contents.map((obj) => ({ Key: obj.Key }));
await client.send(
new DeleteObjectsCommand({
Bucket: bucket,
Delete: { Objects: objectsToDelete },
}),
);
} catch (error) {
console.error(`Failed to remove S3 upload session ${sessionId}`, error);
}
};
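// Caveat: ListObjectsV2 returns at most 1000 keys per call, so the removal
// above only clears the first page; enough for a typical session (meta plus
// one object per chunk) but not guaranteed. A paginated variant (sketch,
// under the same client/bucket assumptions):
const removeS3KeysPaginated = async (client, bucket, keyPrefix) => {
  let continuationToken;
  do {
    const page = await client.send(
      new ListObjectsV2Command({
        Bucket: bucket,
        Prefix: keyPrefix,
        ContinuationToken: continuationToken,
      }),
    );
    if (page.Contents && page.Contents.length > 0) {
      await client.send(
        new DeleteObjectsCommand({
          Bucket: bucket,
          Delete: { Objects: page.Contents.map((obj) => ({ Key: obj.Key })) },
        }),
      );
    }
    continuationToken = page.IsTruncated ? page.NextContinuationToken : undefined;
  } while (continuationToken);
};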
const cleanupExpiredS3UploadSessions = async () => {
const provider = getFileStorageProvider();
if (provider !== 's3') {
return;
}
try {
const { client, bucket, prefix } = initS3();
const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
const sessionsPrefix = cleanPrefix
? `${cleanPrefix}/${S3_UPLOAD_SESSIONS_PREFIX}/`
: `${S3_UPLOAD_SESSIONS_PREFIX}/`;
const listResult = await client.send(
new ListObjectsV2Command({
Bucket: bucket,
Prefix: sessionsPrefix,
Delimiter: '/',
}),
);
if (!listResult.CommonPrefixes || listResult.CommonPrefixes.length === 0) {
return;
}
const now = Date.now();
for (const prefixObj of listResult.CommonPrefixes) {
const sessionPrefix = prefixObj.Prefix;
const sessionId = sessionPrefix
.replace(sessionsPrefix, '')
.replace(/\/$/, '');
if (!sessionId) continue;
try {
const meta = await readS3SessionMeta(client, bucket, prefix, sessionId);
if (!meta) {
await removeS3UploadSession(client, bucket, prefix, sessionId);
continue;
}
const updatedAt = new Date(meta.updatedAt || meta.createdAt || 0).getTime();
if (!updatedAt || now - updatedAt > UPLOAD_SESSION_TTL_MS) {
await removeS3UploadSession(client, bucket, prefix, sessionId);
}
} catch (error) {
console.error(`Failed to cleanup S3 upload session ${sessionId}`, error);
await removeS3UploadSession(client, bucket, prefix, sessionId);
}
}
} catch (error) {
console.error('Failed to cleanup expired S3 upload sessions', error);
}
};
const uploadLocal = (
folder,
validations = {
@@ -488,22 +674,6 @@ const deleteS3 = async (privateUrl) => {
}
}
const uploadStreamToS3 = async (privateUrl, sourcePath, contentType) => {
const { client, bucket, region, prefix } = initS3();
const key = buildStoragePath(prefix, privateUrl);
await client.send(
new PutObjectCommand({
Bucket: bucket,
Key: key,
Body: fs.createReadStream(sourcePath),
ContentType: contentType || undefined,
}),
);
return `https://${bucket}.s3.${region}.amazonaws.com/${key}`;
}
const uploadStreamToGCloud = async (privateUrl, sourcePath) => {
const { hash, bucket } = initGCloud();
const fullPath = `${hash}/${privateUrl}`;
@@ -523,7 +693,16 @@ const initUploadSession = async (req, res) => {
return res.sendStatus(403);
}
const provider = getFileStorageProvider();
// Cleanup expired sessions (async for S3, sync for local)
if (provider === 's3') {
cleanupExpiredS3UploadSessions().catch((err) =>
console.error('S3 session cleanup failed', err),
);
} else {
cleanupExpiredUploadSessions();
}
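// Design note: the S3 cleanup is fire-and-forget, so a slow ListObjectsV2
// scan never delays session creation; failures are logged and effectively
// retried on the next init call.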
const folder = sanitizeFolder(req.body?.folder);
const filename = sanitizeFilename(req.body?.filename);
@@ -544,9 +723,6 @@ const initUploadSession = async (req, res) => {
}
const sessionId = uuid();
const now = new Date().toISOString();
const session = {
id: sessionId,
@@ -562,7 +738,14 @@
updatedAt: now,
};
if (provider === 's3') {
const { client, bucket, prefix } = initS3();
await writeS3SessionMeta(client, bucket, prefix, sessionId, session);
} else {
const chunksDir = getSessionChunksDir(sessionId);
fs.mkdirSync(chunksDir, { recursive: true });
writeSessionMeta(sessionId, session);
}
return res.status(200).send({
sessionId,
@@ -582,7 +765,15 @@ const getUploadSession = async (req, res) => {
}
const sessionId = String(req.params.sessionId || '');
const provider = getFileStorageProvider();
let session;
if (provider === 's3') {
const { client, bucket, prefix } = initS3();
session = await readS3SessionMeta(client, bucket, prefix, sessionId);
} else {
session = readSessionMeta(sessionId);
}
if (!session) {
return res.status(404).send({ message: 'Upload session not found' });
@@ -617,7 +808,19 @@ const uploadChunk = async (req, res) => {
return res.status(400).send({ message: 'Invalid chunk index' });
}
const provider = getFileStorageProvider();
let session;
let s3Client, s3Bucket, s3Prefix;
if (provider === 's3') {
const s3 = initS3();
s3Client = s3.client;
s3Bucket = s3.bucket;
s3Prefix = s3.prefix;
session = await readS3SessionMeta(s3Client, s3Bucket, s3Prefix, sessionId);
} else {
session = readSessionMeta(sessionId);
}
if (!session) {
return res.status(404).send({ message: 'Upload session not found' });
@@ -631,19 +834,32 @@ const uploadChunk = async (req, res) => {
return res.status(400).send({ message: 'Chunk index is out of range' });
}
if (provider === 's3') {
// Collect chunk data from request stream
const chunks = [];
for await (const chunk of req) {
chunks.push(chunk);
}
const chunkBuffer = Buffer.concat(chunks);
// Upload chunk directly to S3
await uploadS3Chunk(s3Client, s3Bucket, s3Prefix, sessionId, chunkIndex, chunkBuffer);
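// Note: the whole chunk is buffered in memory before PutObject, so peak usage
// is roughly chunkSize x concurrent uploads. A streaming alternative (sketch,
// not part of this commit) is the Upload helper from @aws-sdk/lib-storage,
// with chunkKey standing in for getS3SessionChunkKey(...):
//   const { Upload } = require('@aws-sdk/lib-storage');
//   await new Upload({
//     client: s3Client,
//     params: { Bucket: s3Bucket, Key: chunkKey, Body: req },
//   }).done();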
} else {
// Local storage - write to temp file then rename
const chunkDir = getSessionChunksDir(sessionId);
fs.mkdirSync(chunkDir, { recursive: true });
const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
const tempChunkPath = `${chunkPath}.tmp`;
if (fs.existsSync(tempChunkPath)) {
fs.unlinkSync(tempChunkPath);
}
await pipeline(req, fs.createWriteStream(tempChunkPath));
fs.renameSync(tempChunkPath, chunkPath);
}
const uploadedChunks = Array.from(
new Set([...(session.uploadedChunks || []), chunkIndex]),
).sort((a, b) => a - b);
@@ -651,7 +867,11 @@ const uploadChunk = async (req, res) => {
session.uploadedChunks = uploadedChunks;
session.updatedAt = new Date().toISOString();
if (provider === 's3') {
await writeS3SessionMeta(s3Client, s3Bucket, s3Prefix, sessionId, session);
} else {
writeSessionMeta(sessionId, session);
}
return res.status(200).send({
sessionId,
@@ -672,7 +892,20 @@ const finalizeUploadSession = async (req, res) => {
}
const sessionId = String(req.params.sessionId || '');
const provider = getFileStorageProvider();
let session;
let s3Client, s3Bucket, s3Prefix, s3Region;
if (provider === 's3') {
const s3 = initS3();
s3Client = s3.client;
s3Bucket = s3.bucket;
s3Prefix = s3.prefix;
s3Region = s3.region;
session = await readS3SessionMeta(s3Client, s3Bucket, s3Prefix, sessionId);
} else {
session = readSessionMeta(sessionId);
}
if (!session) {
return res.status(404).send({ message: 'Upload session not found' });
@@ -684,50 +917,122 @@ const finalizeUploadSession = async (req, res) => {
const totalChunks = Number(session.totalChunks);
// Verify all chunks exist
if (provider === 's3') {
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
const exists = await s3ChunkExists(s3Client, s3Bucket, s3Prefix, sessionId, chunkIndex);
if (!exists) {
return res.status(400).send({
message: `Missing chunk ${chunkIndex}`,
missingChunk: chunkIndex,
});
}
}
} else {
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
if (!fs.existsSync(chunkPath)) {
return res.status(400).send({
message: `Missing chunk ${chunkIndex}`,
missingChunk: chunkIndex,
});
}
}
}
// Create temp directory for assembly
const tempDir = path.join(config.uploadDir, '_temp_assembly');
fs.mkdirSync(tempDir, { recursive: true });
const assembledPath = path.join(tempDir, `${sessionId}.bin`);
if (fs.existsSync(assembledPath)) {
fs.unlinkSync(assembledPath);
}
// Download and assemble chunks
if (provider === 's3') {
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
const chunkStream = await downloadS3Chunk(s3Client, s3Bucket, s3Prefix, sessionId, chunkIndex);
if (!chunkStream) {
// Cleanup and return error
if (fs.existsSync(assembledPath)) fs.unlinkSync(assembledPath);
return res.status(400).send({
message: `Failed to download chunk ${chunkIndex}`,
missingChunk: chunkIndex,
});
}
// Write chunk to assembled file
await new Promise((resolve, reject) => {
const writeStream = fs.createWriteStream(assembledPath, { flags: 'a' });
writeStream.on('error', reject);
writeStream.on('finish', resolve);
if (typeof chunkStream.pipe === 'function') {
chunkStream.on('error', reject);
chunkStream.pipe(writeStream, { end: true });
} else if (typeof chunkStream.transformToByteArray === 'function') {
chunkStream.transformToByteArray()
.then((bytes) => {
writeStream.write(Buffer.from(bytes));
writeStream.end();
})
.catch(reject);
} else {
writeStream.write(chunkStream);
writeStream.end();
}
});
}
} else {
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
await streamAppendFile(assembledPath, chunkPath);
}
}
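// Note: finalize downloads every chunk back from S3 and re-uploads the
// assembled file. S3's native multipart upload would skip that round-trip, at
// the cost of a 5 MiB minimum part size (except the last part). A sketch of
// that alternative (not what this commit implements; finalKey is hypothetical):
//   const { CreateMultipartUploadCommand, UploadPartCommand,
//     CompleteMultipartUploadCommand } = require('@aws-sdk/client-s3');
//   const { UploadId } = await s3Client.send(
//     new CreateMultipartUploadCommand({ Bucket: s3Bucket, Key: finalKey }));
//   const parts = [];
//   // per chunk i, with chunkBuffer holding its bytes:
//   //   const { ETag } = await s3Client.send(new UploadPartCommand({
//   //     Bucket: s3Bucket, Key: finalKey, UploadId,
//   //     PartNumber: i + 1, Body: chunkBuffer }));
//   //   parts.push({ PartNumber: i + 1, ETag });
//   await s3Client.send(new CompleteMultipartUploadCommand({
//     Bucket: s3Bucket, Key: finalKey, UploadId,
//     MultipartUpload: { Parts: parts } }));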
const assembledStats = fs.statSync(assembledPath);
if (Number.isFinite(Number(session.size)) && Number(session.size) !== assembledStats.size) {
// Cleanup
if (fs.existsSync(assembledPath)) fs.unlinkSync(assembledPath);
return res.status(400).send({
message: 'Assembled file size mismatch',
});
}
const privateUrl = `${session.folder}/${session.filename}`;
let publicUrl = '';
if (provider === 's3') {
// Upload assembled file to final S3 location
const key = buildStoragePath(s3Prefix, privateUrl);
await s3Client.send(
new PutObjectCommand({
Bucket: s3Bucket,
Key: key,
Body: fs.createReadStream(assembledPath),
ContentType: session.contentType || undefined,
}),
);
publicUrl = `https://${s3Bucket}.s3.${s3Region}.amazonaws.com/${key}`;
// Cleanup S3 session
await removeS3UploadSession(s3Client, s3Bucket, s3Prefix, sessionId);
} else if (provider === 'gcloud') {
publicUrl = await uploadStreamToGCloud(privateUrl, assembledPath);
removeUploadSession(sessionId);
} else {
const destinationPath = path.join(config.uploadDir, privateUrl);
ensureDirectoryExistence(destinationPath);
fs.renameSync(assembledPath, destinationPath);
publicUrl = `/file/download?privateUrl=${encodeURIComponent(privateUrl)}`;
removeUploadSession(sessionId);
}
// Cleanup temp assembled file (except for local where we renamed it)
if (provider !== 'local' && fs.existsSync(assembledPath)) {
fs.unlinkSync(assembledPath);
}
return res.status(200).send({
message: `Uploaded the file successfully: ${privateUrl}`,