Browse Source

devide pipeline to index

Yuki Takei 1 year ago
parent
commit
50c277571a

+ 139 - 0
apps/app/src/server/service/search-delegator/aggregate-to-index.ts

@@ -0,0 +1,139 @@
+import type { IPage } from '@growi/core';
+import type { PipelineStage, Query } from 'mongoose';
+
+import type { PageModel } from '~/server/models/page';
+
+export const aggregatePipelineToIndex = (maxBodyLengthToIndex: number, query?: Query<PageModel, IPage>): PipelineStage[] => {
+
+  const basePipeline = query == null
+    ? []
+    : [{ $match: query.getQuery() }];
+
+  return [
+    ...basePipeline,
+
+    // join Revision
+    {
+      $lookup: {
+        from: 'revisions',
+        localField: 'revision',
+        foreignField: '_id',
+        as: 'revision',
+      },
+    },
+    // unwind and filter pages that does not have revision
+    {
+      $unwind: {
+        path: '$revision',
+      },
+    },
+    {
+      $addFields: {
+        bodyLength: { $strLenCP: '$revision.body' },
+      },
+    },
+
+    // join User
+    {
+      $lookup: {
+        from: 'users',
+        localField: 'creator',
+        foreignField: '_id',
+        as: 'creator',
+      },
+    },
+    {
+      $unwind: {
+        path: '$creator',
+        preserveNullAndEmptyArrays: true,
+      },
+    },
+
+    // join Comment
+    {
+      $lookup: {
+        from: 'comments',
+        localField: '_id',
+        foreignField: 'page',
+        pipeline: [
+          {
+            $addFields: {
+              commentLength: { $strLenCP: '$comment' },
+            },
+          },
+        ],
+        as: 'comments',
+      },
+    },
+    {
+      $addFields: {
+        commentsCount: { $size: '$comments' },
+      },
+    },
+
+    // join Bookmark
+    {
+      $lookup: {
+        from: 'bookmarks',
+        localField: '_id',
+        foreignField: 'page',
+        as: 'bookmarks',
+      },
+    },
+    {
+      $addFields: {
+        bookmarksCount: { $size: '$bookmarks' },
+      },
+    },
+
+    // add counts for embedded arrays
+    {
+      $addFields: {
+        likeCount: { $size: '$liker' },
+      },
+    },
+    {
+      $addFields: {
+        seenUsersCount: { $size: '$seenUsers' },
+      },
+    },
+
+    // project
+    {
+      $project: {
+        path: 1,
+        createdAt: 1,
+        updatedAt: 1,
+        grant: 1,
+        grantedUsers: 1,
+        grantedGroups: 1,
+        'revision.body': {
+          $cond: {
+            if: { $lte: ['$bodyLength', maxBodyLengthToIndex] },
+            then: '$revision.body',
+            else: '',
+          },
+        },
+        comments: {
+          $map: {
+            input: '$comments',
+            as: 'comment',
+            in: {
+              $cond: {
+                if: { $lte: ['$$comment.commentLength', maxBodyLengthToIndex] },
+                then: '$$comment.comment',
+                else: '',
+              },
+            },
+          },
+        },
+        commentsCount: 1,
+        bookmarksCount: 1,
+        likeCount: 1,
+        seenUsersCount: 1,
+        'creator.username': 1,
+        'creator.email': 1,
+      },
+    },
+  ];
+};

+ 4 - 130
apps/app/src/server/service/search-delegator/elasticsearch.ts

@@ -22,6 +22,7 @@ import { configManager } from '../config-manager';
 import type { UpdateOrInsertPagesOpts } from '../interfaces/search';
 
 
+import { aggregatePipelineToIndex } from './aggregate-to-index';
 import type { AggregatedPage, BulkWriteBody, BulkWriteCommand } from './bulk-write';
 import ElasticsearchClient from './elasticsearch-client';
 
@@ -450,136 +451,9 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
 
     const maxBodyLengthToIndex = configManager.getConfig('crowi', 'app:elasticsearchMaxBodyLengthToIndex');
 
-    const readStream = Page
-      .aggregate<AggregatedPage>([
-        // filter targets
-        { $match: matchQuery.getQuery() },
-
-        // join Revision
-        {
-          $lookup: {
-            from: 'revisions',
-            localField: 'revision',
-            foreignField: '_id',
-            as: 'revision',
-          },
-        },
-        // unwind and filter pages that does not have revision
-        {
-          $unwind: {
-            path: '$revision',
-          },
-        },
-        {
-          $addFields: {
-            bodyLength: { $strLenCP: '$revision.body' },
-          },
-        },
-
-        // join User
-        {
-          $lookup: {
-            from: 'users',
-            localField: 'creator',
-            foreignField: '_id',
-            as: 'creator',
-          },
-        },
-        {
-          $unwind: {
-            path: '$creator',
-            preserveNullAndEmptyArrays: true,
-          },
-        },
-
-        // join Comment
-        {
-          $lookup: {
-            from: 'comments',
-            localField: '_id',
-            foreignField: 'page',
-            pipeline: [
-              {
-                $addFields: {
-                  commentLength: { $strLenCP: '$comment' },
-                },
-              },
-            ],
-            as: 'comments',
-          },
-        },
-        {
-          $addFields: {
-            commentsCount: { $size: '$comments' },
-          },
-        },
-
-        // join Bookmark
-        {
-          $lookup: {
-            from: 'bookmarks',
-            localField: '_id',
-            foreignField: 'page',
-            as: 'bookmarks',
-          },
-        },
-        {
-          $addFields: {
-            bookmarksCount: { $size: '$bookmarks' },
-          },
-        },
-
-        // add counts for embedded arrays
-        {
-          $addFields: {
-            likeCount: { $size: '$liker' },
-          },
-        },
-        {
-          $addFields: {
-            seenUsersCount: { $size: '$seenUsers' },
-          },
-        },
-
-        // project
-        {
-          $project: {
-            path: 1,
-            createdAt: 1,
-            updatedAt: 1,
-            grant: 1,
-            grantedUsers: 1,
-            grantedGroups: 1,
-            'revision.body': {
-              $cond: {
-                if: { $lte: ['$bodyLength', maxBodyLengthToIndex] },
-                then: '$revision.body',
-                else: '',
-              },
-            },
-            comments: {
-              $map: {
-                input: '$comments',
-                as: 'comment',
-                in: {
-                  $cond: {
-                    if: { $lte: ['$$comment.commentLength', maxBodyLengthToIndex] },
-                    then: '$$comment.comment',
-                    else: '',
-                  },
-                },
-              },
-            },
-            commentsCount: 1,
-            bookmarksCount: 1,
-            likeCount: 1,
-            seenUsersCount: 1,
-            'creator.username': 1,
-            'creator.email': 1,
-          },
-        },
-      ])
-      .cursor();
+    const readStream = Page.aggregate<AggregatedPage>(
+      aggregatePipelineToIndex(maxBodyLengthToIndex, matchQuery),
+    ).cursor();
 
     const bulkSize: number = configManager.getConfig('crowi', 'app:elasticsearchReindexBulkSize');
     const batchStream = createBatchStream(bulkSize);