|
|
@@ -1,6 +1,7 @@
|
|
|
import type EventEmitter from 'events';
|
|
|
import pathlib from 'path';
|
|
|
import { Readable, Writable } from 'stream';
|
|
|
+import { pipeline } from 'stream/promises';
|
|
|
|
|
|
import {
|
|
|
PageStatus, YDocStatus, getIdForRef,
|
|
|
@@ -18,7 +19,6 @@ import {
|
|
|
import escapeStringRegexp from 'escape-string-regexp';
|
|
|
import type { Cursor, HydratedDocument } from 'mongoose';
|
|
|
import mongoose from 'mongoose';
|
|
|
-import streamToPromise from 'stream-to-promise';
|
|
|
|
|
|
import { Comment } from '~/features/comment/server';
|
|
|
import type { ExternalUserGroupDocument } from '~/features/external-user-group/server/models/external-user-group';
|
|
|
@@ -1006,6 +1006,8 @@ class PageService implements IPageService {
|
|
|
const factory = new PageCursorsForDescendantsFactory(user, targetPage, true);
|
|
|
const readStream = await factory.generateReadable();
|
|
|
|
|
|
+ const batchStream = createBatchStream(BULK_REINDEX_SIZE);
|
|
|
+
|
|
|
const newPagePathPrefix = newPagePath;
|
|
|
const pathRegExp = new RegExp(`^${escapeStringRegexp(targetPage.path)}`, 'i');
|
|
|
|
|
|
@@ -1043,16 +1045,13 @@ class PageService implements IPageService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- readStream
|
|
|
- .pipe(createBatchStream(BULK_REINDEX_SIZE))
|
|
|
- .pipe(writeStream);
|
|
|
-
|
|
|
- await streamToPromise(writeStream);
|
|
|
+ await pipeline(readStream, batchStream, writeStream);
|
|
|
}
|
|
|
|
|
|
private async renameDescendantsWithStreamV4(targetPage, newPagePath, user, options = {}) {
|
|
|
|
|
|
const readStream = await this.generateReadStreamToOperateOnlyDescendants(targetPage.path, user);
|
|
|
+ const batchStream = createBatchStream(BULK_REINDEX_SIZE);
|
|
|
|
|
|
const newPagePathPrefix = newPagePath;
|
|
|
const pathRegExp = new RegExp(`^${escapeStringRegexp(targetPage.path)}`, 'i');
|
|
|
@@ -1083,11 +1082,7 @@ class PageService implements IPageService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- readStream
|
|
|
- .pipe(createBatchStream(BULK_REINDEX_SIZE))
|
|
|
- .pipe(writeStream);
|
|
|
-
|
|
|
- await streamToPromise(writeStream);
|
|
|
+ await pipeline(readStream, batchStream, writeStream);
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
@@ -1477,6 +1472,7 @@ class PageService implements IPageService {
|
|
|
|
|
|
const iterableFactory = new PageCursorsForDescendantsFactory(user, page, true);
|
|
|
const readStream = await iterableFactory.generateReadable();
|
|
|
+ const batchStream = createBatchStream(BULK_REINDEX_SIZE);
|
|
|
|
|
|
const newPagePathPrefix = newPagePath;
|
|
|
const pathRegExp = new RegExp(`^${escapeStringRegexp(page.path)}`, 'i');
|
|
|
@@ -1509,17 +1505,14 @@ class PageService implements IPageService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- readStream
|
|
|
- .pipe(createBatchStream(BULK_REINDEX_SIZE))
|
|
|
- .pipe(writeStream);
|
|
|
-
|
|
|
- await streamToPromise(writeStream);
|
|
|
+ await pipeline(readStream, batchStream, writeStream);
|
|
|
|
|
|
return nNonEmptyDuplicatedPages;
|
|
|
}
|
|
|
|
|
|
private async duplicateDescendantsWithStreamV4(page, newPagePath, user, onlyDuplicateUserRelatedResources: boolean) {
|
|
|
const readStream = await this.generateReadStreamToOperateOnlyDescendants(page.path, user);
|
|
|
+ const batchStream = createBatchStream(BULK_REINDEX_SIZE);
|
|
|
|
|
|
const newPagePathPrefix = newPagePath;
|
|
|
const pathRegExp = new RegExp(`^${escapeStringRegexp(page.path)}`, 'i');
|
|
|
@@ -1550,11 +1543,7 @@ class PageService implements IPageService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- readStream
|
|
|
- .pipe(createBatchStream(BULK_REINDEX_SIZE))
|
|
|
- .pipe(writeStream);
|
|
|
-
|
|
|
- await streamToPromise(writeStream);
|
|
|
+ await pipeline(readStream, batchStream, writeStream);
|
|
|
|
|
|
return count;
|
|
|
}
|
|
|
@@ -1849,6 +1838,7 @@ class PageService implements IPageService {
|
|
|
readStream = await factory.generateReadable();
|
|
|
}
|
|
|
|
|
|
+ const batchStream = createBatchStream(BULK_REINDEX_SIZE);
|
|
|
|
|
|
const deleteDescendants = this.deleteDescendants.bind(this);
|
|
|
let count = 0;
|
|
|
@@ -1881,11 +1871,7 @@ class PageService implements IPageService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- readStream
|
|
|
- .pipe(createBatchStream(BULK_REINDEX_SIZE))
|
|
|
- .pipe(writeStream);
|
|
|
-
|
|
|
- await streamToPromise(writeStream);
|
|
|
+ await pipeline(readStream, batchStream, writeStream);
|
|
|
|
|
|
return nDeletedNonEmptyPages;
|
|
|
}
|
|
|
@@ -2117,6 +2103,8 @@ class PageService implements IPageService {
|
|
|
readStream = await factory.generateReadable();
|
|
|
}
|
|
|
|
|
|
+ const batchStream = createBatchStream(BULK_REINDEX_SIZE);
|
|
|
+
|
|
|
let count = 0;
|
|
|
let nDeletedNonEmptyPages = 0; // used for updating descendantCount
|
|
|
|
|
|
@@ -2148,11 +2136,7 @@ class PageService implements IPageService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- readStream
|
|
|
- .pipe(createBatchStream(BULK_REINDEX_SIZE))
|
|
|
- .pipe(writeStream);
|
|
|
-
|
|
|
- await streamToPromise(writeStream);
|
|
|
+ await pipeline(readStream, batchStream, writeStream);
|
|
|
|
|
|
return nDeletedNonEmptyPages;
|
|
|
}
|
|
|
@@ -2428,7 +2412,7 @@ class PageService implements IPageService {
|
|
|
);
|
|
|
|
|
|
const childPagesReadableStream = builder.query.cursor({ batchSize: BULK_REINDEX_SIZE });
|
|
|
-
|
|
|
+ const batchStream = createBatchStream(BULK_REINDEX_SIZE);
|
|
|
const childPagesWritable = new Writable({
|
|
|
objectMode: true,
|
|
|
write: async(batch, encoding, callback) => {
|
|
|
@@ -2437,10 +2421,8 @@ class PageService implements IPageService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- childPagesReadableStream
|
|
|
- .pipe(createBatchStream(BULK_REINDEX_SIZE))
|
|
|
- .pipe(childPagesWritable);
|
|
|
- await streamToPromise(childPagesWritable);
|
|
|
+ await pipeline(childPagesReadableStream, batchStream, childPagesWritable);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
async updateChildPagesGrant(
|
|
|
@@ -2477,6 +2459,7 @@ class PageService implements IPageService {
|
|
|
}
|
|
|
|
|
|
const readStream = await this.generateReadStreamToOperateOnlyDescendants(targetPage.path, user);
|
|
|
+ const batchStream = createBatchStream(BULK_REINDEX_SIZE);
|
|
|
|
|
|
const revertDeletedDescendants = this.revertDeletedDescendants.bind(this);
|
|
|
let count = 0;
|
|
|
@@ -2505,17 +2488,14 @@ class PageService implements IPageService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- readStream
|
|
|
- .pipe(createBatchStream(BULK_REINDEX_SIZE))
|
|
|
- .pipe(writeStream);
|
|
|
-
|
|
|
- await streamToPromise(writeStream);
|
|
|
+ await pipeline(readStream, batchStream, writeStream);
|
|
|
|
|
|
return count;
|
|
|
}
|
|
|
|
|
|
private async revertDeletedDescendantsWithStreamV4(targetPage, user, options = {}) {
|
|
|
const readStream = await this.generateReadStreamToOperateOnlyDescendants(targetPage.path, user);
|
|
|
+ const batchStream = createBatchStream(BULK_REINDEX_SIZE);
|
|
|
|
|
|
const revertDeletedDescendants = this.revertDeletedDescendants.bind(this);
|
|
|
let count = 0;
|
|
|
@@ -2540,11 +2520,7 @@ class PageService implements IPageService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- readStream
|
|
|
- .pipe(createBatchStream(BULK_REINDEX_SIZE))
|
|
|
- .pipe(writeStream);
|
|
|
-
|
|
|
- await streamToPromise(readStream);
|
|
|
+ await pipeline(readStream, batchStream, writeStream);
|
|
|
|
|
|
return count;
|
|
|
}
|
|
|
@@ -3390,11 +3366,7 @@ class PageService implements IPageService {
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- pagesStream
|
|
|
- .pipe(batchStream)
|
|
|
- .pipe(migratePagesStream);
|
|
|
-
|
|
|
- await streamToPromise(migratePagesStream);
|
|
|
+ await pipeline(pagesStream, batchStream, migratePagesStream);
|
|
|
|
|
|
if (await Page.exists(matchFilter) && shouldContinue) {
|
|
|
return this._normalizeParentRecursively(
|
|
|
@@ -3495,6 +3467,7 @@ class PageService implements IPageService {
|
|
|
*/
|
|
|
async recountAndUpdateDescendantCountOfPages(pageCursor: Cursor<any>, batchSize:number): Promise<void> {
|
|
|
const Page = this.crowi.model('Page');
|
|
|
+ const batchStream = createBatchStream(batchSize);
|
|
|
const recountWriteStream = new Writable({
|
|
|
objectMode: true,
|
|
|
async write(pageDocuments, encoding, callback) {
|
|
|
@@ -3508,11 +3481,8 @@ class PageService implements IPageService {
|
|
|
callback();
|
|
|
},
|
|
|
});
|
|
|
- pageCursor
|
|
|
- .pipe(createBatchStream(batchSize))
|
|
|
- .pipe(recountWriteStream);
|
|
|
|
|
|
- await streamToPromise(recountWriteStream);
|
|
|
+ await pipeline(pageCursor, batchStream, recountWriteStream);
|
|
|
}
|
|
|
|
|
|
// update descendantCount of all pages that are ancestors of a provided pageId by count
|