Compare commits

..

No commits in common. "ai-dev" and "master" have entirely different histories.

2 changed files with 36 additions and 41 deletions

View File

@ -139,6 +139,8 @@ router.post('/bulk-import', wrapAsync(async (req, res) => {
const referer = req.headers.referer || `${req.protocol}://${req.hostname}${req.originalUrl}`; const referer = req.headers.referer || `${req.protocol}://${req.hostname}${req.originalUrl}`;
const link = new URL(referer); const link = new URL(referer);
await CompaniesService.bulkImport(req, res, true, link.host); await CompaniesService.bulkImport(req, res, true, link.host);
const payload = true;
res.status(200).send(payload);
})); }));
/** /**

View File

@ -7,6 +7,10 @@ const axios = require('axios');
const config = require('../config'); const config = require('../config');
const stream = require('stream'); const stream = require('stream');
module.exports = class CompaniesService { module.exports = class CompaniesService {
static async create(data, currentUser) { static async create(data, currentUser) {
const transaction = await db.sequelize.transaction(); const transaction = await db.sequelize.transaction();
@ -27,52 +31,37 @@ module.exports = class CompaniesService {
}; };
static async bulkImport(req, res, sendInvitationEmails = true, host) { static async bulkImport(req, res, sendInvitationEmails = true, host) {
await processFile(req, res);
if (!req.file) {
throw new Error('No file uploaded');
}
// Return job ID immediately (asynchronous processing)
const jobId = Date.now().toString();
res.status(202).send({ jobId });
// Background processing
this.processCsvInBackground(req, req.file.buffer, req.currentUser);
}
static async processCsvInBackground(req, fileBuffer, currentUser) {
const bufferStream = new stream.PassThrough();
bufferStream.end(fileBuffer);
let batch = [];
const BATCH_SIZE = 500;
for await (const data of bufferStream.pipe(csv())) {
batch.push(data);
if (batch.length >= BATCH_SIZE) {
await this.processBatch(batch, currentUser);
batch = [];
}
}
if (batch.length > 0) {
await this.processBatch(batch, currentUser);
}
console.log('Import finished');
}
static async processBatch(batch, currentUser) {
const transaction = await db.sequelize.transaction(); const transaction = await db.sequelize.transaction();
try { try {
await CompaniesDBApi.bulkImport(batch, { await processFile(req, res);
transaction, const bufferStream = new stream.PassThrough();
ignoreDuplicates: true, const results = [];
validate: true,
currentUser: currentUser await bufferStream.end(Buffer.from(req.file.buffer, "utf-8")); // convert Buffer to Stream
await new Promise((resolve, reject) => {
bufferStream
.pipe(csv())
.on('data', (data) => results.push(data))
.on('end', async () => {
console.log('CSV results', results);
resolve();
})
.on('error', (error) => reject(error));
})
await CompaniesDBApi.bulkImport(results, {
transaction,
ignoreDuplicates: true,
validate: true,
currentUser: req.currentUser
}); });
await transaction.commit(); await transaction.commit();
} catch (error) { } catch (error) {
await transaction.rollback(); await transaction.rollback();
console.error('Error processing batch:', error); throw error;
} }
} }
@ -142,4 +131,8 @@ module.exports = class CompaniesService {
throw error; throw error;
} }
} }
}; };