|
|
@@ -1,13 +1,13 @@
|
|
|
import fs from 'fs';
|
|
|
import path from 'path';
|
|
|
import type { EventEmitter } from 'stream';
|
|
|
-import { Writable, Transform, pipeline } from 'stream';
|
|
|
-import { finished, pipeline as pipelinePromise } from 'stream/promises';
|
|
|
+import { Writable, Transform } from 'stream';
|
|
|
+import { pipeline } from 'stream/promises';
|
|
|
|
|
|
import JSONStream from 'JSONStream';
|
|
|
import gc from 'expose-gc/function';
|
|
|
import type {
|
|
|
- BulkWriteResult, MongoBulkWriteError, UnorderedBulkOperation, WriteError,
|
|
|
+ BulkWriteResult, MongoBulkWriteError, UnorderedBulkOperation, WriteError, BulkOperationBase,
|
|
|
} from 'mongodb';
|
|
|
import type { Document } from 'mongoose';
|
|
|
import mongoose from 'mongoose';
|
|
|
@@ -51,6 +51,8 @@ class ImportingCollectionError extends Error {
|
|
|
|
|
|
export class ImportService {
|
|
|
|
|
|
+ private modelCache: Map<string, { Model: any, schema: any }> = new Map();
|
|
|
+
|
|
|
private crowi: Crowi;
|
|
|
|
|
|
private growiBridgeService: any;
|
|
|
@@ -59,7 +61,7 @@ export class ImportService {
|
|
|
|
|
|
private currentProgressingStatus: CollectionProgressingStatus | null;
|
|
|
|
|
|
- private convertMap: ConvertMap;
|
|
|
+ private convertMap: ConvertMap | undefined;
|
|
|
|
|
|
constructor(crowi: Crowi) {
|
|
|
this.crowi = crowi;
|
|
|
@@ -139,7 +141,7 @@ export class ImportService {
|
|
|
* @param collections MongoDB collection name
|
|
|
* @param importSettingsMap
|
|
|
*/
|
|
|
- async import(collections: string[], importSettingsMap: { [collectionName: string]: ImportSettings }): Promise<void> {
|
|
|
+ async import(collections: string[], importSettingsMap: Map<string, ImportSettings>): Promise<void> {
|
|
|
await this.preImport();
|
|
|
|
|
|
// init status object
|
|
|
@@ -147,7 +149,10 @@ export class ImportService {
|
|
|
|
|
|
// process serially so as not to waste memory
|
|
|
const promises = collections.map((collectionName) => {
|
|
|
- const importSettings = importSettingsMap[collectionName];
|
|
|
+ const importSettings = importSettingsMap.get(collectionName);
|
|
|
+ if (importSettings == null) {
|
|
|
+ throw new Error(`ImportSettings for ${collectionName} is not found`);
|
|
|
+ }
|
|
|
return this.importCollection(collectionName, importSettings);
|
|
|
});
|
|
|
for await (const promise of promises) {
|
|
|
@@ -172,6 +177,10 @@ export class ImportService {
|
|
|
const shouldNormalizePages = currentIsV5Compatible && isImportPagesCollection;
|
|
|
|
|
|
if (shouldNormalizePages) await this.crowi.pageService.normalizeAllPublicPages();
|
|
|
+
|
|
|
+ // Release caches after import process
|
|
|
+ this.modelCache.clear();
|
|
|
+ this.convertMap = undefined;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -183,13 +192,7 @@ export class ImportService {
|
|
|
if (this.currentProgressingStatus == null) {
|
|
|
throw new Error('Something went wrong: currentProgressingStatus is not initialized');
|
|
|
}
|
|
|
-
|
|
|
- // prepare functions invoked from custom streams
|
|
|
- const convertDocuments = this.convertDocuments.bind(this);
|
|
|
- const bulkOperate = this.bulkOperate.bind(this);
|
|
|
- const execUnorderedBulkOpSafely = this.execUnorderedBulkOpSafely.bind(this);
|
|
|
- const emitProgressEvent = this.emitProgressEvent.bind(this);
|
|
|
-
|
|
|
+  // custom streams below access the service's methods directly (no pre-bound function copies needed)
|
|
|
const collection = mongoose.connection.collection(collectionName);
|
|
|
|
|
|
const { mode, jsonFileName, overwriteParams } = importSettings;
|
|
|
@@ -215,52 +218,71 @@ export class ImportService {
|
|
|
// stream 3
|
|
|
const convertStream = new Transform({
|
|
|
objectMode: true,
|
|
|
- transform(doc, encoding, callback) {
|
|
|
- const converted = convertDocuments(collectionName, doc, overwriteParams);
|
|
|
- this.push(converted);
|
|
|
- callback();
|
|
|
+      transform: (doc, encoding, callback) => {
|
|
|
+        try {
|
|
|
+          // Arrow function: `this` is the ImportService instance (lexical scope), same as writeStream below
|
|
|
+          const converted = this.convertDocuments(collectionName, doc, overwriteParams);
|
|
|
+          // pass the converted chunk through and signal completion in a single call
|
|
|
+          callback(null, converted);
|
|
|
+        }
|
|
|
+        catch (error) {
|
|
|
+          callback(error instanceof Error ? error : new Error(String(error)));
|
|
|
+        }
|
|
|
},
|
|
|
});
|
|
|
+    // The arrow-function transform above captures the service via lexical `this`,
|
|
|
+    // so no reference needs to be stashed on the caller-owned importSettings object.
|
|
|
|
|
|
// stream 4
|
|
|
const batchStream = createBatchStream(BULK_IMPORT_SIZE);
|
|
|
-
|
|
|
- // stream 5
|
|
|
const writeStream = new Writable({
|
|
|
objectMode: true,
|
|
|
- async write(batch, encoding, callback) {
|
|
|
- const unorderedBulkOp = collection.initializeUnorderedBulkOp();
|
|
|
-
|
|
|
- // documents are not persisted until unorderedBulkOp.execute()
|
|
|
- batch.forEach((document) => {
|
|
|
- bulkOperate(unorderedBulkOp, collectionName, document, importSettings);
|
|
|
- });
|
|
|
-
|
|
|
- // exec
|
|
|
- const { result, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
- const { insertedCount, modifiedCount } = result;
|
|
|
- const errorCount = errors?.length ?? 0;
|
|
|
-
|
|
|
- logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Modified: ${modifiedCount}. Failed: ${errorCount}.`);
|
|
|
-
|
|
|
- const increment = insertedCount + modifiedCount + errorCount;
|
|
|
- collectionProgress.currentCount += increment;
|
|
|
- collectionProgress.totalCount += increment;
|
|
|
- collectionProgress.insertedCount += insertedCount;
|
|
|
- collectionProgress.modifiedCount += modifiedCount;
|
|
|
-
|
|
|
- emitProgressEvent(collectionProgress, errors);
|
|
|
-
|
|
|
+ write: async(batch, encoding, callback) => {
|
|
|
try {
|
|
|
+ const unorderedBulkOp = collection.initializeUnorderedBulkOp();
|
|
|
+ // documents are not persisted until unorderedBulkOp.execute()
|
|
|
+ batch.forEach((document) => {
|
|
|
+ this.bulkOperate(unorderedBulkOp, collectionName, document, importSettings);
|
|
|
+ });
|
|
|
+
|
|
|
+ // exec
|
|
|
+ const { result, errors } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
+ const {
|
|
|
+ insertedCount, modifiedCount, upsertedCount, matchedCount,
|
|
|
+ } = result;
|
|
|
+ const errorCount = errors?.length ?? 0;
|
|
|
+
|
|
|
+ // For upsert operations, count matched documents as modified
|
|
|
+ const actualModifiedCount = importSettings.mode === ImportMode.upsert
|
|
|
+            ? (matchedCount || 0) // NOTE: matchedCount includes docs that matched but may be unchanged; counted as "modified" for progress reporting only
|
|
|
+ : modifiedCount;
|
|
|
+
|
|
|
+ const actualInsertedCount = importSettings.mode === ImportMode.upsert
|
|
|
+ ? (upsertedCount || 0) // In upsert mode, upsertedCount indicates newly created documents
|
|
|
+ : insertedCount;
|
|
|
+
|
|
|
+ logger.debug(`Importing ${collectionName}. Inserted: ${actualInsertedCount}. Modified: ${actualModifiedCount}. Failed: ${errorCount}.`
|
|
|
+ + ` (Raw: inserted=${insertedCount}, modified=${modifiedCount}, upserted=${upsertedCount}, matched=${matchedCount})`);
|
|
|
+ const increment = actualInsertedCount + actualModifiedCount + errorCount;
|
|
|
+ collectionProgress.currentCount += increment;
|
|
|
+ collectionProgress.totalCount += increment;
|
|
|
+ collectionProgress.insertedCount += actualInsertedCount;
|
|
|
+ collectionProgress.modifiedCount += actualModifiedCount;
|
|
|
+ this.emitProgressEvent(collectionProgress, errors);
|
|
|
// First aid to prevent unexplained memory leaks
|
|
|
- logger.info('global.gc() invoked.');
|
|
|
- gc();
|
|
|
+ try {
|
|
|
+ logger.info('global.gc() invoked.');
|
|
|
+ gc();
|
|
|
+ }
|
|
|
+ catch (err) {
|
|
|
+ logger.error('fail garbage collection: ', err);
|
|
|
+ }
|
|
|
+ callback();
|
|
|
}
|
|
|
catch (err) {
|
|
|
- logger.error('fail garbage collection: ', err);
|
|
|
+ logger.error('Error in writeStream:', err);
|
|
|
+ callback(err);
|
|
|
}
|
|
|
-
|
|
|
- callback();
|
|
|
},
|
|
|
final(callback) {
|
|
|
logger.info(`Importing ${collectionName} has completed.`);
|
|
|
@@ -268,7 +290,13 @@ export class ImportService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- await pipelinePromise(readStream, jsonStream, convertStream, batchStream, writeStream);
|
|
|
+ await pipeline(readStream, jsonStream, convertStream, batchStream, writeStream);
|
|
|
+
|
|
|
+ // Ensure final progress event is emitted even when no data was processed
|
|
|
+ if (collectionProgress.currentCount === 0) {
|
|
|
+ logger.info(`No data processed for collection ${collectionName}. Emitting final progress event.`);
|
|
|
+ this.emitProgressEvent(collectionProgress, null);
|
|
|
+ }
|
|
|
|
|
|
// clean up tmp directory
|
|
|
fs.unlinkSync(jsonFile);
|
|
|
@@ -276,15 +304,9 @@ export class ImportService {
|
|
|
catch (err) {
|
|
|
throw new ImportingCollectionError(collectionProgress, err);
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- *
|
|
|
- * @param {string} collectionName
|
|
|
- * @param {importSettings} importSettings
|
|
|
- */
|
|
|
- validateImportSettings(collectionName, importSettings) {
|
|
|
+ validateImportSettings(collectionName: string, importSettings: ImportSettings): void {
|
|
|
const { mode } = importSettings;
|
|
|
|
|
|
switch (collectionName) {
|
|
|
@@ -298,15 +320,18 @@ export class ImportService {
|
|
|
|
|
|
/**
|
|
|
* process bulk operation
|
|
|
- * @param bulk MongoDB Bulk instance
|
|
|
- * @param collectionName collection name
|
|
|
*/
|
|
|
- bulkOperate(bulk, collectionName: string, document, importSettings: ImportSettings) {
|
|
|
+ bulkOperate(
|
|
|
+ bulk: UnorderedBulkOperation,
|
|
|
+ collectionName: string,
|
|
|
+ document: Record<string, unknown>,
|
|
|
+ importSettings: ImportSettings,
|
|
|
+ ): BulkOperationBase | void {
|
|
|
// insert
|
|
|
if (importSettings.mode !== ImportMode.upsert) {
|
|
|
+ // Optimization such as splitting and adding large documents can be considered
|
|
|
return bulk.insert(document);
|
|
|
}
|
|
|
-
|
|
|
// upsert
|
|
|
switch (collectionName) {
|
|
|
case 'pages':
|
|
|
@@ -321,7 +346,7 @@ export class ImportService {
|
|
|
* @param {CollectionProgress} collectionProgress
|
|
|
* @param {object} appendedErrors key: collection name, value: array of error object
|
|
|
*/
|
|
|
- emitProgressEvent(collectionProgress, appendedErrors) {
|
|
|
+ emitProgressEvent(collectionProgress: CollectionProgress, appendedErrors: any): void {
|
|
|
const { collectionName } = collectionProgress;
|
|
|
|
|
|
// send event (in progress in global)
|
|
|
@@ -331,7 +356,7 @@ export class ImportService {
|
|
|
/**
|
|
|
* emit terminate event
|
|
|
*/
|
|
|
- emitTerminateEvent() {
|
|
|
+ emitTerminateEvent(): void {
|
|
|
this.adminEvent.emit('onTerminateForImport');
|
|
|
}
|
|
|
|
|
|
@@ -342,13 +367,12 @@ export class ImportService {
|
|
|
* @param {string} zipFile absolute path to zip file
|
|
|
* @return {Array.<string>} array of absolute paths to extracted files
|
|
|
*/
|
|
|
- async unzip(zipFile) {
|
|
|
+ async unzip(zipFile: string): Promise<string[]> {
|
|
|
const readStream = fs.createReadStream(zipFile);
|
|
|
const parseStream = unzipStream.Parse();
|
|
|
- const unzipEntryStream = pipeline(readStream, parseStream, () => {});
|
|
|
- const files: string[] = [];
|
|
|
+ const entryPromises: Promise<string | null>[] = [];
|
|
|
|
|
|
- unzipEntryStream.on('entry', (/** @type {Entry} */ entry) => {
|
|
|
+ parseStream.on('entry', (/** @type {Entry} */ entry) => {
|
|
|
const fileName = entry.path;
|
|
|
// https://regex101.com/r/mD4eZs/6
|
|
|
// prevent from unexpecting attack doing unzip file (path traversal attack)
|
|
|
@@ -356,6 +380,7 @@ export class ImportService {
|
|
|
// ../../src/server/example.html
|
|
|
if (fileName.match(/(\.\.\/|\.\.\\)/)) {
|
|
|
logger.error('File path is not appropriate.', fileName);
|
|
|
+ entry.autodrain();
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -364,16 +389,28 @@ export class ImportService {
|
|
|
entry.autodrain();
|
|
|
}
|
|
|
else {
|
|
|
- const jsonFile = path.join(this.baseDir, fileName);
|
|
|
- const writeStream = fs.createWriteStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
|
|
|
- pipeline(entry, writeStream, () => {});
|
|
|
- files.push(jsonFile);
|
|
|
+ const entryPromise = new Promise<string | null>((resolve) => {
|
|
|
+ const jsonFile = path.join(this.baseDir, fileName);
|
|
|
+ const writeStream = fs.createWriteStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
|
|
|
+
|
|
|
+ pipeline(entry, writeStream)
|
|
|
+ .then(() => resolve(jsonFile))
|
|
|
+ .catch((err) => {
|
|
|
+ logger.error('Failed to extract entry:', err);
|
|
|
+ resolve(null); // Continue processing other entries
|
|
|
+ });
|
|
|
+ });
|
|
|
+
|
|
|
+ entryPromises.push(entryPromise);
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- await finished(unzipEntryStream);
|
|
|
+ await pipeline(readStream, parseStream);
|
|
|
+ const results = await Promise.allSettled(entryPromises);
|
|
|
|
|
|
- return files;
|
|
|
+ return results
|
|
|
+ .filter((result): result is PromiseFulfilledResult<string> => result.status === 'fulfilled' && result.value !== null)
|
|
|
+ .map(result => result.value);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -414,32 +451,32 @@ export class ImportService {
|
|
|
* @returns document to be persisted
|
|
|
*/
|
|
|
convertDocuments<D extends Document>(collectionName: string, document: D, overwriteParams: OverwriteParams): D {
|
|
|
- const Model = getModelFromCollectionName(collectionName);
|
|
|
- const schema = (Model != null) ? Model.schema : undefined;
|
|
|
- const convertMap = this.convertMap[collectionName];
|
|
|
+    // Model/schema cache (the lazy-init guard below is redundant — modelCache is initialized at declaration — but harmless)
|
|
|
+ if (!this.modelCache) {
|
|
|
+ this.modelCache = new Map();
|
|
|
+ }
|
|
|
|
|
|
- const _document: D = structuredClone(document);
|
|
|
+ let modelInfo = this.modelCache.get(collectionName);
|
|
|
+ if (!modelInfo) {
|
|
|
+ const Model = getModelFromCollectionName(collectionName);
|
|
|
+ const schema = (Model != null) ? Model.schema : undefined;
|
|
|
+ modelInfo = { Model, schema };
|
|
|
+ this.modelCache.set(collectionName, modelInfo);
|
|
|
+ }
|
|
|
|
|
|
- // apply keepOriginal to all of properties
|
|
|
- Object.entries(document).forEach(([propertyName, value]) => {
|
|
|
- _document[propertyName] = keepOriginal(value, { document, propertyName });
|
|
|
- });
|
|
|
+ const { schema } = modelInfo;
|
|
|
+ const convertMap = this.convertMap?.[collectionName];
|
|
|
|
|
|
- // Mongoose Model
|
|
|
- if (convertMap != null) {
|
|
|
- // assign value from documents being imported
|
|
|
- Object.entries(convertMap).forEach(([propertyName, convertedValue]) => {
|
|
|
- const value = document[propertyName];
|
|
|
+    // Shallow copy for performance — NOTE: nested objects remain shared with `document`; safe only while convert/overwrite funcs replace rather than mutate values
|
|
|
+ const _document: D = (typeof document === 'object' && document !== null && !Array.isArray(document)) ? { ...document } : structuredClone(document);
|
|
|
|
|
|
- // distinguish between null and undefined
|
|
|
- if (value === undefined) {
|
|
|
- return; // next entry
|
|
|
- }
|
|
|
+ Object.entries(document).forEach(([propertyName, value]) => {
|
|
|
+      // NOTE(review): unlike the removed code, non-function convertMap values and convertMap keys absent from `document` are now ignored — confirm this is intended
|
|
|
+ const convertedValue = convertMap?.[propertyName];
|
|
|
+ const convertFunc = (convertedValue != null && typeof convertedValue === 'function') ? convertedValue : keepOriginal;
|
|
|
|
|
|
- const convertFunc = (typeof convertedValue === 'function') ? convertedValue : null;
|
|
|
- _document[propertyName] = (convertFunc != null) ? convertFunc(value, { document, propertyName, schema }) : convertedValue;
|
|
|
- });
|
|
|
- }
|
|
|
+ _document[propertyName] = convertFunc(value, { document, propertyName, schema });
|
|
|
+ });
|
|
|
|
|
|
// overwrite documents with custom values
|
|
|
Object.entries(overwriteParams).forEach(([propertyName, overwriteValue]) => {
|
|
|
@@ -451,7 +488,6 @@ export class ImportService {
|
|
|
_document[propertyName] = (overwriteFunc != null) ? overwriteFunc(value, { document: _document, propertyName, schema }) : overwriteValue;
|
|
|
}
|
|
|
});
|
|
|
-
|
|
|
return _document;
|
|
|
}
|
|
|
|
|
|
@@ -463,7 +499,7 @@ export class ImportService {
|
|
|
* @memberOf ImportService
|
|
|
* @param {object} meta meta data from meta.json
|
|
|
*/
|
|
|
- validate(meta) {
|
|
|
+ validate(meta: any): void {
|
|
|
if (meta.version !== getGrowiVersion()) {
|
|
|
throw new Error('The version of this GROWI and the uploaded GROWI data are not the same');
|
|
|
}
|
|
|
@@ -476,7 +512,7 @@ export class ImportService {
|
|
|
/**
|
|
|
* Delete all uploaded files
|
|
|
*/
|
|
|
- deleteAllZipFiles() {
|
|
|
+ deleteAllZipFiles(): void {
|
|
|
fs.readdirSync(this.baseDir)
|
|
|
.filter(file => path.extname(file) === '.zip')
|
|
|
.forEach(file => fs.unlinkSync(path.join(this.baseDir, file)));
|