index.js 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  /**
   * @typedef {import('./types').MigrationModule} MigrationModule
   * @typedef {import('./types').ReplaceLatestRevisions} ReplaceLatestRevisions
   * @typedef {import('./types').Operations} Operations
   */
  var pagesCollection = db.getCollection('pages');
  var revisionsCollection = db.getCollection('revisions');

  /**
   * Read an integer setting from the environment.
   *
   * An unset or blank variable yields `defaultValue`. Any other value must parse
   * to an integer >= `min`; otherwise this throws, instead of letting a typo turn
   * into NaN (a NaN batch size makes `operations.length >= batchSize` never true).
   * @param {string} name - environment variable to read
   * @param {number} defaultValue - value used when the variable is unset/blank
   * @param {number} min - smallest accepted value
   * @returns {number}
   * @throws {Error} when the variable is set but is not an integer >= min
   */
  function readIntegerEnv(name, defaultValue, min) {
    const raw = process.env[name];
    if (raw == null || raw.trim() === '') {
      return defaultValue;
    }
    const value = Number(raw);
    if (!Number.isInteger(value) || value < min) {
      throw new Error(`Invalid ${name}="${raw}": expected an integer >= ${min}`);
    }
    return value;
  }

  // Revisions per bulkWrite; also used as the aggregation cursor batch size.
  var batchSize = readIntegerEnv('BATCH_SIZE', 100, 1); // default 100 revisions in 1 bulkwrite
  // Pause between bulkWrites in milliseconds (value passed to sleep()); 0 disables it.
  var batchSizeInterval = readIntegerEnv('BATCH_INTERVAL', 3000, 0); // default 3 sec

  var migrationModule = process.env.MIGRATION_MODULE;
  // Fail fast with a clear message instead of require('./migrations/undefined').
  if (migrationModule == null || migrationModule.trim() === '') {
    throw new Error(
      'MIGRATION_MODULE is not set. Please set it to the name of a module in ./migrations',
    );
  }
  /** @type {MigrationModule[]} */
  var migrationModules = require(`./migrations/${migrationModule}`);
  // Each entry is later invoked as module(body) when revisions are rewritten, so
  // insist on a non-empty array of functions up front. A non-array export would
  // otherwise slip past a bare `.length === 0` check.
  if (
    !Array.isArray(migrationModules) ||
    migrationModules.length === 0 ||
    !migrationModules.every((migrate) => typeof migrate === 'function')
  ) {
    throw new Error(
      'No valid migrationModules found. Please enter a valid environment variable',
    );
  }
  /**
   * Apply every migration module to a revision body, in array order.
   *
   * Each module receives the output of the one before it; the first receives
   * `body`. With an empty module list the original `body` is returned as-is.
   * @type {ReplaceLatestRevisions}
   */
  function replaceLatestRevisions(body, migrationModules) {
    let current = body;
    migrationModules.forEach((migrate) => {
      current = migrate(current);
    });
    return current;
  }
  /**
   * Aggregation run against `pages`: looks up each page's `revision` value in
   * `revisions._id` and emits `{ _id, body }` taken from the matched revision.
   * Pages with no matching revision produce no output ($unwind drops empty arrays).
   */
  var pipeline = (function buildLatestRevisionPipeline() {
    // Join pages with revisions
    const joinRevision = {
      $lookup: {
        from: 'revisions',
        localField: 'revision',
        foreignField: '_id',
        as: 'revisionDoc',
      },
    };
    // Unwind the revision array
    const flattenRevision = { $unwind: '$revisionDoc' };
    // Project only needed fields
    const keepIdAndBody = {
      $project: {
        _id: '$revisionDoc._id',
        body: '$revisionDoc.body',
      },
    };
    return [joinRevision, flattenRevision, keepIdAndBody];
  })();
  /**
   * Submit the queued update operations to the revisions collection as a single
   * bulkWrite.
   * @param {Operations} ops - queued `updateOne` operations
   * @returns {number} number of operations submitted (0 when `ops` is empty)
   */
  function flushOperations(ops) {
    // bulkWrite rejects an empty operation list, so an empty queue is a no-op.
    if (ops.length === 0) {
      return 0;
    }
    revisionsCollection.bulkWrite(ops);
    return ops.length;
  }

  // Stream each page's current revision through the migration modules and write
  // the rewritten bodies back in batches of `batchSize`.
  try {
    /** @type {Operations} */
    var operations = [];
    var processedCount = 0;
    var failedCount = 0;
    var cursor = pagesCollection.aggregate(pipeline, {
      allowDiskUse: true,
      cursor: { batchSize },
    });
    while (cursor.hasNext()) {
      const doc = cursor.next();
      if (doc == null || doc.body == null) {
        continue;
      }
      // Only the migration itself is guarded per document: a module that throws
      // is logged and that revision is skipped. Write errors are deliberately not
      // caught here; they reach the fatal handler below instead of being blamed
      // on one document while the failed batch silently lingers in the queue.
      let replacedBody;
      try {
        replacedBody = replaceLatestRevisions(doc.body, migrationModules);
      } catch (err) {
        failedCount++;
        print(`Error processing document ${doc._id}: ${err}`);
        continue;
      }
      operations.push({
        updateOne: {
          filter: { _id: doc._id },
          update: {
            $set: { body: replacedBody },
          },
        },
      });
      processedCount++;
      if (operations.length >= batchSize) {
        flushOperations(operations);
        operations = [];
        // Throttle between batches only; nothing to wait for once the cursor is drained.
        if (batchSizeInterval > 0 && cursor.hasNext()) {
          sleep(batchSizeInterval);
        }
      }
    }
    // Updates queued before trailing skipped/failed documents never reach the
    // in-loop flush, so always write whatever is still pending.
    flushOperations(operations);
    operations = [];
    print('Migration complete!');
    print(`Processed documents count: ${processedCount}`);
    if (failedCount > 0) {
      print(`Failed documents count: ${failedCount}`);
    }
  } catch (err) {
    print(`Fatal error during migration: ${err}`);
    throw err;
  }