|
|
@@ -1,27 +1,34 @@
|
|
|
-/**
|
|
|
- * @typedef {import("@types/unzip-stream").Parse} Parse
|
|
|
- * @typedef {import("@types/unzip-stream").Entry} Entry
|
|
|
- */
|
|
|
+import fs from 'fs';
|
|
|
+import path from 'path';
|
|
|
+import type { EventEmitter } from 'stream';
|
|
|
+import { Writable, Transform } from 'stream';
|
|
|
|
|
|
-import { parseISO } from 'date-fns/parseISO';
|
|
|
+import JSONStream from 'JSONStream';
|
|
|
import gc from 'expose-gc/function';
|
|
|
-
|
|
|
+import type {
|
|
|
+ BulkWriteResult, MongoBulkWriteError, UnorderedBulkOperation, WriteError,
|
|
|
+} from 'mongodb';
|
|
|
+import type { Document } from 'mongoose';
|
|
|
+import mongoose from 'mongoose';
|
|
|
+import streamToPromise from 'stream-to-promise';
|
|
|
+import unzipStream from 'unzip-stream';
|
|
|
+
|
|
|
+import type Crowi from '~/server/crowi';
|
|
|
+import { setupIndependentModels } from '~/server/crowi/setup-models';
|
|
|
+import type CollectionProgress from '~/server/models/vo/collection-progress';
|
|
|
import loggerFactory from '~/utils/logger';
|
|
|
|
|
|
-const fs = require('fs');
|
|
|
-const path = require('path');
|
|
|
-const { Writable, Transform } = require('stream');
|
|
|
-
|
|
|
-const JSONStream = require('JSONStream');
|
|
|
-const isIsoDate = require('is-iso-date');
|
|
|
-const mongoose = require('mongoose');
|
|
|
-const streamToPromise = require('stream-to-promise');
|
|
|
-const unzipStream = require('unzip-stream');
|
|
|
+import CollectionProgressingStatus from '../../models/vo/collection-progressing-status';
|
|
|
+import { createBatchStream } from '../../util/batch-stream';
|
|
|
+import { configManager } from '../config-manager';
|
|
|
|
|
|
-const CollectionProgressingStatus = require('../models/vo/collection-progressing-status');
|
|
|
-const { createBatchStream } = require('../util/batch-stream');
|
|
|
+import type { ConvertMap } from './construct-convert-map';
|
|
|
+import { constructConvertMap } from './construct-convert-map';
|
|
|
+import { getModelFromCollectionName } from './get-model-from-collection-name';
|
|
|
+import { ImportMode } from './import-mode';
|
|
|
+import type { ImportSettings, OverwriteParams } from './import-settings';
|
|
|
+import { keepOriginal } from './overwrite-function';
|
|
|
|
|
|
-const { ObjectId } = mongoose.Types;
|
|
|
|
|
|
const logger = loggerFactory('growi:services:ImportService'); // eslint-disable-line no-unused-vars
|
|
|
|
|
|
@@ -29,18 +36,10 @@ const logger = loggerFactory('growi:services:ImportService'); // eslint-disable-
|
|
|
const BULK_IMPORT_SIZE = 100;
|
|
|
|
|
|
|
|
|
-export class ImportSettings {
|
|
|
-
|
|
|
- constructor(mode) {
|
|
|
- this.mode = mode || 'insert';
|
|
|
- this.jsonFileName = null;
|
|
|
- this.overwriteParams = null;
|
|
|
- }
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
class ImportingCollectionError extends Error {
|
|
|
|
|
|
+ collectionProgress: CollectionProgress;
|
|
|
+
|
|
|
constructor(collectionProgress, error) {
|
|
|
super(error);
|
|
|
this.collectionProgress = collectionProgress;
|
|
|
@@ -49,83 +48,33 @@ class ImportingCollectionError extends Error {
|
|
|
}
|
|
|
|
|
|
|
|
|
-class ImportService {
|
|
|
+export class ImportService {
|
|
|
|
|
|
- constructor(crowi) {
|
|
|
- this.crowi = crowi;
|
|
|
- this.growiBridgeService = crowi.growiBridgeService;
|
|
|
- this.getFile = this.growiBridgeService.getFile.bind(this);
|
|
|
- this.baseDir = path.join(crowi.tmpDir, 'imports');
|
|
|
- this.keepOriginal = this.keepOriginal.bind(this);
|
|
|
+ private crowi: Crowi;
|
|
|
|
|
|
- this.adminEvent = crowi.event('admin');
|
|
|
+ private growiBridgeService: any;
|
|
|
|
|
|
- // { pages: { _id: ..., path: ..., ...}, users: { _id: ..., username: ..., }, ... }
|
|
|
- this.convertMap = {};
|
|
|
- this.initConvertMap(crowi.models);
|
|
|
+ private adminEvent: EventEmitter;
|
|
|
|
|
|
- this.currentProgressingStatus = null;
|
|
|
- }
|
|
|
+ private currentProgressingStatus: CollectionProgressingStatus | null;
|
|
|
|
|
|
- /**
|
|
|
- * generate ImportSettings instance
|
|
|
- * @param {string} mode bulk operation mode (insert | upsert | flushAndInsert)
|
|
|
- */
|
|
|
- generateImportSettings(mode) {
|
|
|
- return new ImportSettings(mode);
|
|
|
- }
|
|
|
+ private convertMap: ConvertMap;
|
|
|
|
|
|
- /**
|
|
|
- * initialize convert map. set keepOriginal as default
|
|
|
- *
|
|
|
- * @memberOf ImportService
|
|
|
- * @param {object} models from models/index.js
|
|
|
- */
|
|
|
- initConvertMap(models) {
|
|
|
- // by default, original value is used for imported documents
|
|
|
- for (const model of Object.values(models)) {
|
|
|
- if (model.collection == null) {
|
|
|
- continue;
|
|
|
- }
|
|
|
+ constructor(crowi: Crowi) {
|
|
|
+ this.crowi = crowi;
|
|
|
+ this.growiBridgeService = crowi.growiBridgeService;
|
|
|
|
|
|
- const collectionName = model.collection.name;
|
|
|
- this.convertMap[collectionName] = {};
|
|
|
+ this.adminEvent = crowi.event('admin');
|
|
|
|
|
|
- for (const key of Object.keys(model.schema.paths)) {
|
|
|
- this.convertMap[collectionName][key] = this.keepOriginal;
|
|
|
- }
|
|
|
- }
|
|
|
+ this.currentProgressingStatus = null;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * keep original value
|
|
|
- * automatically convert ObjectId
|
|
|
- *
|
|
|
- * @memberOf ImportService
|
|
|
- * @param {any} value value from imported document
|
|
|
- * @param {{ document: object, schema: object, propertyName: string }}
|
|
|
- * @return {any} new value for the document
|
|
|
- *
|
|
|
- * @see https://mongoosejs.com/docs/api/schematype.html#schematype_SchemaType-cast
|
|
|
- */
|
|
|
- keepOriginal(value, { document, schema, propertyName }) {
|
|
|
- // Model
|
|
|
- if (schema != null && schema.path(propertyName) != null) {
|
|
|
- const schemaType = schema.path(propertyName);
|
|
|
- return schemaType.cast(value);
|
|
|
- }
|
|
|
-
|
|
|
- // _id
|
|
|
- if (propertyName === '_id' && ObjectId.isValid(value)) {
|
|
|
- return ObjectId(value);
|
|
|
- }
|
|
|
-
|
|
|
- // Date
|
|
|
- if (isIsoDate(value)) {
|
|
|
- return parseISO(value);
|
|
|
- }
|
|
|
+ get baseDir(): string {
|
|
|
+ return path.join(this.crowi.tmpDir, 'imports');
|
|
|
+ }
|
|
|
|
|
|
- return value;
|
|
|
+ getFile(fileName: string): string {
|
|
|
+ return this.growiBridgeService.getFile(fileName, this.baseDir);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -138,8 +87,8 @@ class ImportService {
|
|
|
const zipFiles = fs.readdirSync(this.baseDir).filter(file => path.extname(file) === '.zip');
|
|
|
|
|
|
// process serially so as not to waste memory
|
|
|
- const zipFileStats = [];
|
|
|
- const parseZipFilePromises = zipFiles.map((file) => {
|
|
|
+ const zipFileStats: any[] = [];
|
|
|
+ const parseZipFilePromises: Promise<any>[] = zipFiles.map((file) => {
|
|
|
const zipFile = this.getFile(file);
|
|
|
return this.growiBridgeService.parseZipFile(zipFile);
|
|
|
});
|
|
|
@@ -153,8 +102,6 @@ class ImportService {
|
|
|
// sort with ctime("Change Time" - Time when file status was last changed (inode data modification).)
|
|
|
filtered.sort((a, b) => { return a.fileStat.ctime - b.fileStat.ctime });
|
|
|
|
|
|
- const isImporting = this.currentProgressingStatus != null;
|
|
|
-
|
|
|
const zipFileStat = filtered.pop();
|
|
|
let isTheSameVersion = false;
|
|
|
|
|
|
@@ -173,18 +120,27 @@ class ImportService {
|
|
|
return {
|
|
|
isTheSameVersion,
|
|
|
zipFileStat,
|
|
|
- isImporting,
|
|
|
- progressList: isImporting ? this.currentProgressingStatus.progressList : null,
|
|
|
+ isImporting: this.currentProgressingStatus != null,
|
|
|
+ progressList: this.currentProgressingStatus?.progressList ?? null,
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ async preImport() {
|
|
|
+ await setupIndependentModels();
|
|
|
+
|
|
|
+ // initialize convertMap
|
|
|
+ this.convertMap = constructConvertMap();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* import collections from json
|
|
|
- *
|
|
|
- * @param {string} collections MongoDB collection name
|
|
|
- * @param {array} importSettingsMap key: collection name, value: ImportSettings instance
|
|
|
+ * @param collections MongoDB collection name
|
|
|
+ * @param importSettingsMap
|
|
|
*/
|
|
|
- async import(collections, importSettingsMap) {
|
|
|
+ async import(collections: string[], importSettingsMap: { [collectionName: string]: ImportSettings }): Promise<void> {
|
|
|
+ await this.preImport();
|
|
|
+
|
|
|
// init status object
|
|
|
this.currentProgressingStatus = new CollectionProgressingStatus(collections);
|
|
|
|
|
|
@@ -208,9 +164,9 @@ class ImportService {
|
|
|
this.currentProgressingStatus = null;
|
|
|
this.emitTerminateEvent();
|
|
|
|
|
|
- await this.crowi.configManager.loadConfigs();
|
|
|
+ await configManager.loadConfigs();
|
|
|
|
|
|
- const currentIsV5Compatible = this.crowi.configManager.getConfig('crowi', 'app:isV5Compatible');
|
|
|
+ const currentIsV5Compatible = configManager.getConfig('crowi', 'app:isV5Compatible');
|
|
|
const isImportPagesCollection = collections.includes('pages');
|
|
|
const shouldNormalizePages = currentIsV5Compatible && isImportPagesCollection;
|
|
|
|
|
|
@@ -221,11 +177,12 @@ class ImportService {
|
|
|
* import a collection from json
|
|
|
*
|
|
|
* @memberOf ImportService
|
|
|
- * @param {string} collectionName MongoDB collection name
|
|
|
- * @param {ImportSettings} importSettings
|
|
|
- * @return {insertedIds: Array.<string>, failedIds: Array.<string>}
|
|
|
*/
|
|
|
- async importCollection(collectionName, importSettings) {
|
|
|
+ protected async importCollection(collectionName: string, importSettings: ImportSettings): Promise<void> {
|
|
|
+ if (this.currentProgressingStatus == null) {
|
|
|
+ throw new Error('Something went wrong: currentProgressingStatus is not initialized');
|
|
|
+ }
|
|
|
+
|
|
|
// prepare functions invoked from custom streams
|
|
|
const convertDocuments = this.convertDocuments.bind(this);
|
|
|
const bulkOperate = this.bulkOperate.bind(this);
|
|
|
@@ -244,7 +201,7 @@ class ImportService {
|
|
|
this.validateImportSettings(collectionName, importSettings);
|
|
|
|
|
|
// flush
|
|
|
- if (mode === 'flushAndInsert') {
|
|
|
+ if (mode === ImportMode.flushAndInsert) {
|
|
|
await collection.deleteMany({});
|
|
|
}
|
|
|
|
|
|
@@ -279,10 +236,13 @@ class ImportService {
|
|
|
});
|
|
|
|
|
|
// exec
|
|
|
- const { insertedCount, modifiedCount, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
- logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Modified: ${modifiedCount}. Failed: ${errors.length}.`);
|
|
|
+ const { result, errors } = await execUnorderedBulkOpSafely(unorderedBulkOp);
|
|
|
+ const { insertedCount, modifiedCount } = result;
|
|
|
+ const errorCount = errors?.length ?? 0;
|
|
|
+
|
|
|
+ logger.debug(`Importing ${collectionName}. Inserted: ${insertedCount}. Modified: ${modifiedCount}. Failed: ${errorCount}.`);
|
|
|
|
|
|
- const increment = insertedCount + modifiedCount + errors.length;
|
|
|
+ const increment = insertedCount + modifiedCount + errorCount;
|
|
|
collectionProgress.currentCount += increment;
|
|
|
collectionProgress.totalCount += increment;
|
|
|
collectionProgress.insertedCount += insertedCount;
|
|
|
@@ -334,7 +294,7 @@ class ImportService {
|
|
|
|
|
|
switch (collectionName) {
|
|
|
case 'configs':
|
|
|
- if (mode !== 'flushAndInsert') {
|
|
|
+ if (mode !== ImportMode.flushAndInsert) {
|
|
|
throw new Error(`The specified mode '${mode}' is not allowed when importing to 'configs' collection.`);
|
|
|
}
|
|
|
break;
|
|
|
@@ -392,7 +352,7 @@ class ImportService {
|
|
|
async unzip(zipFile) {
|
|
|
const readStream = fs.createReadStream(zipFile);
|
|
|
const unzipStreamPipe = readStream.pipe(unzipStream.Parse());
|
|
|
- const files = [];
|
|
|
+ const files: string[] = [];
|
|
|
|
|
|
unzipStreamPipe.on('entry', (/** @type {Entry} */ entry) => {
|
|
|
const fileName = entry.path;
|
|
|
@@ -426,61 +386,53 @@ class ImportService {
|
|
|
* execute unorderedBulkOp and ignore errors
|
|
|
*
|
|
|
* @memberOf ImportService
|
|
|
- * @param {object} unorderedBulkOp result of Model.collection.initializeUnorderedBulkOp()
|
|
|
- * @return {object} e.g. { insertedCount: 10, errors: [...] }
|
|
|
*/
|
|
|
- async execUnorderedBulkOpSafely(unorderedBulkOp) {
|
|
|
- let errors = [];
|
|
|
- let result = null;
|
|
|
-
|
|
|
+ async execUnorderedBulkOpSafely(unorderedBulkOp: UnorderedBulkOperation): Promise<{ result: BulkWriteResult, errors?: WriteError[] }> {
|
|
|
try {
|
|
|
- const log = await unorderedBulkOp.execute();
|
|
|
- result = log.result;
|
|
|
+ return {
|
|
|
+ result: await unorderedBulkOp.execute(),
|
|
|
+ };
|
|
|
}
|
|
|
catch (err) {
|
|
|
- result = err.result;
|
|
|
- errors = err.writeErrors || [err];
|
|
|
- errors.map((err) => {
|
|
|
- const moreDetailErr = err.err;
|
|
|
- return { _id: moreDetailErr.op._id, message: err.errmsg };
|
|
|
- });
|
|
|
- }
|
|
|
+ const errTypeGuard = (err): err is MongoBulkWriteError => {
|
|
|
+ return 'result' in err && 'writeErrors' in err;
|
|
|
+ };
|
|
|
+
|
|
|
+ if (errTypeGuard(err)) {
|
|
|
+ return {
|
|
|
+ result: err.result,
|
|
|
+ errors: Array.isArray(err.writeErrors) ? err.writeErrors : [err.writeErrors],
|
|
|
+ };
|
|
|
+ }
|
|
|
|
|
|
- const insertedCount = result.nInserted + result.nUpserted;
|
|
|
- const modifiedCount = result.nModified;
|
|
|
+ logger.error('Failed to execute unorderedBulkOp and the error could not handled.', err);
|
|
|
+ throw new Error('Failed to execute unorderedBulkOp and the error could not handled.', err);
|
|
|
+ }
|
|
|
|
|
|
- return {
|
|
|
- insertedCount,
|
|
|
- modifiedCount,
|
|
|
- errors,
|
|
|
- };
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* execute unorderedBulkOp and ignore errors
|
|
|
*
|
|
|
* @memberOf ImportService
|
|
|
- * @param {string} collectionName
|
|
|
- * @param {object} document document being imported
|
|
|
- * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
|
|
|
- * @return {object} document to be persisted
|
|
|
+ * @param collectionName
|
|
|
+ * @param document document being imported
|
|
|
+ * @returns document to be persisted
|
|
|
*/
|
|
|
- convertDocuments(collectionName, document, overwriteParams) {
|
|
|
- const Model = this.growiBridgeService.getModelFromCollectionName(collectionName);
|
|
|
- const schema = (Model != null) ? Model.schema : null;
|
|
|
+ convertDocuments<D extends Document>(collectionName: string, document: D, overwriteParams: OverwriteParams): D {
|
|
|
+ const Model = getModelFromCollectionName(collectionName);
|
|
|
+ const schema = (Model != null) ? Model.schema : undefined;
|
|
|
const convertMap = this.convertMap[collectionName];
|
|
|
|
|
|
- const _document = {};
|
|
|
+ const _document: D = structuredClone(document);
|
|
|
+
|
|
|
+ // apply keepOriginal to all of properties
|
|
|
+ Object.entries(document).forEach(([propertyName, value]) => {
|
|
|
+ _document[propertyName] = keepOriginal(value, { document, propertyName });
|
|
|
+ });
|
|
|
|
|
|
- // not Mongoose Model
|
|
|
- if (convertMap == null) {
|
|
|
- // apply keepOriginal to all of properties
|
|
|
- Object.entries(document).forEach(([propertyName, value]) => {
|
|
|
- _document[propertyName] = this.keepOriginal(value, { document, propertyName });
|
|
|
- });
|
|
|
- }
|
|
|
// Mongoose Model
|
|
|
- else {
|
|
|
+ if (convertMap != null) {
|
|
|
// assign value from documents being imported
|
|
|
Object.entries(convertMap).forEach(([propertyName, convertedValue]) => {
|
|
|
const value = document[propertyName];
|
|
|
@@ -537,5 +489,3 @@ class ImportService {
|
|
|
}
|
|
|
|
|
|
}
|
|
|
-
|
|
|
-module.exports = ImportService;
|