import.js 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. const logger = require('@alias/logger')('growi:services:ImportService'); // eslint-disable-line no-unused-vars
  2. const fs = require('fs');
  3. const path = require('path');
  4. const JSONStream = require('JSONStream');
  5. const streamToPromise = require('stream-to-promise');
  6. const unzipper = require('unzipper');
  7. const { ObjectId } = require('mongoose').Types;
  /**
   * Imports JSON dumps of collections (optionally extracted from a zip archive)
   * into the collections backing the registered mongoose models.
   */
  class ImportService {

    /**
     * @param {object} crowi application object; `tmpDir` and `models` are read from it
     */
    constructor(crowi) {
      this.baseDir = path.join(crowi.tmpDir, 'imports');
      this.metaFileName = 'meta.json';
      this.encoding = 'utf-8';
      // number of documents queued before a bulk write is executed
      this.per = 100;

      // keepOriginal is stored in convertMap and later invoked as a bare function
      this.keepOriginal = this.keepOriginal.bind(this);

      // { pages: Page, users: User, ... }
      this.collectionMap = {};
      this.initCollectionMap(crowi.models);

      // { pages: { _id: ..., path: ..., ...}, users: { _id: ..., username: ..., }, ... }
      this.convertMap = {};
      this.initConvertMap(crowi.models);
    }

    /**
     * initialize collection map (collection name => model)
     *
     * @memberOf ImportService
     * @param {object} models from models/index.js
     */
    initCollectionMap(models) {
      for (const model of Object.values(models)) {
        this.collectionMap[model.collection.collectionName] = model;
      }
    }

    /**
     * initialize convert map. set keepOriginal as default for every schema path
     *
     * @memberOf ImportService
     * @param {object} models from models/index.js
     */
    initConvertMap(models) {
      // by default, original value is used for imported documents
      for (const model of Object.values(models)) {
        const { collectionName } = model.collection;
        const converters = {};

        for (const key of Object.keys(model.schema.paths)) {
          converters[key] = this.keepOriginal;
        }

        this.convertMap[collectionName] = converters;
      }
    }

    /**
     * keep original value
     * automatically convert ObjectId
     *
     * @memberOf ImportService
     * @param {any} _value value from imported document
     * @param {{ _document: object, schema: object, key: string }} context document being imported, schema paths and the key being converted
     * @return {any} new value for the document
     */
    keepOriginal(_value, { schema, key }) {
      if (schema[key].instance === 'ObjectID' && ObjectId.isValid(_value)) {
        // `new` works whether ObjectId is a callable function or an ES class
        return new ObjectId(_value);
      }

      return _value;
    }

    /**
     * import a collection from json
     *
     * Resolves only after every queued document has been written, then removes the json file.
     * Rejects (and keeps the json file) when reading, parsing, converting or writing fails.
     *
     * @memberOf ImportService
     * @param {object} Model instance of mongoose model
     * @param {string} jsonFile path to the json file to import
     * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
     */
    async import(Model, jsonFile, overwriteParams = {}) {
      const { collectionName } = Model.collection;

      // number of documents queued in the current bulk operation
      let pendingCount = 0;
      let nInsertedTotal = 0;
      const failedIds = [];
      let unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();

      // execute the queued documents and start a fresh bulk operation
      const flush = async() => {
        const { nInserted, failed } = await this.execUnorderedBulkOpSafely(unorderedBulkOp);
        nInsertedTotal += nInserted;
        failedIds.push(...failed);
        unorderedBulkOp = Model.collection.initializeUnorderedBulkOp();
        pendingCount = 0;
      };

      const readStream = fs.createReadStream(jsonFile, { encoding: this.encoding });
      const jsonStream = readStream.pipe(JSONStream.parse('*'));

      await new Promise((resolve, reject) => {
        let aborted = false;

        const abort = (err) => {
          if (aborted) {
            return;
          }
          aborted = true;
          // stop feeding the parser and release the file descriptor
          readStream.destroy();
          reject(err);
        };

        readStream.on('error', abort);
        jsonStream.on('error', abort);

        jsonStream.on('data', (document) => {
          if (aborted) {
            return;
          }

          try {
            // documents are not persisted until unorderedBulkOp.execute()
            unorderedBulkOp.insert(this.convertDocuments(Model, document, overwriteParams));
          }
          catch (err) {
            abort(err);
            return;
          }

          pendingCount++;

          if (pendingCount >= this.per) {
            // pause jsonStream to prevent more items from being added while the batch is written
            jsonStream.pause();
            flush().then(() => jsonStream.resume(), abort);
          }
        });

        jsonStream.on('end', () => {
          if (aborted) {
            return;
          }

          // insert the rest. an empty bulk operation must not be executed
          const lastFlush = (pendingCount > 0) ? flush() : Promise.resolve();
          lastFlush.then(resolve, abort);
        });
      });

      logger.info(`Done. Inserted ${nInsertedTotal} ${collectionName}.`);
      if (failedIds.length > 0) {
        logger.error(`Failed to insert ${failedIds.length} ${collectionName}: ${failedIds.join(', ')}.`);
      }

      // clean up tmp directory
      fs.unlinkSync(jsonFile);
    }

    /**
     * extract a zip file
     *
     * Entries are written below baseDir. The meta file, directory entries and entries
     * whose path would leave baseDir are drained without being written.
     *
     * @memberOf ImportService
     * @param {string} zipFile path to zip file
     * @return {object} meta{object} and files{array<object>}
     */
    async unzip(zipFile) {
      const readStream = fs.createReadStream(zipFile);
      const unzipStream = readStream.pipe(unzipper.Parse());
      const files = [];
      // one promise per extracted file, settled when its write stream finishes or fails
      const pendingWrites = [];

      unzipStream.on('entry', (entry) => {
        const fileName = entry.path;
        const size = entry.vars.uncompressedSize; // There is also compressedSize;

        if (fileName === this.metaFileName) {
          // TODO: parse meta.json
          entry.autodrain();
          return;
        }

        if (entry.type === 'Directory') {
          entry.autodrain();
          return;
        }

        let jsonFile;
        try {
          // throws when the entry path points outside of baseDir (path traversal)
          jsonFile = this.getJsonFile(fileName);
        }
        catch (err) {
          logger.error(`Skipped zip entry "${fileName}": ${err.message}`);
          entry.autodrain();
          return;
        }

        const writeStream = fs.createWriteStream(jsonFile, { encoding: this.encoding });
        const written = new Promise((resolve, reject) => {
          writeStream.on('finish', resolve);
          writeStream.on('error', (err) => {
            // keep the archive flowing so that the unzip stream can still end
            entry.autodrain();
            reject(err);
          });
        });
        // the rejection is reported by Promise.all below; this only avoids an
        // unhandled-rejection report while the archive is still being read
        written.catch(() => {});
        pendingWrites.push(written);

        entry.pipe(writeStream);

        files.push({
          fileName,
          collectionName: path.basename(fileName, '.json'),
          size,
        });
      });

      await streamToPromise(unzipStream);
      // the archive has been read completely, but the extracted files may not be flushed yet
      await Promise.all(pendingWrites);

      return {
        meta: {},
        files,
      };
    }

    /**
     * execute unorderedBulkOp and ignore write errors of individual documents
     *
     * Errors that are not bulk write errors (no per-document write errors attached) are rethrown.
     *
     * @memberOf ImportService
     * @param {object} unorderedBulkOp result of Model.collection.initializeUnorderedBulkOp()
     * @return {{nInserted: number, failed: string[]}} number of documents inserted and ids of documents that failed
     */
    async execUnorderedBulkOpSafely(unorderedBulkOp) {
      // keep the number of documents inserted and failed for logger
      let nInserted = 0;
      const failed = [];

      try {
        const log = await unorderedBulkOp.execute();
        nInserted = log.result.nInserted;
      }
      catch (err) {
        const bulkResult = (err != null && err.result != null) ? err.result.result : null;

        if (bulkResult == null || !Array.isArray(bulkResult.writeErrors)) {
          // nothing can be reported per document, so surface the original error
          throw err;
        }

        for (const error of bulkResult.writeErrors) {
          logger.error(error.errmsg);
          failed.push(error.err.op._id);
        }
        nInserted = bulkResult.nInserted;
      }

      logger.debug(`Importing ${unorderedBulkOp.s.collection.s.name}. Inserted: ${nInserted}. Failed: ${failed.length}.`);

      return {
        nInserted,
        failed,
      };
    }

    /**
     * build the document to be persisted from a document being imported
     *
     * Only keys registered in the convert map of the collection are copied, and keys that are
     * undefined in the imported document are skipped. overwriteParams are applied afterwards,
     * also only for keys that are defined in the imported document.
     * A function in either map is called as fn(originalValue, { _document, key, schema }).
     *
     * @memberOf ImportService
     * @param {object} Model instance of mongoose model
     * @param {object} _document document being imported
     * @param {object} overwriteParams overwrite each document with unrelated value. e.g. { creator: req.user }
     * @return {object} document to be persisted
     * @throws {Error} when no convert map is defined for the collection
     */
    convertDocuments(Model, _document, overwriteParams) {
      const { collectionName } = Model.collection;
      const schema = Model.schema.paths;
      const convertMap = this.convertMap[collectionName];

      if (convertMap == null) {
        throw new Error(`attribute map is not defined for ${collectionName}`);
      }

      const document = {};
      const resolve = (key, valueOrFn) => {
        return (typeof valueOrFn === 'function') ? valueOrFn(_document[key], { _document, key, schema }) : valueOrFn;
      };

      // assign value from documents being imported
      for (const [key, converter] of Object.entries(convertMap)) {
        // distinguish between null and undefined
        if (_document[key] !== undefined) {
          document[key] = resolve(key, converter);
        }
      }

      // overwrite documents with custom values
      for (const [key, overwrite] of Object.entries(overwriteParams)) {
        // distinguish between null and undefined
        if (_document[key] !== undefined) {
          document[key] = resolve(key, overwrite);
        }
      }

      return document;
    }

    /**
     * get a model from collection name
     *
     * @memberOf ImportService
     * @param {string} collectionName collection name
     * @return {object} instance of mongoose model
     * @throws {Error} when no model is registered for the collection name
     */
    getModelFromCollectionName(collectionName) {
      const Model = this.collectionMap[collectionName];

      if (Model == null) {
        throw new Error(`cannot find a model for collection name "${collectionName}"`);
      }

      return Model;
    }

    /**
     * get the absolute path to a file
     *
     * @memberOf ImportService
     * @param {string} fileName name of the file, relative to baseDir
     * @param {boolean} [validate=false] boolean to check if the file exists
     * @return {string} path to the file below baseDir
     * @throws {Error} when the path would leave baseDir, or when validate is set and the file is not accessible
     */
    getJsonFile(fileName, validate = false) {
      const jsonFile = path.join(this.baseDir, fileName);

      // file names may come from an uploaded archive: never hand out a path outside of baseDir
      const relative = path.relative(this.baseDir, jsonFile);
      if (relative === '..' || relative.startsWith(`..${path.sep}`) || path.isAbsolute(relative)) {
        throw new Error(`"${fileName}" resolves outside of the import directory`);
      }

      if (validate) {
        try {
          fs.accessSync(jsonFile);
        }
        catch (err) {
          if (err.code === 'ENOENT') {
            logger.error(`${jsonFile} does not exist`, err);
          }
          else {
            logger.error(err);
          }
          throw err;
        }
      }

      return jsonFile;
    }

  }
  256. module.exports = ImportService;