import.js 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. const logger = require('@alias/logger')('growi:services:ImportService'); // eslint-disable-line no-unused-vars
  2. const fs = require('fs');
  3. const path = require('path');
  4. const JSONStream = require('JSONStream');
  5. const streamToPromise = require('stream-to-promise');
  6. const unzipper = require('unzipper');
  7. const { ObjectId } = require('mongoose').Types;
  /**
   * Service that imports exported collections (JSON files, optionally packed
   * in a zip archive) into MongoDB through mongoose models.
   */
  class ImportService {

    /**
     * @param {object} crowi application context; `growiBridgeService`, `tmpDir` and `models` are read here,
     *                       `version` is read later by validate()
     */
    constructor(crowi) {
      this.crowi = crowi;
      this.growiBridgeService = crowi.growiBridgeService;
      // NOTE(review): getFile is bound to this ImportService instance, not to growiBridgeService,
      // so `this` inside getFile is this service (presumably to resolve against this.baseDir — confirm)
      this.getFile = this.growiBridgeService.getFile.bind(this);
      this.baseDir = path.join(crowi.tmpDir, 'imports');
      // number of documents buffered before a bulk operation is executed
      this.per = 100;

      this.keepOriginal = this.keepOriginal.bind(this);

      // { pages: { _id: ..., path: ..., ...}, users: { _id: ..., username: ..., }, ... }
      this.convertMap = {};
      this.initConvertMap(crowi.models);
    }

    /**
     * initialize convert map. set keepOriginal as default
     *
     * @memberOf ImportService
     * @param {object} models from models/index.js
     */
    initConvertMap(models) {
      // by default, original value is used for imported documents
      for (const model of Object.values(models)) {
        const { collectionName } = model.collection;
        this.convertMap[collectionName] = {};

        for (const key of Object.keys(model.schema.paths)) {
          this.convertMap[collectionName][key] = this.keepOriginal;
        }
      }
    }

    /**
     * keep original value
     * automatically convert ObjectId
     *
     * @memberOf ImportService
     * @param {any} _value value from imported document
     * @param {{ schema: object, key: string }} options `schema` is Model.schema.paths, `key` is the attribute being converted
     * @return {any} new value for the document
     */
    keepOriginal(_value, { schema, key }) {
      if (schema[key].instance === 'ObjectID' && ObjectId.isValid(_value)) {
        // `new` is required when ObjectId is an ES class; calling it as a plain function throws there
        return new ObjectId(_value);
      }

      return _value;
    }

    /**
     * import a collection from json
     *
     * Documents are inserted in unordered bulk operations of `this.per` documents.
     * The returned promise settles only after the last batch has been executed,
     * and rejects when reading, parsing, converting or executing fails.
     * The json file is removed only after a successful import.
     *
     * @memberOf ImportService
     * @param {object} Model instance of mongoose model
     * @param {string} jsonFile absolute path to the jsonFile being imported
     * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
     */
    async import(Model, jsonFile, overwriteParams = {}) {
      const { collectionName } = Model.collection;

      let nInsertedTotal = 0;
      const failedIds = [];

      // documents added to unorderedBulkOp that are not executed yet
      let nPending = 0;
      let unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();

      // flushes are chained so that at most one bulk operation runs at a time
      let flushChain = Promise.resolve();
      const flush = () => {
        flushChain = flushChain.then(async() => {
          // avoid errors when unorderedBulkOp has no items
          if (nPending === 0) {
            return;
          }

          // swap in a fresh bulk operation before awaiting, so documents arriving meanwhile are never added to an executed one
          const bulkOp = unorderedBulkOp;
          unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
          nPending = 0;

          const { nInserted, failed } = await this.execUnorderedBulkOpSafely(bulkOp);
          nInsertedTotal += nInserted;
          failedIds.push(...failed);
        });

        return flushChain;
      };

      const readStream = fs.createReadStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
      const jsonStream = readStream.pipe(JSONStream.parse('*'));

      await new Promise((resolve, reject) => {
        let isAborted = false;
        const abort = (err) => {
          if (isAborted) {
            return;
          }
          isAborted = true;
          readStream.destroy();
          reject(err);
        };

        readStream.on('error', abort);
        jsonStream.on('error', abort);

        jsonStream.on('data', (document) => {
          if (isAborted) {
            return;
          }

          try {
            // documents are not persisted until unorderedBulkOp.execute()
            unorderedBulkOp.insert(this.convertDocuments(Model, document, overwriteParams));
          }
          catch (err) {
            abort(err);
            return;
          }

          nPending++;
          if (nPending < this.per) {
            return;
          }

          // pause jsonStream to prevent more items to be added while the batch is executed
          jsonStream.pause();
          flush().then(() => jsonStream.resume(), abort);
        });

        jsonStream.on('end', () => {
          if (isAborted) {
            return;
          }

          // insert the rest, then settle
          flush().then(resolve, abort);
        });
      });

      logger.info(`Done. Inserted ${nInsertedTotal} ${collectionName}.`);
      if (failedIds.length > 0) {
        logger.error(`Failed to insert ${failedIds.length} ${collectionName}: ${failedIds.join(', ')}.`);
      }

      // clean up tmp directory
      fs.unlinkSync(jsonFile);
    }

    /**
     * extract a zip file
     *
     * The meta file is skipped. Directory entries and entries whose path would
     * resolve outside of `this.baseDir` (path traversal) are skipped as well.
     * The returned promise settles after every extracted file is fully written.
     *
     * @memberOf ImportService
     * @param {string} zipFile absolute path to zip file
     * @return {Array.<string>} array of absolute paths to extracted files
     */
    async unzip(zipFile) {
      const readStream = fs.createReadStream(zipFile);
      const unzipStream = readStream.pipe(unzipper.Parse());

      const baseDir = path.resolve(this.baseDir);
      const files = [];
      // one promise per write stream; they never reject, the first error is kept in writeError instead
      const writes = [];
      let writeError = null;

      unzipStream.on('entry', (entry) => {
        const fileName = entry.path;

        if (fileName === this.growiBridgeService.getMetaFileName()) {
          // skip meta.json
          entry.autodrain();
          return;
        }

        if (entry.type === 'Directory') {
          // a directory cannot be opened as a write stream
          entry.autodrain();
          return;
        }

        const jsonFile = path.join(this.baseDir, fileName);

        // entry names come from the archive and are untrusted: refuse anything that escapes baseDir
        const relative = path.relative(baseDir, path.resolve(jsonFile));
        const isOutsideBaseDir = relative === '' || path.isAbsolute(relative) || relative.split(path.sep)[0] === '..';
        if (isOutsideBaseDir) {
          logger.error(`Skipped zip entry '${fileName}': path is outside of the import directory.`);
          entry.autodrain();
          return;
        }

        const writeStream = fs.createWriteStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
        writes.push(new Promise((resolve) => {
          writeStream.on('finish', resolve);
          writeStream.on('error', (err) => {
            if (writeError == null) {
              writeError = err;
            }
            resolve();
          });
        }));

        entry.pipe(writeStream);
        files.push(jsonFile);
      });

      await streamToPromise(unzipStream);

      // the archive being fully parsed does not imply the extracted files are flushed
      await Promise.all(writes);
      if (writeError != null) {
        throw writeError;
      }

      return files;
    }

    /**
     * execute unorderedBulkOp and ignore write errors of individual documents
     *
     * @memberOf ImportService
     * @param {object} unorderedBulkOp result of Model.collection.initializeUnorderedBulkOp()
     * @return {{nInserted: number, failed: Array.<string>}} number of documents inserted and ids of documents failed
     * @throws rethrows the original error when it does not carry `result.result.writeErrors`
     */
    async execUnorderedBulkOpSafely(unorderedBulkOp) {
      // keep the number of documents inserted and failed for logger
      let nInserted = 0;
      const failed = [];

      // try catch to skip errors
      try {
        const log = await unorderedBulkOp.execute();
        nInserted = log.result.nInserted;
      }
      catch (err) {
        const bulkResult = (err != null && err.result != null) ? err.result.result : null;

        // anything without per-document write errors must not be masked
        if (bulkResult == null || !Array.isArray(bulkResult.writeErrors)) {
          throw err;
        }

        const collectionName = unorderedBulkOp.s.namespace;
        for (const error of bulkResult.writeErrors) {
          logger.error(`${collectionName}: ${error.errmsg}`);
          failed.push(error.err.op._id);
        }
        nInserted = bulkResult.nInserted;
      }

      logger.debug(`Importing ${unorderedBulkOp.s.collection.s.name}. Inserted: ${nInserted}. Failed: ${failed.length}.`);

      return {
        nInserted,
        failed,
      };
    }

    /**
     * build the document to be persisted from an imported document
     *
     * Only attributes registered in the convert map of the collection are copied.
     * A convert map or overwriteParams entry may be a plain value or a function
     * `(value, { _document, key, schema }) => newValue`.
     * Attributes that are undefined in the imported document are never set.
     *
     * @memberOf ImportService
     * @param {object} Model instance of mongoose model
     * @param {object} _document document being imported
     * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
     * @return {object} document to be persisted
     * @throws {Error} when no convert map is defined for the collection of Model
     */
    convertDocuments(Model, _document, overwriteParams) {
      const collectionName = Model.collection.collectionName;
      const schema = Model.schema.paths;
      const convertMap = this.convertMap[collectionName];

      if (convertMap == null) {
        throw new Error(`attribute map is not defined for ${collectionName}`);
      }

      const resolve = (key, value) => {
        return (typeof value === 'function') ? value(_document[key], { _document, key, schema }) : value;
      };

      const document = {};

      // assign value from documents being imported
      for (const [key, value] of Object.entries(convertMap)) {
        // distinguish between null and undefined
        if (_document[key] === undefined) {
          continue; // next entry
        }

        document[key] = resolve(key, value);
      }

      // overwrite documents with custom values
      for (const [key, value] of Object.entries(overwriteParams)) {
        // distinguish between null and undefined
        if (_document[key] !== undefined) {
          document[key] = resolve(key, value);
        }
      }

      return document;
    }

    /**
     * validate using meta.json
     * to pass validation, all the criteria must be met
     * - ${version of this growi} === ${version of growi that exported data}
     *
     * @memberOf ImportService
     * @param {object} meta meta data from meta.json
     * @throws {Error} when meta.version differs from the version of this growi
     */
    validate(meta) {
      if (meta.version !== this.crowi.version) {
        throw new Error('the version of this growi and the growi that exported the data are not met');
      }

      // TODO: check if all migrations are completed
      // - export: throw err if there are pending migrations
      // - import: throw err if there are pending migrations
    }

  }
  214. module.exports = ImportService;