Просмотр исходного кода

Merge pull request #8419 from weseek/feat/136137-139511-use-stream-and-bulk-write-for-children-grant-update

use stream and bulk write for children grant update
Yuki Takei 2 лет назад
Родитель
Сommit
02f21c2584
1 измененных файлов с 39 добавлено и 16 удалено
  1. 39 16
      apps/app/src/server/service/page/index.ts

+ 39 - 16
apps/app/src/server/service/page/index.ts

@@ -7,7 +7,7 @@ import type {
   IPage, IPageInfo, IPageInfoAll, IPageInfoForEntity, IPageWithMeta, IGrantedGroup, IRevisionHasId,
 } from '@growi/core';
 import {
-  PageGrant, PageStatus, getIdForRef, isPopulated,
+  PageGrant, PageStatus, getIdForRef,
 } from '@growi/core';
 import {
   pagePathUtils, pathUtils,
@@ -34,7 +34,6 @@ import {
 import { createBatchStream } from '~/server/util/batch-stream';
 import loggerFactory from '~/utils/logger';
 import { prepareDeleteConfigValuesForCalc } from '~/utils/page-delete-config';
-import { batchProcessPromiseAll } from '~/utils/promise';
 
 import { ObjectIdLike } from '../../interfaces/mongoose-utils';
 import { Attachment } from '../../models';
@@ -2356,7 +2355,7 @@ class PageService implements IPageService {
     return updatedPage;
   }
 
-  private async applyScopesToDescendants(parentPage, user, isV4 = false) {
+  private async applyScopesToDescendantsWithStream(parentPage, user, isV4 = false) {
     const Page = this.crowi.model('Page');
     const builder = new Page.PageQueryBuilder(Page.find());
     builder.addConditionToListOnlyDescendants(parentPage.path);
@@ -2374,25 +2373,49 @@ class PageService implements IPageService {
     const grant = parentPage.grant;
 
     const userRelatedGroups = await this.pageGrantService.getUserRelatedGroups(user);
+    const userRelatedParentGrantedGroups = this.pageGrantService.getUserRelatedGrantedGroupsSyncronously(
+      userRelatedGroups, parentPage,
+    );
+
+    const childPagesReadableStream = builder.query.cursor({ batchSize: BULK_REINDEX_SIZE });
+
+    const childPagesWritable = new Writable({
+      objectMode: true,
+      write: async(batch, encoding, callback) => {
+        await this.updateChildPagesGrant(batch, grant, user, userRelatedGroups, userRelatedParentGrantedGroups);
+        callback();
+      },
+    });
+
+    childPagesReadableStream
+      .pipe(createBatchStream(BULK_REINDEX_SIZE))
+      .pipe(childPagesWritable);
+    await streamToPromise(childPagesWritable);
+  }
 
-    const childPages = await builder.query;
-    await batchProcessPromiseAll(childPages, 20, async(childPage: any) => {
+  async updateChildPagesGrant(
+      pages: PageDocument[], grant: PageGrant, user, userRelatedGroups: PopulatedGrantedGroup[], userRelatedParentGrantedGroups: IGrantedGroup[],
+  ): Promise<void> {
+    const Page = this.crowi.model('Page');
+    const operations: any = [];
+
+    pages.forEach((childPage) => {
       let newChildGrantedGroups: IGrantedGroup[] = [];
       if (grant === PageGrant.GRANT_USER_GROUP) {
-        const userRelatedParentGrantedGroups = this.pageGrantService.getUserRelatedGrantedGroupsSyncronously(
-          userRelatedGroups, parentPage,
-        );
         newChildGrantedGroups = this.getNewGrantedGroupsSyncronously(userRelatedGroups, userRelatedParentGrantedGroups, childPage);
       }
       const canChangeGrant = this.pageGrantService
         .validateGrantChangeSyncronously(userRelatedGroups, childPage.grantedGroups, PageGrant.GRANT_USER_GROUP, newChildGrantedGroups);
       if (canChangeGrant) {
-        childPage.grant = grant;
-        childPage.grantedUsers = grant === PageGrant.GRANT_OWNER ? [user._id] : null;
-        childPage.grantedGroups = newChildGrantedGroups;
-        await childPage.save();
+        operations.push({
+          updateOne: {
+            filter: { _id: childPage._id },
+            update: { $set: { grant, grantedUsers: grant === PageGrant.GRANT_OWNER ? [user._id] : [], grantedGroups: newChildGrantedGroups } },
+          },
+        });
       }
     });
+    await Page.bulkWrite(operations);
   }
 
   /**
@@ -3839,7 +3862,7 @@ class PageService implements IPageService {
 
     // update scopes for descendants
     if (options.overwriteScopesOfDescendants) {
-      await this.applyScopesToDescendants(page, user);
+      await this.applyScopesToDescendantsWithStream(page, user);
     }
 
     await PageOperation.findByIdAndDelete(pageOpId);
@@ -3891,7 +3914,7 @@ class PageService implements IPageService {
 
     // update scopes for descendants
     if (options.overwriteScopesOfDescendants) {
-      this.applyScopesToDescendants(savedPage, user, true);
+      this.applyScopesToDescendantsWithStream(savedPage, user, true);
     }
 
     return savedPage;
@@ -4044,7 +4067,7 @@ class PageService implements IPageService {
 
     // 3. Update scopes for descendants
     if (options.overwriteScopesOfDescendants) {
-      await this.applyScopesToDescendants(currentPage, user);
+      await this.applyScopesToDescendantsWithStream(currentPage, user);
     }
 
     await PageOperation.findByIdAndDelete(pageOpId);
@@ -4241,7 +4264,7 @@ class PageService implements IPageService {
 
     // update scopes for descendants
     if (options.overwriteScopesOfDescendants) {
-      this.applyScopesToDescendants(savedPage, user, true);
+      this.applyScopesToDescendantsWithStream(savedPage, user, true);
     }