reiji-h 1 год назад
Родитель
Сommit
7642fad846

+ 4 - 3
apps/app/src/migrations/20211227060705-revision-path-to-page-id-schema-migration--fixed-7549.js

@@ -1,4 +1,5 @@
-import { pipeline, Writable } from 'stream';
+import { Writable } from 'stream';
+import { pipeline } from 'stream/promises';
 
 import mongoose from 'mongoose';
 import streamToPromise from 'stream-to-promise';
@@ -56,7 +57,7 @@ module.exports = {
       },
     });
 
-    pipeline(pagesStream, batchStrem, migratePagesStream);
+    await pipeline(pagesStream, batchStrem, migratePagesStream);
 
     await streamToPromise(migratePagesStream);
 
@@ -105,7 +106,7 @@ module.exports = {
       },
     });
 
-    pipeline(pagesStream, batchStrem, migratePagesStream);
+    await pipeline(pagesStream, batchStrem, migratePagesStream);
 
     await streamToPromise(migratePagesStream);
 

+ 4 - 3
apps/app/src/server/service/export.js

@@ -7,7 +7,8 @@ const logger = loggerFactory('growi:services:ExportService'); // eslint-disable-
 
 const fs = require('fs');
 const path = require('path');
-const { Transform, pipeline } = require('stream');
+const { Transform } = require('stream');
+const { pipeline } = require('stream/promises');
 
 const archiver = require('archiver');
 const mongoose = require('mongoose');
@@ -196,7 +197,7 @@ class ExportService {
     const jsonFileToWrite = path.join(this.baseDir, `${collectionName}.json`);
     const writeStream = fs.createWriteStream(jsonFileToWrite, { encoding: this.growiBridgeService.getEncoding() });
 
-    pipeline(readStream, logStream, transformStream, writeStream);
+    await pipeline(readStream, logStream, transformStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -352,7 +353,7 @@ class ExportService {
     const output = fs.createWriteStream(zipFile);
 
     // pipe archive data to the file
-    pipeline(archive, output);
+    await pipeline(archive, output);
 
     // finalize the archive (ie we are done appending files but streams have to finish yet)
     // 'close', 'end' or 'finish' may be fired right after calling this method so register to them beforehand

+ 2 - 1
apps/app/src/server/service/import/import.ts

@@ -2,6 +2,7 @@ import fs from 'fs';
 import path from 'path';
 import type { EventEmitter } from 'stream';
 import { Writable, Transform, pipeline } from 'stream';
+import { pipeline as pipelinePromise } from 'stream/promises';
 
 import JSONStream from 'JSONStream';
 import gc from 'expose-gc/function';
@@ -267,7 +268,7 @@ export class ImportService {
         },
       });
 
-      pipeline(readStream, jsonStream, convertStream, batchStream, writeStream);
+      await pipelinePromise(readStream, jsonStream, convertStream, batchStream, writeStream);
 
       await streamToPromise(writeStream);
 

+ 3 - 2
apps/app/src/server/service/page/delete-completely-user-home-by-system.ts

@@ -1,4 +1,5 @@
-import { pipeline, Writable } from 'stream';
+import { Writable } from 'stream';
+import { pipeline } from 'stream/promises';
 
 import { getIdForRef } from '@growi/core';
 import type { IPage, Ref } from '@growi/core';
@@ -110,7 +111,7 @@ export const deleteCompletelyUserHomeBySystem = async(userHomepagePath: string,
       },
     });
 
-    pipeline(readStream, batchStream, writeStream);
+    await pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
     // ────────┤ end │─────────

+ 13 - 12
apps/app/src/server/service/page/index.ts

@@ -1,6 +1,7 @@
 import type EventEmitter from 'events';
 import pathlib from 'path';
-import { pipeline, Readable, Writable } from 'stream';
+import { Readable, Writable } from 'stream';
+import { pipeline } from 'stream/promises';
 
 import {
   PageStatus, YDocStatus, getIdForRef,
@@ -1045,7 +1046,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(readStream, batchStream, writeStream);
+    await pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
   }
@@ -1084,7 +1085,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(readStream, batchStream, writeStream);
+    await pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
   }
@@ -1501,7 +1502,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(readStream, batchStream, writeStream);
+    await pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -1541,7 +1542,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(readStream, batchStream, writeStream);
+    await pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -1871,7 +1872,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(readStream, batchStream, writeStream);
+    await pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -2134,7 +2135,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(readStream, batchStream, writeStream);
+    await pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -2421,7 +2422,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(childPagesReadableStream, batchStream, childPagesWritable);
+    await pipeline(childPagesReadableStream, batchStream, childPagesWritable);
 
     await streamToPromise(childPagesWritable);
   }
@@ -2489,7 +2490,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(readStream, batchStream, writeStream);
+    await pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(writeStream);
 
@@ -2523,7 +2524,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(readStream, batchStream, writeStream);
+    await pipeline(readStream, batchStream, writeStream);
 
     await streamToPromise(readStream);
 
@@ -3371,7 +3372,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(pagesStream, batchStream, migratePagesStream);
+    await pipeline(pagesStream, batchStream, migratePagesStream);
 
     await streamToPromise(migratePagesStream);
 
@@ -3489,7 +3490,7 @@ class PageService implements IPageService {
       },
     });
 
-    pipeline(pageCursor, batchStream, recountWriteStream);
+    await pipeline(pageCursor, batchStream, recountWriteStream);
 
     await streamToPromise(recountWriteStream);
   }

+ 3 - 2
apps/app/src/server/service/search-delegator/elasticsearch.ts

@@ -1,4 +1,5 @@
-import { Writable, Transform, pipeline } from 'stream';
+import { Writable, Transform } from 'stream';
+import { pipeline } from 'stream/promises';
 import { URL } from 'url';
 
 import { getIdStringForRef, type IPage } from '@growi/core';
@@ -553,7 +554,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
       },
     });
 
-    pipeline(
+    await pipeline(
       readStream,
       batchStream,
       appendTagNamesStream,