Compare commits

...

1 Commits

Author SHA1 Message Date
Flatlogic Bot
ed1476b36a V.1 2026-03-07 13:49:30 +00:00
2 changed files with 41 additions and 36 deletions

View File

@ -139,8 +139,6 @@ router.post('/bulk-import', wrapAsync(async (req, res) => {
const referer = req.headers.referer || `${req.protocol}://${req.hostname}${req.originalUrl}`;
const link = new URL(referer);
await CompaniesService.bulkImport(req, res, true, link.host);
const payload = true;
res.status(200).send(payload);
}));
/**

View File

@ -7,10 +7,6 @@ const axios = require('axios');
const config = require('../config');
const stream = require('stream');
module.exports = class CompaniesService {
static async create(data, currentUser) {
const transaction = await db.sequelize.transaction();
@ -31,37 +27,52 @@ module.exports = class CompaniesService {
};
static async bulkImport(req, res, sendInvitationEmails = true, host) {
await processFile(req, res);
if (!req.file) {
throw new Error('No file uploaded');
}
// Return job ID immediately (asynchronous processing)
const jobId = Date.now().toString();
res.status(202).send({ jobId });
// Background processing
this.processCsvInBackground(req, req.file.buffer, req.currentUser);
}
static async processCsvInBackground(req, fileBuffer, currentUser) {
const bufferStream = new stream.PassThrough();
bufferStream.end(fileBuffer);
let batch = [];
const BATCH_SIZE = 500;
for await (const data of bufferStream.pipe(csv())) {
batch.push(data);
if (batch.length >= BATCH_SIZE) {
await this.processBatch(batch, currentUser);
batch = [];
}
}
if (batch.length > 0) {
await this.processBatch(batch, currentUser);
}
console.log('Import finished');
}
static async processBatch(batch, currentUser) {
const transaction = await db.sequelize.transaction();
try {
await processFile(req, res);
const bufferStream = new stream.PassThrough();
const results = [];
await bufferStream.end(Buffer.from(req.file.buffer, "utf-8")); // convert Buffer to Stream
await new Promise((resolve, reject) => {
bufferStream
.pipe(csv())
.on('data', (data) => results.push(data))
.on('end', async () => {
console.log('CSV results', results);
resolve();
})
.on('error', (error) => reject(error));
})
await CompaniesDBApi.bulkImport(results, {
transaction,
ignoreDuplicates: true,
validate: true,
currentUser: req.currentUser
await CompaniesDBApi.bulkImport(batch, {
transaction,
ignoreDuplicates: true,
validate: true,
currentUser: currentUser
});
await transaction.commit();
} catch (error) {
await transaction.rollback();
throw error;
console.error('Error processing batch:', error);
}
}
@ -131,8 +142,4 @@ module.exports = class CompaniesService {
throw error;
}
}
};
};