Explorar el Código

add on error for stream pipe

reiji-h hace 1 año
padre
commit
1a06229b70

+ 3 - 2
apps/app/src/server/routes/apiv3/page/index.ts

@@ -1,4 +1,5 @@
 import path from 'path';
+import type { Readable } from 'stream';
 
 import type { IPage } from '@growi/core';
 import {
@@ -734,7 +735,7 @@ module.exports = (crowi) => {
       fileName = '_top';
     }
 
-    let stream;
+    let stream: Readable;
 
     try {
       stream = exportService.getReadStreamFromRevision(revision, format);
@@ -759,7 +760,7 @@ module.exports = (crowi) => {
     };
     await crowi.activityService.createActivity(parameters);
 
-    return stream.pipe(res);
+    return stream.pipe(res).on('error', () => { stream.destroy() });
   });
 
   /**

+ 21 - 1
apps/app/src/server/service/export.js

@@ -197,9 +197,29 @@ class ExportService {
     const writeStream = fs.createWriteStream(jsonFileToWrite, { encoding: this.growiBridgeService.getEncoding() });
 
     readStream
+      .on('error', () => {
+        logStream.end();
+        transformStream.end();
+        writeStream.end();
+      })
       .pipe(logStream)
+      .on('error', () => {
+        readStream.destroy();
+        transformStream.end();
+        writeStream.end();
+      })
       .pipe(transformStream)
-      .pipe(writeStream);
+      .on('error', () => {
+        readStream.destroy();
+        logStream.destroy();
+        writeStream.end();
+      })
+      .pipe(writeStream)
+      .on('error', () => {
+        readStream.destroy();
+        logStream.destroy();
+        readStream.destroy();
+      });
 
     await streamToPromise(writeStream);
 

+ 3 - 1
apps/app/src/server/service/growi-bridge/index.ts

@@ -78,7 +78,9 @@ class GrowiBridgeService {
     let meta = {};
 
     const readStream = fs.createReadStream(zipFile);
-    const unzipStreamPipe = readStream.pipe(unzipStream.Parse());
+    const parseStream = unzipStream.Parse();
+    const unzipStreamPipe = readStream.on('error', () => parseStream.end()).pipe(parseStream).on('error', () => readStream.destroy());
+
     let tapPromise;
 
     const unzipEntryStream = unzipStreamPipe.on('entry', (entry: Entry) => {

+ 31 - 1
apps/app/src/server/service/import/import.ts

@@ -268,10 +268,40 @@ export class ImportService {
       });
 
       readStream
+        .on('error', () => {
+          jsonStream.end();
+          convertStream.end();
+          batchStream.end();
+          writeStream.end();
+        })
         .pipe(jsonStream)
+        .on('error', () => {
+          readStream.destroy();
+          convertStream.end();
+          batchStream.end();
+          writeStream.end();
+        })
         .pipe(convertStream)
+        .on('error', () => {
+          readStream.destroy();
+          jsonStream.destroy();
+          batchStream.end();
+          writeStream.end();
+        })
         .pipe(batchStream)
-        .pipe(writeStream);
+        .on('error', () => {
+          readStream.destroy();
+          jsonStream.destroy();
+          convertStream.destroy();
+          writeStream.end();
+        })
+        .pipe(writeStream)
+        .on('error', () => {
+          readStream.destroy();
+          jsonStream.destroy();
+          convertStream.destroy();
+          batchStream.destroy();
+        });
 
       await streamToPromise(writeStream);
 

+ 16 - 3
apps/app/src/server/service/page/delete-completely-user-home-by-system.ts

@@ -87,8 +87,9 @@ export const deleteCompletelyUserHomeBySystem = async(userHomepagePath: string,
       .lean()
       .cursor({ batchSize: BULK_REINDEX_SIZE });
 
-    let count = 0;
+    const batchStream = createBatchStream(BULK_REINDEX_SIZE);
 
+    let count = 0;
     const writeStream = new Writable({
       objectMode: true,
       async write(batch, encoding, callback) {
@@ -110,8 +111,20 @@ export const deleteCompletelyUserHomeBySystem = async(userHomepagePath: string,
     });
 
     readStream
-      .pipe(createBatchStream(BULK_REINDEX_SIZE))
-      .pipe(writeStream);
+      .on('error', () => {
+        batchStream.end();
+        writeStream.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        readStream.destroy();
+        writeStream.end();
+      })
+      .pipe(writeStream)
+      .on('error', () => {
+        readStream.destroy();
+        batchStream.destroy();
+      });
 
     await streamToPromise(writeStream);
     // ────────┤ end │─────────

+ 162 - 24
apps/app/src/server/service/page/index.ts

@@ -1006,6 +1006,8 @@ class PageService implements IPageService {
     const factory = new PageCursorsForDescendantsFactory(user, targetPage, true);
     const readStream = await factory.generateReadable();
 
+    const batchStream = createBatchStream(BULK_REINDEX_SIZE);
+
     const newPagePathPrefix = newPagePath;
     const pathRegExp = new RegExp(`^${escapeStringRegexp(targetPage.path)}`, 'i');
 
@@ -1044,15 +1046,27 @@ class PageService implements IPageService {
     });
 
     readStream
-      .pipe(createBatchStream(BULK_REINDEX_SIZE))
-      .pipe(writeStream);
-
+      .on('error', () => {
+        batchStream.end();
+        writeStream.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        readStream.destroy();
+        writeStream.end();
+      })
+      .pipe(writeStream)
+      .on('error', () => {
+        readStream.destroy();
+        batchStream.destroy();
+      });
     await streamToPromise(writeStream);
   }
 
   private async renameDescendantsWithStreamV4(targetPage, newPagePath, user, options = {}) {
 
     const readStream = await this.generateReadStreamToOperateOnlyDescendants(targetPage.path, user);
+    const batchStream = createBatchStream(BULK_REINDEX_SIZE);
 
     const newPagePathPrefix = newPagePath;
     const pathRegExp = new RegExp(`^${escapeStringRegexp(targetPage.path)}`, 'i');
@@ -1084,9 +1098,20 @@ class PageService implements IPageService {
     });
 
     readStream
-      .pipe(createBatchStream(BULK_REINDEX_SIZE))
-      .pipe(writeStream);
-
+      .on('error', () => {
+        batchStream.end();
+        writeStream.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        readStream.destroy();
+        writeStream.end();
+      })
+      .pipe(writeStream)
+      .on('error', () => {
+        readStream.destroy();
+        batchStream.destroy();
+      });
     await streamToPromise(writeStream);
   }
 
@@ -1469,6 +1494,7 @@ class PageService implements IPageService {
 
     const iterableFactory = new PageCursorsForDescendantsFactory(user, page, true);
     const readStream = await iterableFactory.generateReadable();
+    const batchStream = createBatchStream(BULK_REINDEX_SIZE);
 
     const newPagePathPrefix = newPagePath;
     const pathRegExp = new RegExp(`^${escapeStringRegexp(page.path)}`, 'i');
@@ -1502,8 +1528,20 @@ class PageService implements IPageService {
     });
 
     readStream
-      .pipe(createBatchStream(BULK_REINDEX_SIZE))
-      .pipe(writeStream);
+      .on('error', () => {
+        batchStream.end();
+        writeStream.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        readStream.destroy();
+        writeStream.end();
+      })
+      .pipe(writeStream)
+      .on('error', () => {
+        readStream.destroy();
+        batchStream.destroy();
+      });
 
     await streamToPromise(writeStream);
 
@@ -1512,6 +1550,7 @@ class PageService implements IPageService {
 
   private async duplicateDescendantsWithStreamV4(page, newPagePath, user, onlyDuplicateUserRelatedResources: boolean) {
     const readStream = await this.generateReadStreamToOperateOnlyDescendants(page.path, user);
+    const batchStream = createBatchStream(BULK_REINDEX_SIZE);
 
     const newPagePathPrefix = newPagePath;
     const pathRegExp = new RegExp(`^${escapeStringRegexp(page.path)}`, 'i');
@@ -1543,8 +1582,20 @@ class PageService implements IPageService {
     });
 
     readStream
-      .pipe(createBatchStream(BULK_REINDEX_SIZE))
-      .pipe(writeStream);
+      .on('error', () => {
+        batchStream.end();
+        writeStream.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        readStream.destory();
+        writeStream.end();
+      })
+      .pipe(writeStream)
+      .on('error', () => {
+        readStream.destory();
+        batchStream.destroy();
+      });
 
     await streamToPromise(writeStream);
 
@@ -1841,6 +1892,7 @@ class PageService implements IPageService {
       readStream = await factory.generateReadable();
     }
 
+    const batchStream = createBatchStream(BULK_REINDEX_SIZE);
 
     const deleteDescendants = this.deleteDescendants.bind(this);
     let count = 0;
@@ -1874,8 +1926,20 @@ class PageService implements IPageService {
     });
 
     readStream
-      .pipe(createBatchStream(BULK_REINDEX_SIZE))
-      .pipe(writeStream);
+      .on('error', () => {
+        batchStream.end();
+        writeStream.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        readStream.destory();
+        writeStream.end();
+      })
+      .pipe(writeStream)
+      .on('error', () => {
+        readStream.destory();
+        batchStream.destroy();
+      });
 
     await streamToPromise(writeStream);
 
@@ -2105,6 +2169,8 @@ class PageService implements IPageService {
       readStream = await factory.generateReadable();
     }
 
+    const batchStream = createBatchStream(BULK_REINDEX_SIZE);
+
     let count = 0;
     let nDeletedNonEmptyPages = 0; // used for updating descendantCount
 
@@ -2137,8 +2203,20 @@ class PageService implements IPageService {
     });
 
     readStream
-      .pipe(createBatchStream(BULK_REINDEX_SIZE))
-      .pipe(writeStream);
+      .on('error', () => {
+        batchStream.end();
+        writeStream.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        readStream.destory();
+        writeStream.end();
+      })
+      .pipe(writeStream)
+      .on('error', () => {
+        readStream.destory();
+        batchStream.destroy();
+      });
 
     await streamToPromise(writeStream);
 
@@ -2416,7 +2494,7 @@ class PageService implements IPageService {
     );
 
     const childPagesReadableStream = builder.query.cursor({ batchSize: BULK_REINDEX_SIZE });
-
+    const batchStream = createBatchStream(BULK_REINDEX_SIZE);
     const childPagesWritable = new Writable({
       objectMode: true,
       write: async(batch, encoding, callback) => {
@@ -2426,8 +2504,20 @@ class PageService implements IPageService {
     });
 
     childPagesReadableStream
-      .pipe(createBatchStream(BULK_REINDEX_SIZE))
-      .pipe(childPagesWritable);
+      .on('error', () => {
+        batchStream.end();
+        childPagesWritable.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        childPagesReadableStream.destroy();
+        childPagesWritable.end();
+      })
+      .pipe(childPagesWritable)
+      .on('error', () => {
+        childPagesReadableStream.destroy();
+        childPagesWritable.destroy();
+      });
     await streamToPromise(childPagesWritable);
   }
 
@@ -2465,6 +2555,7 @@ class PageService implements IPageService {
     }
 
     const readStream = await this.generateReadStreamToOperateOnlyDescendants(targetPage.path, user);
+    const batchStream = createBatchStream(BULK_REINDEX_SIZE);
 
     const revertDeletedDescendants = this.revertDeletedDescendants.bind(this);
     let count = 0;
@@ -2494,8 +2585,20 @@ class PageService implements IPageService {
     });
 
     readStream
-      .pipe(createBatchStream(BULK_REINDEX_SIZE))
-      .pipe(writeStream);
+      .on('error', () => {
+        batchStream.end();
+        writeStream.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        readStream.destroy();
+        writeStream.end();
+      })
+      .pipe(writeStream)
+      .on('error', () => {
+        readStream.destroy();
+        batchStream.destroy();
+      });
 
     await streamToPromise(writeStream);
 
@@ -2504,6 +2607,7 @@ class PageService implements IPageService {
 
   private async revertDeletedDescendantsWithStreamV4(targetPage, user, options = {}) {
     const readStream = await this.generateReadStreamToOperateOnlyDescendants(targetPage.path, user);
+    const batchStream = createBatchStream(BULK_REINDEX_SIZE);
 
     const revertDeletedDescendants = this.revertDeletedDescendants.bind(this);
     let count = 0;
@@ -2529,8 +2633,17 @@ class PageService implements IPageService {
     });
 
     readStream
-      .pipe(createBatchStream(BULK_REINDEX_SIZE))
-      .pipe(writeStream);
+      .on('error', () => {
+        batchStream.end();
+        writeStream.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        readStream.destory();
+        writeStream.end();
+      })
+      .pipe(writeStream)
+      .on('error', () => {});
 
     await streamToPromise(readStream);
 
@@ -3379,8 +3492,20 @@ class PageService implements IPageService {
     });
 
     pagesStream
+      .on('error', () => {
+        batchStream.end();
+        migratePagesStream.end();
+      })
       .pipe(batchStream)
-      .pipe(migratePagesStream);
+      .on('error', () => {
+        pagesStream.destroy();
+        migratePagesStream.end();
+      })
+      .pipe(migratePagesStream)
+      .on('error', () => {
+        pagesStream.destroy();
+        batchStream.destroy();
+      });
 
     await streamToPromise(migratePagesStream);
 
@@ -3483,6 +3608,7 @@ class PageService implements IPageService {
    */
   async recountAndUpdateDescendantCountOfPages(pageCursor: Cursor<any>, batchSize:number): Promise<void> {
     const Page = this.crowi.model('Page');
+    const batchStream = createBatchStream(batchSize);
     const recountWriteStream = new Writable({
       objectMode: true,
       async write(pageDocuments, encoding, callback) {
@@ -3497,8 +3623,20 @@ class PageService implements IPageService {
       },
     });
     pageCursor
-      .pipe(createBatchStream(batchSize))
-      .pipe(recountWriteStream);
+      .on('error', () => {
+        batchStream.end();
+        recountWriteStream.end();
+      })
+      .pipe(batchStream)
+      .on('error', () => {
+        pageCursor.destroy();
+        recountWriteStream.end();
+      })
+      .pipe(recountWriteStream)
+      .on('error', () => {
+        pageCursor.destroy();
+        batchStream.destroy();
+      });
 
     await streamToPromise(recountWriteStream);
   }