const formidable = require('formidable');
const fs = require('fs');
const config = require('../config');
const path = require('path');
const { pipeline } = require('stream/promises');
const { v4: uuid } = require('uuid');
const { format } = require('util');
const {
  S3Client,
  PutObjectCommand,
  GetObjectCommand,
  DeleteObjectCommand,
  ListObjectsV2Command,
  DeleteObjectsCommand,
} = require('@aws-sdk/client-s3');
const { getSignedUrl } = require('@aws-sdk/s3-request-presigner');

const ensureDirectoryExistence = (filePath) => {
  const dirname = path.dirname(filePath);

  if (fs.existsSync(dirname)) {
    return true;
  }

  ensureDirectoryExistence(dirname);
  fs.mkdirSync(dirname);
};

const UPLOAD_SESSIONS_DIR = path.join(config.uploadDir, 'upload_sessions');
const UPLOAD_SESSION_TTL_MS = 24 * 60 * 60 * 1000;

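// For non-S3 providers, chunked upload sessions live on local disk under
// <uploadDir>/upload_sessions/<sessionId>/ as a meta.json descriptor plus
// chunks/<index>.part files; sessions idle longer than UPLOAD_SESSION_TTL_MS
// are garbage-collected. The helpers below sanitize client input and build
// those paths.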
const sanitizeFolder = (folder) => {
  const value = String(folder || '')
    .trim()
    .replace(/^\/+|\/+$/g, '');

  if (!value || value.includes('..')) {
    return null;
  }

  return value;
};

const sanitizeFilename = (filename) => {
  const value = path.basename(String(filename || '').trim());

  if (!value || value === '.' || value === '..') {
    return null;
  }

  return value;
};

const getSessionDir = (sessionId) => path.join(UPLOAD_SESSIONS_DIR, sessionId);
const getSessionMetaPath = (sessionId) =>
  path.join(getSessionDir(sessionId), 'meta.json');
const getSessionChunksDir = (sessionId) =>
  path.join(getSessionDir(sessionId), 'chunks');
const getSessionChunkPath = (sessionId, chunkIndex) =>
  path.join(getSessionChunksDir(sessionId), `${String(chunkIndex)}.part`);

const readSessionMeta = (sessionId) => {
  const metaPath = getSessionMetaPath(sessionId);

  if (!fs.existsSync(metaPath)) {
    return null;
  }

  const raw = fs.readFileSync(metaPath, 'utf8');
  return JSON.parse(raw);
};

const writeSessionMeta = (sessionId, payload) => {
  const metaPath = getSessionMetaPath(sessionId);
  ensureDirectoryExistence(metaPath);
  fs.writeFileSync(metaPath, JSON.stringify(payload, null, 2), 'utf8');
};

const removeUploadSession = (sessionId) => {
  const sessionDir = getSessionDir(sessionId);

  if (fs.existsSync(sessionDir)) {
    fs.rmSync(sessionDir, { recursive: true, force: true });
  }
};

const cleanupExpiredUploadSessions = () => {
  if (!fs.existsSync(UPLOAD_SESSIONS_DIR)) {
    return;
  }

  const now = Date.now();
  const sessionIds = fs.readdirSync(UPLOAD_SESSIONS_DIR);

  sessionIds.forEach((sessionId) => {
    try {
      const meta = readSessionMeta(sessionId);
      if (!meta) {
        removeUploadSession(sessionId);
        return;
      }

      const updatedAt = new Date(
        meta.updatedAt || meta.createdAt || 0,
      ).getTime();
      if (!updatedAt || now - updatedAt > UPLOAD_SESSION_TTL_MS) {
        removeUploadSession(sessionId);
      }
    } catch (error) {
      console.error(`Failed to cleanup upload session ${sessionId}`, error);
      removeUploadSession(sessionId);
    }
  });
};

const streamAppendFile = async (targetPath, sourcePath) => {
  await new Promise((resolve, reject) => {
    const writeStream = fs.createWriteStream(targetPath, { flags: 'a' });
    const readStream = fs.createReadStream(sourcePath);

    writeStream.on('error', reject);
    readStream.on('error', reject);
    writeStream.on('finish', resolve);
    readStream.pipe(writeStream, { end: true });
  });
};

// S3 session storage helpers
const S3_UPLOAD_SESSIONS_PREFIX = '_upload_sessions';

const getS3SessionMetaKey = (prefix, sessionId) => {
  const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
  const sessionPath = `${S3_UPLOAD_SESSIONS_PREFIX}/${sessionId}/meta.json`;
  return cleanPrefix ? `${cleanPrefix}/${sessionPath}` : sessionPath;
};

const getS3SessionChunkKey = (prefix, sessionId, chunkIndex) => {
  const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
  const chunkPath = `${S3_UPLOAD_SESSIONS_PREFIX}/${sessionId}/chunks/${chunkIndex}.part`;
  return cleanPrefix ? `${cleanPrefix}/${chunkPath}` : chunkPath;
};

const getS3SessionPrefix = (prefix, sessionId) => {
  const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
  const sessionPath = `${S3_UPLOAD_SESSIONS_PREFIX}/${sessionId}/`;
  return cleanPrefix ? `${cleanPrefix}/${sessionPath}` : sessionPath;
};

const readS3SessionMeta = async (client, bucket, prefix, sessionId) => {
  try {
    const key = getS3SessionMetaKey(prefix, sessionId);
    const output = await client.send(
      new GetObjectCommand({ Bucket: bucket, Key: key }),
    );

    if (!output || !output.Body) {
      return null;
    }

    const bodyStr = await output.Body.transformToString();
    return JSON.parse(bodyStr);
  } catch (error) {
    if (error.name === 'NoSuchKey') {
      return null;
    }
    throw error;
  }
};

const writeS3SessionMeta = async (
  client,
  bucket,
  prefix,
  sessionId,
  payload,
) => {
  const key = getS3SessionMetaKey(prefix, sessionId);
  await client.send(
    new PutObjectCommand({
      Bucket: bucket,
      Key: key,
      Body: JSON.stringify(payload, null, 2),
      ContentType: 'application/json',
    }),
  );
};

const uploadS3Chunk = async (
  client,
  bucket,
  prefix,
  sessionId,
  chunkIndex,
  body,
) => {
  const key = getS3SessionChunkKey(prefix, sessionId, chunkIndex);
  await client.send(
    new PutObjectCommand({
      Bucket: bucket,
      Key: key,
      Body: body,
    }),
  );
};

const downloadS3Chunk = async (
  client,
  bucket,
  prefix,
  sessionId,
  chunkIndex,
) => {
  const key = getS3SessionChunkKey(prefix, sessionId, chunkIndex);
  try {
    const output = await client.send(
      new GetObjectCommand({ Bucket: bucket, Key: key }),
    );

    if (!output || !output.Body) {
      return null;
    }

    return output.Body;
  } catch (error) {
    if (error.name === 'NoSuchKey') {
      return null;
    }
    throw error;
  }
};

const s3ChunkExists = async (client, bucket, prefix, sessionId, chunkIndex) => {
  const key = getS3SessionChunkKey(prefix, sessionId, chunkIndex);
  try {
    await client.send(
      new GetObjectCommand({ Bucket: bucket, Key: key, Range: 'bytes=0-0' }),
    );
    return true;
  } catch (error) {
    if (error.name === 'NoSuchKey') {
      return false;
    }
    throw error;
  }
};

const removeS3UploadSession = async (client, bucket, prefix, sessionId) => {
  const sessionPrefix = getS3SessionPrefix(prefix, sessionId);

  try {
    const listResult = await client.send(
      new ListObjectsV2Command({
        Bucket: bucket,
        Prefix: sessionPrefix,
      }),
    );

    if (!listResult.Contents || listResult.Contents.length === 0) {
      return;
    }

    const objectsToDelete = listResult.Contents.map((obj) => ({
      Key: obj.Key,
    }));

    await client.send(
      new DeleteObjectsCommand({
        Bucket: bucket,
        Delete: { Objects: objectsToDelete },
      }),
    );
  } catch (error) {
    console.error(`Failed to remove S3 upload session ${sessionId}`, error);
  }
};

const cleanupExpiredS3UploadSessions = async () => {
  const provider = getFileStorageProvider();
  if (provider !== 's3') {
    return;
  }

  try {
    const { client, bucket, prefix } = initS3();
    const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
    const sessionsPrefix = cleanPrefix
      ? `${cleanPrefix}/${S3_UPLOAD_SESSIONS_PREFIX}/`
      : `${S3_UPLOAD_SESSIONS_PREFIX}/`;

    const listResult = await client.send(
      new ListObjectsV2Command({
        Bucket: bucket,
        Prefix: sessionsPrefix,
        Delimiter: '/',
      }),
    );

    if (!listResult.CommonPrefixes || listResult.CommonPrefixes.length === 0) {
      return;
    }

    const now = Date.now();

    for (const prefixObj of listResult.CommonPrefixes) {
      const sessionPrefix = prefixObj.Prefix;
      const sessionId = sessionPrefix
        .replace(sessionsPrefix, '')
        .replace(/\/$/, '');

      if (!sessionId) continue;

      try {
        const meta = await readS3SessionMeta(client, bucket, prefix, sessionId);
        if (!meta) {
          await removeS3UploadSession(client, bucket, prefix, sessionId);
          continue;
        }

        const updatedAt = new Date(
          meta.updatedAt || meta.createdAt || 0,
        ).getTime();
        if (!updatedAt || now - updatedAt > UPLOAD_SESSION_TTL_MS) {
          await removeS3UploadSession(client, bucket, prefix, sessionId);
        }
      } catch (error) {
        console.error(
          `Failed to cleanup S3 upload session ${sessionId}`,
          error,
        );
        await removeS3UploadSession(client, bucket, prefix, sessionId);
      }
    }
  } catch (error) {
    console.error('Failed to cleanup expired S3 upload sessions', error);
  }
};

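/**
 * Express handler factory for single-request uploads to local disk.
 * The request is parsed with formidable and the temp file is moved to
 * <uploadDir>/<folder>/<filename>. When
 * `validations.folderIncludesAuthenticationUid` is set, `:userId` in the
 * folder is replaced with the current user's authenticationUid and the
 * request is rejected unless the resolved folder contains it.
 */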
const uploadLocal = (
  folder,
  validations = {
    entity: null,
    folderIncludesAuthenticationUid: false,
  },
) => {
  return (req, res) => {
    if (!req.currentUser) {
      res.sendStatus(403);
      return;
    }

    if (validations.entity) {
      res.sendStatus(403);
      return;
    }

    if (validations.folderIncludesAuthenticationUid) {
      folder = folder.replace(':userId', req.currentUser.authenticationUid);
      if (
        !req.currentUser.authenticationUid ||
        !folder.includes(req.currentUser.authenticationUid)
      ) {
        res.sendStatus(403);
        return;
      }
    }

    const form = new formidable.IncomingForm();
    form.uploadDir = config.uploadDir;

    form.parse(req, function (err, fields, files) {
      // Sanitize the client-supplied filename so it cannot escape the target folder.
      const filename = sanitizeFilename(fields.filename);
      // formidable v1-style API: the parsed file exposes its temp location as `.path`.
      const fileTempUrl = files.file.path;

      if (!filename) {
        fs.unlinkSync(fileTempUrl);
        res.sendStatus(500);
        return;
      }

      const privateUrl = path.join(form.uploadDir, folder, filename);
      ensureDirectoryExistence(privateUrl);
      fs.renameSync(fileTempUrl, privateUrl);
      res.sendStatus(200);
    });

    form.on('error', function (err) {
      res.status(500).send(err);
    });
  };
};

const downloadLocal = async (req, res) => {
  const privateUrl = req.query.privateUrl;
  if (!privateUrl) {
    return res.sendStatus(404);
  }

  // Resolve the requested path and make sure it stays inside the upload directory.
  const filePath = path.resolve(config.uploadDir, String(privateUrl));
  if (!filePath.startsWith(path.resolve(config.uploadDir) + path.sep)) {
    return res.sendStatus(404);
  }

  res.setHeader('Cross-Origin-Resource-Policy', 'cross-origin');
  res.download(filePath);
};

const deleteLocal = async (privateUrl) => {
  try {
    if (!privateUrl) {
      return;
    }

    const filePath = path.join(config.uploadDir, privateUrl);

    if (fs.existsSync(filePath)) {
      fs.unlinkSync(filePath);
    }
  } catch (error) {
    console.error(`Cannot delete local file ${privateUrl}`, error);
    throw error;
  }
};

const initGCloud = () => {
  const processFile = require('../middlewares/upload');
  const { Storage } = require('@google-cloud/storage');

  const hash = config.gcloud.hash;

  // Env vars usually store the key with escaped "\n" sequences; restore real newlines.
  const privateKey = process.env.GC_PRIVATE_KEY.replace(/\\n/g, '\n');

  const storage = new Storage({
    projectId: process.env.GC_PROJECT_ID,
    credentials: {
      client_email: process.env.GC_CLIENT_EMAIL,
      private_key: privateKey,
    },
  });

  const bucket = storage.bucket(config.gcloud.bucket);
  return { hash, bucket, processFile };
};

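/**
 * Resolve the active storage provider: an explicit FILE_STORAGE_PROVIDER env
 * value wins, otherwise S3 is used when full S3 credentials are configured,
 * then Google Cloud Storage when its credentials are configured, and local
 * disk is the fallback.
 */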
const getFileStorageProvider = () => {
  const provider = (process.env.FILE_STORAGE_PROVIDER || '')
    .trim()
    .toLowerCase();

  if (provider) {
    return provider;
  }

  const hasS3Credentials = Boolean(
    config.s3.bucket &&
      config.s3.region &&
      config.s3.accessKeyId &&
      config.s3.secretAccessKey,
  );

  if (hasS3Credentials) {
    return 's3';
  }

  const hasGCloudCredentials = Boolean(
    process.env.GC_PROJECT_ID &&
      process.env.GC_CLIENT_EMAIL &&
      process.env.GC_PRIVATE_KEY &&
      config.gcloud.bucket &&
      config.gcloud.hash,
  );

  if (hasGCloudCredentials) {
    return 'gcloud';
  }

  return 'local';
};

const initS3 = () => {
  const processFile = require('../middlewares/upload');
  const client = new S3Client({
    region: config.s3.region,
    credentials: {
      accessKeyId: config.s3.accessKeyId,
      secretAccessKey: config.s3.secretAccessKey,
    },
  });

  return {
    client,
    bucket: config.s3.bucket,
    region: config.s3.region,
    prefix: config.s3.prefix,
    processFile,
  };
};

const buildStoragePath = (prefix, privateUrl) => {
  const cleanPrefix = (prefix || '').replace(/^\/+|\/+$/g, '');
  const cleanPrivateUrl = String(privateUrl || '').replace(/^\/+/, '');

  if (!cleanPrefix) {
    return cleanPrivateUrl;
  }

  return `${cleanPrefix}/${cleanPrivateUrl}`;
};

const uploadGCloud = async (folder, req, res) => {
  try {
    const { hash, bucket, processFile } = initGCloud();
    await processFile(req, res);

    // Check for the parsed file before touching its buffer.
    if (!req.file) {
      return res.status(400).send({ message: 'Please upload a file!' });
    }

    const buffer = req.file.buffer;
    const filename = req.body.filename;

    // Avoid shadowing the `path` module with the object path.
    const filePath = `${hash}/${folder}/${filename}`;
    const blob = bucket.file(filePath);

    console.log(filePath);

    const blobStream = blob.createWriteStream({
      resumable: false,
    });

    blobStream.on('error', (err) => {
      console.log('Upload error');
      console.log(err.message);
      res.status(500).send({ message: err.message });
    });

    blobStream.on('finish', async () => {
      const publicUrl = format(
        `https://storage.googleapis.com/${bucket.name}/${blob.name}`,
      );

      res.status(200).send({
        message: 'Uploaded the file successfully: ' + filePath,
        url: publicUrl,
      });
    });

    blobStream.end(buffer);
  } catch (err) {
    console.log(err);

    res.status(500).send({
      message: `Could not upload the file. ${err}`,
    });
  }
};

const downloadGCloud = async (req, res) => {
  try {
    const { hash, bucket } = initGCloud();

    const privateUrl = req.query.privateUrl;
    const filePath = `${hash}/${privateUrl}`;
    const file = bucket.file(filePath);
    const fileExists = await file.exists();

    if (fileExists[0]) {
      res.setHeader('Cross-Origin-Resource-Policy', 'cross-origin');
      const stream = file.createReadStream();
      stream.pipe(res);
    } else {
      res.status(404).send({
        message: 'Could not download the file.',
      });
    }
  } catch (err) {
    res.status(404).send({
      message: 'Could not download the file. ' + err,
    });
  }
};

const uploadS3 = async (folder, req, res) => {
  try {
    const { client, bucket, region, prefix, processFile } = initS3();
    await processFile(req, res);

    if (!req.file) {
      return res.status(400).send({ message: 'Please upload a file!' });
    }

    // Reuse the shared filename sanitizer so the object key cannot be manipulated.
    const filename = sanitizeFilename(req.body.filename);

    if (!filename) {
      return res.status(400).send({ message: 'Missing or invalid filename' });
    }

    const privateUrl = `${folder}/${filename}`;
    const key = buildStoragePath(prefix, privateUrl);

    await client.send(
      new PutObjectCommand({
        Bucket: bucket,
        Key: key,
        Body: req.file.buffer,
        ContentType: req.file.mimetype,
      }),
    );

    return res.status(200).send({
      message: `Uploaded the file successfully: ${privateUrl}`,
      url: `https://${bucket}.s3.${region}.amazonaws.com/${key}`,
    });
  } catch (error) {
    console.error('S3 upload error', error);
    return res.status(500).send({
      message: `Could not upload the file. ${error.message || error}`,
    });
  }
};

const downloadS3 = async (req, res) => {
  try {
    const privateUrl = req.query.privateUrl;

    if (!privateUrl) {
      return res.status(404).send({ message: 'Missing privateUrl' });
    }

    const { client, bucket, prefix } = initS3();
    const key = buildStoragePath(prefix, privateUrl);
    const output = await client.send(
      new GetObjectCommand({
        Bucket: bucket,
        Key: key,
      }),
    );

    if (!output || !output.Body) {
      return res.status(404).send({
        message: 'Could not download the file.',
      });
    }

    if (output.ContentType) {
      res.setHeader('Content-Type', output.ContentType);
    }
    res.setHeader('Cross-Origin-Resource-Policy', 'cross-origin');

    if (typeof output.Body.pipe === 'function') {
      output.Body.pipe(res);
      return;
    }

    if (typeof output.Body.transformToByteArray === 'function') {
      const bytes = await output.Body.transformToByteArray();
      res.send(Buffer.from(bytes));
      return;
    }

    return res.send(output.Body);
  } catch (error) {
    const statusCode = error && error.name === 'NoSuchKey' ? 404 : 500;
    return res.status(statusCode).send({
      message: `Could not download the file. ${error.message || error}`,
    });
  }
};

const deleteGCloud = async (privateUrl) => {
  try {
    const { hash, bucket } = initGCloud();
    const filePath = `${hash}/${privateUrl}`;

    const file = bucket.file(filePath);
    const fileExists = await file.exists();

    if (fileExists[0]) {
      await file.delete();
    }
  } catch (err) {
    console.log(`Cannot delete the file ${privateUrl}`, err);
  }
};

const deleteS3 = async (privateUrl) => {
  try {
    if (!privateUrl) {
      return;
    }

    const { client, bucket, prefix } = initS3();
    const key = buildStoragePath(prefix, privateUrl);
    await client.send(
      new DeleteObjectCommand({
        Bucket: bucket,
        Key: key,
      }),
    );
  } catch (error) {
    console.error(`Cannot delete S3 file ${privateUrl}`, error);
    throw error;
  }
};

const uploadStreamToGCloud = async (privateUrl, sourcePath) => {
  const { hash, bucket } = initGCloud();
  const fullPath = `${hash}/${privateUrl}`;
  const blob = bucket.file(fullPath);

  await pipeline(
    fs.createReadStream(sourcePath),
    blob.createWriteStream({ resumable: false }),
  );

  return format(`https://storage.googleapis.com/${bucket.name}/${blob.name}`);
};

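/**
 * Chunked (resumable) upload endpoints. A client calls initUploadSession to
 * register filename/folder/totalChunks, sends each part through uploadChunk,
 * and calls finalizeUploadSession to assemble the parts and push the result
 * to the configured provider. getUploadSession reports which chunk indexes
 * have already been received so an interrupted upload can resume. Session
 * state lives under the S3 `_upload_sessions` prefix for the S3 provider and
 * on local disk otherwise. All endpoints require an authenticated
 * req.currentUser.
 */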
const initUploadSession = async (req, res) => {
  try {
    if (!req.currentUser || !req.currentUser.id) {
      return res.sendStatus(403);
    }

    const provider = getFileStorageProvider();

    // Cleanup expired sessions (async for S3, sync for local)
    if (provider === 's3') {
      cleanupExpiredS3UploadSessions().catch((err) =>
        console.error('S3 session cleanup failed', err),
      );
    } else {
      cleanupExpiredUploadSessions();
    }

    const folder = sanitizeFolder(req.body?.folder);
    const filename = sanitizeFilename(req.body?.filename);
    const totalChunks = Number(req.body?.totalChunks);
    const size = Number(req.body?.size);
    const contentType = String(req.body?.contentType || '').trim();

    if (!folder || !filename) {
      return res.status(400).send({ message: 'Invalid folder or filename' });
    }

    if (!Number.isInteger(totalChunks) || totalChunks <= 0) {
      return res.status(400).send({ message: 'Invalid totalChunks' });
    }

    if (!Number.isFinite(size) || size < 0) {
      return res.status(400).send({ message: 'Invalid file size' });
    }

    const sessionId = uuid();
    const now = new Date().toISOString();
    const session = {
      id: sessionId,
      userId: req.currentUser.id,
      folder,
      filename,
      totalChunks,
      size,
      contentType,
      uploadedChunks: [],
      status: 'active',
      createdAt: now,
      updatedAt: now,
    };

    if (provider === 's3') {
      const { client, bucket, prefix } = initS3();
      await writeS3SessionMeta(client, bucket, prefix, sessionId, session);
    } else {
      const chunksDir = getSessionChunksDir(sessionId);
      fs.mkdirSync(chunksDir, { recursive: true });
      writeSessionMeta(sessionId, session);
    }

    return res.status(200).send({
      sessionId,
      uploadedChunks: [],
      totalChunks,
    });
  } catch (error) {
    console.error('Failed to initialize upload session', error);
    return res
      .status(500)
      .send({ message: 'Failed to initialize upload session' });
  }
};

const getUploadSession = async (req, res) => {
  try {
    if (!req.currentUser || !req.currentUser.id) {
      return res.sendStatus(403);
    }

    const sessionId = String(req.params.sessionId || '');
    const provider = getFileStorageProvider();
    let session;

    if (provider === 's3') {
      const { client, bucket, prefix } = initS3();
      session = await readS3SessionMeta(client, bucket, prefix, sessionId);
    } else {
      session = readSessionMeta(sessionId);
    }

    if (!session) {
      return res.status(404).send({ message: 'Upload session not found' });
    }

    if (session.userId !== req.currentUser.id) {
      return res.sendStatus(403);
    }

    return res.status(200).send({
      sessionId: session.id,
      totalChunks: session.totalChunks,
      uploadedChunks: session.uploadedChunks || [],
      status: session.status,
    });
  } catch (error) {
    console.error('Failed to get upload session', error);
    return res.status(500).send({ message: 'Failed to get upload session' });
  }
};

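/**
 * Receive one chunk for an existing session. The raw request body is the
 * chunk payload; it is stored either as an S3 object under the session's
 * chunks/ prefix or as a local <index>.part file, and the chunk index is
 * recorded in the session metadata.
 */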
const uploadChunk = async (req, res) => {
  try {
    if (!req.currentUser || !req.currentUser.id) {
      return res.sendStatus(403);
    }

    const sessionId = String(req.params.sessionId || '');
    const chunkIndex = Number(req.params.chunkIndex);

    if (!Number.isInteger(chunkIndex) || chunkIndex < 0) {
      return res.status(400).send({ message: 'Invalid chunk index' });
    }

    const provider = getFileStorageProvider();
    let session;
    let s3Client, s3Bucket, s3Prefix;

    if (provider === 's3') {
      const s3 = initS3();
      s3Client = s3.client;
      s3Bucket = s3.bucket;
      s3Prefix = s3.prefix;
      session = await readS3SessionMeta(
        s3Client,
        s3Bucket,
        s3Prefix,
        sessionId,
      );
    } else {
      session = readSessionMeta(sessionId);
    }

    if (!session) {
      return res.status(404).send({ message: 'Upload session not found' });
    }

    if (session.userId !== req.currentUser.id) {
      return res.sendStatus(403);
    }

    if (chunkIndex >= Number(session.totalChunks)) {
      return res.status(400).send({ message: 'Chunk index is out of range' });
    }

    if (provider === 's3') {
      // Collect chunk data from request stream
      const chunks = [];
      for await (const chunk of req) {
        chunks.push(chunk);
      }
      const chunkBuffer = Buffer.concat(chunks);

      // Upload chunk directly to S3
      await uploadS3Chunk(
        s3Client,
        s3Bucket,
        s3Prefix,
        sessionId,
        chunkIndex,
        chunkBuffer,
      );
    } else {
      // Local storage - write to temp file then rename
      const chunkDir = getSessionChunksDir(sessionId);
      fs.mkdirSync(chunkDir, { recursive: true });

      const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
      const tempChunkPath = `${chunkPath}.tmp`;

      if (fs.existsSync(tempChunkPath)) {
        fs.unlinkSync(tempChunkPath);
      }

      await pipeline(req, fs.createWriteStream(tempChunkPath));
      fs.renameSync(tempChunkPath, chunkPath);
    }

    const uploadedChunks = Array.from(
      new Set([...(session.uploadedChunks || []), chunkIndex]),
    ).sort((a, b) => a - b);

    session.uploadedChunks = uploadedChunks;
    session.updatedAt = new Date().toISOString();

    if (provider === 's3') {
      await writeS3SessionMeta(
        s3Client,
        s3Bucket,
        s3Prefix,
        sessionId,
        session,
      );
    } else {
      writeSessionMeta(sessionId, session);
    }

    return res.status(200).send({
      sessionId,
      chunkIndex,
      uploadedChunks: uploadedChunks.length,
      totalChunks: session.totalChunks,
    });
  } catch (error) {
    console.error('Failed to upload chunk', error);
    return res.status(500).send({ message: 'Failed to upload chunk' });
  }
};

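/**
 * Assemble a completed session. Verifies every chunk exists, concatenates
 * them into a temp file under <uploadDir>/_temp_assembly, checks the result
 * against the declared size, uploads it to the configured provider (or moves
 * it into place for local storage), and removes the session data.
 */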
const finalizeUploadSession = async (req, res) => {
  try {
    if (!req.currentUser || !req.currentUser.id) {
      return res.sendStatus(403);
    }

    const sessionId = String(req.params.sessionId || '');
    const provider = getFileStorageProvider();
    let session;
    let s3Client, s3Bucket, s3Prefix, s3Region;

    if (provider === 's3') {
      const s3 = initS3();
      s3Client = s3.client;
      s3Bucket = s3.bucket;
      s3Prefix = s3.prefix;
      s3Region = s3.region;
      session = await readS3SessionMeta(
        s3Client,
        s3Bucket,
        s3Prefix,
        sessionId,
      );
    } else {
      session = readSessionMeta(sessionId);
    }

    if (!session) {
      return res.status(404).send({ message: 'Upload session not found' });
    }

    if (session.userId !== req.currentUser.id) {
      return res.sendStatus(403);
    }

    const totalChunks = Number(session.totalChunks);

    // Verify all chunks exist
    if (provider === 's3') {
      for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
        const exists = await s3ChunkExists(
          s3Client,
          s3Bucket,
          s3Prefix,
          sessionId,
          chunkIndex,
        );
        if (!exists) {
          return res.status(400).send({
            message: `Missing chunk ${chunkIndex}`,
            missingChunk: chunkIndex,
          });
        }
      }
    } else {
      for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
        const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
        if (!fs.existsSync(chunkPath)) {
          return res.status(400).send({
            message: `Missing chunk ${chunkIndex}`,
            missingChunk: chunkIndex,
          });
        }
      }
    }

    // Create temp directory for assembly
    const tempDir = path.join(config.uploadDir, '_temp_assembly');
    fs.mkdirSync(tempDir, { recursive: true });
    const assembledPath = path.join(tempDir, `${sessionId}.bin`);

    if (fs.existsSync(assembledPath)) {
      fs.unlinkSync(assembledPath);
    }

    // Download and assemble chunks
    if (provider === 's3') {
      for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
        const chunkStream = await downloadS3Chunk(
          s3Client,
          s3Bucket,
          s3Prefix,
          sessionId,
          chunkIndex,
        );
        if (!chunkStream) {
          // Cleanup and return error
          if (fs.existsSync(assembledPath)) fs.unlinkSync(assembledPath);
          return res.status(400).send({
            message: `Failed to download chunk ${chunkIndex}`,
            missingChunk: chunkIndex,
          });
        }

        // Write chunk to assembled file
        await new Promise((resolve, reject) => {
          const writeStream = fs.createWriteStream(assembledPath, {
            flags: 'a',
          });
          writeStream.on('error', reject);
          writeStream.on('finish', resolve);

          if (typeof chunkStream.pipe === 'function') {
            chunkStream.on('error', reject);
            chunkStream.pipe(writeStream, { end: true });
          } else if (typeof chunkStream.transformToByteArray === 'function') {
            chunkStream
              .transformToByteArray()
              .then((bytes) => {
                writeStream.write(Buffer.from(bytes));
                writeStream.end();
              })
              .catch(reject);
          } else {
            writeStream.write(chunkStream);
            writeStream.end();
          }
        });
      }
    } else {
      for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex += 1) {
        const chunkPath = getSessionChunkPath(sessionId, chunkIndex);
        await streamAppendFile(assembledPath, chunkPath);
      }
    }

    const assembledStats = fs.statSync(assembledPath);
    if (
      Number.isFinite(Number(session.size)) &&
      Number(session.size) !== assembledStats.size
    ) {
      // Cleanup
      if (fs.existsSync(assembledPath)) fs.unlinkSync(assembledPath);
      return res.status(400).send({
        message: 'Assembled file size mismatch',
      });
    }

    const privateUrl = `${session.folder}/${session.filename}`;
    let publicUrl = '';

    if (provider === 's3') {
      // Upload assembled file to final S3 location
      const key = buildStoragePath(s3Prefix, privateUrl);
      await s3Client.send(
        new PutObjectCommand({
          Bucket: s3Bucket,
          Key: key,
          Body: fs.createReadStream(assembledPath),
          ContentType: session.contentType || undefined,
        }),
      );
      publicUrl = `https://${s3Bucket}.s3.${s3Region}.amazonaws.com/${key}`;

      // Cleanup S3 session
      await removeS3UploadSession(s3Client, s3Bucket, s3Prefix, sessionId);
    } else if (provider === 'gcloud') {
      publicUrl = await uploadStreamToGCloud(privateUrl, assembledPath);
      removeUploadSession(sessionId);
    } else {
      const destinationPath = path.join(config.uploadDir, privateUrl);
      ensureDirectoryExistence(destinationPath);
      fs.renameSync(assembledPath, destinationPath);
      publicUrl = `/file/download?privateUrl=${encodeURIComponent(privateUrl)}`;
      removeUploadSession(sessionId);
    }

    // Cleanup temp assembled file (except for local where we renamed it)
    if (provider !== 'local' && fs.existsSync(assembledPath)) {
      fs.unlinkSync(assembledPath);
    }

    return res.status(200).send({
      message: `Uploaded the file successfully: ${privateUrl}`,
      privateUrl,
      url: publicUrl,
    });
  } catch (error) {
    console.error('Failed to finalize upload session', error);
    return res
      .status(500)
      .send({ message: 'Failed to finalize upload session' });
  }
};

const uploadFile = async (folder, req, res) => {
  const provider = getFileStorageProvider();

  if (provider === 's3') {
    return uploadS3(folder, req, res);
  }

  if (provider === 'gcloud') {
    return uploadGCloud(folder, req, res);
  }

  return uploadLocal(folder, {
    entity: null,
    folderIncludesAuthenticationUid: false,
  })(req, res);
};

const downloadFile = async (req, res) => {
  const provider = getFileStorageProvider();

  if (provider === 's3') {
    return downloadS3(req, res);
  }

  if (provider === 'gcloud') {
    return downloadGCloud(req, res);
  }

  return downloadLocal(req, res);
};

const deleteFile = async (privateUrl) => {
  const provider = getFileStorageProvider();

  if (provider === 's3') {
    return deleteS3(privateUrl);
  }

  if (provider === 'gcloud') {
    return deleteGCloud(privateUrl);
  }

  return deleteLocal(privateUrl);
};

const PRESIGN_EXPIRY_SECONDS = 3600; // 1 hour

/**
 * Generate presigned GET URLs for multiple assets.
 * For S3: returns direct S3 signed URLs.
 * For other providers: returns backend proxy URLs.
 *
 * @param {string[]} urls - Array of storage_key paths
 * @returns {Promise<Record<string, string>>} Map of original path to presigned/proxy URL
 */
const generatePresignedUrls = async (urls) => {
  const provider = getFileStorageProvider();

  if (provider !== 's3') {
    // For non-S3 providers, return backend proxy URLs
    return urls.reduce((acc, url) => {
      acc[url] = `/api/file/download?privateUrl=${encodeURIComponent(url)}`;
      return acc;
    }, {});
  }

  const { client, bucket, prefix } = initS3();

  const presignedUrls = {};

  await Promise.all(
    urls.map(async (url) => {
      const key = buildStoragePath(prefix, url);
      const command = new GetObjectCommand({ Bucket: bucket, Key: key });
      presignedUrls[url] = await getSignedUrl(client, command, {
        expiresIn: PRESIGN_EXPIRY_SECONDS,
      });
    }),
  );

  return presignedUrls;
};

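/*
 * Illustrative wiring sketch (an assumption, not defined by this module):
 * an Express router could expose these handlers roughly as follows. The
 * route paths, the require path to this module, and any auth middleware are
 * placeholders.
 *
 *   const express = require('express');
 *   const fileStorage = require('./fileStorage'); // hypothetical path to this module
 *   const router = express.Router();
 *
 *   router.post('/upload-session', fileStorage.initUploadSession);
 *   router.get('/upload-session/:sessionId', fileStorage.getUploadSession);
 *   router.put('/upload-session/:sessionId/chunk/:chunkIndex', fileStorage.uploadChunk);
 *   router.post('/upload-session/:sessionId/finalize', fileStorage.finalizeUploadSession);
 *   router.get('/download', fileStorage.downloadFile);
 */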
module.exports = {
  initUploadSession,
  getUploadSession,
  uploadChunk,
  finalizeUploadSession,
  initGCloud,
  initS3,
  getFileStorageProvider,
  uploadFile,
  downloadFile,
  deleteFile,
  uploadLocal,
  downloadLocal,
  deleteLocal,
  deleteGCloud,
  deleteS3,
  uploadGCloud,
  downloadGCloud,
  uploadS3,
  downloadS3,
  generatePresignedUrls,
};