|
|
@@ -11,7 +11,7 @@ const {
|
|
|
} = require('stream');
|
|
|
const streamToPromise = require('stream-to-promise');
|
|
|
|
|
|
-const BULK_REINDEX_SIZE = 10;
|
|
|
+const BULK_REINDEX_SIZE = 100;
|
|
|
|
|
|
function SearchClient(crowi, esUri) {
|
|
|
this.DEFAULT_OFFSET = 0;
|
|
|
@@ -380,11 +380,12 @@ SearchClient.prototype.addAllPages = async function() {
|
|
|
const totalCount = await Page.count(criteria);
|
|
|
|
|
|
const readStream = Page.find(criteria)
|
|
|
+ // populate data which will be referenced by prepareBodyForCreate()
|
|
|
.populate([
|
|
|
- { path: 'creator', model: 'User' },
|
|
|
- { path: 'revision', model: 'Revision' },
|
|
|
+ { path: 'creator', model: 'User', select: 'username' },
|
|
|
+ { path: 'revision', model: 'Revision', select: 'body' },
|
|
|
])
|
|
|
- .limit(100) // FIXME: remove debug code
|
|
|
+ .limit(500) // FIXME: remove debug code
|
|
|
.snapshot()
|
|
|
.lean()
|
|
|
.cursor();
|
|
|
@@ -392,9 +393,9 @@ SearchClient.prototype.addAllPages = async function() {
|
|
|
let skipped = 0;
|
|
|
const thinOutStream = new Transform({
|
|
|
objectMode: true,
|
|
|
- async transform(chunk, encoding, callback) {
|
|
|
- if (shouldIndexed(chunk)) {
|
|
|
- this.push(chunk);
|
|
|
+ async transform(doc, encoding, callback) {
|
|
|
+ if (shouldIndexed(doc)) {
|
|
|
+ this.push(doc);
|
|
|
}
|
|
|
else {
|
|
|
skipped++;
|
|
|
@@ -403,33 +404,13 @@ SearchClient.prototype.addAllPages = async function() {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- const addBookmarkCountStream = new Transform({
|
|
|
- objectMode: true,
|
|
|
- async transform(chunk, encoding, callback) {
|
|
|
- const bookmarkCount = await Bookmark.countByPageId(chunk._id);
|
|
|
- this.push({ ...chunk, bookmarkCount });
|
|
|
- callback();
|
|
|
- },
|
|
|
- });
|
|
|
-
|
|
|
- const addTagNamesStream = new Transform({
|
|
|
- objectMode: true,
|
|
|
- async transform(chunk, encoding, callback) {
|
|
|
- const tagRelations = await PageTagRelation.find({ relatedPage: chunk._id }).populate('relatedTag');
|
|
|
- const tagNames = tagRelations.map((relation) => { return relation.relatedTag.name });
|
|
|
- this.push({ ...chunk, tagNames });
|
|
|
- callback();
|
|
|
- },
|
|
|
- });
|
|
|
-
|
|
|
let batchBuffer = [];
|
|
|
const batchingStream = new Transform({
|
|
|
objectMode: true,
|
|
|
- transform(chunk, encoding, callback) {
|
|
|
- prepareBodyForCreate(batchBuffer, chunk);
|
|
|
+ transform(doc, encoding, callback) {
|
|
|
+ batchBuffer.push(doc);
|
|
|
|
|
|
- // send each `BULK_REINDEX_SIZE` docs. (body has 2 elements for each data)
|
|
|
- if (batchBuffer.length / 2 >= BULK_REINDEX_SIZE) {
|
|
|
+ if (batchBuffer.length >= BULK_REINDEX_SIZE) {
|
|
|
this.push(batchBuffer);
|
|
|
batchBuffer = [];
|
|
|
}
|
|
|
@@ -444,13 +425,47 @@ SearchClient.prototype.addAllPages = async function() {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
+ const appendBookmarkCountStream = new Transform({
|
|
|
+ objectMode: true,
|
|
|
+ async transform(chunk, encoding, callback) {
|
|
|
+ const pageIds = chunk.map(doc => doc._id);
|
|
|
+
|
|
|
+ const idToCountMap = await Bookmark.getPageIdToCountMap(pageIds);
|
|
|
+ const idsHavingCount = Object.keys(idToCountMap);
|
|
|
+
|
|
|
+ // append count
|
|
|
+ chunk
|
|
|
+ .filter(doc => idsHavingCount.includes(doc._id.toString()))
|
|
|
+ .forEach((doc) => {
|
|
|
+ // append count from idToCountMap
|
|
|
+ doc.bookmarkCount = idToCountMap[doc._id.toString()];
|
|
|
+ });
|
|
|
+
|
|
|
+ this.push(chunk);
|
|
|
+ callback();
|
|
|
+ },
|
|
|
+ });
|
|
|
+
|
|
|
+ // const appendTagNamesStream = new Transform({
|
|
|
+ // objectMode: true,
|
|
|
+ // async transform(chunk, encoding, callback) {
|
|
|
+ // const tagRelations = await PageTagRelation.find({ relatedPage: chunk._id }).populate('relatedTag');
|
|
|
+ // const tagNames = tagRelations.map((relation) => { return relation.relatedTag.name });
|
|
|
+ // this.push({ ...chunk, tagNames });
|
|
|
+ // callback();
|
|
|
+ // },
|
|
|
+ // });
|
|
|
+
|
|
|
let count = 0;
|
|
|
const writeStream = new Writable({
|
|
|
objectMode: true,
|
|
|
async write(batch, encoding, callback) {
|
|
|
+ const body = [];
|
|
|
+ batch.forEach(doc => prepareBodyForCreate(body, doc));
|
|
|
+
|
|
|
try {
|
|
|
const res = await bulkWrite({
|
|
|
- body: batch,
|
|
|
+ body,
|
|
|
requestTimeout: Infinity,
|
|
|
});
|
|
|
|
|
|
@@ -474,9 +489,9 @@ SearchClient.prototype.addAllPages = async function() {
|
|
|
|
|
|
readStream
|
|
|
.pipe(thinOutStream)
|
|
|
- .pipe(addBookmarkCountStream)
|
|
|
- .pipe(addTagNamesStream)
|
|
|
.pipe(batchingStream)
|
|
|
+ .pipe(appendBookmarkCountStream)
|
|
|
+ // .pipe(appendTagNamesStream)
|
|
|
.pipe(writeStream);
|
|
|
|
|
|
return streamToPromise(readStream);
|