Naoki427 4 месяцев назад
Родитель
Сommit
ed47acee00

+ 96 - 32
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/audit-log-bulk-export-job-cron-service.integ.ts

@@ -29,7 +29,9 @@ import {
   AuditLogBulkExportFormat,
   AuditLogBulkExportJobStatus,
 } from '../../../interfaces/audit-log-bulk-export';
-import AuditLogBulkExportJob from '../../models/audit-log-bulk-export-job';
+import AuditLogBulkExportJob, {
+  type AuditLogBulkExportJobDocument,
+} from '../../models/audit-log-bulk-export-job';
 import {
   AuditLogBulkExportJobExpiredError,
   AuditLogBulkExportJobRestartedError,
@@ -57,6 +59,43 @@ const userSchema = new mongoose.Schema(
 );
 const User = mongoose.model<IUser>('User', userSchema);
 
+async function waitForCondition(
+  condition: () => boolean | Promise<boolean>,
+  {
+    timeoutMs = 2000,
+    intervalMs = 50,
+  }: { timeoutMs?: number; intervalMs?: number } = {},
+): Promise<void> {
+  const start = Date.now();
+
+  while (true) {
+    if (await condition()) return;
+
+    if (Date.now() - start > timeoutMs) {
+      throw new Error('waitForCondition: timeout exceeded');
+    }
+
+    await new Promise((resolve) => setTimeout(resolve, intervalMs));
+  }
+}
+
+async function waitForJobStatus(
+  jobId: mongoose.Types.ObjectId,
+  status: AuditLogBulkExportJobStatus,
+): Promise<AuditLogBulkExportJobDocument> {
+  let latest: AuditLogBulkExportJobDocument | null = null;
+
+  await waitForCondition(async () => {
+    latest = await AuditLogBulkExportJob.findById(jobId);
+    return latest?.status === status;
+  });
+
+  if (!latest) {
+    throw new Error('Job not found after waitForCondition succeeded');
+  }
+  return latest;
+}
+
 class MockMultipartUploader extends MultipartUploader {
   override get uploadId(): string {
     return 'mock-upload-id';
@@ -244,9 +283,12 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
         });
 
         await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        const afterExport = await waitForJobStatus(
+          job._id,
+          AuditLogBulkExportJobStatus.uploading,
+        );
 
-        const outputDir = cronService.getTmpOutputDir(job);
+        const outputDir = cronService.getTmpOutputDir(afterExport);
         let hasFiles = false;
         let jsonFiles: string[] = [];
 
@@ -267,8 +309,8 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
           );
         }
 
-        await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        await cronService.proceedBulkExportJob(afterExport);
+        await waitForCondition(() => uploadAttachmentSpy.mock.calls.length > 0);
 
         expect(uploadAttachmentSpy).toHaveBeenCalledTimes(1);
         const [readable, attachment] = uploadAttachmentSpy.mock.calls[0];
@@ -305,9 +347,12 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
         });
 
         await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        const afterExport = await waitForJobStatus(
+          job._id,
+          AuditLogBulkExportJobStatus.uploading,
+        );
 
-        const outputDir = cronService.getTmpOutputDir(job);
+        const outputDir = cronService.getTmpOutputDir(afterExport);
         const files = fs.readdirSync(outputDir);
         const jsonFiles = files.filter((file) => file.endsWith('.json'));
 
@@ -348,16 +393,23 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
         const notifySpy = vi.spyOn(cronService, 'notifyExportResultAndCleanUp');
 
         await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        await waitForCondition(async () => {
+          const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+          return updatedJob?.status !== AuditLogBulkExportJobStatus.exporting;
+        });
 
-        const outputDir = cronService.getTmpOutputDir(job);
+        const afterExport = await AuditLogBulkExportJob.findById(job._id);
+        if (!afterExport) {
+          throw new Error('Job not found after export phase');
+        }
+
+        const outputDir = cronService.getTmpOutputDir(afterExport);
         const files = fs.existsSync(outputDir) ? fs.readdirSync(outputDir) : [];
         const jsonFiles = files.filter((file) => file.endsWith('.json'));
 
         expect(jsonFiles.length).toBeLessThanOrEqual(1);
 
-        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
-        expect(updatedJob?.totalExportedCount).toBe(0);
+        expect(afterExport.totalExportedCount).toBe(0);
 
         expect(notifySpy).toHaveBeenCalledWith(
           SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS,
@@ -386,9 +438,12 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
         });
 
         await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        const afterExport = await waitForJobStatus(
+          job._id,
+          AuditLogBulkExportJobStatus.uploading,
+        );
 
-        const outputDir = cronService.getTmpOutputDir(job);
+        const outputDir = cronService.getTmpOutputDir(afterExport);
         const files = fs.readdirSync(outputDir);
         const jsonFiles = files.filter((file) => file.endsWith('.json'));
 
@@ -430,9 +485,10 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
         const initialCount = job.totalExportedCount ?? 0;
 
         await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
-
-        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        const updatedJob = await waitForJobStatus(
+          job._id,
+          AuditLogBulkExportJobStatus.uploading,
+        );
         expect(updatedJob?.totalExportedCount).toBeGreaterThan(initialCount);
         expect(updatedJob?.lastExportedId).toBeDefined();
 
@@ -458,10 +514,13 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
         });
 
         await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        const afterExport = await waitForJobStatus(
+          job._id,
+          AuditLogBulkExportJobStatus.uploading,
+        );
 
-        await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        await cronService.proceedBulkExportJob(afterExport);
+        await waitForCondition(() => uploadAttachmentSpy.mock.calls.length > 0);
 
         expect(uploadAttachmentSpy).toHaveBeenCalledTimes(1);
         const [readable, attachment] = uploadAttachmentSpy.mock.calls[0];
@@ -532,7 +591,10 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
 
         try {
           await cronService.proceedBulkExportJob(job);
-          await new Promise((resolve) => setTimeout(resolve, 200));
+          await waitForCondition(async () => {
+            const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+            return updatedJob?.status === AuditLogBulkExportJobStatus.failed;
+          });
         } catch (_error) {}
 
         const updatedJob = await AuditLogBulkExportJob.findById(job._id);
@@ -559,7 +621,6 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
 
         await expect(async () => {
           await cronService.proceedBulkExportJob(job);
-          await new Promise((resolve) => setTimeout(resolve, 100));
         }).not.toThrow();
       });
     });
@@ -618,17 +679,15 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
         expect(job.status).toBe(AuditLogBulkExportJobStatus.exporting);
 
         await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        const afterExport = await waitForJobStatus(
+          job._id,
+          AuditLogBulkExportJobStatus.uploading,
+        );
 
-        const afterExport = await AuditLogBulkExportJob.findById(job._id);
         expect(afterExport?.status).toBe(AuditLogBulkExportJobStatus.uploading);
 
-        if (!afterExport) {
-          throw new Error('Job not found after export phase');
-        }
-
         await cronService.proceedBulkExportJob(afterExport);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        await waitForCondition(() => uploadAttachmentSpy.mock.calls.length > 0);
 
         await cronService.notifyExportResultAndCleanUp(
           SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
@@ -653,9 +712,12 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
         });
 
         await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        const afterExport = await waitForJobStatus(
+          job._id,
+          AuditLogBulkExportJobStatus.uploading,
+        );
 
-        await cronService.cleanUpExportJobResources(job);
+        await cronService.cleanUpExportJobResources(afterExport);
         const streamAfterCleanup = cronService.getStreamInExecution(job._id);
         expect(streamAfterCleanup).toBeUndefined();
       });
@@ -675,10 +737,12 @@ describe('AuditLogBulkExportJobCronService Integration Test', () => {
         });
 
         await cronService.proceedBulkExportJob(job);
-        await new Promise((resolve) => setTimeout(resolve, 100));
+        await waitForCondition(async () => {
+          const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+          return updatedJob?.restartFlag === false;
+        });
 
         const updatedJob = await AuditLogBulkExportJob.findById(job._id);
-        await new Promise((resolve) => setTimeout(resolve, 50));
 
         expect(updatedJob?.restartFlag).toBe(false);
         expect(updatedJob?.totalExportedCount).toBe(0);

+ 5 - 3
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/index.ts

@@ -105,9 +105,11 @@ class AuditLogBulkExportJobCronService
     })
       .sort({ createdAt: 1 })
       .limit(this.parallelExecLimit);
-    auditLogBulkExportJobInProgress.forEach((auditLogBulkExportJob) => {
-      this.proceedBulkExportJob(auditLogBulkExportJob);
-    });
+    await Promise.all(
+      auditLogBulkExportJobInProgress.map((auditLogBulkExportJob) =>
+        this.proceedBulkExportJob(auditLogBulkExportJob),
+      ),
+    );
   }
 
   async proceedBulkExportJob(

+ 12 - 3
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/steps/compress-and-upload.ts

@@ -26,8 +26,11 @@ function setUpAuditLogArchiver(
 
   // good practice to catch warnings (ie stat failures and other non-blocking errors)
   auditLogArchiver.on('warning', (err) => {
-    if (err.code === 'ENOENT') logger.error(err);
-    else throw err;
+    if (err.code === 'ENOENT') {
+      logger.error(err);
+    } else {
+      auditLogArchiver.emit('error', err);
+    }
   });
 
   return auditLogArchiver;
@@ -86,7 +89,13 @@ export async function compressAndUpload(
     await fileUploadService.uploadAttachment(auditLogArchiver, attachment);
   } catch (e) {
     logger.error(e);
-    await this.handleError(e as Error, job);
+    try {
+      await this.handleError(e as Error, job);
+    } catch (handleErrorErr) {
+      logger.error('Error in handleError:', handleErrorErr);
+    }
+    job.status = AuditLogBulkExportJobStatus.failed;
+    await job.save();
     return;
   }
   await postProcess.bind(this)(job, attachment, auditLogArchiver.pointer());

+ 2 - 0
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/steps/exportAuditLogsToFsAsync.ts

@@ -120,6 +120,7 @@ export async function exportAuditLogsToFsAsync(
   if (!hasAny) {
     job.totalExportedCount = 0;
     job.status = AuditLogBulkExportJobStatus.completed;
+    job.lastExportedId = undefined;
     await job.save();
 
     await this.notifyExportResultAndCleanUp(
@@ -130,6 +131,7 @@ export async function exportAuditLogsToFsAsync(
   }
 
   const logsCursor = Activity.find(query)
+
     .sort({ _id: 1 })
     .lean()
     .cursor({ batchSize: this.pageBatchSize });