Browse Source

Use stream instead of recursive function

Taichi Masuyama 4 năm trước cách đây
mục cha
commit
d1f4cb6d73

+ 75 - 56
packages/app/src/migrations/20211227060705-revision-path-to-page-id-schema-migration.js

@@ -1,8 +1,11 @@
 import mongoose from 'mongoose';
+import { Writable } from 'stream';
+import streamToPromise from 'stream-to-promise';
 
 import { getModelSafely, getMongoUri, mongoOptions } from '@growi/core';
 import loggerFactory from '~/utils/logger';
 import getPageModel from '~/server/models/page';
+import { createBatchStream } from '~/server/util/batch-stream';
 
 
 const logger = loggerFactory('growi:migrate:revision-path-to-page-id-schema-migration');
@@ -16,34 +19,42 @@ module.exports = {
     const Page = getModelSafely('Page') || getPageModel();
     const Revision = getModelSafely('Revision') || require('~/server/models/revision')();
 
-    const recursiveUpdate = async(offset = 0) => {
-      const pages = await Page.find({ revision: { $ne: null } }, { _id: 1, revision: 1 }).skip(offset).limit(LIMIT).exec();
-      if (pages.length === 0) {
-        return;
-      }
-
-      const updateManyOperations = pages.map((page) => {
-        return {
-          updateMany: {
-            filter: { _id: page.revision },
-            update: [
-              {
-                $unset: ['path'],
-              },
-              {
-                $set: { pageId: page._id },
-              },
-            ],
-          },
-        };
-      });
-
-      await Revision.bulkWrite(updateManyOperations);
-
-      await recursiveUpdate(offset + LIMIT);
-    };
-
-    await recursiveUpdate();
+    const pagesStream = await Page.find({ revision: { $ne: null } }, { _id: 1, revision: 1 }).cursor({ batch_size: LIMIT });
+    const batchStrem = createBatchStream(LIMIT);
+
+    const migratePagesStream = new Writable({
+      objectMode: true,
+      async write(pages, encoding, callback) {
+        const updateManyOperations = pages.map((page) => {
+          return {
+            updateMany: {
+              filter: { _id: page.revision },
+              update: [
+                {
+                  $unset: ['path'],
+                },
+                {
+                  $set: { pageId: page._id },
+                },
+              ],
+            },
+          };
+        });
+
+        await Revision.bulkWrite(updateManyOperations);
+
+        callback();
+      },
+      final(callback) {
+        callback();
+      },
+    });
+
+    pagesStream
+      .pipe(batchStrem)
+      .pipe(migratePagesStream);
+
+    await streamToPromise(migratePagesStream);
 
     logger.info('Migration has successfully applied');
   },
@@ -54,34 +65,42 @@ module.exports = {
     const Page = getModelSafely('Page') || getPageModel();
     const Revision = getModelSafely('Revision') || require('~/server/models/revision')();
 
-    const recursiveUpdate = async(offset = 0) => {
-      const pages = await Page.find({ revision: { $ne: null } }, { _id: 1, revision: 1, path: 1 }).skip(offset).limit(LIMIT).exec();
-      if (pages.length === 0) {
-        return;
-      }
-
-      const updateManyOperations = pages.map((page) => {
-        return {
-          updateMany: {
-            filter: { _id: page.revision },
-            update: [
-              {
-                $unset: ['pageId'],
-              },
-              {
-                $set: { path: page.path },
-              },
-            ],
-          },
-        };
-      });
-
-      await Revision.bulkWrite(updateManyOperations);
-
-      await recursiveUpdate(offset + LIMIT);
-    };
-
-    await recursiveUpdate();
+    const pagesStream = await Page.find({ revision: { $ne: null } }, { _id: 1, revision: 1, path: 1 }).cursor({ batch_size: LIMIT });
+    const batchStrem = createBatchStream(LIMIT);
+
+    const migratePagesStream = new Writable({
+      objectMode: true,
+      async write(pages, encoding, callback) {
+        const updateManyOperations = pages.map((page) => {
+          return {
+            updateMany: {
+              filter: { _id: page.revision },
+              update: [
+                {
+                  $unset: ['pageId'],
+                },
+                {
+                  $set: { path: page.path },
+                },
+              ],
+            },
+          };
+        });
+
+        await Revision.bulkWrite(updateManyOperations);
+
+        callback();
+      },
+      final(callback) {
+        callback();
+      },
+    });
+
+    pagesStream
+      .pipe(batchStrem)
+      .pipe(migratePagesStream);
+
+    await streamToPromise(migratePagesStream);
 
     logger.info('Migration down has successfully applied');
   },