Просмотр исходного кода

Emit node event then emit socket event

Taichi Masuyama 3 лет назад
Родитель
Сommit
aa61e10aef

+ 8 - 0
packages/app/src/interfaces/events/page.ts

@@ -0,0 +1,8 @@
+export const PageEventName = {
+  // Public migration
+  PMStarted: 'PublicMigrationStarted',
+  PMMigrating: 'PublicMigrationMigrating',
+  PMErrorCount: 'PublicMigrationErrorCount',
+  PMEnded: 'PublicMigrationEnded',
+} as const;
+export type PageEventName = typeof PageEventName[keyof typeof PageEventName];

+ 41 - 8
packages/app/src/server/service/page.ts

@@ -33,6 +33,7 @@ import { PageRedirectModel } from '../models/page-redirect';
 import { serializePageSecurely } from '../models/serializers/page-serializer';
 import Subscription from '../models/subscription';
 import { V5ConversionError } from '../models/vo/v5-conversion-error';
+import { PageEventName } from '~/interfaces/events/page';
 
 const debug = require('debug')('growi:services:page');
 
@@ -235,6 +236,39 @@ class PageService {
         logger.error(err);
       }
     });
+
+    // parent normalization
+    this.pageEvent.on(PageEventName.PMStarted, async(data: { total: number }, user) => {
+      if (user == null || !user.isAdmin) {
+        return;
+      }
+
+      this.crowi.socketIoService.getAdminSocket().emit(SocketEventName.PMStarted, data);
+    });
+
+    this.pageEvent.on(PageEventName.PMMigrating, async(data: { count: number }, user) => {
+      if (user == null || !user.isAdmin) {
+        return;
+      }
+
+      this.crowi.socketIoService.getAdminSocket().emit(SocketEventName.PMMigrating, data);
+    });
+
+    this.pageEvent.on(PageEventName.PMErrorCount, async(data: { skip: number }, user) => {
+      if (user == null || !user.isAdmin) {
+        return;
+      }
+
+      this.crowi.socketIoService.getAdminSocket().emit(SocketEventName.PMErrorCount, data);
+    });
+
+    this.pageEvent.on(PageEventName.PMEnded, async(data: { isSucceeded: boolean }, user) => {
+      if (user == null || !user.isAdmin) {
+        return;
+      }
+
+      this.crowi.socketIoService.getAdminSocket().emit(SocketEventName.PMEnded, data);
+    });
   }
 
   canDeleteCompletely(creatorId: ObjectIdLike, operator, isRecursively: boolean): boolean {
@@ -2783,8 +2817,6 @@ class PageService {
     const BATCH_SIZE = 100;
     const PAGES_LIMIT = 1000;
 
-    const socket = this.crowi.socketIoService.getAdminSocket();
-
     const Page = mongoose.model('Page') as unknown as PageModel;
     const { PageQueryBuilder } = Page;
 
@@ -2805,7 +2837,7 @@ class PageService {
     // Limit pages to get
     const total = await Page.countDocuments(matchFilter);
     if (isFirst) {
-      socket.emit(SocketEventName.PMStarted, { total });
+      this.pageEvent.emit(PageEventName.PMStarted, { total });
     }
     if (total > PAGES_LIMIT) {
       baseAggregation = baseAggregation.limit(Math.floor(total * 0.3));
@@ -2820,6 +2852,7 @@ class PageService {
 
     // eslint-disable-next-line max-len
     const buildPipelineToCreateEmptyPagesByUser = this.buildPipelineToCreateEmptyPagesByUser.bind(this);
+    const pageEvent = this.pageEvent;
 
     const migratePagesStream = new Writable({
       objectMode: true,
@@ -2912,13 +2945,13 @@ class PageService {
           nextSkiped += res.result.writeErrors.length;
           logger.info(`Page migration processing: (migratedPages=${res.result.nModified})`);
 
-          socket.emit(SocketEventName.PMMigrating, { count: nextCount });
-          socket.emit(SocketEventName.PMErrorCount, { skip: nextSkiped });
+          pageEvent.emit(PageEventName.PMMigrating, { count: nextCount });
+          pageEvent.emit(PageEventName.PMErrorCount, { skip: nextSkiped });
 
           // Throw if any error is found
           if (res.result.writeErrors.length > 0) {
             logger.error('Failed to migrate some pages', res.result.writeErrors);
-            socket.emit(SocketEventName.PMEnded, { isSucceeded: false });
+            pageEvent.emit(PageEventName.PMEnded, { isSucceeded: false });
             throw Error('Failed to migrate some pages');
           }
 
@@ -2926,7 +2959,7 @@ class PageService {
           if (res.result.nModified === 0 && res.result.nMatched === 0) {
             shouldContinue = false;
             logger.error('Migration is unable to continue', 'parentPaths:', parentPaths, 'bulkWriteResult:', res);
-            socket.emit(SocketEventName.PMEnded, { isSucceeded: false });
+            pageEvent.emit(PageEventName.PMEnded, { isSucceeded: false });
           }
         }
         catch (err) {
@@ -2952,7 +2985,7 @@ class PageService {
     }
 
     // End
-    socket.emit(SocketEventName.PMEnded, { isSucceeded: true });
+    this.pageEvent.emit(PageEventName.PMEnded, { isSucceeded: true });
 
     return nextCount;
   }