|
|
@@ -6,7 +6,7 @@ import { Writable, Transform } from 'stream';
|
|
|
import JSONStream from 'JSONStream';
|
|
|
import gc from 'expose-gc/function';
|
|
|
import type {
|
|
|
- BulkWriteOperationError, BulkWriteResult, ObjectId, UnorderedBulkOperation,
|
|
|
+ BulkWriteResult, MongoBulkWriteError, UnorderedBulkOperation, WriteError,
|
|
|
} from 'mongodb';
|
|
|
import mongoose from 'mongoose';
|
|
|
import streamToPromise from 'stream-to-promise';
|
|
|
@@ -24,6 +24,7 @@ import { configManager } from '../config-manager';
|
|
|
import type { ConvertMap } from './construct-convert-map';
|
|
|
import { constructConvertMap } from './construct-convert-map';
|
|
|
import { getModelFromCollectionName } from './get-model-from-collection-name';
|
|
|
+import type { ImportSettings } from './import-settings';
|
|
|
import { keepOriginal } from './overwrite-function';
|
|
|
|
|
|
|
|
|
@@ -60,18 +61,20 @@ export class ImportService {
|
|
|
constructor(crowi: Crowi) {
|
|
|
this.crowi = crowi;
|
|
|
this.growiBridgeService = crowi.growiBridgeService;
|
|
|
- // this.getFile = this.growiBridgeService.getFile.bind(this);
|
|
|
- // this.baseDir = path.join(crowi.tmpDir, 'imports');
|
|
|
|
|
|
this.adminEvent = crowi.event('admin');
|
|
|
|
|
|
this.currentProgressingStatus = null;
|
|
|
}
|
|
|
|
|
|
- private get baseDir(): string {
|
|
|
+ get baseDir(): string {
|
|
|
return path.join(this.crowi.tmpDir, 'imports');
|
|
|
}
|
|
|
|
|
|
+ getFile(fileName: string): string {
|
|
|
+ return this.growiBridgeService.getFile(fileName, this.baseDir);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* parse all zip files in downloads dir
|
|
|
*
|
|
|
@@ -84,7 +87,7 @@ export class ImportService {
|
|
|
// process serially so as not to waste memory
|
|
|
const zipFileStats: any[] = [];
|
|
|
const parseZipFilePromises: Promise<any>[] = zipFiles.map((file) => {
|
|
|
- const zipFile = this.growiBridgeService.getFile(file);
|
|
|
+ const zipFile = this.getFile(file);
|
|
|
return this.growiBridgeService.parseZipFile(zipFile);
|
|
|
});
|
|
|
for await (const stat of parseZipFilePromises) {
|
|
|
@@ -130,12 +133,10 @@ export class ImportService {
|
|
|
|
|
|
/**
|
|
|
* import collections from json
|
|
|
- *
|
|
|
- * @param {string[]} collections MongoDB collection name
|
|
|
- * @param {{ [collectionName: string]: ImportSettings }} importSettingsMap key: collection name, value: ImportSettings instance
|
|
|
- * @return {Promise<void>}
|
|
|
+ * @param collections MongoDB collection name
|
|
|
+ * @param importSettingsMap
|
|
|
*/
|
|
|
- async import(collections, importSettingsMap) {
|
|
|
+ async import(collections: string[], importSettingsMap: { [collectionName: string]: ImportSettings }): Promise<void> {
|
|
|
await this.preImport();
|
|
|
|
|
|
// init status object
|
|
|
@@ -174,11 +175,8 @@ export class ImportService {
|
|
|
* import a collection from json
|
|
|
*
|
|
|
* @memberOf ImportService
|
|
|
- * @param {string} collectionName MongoDB collection name
|
|
|
- * @param {ImportSettings} importSettings
|
|
|
- * @return {insertedIds: Array.<string>, failedIds: Array.<string>}
|
|
|
*/
|
|
|
- async importCollection(collectionName, importSettings) {
|
|
|
+ protected async importCollection(collectionName: string, importSettings: ImportSettings): Promise<void> {
|
|
|
if (this.currentProgressingStatus == null) {
|
|
|
throw new Error('Something went wrong: currentProgressingStatus is not initialized');
|
|
|
}
|
|
|
@@ -195,7 +193,7 @@ export class ImportService {
|
|
|
const collectionProgress = this.currentProgressingStatus.progressMap[collectionName];
|
|
|
|
|
|
try {
|
|
|
- const jsonFile = this.growiBridgeService.getFile(jsonFileName);
|
|
|
+ const jsonFile = this.getFile(jsonFileName);
|
|
|
|
|
|
// validate options
|
|
|
this.validateImportSettings(collectionName, importSettings);
|
|
|
@@ -236,10 +234,13 @@ export class ImportService {
|
|
|
});
|
|
|
|
|
|
// exec
|
|
|
- const { insertedCount, modifiedCount, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
- logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Modified: ${modifiedCount}. Failed: ${errors.length}.`);
|
|
|
+ const { result, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
+ const { insertedCount, modifiedCount } = result;
|
|
|
+ const errorCount = errors?.length ?? 0;
|
|
|
+
|
|
|
+ logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Modified: ${modifiedCount}. Failed: ${errorCount}.`);
|
|
|
|
|
|
- const increment = insertedCount + modifiedCount + errors.length;
|
|
|
+ const increment = insertedCount + modifiedCount + errorCount;
|
|
|
collectionProgress.currentCount += increment;
|
|
|
collectionProgress.totalCount += increment;
|
|
|
collectionProgress.insertedCount += insertedCount;
|
|
|
@@ -384,45 +385,27 @@ export class ImportService {
|
|
|
*
|
|
|
* @memberOf ImportService
|
|
|
*/
|
|
|
- async execUnorderedBulkOpSafely(unorderedBulkOp: UnorderedBulkOperation): Promise<{ insertedCount: number, modifiedCount: number, errors: unknown[] }> {
|
|
|
- let errors: unknown[] = [];
|
|
|
- let log: BulkWriteResult | null = null;
|
|
|
-
|
|
|
+ async execUnorderedBulkOpSafely(unorderedBulkOp: UnorderedBulkOperation): Promise<{ result: BulkWriteResult, errors?: WriteError[] }> {
|
|
|
try {
|
|
|
- log = await unorderedBulkOp.execute();
|
|
|
+ return {
|
|
|
+ result: await unorderedBulkOp.execute(),
|
|
|
+ };
|
|
|
}
|
|
|
catch (err) {
|
|
|
-
|
|
|
- const _errs = Array.isArray(err.writeErrors) ? err : [err];
|
|
|
-
|
|
|
- const errTypeGuard = (err: any): err is BulkWriteOperationError => {
|
|
|
- return 'index' in err;
|
|
|
+ const errTypeGuard = (err): err is MongoBulkWriteError => {
|
|
|
+ return 'result' in err && 'writeErrors' in err;
|
|
|
};
|
|
|
- const docTypeGuard = (op: any): op is { _id: ObjectId } => {
|
|
|
- return '_id' in op;
|
|
|
- };
|
|
|
-
|
|
|
- errors = _errs.map((e) => {
|
|
|
- if (errTypeGuard(e)) {
|
|
|
- const { op } = e;
|
|
|
- return {
|
|
|
- _id: docTypeGuard(op) ? op._id : undefined,
|
|
|
- message: err.errmsg,
|
|
|
- };
|
|
|
- }
|
|
|
- return err;
|
|
|
- });
|
|
|
- }
|
|
|
|
|
|
- assert(log != null);
|
|
|
- const insertedCount = log.nInserted + log.nUpserted;
|
|
|
- const modifiedCount = log.nModified;
|
|
|
+ if (errTypeGuard(err)) {
|
|
|
+ return {
|
|
|
+ result: err.result,
|
|
|
+ errors: Array.isArray(err.writeErrors) ? err.writeErrors : [err.writeErrors],
|
|
|
+ };
|
|
|
+ }
|
|
|
|
|
|
- return {
|
|
|
- insertedCount,
|
|
|
- modifiedCount,
|
|
|
- errors,
|
|
|
- };
|
|
|
+ logger.error('Failed to execute unorderedBulkOp and the error could not handled.', err);
|
|
|
+ throw new Error('Failed to execute unorderedBulkOp and the error could not handled.', err);
|
|
|
+ }
|
|
|
|
|
|
}
|
|
|
|