// import.js
  1. const logger = require('@alias/logger')('growi:services:ImportService'); // eslint-disable-line no-unused-vars
  2. const fs = require('fs');
  3. const path = require('path');
  4. const JSONStream = require('JSONStream');
  5. const streamToPromise = require('stream-to-promise');
  6. class ImportService {
  7. constructor(crowi) {
  8. this.baseDir = path.join(crowi.tmpDir, 'downloads');
  9. this.encoding = 'utf-8';
  10. this.per = 100;
  11. this.zlibLevel = 9; // 0(min) - 9(max)
  12. }
  13. /**
  14. * import a collection from json
  15. *
  16. * @memberOf ImportService
  17. * @param {object} Model instance of mongoose model
  18. * @param {string} filePath path to zipped json
  19. */
  20. async importFromZip(Model, filePath) {
  21. const { collectionName } = Model.collection;
  22. let counter = 0;
  23. let nInsertedTotal = 0;
  24. let failedIds = [];
  25. let unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
  26. const readStream = fs.createReadStream(path.join(this.baseDir, 'pages.json'));
  27. const jsonStream = readStream.pipe(JSONStream.parse('*'));
  28. jsonStream.on('data', async(document) => {
  29. // documents are not persisted until unorderedBulkOp.execute()
  30. unorderedBulkOp.insert(document);
  31. counter++;
  32. if (counter % this.per === 0) {
  33. // puase jsonStream to prevent more items to be added to unorderedBulkOp
  34. jsonStream.pause();
  35. const { nInserted, failed } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
  36. nInsertedTotal += nInserted;
  37. failedIds = [...failedIds, ...failed];
  38. // reset initializeUnorderedBulkOp
  39. unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
  40. // resume jsonStream
  41. jsonStream.resume();
  42. }
  43. });
  44. jsonStream.on('end', async(data) => {
  45. // insert the rest. avoid errors when unorderedBulkOp has no items
  46. if (unorderedBulkOp.s.currentBatch !== null) {
  47. const { nInserted, failed } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
  48. nInsertedTotal += nInserted;
  49. failedIds = [...failedIds, ...failed];
  50. }
  51. logger.info(`Done. Inserted ${nInsertedTotal} ${collectionName}.`);
  52. if (failedIds.length > 0) {
  53. logger.error(`Failed to insert ${failedIds.length} ${collectionName}: ${failedIds.join(', ')}.`);
  54. }
  55. });
  56. // streamToPromise(jsonStream) throws error, so await readStream instead
  57. await streamToPromise(readStream);
  58. }
  59. /**
  60. * execute unorderedBulkOp and ignore errors
  61. *
  62. * @memberOf ImportService
  63. * @param {object} unorderedBulkOp result of Model.collection.initializeUnorderedBulkOp()
  64. * @param {string} filePath path to zipped json
  65. * @return {{nInserted: number, failed: string[]}} number of docuemnts inserted and failed
  66. */
  67. async execUnorderedBulkOpSafely(unorderedBulkOp) {
  68. // keep the number of documents inserted and failed for logger
  69. let nInserted = 0;
  70. const failed = [];
  71. // try catch to skip errors
  72. try {
  73. const log = await unorderedBulkOp.execute();
  74. nInserted = log.result.nInserted;
  75. }
  76. catch (err) {
  77. for (const error of err.result.result.writeErrors) {
  78. logger.error(error.errmsg);
  79. failed.push(error.err.op._id);
  80. }
  81. nInserted = err.result.result.nInserted;
  82. }
  83. logger.debug(`Importing ${unorderedBulkOp.s.collection.s.name}. Inserted: ${nInserted}. Failed: ${failed.length}.`);
  84. return {
  85. nInserted,
  86. failed,
  87. };
  88. }
  89. }
  90. module.exports = ImportService;