import.js 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. const logger = require('@alias/logger')('growi:services:ImportService'); // eslint-disable-line no-unused-vars
  2. const fs = require('fs');
  3. const path = require('path');
  4. const JSONStream = require('JSONStream');
  5. const streamToPromise = require('stream-to-promise');
  6. const unzip = require('unzipper');
  7. const { ObjectId } = require('mongoose').Types;
  /**
   * Service that imports collections from a zipped JSON export into MongoDB
   * through the mongoose models registered on the application instance.
   */
  class ImportService {

    /**
     * @param {object} crowi application instance; `tmpDir` and `models` are read from it
     */
    constructor(crowi) {
      this.baseDir = path.join(crowi.tmpDir, 'imports');
      this.encoding = 'utf-8';
      // number of documents queued before a bulk operation is executed
      this.per = 100;

      // keepOriginal is stored in convertMap and invoked detached from the instance
      this.keepOriginal = this.keepOriginal.bind(this);

      // { pages: Page, users: User, ... }
      this.collectionMap = {};
      this.initCollectionMap(crowi.models);

      // { pages: { _id: ..., path: ..., ...}, users: { _id: ..., username: ..., }, ... }
      this.convertMap = {
        pages: {
          status: 'published', // FIXME when importing users and user groups
          grant: 1, // FIXME when importing users and user groups
          grantedUsers: [], // FIXME when importing users and user groups
          grantedGroup: null, // FIXME when importing users and user groups
          liker: [], // FIXME when importing users
          seenUsers: [], // FIXME when importing users
          commentCount: 0, // FIXME when importing comments
          extended: {}, // FIXME when ?
          pageIdOnHackmd: undefined, // FIXME when importing hackmd?
          revisionHackmdSynced: undefined, // FIXME when importing hackmd?
          hasDraftOnHackmd: undefined, // FIXME when importing hackmd?
        },
      };
      this.initConvertMap(crowi.models);
    }

    /**
     * initialize collection map (collection name => mongoose model)
     *
     * @memberOf ImportService
     * @param {object} models from models/index.js
     */
    initCollectionMap(models) {
      for (const model of Object.values(models)) {
        this.collectionMap[model.collection.collectionName] = model;
      }
    }

    /**
     * initialize convert map. set keepOriginal as default for every schema path,
     * then re-apply the converters that were already declared for the collection
     *
     * @memberOf ImportService
     * @param {object} models from models/index.js
     */
    initConvertMap(models) {
      for (const model of Object.values(models)) {
        const { collectionName } = model.collection;

        // converters declared in the constructor take precedence over the default
        const declared = this.convertMap[collectionName];

        const converters = {};
        for (const key of Object.keys(model.schema.paths)) {
          // by default, original value is used for imported documents
          converters[key] = this.keepOriginal;
        }

        this.convertMap[collectionName] = Object.assign(converters, declared);
      }
    }

    /**
     * keep original value
     * automatically convert ObjectId
     *
     * @memberOf ImportService
     * @param {any} _value value from imported document
     * @param {{ _document: object, schema: object, key: string }} context
     *   the document being imported, the model's schema paths and the key being converted
     * @return {any} new value for the document
     */
    keepOriginal(_value, { _document, schema, key }) { // eslint-disable-line no-unused-vars
      // the key may be absent from the schema (e.g. a key given only via overwriteParams)
      const schemaType = schema[key];
      const isObjectIdPath = schemaType != null && schemaType.instance === 'ObjectID';

      if (isObjectIdPath && ObjectId.isValid(_value)) {
        // `new` is required when ObjectId is an ES class and harmless otherwise
        return new ObjectId(_value);
      }

      return _value;
    }

    /**
     * import a collection from json
     *
     * Resolves only after every batch (including the last, partial one) has been
     * executed. Rejects when reading, parsing, converting or a non-bulk-write
     * database error occurs. The extracted json file is removed in both cases.
     *
     * @memberOf ImportService
     * @param {object} Model instance of mongoose model
     * @param {string} filePath path to zipped json
     * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
     */
    async importFromZip(Model, filePath, overwriteParams = {}) {
      const { collectionName } = Model.collection;

      // extract zip file
      await this.unzip(filePath);

      const tmpJson = path.join(this.baseDir, `${collectionName}.json`);

      let nInsertedTotal = 0;
      const failedIds = [];

      // documents are not persisted until unorderedBulkOp.execute()
      let unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
      let nQueued = 0;

      // execute the current batch; a fresh batch is swapped in first so that
      // nothing can be queued onto an operation that is already executing
      const flush = async() => {
        const batch = unorderedBulkOp;
        unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
        nQueued = 0;

        const { nInserted, failed } = await this.execUnorderedBulkOpSafely(batch);
        nInsertedTotal += nInserted;
        failedIds.push(...failed);
      };

      try {
        await new Promise((resolve, reject) => {
          const readStream = fs.createReadStream(tmpJson, { encoding: this.encoding });
          const jsonStream = readStream.pipe(JSONStream.parse('*'));

          let inFlight = Promise.resolve();
          let aborted = false;

          const abort = (err) => {
            aborted = true;
            readStream.destroy();
            reject(err);
          };

          // pipe() does not forward errors, so both streams are observed
          readStream.on('error', abort);
          jsonStream.on('error', abort);

          jsonStream.on('data', (document) => {
            if (aborted) {
              return;
            }

            try {
              unorderedBulkOp.insert(this.convertDocuments(Model, document, overwriteParams));
            }
            catch (err) {
              abort(err);
              return;
            }

            nQueued++;
            if (nQueued < this.per) {
              return;
            }

            // pause jsonStream to prevent more items from being queued while the batch is executed
            jsonStream.pause();
            inFlight = flush();
            inFlight.then(() => jsonStream.resume(), abort);
          });

          jsonStream.on('end', () => {
            if (aborted) {
              return;
            }

            // insert the rest. skip when nothing is queued to avoid executing an empty bulk operation
            inFlight
              .then(() => (nQueued > 0 ? flush() : undefined))
              .then(resolve, abort);
          });
        });
      }
      finally {
        // clean up tmp directory; a cleanup failure must not mask the import error
        try {
          fs.unlinkSync(tmpJson);
        }
        catch (err) {
          logger.error(`Failed to remove ${tmpJson}: ${err.message}`);
        }
      }

      logger.info(`Done. Inserted ${nInsertedTotal} ${collectionName}.`);
      if (failedIds.length > 0) {
        logger.error(`Failed to insert ${failedIds.length} ${collectionName}: ${failedIds.join(', ')}.`);
      }
    }

    /**
     * extract a zip file into this.baseDir
     *
     * @memberOf ImportService
     * @param {string} zipFilePath path to zip file
     * @return {Promise<void>} resolves when the extract stream is closed, rejects on a read or extract error
     */
    unzip(zipFilePath) {
      return new Promise((resolve, reject) => {
        const readStream = fs.createReadStream(zipFilePath);
        const unzipStream = readStream.pipe(unzip.Extract({ path: this.baseDir }));

        // pipe() does not forward source errors (e.g. a missing zip file);
        // without this listener the promise would never settle
        readStream.on('error', (err) => {
          reject(err);
        });
        unzipStream.on('error', (err) => {
          readStream.destroy();
          reject(err);
        });
        unzipStream.on('close', () => {
          resolve();
        });
      });
    }

    /**
     * execute unorderedBulkOp and ignore write errors
     *
     * Per-document write errors are logged and collected. Any other error
     * (one that does not carry a bulk write result) is rethrown unchanged.
     *
     * @memberOf ImportService
     * @param {object} unorderedBulkOp result of Model.collection.initializeUnorderedBulkOp()
     * @return {{nInserted: number, failed: string[]}} number of documents inserted and ids of documents that failed
     */
    async execUnorderedBulkOpSafely(unorderedBulkOp) {
      // keep the number of documents inserted and failed for logger
      let nInserted = 0;
      const failed = [];

      try {
        const log = await unorderedBulkOp.execute();
        nInserted = log.result.nInserted;
      }
      catch (err) {
        const bulkResult = (err != null && err.result != null) ? err.result.result : null;

        // only write errors can be skipped; rethrow anything else as-is
        if (bulkResult == null || !Array.isArray(bulkResult.writeErrors)) {
          throw err;
        }

        for (const error of bulkResult.writeErrors) {
          logger.error(error.errmsg);
          failed.push(error.err.op._id);
        }
        nInserted = bulkResult.nInserted;
      }

      logger.debug(`Importing ${unorderedBulkOp.s.collection.s.name}. Inserted: ${nInserted}. Failed: ${failed.length}.`);

      return {
        nInserted,
        failed,
      };
    }

    /**
     * convert a document being imported into the document to be persisted
     *
     * Only keys that are present (not undefined) in the imported document are
     * written; a converter may be a function or a static value.
     *
     * @memberOf ImportService
     * @param {object} Model instance of mongoose model
     * @param {object} _document document being imported
     * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
     * @return {object} document to be persisted
     * @throws {Error} when no convert map is defined for the model's collection
     */
    convertDocuments(Model, _document, overwriteParams = {}) {
      const { collectionName } = Model.collection;
      const schema = Model.schema.paths;
      const convertMap = this.convertMap[collectionName];

      if (convertMap == null) {
        throw new Error(`attribute map is not defined for ${collectionName}`);
      }

      const document = {};

      // a function converter receives the imported value, a static one is used as-is
      const convert = (converter, key) => {
        return (typeof converter === 'function') ? converter(_document[key], { _document, key, schema }) : converter;
      };

      // assign value from documents being imported
      for (const [key, converter] of Object.entries(convertMap)) {
        // distinguish between null and undefined
        if (_document[key] === undefined) {
          continue; // next entry
        }
        document[key] = convert(converter, key);
      }

      // overwrite documents with custom values
      for (const [key, converter] of Object.entries(overwriteParams)) {
        // distinguish between null and undefined
        if (_document[key] !== undefined) {
          document[key] = convert(converter, key);
        }
      }

      return document;
    }

    /**
     * get a model from collection name
     *
     * @memberOf ImportService
     * @param {string} collectionName collection name
     * @return {object} instance of mongoose model
     * @throws {Error} when no model is registered for the collection name
     */
    getModelFromCollectionName(collectionName) {
      const Model = this.collectionMap[collectionName];

      if (Model == null) {
        throw new Error(`cannot find a model for collection name "${collectionName}"`);
      }

      return Model;
    }

  }
  234. module.exports = ImportService;