Просмотр исходного кода

Merge pull request #19 from hakumizuki/feat/g2g-nextjs-copy-attachments-resolving-todo

TODO 消化
Haku Mizuki 3 лет назад
Родитель
Сommit
a4ddc455af
1 измененных файлов с 98 добавлено и 82 удалено
  1. 98 82
      packages/app/src/server/service/g2g-transfer.ts

+ 98 - 82
packages/app/src/server/service/g2g-transfer.ts

@@ -7,6 +7,7 @@ import FormData from 'form-data';
 import { Types as MongooseTypes } from 'mongoose';
 
 import TransferKeyModel from '~/server/models/transfer-key';
+import { createBatchStream } from '~/server/util/batch-stream';
 import axios from '~/utils/axios';
 import loggerFactory from '~/utils/logger';
 import { TransferKey } from '~/utils/vo/transfer-key';
@@ -38,9 +39,10 @@ interface Pusher {
    */
   canTransfer(fromGROWIInfo: IDataGROWIInfo): Promise<boolean>
   /**
-   * TODO
+   * Transfer all Attachment data to destination GROWI
+   * @param {TransferKey} tk Transfer key
    */
-  transferAttachments(): Promise<void>
+  transferAttachments(tk: TransferKey): Promise<void>
   /**
    * Start transfer data between GROWIs
    * @param {TransferKey} tk TransferKey object
@@ -78,6 +80,51 @@ interface Receiver {
   receive(zippedGROWIDataStream: Readable): Promise<void>
 }
 
+const generateAxiosRequestConfigWithTransferKey = (tk: TransferKey, additionalHeaders: {[key: string]: string} = {}) => {
+  const { appUrl, key } = tk;
+
+  return {
+    baseURL: appUrl.origin,
+    headers: {
+      ...additionalHeaders,
+      [X_GROWI_TRANSFER_KEY_HEADER_NAME]: key,
+    },
+  };
+};
+
+/**
+ * generate GROWIInfo
+ * @param crowi Crowi instance
+ * @returns
+ */
+const generateGROWIInfo = (crowi: any): IDataGROWIInfo => {
+  // TODO: add attachment file limit, storage total limit
+  const { configManager } = crowi;
+  const userUpperLimit = configManager.getConfig('crowi', 'security:userUpperLimit');
+  const version = crowi.version;
+  const attachmentInfo = {
+    type: configManager.getConfig('crowi', 'app:fileUploadType'),
+    bucket: undefined,
+    customEndpoint: undefined, // for S3
+    uploadNamespace: undefined, // for GCS
+  };
+
+  // put storage location info to check storage identification
+  switch (attachmentInfo.type) {
+    case 'aws':
+      attachmentInfo.bucket = configManager.getConfig('crowi', 'aws:s3Bucket');
+      attachmentInfo.customEndpoint = configManager.getConfig('crowi', 'aws:s3CustomEndpoint');
+      break;
+    case 'gcs':
+      attachmentInfo.bucket = configManager.getConfig('crowi', 'gcs:bucket');
+      attachmentInfo.uploadNamespace = configManager.getConfig('crowi', 'gcs:uploadNamespace');
+      break;
+    default:
+  }
+
+  return { userUpperLimit, version, attachmentInfo };
+};
+
 export class G2GTransferPusherService implements Pusher {
 
   crowi: any;
@@ -91,7 +138,7 @@ export class G2GTransferPusherService implements Pusher {
     // axios get
     let toGROWIInfo: IDataGROWIInfo;
     try {
-      const res = await axios.get('/_api/v3/g2g-transfer/growi-info', this.generateAxiosRequestConfig(tk));
+      const res = await axios.get('/_api/v3/g2g-transfer/growi-info', generateAxiosRequestConfigWithTransferKey(tk));
       toGROWIInfo = res.data.growiInfo;
     }
     catch (err) {
@@ -119,53 +166,48 @@ export class G2GTransferPusherService implements Pusher {
   }
 
   public async transferAttachments(tk: TransferKey): Promise<void> {
+    const BATCH_SIZE = 100;
+
     const { appUrl, key } = tk;
     const { fileUploadService } = this.crowi;
     const Attachment = this.crowi.model('Attachment');
 
-    // TODO: batch get
-    const attachments = await Attachment.find();
-    for await (const attachment of attachments) {
-      logger.debug(`processing attachment: ${attachment}`);
-      let fileStream;
-      try {
-        // get read stream of each attachment
-        fileStream = await fileUploadService.findDeliveryFile(attachment);
-      }
-      catch (err) {
-        logger.warn(`Error occured when getting Attachment(ID=${attachment.id}), skipping: `, err);
-        continue;
-      }
-      // TODO: get attachmentLists from destination GROWI to avoid transferring files that the dest GROWI has
-      // TODO: refresh transfer key per 1 hour
-      // post each attachment file data to receiver
-      try {
-        // Use FormData to immitate browser's form data object
-        const form = new FormData();
-
-        form.append('content', fileStream, attachment.fileName);
-        form.append('attachmentMetadata', JSON.stringify(attachment));
-        await rawAxios.post('/_api/v3/g2g-transfer/attachment', form, {
-          baseURL: appUrl.origin,
-          headers: {
-            ...form.getHeaders(), // This generates a unique boundary for multi part form data
-            [X_GROWI_TRANSFER_KEY_HEADER_NAME]: key,
-          },
-        });
-      }
-      catch (errs) {
-        logger.error(`Error occured when uploading attachment(ID=${attachment.id})`, errs);
-        if (!Array.isArray(errs)) {
-          // TODO: socker.emit(failed_to_transfer);
-          return;
+    // batch get
+    const attachmentsCursor = await Attachment.find().cursor();
+    const batchStream = createBatchStream(BATCH_SIZE);
+
+    for await (const attachmentBatch of attachmentsCursor.pipe(batchStream)) {
+      for await (const attachment of attachmentBatch) {
+        logger.debug(`processing attachment: ${attachment}`);
+        let fileStream;
+        try {
+          // get read stream of each attachment
+          fileStream = await fileUploadService.findDeliveryFile(attachment);
         }
+        catch (err) {
+          logger.warn(`Error occured when getting Attachment(ID=${attachment.id}), skipping: `, err);
+          continue;
+        }
+        // TODO: get attachmentLists from destination GROWI to avoid transferring files that the dest GROWI has
+        // TODO: refresh transfer key per 1 hour
+        // post each attachment file data to receiver
+        try {
+          this.doTransferAttachment(tk, attachment, fileStream);
+        }
+        catch (errs) {
+          logger.error(`Error occured when uploading attachment(ID=${attachment.id})`, errs);
+          if (!Array.isArray(errs)) {
+            // TODO: socker.emit(failed_to_transfer);
+            return;
+          }
 
-        const err = errs[0];
-        logger.error(err);
+          const err = errs[0];
+          logger.error(err);
 
 
-        // TODO: socker.emit(failed_to_transfer);
-        return;
+          // TODO: socker.emit(failed_to_transfer);
+          return;
+        }
       }
     }
   }
@@ -196,13 +238,7 @@ export class G2GTransferPusherService implements Pusher {
       form.append('collections', JSON.stringify(collections));
       form.append('optionsMap', JSON.stringify(optionsMap));
       form.append('operatorUserId', user._id.toString());
-      await rawAxios.post('/_api/v3/g2g-transfer/', form, {
-        baseURL: appUrl.origin,
-        headers: {
-          ...form.getHeaders(), // This generates a unique boundary for multi part form data
-          [X_GROWI_TRANSFER_KEY_HEADER_NAME]: key,
-        },
-      });
+      await rawAxios.post('/_api/v3/g2g-transfer/', form, generateAxiosRequestConfigWithTransferKey(tk, form.getHeaders()));
     }
     catch (errs) {
       logger.error(errs);
@@ -220,15 +256,19 @@ export class G2GTransferPusherService implements Pusher {
     }
   }
 
-  private generateAxiosRequestConfig(tk: TransferKey) {
-    const { appUrl, key } = tk;
+  /**
+   * transfer attachment to destination GROWI
+   * @param tk Transfer key
+   * @param attachment Attachment model instance
+   * @param fileStream Attachment data(loaded from storage)
+   */
+  private async doTransferAttachment(tk: TransferKey, attachment, fileStream: Readable) {
+    // Use FormData to immitate browser's form data object
+    const form = new FormData();
 
-    return {
-      baseURL: appUrl.origin,
-      headers: {
-        [X_GROWI_TRANSFER_KEY_HEADER_NAME]: key,
-      },
-    };
+    form.append('content', fileStream, attachment.fileName);
+    form.append('attachmentMetadata', JSON.stringify(attachment));
+    await rawAxios.post('/_api/v3/g2g-transfer/attachment', form, generateAxiosRequestConfigWithTransferKey(tk, form.getHeaders()));
   }
 
 }
@@ -249,31 +289,7 @@ export class G2GTransferReceiverService implements Receiver {
   }
 
   public async answerGROWIInfo(): Promise<IDataGROWIInfo> {
-    // TODO: add attachment file limit, storage total limit
-    const { configManager } = this.crowi;
-    const userUpperLimit = configManager.getConfig('crowi', 'security:userUpperLimit');
-    const version = this.crowi.version;
-    const attachmentInfo = {
-      type: configManager.getConfig('crowi', 'app:fileUploadType'),
-      bucket: undefined,
-      customEndpoint: undefined, // for S3
-      uploadNamespace: undefined, // for GCS
-    };
-
-    // put storage location info to check storage identification
-    switch (attachmentInfo.type) {
-      case 'aws':
-        attachmentInfo.bucket = configManager.getConfig('crowi', 'aws:s3Bucket');
-        attachmentInfo.customEndpoint = configManager.getConfig('crowi', 'aws:s3CustomEndpoint');
-        break;
-      case 'gcs':
-        attachmentInfo.bucket = configManager.getConfig('crowi', 'gcs:bucket');
-        attachmentInfo.uploadNamespace = configManager.getConfig('crowi', 'gcs:uploadNamespace');
-        break;
-      default:
-    }
-
-    return { userUpperLimit, version, attachmentInfo };
+    return generateGROWIInfo(this.crowi);
   }
 
   public async createTransferKey(appSiteUrl: URL): Promise<string> {