efficient batch handling in stream

Futa Arai, 2 years ago (commit da19184035)

+ 0 - 2
apps/app/src/features/page-bulk-export/server/routes/apiv3/page-bulk-export.ts

@@ -34,9 +34,7 @@ module.exports = (crowi: Crowi): Router => {
 
     const { path, format } = req.body;
 
-    console.time('pageBulkExport');
     pageBulkExportService?.bulkExportWithBasePagePath(path);
-    console.timeEnd('pageBulkExport');
     return res.apiv3({}, 204);
   });
 
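A note on the removal above: `bulkExportWithBasePagePath(path)` is intentionally not awaited, so a `console.time`/`console.timeEnd` pair around the call only measures promise creation, not the export itself. The timer therefore moves into the service (next file), where the stream is actually awaited. A minimal sketch of the difference, with hypothetical names:

// Hypothetical sketch: timing a fire-and-forget call measures (almost) nothing.
const pretendExport = (): Promise<void> => new Promise(resolve => setTimeout(resolve, 1000));

const main = async(): Promise<void> => {
  console.time('fire-and-forget');
  void pretendExport();               // not awaited: the promise is created and left running
  console.timeEnd('fire-and-forget'); // logs ~0 ms

  console.time('awaited');
  await pretendExport();              // awaited: the timer spans the real work
  console.timeEnd('awaited');         // logs ~1000 ms
};

void main();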

+ 60 - 12
apps/app/src/features/page-bulk-export/server/service/page-bulk-export.ts

@@ -15,6 +15,8 @@ const logger = loggerFactory('growi:services:PageBulkExportService');
 
 const streamToPromise = require('stream-to-promise');
 
+const sleep = (msec: number): Promise<void> => new Promise(resolve => setTimeout(resolve, msec));
+
 class PageBulkExportService {
 
   crowi: any;
@@ -42,7 +44,9 @@ class PageBulkExportService {
       zipArchiver.pipe(multipartUploadWritable);
       pageReadableStream.pipe(pagesWritable);
 
+      console.time('pageBulkExport');
       await streamToPromise(multipartUploadWritable);
+      console.timeEnd('pageBulkExport');
     }
     catch (err) {
       logger.error(err);
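With the timer moved here, `streamToPromise(multipartUploadWritable)` resolves once the upload Writable finishes, so `pageBulkExport` now spans the whole read → zip → upload pipeline. A sketch of the assumed semantics of the stream-to-promise package for a Writable:

import { Writable } from 'stream';

// Minimal equivalent of streamToPromise for a Writable, assuming the usual
// stream-to-promise behavior: resolve on 'finish', reject on 'error'.
const writableToPromise = (stream: Writable): Promise<void> => {
  return new Promise((resolve, reject) => {
    stream.once('finish', () => resolve());
    stream.once('error', reject);
  });
};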
@@ -70,9 +74,14 @@ class PageBulkExportService {
    * Get a Writable that writes the page body to a zip file
    */
   private getPageWritable(zipArchiver: Archiver) {
+    let totalTime = 0;
+    let documentsProcessed = 0;
+
     return new Writable({
       objectMode: true,
       write: async(page: PageDocument, encoding, callback) => {
+        const startTime = performance.now();
+
         try {
           const revision = page.revision;
 
@@ -82,6 +91,12 @@ class PageBulkExportService {
             const pathNormalized = normalizePath(page.path);
             zipArchiver.append(markdownBody, { name: `${pathNormalized}.md` });
           }
+
+          documentsProcessed++;
+          if (documentsProcessed % 100 === 0) { // Assuming batchSize is 100
+            console.log(`Batch retrieved. Documents processed: ${documentsProcessed}`);
+            // await sleep(10000);
+          }
         }
         catch (err) {
           logger.error(err);
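The `% 100` progress check assumes the page read stream fetches documents from MongoDB in batches of 100, as the inline comment says. That only holds if the cursor's batch size is pinned explicitly; a sketch of how the source stream could be built (the `Page` model and the query here are assumptions, not part of this diff):

import mongoose from 'mongoose';

// Hypothetical sketch: a Mongoose QueryCursor is an object-mode Readable,
// and batchSize controls how many documents each driver round trip returns,
// so fetches would line up with the "% 100" check above.
const Page = mongoose.model('Page', new mongoose.Schema({ path: String, revision: Object }));
const pageReadableStream = Page
  .find({ path: { $regex: '^/Sandbox' } })
  .cursor({ batchSize: 100 });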
@@ -89,28 +104,35 @@ class PageBulkExportService {
         }
 
         callback();
+
+        const endTime = performance.now();
+
+        const elapsedTime = endTime - startTime;
+        totalTime += elapsedTime;
       },
       final: (callback) => {
         zipArchiver.finalize();
         callback();
+
+        console.log('Total time of zip write stream: ', totalTime);
       },
     });
   }
 
   private setUpZipArchiver(): Archiver {
-    const archive = archiver('zip', {
+    const zipArchiver = archiver('zip', {
       zlib: { level: 9 }, // maximum compression
     });
 
     // good practice to catch warnings (ie stat failures and other non-blocking errors)
-    archive.on('warning', (err) => {
+    zipArchiver.on('warning', (err) => {
       if (err.code === 'ENOENT') logger.error(err);
       else throw err;
     });
     // good practice to catch this error explicitly
-    archive.on('error', (err) => { throw err });
+    zipArchiver.on('error', (err) => { throw err });
 
-    return archive;
+    return zipArchiver;
   }
 
   private async getMultipartUploadWritable(uploadKey: string): Promise<Writable> {
@@ -122,12 +144,25 @@ class PageBulkExportService {
 
     let partNumber = 1;
     // Buffer to store stream data before upload. When the buffer is full, it will be uploaded as a part.
-    let buffer = Buffer.alloc(0);
+    let part = Buffer.alloc(this.partSize);
+    let filledPartSize = 0;
 
     await multipartUploader.initUpload();
+    logger.info(`Multipart upload initialized. Upload key: ${uploadKey}`);
+
+    let totalTime = 0;
+
+    let chunkCount = 0;
 
     return new Writable({
       write: async(chunk: Buffer, encoding, callback) => {
+        chunkCount++;
+        if (chunkCount % 100 === 0) {
+          console.log('chunk is being read. size:', chunk.length);
+          chunkCount = 0;
+        }
+        const startTime = performance.now();
+
         let offset = 0;
         while (offset < chunk.length) {
           // The data size to add to buffer.
@@ -136,16 +171,19 @@ class PageBulkExportService {
           // - If the remaining chunk size is larger than the remaining buffer size:
           //     - Fill the buffer, and upload => dataSize is the remaining buffer size
           //     - The remaining chunk after upload will be added to buffer in the next iteration
-          const dataSize = Math.min(this.partSize - buffer.length, chunk.length - offset);
+          const dataSize = Math.min(this.partSize - filledPartSize, chunk.length - offset);
           // Add chunk data to buffer
-          buffer = Buffer.concat([buffer, chunk.slice(offset, offset + dataSize)]);
+          // buffer = Buffer.concat([buffer, chunk.slice(offset, offset + dataSize)]);
+          chunk.copy(part, filledPartSize, offset, offset + dataSize);
+          filledPartSize += dataSize;
 
           // When buffer reaches partSize, upload
-          if (buffer.length === this.partSize) {
+          if (filledPartSize === this.partSize) {
             // eslint-disable-next-line no-await-in-loop
-            await multipartUploader.uploadPart(buffer, partNumber);
+            await multipartUploader.uploadPart(part, partNumber);
             // Reset buffer after upload
-            buffer = Buffer.alloc(0);
+            part = Buffer.alloc(this.partSize);
+            filledPartSize = 0;
             partNumber += 1;
           }
 
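This hunk is the core of the commit: growing the part with `Buffer.concat` reallocates and re-copies everything accumulated so far on every chunk, so filling one part costs O(partSize²) byte copies, while `chunk.copy` into a buffer pre-allocated at `partSize` writes each byte exactly once. A standalone sketch of the two strategies (the 5 MiB part size is an assumption, matching the S3 multipart minimum):

// Quadratic: each concat allocates a new buffer and copies all bytes so far.
const fillByConcat = (chunks: Buffer[]): Buffer => {
  let buffer = Buffer.alloc(0);
  for (const chunk of chunks) {
    buffer = Buffer.concat([buffer, chunk]);
  }
  return buffer;
};

// Linear: each byte is copied once into the pre-allocated part.
// Assumes the chunks fit within a single part.
const PART_SIZE = 5 * 1024 * 1024;
const fillByCopy = (chunks: Buffer[]): Buffer => {
  const part = Buffer.alloc(PART_SIZE);
  let filled = 0;
  for (const chunk of chunks) {
    filled += chunk.copy(part, filled); // copy() returns the number of bytes copied
  }
  return part.subarray(0, filled);
};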
@@ -153,13 +191,23 @@ class PageBulkExportService {
         }
 
         callback();
+
+        const endTime = performance.now();
+
+        const elapsedTime = endTime - startTime;
+        totalTime += elapsedTime;
       },
       async final(callback) {
-        if (buffer.length > 0) {
-          await multipartUploader.uploadPart(buffer, partNumber);
+        if (filledPartSize > 0) {
+          const finalPart = Buffer.alloc(filledPartSize);
+          part.copy(finalPart, 0, 0, filledPartSize);
+          await multipartUploader.uploadPart(finalPart, partNumber);
         }
         await multipartUploader.completeUpload();
+        logger.info(`Multipart upload completed. Upload key: ${uploadKey}`);
         callback();
+
+        console.log('Total time of multipart upload stream: ', totalTime);
       },
     });
   }
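One detail in `final`: because `part` is pre-allocated at full size, uploading it whole after a partial fill would pad the object with trailing zero bytes, hence the copy of the filled prefix into `finalPart`. A tiny self-contained demonstration (the 8-byte part size is purely illustrative); `subarray` would be a copy-free alternative, safe here because `part` is never reused after the final upload:

// Illustrates why the final part must be trimmed to its filled size.
const PART_SIZE = 8; // tiny illustrative part size
const part = Buffer.alloc(PART_SIZE);
const filled = Buffer.from('abc').copy(part, 0); // 3 of the 8 bytes hold real data

console.log(part);                     // <Buffer 61 62 63 00 00 00 00 00> (zero padded)
console.log(part.subarray(0, filled)); // <Buffer 61 62 63> (copy-free trimmed view)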