reiji-h пре 1 година
родитељ
комит
56bbb8aa93

+ 3 - 1
apps/app/src/features/growi-plugin/server/services/growi-plugin/growi-plugin.ts

@@ -1,5 +1,6 @@
 import fs, { readFileSync } from 'fs';
 import path from 'path';
+import { pipeline } from 'stream';
 
 import { GrowiPluginType } from '@growi/core';
 import type { GrowiThemeMetadata, ViteManifest } from '@growi/core';
@@ -212,7 +213,8 @@ export class GrowiPluginService implements IGrowiPluginService {
       const readZipStream = fs.createReadStream(zipFilePath);
       const writeUnZipStream = unzipStream.Extract({ path: destPath.toString() });
 
-      const unzipFileStream = readZipStream.on('error', () => { writeUnZipStream.end() }).pipe(writeUnZipStream).on('error', () => { readZipStream.destroy() });
+      const unzipFileStream = pipeline(readZipStream, writeUnZipStream);
+
 
       await streamToPromise(unzipFileStream);
     }

+ 2 - 15
apps/app/src/features/openai/server/services/openai.ts

@@ -1,5 +1,6 @@
 import assert from 'node:assert';
 import { Readable, Transform } from 'stream';
+import { pipeline } from 'stream/promises';
 
 import { PageGrant, isPopulated } from '@growi/core';
 import type { HydratedDocument, Types } from 'mongoose';
@@ -342,21 +343,7 @@ class OpenaiService implements IOpenaiService {
       },
     });
 
-    pagesStream
-      .on('error', () => {
-        batchStrem.end();
-        createVectorStoreFileStream.end();
-      })
-      .pipe(batchStrem)
-      .on('error', () => {
-        pagesStream.destroy();
-        createVectorStoreFileStream.end();
-      })
-      .pipe(createVectorStoreFileStream)
-      .on('error', () => {
-        pagesStream.destroy();
-        batchStrem.destroy();
-      });
+    await pipeline(pagesStream, batchStrem, createVectorStoreFileStream);
   }
 
   async rebuildVectorStore(page: HydratedDocument<PageDocument>) {

+ 3 - 31
apps/app/src/migrations/20211227060705-revision-path-to-page-id-schema-migration--fixed-7549.js

@@ -1,4 +1,4 @@
-import { Writable } from 'stream';
+import { pipeline, Writable } from 'stream';
 
 import mongoose from 'mongoose';
 import streamToPromise from 'stream-to-promise';
@@ -56,21 +56,7 @@ module.exports = {
       },
     });
 
-    pagesStream
-      .on('error', () => {
-        batchStrem.end();
-        migratePagesStream.end();
-      })
-      .pipe(batchStrem)
-      .on('error', () => {
-        pagesStream.destroy();
-        migratePagesStream.end();
-      })
-      .pipe(migratePagesStream)
-      .on('error', () => {
-        pagesStream.destroy();
-        batchStrem.destroy();
-      });
+    pipeline(pagesStream, batchStrem, migratePagesStream);
 
     await streamToPromise(migratePagesStream);
 
@@ -119,21 +105,7 @@ module.exports = {
       },
     });
 
-    pagesStream
-      .on('error', () => {
-        batchStrem.end();
-        migratePagesStream.end();
-      })
-      .pipe(batchStrem)
-      .on('error', () => {
-        pagesStream.destroy();
-        migratePagesStream.end();
-      })
-      .pipe(migratePagesStream)
-      .on('error', () => {
-        pagesStream.destroy();
-        batchStrem.destroy();
-      });
+    pipeline(pagesStream, batchStrem, migratePagesStream);
 
     await streamToPromise(migratePagesStream);
 

+ 2 - 2
apps/app/src/server/routes/apiv3/page/index.ts

@@ -1,5 +1,5 @@
 import path from 'path';
-import type { Readable } from 'stream';
+import { pipeline, type Readable } from 'stream';
 
 import type { IPage } from '@growi/core';
 import {
@@ -760,7 +760,7 @@ module.exports = (crowi) => {
     };
     await crowi.activityService.createActivity(parameters);
 
-    return stream.pipe(res).on('error', () => { stream.destroy() });
+    return pipeline(stream, res);
   });
 
   /**

+ 3 - 29
apps/app/src/server/service/export.js

@@ -7,7 +7,7 @@ const logger = loggerFactory('growi:services:ExportService'); // eslint-disable-
 
 const fs = require('fs');
 const path = require('path');
-const { Transform } = require('stream');
+const { Transform, pipeline } = require('stream');
 
 const archiver = require('archiver');
 const mongoose = require('mongoose');
@@ -196,30 +196,7 @@ class ExportService {
     const jsonFileToWrite = path.join(this.baseDir, `${collectionName}.json`);
     const writeStream = fs.createWriteStream(jsonFileToWrite, { encoding: this.growiBridgeService.getEncoding() });
 
-    readStream
-      .on('error', () => {
-        logStream.end();
-        transformStream.end();
-        writeStream.end();
-      })
-      .pipe(logStream)
-      .on('error', () => {
-        readStream.destroy();
-        transformStream.end();
-        writeStream.end();
-      })
-      .pipe(transformStream)
-      .on('error', () => {
-        readStream.destroy();
-        logStream.destroy();
-        writeStream.end();
-      })
-      .pipe(writeStream)
-      .on('error', () => {
-        readStream.destroy();
-        logStream.destroy();
-        readStream.destroy();
-      });
+    pipeline(readStream, logStream, transformStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -375,10 +352,7 @@ class ExportService {
     const output = fs.createWriteStream(zipFile);
 
     // pipe archive data to the file
-    archive
-      .on('error', () => { output.end() })
-      .pipe(output)
-      .on('error', () => { archive.destroy() });
+    pipeline(archive, output);
 
     // finalize the archive (ie we are done appending files but streams have to finish yet)
     // 'close', 'end' or 'finish' may be fired right after calling this method so register to them beforehand

+ 3 - 9
apps/app/src/server/service/file-uploader/local.ts

@@ -1,5 +1,5 @@
 import type { ReadStream } from 'fs';
-import { Readable } from 'stream';
+import { pipeline, Readable } from 'stream';
 
 import type { Response } from 'express';
 
@@ -165,10 +165,7 @@ module.exports = function(crowi) {
 
     const writeStream = fs.createWriteStream(filePath);
 
-    const stream = fileStream
-      .on('error', () => { writeStream.end() })
-      .pipe(writeStream)
-      .on('error', () => { fileStream.destroy() });
+    const stream = pipeline(fileStream, writeStream);
     return streamToPromise(stream);
   };
 
@@ -183,10 +180,7 @@ module.exports = function(crowi) {
     fileStream.push(data);
     fileStream.push(null); // EOF
     const writeStream = fs.createWriteStream(absFilePath);
-    const stream = fileStream
-      .on('error', () => { writeStream.end() })
-      .pipe(writeStream)
-      .on('error', () => { fileStream.destroy() });
+    const stream = pipeline(fileStream, writeStream);
     return streamToPromise(stream);
   };
 

+ 2 - 4
apps/app/src/server/service/growi-bridge/index.ts

@@ -1,5 +1,6 @@
 import fs from 'fs';
 import path from 'path';
+import { pipeline } from 'stream';
 
 import streamToPromise from 'stream-to-promise';
 import unzipStream, { type Entry } from 'unzip-stream';
@@ -79,10 +80,7 @@ class GrowiBridgeService {
 
     const readStream = fs.createReadStream(zipFile);
     const parseStream = unzipStream.Parse();
-    const unzipStreamPipe = readStream
-      .on('error', () => parseStream.end())
-      .pipe(parseStream)
-      .on('error', () => readStream.destroy());
+    const unzipStreamPipe = pipeline(readStream, parseStream);
 
     let tapPromise;
 

+ 4 - 44
apps/app/src/server/service/import/import.ts

@@ -1,7 +1,7 @@
 import fs from 'fs';
 import path from 'path';
 import type { EventEmitter } from 'stream';
-import { Writable, Transform } from 'stream';
+import { Writable, Transform, pipeline } from 'stream';
 
 import JSONStream from 'JSONStream';
 import gc from 'expose-gc/function';
@@ -267,41 +267,7 @@ export class ImportService {
         },
       });
 
-      readStream
-        .on('error', () => {
-          jsonStream.end();
-          convertStream.end();
-          batchStream.end();
-          writeStream.end();
-        })
-        .pipe(jsonStream)
-        .on('error', () => {
-          readStream.destroy();
-          convertStream.end();
-          batchStream.end();
-          writeStream.end();
-        })
-        .pipe(convertStream)
-        .on('error', () => {
-          readStream.destroy();
-          jsonStream.destroy();
-          batchStream.end();
-          writeStream.end();
-        })
-        .pipe(batchStream)
-        .on('error', () => {
-          readStream.destroy();
-          jsonStream.destroy();
-          convertStream.destroy();
-          writeStream.end();
-        })
-        .pipe(writeStream)
-        .on('error', () => {
-          readStream.destroy();
-          jsonStream.destroy();
-          convertStream.destroy();
-          batchStream.destroy();
-        });
+      pipeline(readStream, jsonStream, convertStream, batchStream, writeStream);
 
       await streamToPromise(writeStream);
 
@@ -380,10 +346,7 @@ export class ImportService {
   async unzip(zipFile) {
     const readStream = fs.createReadStream(zipFile);
     const parseStream = unzipStream.Parse();
-    const unzipStreamPipe = readStream
-      .on('error', () => { parseStream.end() })
-      .pipe(parseStream)
-      .on('error', () => { readStream.destroy() });
+    const unzipStreamPipe = pipeline(readStream, parseStream);
     const files: string[] = [];
 
     unzipStreamPipe.on('entry', (/** @type {Entry} */ entry) => {
@@ -404,10 +367,7 @@ export class ImportService {
       else {
         const jsonFile = path.join(this.baseDir, fileName);
         const writeStream = fs.createWriteStream(jsonFile, { encoding: this.growiBridgeService.getEncoding() });
-        entry
-          .on('error', () => { writeStream.end() })
-          .pipe(writeStream)
-          .on('error', () => { entry.destroy() });
+        pipeline(entry, writeStream);
         files.push(jsonFile);
       }
     });

+ 2 - 16
apps/app/src/server/service/page/delete-completely-user-home-by-system.ts

@@ -1,4 +1,4 @@
-import { Writable } from 'stream';
+import { pipeline, Writable } from 'stream';
 
 import { getIdForRef } from '@growi/core';
 import type { IPage, Ref } from '@growi/core';
@@ -110,21 +110,7 @@ export const deleteCompletelyUserHomeBySystem = async(userHomepagePath: string,
       },
     });
 
-    readStream
-      .on('error', () => {
-        batchStream.end();
-        writeStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        readStream.destroy();
-        writeStream.end();
-      })
-      .pipe(writeStream)
-      .on('error', () => {
-        readStream.destroy();
-        batchStream.destroy();
-      });
+    pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
     // ────────┤ end │─────────

+ 16 - 163
apps/app/src/server/service/page/index.ts

@@ -1,6 +1,6 @@
 import type EventEmitter from 'events';
 import pathlib from 'path';
-import { Readable, Writable } from 'stream';
+import { pipeline, Readable, Writable } from 'stream';
 
 import {
   PageStatus, YDocStatus, getIdForRef,
@@ -1045,21 +1045,8 @@ class PageService implements IPageService {
       },
     });
 
-    readStream
-      .on('error', () => {
-        batchStream.end();
-        writeStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        readStream.destroy();
-        writeStream.end();
-      })
-      .pipe(writeStream)
-      .on('error', () => {
-        readStream.destroy();
-        batchStream.destroy();
-      });
+    pipeline(readStream, batchStream, writeStream);
+
     await streamToPromise(writeStream);
   }
 
@@ -1097,21 +1084,8 @@ class PageService implements IPageService {
       },
     });
 
-    readStream
-      .on('error', () => {
-        batchStream.end();
-        writeStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        readStream.destroy();
-        writeStream.end();
-      })
-      .pipe(writeStream)
-      .on('error', () => {
-        readStream.destroy();
-        batchStream.destroy();
-      });
+    pipeline(readStream, batchStream, writeStream);
+
     await streamToPromise(writeStream);
   }
 
@@ -1527,21 +1501,7 @@ class PageService implements IPageService {
       },
     });
 
-    readStream
-      .on('error', () => {
-        batchStream.end();
-        writeStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        readStream.destroy();
-        writeStream.end();
-      })
-      .pipe(writeStream)
-      .on('error', () => {
-        readStream.destroy();
-        batchStream.destroy();
-      });
+    pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -1581,21 +1541,7 @@ class PageService implements IPageService {
       },
     });
 
-    readStream
-      .on('error', () => {
-        batchStream.end();
-        writeStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        readStream.destroy();
-        writeStream.end();
-      })
-      .pipe(writeStream)
-      .on('error', () => {
-        readStream.destroy();
-        batchStream.destroy();
-      });
+    pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -1925,21 +1871,7 @@ class PageService implements IPageService {
       },
     });
 
-    readStream
-      .on('error', () => {
-        batchStream.end();
-        writeStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        readStream.destroy();
-        writeStream.end();
-      })
-      .pipe(writeStream)
-      .on('error', () => {
-        readStream.destroy();
-        batchStream.destroy();
-      });
+    pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -2202,21 +2134,7 @@ class PageService implements IPageService {
       },
     });
 
-    readStream
-      .on('error', () => {
-        batchStream.end();
-        writeStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        readStream.destroy();
-        writeStream.end();
-      })
-      .pipe(writeStream)
-      .on('error', () => {
-        readStream.destroy();
-        batchStream.destroy();
-      });
+    pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -2503,21 +2421,8 @@ class PageService implements IPageService {
       },
     });
 
-    childPagesReadableStream
-      .on('error', () => {
-        batchStream.end();
-        childPagesWritable.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        childPagesReadableStream.destroy();
-        childPagesWritable.end();
-      })
-      .pipe(childPagesWritable)
-      .on('error', () => {
-        childPagesReadableStream.destroy();
-        childPagesWritable.destroy();
-      });
+    pipeline(childPagesReadableStream, batchStream, childPagesWritable);
+
     await streamToPromise(childPagesWritable);
   }
 
@@ -2584,21 +2489,7 @@ class PageService implements IPageService {
       },
     });
 
-    readStream
-      .on('error', () => {
-        batchStream.end();
-        writeStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        readStream.destroy();
-        writeStream.end();
-      })
-      .pipe(writeStream)
-      .on('error', () => {
-        readStream.destroy();
-        batchStream.destroy();
-      });
+    pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -2632,18 +2523,7 @@ class PageService implements IPageService {
       },
     });
 
-    readStream
-      .on('error', () => {
-        batchStream.end();
-        writeStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        readStream.destroy();
-        writeStream.end();
-      })
-      .pipe(writeStream)
-      .on('error', () => {});
+    pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(readStream);
 
@@ -3491,21 +3371,7 @@ class PageService implements IPageService {
       },
     });
 
-    pagesStream
-      .on('error', () => {
-        batchStream.end();
-        migratePagesStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        pagesStream.destroy();
-        migratePagesStream.end();
-      })
-      .pipe(migratePagesStream)
-      .on('error', () => {
-        pagesStream.destroy();
-        batchStream.destroy();
-      });
+    pipeline(pagesStream, batchStream, migratePagesStream);
 
     await streamToPromise(migratePagesStream);
 
@@ -3622,21 +3488,8 @@ class PageService implements IPageService {
         callback();
       },
     });
-    pageCursor
-      .on('error', () => {
-        batchStream.end();
-        recountWriteStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        pageCursor.destroy();
-        recountWriteStream.end();
-      })
-      .pipe(recountWriteStream)
-      .on('error', () => {
-        pageCursor.destroy();
-        batchStream.destroy();
-      });
+
+    pipeline(pageCursor, batchStream, recountWriteStream);
 
     await streamToPromise(recountWriteStream);
   }

+ 9 - 27
apps/app/src/server/service/search-delegator/elasticsearch.ts

@@ -1,4 +1,4 @@
-import { Writable, Transform } from 'stream';
+import { Writable, Transform, pipeline } from 'stream';
 import { URL } from 'url';
 
 import { getIdStringForRef, type IPage } from '@growi/core';
@@ -553,32 +553,14 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
       },
     });
 
-    readStream
-      .on('error', () => {
-        batchStream.end();
-        appendTagNamesStream.end();
-        writeStream.end();
-      })
-      .pipe(batchStream)
-      .on('error', () => {
-        readStream.destroy();
-        appendTagNamesStream.end();
-        writeStream.end();
-      })
-      .pipe(appendTagNamesStream)
-      .on('error', () => {
-        readStream.destroy();
-        batchStream.destroy();
-        writeStream.end();
-      })
-      // .pipe(appendEmbeddingStream)
-      // .pipe(appendFileUploadedStream)
-      .pipe(writeStream)
-      .on('error', () => {
-        readStream.destroy();
-        batchStream.destroy();
-        appendTagNamesStream.destroy();
-      });
+    pipeline(
+      readStream,
+      batchStream,
+      appendTagNamesStream,
+      // appendEmbeddingStream,
+      // appendFileUploadedStream,
+      writeStream,
+    );
 
     return streamToPromise(writeStream);
   }