Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ed1476b36a |
@ -139,8 +139,6 @@ router.post('/bulk-import', wrapAsync(async (req, res) => {
|
||||
const referer = req.headers.referer || `${req.protocol}://${req.hostname}${req.originalUrl}`;
|
||||
const link = new URL(referer);
|
||||
await CompaniesService.bulkImport(req, res, true, link.host);
|
||||
const payload = true;
|
||||
res.status(200).send(payload);
|
||||
}));
|
||||
|
||||
/**
|
||||
|
||||
@ -7,10 +7,6 @@ const axios = require('axios');
|
||||
const config = require('../config');
|
||||
const stream = require('stream');
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
module.exports = class CompaniesService {
|
||||
static async create(data, currentUser) {
|
||||
const transaction = await db.sequelize.transaction();
|
||||
@ -31,37 +27,52 @@ module.exports = class CompaniesService {
|
||||
};
|
||||
|
||||
static async bulkImport(req, res, sendInvitationEmails = true, host) {
|
||||
await processFile(req, res);
|
||||
if (!req.file) {
|
||||
throw new Error('No file uploaded');
|
||||
}
|
||||
|
||||
// Return job ID immediately (asynchronous processing)
|
||||
const jobId = Date.now().toString();
|
||||
res.status(202).send({ jobId });
|
||||
|
||||
// Background processing
|
||||
this.processCsvInBackground(req, req.file.buffer, req.currentUser);
|
||||
}
|
||||
|
||||
static async processCsvInBackground(req, fileBuffer, currentUser) {
|
||||
const bufferStream = new stream.PassThrough();
|
||||
bufferStream.end(fileBuffer);
|
||||
|
||||
let batch = [];
|
||||
const BATCH_SIZE = 500;
|
||||
|
||||
for await (const data of bufferStream.pipe(csv())) {
|
||||
batch.push(data);
|
||||
if (batch.length >= BATCH_SIZE) {
|
||||
await this.processBatch(batch, currentUser);
|
||||
batch = [];
|
||||
}
|
||||
}
|
||||
if (batch.length > 0) {
|
||||
await this.processBatch(batch, currentUser);
|
||||
}
|
||||
console.log('Import finished');
|
||||
}
|
||||
|
||||
static async processBatch(batch, currentUser) {
|
||||
const transaction = await db.sequelize.transaction();
|
||||
|
||||
try {
|
||||
await processFile(req, res);
|
||||
const bufferStream = new stream.PassThrough();
|
||||
const results = [];
|
||||
|
||||
await bufferStream.end(Buffer.from(req.file.buffer, "utf-8")); // convert Buffer to Stream
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
bufferStream
|
||||
.pipe(csv())
|
||||
.on('data', (data) => results.push(data))
|
||||
.on('end', async () => {
|
||||
console.log('CSV results', results);
|
||||
resolve();
|
||||
})
|
||||
.on('error', (error) => reject(error));
|
||||
})
|
||||
|
||||
await CompaniesDBApi.bulkImport(results, {
|
||||
transaction,
|
||||
ignoreDuplicates: true,
|
||||
validate: true,
|
||||
currentUser: req.currentUser
|
||||
await CompaniesDBApi.bulkImport(batch, {
|
||||
transaction,
|
||||
ignoreDuplicates: true,
|
||||
validate: true,
|
||||
currentUser: currentUser
|
||||
});
|
||||
|
||||
await transaction.commit();
|
||||
} catch (error) {
|
||||
await transaction.rollback();
|
||||
throw error;
|
||||
console.error('Error processing batch:', error);
|
||||
}
|
||||
}
|
||||
|
||||
@ -131,8 +142,4 @@ module.exports = class CompaniesService {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
};
|
||||
|
||||
|
||||
};
|
||||
Loading…
x
Reference in New Issue
Block a user