Shun Miyazawa 1 год назад
Родитель
Сommit
cf22c1cef8
1 измененных файлов с 60 добавлено и 27 удалено
  1. 60 27
      bin/data-migrations/src/index.js

+ 60 - 27
bin/data-migrations/src/index.js

@@ -25,48 +25,81 @@ if (migrationModules.length === 0) {
 
 /** @type {ReplaceLatestRevisions} */
 function replaceLatestRevisions(body, migrationModules) {
-  var replacedBody = body;
-  migrationModules.forEach((migrationModule) => {
-    replacedBody = migrationModule(replacedBody);
-  });
-  return replacedBody;
+  return migrationModules.reduce((replacedBody, module) => module(replacedBody), body);
 }
 
-/** @type {Operations} */
-var operations = [];
-pagesCollection.find({}).forEach((/** @type {any} */ doc) => {
-  if (doc.revision) {
-    try {
-      var revision = revisionsCollection.findOne({ _id: doc.revision });
+var pipeline = [
+  // Join pages with revisions
+  {
+    $lookup: {
+      from: 'revisions',
+      localField: 'revision',
+      foreignField: '_id',
+      as: 'revisionDoc',
+    },
+  },
+  // Unwind the revision array
+  {
+    $unwind: '$revisionDoc',
+  },
+  // Project only needed fields
+  {
+    $project: {
+      _id: '$revisionDoc._id',
+      body: '$revisionDoc.body',
+    },
+  },
+];
 
-      if (revision == null || revision.body == null) {
-        return;
-      }
 
-      var replacedBody = replaceLatestRevisions(revision.body, [...migrationModules]);
-      var operation = {
+try {
+  /** @type {Operations} */
+  var operations = [];
+  var processedCount = 0;
+
+  var cursor = pagesCollection.aggregate(pipeline, {
+    allowDiskUse: true,
+    cursor: { batchSize },
+  });
+
+  while (cursor.hasNext()) {
+    var doc = cursor.next();
+
+    if (doc == null || doc.body == null) {
+      continue;
+    }
+
+    try {
+      var replacedBody = replaceLatestRevisions(doc.body, [...migrationModules]);
+
+      operations.push({
         updateOne: {
-          filter: { _id: revision._id },
+          filter: { _id: doc._id },
           update: {
             $set: { body: replacedBody },
           },
         },
-      };
-      operations.push(operation);
+      });
+
+      processedCount++;
 
-      // bulkWrite per 100 revisions
-      if (operations.length > (batchSize - 1)) {
+      if (operations.length >= batchSize || !cursor.hasNext()) {
         revisionsCollection.bulkWrite(operations);
-        // sleep time can be set from env var
-        sleep(batchSizeInterval);
+        if (batchSizeInterval > 0) {
+          sleep(batchSizeInterval);
+        }
+
         operations = [];
       }
     }
     catch (err) {
-      print(`Error in updating revision ${doc.revision}: ${err}`);
+      print(`Error processing document ${doc?._id}: ${err}`);
     }
   }
-});
-revisionsCollection.bulkWrite(operations);
 
-print('migration complete!');
+  print('Migration complete!');
+}
+catch (err) {
+  print(`Fatal error during migration: ${err}`);
+  throw err;
+}