Просмотр исходного кода

refactor addAllPages with stream

Yuki Takei 6 лет назад
Родитель
Сommit
89e0dc9714
2 измененных файлов с 110 добавлено и 74 удалено
  1. 0 19
      src/server/models/page.js
  2. 110 55
      src/server/util/search.js

+ 0 - 19
src/server/models/page.js

@@ -923,21 +923,6 @@ module.exports = function(crowi) {
     return { templateBody, templateTags };
   };
 
-  /**
-   * Bulk get (for internal only)
-   */
-  pageSchema.statics.getStreamOfFindAll = function(options) {
-    const criteria = { redirectTo: null };
-
-    return this.find(criteria)
-      .populate([
-        { path: 'creator', model: 'User' },
-        { path: 'revision', model: 'Revision' },
-      ])
-      .lean()
-      .cursor();
-  };
-
   async function pushRevision(pageData, newRevision, user) {
     await newRevision.save();
     debug('Successfully saved new revision', newRevision);
@@ -1384,10 +1369,6 @@ module.exports = function(crowi) {
     return addSlashOfEnd(path);
   };
 
-  pageSchema.statics.allPageCount = function() {
-    return this.count({ redirectTo: null });
-  };
-
   pageSchema.statics.GRANT_PUBLIC = GRANT_PUBLIC;
   pageSchema.statics.GRANT_RESTRICTED = GRANT_RESTRICTED;
   pageSchema.statics.GRANT_SPECIFIED = GRANT_SPECIFIED;

+ 110 - 55
src/server/util/search.js

@@ -6,7 +6,12 @@ const elasticsearch = require('elasticsearch');
 const debug = require('debug')('growi:lib:search');
 const logger = require('@alias/logger')('growi:lib:search');
 
-const BULK_REINDEX_SIZE = 100;
+const {
+  Writable, Transform,
+} = require('stream');
+const streamToPromise = require('stream-to-promise');
+
+const BULK_REINDEX_SIZE = 10;
 
 function SearchClient(crowi, esUri) {
   this.DEFAULT_OFFSET = 0;
@@ -100,7 +105,7 @@ SearchClient.prototype.registerUpdateEvent = function() {
 };
 
 SearchClient.prototype.shouldIndexed = function(page) {
-  return (page.redirectTo == null);
+  return page.creator != null && page.revision != null && page.redirectTo == null;
 };
 
 // BONSAI_URL is following format:
@@ -362,69 +367,119 @@ SearchClient.prototype.deletePages = function(pages) {
 };
 
 SearchClient.prototype.addAllPages = async function() {
-  const self = this;
   const Page = this.crowi.model('Page');
-  const allPageCount = await Page.allPageCount();
   const Bookmark = this.crowi.model('Bookmark');
   const PageTagRelation = this.crowi.model('PageTagRelation');
-  const cursor = Page.getStreamOfFindAll();
-  let body = [];
-  let sent = 0;
+
+  const searchEvent = this.searchEvent;
+  const prepareBodyForCreate = this.prepareBodyForCreate.bind(this);
+  const shouldIndexed = this.shouldIndexed.bind(this);
+  const bulkWrite = this.client.bulk.bind(this.client);
+
+  const criteria = { redirectTo: null };
+  const totalCount = await Page.count(criteria);
+
+  const readStream = Page.find(criteria)
+    .populate([
+      { path: 'creator', model: 'User' },
+      { path: 'revision', model: 'Revision' },
+    ])
+    .limit(100) // FIXME: remove debug code
+    .snapshot()
+    .lean()
+    .cursor();
+
   let skipped = 0;
-  let total = 0;
+  const thinOutStream = new Transform({
+    objectMode: true,
+    async transform(chunk, encoding, callback) {
+      if (shouldIndexed(chunk)) {
+        this.push(chunk);
+      }
+      else {
+        skipped++;
+      }
+      callback();
+    },
+  });
 
-  return new Promise((resolve, reject) => {
-    const bulkSend = (body) => {
-      self.client
-        .bulk({
-          body,
+  const addBookmarkCountStream = new Transform({
+    objectMode: true,
+    async transform(chunk, encoding, callback) {
+      const bookmarkCount = await Bookmark.countByPageId(chunk._id);
+      this.push({ ...chunk, bookmarkCount });
+      callback();
+    },
+  });
+
+  const addTagNamesStream = new Transform({
+    objectMode: true,
+    async transform(chunk, encoding, callback) {
+      const tagRelations = await PageTagRelation.find({ relatedPage: chunk._id }).populate('relatedTag');
+      const tagNames = tagRelations.map((relation) => { return relation.relatedTag.name });
+      this.push({ ...chunk, tagNames });
+      callback();
+    },
+  });
+
+  let batchBuffer = [];
+  const batchingStream = new Transform({
+    objectMode: true,
+    transform(chunk, encoding, callback) {
+      prepareBodyForCreate(batchBuffer, chunk);
+
+      // send each `BULK_REINDEX_SIZE` docs. (body has 2 elements for each data)
+      if (batchBuffer.length >= BULK_REINDEX_SIZE * 2) {
+        this.push(batchBuffer);
+        batchBuffer = [];
+      }
+
+      callback();
+    },
+    final(callback) {
+      if (batchBuffer.length > 0) {
+        this.push(batchBuffer);
+      }
+      callback();
+    },
+  });
+
+  let count = 0;
+  const writeStream = new Writable({
+    objectMode: true,
+    async write(batch, encoding, callback) {
+      try {
+        const res = await bulkWrite({
+          body: batch,
           requestTimeout: Infinity,
-        })
-        .then((res) => {
-          logger.info('addAllPages add anyway (items, errors, took): ', (res.items || []).length, res.errors, res.took, 'ms');
-        })
-        .catch((err) => {
-          logger.error('addAllPages error on add anyway: ', err);
         });
-    };
 
-    cursor
-      .eachAsync(async(doc) => {
-        if (!doc.creator || !doc.revision || !self.shouldIndexed(doc)) {
-          // debug('Skipped', doc.path);
-          skipped++;
-          return;
-        }
-        total++;
-
-        const bookmarkCount = await Bookmark.countByPageId(doc._id);
-        const tagRelations = await PageTagRelation.find({ relatedPage: doc._id }).populate('relatedTag');
-        const page = { ...doc, bookmarkCount, tagNames: tagRelations.map((relation) => { return relation.relatedTag.name }) };
-        self.prepareBodyForCreate(body, page);
-
-        // send each `BULK_REINDEX_SIZE` docs. (body has 2 elements for each data)
-        if (body.length >= BULK_REINDEX_SIZE * 2) {
-          sent++;
-          logger.debug('Sending request (seq, total, skipped)', sent, total, skipped);
-          bulkSend(body);
-          this.searchEvent.emit('addPageProgress', allPageCount, total, skipped);
-
-          body = [];
-        }
-      })
-      .then(() => {
-        // send all remaining data on body[]
-        logger.debug('Sending last body of bulk operation:', body.length);
-        bulkSend(body);
-        this.searchEvent.emit('finishAddPage', allPageCount, total, skipped);
+        count += (res.items || []).length;
 
-        resolve();
-      })
-      .catch((e) => {
-        logger.error('Error wile iterating cursor.eachAsync()', e);
-        reject(e);
-      });
+        logger.info('addAllPages add anyway (items, errors, took): ', `(${count}, ${res.errors}, ${res.took}ms)`);
+        searchEvent.emit('addPageProgress', totalCount, count, skipped);
+      }
+      catch (err) {
+        logger.error('addAllPages error on add anyway: ', err);
+      }
+
+      callback();
+    },
+    final(callback) {
+      searchEvent.emit('finishAddPage', totalCount, count, skipped);
+      callback();
+    },
   });
+
+  readStream
+    .pipe(thinOutStream)
+    .pipe(addBookmarkCountStream)
+    .pipe(addTagNamesStream)
+    .pipe(batchingStream)
+    .pipe(writeStream);
+
+  return streamToPromise(readStream);
+
 };
 
 /**