Просмотр исходного кода

use async pipeline and remove stream-to-promise

reiji-h 1 год назад
Родитель
Сommit
b78e1999d0

+ 0 - 4
apps/app/src/migrations/20211227060705-revision-path-to-page-id-schema-migration--fixed-7549.js

@@ -59,8 +59,6 @@ module.exports = {
 
     await pipeline(pagesStream, batchStrem, migratePagesStream);
 
-    await streamToPromise(migratePagesStream);
-
     logger.info('Migration has successfully applied');
   },
 
@@ -108,8 +106,6 @@ module.exports = {
 
     await pipeline(pagesStream, batchStrem, migratePagesStream);
 
-    await streamToPromise(migratePagesStream);
-
     logger.info('Migration down has successfully applied');
   },
 };

+ 2 - 5
apps/app/src/server/service/export.js

@@ -199,8 +199,6 @@ class ExportService {
 
     await pipeline(readStream, logStream, transformStream, writeStream);
 
-    await streamToPromise(writeStream);
-
     return writeStream.path;
   }
 
@@ -353,13 +351,12 @@ class ExportService {
     const output = fs.createWriteStream(zipFile);
 
     // pipe archive data to the file
-    await pipeline(archive, output);
+    const stream = pipeline(archive, output);
 
     // finalize the archive (ie we are done appending files but streams have to finish yet)
     // 'close', 'end' or 'finish' may be fired right after calling this method so register to them beforehand
     archive.finalize();
-
-    await streamToPromise(archive);
+    await stream;
 
     logger.info(`zipped GROWI data into ${zipFile} (${archive.pointer()} bytes)`);
 

+ 4 - 6
apps/app/src/server/service/file-uploader/local.ts

@@ -1,5 +1,6 @@
 import type { ReadStream } from 'fs';
-import { pipeline, Readable } from 'stream';
+import { Readable } from 'stream';
+import { pipeline } from 'stream/promises';
 
 import type { Response } from 'express';
 
@@ -24,7 +25,6 @@ const fsPromises = require('fs/promises');
 const path = require('path');
 
 const mkdir = require('mkdirp');
-const streamToPromise = require('stream-to-promise');
 const urljoin = require('url-join');
 
 
@@ -165,8 +165,7 @@ module.exports = function(crowi) {
 
     const writeStream = fs.createWriteStream(filePath);
 
-    const stream = pipeline(fileStream, writeStream);
-    return streamToPromise(stream);
+    return pipeline(fileStream, writeStream);
   };
 
   lib.saveFile = async function({ filePath, contentType, data }) {
@@ -180,8 +179,7 @@ module.exports = function(crowi) {
     fileStream.push(data);
     fileStream.push(null); // EOF
     const writeStream = fs.createWriteStream(absFilePath);
-    const stream = pipeline(fileStream, writeStream);
-    return streamToPromise(stream);
+    return pipeline(fileStream, writeStream);
   };
 
   /**

+ 2 - 3
apps/app/src/server/service/growi-bridge/index.ts

@@ -1,8 +1,7 @@
 import fs from 'fs';
 import path from 'path';
-import { pipeline } from 'stream';
+import { pipeline } from 'stream/promises';
 
-import streamToPromise from 'stream-to-promise';
 import unzipStream, { type Entry } from 'unzip-stream';
 
 import loggerFactory from '~/utils/logger';
@@ -103,7 +102,7 @@ class GrowiBridgeService {
     });
 
     try {
-      await streamToPromise(unzipEntryStream);
+      await unzipEntryStream;
       await tapPromise;
     }
     // if zip is broken

+ 2 - 5
apps/app/src/server/service/import/import.ts

@@ -11,7 +11,6 @@ import type {
 } from 'mongodb';
 import type { Document } from 'mongoose';
 import mongoose from 'mongoose';
-import streamToPromise from 'stream-to-promise';
 import unzipStream from 'unzip-stream';
 
 import { ImportMode } from '~/models/admin/import-mode';
@@ -270,8 +269,6 @@ export class ImportService {
 
       await pipelinePromise(readStream, jsonStream, convertStream, batchStream, writeStream);
 
-      await streamToPromise(writeStream);
-
       // clean up tmp directory
       fs.unlinkSync(jsonFile);
     }
@@ -347,7 +344,7 @@ export class ImportService {
   async unzip(zipFile) {
     const readStream = fs.createReadStream(zipFile);
     const parseStream = unzipStream.Parse();
-    const unzipStreamPipe = pipeline(readStream, parseStream);
+    const unzipStreamPipe = pipelinePromise(readStream, parseStream);
     const files: string[] = [];
 
     unzipStreamPipe.on('entry', (/** @type {Entry} */ entry) => {
@@ -373,7 +370,7 @@ export class ImportService {
       }
     });
 
-    await streamToPromise(unzipStreamPipe);
+    await unzipStreamPipe;
 
     return files;
   }

+ 0 - 3
apps/app/src/server/service/page/delete-completely-user-home-by-system.ts

@@ -6,7 +6,6 @@ import type { IPage, Ref } from '@growi/core';
 import { isUsersHomepage } from '@growi/core/dist/utils/page-path-utils';
 import type { HydratedDocument } from 'mongoose';
 import mongoose from 'mongoose';
-import streamToPromise from 'stream-to-promise';
 
 import type { PageDocument, PageModel } from '~/server/models/page';
 import { createBatchStream } from '~/server/util/batch-stream';
@@ -112,8 +111,6 @@ export const deleteCompletelyUserHomeBySystem = async(userHomepagePath: string,
     });
 
     await pipeline(readStream, batchStream, writeStream);
-
-    await streamToPromise(writeStream);
     // ────────┤ end │─────────
   }
   catch (err) {

+ 0 - 22
apps/app/src/server/service/page/index.ts

@@ -19,7 +19,6 @@ import {
 import escapeStringRegexp from 'escape-string-regexp';
 import type { Cursor, HydratedDocument } from 'mongoose';
 import mongoose from 'mongoose';
-import streamToPromise from 'stream-to-promise';
 
 import { Comment } from '~/features/comment/server';
 import type { ExternalUserGroupDocument } from '~/features/external-user-group/server/models/external-user-group';
@@ -1047,8 +1046,6 @@ class PageService implements IPageService {
     });
 
     await pipeline(readStream, batchStream, writeStream);
-
-    await streamToPromise(writeStream);
   }
 
   private async renameDescendantsWithStreamV4(targetPage, newPagePath, user, options = {}) {
@@ -1086,8 +1083,6 @@ class PageService implements IPageService {
     });
 
     await pipeline(readStream, batchStream, writeStream);
-
-    await streamToPromise(writeStream);
   }
 
   /*
@@ -1512,8 +1507,6 @@ class PageService implements IPageService {
 
     await pipeline(readStream, batchStream, writeStream);
 
-    await streamToPromise(writeStream);
-
     return nNonEmptyDuplicatedPages;
   }
 
@@ -1552,8 +1545,6 @@ class PageService implements IPageService {
 
     await pipeline(readStream, batchStream, writeStream);
 
-    await streamToPromise(writeStream);
-
     return count;
   }
 
@@ -1882,8 +1873,6 @@ class PageService implements IPageService {
 
     await pipeline(readStream, batchStream, writeStream);
 
-    await streamToPromise(writeStream);
-
     return nDeletedNonEmptyPages;
   }
 
@@ -2149,8 +2138,6 @@ class PageService implements IPageService {
 
     await pipeline(readStream, batchStream, writeStream);
 
-    await streamToPromise(writeStream);
-
     return nDeletedNonEmptyPages;
   }
 
@@ -2436,7 +2423,6 @@ class PageService implements IPageService {
 
     await pipeline(childPagesReadableStream, batchStream, childPagesWritable);
 
-    await streamToPromise(childPagesWritable);
   }
 
   async updateChildPagesGrant(
@@ -2504,8 +2490,6 @@ class PageService implements IPageService {
 
     await pipeline(readStream, batchStream, writeStream);
 
-    await streamToPromise(writeStream);
-
     return count;
   }
 
@@ -2538,8 +2522,6 @@ class PageService implements IPageService {
 
     await pipeline(readStream, batchStream, writeStream);
 
-    await streamToPromise(readStream);
-
     return count;
   }
 
@@ -3386,8 +3368,6 @@ class PageService implements IPageService {
 
     await pipeline(pagesStream, batchStream, migratePagesStream);
 
-    await streamToPromise(migratePagesStream);
-
     if (await Page.exists(matchFilter) && shouldContinue) {
       return this._normalizeParentRecursively(
         pathOrRegExps,
@@ -3503,8 +3483,6 @@ class PageService implements IPageService {
     });
 
     await pipeline(pageCursor, batchStream, recountWriteStream);
-
-    await streamToPromise(recountWriteStream);
   }
 
   // update descendantCount of all pages that are ancestors of a provided pageId by count

+ 2 - 4
apps/app/src/server/service/search-delegator/elasticsearch.ts

@@ -5,7 +5,6 @@ import { URL } from 'url';
 import { getIdStringForRef, type IPage } from '@growi/core';
 import gc from 'expose-gc/function';
 import mongoose from 'mongoose';
-import streamToPromise from 'stream-to-promise';
 
 import { SearchDelegatorName } from '~/interfaces/named-query';
 import type { ISearchResult, ISearchResultData } from '~/interfaces/search';
@@ -554,7 +553,8 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
       },
     });
 
-    await pipeline(
+
+    return pipeline(
       readStream,
       batchStream,
       appendTagNamesStream,
@@ -562,8 +562,6 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
       // appendFileUploadedStream,
       writeStream,
     );
-
-    return streamToPromise(writeStream);
   }
 
   deletePages(pages) {