|
|
@@ -11,6 +11,8 @@ const {
|
|
|
} = require('stream');
|
|
|
const streamToPromise = require('stream-to-promise');
|
|
|
|
|
|
+const { createBatchStream } = require('./batch-stream');
|
|
|
+
|
|
|
const BULK_REINDEX_SIZE = 100;
|
|
|
|
|
|
function SearchClient(crowi, esUri) {
|
|
|
@@ -301,6 +303,7 @@ SearchClient.prototype.updateOrInsertPages = async function(queryFactory, isEmit
|
|
|
|
|
|
const searchEvent = this.searchEvent;
|
|
|
|
|
|
+ // prepare functions invoked from custom streams
|
|
|
const prepareBodyForCreate = this.prepareBodyForCreate.bind(this);
|
|
|
const shouldIndexed = this.shouldIndexed.bind(this);
|
|
|
const bulkWrite = this.client.bulk.bind(this.client);
|
|
|
@@ -334,26 +337,7 @@ SearchClient.prototype.updateOrInsertPages = async function(queryFactory, isEmit
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- let batchBuffer = [];
|
|
|
- const batchingStream = new Transform({
|
|
|
- objectMode: true,
|
|
|
- transform(doc, encoding, callback) {
|
|
|
- batchBuffer.push(doc);
|
|
|
-
|
|
|
- if (batchBuffer.length >= BULK_REINDEX_SIZE) {
|
|
|
- this.push(batchBuffer);
|
|
|
- batchBuffer = [];
|
|
|
- }
|
|
|
-
|
|
|
- callback();
|
|
|
- },
|
|
|
- final(callback) {
|
|
|
- if (batchBuffer.length > 0) {
|
|
|
- this.push(batchBuffer);
|
|
|
- }
|
|
|
- callback();
|
|
|
- },
|
|
|
- });
|
|
|
+ const batchStream = createBatchStream(BULK_REINDEX_SIZE);
|
|
|
|
|
|
const appendBookmarkCountStream = new Transform({
|
|
|
objectMode: true,
|
|
|
@@ -436,7 +420,7 @@ SearchClient.prototype.updateOrInsertPages = async function(queryFactory, isEmit
|
|
|
|
|
|
readStream
|
|
|
.pipe(thinOutStream)
|
|
|
- .pipe(batchingStream)
|
|
|
+ .pipe(batchStream)
|
|
|
.pipe(appendBookmarkCountStream)
|
|
|
.pipe(appendTagNamesStream)
|
|
|
.pipe(writeStream);
|