Compare commits

...

1 Commits

Author SHA1 Message Date
Flatlogic Bot
ed1476b36a V.1 2026-03-07 13:49:30 +00:00
2 changed files with 41 additions and 36 deletions

View File

@ -139,8 +139,6 @@ router.post('/bulk-import', wrapAsync(async (req, res) => {
const referer = req.headers.referer || `${req.protocol}://${req.hostname}${req.originalUrl}`; const referer = req.headers.referer || `${req.protocol}://${req.hostname}${req.originalUrl}`;
const link = new URL(referer); const link = new URL(referer);
await CompaniesService.bulkImport(req, res, true, link.host); await CompaniesService.bulkImport(req, res, true, link.host);
const payload = true;
res.status(200).send(payload);
})); }));
/** /**

View File

@ -7,10 +7,6 @@ const axios = require('axios');
const config = require('../config'); const config = require('../config');
const stream = require('stream'); const stream = require('stream');
module.exports = class CompaniesService { module.exports = class CompaniesService {
static async create(data, currentUser) { static async create(data, currentUser) {
const transaction = await db.sequelize.transaction(); const transaction = await db.sequelize.transaction();
@ -31,37 +27,52 @@ module.exports = class CompaniesService {
}; };
static async bulkImport(req, res, sendInvitationEmails = true, host) { static async bulkImport(req, res, sendInvitationEmails = true, host) {
const transaction = await db.sequelize.transaction();
try {
await processFile(req, res); await processFile(req, res);
if (!req.file) {
throw new Error('No file uploaded');
}
// Return job ID immediately (asynchronous processing)
const jobId = Date.now().toString();
res.status(202).send({ jobId });
// Background processing
this.processCsvInBackground(req, req.file.buffer, req.currentUser);
}
static async processCsvInBackground(req, fileBuffer, currentUser) {
const bufferStream = new stream.PassThrough(); const bufferStream = new stream.PassThrough();
const results = []; bufferStream.end(fileBuffer);
await bufferStream.end(Buffer.from(req.file.buffer, "utf-8")); // convert Buffer to Stream let batch = [];
const BATCH_SIZE = 500;
await new Promise((resolve, reject) => { for await (const data of bufferStream.pipe(csv())) {
bufferStream batch.push(data);
.pipe(csv()) if (batch.length >= BATCH_SIZE) {
.on('data', (data) => results.push(data)) await this.processBatch(batch, currentUser);
.on('end', async () => { batch = [];
console.log('CSV results', results); }
resolve(); }
}) if (batch.length > 0) {
.on('error', (error) => reject(error)); await this.processBatch(batch, currentUser);
}) }
console.log('Import finished');
}
await CompaniesDBApi.bulkImport(results, { static async processBatch(batch, currentUser) {
const transaction = await db.sequelize.transaction();
try {
await CompaniesDBApi.bulkImport(batch, {
transaction, transaction,
ignoreDuplicates: true, ignoreDuplicates: true,
validate: true, validate: true,
currentUser: req.currentUser currentUser: currentUser
}); });
await transaction.commit(); await transaction.commit();
} catch (error) { } catch (error) {
await transaction.rollback(); await transaction.rollback();
throw error; console.error('Error processing batch:', error);
} }
} }
@ -131,8 +142,4 @@ module.exports = class CompaniesService {
throw error; throw error;
} }
} }
}; };