diff --git a/backend/src/routes/companies.js b/backend/src/routes/companies.js index 90fb487..b62d64c 100644 --- a/backend/src/routes/companies.js +++ b/backend/src/routes/companies.js @@ -139,8 +139,6 @@ router.post('/bulk-import', wrapAsync(async (req, res) => { const referer = req.headers.referer || `${req.protocol}://${req.hostname}${req.originalUrl}`; const link = new URL(referer); await CompaniesService.bulkImport(req, res, true, link.host); - const payload = true; - res.status(200).send(payload); })); /** diff --git a/backend/src/services/companies.js b/backend/src/services/companies.js index 67af406..e9568d2 100644 --- a/backend/src/services/companies.js +++ b/backend/src/services/companies.js @@ -7,10 +7,6 @@ const axios = require('axios'); const config = require('../config'); const stream = require('stream'); - - - - module.exports = class CompaniesService { static async create(data, currentUser) { const transaction = await db.sequelize.transaction(); @@ -31,37 +27,52 @@ module.exports = class CompaniesService { }; static async bulkImport(req, res, sendInvitationEmails = true, host) { + await processFile(req, res); + if (!req.file) { + throw new Error('No file uploaded'); + } + + // Return job ID immediately (asynchronous processing) + const jobId = Date.now().toString(); + res.status(202).send({ jobId }); + + // Background processing + this.processCsvInBackground(req, req.file.buffer, req.currentUser); + } + + static async processCsvInBackground(req, fileBuffer, currentUser) { + const bufferStream = new stream.PassThrough(); + bufferStream.end(fileBuffer); + + let batch = []; + const BATCH_SIZE = 500; + + for await (const data of bufferStream.pipe(csv())) { + batch.push(data); + if (batch.length >= BATCH_SIZE) { + await this.processBatch(batch, currentUser); + batch = []; + } + } + if (batch.length > 0) { + await this.processBatch(batch, currentUser); + } + console.log('Import finished'); + } + + static async processBatch(batch, currentUser) { const transaction = await db.sequelize.transaction(); - try { - await processFile(req, res); - const bufferStream = new stream.PassThrough(); - const results = []; - - await bufferStream.end(Buffer.from(req.file.buffer, "utf-8")); // convert Buffer to Stream - - await new Promise((resolve, reject) => { - bufferStream - .pipe(csv()) - .on('data', (data) => results.push(data)) - .on('end', async () => { - console.log('CSV results', results); - resolve(); - }) - .on('error', (error) => reject(error)); - }) - - await CompaniesDBApi.bulkImport(results, { - transaction, - ignoreDuplicates: true, - validate: true, - currentUser: req.currentUser + await CompaniesDBApi.bulkImport(batch, { + transaction, + ignoreDuplicates: true, + validate: true, + currentUser: currentUser }); - await transaction.commit(); } catch (error) { await transaction.rollback(); - throw error; + console.error('Error processing batch:', error); } } @@ -131,8 +142,4 @@ module.exports = class CompaniesService { throw error; } } - - -}; - - +}; \ No newline at end of file