index.js 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. /* eslint-disable no-undef, no-var, vars-on-top, no-restricted-globals, regex/invalid, import/extensions */
  // Lint rules are disabled above because this file is plain JavaScript executed by the MongoDB shell
  3. /**
  4. * @typedef {import('./types').MigrationModule} MigrationModule
  5. * @typedef {import('./types').ReplaceLatestRevisions} ReplaceLatestRevisions
  6. * @typedef {import('./types').Operatioins } Operations
  7. */
  // Collections used by the migration: pages are read, revisions are rewritten.
  var pagesCollection = db.getCollection('pages');
  var revisionsCollection = db.getCollection('revisions');

  // Number of revision updates sent in one bulkWrite (default 100).
  var batchSize = Number(process.env.BATCH_SIZE ?? 100);
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    // A NaN batch size would make every `length >= batchSize` comparison false
    // and silently disable batching, so reject it up front.
    throw new Error(`BATCH_SIZE must be a positive integer, got "${process.env.BATCH_SIZE}"`);
  }

  // Pause between two bulkWrites (default 3000, i.e. 3 sec); 0 or less disables the pause.
  var batchSizeInterval = Number(process.env.BATCH_INTERVAL ?? 3000);
  if (!Number.isFinite(batchSizeInterval)) {
    throw new Error(`BATCH_INTERVAL must be a finite number, got "${process.env.BATCH_INTERVAL}"`);
  }

  // Name of the file under ./migrations that exports the migration functions.
  var migrationModule = process.env.MIGRATION_MODULE;
  if (migrationModule == null || migrationModule === '') {
    // Fail with a clear message instead of trying to load './migrations/undefined'.
    throw new Error('MIGRATION_MODULE is not set. Please enter a valid environment variable');
  }

  /** @type {MigrationModule[]} */
  var migrationModules = require(`./migrations/${migrationModule}`);
  if (!Array.isArray(migrationModules) || migrationModules.length === 0) {
    throw new Error('No valid migrationModules found. Please enter a valid environment variable');
  }
  /**
   * Runs a revision body through every migration function, in array order.
   * Each function receives the output of the previous one; the first receives `body`.
   *
   * @type {ReplaceLatestRevisions}
   * @param body - Revision body to migrate.
   * @param migrationModules - Functions applied one after another to the body.
   * @returns The body returned by the last function, or `body` itself when the array is empty.
   */
  function replaceLatestRevisions(body, migrationModules) {
    // The callback parameter is deliberately not called `module`, which would
    // shadow the CommonJS `module` object.
    return migrationModules.reduce((replacedBody, migrate) => migrate(replacedBody), body);
  }
  // Aggregation run against the pages collection: yields one `{ _id, body }`
  // document per page, taken from the revision that the page's `revision` field points to.
  var pipeline = [
    // Join each page with the revision referenced by its `revision` field
    {
      $lookup: {
        from: 'revisions',
        localField: 'revision',
        foreignField: '_id',
        as: 'revisionDoc',
      },
    },
    // Flatten the joined array into a single revision document
    {
      $unwind: '$revisionDoc',
    },
    // Keep only the revision's _id and body
    {
      $project: {
        _id: '$revisionDoc._id',
        body: '$revisionDoc.body',
      },
    },
  ];
  /**
   * Sends one batch of queued revision updates with a single bulkWrite.
   * A write failure is logged and reported through the return value so that it is
   * not mistaken for a failure of whichever document happened to be processed last.
   *
   * @param {Operations} pendingOperations - Queued updateOne operations; may be empty.
   * @returns {boolean} true when there was nothing to write or bulkWrite did not throw.
   */
  function flushOperations(pendingOperations) {
    if (pendingOperations.length === 0) {
      return true;
    }
    try {
      revisionsCollection.bulkWrite(pendingOperations);
      return true;
    }
    catch (err) {
      print(`Error writing batch of ${pendingOperations.length} revisions: ${err}`);
      return false;
    }
  }

  // Main migration: stream the latest revision of every page, run its body through
  // the migration modules and write the results back in batches.
  try {
    /** @type {Operations} */
    var operations = [];
    var processedCount = 0; // revisions whose update was written
    var failedCount = 0; // revisions whose transform or batch write failed
    var cursor = pagesCollection.aggregate(pipeline, {
      allowDiskUse: true,
      cursor: { batchSize },
    });

    while (cursor.hasNext()) {
      var doc = cursor.next();
      if (doc == null || doc.body == null) {
        continue;
      }

      // Only the transform is guarded per document: one bad body must not stop the run.
      var replacedBody;
      try {
        replacedBody = replaceLatestRevisions(doc.body, migrationModules);
      }
      catch (err) {
        failedCount++;
        print(`Error processing document ${doc._id}: ${err}`);
        continue;
      }

      operations.push({
        updateOne: {
          filter: { _id: doc._id },
          update: {
            $set: { body: replacedBody },
          },
        },
      });

      if (operations.length >= batchSize) {
        if (flushOperations(operations)) {
          processedCount += operations.length;
        }
        else {
          failedCount += operations.length;
        }
        // Always start a fresh batch so a failed batch is not re-sent on every later document.
        operations = [];
        // Pause between batches, but not after the last one.
        if (batchSizeInterval > 0 && cursor.hasNext()) {
          sleep(batchSizeInterval);
        }
      }
    }

    // Write whatever is still queued. Doing this after the loop guarantees the tail
    // is flushed even when the last cursor document was skipped or failed to transform.
    if (flushOperations(operations)) {
      processedCount += operations.length;
    }
    else {
      failedCount += operations.length;
    }
    operations = [];

    print(`Processed ${processedCount} revisions (${failedCount} failed)`);
    print('Migration complete!');
  }
  catch (err) {
    print(`Fatal error during migration: ${err}`);
    throw err;
  }