|
|
@@ -1,11 +1,11 @@
|
|
|
import { Writable, Transform } from 'stream';
|
|
|
import { URL } from 'url';
|
|
|
|
|
|
+import type { IPage } from '@growi/core';
|
|
|
import gc from 'expose-gc/function';
|
|
|
import mongoose from 'mongoose';
|
|
|
import streamToPromise from 'stream-to-promise';
|
|
|
|
|
|
-import { Comment } from '~/features/comment/server';
|
|
|
import { SearchDelegatorName } from '~/interfaces/named-query';
|
|
|
import type { ISearchResult, ISearchResultData } from '~/interfaces/search';
|
|
|
import { SORT_AXIS, SORT_ORDER } from '~/interfaces/search';
|
|
|
@@ -109,10 +109,6 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
return `${this.indexName}-alias`;
|
|
|
}
|
|
|
|
|
|
- shouldIndexed(page) {
|
|
|
- return page.revision != null;
|
|
|
- }
|
|
|
-
|
|
|
initClient() {
|
|
|
const { host, auth, indexName } = this.getConnectionInfo();
|
|
|
|
|
|
@@ -456,45 +452,112 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
async updateOrInsertPages(queryFactory, option: UpdateOrInsertPagesOpts = {}) {
|
|
|
const { shouldEmitProgress = false, invokeGarbageCollection = false } = option;
|
|
|
|
|
|
- const Page = mongoose.model('Page') as unknown as PageModel;
|
|
|
+ const Page = mongoose.model<IPage, PageModel>('Page');
|
|
|
const { PageQueryBuilder } = Page;
|
|
|
+
|
|
|
const Bookmark = mongoose.model('Bookmark') as any; // TODO: typescriptize model
|
|
|
|
|
|
const socket = shouldEmitProgress ? this.socketIoService.getAdminSocket() : null;
|
|
|
|
|
|
// prepare functions invoked from custom streams
|
|
|
const prepareBodyForCreate = this.prepareBodyForCreate.bind(this);
|
|
|
- const shouldIndexed = this.shouldIndexed.bind(this);
|
|
|
const bulkWrite = this.client.bulk.bind(this.client);
|
|
|
|
|
|
- const findQuery = new PageQueryBuilder(queryFactory()).query;
|
|
|
- const countQuery = new PageQueryBuilder(queryFactory()).query;
|
|
|
+ const matchQuery = new PageQueryBuilder(queryFactory()).query;
|
|
|
|
|
|
+ const countQuery = new PageQueryBuilder(queryFactory()).query;
|
|
|
const totalCount = await countQuery.count();
|
|
|
|
|
|
- const readStream = findQuery
|
|
|
- // populate data which will be referenced by prepareBodyForCreate()
|
|
|
- .populate([
|
|
|
- { path: 'creator', model: 'User', select: 'username' },
|
|
|
- { path: 'revision', model: 'Revision', select: 'body' },
|
|
|
+ const readStream = Page
|
|
|
+ .aggregate([
|
|
|
+ // filter targets
|
|
|
+ { $match: matchQuery.getQuery() },
|
|
|
+
|
|
|
+ // join Revision
|
|
|
+ {
|
|
|
+ $lookup: {
|
|
|
+ from: 'revisions',
|
|
|
+ localField: 'revision',
|
|
|
+ foreignField: '_id',
|
|
|
+ as: 'revision',
|
|
|
+ },
|
|
|
+ },
|
|
|
+ // unwind; this also drops pages that do not have a revision
|
|
|
+ {
|
|
|
+ $unwind: {
|
|
|
+ path: '$revision',
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ $addFields: {
|
|
|
+ bodyLength: { $strLenCP: '$revision.body' },
|
|
|
+ },
|
|
|
+ },
|
|
|
+
|
|
|
+ // join User
|
|
|
+ {
|
|
|
+ $lookup: {
|
|
|
+ from: 'users',
|
|
|
+ localField: 'creator',
|
|
|
+ foreignField: '_id',
|
|
|
+ as: 'creator',
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ $unwind: {
|
|
|
+ path: '$creator',
|
|
|
+ preserveNullAndEmptyArrays: true,
|
|
|
+ },
|
|
|
+ },
|
|
|
+
|
|
|
+ // join Comment
|
|
|
+ {
|
|
|
+ $lookup: {
|
|
|
+ from: 'comments',
|
|
|
+ localField: '_id',
|
|
|
+ foreignField: 'page',
|
|
|
+ pipeline: [
|
|
|
+ {
|
|
|
+ $addFields: {
|
|
|
+ commentLength: { $strLenCP: '$comment' },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ $match: {
|
|
|
+ commentLength: { $lte: 500 },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ ],
|
|
|
+ as: 'comments',
|
|
|
+ },
|
|
|
+ },
|
|
|
+
|
|
|
+ // project
|
|
|
+ {
|
|
|
+ $project: {
|
|
|
+ path: 1,
|
|
|
+ createdAt: 1,
|
|
|
+ updatedAt: 1,
|
|
|
+ liker: 1,
|
|
|
+ seenUsers: 1,
|
|
|
+ grant: 1,
|
|
|
+ grantedUsers: 1,
|
|
|
+ grantedGroups: 1,
|
|
|
+ 'revision.body': {
|
|
|
+ $cond: {
|
|
|
+ if: { $lte: ['$bodyLength', 100000] },
|
|
|
+ then: '$revision.body',
|
|
|
+ else: '',
|
|
|
+ },
|
|
|
+ },
|
|
|
+ 'comments.comment': 1,
|
|
|
+ 'creator.username': 1,
|
|
|
+ 'creator.email': 1,
|
|
|
+ },
|
|
|
+ },
|
|
|
])
|
|
|
- .lean()
|
|
|
.cursor();
|
|
|
|
|
|
- let skipped = 0;
|
|
|
- const thinOutStream = new Transform({
|
|
|
- objectMode: true,
|
|
|
- async transform(doc, encoding, callback) {
|
|
|
- if (shouldIndexed(doc)) {
|
|
|
- this.push(doc);
|
|
|
- }
|
|
|
- else {
|
|
|
- skipped++;
|
|
|
- }
|
|
|
- callback();
|
|
|
- },
|
|
|
- });
|
|
|
-
|
|
|
const bulkSize: number = configManager.getConfig('crowi', 'app:elasticsearchReindexBulkSize');
|
|
|
const batchStream = createBatchStream(bulkSize);
|
|
|
|
|
|
@@ -519,28 +582,6 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
},
|
|
|
});
|
|
|
|
|
|
-
|
|
|
- const appendCommentStream = new Transform({
|
|
|
- objectMode: true,
|
|
|
- async transform(chunk, encoding, callback) {
|
|
|
- const pageIds = chunk.map(doc => doc._id);
|
|
|
-
|
|
|
- const idToCommentMap = await Comment.getPageIdToCommentMap(pageIds);
|
|
|
- const idsHavingComment = Object.keys(idToCommentMap);
|
|
|
-
|
|
|
- // append comments
|
|
|
- chunk
|
|
|
- .filter(doc => idsHavingComment.includes(doc._id.toString()))
|
|
|
- .forEach((doc) => {
|
|
|
- // append comments from idToCommentMap
|
|
|
- doc.comments = idToCommentMap[doc._id.toString()];
|
|
|
- });
|
|
|
-
|
|
|
- this.push(chunk);
|
|
|
- callback();
|
|
|
- },
|
|
|
- });
|
|
|
-
|
|
|
const appendTagNamesStream = new Transform({
|
|
|
objectMode: true,
|
|
|
async transform(chunk, encoding, callback) {
|
|
|
@@ -580,7 +621,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
logger.info(`Adding pages progressing: (count=${count}, errors=${bulkResponse.errors}, took=${bulkResponse.took}ms)`);
|
|
|
|
|
|
if (shouldEmitProgress) {
|
|
|
- socket?.emit(SocketEventName.AddPageProgress, { totalCount, count, skipped });
|
|
|
+ socket?.emit(SocketEventName.AddPageProgress, { totalCount, count });
|
|
|
}
|
|
|
}
|
|
|
catch (err) {
|
|
|
@@ -601,20 +642,18 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
callback();
|
|
|
},
|
|
|
final(callback) {
|
|
|
- logger.info(`Adding pages has completed: (totalCount=${totalCount}, skipped=${skipped})`);
|
|
|
+ logger.info(`Adding pages has completed: (totalCount=${totalCount})`);
|
|
|
|
|
|
if (shouldEmitProgress) {
|
|
|
- socket?.emit(SocketEventName.FinishAddPage, { totalCount, count, skipped });
|
|
|
+ socket?.emit(SocketEventName.FinishAddPage, { totalCount, count });
|
|
|
}
|
|
|
callback();
|
|
|
},
|
|
|
});
|
|
|
|
|
|
readStream
|
|
|
- .pipe(thinOutStream)
|
|
|
.pipe(batchStream)
|
|
|
.pipe(appendBookmarkCountStream)
|
|
|
- .pipe(appendCommentStream)
|
|
|
.pipe(appendTagNamesStream)
|
|
|
.pipe(writeStream);
|
|
|
|
|
|
@@ -977,30 +1016,12 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
|
|
|
|
|
|
async syncPageUpdated(page, user) {
|
|
|
logger.debug('SearchClient.syncPageUpdated', page.path);
|
|
|
-
|
|
|
- // delete if page should not indexed
|
|
|
- if (!this.shouldIndexed(page)) {
|
|
|
- try {
|
|
|
- await this.deletePages([page]);
|
|
|
- }
|
|
|
- catch (err) {
|
|
|
- logger.error('deletePages:ES Error', err);
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
return this.updateOrInsertPageById(page._id);
|
|
|
}
|
|
|
|
|
|
// remove pages whitch should nod Indexed
|
|
|
async syncPagesUpdated(pages, user) {
|
|
|
const shoudDeletePages: any[] = [];
|
|
|
- pages.forEach((page) => {
|
|
|
- logger.debug('SearchClient.syncPageUpdated', page.path);
|
|
|
- if (!this.shouldIndexed(page)) {
|
|
|
- shoudDeletePages.push(page);
|
|
|
- }
|
|
|
- });
|
|
|
|
|
|
// delete if page should not indexed
|
|
|
try {
|