|
|
@@ -1,11 +1,11 @@
|
|
|
import { Writable, Transform } from 'stream';
|
|
|
import { URL } from 'url';
|
|
|
|
|
|
+import { getIdForRef, type IPage } from '@growi/core';
|
|
|
import gc from 'expose-gc/function';
|
|
|
import mongoose from 'mongoose';
|
|
|
import streamToPromise from 'stream-to-promise';
|
|
|
|
|
|
-import { Comment } from '~/features/comment/server';
|
|
|
import { SearchDelegatorName } from '~/interfaces/named-query';
|
|
|
import type { ISearchResult, ISearchResultData } from '~/interfaces/search';
|
|
|
import { SORT_AXIS, SORT_ORDER } from '~/interfaces/search';
|
|
|
@@ -22,6 +22,8 @@ import { configManager } from '../config-manager';
|
|
|
import type { UpdateOrInsertPagesOpts } from '../interfaces/search';
|
|
|
|
|
|
|
|
|
+import { aggregatePipelineToIndex } from './aggregate-to-index';
|
|
|
+import type { AggregatedPage, BulkWriteBody, BulkWriteCommand } from './bulk-write';
|
|
|
import ElasticsearchClient from './elasticsearch-client';
|
|
|
|
|
|
const logger = loggerFactory('growi:service:search-delegator:elasticsearch');
|
|
|
@@ -109,10 +111,6 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
return `${this.indexName}-alias`;
|
|
|
}
|
|
|
|
|
|
- shouldIndexed(page) {
|
|
|
- return page.revision != null;
|
|
|
- }
|
|
|
-
|
|
|
initClient() {
|
|
|
const { host, auth, indexName } = this.getConnectionInfo();
|
|
|
|
|
|
@@ -128,7 +126,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
this.indexName = indexName;
|
|
|
}
|
|
|
|
|
|
- getType() {
|
|
|
+ getType(): '_doc' | undefined {
|
|
|
return this.isElasticsearchV7 ? '_doc' : undefined;
|
|
|
}
|
|
|
|
|
|
@@ -358,20 +356,9 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
/**
|
|
|
* generate object that is related to page.grant*
|
|
|
*/
|
|
|
- generateDocContentsRelatedToRestriction(page) {
|
|
|
- let grantedUserIds = null;
|
|
|
- if (page.grantedUsers != null && page.grantedUsers.length > 0) {
|
|
|
- grantedUserIds = page.grantedUsers.map((user) => {
|
|
|
- const userId = (user._id == null) ? user : user._id;
|
|
|
- return userId.toString();
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- let grantedGroupIds = [];
|
|
|
- grantedGroupIds = page.grantedGroups.map((group) => {
|
|
|
- const groupId = (group.item._id == null) ? group.item : group.item._id;
|
|
|
- return groupId.toString();
|
|
|
- });
|
|
|
+ generateDocContentsRelatedToRestriction(page: AggregatedPage) {
|
|
|
+ const grantedUserIds = page.grantedUsers.map(user => getIdForRef(user));
|
|
|
+ const grantedGroupIds = page.grantedGroups.map(group => getIdForRef(group.item));
|
|
|
|
|
|
return {
|
|
|
grant: page.grant,
|
|
|
@@ -380,10 +367,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- prepareBodyForCreate(body, page) {
|
|
|
- if (!Array.isArray(body)) {
|
|
|
- throw new Error('Body must be an array.');
|
|
|
- }
|
|
|
+ prepareBodyForCreate(page: AggregatedPage): [BulkWriteCommand, BulkWriteBody] {
|
|
|
|
|
|
const command = {
|
|
|
index: {
|
|
|
@@ -393,27 +377,22 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
},
|
|
|
};
|
|
|
|
|
|
- const bookmarkCount = page.bookmarkCount || 0;
|
|
|
- const seenUsersCount = page.seenUsers?.length || 0;
|
|
|
- let document = {
|
|
|
+ const document: BulkWriteBody = {
|
|
|
path: page.path,
|
|
|
body: page.revision.body,
|
|
|
- // username: page.creator?.username, // available Node.js v14 and above
|
|
|
- username: page.creator != null ? page.creator.username : null,
|
|
|
- comments: page.comments,
|
|
|
- comment_count: page.commentCount,
|
|
|
- bookmark_count: bookmarkCount,
|
|
|
- seenUsers_count: seenUsersCount,
|
|
|
- like_count: page.liker?.length || 0,
|
|
|
+ username: page.creator?.username,
|
|
|
+ comments: page.commentsCount > 0 ? page.comments : undefined,
|
|
|
+ comment_count: page.commentsCount,
|
|
|
+ bookmark_count: page.bookmarksCount,
|
|
|
+ like_count: page.likeCount,
|
|
|
+ seenUsers_count: page.seenUsersCount,
|
|
|
created_at: page.createdAt,
|
|
|
updated_at: page.updatedAt,
|
|
|
tag_names: page.tagNames,
|
|
|
+ ...this.generateDocContentsRelatedToRestriction(page),
|
|
|
};
|
|
|
|
|
|
- document = Object.assign(document, this.generateDocContentsRelatedToRestriction(page));
|
|
|
-
|
|
|
- body.push(command);
|
|
|
- body.push(document);
|
|
|
+ return [command, document];
|
|
|
}
|
|
|
|
|
|
prepareBodyForDelete(body, page) {
|
|
|
@@ -456,91 +435,29 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
async updateOrInsertPages(queryFactory, option: UpdateOrInsertPagesOpts = {}) {
|
|
|
const { shouldEmitProgress = false, invokeGarbageCollection = false } = option;
|
|
|
|
|
|
- const Page = mongoose.model('Page') as unknown as PageModel;
|
|
|
+ const Page = mongoose.model<IPage, PageModel>('Page');
|
|
|
const { PageQueryBuilder } = Page;
|
|
|
- const Bookmark = mongoose.model('Bookmark') as any; // TODO: typescriptize model
|
|
|
|
|
|
const socket = shouldEmitProgress ? this.socketIoService.getAdminSocket() : null;
|
|
|
|
|
|
// prepare functions invoked from custom streams
|
|
|
const prepareBodyForCreate = this.prepareBodyForCreate.bind(this);
|
|
|
- const shouldIndexed = this.shouldIndexed.bind(this);
|
|
|
const bulkWrite = this.client.bulk.bind(this.client);
|
|
|
|
|
|
- const findQuery = new PageQueryBuilder(queryFactory()).query;
|
|
|
- const countQuery = new PageQueryBuilder(queryFactory()).query;
|
|
|
+ const matchQuery = new PageQueryBuilder(queryFactory()).query;
|
|
|
|
|
|
+ const countQuery = new PageQueryBuilder(queryFactory()).query;
|
|
|
const totalCount = await countQuery.count();
|
|
|
|
|
|
- const readStream = findQuery
|
|
|
- // populate data which will be referenced by prepareBodyForCreate()
|
|
|
- .populate([
|
|
|
- { path: 'creator', model: 'User', select: 'username' },
|
|
|
- { path: 'revision', model: 'Revision', select: 'body' },
|
|
|
- ])
|
|
|
- .lean()
|
|
|
- .cursor();
|
|
|
-
|
|
|
- let skipped = 0;
|
|
|
- const thinOutStream = new Transform({
|
|
|
- objectMode: true,
|
|
|
- async transform(doc, encoding, callback) {
|
|
|
- if (shouldIndexed(doc)) {
|
|
|
- this.push(doc);
|
|
|
- }
|
|
|
- else {
|
|
|
- skipped++;
|
|
|
- }
|
|
|
- callback();
|
|
|
- },
|
|
|
- });
|
|
|
+ const maxBodyLengthToIndex = configManager.getConfig('crowi', 'app:elasticsearchMaxBodyLengthToIndex');
|
|
|
+
|
|
|
+ const readStream = Page.aggregate<AggregatedPage>(
|
|
|
+ aggregatePipelineToIndex(maxBodyLengthToIndex, matchQuery),
|
|
|
+ ).cursor();
|
|
|
|
|
|
const bulkSize: number = configManager.getConfig('crowi', 'app:elasticsearchReindexBulkSize');
|
|
|
const batchStream = createBatchStream(bulkSize);
|
|
|
|
|
|
- const appendBookmarkCountStream = new Transform({
|
|
|
- objectMode: true,
|
|
|
- async transform(chunk, encoding, callback) {
|
|
|
- const pageIds = chunk.map(doc => doc._id);
|
|
|
-
|
|
|
- const idToCountMap = await Bookmark.getPageIdToCountMap(pageIds);
|
|
|
- const idsHavingCount = Object.keys(idToCountMap);
|
|
|
-
|
|
|
- // append count
|
|
|
- chunk
|
|
|
- .filter(doc => idsHavingCount.includes(doc._id.toString()))
|
|
|
- .forEach((doc) => {
|
|
|
- // append count from idToCountMap
|
|
|
- doc.bookmarkCount = idToCountMap[doc._id.toString()];
|
|
|
- });
|
|
|
-
|
|
|
- this.push(chunk);
|
|
|
- callback();
|
|
|
- },
|
|
|
- });
|
|
|
-
|
|
|
-
|
|
|
- const appendCommentStream = new Transform({
|
|
|
- objectMode: true,
|
|
|
- async transform(chunk, encoding, callback) {
|
|
|
- const pageIds = chunk.map(doc => doc._id);
|
|
|
-
|
|
|
- const idToCommentMap = await Comment.getPageIdToCommentMap(pageIds);
|
|
|
- const idsHavingComment = Object.keys(idToCommentMap);
|
|
|
-
|
|
|
- // append comments
|
|
|
- chunk
|
|
|
- .filter(doc => idsHavingComment.includes(doc._id.toString()))
|
|
|
- .forEach((doc) => {
|
|
|
- // append comments from idToCommentMap
|
|
|
- doc.comments = idToCommentMap[doc._id.toString()];
|
|
|
- });
|
|
|
-
|
|
|
- this.push(chunk);
|
|
|
- callback();
|
|
|
- },
|
|
|
- });
|
|
|
-
|
|
|
const appendTagNamesStream = new Transform({
|
|
|
objectMode: true,
|
|
|
async transform(chunk, encoding, callback) {
|
|
|
@@ -552,7 +469,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
// append tagNames
|
|
|
chunk
|
|
|
.filter(doc => idsHavingTagNames.includes(doc._id.toString()))
|
|
|
- .forEach((doc) => {
|
|
|
+ .forEach((doc: AggregatedPage) => {
|
|
|
// append tagName from idToTagNamesMap
|
|
|
doc.tagNames = idToTagNamesMap[doc._id.toString()];
|
|
|
});
|
|
|
@@ -566,8 +483,10 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
const writeStream = new Writable({
|
|
|
objectMode: true,
|
|
|
async write(batch, encoding, callback) {
|
|
|
- const body = [];
|
|
|
- batch.forEach(doc => prepareBodyForCreate(body, doc));
|
|
|
+ const body: (BulkWriteCommand|BulkWriteBody)[] = [];
|
|
|
+ batch.forEach((doc: AggregatedPage) => {
|
|
|
+ body.push(...prepareBodyForCreate(doc));
|
|
|
+ });
|
|
|
|
|
|
try {
|
|
|
const bulkResponse = await bulkWrite({
|
|
|
@@ -580,7 +499,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
logger.info(`Adding pages progressing: (count=${count}, errors=${bulkResponse.errors}, took=${bulkResponse.took}ms)`);
|
|
|
|
|
|
if (shouldEmitProgress) {
|
|
|
- socket?.emit(SocketEventName.AddPageProgress, { totalCount, count, skipped });
|
|
|
+ socket?.emit(SocketEventName.AddPageProgress, { totalCount, count });
|
|
|
}
|
|
|
}
|
|
|
catch (err) {
|
|
|
@@ -601,20 +520,17 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
callback();
|
|
|
},
|
|
|
final(callback) {
|
|
|
- logger.info(`Adding pages has completed: (totalCount=${totalCount}, skipped=${skipped})`);
|
|
|
+ logger.info(`Adding pages has completed: (totalCount=${totalCount})`);
|
|
|
|
|
|
if (shouldEmitProgress) {
|
|
|
- socket?.emit(SocketEventName.FinishAddPage, { totalCount, count, skipped });
|
|
|
+ socket?.emit(SocketEventName.FinishAddPage, { totalCount, count });
|
|
|
}
|
|
|
callback();
|
|
|
},
|
|
|
});
|
|
|
|
|
|
readStream
|
|
|
- .pipe(thinOutStream)
|
|
|
.pipe(batchStream)
|
|
|
- .pipe(appendBookmarkCountStream)
|
|
|
- .pipe(appendCommentStream)
|
|
|
.pipe(appendTagNamesStream)
|
|
|
.pipe(writeStream);
|
|
|
|
|
|
@@ -977,30 +893,12 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
|
|
|
async syncPageUpdated(page, user) {
|
|
|
logger.debug('SearchClient.syncPageUpdated', page.path);
|
|
|
-
|
|
|
- // delete if page should not indexed
|
|
|
- if (!this.shouldIndexed(page)) {
|
|
|
- try {
|
|
|
- await this.deletePages([page]);
|
|
|
- }
|
|
|
- catch (err) {
|
|
|
- logger.error('deletePages:ES Error', err);
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
return this.updateOrInsertPageById(page._id);
|
|
|
}
|
|
|
|
|
|
// remove pages which should not be indexed
|
|
|
async syncPagesUpdated(pages, user) {
|
|
|
const shoudDeletePages: any[] = [];
|
|
|
- pages.forEach((page) => {
|
|
|
- logger.debug('SearchClient.syncPageUpdated', page.path);
|
|
|
- if (!this.shouldIndexed(page)) {
|
|
|
- shoudDeletePages.push(page);
|
|
|
- }
|
|
|
- });
|
|
|
|
|
|
// delete if page should not indexed
|
|
|
try {
|