1305 lines
40 KiB
JavaScript

/**
* File Service
*
* Unified file storage service using Strategy Pattern providers.
* Supports S3, GCloud, and Local storage backends.
*
* Features:
* - Comprehensive error handling with proper HTTP status codes
* - AbortController support for request cancellation
* - Structured logging with Pino
* - Path validation for security
*/
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const { pipeline } = require('stream/promises');
const { format } = require('util');
const config = require('../config');
// ============================================================================
// S3 Cache Helpers
// ============================================================================
/**
 * Map an S3 private URL to its location in the local download cache.
 *
 * The key is MD5-hashed (hex) so any key becomes a flat file name inside
 * config.s3CacheDir; the key's own extension is appended to that name.
 * @param {string} privateUrl - Storage key of the object
 * @returns {string} Path of the cache file inside config.s3CacheDir
 */
const getCachePath = (privateUrl) => {
  const digest = crypto.createHash('md5').update(privateUrl).digest('hex');
  const cacheFileName = `${digest}${path.extname(privateUrl) || ''}`;
  return path.join(config.s3CacheDir, cacheFileName);
};
/**
 * Look up a cache entry and report whether it may be served.
 *
 * The entry is usable only when no `<cachePath>.downloading` marker exists
 * (a refresh is in progress) and its mtime is younger than
 * config.s3CacheMaxAge seconds. Any filesystem error counts as a miss.
 * @param {string} cachePath - Path produced by getCachePath
 * @returns {Promise<{stats: fs.Stats|null, valid: boolean}>}
 */
const getCachedFile = async (cachePath) => {
  try {
    const markerPath = cachePath + '.downloading';
    const refreshInFlight = await fs.promises.access(markerPath).then(
      () => true,
      () => false,
    );
    if (refreshInFlight) return { stats: null, valid: false };

    const stats = await fs.promises.stat(cachePath);
    const ageSeconds = (Date.now() - stats.mtimeMs) / 1000;
    return { stats, valid: ageSeconds < config.s3CacheMaxAge };
  } catch {
    return { stats: null, valid: false };
  }
};
/**
 * Create config.s3CacheDir (including missing parents) when it is absent.
 * EEXIST is tolerated; every other mkdir failure propagates to the caller.
 * @returns {Promise<void>}
 */
const ensureCacheDir = async () => {
  await fs.promises
    .mkdir(config.s3CacheDir, { recursive: true })
    .catch((err) => {
      if (err.code !== 'EEXIST') throw err;
    });
};
/**
 * Build a quoted ETag from a file's size and modification time, both
 * rendered in hexadecimal: `"<size>-<mtimeMs>"`.
 * @param {fs.Stats} stats - Stats of the cached file (uses size, mtimeMs)
 * @returns {string} Quoted ETag value
 */
const generateETag = ({ size, mtimeMs }) => {
  const sizeHex = size.toString(16);
  const mtimeHex = mtimeMs.toString(16);
  return `"${sizeHex}-${mtimeHex}"`;
};
const { logger } = require('../utils/logger');
const S3StorageProvider = require('./file/S3StorageProvider');
const LocalStorageProvider = require('./file/LocalStorageProvider');
const UploadSessionManager = require('./file/UploadSessionManager');
// ============================================================================
// Provider Initialization (Singleton)
// ============================================================================
// Lazily-created singletons. Each is populated on first use by its getter
// (getS3Provider, getLocalProvider, getGCloudBucket, getUploadSessionManager)
// and then reused; nothing in this file resets them.
let s3Provider = null;
let localProvider = null;
let gcloudBucket = null;
// Copy of config.gcloud.hash, captured together with gcloudBucket; used as the
// object-name prefix for every GCloud key.
let gcloudHash = null;
let uploadSessionManager = null;
/**
 * Decide which storage backend to use.
 *
 * An explicit FILE_STORAGE_PROVIDER env var (trimmed, lower-cased) always
 * wins. Otherwise S3 is selected when bucket, region and both credentials are
 * configured; then Google Cloud when its env credentials plus bucket and hash
 * are set; local disk is the fallback.
 * @returns {string} Provider id such as 's3', 'gcloud' or 'local'
 */
const getFileStorageProvider = () => {
  const override = (process.env.FILE_STORAGE_PROVIDER || '')
    .trim()
    .toLowerCase();
  if (override) return override;

  // Readers are thunks so evaluation stops at the first missing setting.
  const allSet = (readers) => readers.every((read) => Boolean(read()));

  const s3Ready = allSet([
    () => config.s3.bucket,
    () => config.s3.region,
    () => config.s3.accessKeyId,
    () => config.s3.secretAccessKey,
  ]);
  if (s3Ready) return 's3';

  const gcloudReady = allSet([
    () => process.env.GC_PROJECT_ID,
    () => process.env.GC_CLIENT_EMAIL,
    () => process.env.GC_PRIVATE_KEY,
    () => config.gcloud.bucket,
    () => config.gcloud.hash,
  ]);
  return gcloudReady ? 'gcloud' : 'local';
};
/**
 * Return the shared S3StorageProvider, building it from config.s3 on first
 * call and logging the effective connection settings once.
 * @returns {S3StorageProvider}
 */
const getS3Provider = () => {
  if (s3Provider) return s3Provider;

  const {
    bucket,
    region,
    accessKeyId,
    secretAccessKey,
    prefix,
    connectionTimeout,
    requestTimeout,
    maxAttempts,
    maxSockets,
    keepAlive,
  } = config.s3;

  s3Provider = new S3StorageProvider({
    bucket,
    region,
    accessKeyId,
    secretAccessKey,
    prefix,
    // Timeout and connection pool configuration from config
    connectionTimeout,
    requestTimeout,
    maxAttempts,
    maxSockets,
    keepAlive,
  });

  logger.info(
    {
      provider: 's3',
      bucket,
      region,
      connectionTimeout,
      requestTimeout,
      maxAttempts,
    },
    'S3 storage provider initialized',
  );
  return s3Provider;
};
/**
 * Return the shared LocalStorageProvider rooted at config.uploadDir,
 * creating it on first use.
 * @returns {LocalStorageProvider}
 */
const getLocalProvider = () => {
  localProvider ??= new LocalStorageProvider({ basePath: config.uploadDir });
  return localProvider;
};
/**
 * Turn a PEM private key read from an environment variable into real PEM text.
 *
 * Keys stored in env vars usually carry escaped line breaks: either the
 * two-character sequence backslash + "n", or a backslash followed by an actual
 * newline. Both forms are converted to a plain newline. Keys that already
 * contain real newlines are left unchanged.
 * @param {string|undefined} rawKey - Value of GC_PRIVATE_KEY
 * @returns {string} Key with real newlines
 * @throws {Error} When the key is missing or empty
 */
const normalizePrivateKey = (rawKey) => {
  if (typeof rawKey !== 'string' || rawKey.length === 0) {
    throw new Error(
      'GC_PRIVATE_KEY is not set; cannot initialize Google Cloud Storage',
    );
  }
  return rawKey.replace(/\\\n|\\n/g, '\n');
};

/**
 * Return the shared Google Cloud Storage bucket and key prefix.
 *
 * On first call, builds a Storage client from GC_PROJECT_ID, GC_CLIENT_EMAIL
 * and GC_PRIVATE_KEY, and binds it to config.gcloud.bucket.
 * @returns {{ bucket: import('@google-cloud/storage').Bucket, hash: string }}
 *   The bucket, plus the prefix (config.gcloud.hash) to prepend to every key
 * @throws {Error} When GC_PRIVATE_KEY is missing
 */
const getGCloudBucket = () => {
  if (!gcloudBucket) {
    const { Storage } = require('@google-cloud/storage');
    const storage = new Storage({
      projectId: process.env.GC_PROJECT_ID,
      credentials: {
        client_email: process.env.GC_CLIENT_EMAIL,
        private_key: normalizePrivateKey(process.env.GC_PRIVATE_KEY),
      },
    });
    gcloudBucket = storage.bucket(config.gcloud.bucket);
    gcloudHash = config.gcloud.hash;
  }
  return { bucket: gcloudBucket, hash: gcloudHash };
};
// Chunked-upload sessions expire after 24 hours.
const UPLOAD_SESSION_TTL_MS = 24 * 60 * 60 * 1000;

/**
 * Return the shared UploadSessionManager, which keeps its session data under
 * `<uploadDir>/upload_sessions`; created on first use.
 * @returns {UploadSessionManager}
 */
const getUploadSessionManager = () => {
  uploadSessionManager ??= new UploadSessionManager({
    sessionDir: path.join(config.uploadDir, 'upload_sessions'),
    ttlMs: UPLOAD_SESSION_TTL_MS,
  });
  return uploadSessionManager;
};
// ============================================================================
// Error Handling Utilities
// ============================================================================
/**
 * Build the standard JSON error body sent by this service.
 *
 * `code` and `details` are included only when truthy.
 * @param {string} message - Human-readable error message
 * @param {string|null} [code=null] - Machine-readable error code
 * @param {Object|null} [details=null] - Extra structured context
 * @returns {{message: string, code?: string, details?: Object}}
 */
const createErrorResponse = (message, code = null, details = null) => ({
  message,
  ...(code ? { code } : {}),
  ...(details ? { details } : {}),
});
/**
 * Map an error raised by an S3 operation to the HTTP status code to respond
 * with. Thin delegate to S3StorageProvider.getErrorStatusCode.
 * @param {Error} error - Error from the S3 provider
 * @returns {number} Status code passed to res.status()
 */
const getS3ErrorStatusCode = (error) =>
  S3StorageProvider.getErrorStatusCode(error);
// Error names (S3 SDK style) that mean the object or bucket does not exist.
const NOT_FOUND_ERROR_NAMES = new Set(['NoSuchKey', 'NotFound', 'NoSuchBucket']);
// Error names that mean the credentials lack permission.
const ACCESS_DENIED_ERROR_NAMES = new Set(['AccessDenied', 'InvalidAccessKeyId']);
// Node socket error codes reported as connection problems.
const CONNECTION_ERROR_CODES = new Set(['ECONNRESET', 'ECONNREFUSED']);

/**
 * Translate a storage error into a short, user-facing message.
 *
 * Checks run in priority order: not-found, access-denied, timeout,
 * connection failure, cancellation; anything else yields a generic
 * "Could not <operation> the file".
 * @param {Error|undefined|null} error - Error to describe
 * @param {string} [operation='process'] - Verb used in the generic message
 * @returns {string}
 */
const getErrorMessage = (error, operation = 'process') => {
  const name = error?.name || '';
  const code = error?.code || '';

  if (NOT_FOUND_ERROR_NAMES.has(name)) return 'File not found';
  if (ACCESS_DENIED_ERROR_NAMES.has(name)) return 'Access denied to file';
  if (name === 'TimeoutError' || code === 'ETIMEDOUT') {
    return 'Request timed out while accessing file';
  }
  if (CONNECTION_ERROR_CODES.has(code)) {
    return 'Connection error while accessing storage';
  }
  if (name === 'AbortError') return 'Request was cancelled';
  return `Could not ${operation} the file`;
};
// ============================================================================
// Path Validation
// ============================================================================
/**
 * Reject storage paths that could escape their intended location.
 *
 * After trimming, the path must be non-empty. It must not contain `..`
 * (traversal), a NUL byte, or `//`. It must not start with a scheme-like
 * prefix such as `http:` or `C:`.
 * @param {*} urlPath - Candidate path (non-strings are rejected)
 * @returns {boolean} True when the path is acceptable
 */
const isValidPath = (urlPath) => {
  if (typeof urlPath !== 'string') return false;
  const candidate = urlPath.trim();
  if (candidate === '') return false;

  const forbiddenFragments = ['..', '\0', '//'];
  if (forbiddenFragments.some((fragment) => candidate.includes(fragment))) {
    return false;
  }
  return !/^[a-zA-Z]+:/.test(candidate);
};
// ============================================================================
// Unified Upload/Download/Delete Interface
// ============================================================================
/**
 * Express handler: accept a single multipart upload and store it as
 * `<folder>/<req.body.filename>` in the active storage backend.
 *
 * Responds with 400 when the file or filename is missing, or when the
 * resulting key would be rejected by isValidPath (and so could never be
 * downloaded through downloadFile). Responds with 500 on any storage or
 * middleware failure, and with 200 `{ message, url }` on success.
 * @param {string} folder - Server-chosen key prefix
 * @param {import('express').Request} req
 * @param {import('express').Response} res
 */
const uploadFile = async (folder, req, res) => {
  const provider = getFileStorageProvider();
  const log = req.log || logger;
  try {
    const processFile = require('../middlewares/upload');
    await processFile(req, res);
    if (!req.file)
      return res
        .status(400)
        .send(createErrorResponse('Please upload a file!', 'MISSING_FILE'));
    const filename = req.body?.filename;
    if (!filename)
      return res
        .status(400)
        .send(createErrorResponse('Missing filename', 'MISSING_FILENAME'));
    const privateUrl = `${folder}/${filename}`;
    // The filename is client-controlled. Apply the same rules downloadFile
    // enforces, so traversal ("..") and injected "//", NUL or scheme keys are
    // never written.
    if (!isValidPath(privateUrl)) {
      log.warn({ privateUrl }, 'Invalid upload path requested');
      return res
        .status(400)
        .send(createErrorResponse('Invalid file path', 'INVALID_PATH'));
    }
    let publicUrl = '';
    if (provider === 's3') {
      const s3 = getS3Provider();
      const result = await s3.upload(privateUrl, req.file.buffer, {
        contentType: req.file.mimetype,
      });
      publicUrl = result.url;
    } else if (provider === 'gcloud') {
      const { bucket, hash } = getGCloudBucket();
      const filePath = `${hash}/${privateUrl}`;
      const blob = bucket.file(filePath);
      await new Promise((resolve, reject) => {
        const blobStream = blob.createWriteStream({ resumable: false });
        blobStream.on('error', reject);
        blobStream.on('finish', resolve);
        blobStream.end(req.file.buffer);
      });
      publicUrl = format(
        `https://storage.googleapis.com/${bucket.name}/${blob.name}`,
      );
    } else {
      const local = getLocalProvider();
      await local.upload(privateUrl, req.file.buffer);
      publicUrl = `/api/file/download?privateUrl=${encodeURIComponent(privateUrl)}`;
    }
    log.info({ provider, privateUrl }, 'File uploaded successfully');
    return res.status(200).send({
      message: `Uploaded the file successfully: ${privateUrl}`,
      url: publicUrl,
    });
  } catch (error) {
    log.error({ err: error, provider }, 'Failed to upload file');
    return res
      .status(500)
      .send(
        createErrorResponse(
          `Could not upload the file. ${error.message || error}`,
          'UPLOAD_ERROR',
        ),
      );
  }
};
// `bytes=<start>-<end>` with either bound optional. Anything after a comma
// (further ranges) is accepted but ignored. The pattern has no overlapping
// quantifiers, so matching is linear in the header length.
const RANGE_HEADER_PATTERN = /^bytes=\s*(\d*)-(\d*)\s*(?:,.*)?$/;

/**
 * Parse an HTTP Range header for a single byte range (RFC 7233).
 *
 * Supported forms: `bytes=a-b`, `bytes=a-` (to end of file), and `bytes=-n`
 * (last n bytes; when n exceeds the file size the whole file is returned).
 * For multi-range headers, only the first range is used. The end offset is
 * capped at `totalSize - 1`.
 * @param {string} rangeHeader - Range header value (e.g., "bytes=0-1000")
 * @param {number} totalSize - Total file size in bytes
 * @returns {{start: number, end: number} | null} Inclusive byte offsets, or
 *   null when the header is absent, malformed, or unsatisfiable
 */
const parseRangeHeader = (rangeHeader, totalSize) => {
  if (typeof rangeHeader !== 'string') return null;
  const match = RANGE_HEADER_PATTERN.exec(rangeHeader.trim());
  if (!match) return null;

  const [, startText, endText] = match;
  // "bytes=-" names no range at all.
  if (startText === '' && endText === '') return null;

  let start;
  let end;
  if (startText === '') {
    // Suffix range: the last N bytes. A suffix longer than the file selects
    // the whole file instead of producing a negative start offset.
    const suffixLength = Number.parseInt(endText, 10);
    if (suffixLength === 0) return null;
    start = Math.max(0, totalSize - suffixLength);
    end = totalSize - 1;
  } else {
    start = Number.parseInt(startText, 10);
    end = endText === '' ? totalSize - 1 : Number.parseInt(endText, 10);
  }

  if (start > end || start >= totalSize) return null;
  return { start, end: Math.min(end, totalSize - 1) };
};
/**
 * Set Content-Type from the file extension when the extension is recognised
 * by getMimeTypeFromExtension. Unknown extensions leave the header unset.
 * @param {import('http').ServerResponse} res - Response to decorate
 * @param {string} filepath - Path or storage key whose extension is inspected
 */
const setKnownContentType = (res, filepath) => {
  const mimeType = getMimeTypeFromExtension(filepath);
  if (mimeType !== 'application/octet-stream') {
    res.setHeader('Content-Type', mimeType);
  }
};

/**
 * Stream a file from local disk to the response, honouring a single-range
 * `Range` header.
 *
 * Responds with 206 plus Content-Range for a satisfiable range, 416 for an
 * unsatisfiable one, and otherwise the full file. `extraHeaders` are applied
 * to 200/206 responses only. `pipeline` ensures read errors (e.g. the file
 * vanished) reject instead of crashing the process, and the file handle is
 * released if the client disconnects.
 * @param {import('express').Request} req
 * @param {import('express').Response} res
 * @param {string} filePath - File to stream
 * @param {number} totalSize - Size of that file in bytes
 * @param {Object<string, string>} [extraHeaders] - Headers for success responses
 */
const sendLocalFile = async (req, res, filePath, totalSize, extraHeaders = {}) => {
  const applyExtraHeaders = () => {
    for (const [name, value] of Object.entries(extraHeaders)) {
      res.setHeader(name, value);
    }
  };

  const rangeHeader = req.headers.range;
  if (rangeHeader) {
    const range = parseRangeHeader(rangeHeader, totalSize);
    if (!range) {
      res.setHeader('Content-Range', `bytes */${totalSize}`);
      res.status(416).end();
      return;
    }
    const { start, end } = range;
    res.status(206);
    res.setHeader('Content-Range', `bytes ${start}-${end}/${totalSize}`);
    res.setHeader('Content-Length', end - start + 1);
    applyExtraHeaders();
    await pipeline(fs.createReadStream(filePath, { start, end }), res);
    return;
  }

  applyExtraHeaders();
  res.setHeader('Content-Length', totalSize);
  await pipeline(fs.createReadStream(filePath), res);
};

/**
 * Serve a valid S3 cache entry from disk. Replies 304 when If-None-Match
 * equals the entry's ETag.
 */
const serveCachedFile = async (req, res, { cachePath, stats, privateUrl }) => {
  const etag = generateETag(stats);
  if (req.headers['if-none-match'] === etag) {
    res.status(304).end();
    return;
  }
  setKnownContentType(res, privateUrl);
  await sendLocalFile(req, res, cachePath, stats.size, {
    ETag: etag,
    'Cache-Control': `public, max-age=${config.s3CacheMaxAge}`,
  });
};

/**
 * Write an S3 response body to `res`. Node streams are piped; SDK v3 bodies
 * exposing transformToByteArray are buffered; anything else is sent as-is.
 */
const sendS3Body = async (body, res) => {
  if (typeof body.pipe === 'function') {
    await pipeline(body, res);
  } else if (typeof body.transformToByteArray === 'function') {
    res.send(Buffer.from(await body.transformToByteArray()));
  } else {
    res.send(body);
  }
};

/**
 * Serve a Range request straight from S3 (the cache is bypassed for partial
 * requests). A HEAD-style lookup provides the total size first.
 */
const serveS3Range = async (req, res, s3, { privateUrl, signal, log }) => {
  const headResult = await s3.download(privateUrl, { signal, headOnly: true });
  const rawSize = headResult.contentLength;
  const totalSize = rawSize == null ? NaN : Number(rawSize);
  // A zero-byte object is a known size; it becomes a 416 below rather than a 500.
  if (!Number.isFinite(totalSize) || totalSize < 0) {
    log.warn({ privateUrl }, 'Cannot determine file size for range request');
    res
      .status(500)
      .send(createErrorResponse('Cannot determine file size', 'SIZE_UNKNOWN'));
    return;
  }

  const range = parseRangeHeader(req.headers.range, totalSize);
  if (!range) {
    res.setHeader('Content-Range', `bytes */${totalSize}`);
    res.status(416).end();
    return;
  }

  const { start, end } = range;
  const rangeResult = await s3.download(privateUrl, {
    signal,
    range: `bytes=${start}-${end}`,
  });
  res.status(206);
  res.setHeader('Content-Range', `bytes ${start}-${end}/${totalSize}`);
  res.setHeader('Content-Length', end - start + 1);
  if (rangeResult.contentType) {
    res.setHeader('Content-Type', rangeResult.contentType);
  }
  res.setHeader('Cache-Control', `public, max-age=${config.s3CacheMaxAge}`);
  await sendS3Body(rangeResult.body, res);
};

/**
 * Build a temp path unique to this writer, so concurrent downloads of the
 * same key never write into one shared temp file.
 */
const cacheTempPath = (cachePath) =>
  `${cachePath}.${process.pid}.${crypto.randomBytes(6).toString('hex')}.tmp`;

/**
 * Promote a finished temp file to the cache (atomic rename) or discard it.
 * This function never throws, and the `.downloading` marker is always removed.
 */
const finalizeCacheFile = async ({
  tempPath,
  cachePath,
  downloadingPath,
  keep,
  bytesWritten,
  expectedSize,
  log,
}) => {
  try {
    if (!keep) {
      await fs.promises.unlink(tempPath).catch(() => {});
    } else if (expectedSize && bytesWritten !== Number(expectedSize)) {
      log.warn(
        { cachePath, bytesWritten, expectedSize },
        'Cache file size mismatch, discarding',
      );
      await fs.promises.unlink(tempPath).catch(() => {});
    } else {
      // Atomic rename: temp → final
      await fs.promises.rename(tempPath, cachePath);
      log.debug({ cachePath, bytesWritten }, 'Cache file written successfully');
    }
  } catch (err) {
    log.warn({ err, cachePath }, 'Failed to finalize cache file');
    await fs.promises.unlink(tempPath).catch(() => {});
  } finally {
    await fs.promises.unlink(downloadingPath).catch(() => {});
  }
};

/**
 * Stream an S3 body to the client while teeing it into the local cache.
 *
 * The response is driven by `pipeline`, so upstream or client errors reject
 * and tear down every stream. The cache file is kept only when the response
 * completed, the cache write succeeded, and the byte count matches the
 * advertised length. If the cache cannot be prepared, the body is streamed
 * without caching.
 */
const streamThroughCache = async (body, res, { cachePath, expectedSize, log }) => {
  const { PassThrough } = require('stream');
  const tempPath = cacheTempPath(cachePath);
  const downloadingPath = cachePath + '.downloading';

  try {
    await ensureCacheDir();
    // Marker tells getCachedFile that a refresh of this key is in progress
    await fs.promises.writeFile(downloadingPath, '');
  } catch (err) {
    log.warn({ err, cachePath }, 'Failed to prepare cache, streaming without caching');
    await pipeline(body, res);
    return;
  }

  const cacheStream = fs.createWriteStream(tempPath);
  const cacheClosed = new Promise((resolve) => cacheStream.once('close', resolve));
  let cacheFailed = false;
  cacheStream.on('error', (err) => {
    cacheFailed = true;
    log.warn({ err, cachePath }, 'Failed to write to cache');
  });

  let bytesWritten = 0;
  const tee = new PassThrough();
  tee.on('data', (chunk) => {
    bytesWritten += chunk.length;
  });
  tee.pipe(cacheStream);

  let responseCompleted = false;
  try {
    await pipeline(body, tee, res);
    responseCompleted = true;
  } finally {
    // On failure the tee is destroyed without 'end', so the cache stream
    // would otherwise stay open (leaking its fd) and the marker would remain.
    if (!responseCompleted) cacheStream.destroy();
    await cacheClosed;
    await finalizeCacheFile({
      tempPath,
      cachePath,
      downloadingPath,
      keep: responseCompleted && !cacheFailed,
      bytesWritten,
      expectedSize,
      log,
    });
  }
};

/**
 * Cache an already-buffered body in the background (temp file + rename).
 * Failures are logged and never affect the response.
 */
const writeBufferToCache = (buffer, cachePath, log) => {
  const tempPath = cacheTempPath(cachePath);
  // Intentionally not awaited: caching is best-effort and must not delay the response.
  void ensureCacheDir()
    .then(() => fs.promises.writeFile(tempPath, buffer))
    .then(() => fs.promises.rename(tempPath, cachePath))
    .catch((err) => {
      log.warn({ err, cachePath }, 'Failed to write to cache');
      return fs.promises.unlink(tempPath).catch(() => {});
    });
};

/**
 * S3 download: serve from the local cache when possible. Otherwise serve
 * Range requests directly from S3, and stream full downloads to the client
 * (teeing into the cache when it is enabled).
 */
const serveFromS3 = async (req, res, { privateUrl, signal, log, provider, startTime }) => {
  const s3 = getS3Provider();
  const cachePath = getCachePath(privateUrl);
  const useCache = config.s3CacheEnabled;

  if (useCache) {
    const { stats, valid } = await getCachedFile(cachePath);
    if (valid && stats) {
      await serveCachedFile(req, res, { cachePath, stats, privateUrl });
      return;
    }
  }

  if (req.headers.range) {
    await serveS3Range(req, res, s3, { privateUrl, signal, log });
    return;
  }

  const result = await s3.download(privateUrl, { signal });
  if (result.contentType) res.setHeader('Content-Type', result.contentType);
  if (result.contentLength) {
    res.setHeader('Content-Length', result.contentLength);
  }
  res.setHeader('Cache-Control', `public, max-age=${config.s3CacheMaxAge}`);

  if (useCache && typeof result.body.pipe === 'function') {
    await streamThroughCache(result.body, res, {
      cachePath,
      expectedSize: result.contentLength,
      log,
    });
  } else if (useCache && typeof result.body.transformToByteArray === 'function') {
    const buffer = Buffer.from(await result.body.transformToByteArray());
    writeBufferToCache(buffer, cachePath, log);
    res.send(buffer);
  } else {
    await sendS3Body(result.body, res);
  }

  log.debug(
    {
      provider,
      privateUrl,
      duration: Date.now() - startTime,
      cached: false,
    },
    'File downloaded from S3',
  );
};

/** GCloud download: 404 when the object is missing, otherwise stream it. */
const serveFromGCloud = async (res, privateUrl) => {
  const { bucket, hash } = getGCloudBucket();
  const file = bucket.file(`${hash}/${privateUrl}`);
  const [exists] = await file.exists();
  if (!exists) {
    res.status(404).send(createErrorResponse('File not found', 'NOT_FOUND'));
    return;
  }
  await pipeline(file.createReadStream(), res);
};

/**
 * Local-disk download with Range support (used for video streaming).
 * Missing paths and directories both yield 404; streaming a directory would
 * otherwise fail with EISDIR after the headers were prepared.
 */
const serveLocalFile = async (req, res, privateUrl) => {
  const localFilePath = path.join(config.uploadDir, privateUrl);
  const stats = await fs.promises.stat(localFilePath).catch(() => null);
  if (!stats || !stats.isFile()) {
    res.status(404).send(createErrorResponse('File not found', 'NOT_FOUND'));
    return;
  }
  setKnownContentType(res, privateUrl);
  await sendLocalFile(req, res, localFilePath, stats.size);
};

/**
 * Express handler: stream the file named by `req.query.privateUrl` from the
 * active storage backend.
 *
 * Responds with:
 * - 400 for a missing or invalid path (see isValidPath);
 * - 404 when the file does not exist;
 * - 206/416 for Range requests;
 * - 304 for a matching If-None-Match on cached S3 objects;
 * - 499 when the client disconnected before headers were sent;
 * - otherwise an error status derived from the failure.
 * @param {import('express').Request} req
 * @param {import('express').Response} res
 */
const downloadFile = async (req, res) => {
  const provider = getFileStorageProvider();
  const privateUrl = req.query.privateUrl;
  const log = req.log || logger;
  if (!privateUrl)
    return res
      .status(400)
      .send(
        createErrorResponse(
          'Missing privateUrl parameter',
          'MISSING_PARAMETER',
        ),
      );
  // Validate path
  if (!isValidPath(privateUrl)) {
    log.warn({ privateUrl }, 'Invalid file path requested');
    return res
      .status(400)
      .send(createErrorResponse('Invalid file path', 'INVALID_PATH'));
  }
  res.setHeader('Cross-Origin-Resource-Policy', 'cross-origin');
  res.setHeader('Accept-Ranges', 'bytes');

  // Abort in-flight storage requests when the client goes away. The
  // response's 'close' event fires both on completion and on premature
  // disconnect; writableEnded tells the two apart.
  const abortController = new AbortController();
  const { signal } = abortController;
  res.on('close', () => {
    if (!res.writableEnded) {
      log.debug({ privateUrl }, 'Client disconnected, aborting download');
      abortController.abort();
    }
  });

  try {
    if (provider === 's3') {
      await serveFromS3(req, res, {
        privateUrl,
        signal,
        log,
        provider,
        startTime: Date.now(),
      });
    } else if (provider === 'gcloud') {
      await serveFromGCloud(res, privateUrl);
    } else {
      await serveLocalFile(req, res, privateUrl);
    }
  } catch (error) {
    // A client disconnect surfaces as AbortError (S3) or a premature-close
    // stream error. Both are expected, so they are not logged as failures.
    if (error?.name === 'AbortError' || signal.aborted) {
      log.debug({ privateUrl }, 'Download aborted by client');
      if (!res.headersSent) {
        return res.status(499).end(); // Client Closed Request
      }
      return undefined;
    }
    const statusCode = provider === 's3' ? getS3ErrorStatusCode(error) : 500;
    const errorMessage = getErrorMessage(error, 'download');
    log.error(
      {
        err: error,
        provider,
        privateUrl,
        statusCode,
        errorName: error?.name,
        errorCode: error?.code,
      },
      'Failed to download file',
    );
    if (!res.headersSent) {
      return res
        .status(statusCode)
        .send(
          createErrorResponse(errorMessage, error?.name || 'DOWNLOAD_ERROR'),
        );
    }
  }
  return undefined;
};
/**
 * Remove a file from the active storage backend.
 *
 * GCloud objects are checked for existence first, so deleting a missing
 * object succeeds. Errors are logged and returned by default; pass
 * `throwOnError: true` to rethrow them instead.
 * @param {string} privateUrl - Storage key of the file
 * @param {Object} [options]
 * @param {boolean} [options.throwOnError=false] - Rethrow storage errors
 * @returns {Promise<{ success: boolean, error?: Error }>}
 */
const deleteFile = async (privateUrl, options = {}) => {
  if (!privateUrl) {
    return { success: false, error: new Error('Missing privateUrl') };
  }
  const { throwOnError = false } = options;
  const provider = getFileStorageProvider();

  try {
    switch (provider) {
      case 's3':
        await getS3Provider().delete(privateUrl);
        break;
      case 'gcloud': {
        const { bucket, hash } = getGCloudBucket();
        const target = bucket.file(`${hash}/${privateUrl}`);
        const [found] = await target.exists();
        if (found) await target.delete();
        break;
      }
      default:
        await getLocalProvider().delete(privateUrl);
    }
    logger.debug({ provider, privateUrl }, 'File deleted successfully');
    return { success: true };
  } catch (error) {
    logger.error({ err: error, provider, privateUrl }, 'Failed to delete file');
    if (throwOnError) throw error;
    return { success: false, error };
  }
};
// ============================================================================
// Download to Buffer (for processing)
// ============================================================================
/**
 * Read a stored file fully into memory (for server-side processing).
 *
 * S3 bodies are converted via transformToByteArray when available;
 * otherwise they are read as an async iterable stream. Local files are read
 * asynchronously, so large files do not block the event loop.
 * NOTE(review): privateUrl is not path-validated here; callers presumably
 * pass trusted, stored keys — confirm before exposing to user input.
 * @param {string} privateUrl - Storage key/path
 * @returns {Promise<Buffer>}
 */
const downloadToBuffer = async (privateUrl) => {
  const provider = getFileStorageProvider();
  if (provider === 's3') {
    const { body } = await getS3Provider().download(privateUrl);
    if (typeof body.transformToByteArray === 'function') {
      return Buffer.from(await body.transformToByteArray());
    }
    const chunks = [];
    for await (const chunk of body) {
      chunks.push(chunk);
    }
    return Buffer.concat(chunks);
  }
  if (provider === 'gcloud') {
    const { bucket, hash } = getGCloudBucket();
    const [data] = await bucket.file(`${hash}/${privateUrl}`).download();
    return data;
  }
  // Local provider - read directly from the upload directory
  return fs.promises.readFile(path.join(config.uploadDir, privateUrl));
};
/**
 * Store an in-memory buffer under `privateUrl` in the active backend.
 * @param {string} privateUrl - Storage key/path
 * @param {Buffer} buffer - Bytes to store
 * @param {Object} [options]
 * @param {string} [options.contentType='application/octet-stream'] - MIME
 *   type (used by the S3 backend)
 * @returns {Promise<{ url: string }>} Public or proxy URL of the stored file
 */
const uploadBuffer = async (privateUrl, buffer, options = {}) => {
  const provider = getFileStorageProvider();
  const { contentType = 'application/octet-stream' } = options;

  switch (provider) {
    case 's3': {
      const uploaded = await getS3Provider().upload(privateUrl, buffer, {
        contentType,
      });
      return { url: uploaded.url };
    }
    case 'gcloud': {
      const { bucket, hash } = getGCloudBucket();
      const blob = bucket.file(`${hash}/${privateUrl}`);
      await new Promise((resolve, reject) => {
        blob
          .createWriteStream({ resumable: false })
          .on('error', reject)
          .on('finish', resolve)
          .end(buffer);
      });
      return {
        url: `https://storage.googleapis.com/${bucket.name}/${blob.name}`,
      };
    }
    default: {
      await getLocalProvider().upload(privateUrl, buffer);
      return {
        url: `/api/file/download?privateUrl=${encodeURIComponent(privateUrl)}`,
      };
    }
  }
};
// ============================================================================
// Chunked Upload Session Management
// ============================================================================
/**
 * Normalise a client-supplied folder for a chunked-upload session.
 *
 * Leading and trailing slashes and whitespace are stripped. The result must
 * pass isValidPath, the same check downloadFile applies (no "..", NUL, "//",
 * or scheme prefix). This ensures a session can never create a key that
 * cannot be served back, or that contains bytes the filesystem rejects.
 * @param {*} folder - Raw folder value from the request body
 * @returns {string|null} The sanitized folder, or null when unusable
 */
const sanitizeFolder = (folder) => {
  const value = String(folder || '')
    .trim()
    .replace(/^\/+|\/+$/g, '');
  return value && isValidPath(value) ? value : null;
};
/**
 * Reduce a client-supplied filename to a single safe path segment.
 *
 * `path.win32.basename` treats both "/" and "\" as separators, so
 * `..\..\x` becomes `x` even on POSIX hosts; posix basename would keep the
 * backslashes. Names that still contain ".." or a NUL byte are rejected,
 * because downloadFile would refuse to serve them (and NUL breaks fs calls).
 * @param {*} filename - Raw filename from the request body
 * @returns {string|null} The sanitized name, or null when unusable
 */
const sanitizeFilename = (filename) => {
  const value = path.win32.basename(String(filename || '').trim());
  if (!value || value === '.' || value.includes('..') || value.includes('\0')) {
    return null;
  }
  return value;
};
/**
 * Express handler: start a chunked-upload session for the current user.
 *
 * Expired sessions are purged first. The body must provide a valid folder
 * and filename, a positive integer `totalChunks`, and a finite non-negative
 * `size`. Responds with 200 `{ sessionId, uploadedChunks: [], totalChunks }`;
 * 403 without a user; 400 on invalid input; 500 on failure.
 * @param {import('express').Request} req
 * @param {import('express').Response} res
 */
const initUploadSession = async (req, res) => {
  const log = req.log || logger;
  try {
    const userId = req.currentUser?.id;
    if (!userId) return res.sendStatus(403);

    const manager = getUploadSessionManager();
    manager.cleanupExpiredSessions();

    const body = req.body ?? {};
    const folder = sanitizeFolder(body.folder);
    const filename = sanitizeFilename(body.filename);
    const totalChunks = Number(body.totalChunks);
    const size = Number(body.size);
    const contentType = String(body.contentType || '').trim();

    let invalidReason = null;
    if (!folder || !filename) {
      invalidReason = 'Invalid folder or filename';
    } else if (!Number.isInteger(totalChunks) || totalChunks <= 0) {
      invalidReason = 'Invalid totalChunks';
    } else if (!Number.isFinite(size) || size < 0) {
      invalidReason = 'Invalid file size';
    }
    if (invalidReason) {
      return res
        .status(400)
        .send(createErrorResponse(invalidReason, 'INVALID_INPUT'));
    }

    const sessionId = manager.createSession({
      userId,
      folder,
      filename,
      totalChunks,
      totalSize: size,
      contentType,
    });
    return res.status(200).send({ sessionId, uploadedChunks: [], totalChunks });
  } catch (error) {
    log.error({ err: error }, 'Failed to initialize upload session');
    return res
      .status(500)
      .send(
        createErrorResponse(
          'Failed to initialize upload session',
          'SESSION_INIT_ERROR',
        ),
      );
  }
};
/**
 * Express handler: report the state of one of the caller's upload sessions.
 *
 * Responds with 200 `{ sessionId, totalChunks, uploadedChunks, status }`,
 * where `status` is 'complete' or 'uploading'. Responds with 403 without a
 * user or for someone else's session, 404 for an unknown id, and 500 on
 * failure.
 * @param {import('express').Request} req
 * @param {import('express').Response} res
 */
const getUploadSession = async (req, res) => {
  try {
    const userId = req.currentUser?.id;
    if (!userId) return res.sendStatus(403);

    const sessionId = String(req.params.sessionId || '');
    const manager = getUploadSessionManager();
    const meta = manager.readMeta(sessionId);
    if (!meta) {
      return res
        .status(404)
        .send(
          createErrorResponse('Upload session not found', 'SESSION_NOT_FOUND'),
        );
    }
    if (meta.userId !== userId) return res.sendStatus(403);

    const uploadedChunks = Object.keys(meta.uploadedChunks || {}).map(Number);
    const status = manager.isComplete(sessionId) ? 'complete' : 'uploading';
    return res.status(200).send({
      sessionId: meta.sessionId,
      totalChunks: meta.totalChunks,
      uploadedChunks,
      status,
    });
  } catch (error) {
    const log = req.log || logger;
    log.error({ err: error }, 'Failed to get upload session');
    return res
      .status(500)
      .send(
        createErrorResponse(
          'Failed to get upload session',
          'SESSION_GET_ERROR',
        ),
      );
  }
};
/**
 * Express handler: store one chunk (the raw request body) for an upload
 * session.
 *
 * The chunk index comes from `req.params.chunkIndex` and must be an integer
 * in `[0, totalChunks)`. Responds with 200 and the count of chunks stored so
 * far; 400 for a bad index; 403 without a user or for someone else's
 * session; 404 for an unknown session; 500 on failure.
 * @param {import('express').Request} req
 * @param {import('express').Response} res
 */
const uploadChunk = async (req, res) => {
  const log = req.log || logger;
  try {
    if (!req.currentUser?.id) return res.sendStatus(403);

    const sessionId = String(req.params.sessionId || '');
    const chunkIndex = Number(req.params.chunkIndex);
    const indexIsValid = Number.isInteger(chunkIndex) && chunkIndex >= 0;
    if (!indexIsValid) {
      return res
        .status(400)
        .send(createErrorResponse('Invalid chunk index', 'INVALID_INPUT'));
    }

    const manager = getUploadSessionManager();
    const meta = manager.readMeta(sessionId);
    if (!meta) {
      return res
        .status(404)
        .send(
          createErrorResponse('Upload session not found', 'SESSION_NOT_FOUND'),
        );
    }
    if (meta.userId !== req.currentUser.id) return res.sendStatus(403);
    if (chunkIndex >= meta.totalChunks) {
      return res
        .status(400)
        .send(
          createErrorResponse('Chunk index is out of range', 'INVALID_INPUT'),
        );
    }

    // The request body is the raw chunk; buffer it completely before saving.
    const parts = [];
    for await (const part of req) parts.push(part);
    await manager.saveChunk(sessionId, chunkIndex, Buffer.concat(parts));

    const refreshed = manager.readMeta(sessionId);
    const storedCount = Object.keys(refreshed.uploadedChunks || {}).length;
    return res.status(200).send({
      sessionId,
      chunkIndex,
      uploadedChunks: storedCount,
      totalChunks: meta.totalChunks,
    });
  } catch (error) {
    log.error({ err: error }, 'Failed to upload chunk');
    return res
      .status(500)
      .send(
        createErrorResponse('Failed to upload chunk', 'CHUNK_UPLOAD_ERROR'),
      );
  }
};
/**
 * Express handler: assemble all chunks of a session and store the result.
 *
 * Every chunk must be present (400 MISSING_CHUNK otherwise). Chunks are
 * assembled into a temp file under config.uploadDir, which is uploaded to
 * the active backend as `<folder>/<filename>` and then always removed, even
 * when assembly itself fails. The session is removed only after a successful
 * upload, so a failed finalize can be retried. Responds with 200
 * `{ message, privateUrl, url }`; 403/404 for ownership or unknown session;
 * 500 on failure.
 * @param {import('express').Request} req
 * @param {import('express').Response} res
 */
const finalizeUploadSession = async (req, res) => {
  const log = req.log || logger;
  try {
    if (!req.currentUser?.id) return res.sendStatus(403);
    const sessionId = String(req.params.sessionId || '');
    const sessionManager = getUploadSessionManager();
    const session = sessionManager.readMeta(sessionId);
    if (!session)
      return res
        .status(404)
        .send(
          createErrorResponse('Upload session not found', 'SESSION_NOT_FOUND'),
        );
    if (session.userId !== req.currentUser.id) return res.sendStatus(403);
    // Verify all chunks exist
    for (let i = 0; i < session.totalChunks; i++) {
      if (!sessionManager.chunkExists(sessionId, i)) {
        return res.status(400).send(
          createErrorResponse(`Missing chunk ${i}`, 'MISSING_CHUNK', {
            missingChunk: i,
          }),
        );
      }
    }
    const assembledPath = path.join(
      config.uploadDir,
      `assembled_${sessionId}_${Date.now()}`,
    );
    const privateUrl = `${session.folder}/${session.filename}`;
    const provider = getFileStorageProvider();
    let publicUrl = '';
    try {
      // Assembly runs inside the try so a partial assembled file is also cleaned up.
      await sessionManager.assembleChunks(sessionId, assembledPath);
      if (provider === 's3') {
        const data = await fs.promises.readFile(assembledPath);
        const result = await getS3Provider().upload(privateUrl, data, {
          // The client may omit contentType; fall back to the extension.
          contentType:
            session.contentType || getMimeTypeFromExtension(session.filename),
        });
        publicUrl = result.url;
      } else if (provider === 'gcloud') {
        const { bucket, hash } = getGCloudBucket();
        const blob = bucket.file(`${hash}/${privateUrl}`);
        await pipeline(
          fs.createReadStream(assembledPath),
          blob.createWriteStream({ resumable: false }),
        );
        publicUrl = format(
          `https://storage.googleapis.com/${bucket.name}/${blob.name}`,
        );
      } else {
        const data = await fs.promises.readFile(assembledPath);
        await getLocalProvider().upload(privateUrl, data);
        publicUrl = `/api/file/download?privateUrl=${encodeURIComponent(privateUrl)}`;
      }
    } finally {
      // Best-effort cleanup: a failure here must not mask the upload error.
      await fs.promises.unlink(assembledPath).catch((err) => {
        if (err.code !== 'ENOENT') {
          log.warn({ err, assembledPath }, 'Failed to remove assembled upload file');
        }
      });
    }
    // Cleanup session
    sessionManager.removeSession(sessionId);
    log.info({ sessionId, provider, privateUrl }, 'Upload session finalized');
    return res.status(200).send({
      message: `Uploaded the file successfully: ${privateUrl}`,
      privateUrl,
      url: publicUrl,
    });
  } catch (error) {
    log.error({ err: error }, 'Failed to finalize upload session');
    return res
      .status(500)
      .send(
        createErrorResponse(
          'Failed to finalize upload session',
          'SESSION_FINALIZE_ERROR',
        ),
      );
  }
};
// ============================================================================
// File Copy Utility
// ============================================================================
// Lower-cased file extension (with leading dot) → MIME type.
const MIME_TYPE_BY_EXTENSION = new Map([
  ['.jpg', 'image/jpeg'],
  ['.jpeg', 'image/jpeg'],
  ['.png', 'image/png'],
  ['.gif', 'image/gif'],
  ['.webp', 'image/webp'],
  ['.svg', 'image/svg+xml'],
  ['.ico', 'image/x-icon'],
  ['.mp4', 'video/mp4'],
  ['.webm', 'video/webm'],
  ['.mp3', 'audio/mpeg'],
  ['.wav', 'audio/wav'],
  ['.ogg', 'audio/ogg'],
  ['.pdf', 'application/pdf'],
  ['.json', 'application/json'],
]);

/**
 * Guess a MIME type from a path's extension (case-insensitive).
 * @param {string} filepath - File path or storage key
 * @returns {string} Known MIME type, or 'application/octet-stream'
 */
const getMimeTypeFromExtension = (filepath) => {
  const extension = path.extname(filepath).toLowerCase();
  return MIME_TYPE_BY_EXTENSION.get(extension) || 'application/octet-stream';
};
/**
 * Copy a stored file to a new key using the cheapest mechanism available:
 * - S3: server-side copy (no data passes through this process)
 * - Local: provider copy on disk
 * - GCloud: download into memory, then re-upload
 *
 * @param {string} sourceKey - Existing storage key
 * @param {string} destKey - Key to create
 * @param {Object} [options]
 * @param {string} [options.contentType] - MIME type; guessed from the
 *   source extension when omitted
 * @returns {Promise<{ url: string }>} URL of the new copy
 * @throws {Error} For an unrecognised provider id
 */
const copyFile = async (sourceKey, destKey, options = {}) => {
  const provider = getFileStorageProvider();
  const contentType =
    options.contentType || getMimeTypeFromExtension(sourceKey);

  switch (provider) {
    case 's3': {
      const copied = await getS3Provider().copy(sourceKey, destKey, {
        contentType,
      });
      logger.debug(
        { sourceKey, destKey, provider: 's3' },
        'File copied (server-side)',
      );
      return { url: copied.url };
    }
    case 'local': {
      await getLocalProvider().copy(sourceKey, destKey);
      logger.debug({ sourceKey, destKey, provider: 'local' }, 'File copied');
      return {
        url: `/api/file/download?privateUrl=${encodeURIComponent(destKey)}`,
      };
    }
    case 'gcloud': {
      // No native copy is wired up for GCloud, so round-trip the bytes.
      const buffer = await downloadToBuffer(sourceKey);
      logger.debug(
        { sourceKey, destKey, provider: 'gcloud', size: buffer.length },
        'File copied (download/upload fallback)',
      );
      return uploadBuffer(destKey, buffer, { contentType });
    }
    default:
      throw new Error(`Unknown storage provider: ${provider}`);
  }
};
/**
 * Copy many files, running at most `concurrency` copies at a time.
 *
 * Copies run in consecutive batches. The next batch starts only after the
 * current one has settled.
 * @param {Array<{sourceKey: string, destKey: string, contentType?: string}>} copies - Copy operations
 * @param {Object} [options]
 * @param {number} [options.concurrency=10] - Batch size. Values below 1 or
 *   non-numeric values fall back to 1; fractional values are floored.
 * @param {boolean} [options.continueOnError=true] - When false, the first
 *   failure throws (with the original error as `cause`)
 * @returns {Promise<{succeeded: Array<{sourceKey: string, destKey: string}>, failed: Array<{sourceKey: string, destKey: string, error: string}>}>}
 * @throws {Error} On the first failure when continueOnError is false
 */
const copyFilesParallel = async (copies, options = {}) => {
  const { concurrency = 10, continueOnError = true } = options;
  // Guard the loop step: 0, negatives or NaN would never advance the offset
  // (infinite loop), and a string such as '5' would be concatenated onto it.
  const requested = Math.floor(Number(concurrency));
  const batchSize = requested >= 1 ? requested : 1;
  const succeeded = [];
  const failed = [];

  for (let offset = 0; offset < copies.length; offset += batchSize) {
    const batch = copies.slice(offset, offset + batchSize);
    const outcomes = await Promise.allSettled(
      batch.map(({ sourceKey, destKey, contentType }) =>
        copyFile(sourceKey, destKey, { contentType }),
      ),
    );

    for (let j = 0; j < outcomes.length; j++) {
      const outcome = outcomes[j];
      const { sourceKey, destKey } = batch[j];
      if (outcome.status === 'fulfilled') {
        succeeded.push({ sourceKey, destKey });
        continue;
      }
      const errorMsg = outcome.reason?.message || 'Unknown error';
      failed.push({ sourceKey, destKey, error: errorMsg });
      if (!continueOnError) {
        throw new Error(`Copy failed for ${sourceKey}: ${errorMsg}`, {
          cause: outcome.reason,
        });
      }
      logger.warn({ sourceKey, error: errorMsg }, 'File copy failed');
    }
  }

  logger.info(
    {
      succeeded: succeeded.length,
      failed: failed.length,
      total: copies.length,
    },
    'Batch file copy completed',
  );
  return { succeeded, failed };
};
// ============================================================================
// Presigned URLs
// ============================================================================
// Used when config.s3.presignExpirySeconds is unset or falsy (one hour).
const DEFAULT_PRESIGN_EXPIRY_SECONDS = 3600;
/**
 * Lifetime, in seconds, of presigned S3 URLs.
 * @returns {number}
 */
const getPresignExpirySeconds = () => {
  const configured = config.s3.presignExpirySeconds;
  return configured || DEFAULT_PRESIGN_EXPIRY_SECONDS;
};
/**
 * Map each storage key to a URL the client can fetch directly.
 *
 * With S3, keys are presigned for getPresignExpirySeconds(). With any other
 * backend, they point at this service's download proxy.
 * @param {string[]} urls - Storage keys
 * @returns {Promise<Object<string, string>>} Key → fetchable URL
 */
const generatePresignedUrls = async (urls) => {
  if (getFileStorageProvider() !== 's3') {
    const proxied = {};
    urls.forEach((url) => {
      proxied[url] = `/api/file/download?privateUrl=${encodeURIComponent(url)}`;
    });
    return proxied;
  }

  const s3 = getS3Provider();
  const signedByUrl = {};
  const expirySeconds = getPresignExpirySeconds();
  // Each entry is recorded as soon as its signature resolves.
  const signAndStore = async (url) => {
    signedByUrl[url] = await s3.getSignedUrl(url, expirySeconds);
  };
  await Promise.all(urls.map(signAndStore));
  return signedByUrl;
};
// ============================================================================
// Exports
// ============================================================================
/**
 * Public API of the file service.
 *
 * uploadFile, downloadFile and the upload-session handlers write their
 * result directly to the Express `res`. The remaining functions are async
 * helpers that return values to their caller.
 */
module.exports = {
  // Provider detection
  getFileStorageProvider,
  getS3Provider,
  getLocalProvider,
  getGCloudBucket,
  // Unified interface
  uploadFile,
  downloadFile,
  deleteFile,
  // Buffer operations
  downloadToBuffer,
  uploadBuffer,
  // File copy utilities
  copyFile,
  copyFilesParallel,
  getMimeTypeFromExtension,
  // Session-based chunked uploads
  initUploadSession,
  getUploadSession,
  uploadChunk,
  finalizeUploadSession,
  // Presigned URLs
  generatePresignedUrls,
  // Utilities (for testing/routes)
  isValidPath,
  createErrorResponse,
  getS3ErrorStatusCode,
};