|
@@ -244,14 +244,27 @@ export class ImportService {
|
|
|
|
|
|
|
|
// exec
|
|
// exec
|
|
|
const { result, errors } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
const { result, errors } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
- const { insertedCount, modifiedCount } = result;
|
|
|
|
|
|
|
+ const {
|
|
|
|
|
+ insertedCount, modifiedCount, upsertedCount, matchedCount,
|
|
|
|
|
+ } = result;
|
|
|
const errorCount = errors?.length ?? 0;
|
|
const errorCount = errors?.length ?? 0;
|
|
|
- logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Modified: ${modifiedCount}. Failed: ${errorCount}.`);
|
|
|
|
|
- const increment = insertedCount + modifiedCount + errorCount;
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // For upsert operations, count matched documents as modified
|
|
|
|
|
+ const actualModifiedCount = importSettings.mode === ImportMode.upsert
|
|
|
|
|
+ ? (matchedCount || 0) // In upsert mode, matchedCount indicates documents that were found and potentially updated
|
|
|
|
|
+ : modifiedCount;
|
|
|
|
|
+
|
|
|
|
|
+ const actualInsertedCount = importSettings.mode === ImportMode.upsert
|
|
|
|
|
+ ? (upsertedCount || 0) // In upsert mode, upsertedCount indicates newly created documents
|
|
|
|
|
+ : insertedCount;
|
|
|
|
|
+
|
|
|
|
|
+ logger.debug(`Importing ${collectionName}. Inserted: ${actualInsertedCount}. Modified: ${actualModifiedCount}. Failed: ${errorCount}.`
|
|
|
|
|
+ + ` (Raw: inserted=${insertedCount}, modified=${modifiedCount}, upserted=${upsertedCount}, matched=${matchedCount})`);
|
|
|
|
|
+ const increment = actualInsertedCount + actualModifiedCount + errorCount;
|
|
|
collectionProgress.currentCount += increment;
|
|
collectionProgress.currentCount += increment;
|
|
|
collectionProgress.totalCount += increment;
|
|
collectionProgress.totalCount += increment;
|
|
|
- collectionProgress.insertedCount += insertedCount;
|
|
|
|
|
- collectionProgress.modifiedCount += modifiedCount;
|
|
|
|
|
|
|
+ collectionProgress.insertedCount += actualInsertedCount;
|
|
|
|
|
+ collectionProgress.modifiedCount += actualModifiedCount;
|
|
|
this.emitProgressEvent(collectionProgress, errors);
|
|
this.emitProgressEvent(collectionProgress, errors);
|
|
|
// First aid to prevent unexplained memory leaks
|
|
// First aid to prevent unexplained memory leaks
|
|
|
try {
|
|
try {
|
|
@@ -276,6 +289,12 @@ export class ImportService {
|
|
|
|
|
|
|
|
await pipeline(readStream, jsonStream, convertStream, batchStream, writeStream);
|
|
await pipeline(readStream, jsonStream, convertStream, batchStream, writeStream);
|
|
|
|
|
|
|
|
|
|
+ // Ensure final progress event is emitted even when no data was processed
|
|
|
|
|
+ if (collectionProgress.currentCount === 0) {
|
|
|
|
|
+ logger.info(`No data processed for collection ${collectionName}. Emitting final progress event.`);
|
|
|
|
|
+ this.emitProgressEvent(collectionProgress, null);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// clean up tmp directory
|
|
// clean up tmp directory
|
|
|
fs.unlinkSync(jsonFile);
|
|
fs.unlinkSync(jsonFile);
|
|
|
}
|
|
}
|