Просмотр исходного кода

implement export and upload process, and add integ test

Naoki427 5 месяцев назад
Родитель
Commit
6239864eba

+ 2 - 1
apps/app/src/features/audit-log-bulk-export/interfaces/audit-log-bulk-export.ts

@@ -1,4 +1,4 @@
-import type { HasObjectId, IUser, Ref } from '@growi/core';
+import type { HasObjectId, IAttachment, IUser, Ref } from '@growi/core';
 import type { SupportedActionType } from '~/interfaces/activity';
 
 export const AuditLogBulkExportFormat = {
@@ -41,6 +41,7 @@ export interface IAuditLogBulkExportJob {
   totalExportedCount?: number; // total number of exported audit log entries
   createdAt?: Date;
   updatedAt?: Date;
+  attachment?: Ref<IAttachment>;
 }
 
 export interface IAuditLogBulkExportJobHasId

+ 1 - 0
apps/app/src/features/audit-log-bulk-export/server/models/audit-log-bulk-export-job.ts

@@ -43,6 +43,7 @@ const auditLogBulkExportJobSchema = new Schema<IAuditLogBulkExportJob>(
     completedAt: { type: Date },
     restartFlag: { type: Boolean, required: true, default: false },
     totalExportedCount: { type: Number, default: 0 },
+    attachment: { type: Schema.Types.ObjectId, ref: 'Attachment' },
   },
   { timestamps: true },
 );

+ 690 - 0
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/audit-log-bulk-export-job-cron-service.integ.ts

@@ -0,0 +1,690 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import { PassThrough } from 'node:stream';
+import { pipeline } from 'node:stream/promises';
+import type { IUser } from '@growi/core';
+import mongoose from 'mongoose';
+import type { MockedFunction } from 'vitest';
+import {
+  afterAll,
+  afterEach,
+  beforeAll,
+  beforeEach,
+  describe,
+  expect,
+  it,
+  vi,
+} from 'vitest';
+import { SupportedAction } from '~/interfaces/activity';
+import type Crowi from '~/server/crowi';
+import { ResponseMode } from '~/server/interfaces/attachment';
+import Activity, { type ActivityDocument } from '~/server/models/activity';
+import type { IAttachmentDocument } from '~/server/models/attachment';
+import { Attachment } from '~/server/models/attachment';
+import { configManager } from '~/server/service/config-manager';
+import type { FileUploader } from '~/server/service/file-uploader/file-uploader';
+import { MultipartUploader } from '~/server/service/file-uploader/multipart-uploader';
+
+import {
+  AuditLogBulkExportFormat,
+  AuditLogBulkExportJobStatus,
+} from '../../../interfaces/audit-log-bulk-export';
+import AuditLogBulkExportJob from '../../models/audit-log-bulk-export-job';
+import {
+  AuditLogBulkExportJobExpiredError,
+  AuditLogBulkExportJobRestartedError,
+} from './errors';
+import instanciateAuditLogBulkExportJobCronService, {
+  auditLogBulkExportJobCronService,
+} from './index';
+
+type ExportedActivityData = Pick<
+  ActivityDocument,
+  '_id' | 'action' | 'user'
+> & {
+  createdAt: Date;
+};
+
+const userSchema = new mongoose.Schema(
+  {
+    name: { type: String },
+    username: { type: String, required: true, unique: true },
+    email: { type: String, unique: true, sparse: true },
+  },
+  {
+    timestamps: true,
+  },
+);
+const User = mongoose.model<IUser>('User', userSchema);
+
+class MockMultipartUploader extends MultipartUploader {
+  override get uploadId(): string {
+    return 'mock-upload-id';
+  }
+
+  override async initUpload(): Promise<void> {}
+  override async uploadPart(
+    _part: Buffer,
+    _partNumber: number,
+  ): Promise<void> {}
+  override async completeUpload(): Promise<void> {}
+  override async abortUpload(): Promise<void> {}
+  override async getUploadedFileSize(): Promise<number> {
+    return 0;
+  }
+}
+
+const mockFileUploadService: FileUploader = {
+  uploadAttachment: vi.fn(),
+  getIsUploadable: vi.fn(() => true),
+  isWritable: vi.fn(() => Promise.resolve(true)),
+  getIsReadable: vi.fn(() => true),
+  isValidUploadSettings: vi.fn(() => true),
+  getFileUploadEnabled: vi.fn(() => true),
+  listFiles: vi.fn(() => []),
+  saveFile: vi.fn(() => Promise.resolve()),
+  deleteFiles: vi.fn(),
+  getFileUploadTotalLimit: vi.fn(() => 1024 * 1024 * 1024),
+  getTotalFileSize: vi.fn(() => Promise.resolve(0)),
+  doCheckLimit: vi.fn(() => Promise.resolve({ isUploadable: true })),
+  determineResponseMode: vi.fn(() => ResponseMode.REDIRECT),
+  respond: vi.fn(),
+  findDeliveryFile: vi.fn(() => Promise.resolve(new PassThrough())),
+  generateTemporaryUrl: vi.fn(() =>
+    Promise.resolve({ url: 'mock-url', lifetimeSec: 3600 }),
+  ),
+  createMultipartUploader: vi.fn(
+    (uploadKey: string, maxPartSize: number) =>
+      new MockMultipartUploader(uploadKey, maxPartSize),
+  ),
+  abortPreviousMultipartUpload: vi.fn(() => Promise.resolve()),
+};
+
+const mockActivityService = {
+  createActivity: vi.fn(() => Promise.resolve({ _id: 'mock-activity-id' })),
+};
+
+const mockEventEmitter = {
+  emit: vi.fn(),
+};
+
+type MockCrowi = Pick<Crowi, 'fileUploadService'> & {
+  event: (eventName: string) => typeof mockEventEmitter;
+  activityService: typeof mockActivityService;
+};
+
+const createMockCrowi = (): MockCrowi => ({
+  fileUploadService: mockFileUploadService,
+  event: vi.fn(() => mockEventEmitter),
+  activityService: mockActivityService,
+});
+
+describe('AuditLogBulkExportJobCronService Integration Test', () => {
+  let cronService: NonNullable<typeof auditLogBulkExportJobCronService>;
+  let crowi: MockCrowi;
+  let testUser: IUser & mongoose.Document;
+  let testTmpDir: string;
+  let uploadAttachmentSpy: MockedFunction<
+    (
+      readable: NodeJS.ReadableStream,
+      attachment: IAttachmentDocument,
+    ) => Promise<void>
+  >;
+
+  const testActivities = [
+    {
+      action: SupportedAction.ACTION_PAGE_CREATE,
+      user: null,
+      createdAt: new Date('2023-01-01T10:00:00Z'),
+      snapshot: { username: 'testuser' },
+    },
+    {
+      action: SupportedAction.ACTION_PAGE_UPDATE,
+      user: null,
+      createdAt: new Date('2023-01-02T10:00:00Z'),
+      snapshot: { username: 'testuser' },
+    },
+    {
+      action: SupportedAction.ACTION_PAGE_DELETE,
+      user: null,
+      createdAt: new Date('2023-01-03T10:00:00Z'),
+      snapshot: { username: 'testuser' },
+    },
+    ...Array.from({ length: 50 }, (_, i) => {
+      const baseDate = new Date('2023-01-04T10:00:00Z');
+      const activityDate = new Date(baseDate.getTime() + i * 60000);
+      return {
+        action: SupportedAction.ACTION_PAGE_VIEW,
+        user: null,
+        createdAt: activityDate,
+        snapshot: { username: 'testuser' },
+      };
+    }),
+  ];
+
+  beforeAll(async () => {
+    await configManager.loadConfigs();
+
+    testUser = await User.create({
+      name: 'Test User for Audit Log Export',
+      username: 'auditlogexportcrontest',
+      email: 'auditlogexportcrontest@example.com',
+    });
+
+    testActivities.forEach((activity) => {
+      activity.user = testUser._id;
+    });
+  });
+
+  beforeEach(async () => {
+    crowi = createMockCrowi();
+    instanciateAuditLogBulkExportJobCronService(crowi as Crowi);
+    if (!auditLogBulkExportJobCronService) {
+      throw new Error('auditLogBulkExportJobCronService was not initialized');
+    }
+    cronService = auditLogBulkExportJobCronService;
+
+    testTmpDir = fs.mkdtempSync(path.join('/tmp', 'audit-log-export-test-'));
+    cronService.tmpOutputRootDir = testTmpDir;
+
+    cronService.maxLogsPerFile = 10;
+    cronService.pageBatchSize = 5;
+
+    uploadAttachmentSpy = vi
+      .fn()
+      .mockImplementation(
+        async (
+          readable: NodeJS.ReadableStream,
+          attachment: IAttachmentDocument,
+        ) => {
+          const passThrough = new PassThrough();
+          let totalSize = 0;
+
+          passThrough.on('data', (chunk) => {
+            totalSize += chunk.length;
+          });
+
+          await pipeline(readable, passThrough);
+
+          attachment.fileSize = totalSize;
+        },
+      );
+    mockFileUploadService.uploadAttachment = uploadAttachmentSpy;
+
+    await Activity.insertMany(testActivities);
+  });
+
+  afterEach(async () => {
+    await Activity.deleteMany({});
+    await AuditLogBulkExportJob.deleteMany({});
+    await Attachment.deleteMany({});
+
+    if (fs.existsSync(testTmpDir)) {
+      fs.rmSync(testTmpDir, { recursive: true, force: true });
+    }
+
+    vi.clearAllMocks();
+  });
+
+  afterAll(async () => {
+    await User.deleteOne({ _id: testUser._id });
+  });
+
+  describe('1. Basic Operations (Happy Path)', () => {
+    describe('1-1. No Filter → Export → ZIP → Upload', () => {
+      it('should export all activities, create JSON files, and upload ZIP', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'test-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        const outputDir = cronService.getTmpOutputDir(job);
+        let hasFiles = false;
+        let jsonFiles: string[] = [];
+
+        if (fs.existsSync(outputDir)) {
+          const files = fs.readdirSync(outputDir);
+          jsonFiles = files.filter((file) => file.endsWith('.json'));
+          hasFiles = jsonFiles.length > 0;
+        }
+
+        if (hasFiles) {
+          expect(jsonFiles.length).toBeGreaterThan(0);
+
+          const firstFile = path.join(outputDir, jsonFiles[0]);
+          const content = JSON.parse(fs.readFileSync(firstFile, 'utf8'));
+          expect(Array.isArray(content)).toBe(true);
+          expect(content.length).toBeLessThanOrEqual(
+            cronService.maxLogsPerFile,
+          );
+        }
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        expect(uploadAttachmentSpy).toHaveBeenCalledTimes(1);
+        const [readable, attachment] = uploadAttachmentSpy.mock.calls[0];
+        expect(readable).toBeDefined();
+        expect(attachment.originalName).toMatch(/audit-logs-.*\.zip$/);
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect([
+          AuditLogBulkExportJobStatus.uploading,
+          AuditLogBulkExportJobStatus.completed,
+        ]).toContain(updatedJob?.status);
+        expect(updatedJob?.totalExportedCount).toBeGreaterThan(0);
+      });
+    });
+
+    describe('1-2. With Filters (actions / dateFrom / dateTo / users)', () => {
+      it('should export only filtered activities', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {
+            actions: [
+              SupportedAction.ACTION_PAGE_CREATE,
+              SupportedAction.ACTION_PAGE_UPDATE,
+            ],
+            dateFrom: new Date('2023-01-01T00:00:00Z'),
+            dateTo: new Date('2023-01-02T23:59:59Z'),
+            users: [testUser._id.toString()],
+          },
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'filtered-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        const outputDir = cronService.getTmpOutputDir(job);
+        const files = fs.readdirSync(outputDir);
+        const jsonFiles = files.filter((file) => file.endsWith('.json'));
+
+        if (jsonFiles.length > 0) {
+          const content = JSON.parse(
+            fs.readFileSync(path.join(outputDir, jsonFiles[0]), 'utf8'),
+          );
+
+          content.forEach((activity: ExportedActivityData) => {
+            expect([
+              SupportedAction.ACTION_PAGE_CREATE,
+              SupportedAction.ACTION_PAGE_UPDATE,
+            ]).toContain(activity.action);
+            expect(new Date(activity.createdAt)).toBeInstanceOf(Date);
+            expect(activity.user).toBe(testUser._id.toString());
+          });
+        }
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect(updatedJob?.totalExportedCount).toBeLessThanOrEqual(2);
+      });
+    });
+
+    describe('1-3. Zero Results', () => {
+      it('should handle cases with no matching activities', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {
+            actions: [SupportedAction.ACTION_USER_LOGOUT],
+          },
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'no-match-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        const notifySpy = vi.spyOn(cronService, 'notifyExportResultAndCleanUp');
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        const outputDir = cronService.getTmpOutputDir(job);
+        const files = fs.existsSync(outputDir) ? fs.readdirSync(outputDir) : [];
+        const jsonFiles = files.filter((file) => file.endsWith('.json'));
+
+        expect(jsonFiles.length).toBeLessThanOrEqual(1);
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect(updatedJob?.totalExportedCount).toBe(0);
+
+        expect(notifySpy).toHaveBeenCalledWith(
+          SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS,
+          expect.objectContaining({ _id: job._id }),
+        );
+      });
+    });
+  });
+
+  describe('2. Resumability', () => {
+    describe('2-1. Resume from lastExportedId', () => {
+      it('should resume export from the last exported ID without duplicates', async () => {
+        const activities = await Activity.find({}).sort({ _id: 1 });
+        const middleIndex = Math.floor(activities.length / 2);
+        const lastExportedId = activities[middleIndex]._id.toString();
+
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'resume-hash',
+          restartFlag: false,
+          totalExportedCount: middleIndex,
+          lastExportedId: lastExportedId,
+        });
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        const outputDir = cronService.getTmpOutputDir(job);
+        const files = fs.readdirSync(outputDir);
+        const jsonFiles = files.filter((file) => file.endsWith('.json'));
+
+        if (jsonFiles.length > 0) {
+          const allExportedActivities: ExportedActivityData[] = [];
+
+          for (const file of jsonFiles) {
+            const content = JSON.parse(
+              fs.readFileSync(path.join(outputDir, file), 'utf8'),
+            );
+            allExportedActivities.push(...content);
+          }
+
+          allExportedActivities.forEach((activity) => {
+            expect(activity._id).not.toBe(lastExportedId);
+            expect(
+              new mongoose.Types.ObjectId(activity._id).getTimestamp(),
+            ).toBeInstanceOf(Date);
+          });
+        }
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect(updatedJob?.totalExportedCount).toBeGreaterThan(middleIndex);
+      });
+    });
+
+    describe('2-2. totalExportedCount and lastExportedId Updates', () => {
+      it('should properly update totalExportedCount and lastExportedId', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'count-test-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        const initialCount = job.totalExportedCount ?? 0;
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect(updatedJob?.totalExportedCount).toBeGreaterThan(initialCount);
+        expect(updatedJob?.lastExportedId).toBeDefined();
+
+        const totalActivities = await Activity.countDocuments({});
+        expect(updatedJob?.totalExportedCount).toBeLessThanOrEqual(
+          totalActivities,
+        );
+      });
+    });
+  });
+
+  describe('3. Upload and Compression', () => {
+    describe('3-1. ZIP Content Validity', () => {
+      it('should create valid ZIP with JSON files in root', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'zip-test-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        expect(uploadAttachmentSpy).toHaveBeenCalledTimes(1);
+        const [readable, attachment] = uploadAttachmentSpy.mock.calls[0];
+        expect(readable).toBeDefined();
+        expect(attachment.fileName).toMatch(/\.zip$/);
+      });
+    });
+
+    describe('3-2. Upload Failure Handling', () => {
+      it('should handle upload failures gracefully', async () => {
+        uploadAttachmentSpy.mockImplementationOnce(async (readable) => {
+          readable.on('error', () => {});
+          readable.resume();
+          throw new Error('Upload failed');
+        });
+
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.uploading,
+          filterHash: 'upload-fail-hash',
+          restartFlag: false,
+          totalExportedCount: 10,
+        });
+
+        const notifySpy = vi.spyOn(cronService, 'notifyExportResultAndCleanUp');
+        const cleanSpy = vi.spyOn(cronService, 'cleanUpExportJobResources');
+        const handleSpy = vi.spyOn(cronService, 'handleError');
+
+        await expect(
+          cronService.proceedBulkExportJob(job),
+        ).resolves.toBeUndefined();
+
+        expect(uploadAttachmentSpy).toHaveBeenCalledTimes(1);
+        expect(handleSpy).toHaveBeenCalledTimes(1);
+        expect(notifySpy).toHaveBeenCalledWith(
+          expect.anything(),
+          expect.objectContaining({ _id: job._id }),
+        );
+        expect(cleanSpy).toHaveBeenCalledWith(
+          expect.objectContaining({ _id: job._id }),
+        );
+
+        const reloaded = await AuditLogBulkExportJob.findById(job._id).lean();
+        expect(reloaded?.status).toBe(AuditLogBulkExportJobStatus.failed);
+
+        const s = cronService.getStreamInExecution(job._id);
+        expect(s).toBeUndefined();
+      });
+    });
+  });
+
+  describe('4. Error Handling', () => {
+    describe('4-1. Nonexistent Users Filter', () => {
+      it('should throw error for nonexistent users', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {
+            users: [new mongoose.Types.ObjectId().toString()],
+          },
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'bad-user-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        try {
+          await cronService.proceedBulkExportJob(job);
+          await new Promise((resolve) => setTimeout(resolve, 200));
+        } catch (_error) {}
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect([
+          AuditLogBulkExportJobStatus.exporting,
+          AuditLogBulkExportJobStatus.failed,
+        ]).toContain(updatedJob?.status);
+      });
+    });
+
+    describe('4-2. Stream/FS Errors', () => {
+      it('should handle filesystem errors', async () => {
+        cronService.tmpOutputRootDir = '/invalid/path/that/does/not/exist';
+
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'fs-error-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        await expect(async () => {
+          await cronService.proceedBulkExportJob(job);
+          await new Promise((resolve) => setTimeout(resolve, 100));
+        }).not.toThrow();
+      });
+    });
+
+    describe('4-3. Job Expiry and Restart Errors', () => {
+      it('should handle AuditLogBulkExportJobExpiredError', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'expired-error-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        const expiredError = new AuditLogBulkExportJobExpiredError();
+
+        await cronService.handleError(expiredError, job);
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect(updatedJob?.status).toBe(AuditLogBulkExportJobStatus.failed);
+      });
+
+      it('should handle AuditLogBulkExportJobRestartedError', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'restarted-error-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        const restartedError = new AuditLogBulkExportJobRestartedError();
+
+        await cronService.handleError(restartedError, job);
+      });
+    });
+  });
+
+  describe('5. State Transitions and Execution Control', () => {
+    describe('5-1. State Flow', () => {
+      it('should follow correct state transitions: exporting → uploading → completed', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'state-flow-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        expect(job.status).toBe(AuditLogBulkExportJobStatus.exporting);
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        const afterExport = await AuditLogBulkExportJob.findById(job._id);
+        expect(afterExport?.status).toBe(AuditLogBulkExportJobStatus.uploading);
+
+        if (!afterExport) {
+          throw new Error('Job not found after export phase');
+        }
+
+        await cronService.proceedBulkExportJob(afterExport);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        await cronService.notifyExportResultAndCleanUp(
+          SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
+          afterExport,
+        );
+
+        const finalJob = await AuditLogBulkExportJob.findById(job._id);
+        expect(finalJob?.status).toBe(AuditLogBulkExportJobStatus.completed);
+      });
+    });
+
+    describe('5-2. Stream Lifecycle', () => {
+      it('should properly manage stream execution lifecycle', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'stream-lifecycle-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        await cronService.cleanUpExportJobResources(job);
+        const streamAfterCleanup = cronService.getStreamInExecution(job._id);
+        expect(streamAfterCleanup).toBeUndefined();
+      });
+    });
+
+    describe('5-3. Restart Flag Handling', () => {
+      it('should handle restartFlag correctly', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'restart-flag-hash',
+          restartFlag: true,
+          totalExportedCount: 50,
+          lastExportedId: 'some-previous-id',
+        });
+
+        await cronService.proceedBulkExportJob(job);
+        await new Promise((resolve) => setTimeout(resolve, 100));
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        await new Promise((resolve) => setTimeout(resolve, 50));
+        
+        expect(updatedJob?.restartFlag).toBe(false);
+        expect(updatedJob?.totalExportedCount).toBe(0);
+        expect(updatedJob?.lastExportedId).toBeUndefined();
+        expect(updatedJob?.status).toBe(AuditLogBulkExportJobStatus.exporting);
+      });
+    });
+  });
+});

+ 11 - 0
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/errors.ts

@@ -0,0 +1,11 @@
+export class AuditLogBulkExportJobExpiredError extends Error {
+  constructor() {
+    super('Audit-log-bulk-export job has expired');
+  }
+}
+
+export class AuditLogBulkExportJobRestartedError extends Error {
+  constructor() {
+    super('Audit-log-bulk-export job has restarted');
+  }
+}

+ 136 - 11
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/index.ts

@@ -1,31 +1,56 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import type { Readable } from 'node:stream';
 import type { IUser } from '@growi/core';
 import { getIdForRef, isPopulated } from '@growi/core';
+import type archiver from 'archiver';
 import mongoose from 'mongoose';
-
 import type { SupportedActionType } from '~/interfaces/activity';
 import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
 import type Crowi from '~/server/crowi';
+import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
 import CronService from '~/server/service/cron';
 import loggerFactory from '~/utils/logger';
-
 import {
   AuditLogBulkExportJobInProgressJobStatus,
   AuditLogBulkExportJobStatus,
 } from '../../../interfaces/audit-log-bulk-export';
 import type { AuditLogBulkExportJobDocument } from '../../models/audit-log-bulk-export-job';
 import AuditLogBulkExportJob from '../../models/audit-log-bulk-export-job';
+import {
+  AuditLogBulkExportJobExpiredError,
+  AuditLogBulkExportJobRestartedError,
+} from './errors';
 
 const logger = loggerFactory('growi:service:audit-log-export-job-cron');
 
 export interface IAuditLogBulkExportJobCronService {
   crowi: Crowi;
+  activityEvent: NodeJS.EventEmitter;
+  tmpOutputRootDir: string;
+  pageBatchSize: number;
+  maxLogsPerFile: number;
+  compressFormat: archiver.Format;
+  compressLevel: number;
   proceedBulkExportJob(
     auditLogBulkExportJob: AuditLogBulkExportJobDocument,
-  ): void;
+  ): Promise<void>;
+  getTmpOutputDir(auditLogBulkExportJob: AuditLogBulkExportJobDocument): string;
+  getStreamInExecution(jobId: ObjectIdLike): Readable | undefined;
+  setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void;
+  removeStreamInExecution(jobId: ObjectIdLike): void;
   notifyExportResultAndCleanUp(
     action: SupportedActionType,
     auditLogBulkExportJob: AuditLogBulkExportJobDocument,
   ): Promise<void>;
+  handleError(
+    err: Error | null,
+    auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+  ): Promise<void>;
+  cleanUpExportJobResources(
+    auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+    restarted?: boolean,
+  ): Promise<void>;
 }
 
 import type { ActivityDocument } from '~/server/models/activity';
@@ -47,6 +72,18 @@ class AuditLogBulkExportJobCronService
 
   private parallelExecLimit: number;
 
+  tmpOutputRootDir = '/tmp/audit-log-bulk-export';
+
+  pageBatchSize = 100;
+
+  maxLogsPerFile = 50;
+
+  compressFormat: archiver.Format = 'zip';
+
+  compressLevel = 6;
+
+  private streamInExecutionMemo: { [key: string]: Readable } = {};
+
   constructor(crowi: Crowi) {
     super();
     this.crowi = crowi;
@@ -68,17 +105,24 @@ class AuditLogBulkExportJobCronService
     })
       .sort({ createdAt: 1 })
       .limit(this.parallelExecLimit);
-    await Promise.all(
-      auditLogBulkExportJobInProgress.map((job) =>
-        this.proceedBulkExportJob(job),
-      ),
-    );
+    auditLogBulkExportJobInProgress.forEach((auditLogBulkExportJob) => {
+      this.proceedBulkExportJob(auditLogBulkExportJob);
+    });
   }
 
   async proceedBulkExportJob(
     auditLogBulkExportJob: AuditLogBulkExportJobDocument,
   ) {
     try {
+      if (auditLogBulkExportJob.restartFlag) {
+        await this.cleanUpExportJobResources(auditLogBulkExportJob, true);
+        auditLogBulkExportJob.restartFlag = false;
+        auditLogBulkExportJob.status = AuditLogBulkExportJobStatus.exporting;
+        auditLogBulkExportJob.lastExportedId = undefined;
+        auditLogBulkExportJob.totalExportedCount = 0;
+        await auditLogBulkExportJob.save();
+        return;
+      }
       const User = mongoose.model<IUser>('User');
       const user = await User.findById(getIdForRef(auditLogBulkExportJob.user));
 
@@ -95,13 +139,42 @@ class AuditLogBulkExportJobCronService
       } else if (
         auditLogBulkExportJob.status === AuditLogBulkExportJobStatus.uploading
       ) {
-        await compressAndUpload.bind(this)(auditLogBulkExportJob);
+        await compressAndUpload.bind(this)(user, auditLogBulkExportJob);
       }
     } catch (err) {
       logger.error(err);
     }
   }
 
+  getTmpOutputDir(
+    auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+  ): string {
+    const jobId = auditLogBulkExportJob._id.toString();
+    return path.join(this.tmpOutputRootDir, jobId);
+  }
+
+  /**
+   * Get the stream in execution for a job.
+   * A getter method that includes "undefined" in the return type
+   */
+  getStreamInExecution(jobId: ObjectIdLike): Readable | undefined {
+    return this.streamInExecutionMemo[jobId.toString()];
+  }
+
+  /**
+   * Set the stream in execution for a job
+   */
+  setStreamInExecution(jobId: ObjectIdLike, stream: Readable) {
+    this.streamInExecutionMemo[jobId.toString()] = stream;
+  }
+
+  /**
+   * Remove the stream in execution for a job
+   */
+  removeStreamInExecution(jobId: ObjectIdLike) {
+    delete this.streamInExecutionMemo[jobId.toString()];
+  }
+
   async notifyExportResultAndCleanUp(
     action: SupportedActionType,
     auditLogBulkExportJob: AuditLogBulkExportJobDocument,
@@ -117,8 +190,7 @@ class AuditLogBulkExportJobCronService
     } catch (err) {
       logger.error(err);
     }
-    // TODO: Implement cleanup process in a future task.
-    // The following method `cleanUpExportJobResources` will be called here once it's ready.
+    await this.cleanUpExportJobResources(auditLogBulkExportJob);
   }
 
   private async notifyExportResult(
@@ -154,6 +226,59 @@ class AuditLogBulkExportJobCronService
       preNotify,
     );
   }
+
+  async handleError(
+    err: Error | null,
+    auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+  ) {
+    if (err == null) return;
+
+    if (err instanceof AuditLogBulkExportJobExpiredError) {
+      logger.error(err);
+      await this.notifyExportResultAndCleanUp(
+        SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_JOB_EXPIRED,
+        auditLogBulkExportJob,
+      );
+    } else if (err instanceof AuditLogBulkExportJobRestartedError) {
+      logger.info(err.message);
+      await this.cleanUpExportJobResources(auditLogBulkExportJob);
+    } else {
+      logger.error(err);
+      await this.notifyExportResultAndCleanUp(
+        SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_FAILED,
+        auditLogBulkExportJob,
+      );
+    }
+  }
+
+  async cleanUpExportJobResources(
+    auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+    restarted = false,
+  ) {
+    const streamInExecution = this.getStreamInExecution(
+      auditLogBulkExportJob._id,
+    );
+    if (streamInExecution != null) {
+      if (restarted) {
+        streamInExecution.destroy(new AuditLogBulkExportJobRestartedError());
+      } else {
+        streamInExecution.destroy(new AuditLogBulkExportJobExpiredError());
+      }
+      this.removeStreamInExecution(auditLogBulkExportJob._id);
+    }
+
+    const promises = [
+      fs.promises.rm(this.getTmpOutputDir(auditLogBulkExportJob), {
+        recursive: true,
+        force: true,
+      }),
+    ];
+
+    const results = await Promise.allSettled(promises);
+    results.forEach((result) => {
+      if (result.status === 'rejected') logger.error(result.reason);
+    });
+  }
 }
 
 // eslint-disable-next-line import/no-mutable-exports

+ 80 - 6
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/steps/compress-and-upload.ts

@@ -1,19 +1,93 @@
+import type { IUser } from '@growi/core';
+import type { Archiver } from 'archiver';
+import archiver from 'archiver';
+import { AuditLogBulkExportJobStatus } from '~/features/audit-log-bulk-export/interfaces/audit-log-bulk-export';
 import { SupportedAction } from '~/interfaces/activity';
+import { AttachmentType } from '~/server/interfaces/attachment';
+import {
+  Attachment,
+  type IAttachmentDocument,
+} from '~/server/models/attachment';
+import type { FileUploader } from '~/server/service/file-uploader';
+import loggerFactory from '~/utils/logger';
 import type { AuditLogBulkExportJobDocument } from '../../../models/audit-log-bulk-export-job';
 import type { IAuditLogBulkExportJobCronService } from '..';
+
+const logger = loggerFactory(
+  'growi:service:audit-log-export-job-cron:compress-and-upload-async',
+);
+
+function setUpAuditLogArchiver(
+  this: IAuditLogBulkExportJobCronService,
+): Archiver {
+  const auditLogArchiver = archiver(this.compressFormat, {
+    zlib: { level: this.compressLevel },
+  });
+
+  // good practice to catch warnings (ie stat failures and other non-blocking errors)
+  auditLogArchiver.on('warning', (err) => {
+    if (err.code === 'ENOENT') logger.error(err);
+    else throw err;
+  });
+
+  return auditLogArchiver;
+}
+
+async function postProcess(
+  this: IAuditLogBulkExportJobCronService,
+  auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+  attachment: IAttachmentDocument,
+  fileSize: number,
+): Promise<void> {
+  attachment.fileSize = fileSize;
+  await attachment.save();
+
+  auditLogBulkExportJob.completedAt = new Date();
+  auditLogBulkExportJob.attachment = attachment._id;
+  auditLogBulkExportJob.status = AuditLogBulkExportJobStatus.completed;
+  await auditLogBulkExportJob.save();
+
+  this.removeStreamInExecution(auditLogBulkExportJob._id);
+  await this.notifyExportResultAndCleanUp(
+    SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
+    auditLogBulkExportJob,
+  );
+}
+
 /**
  * Execute a pipeline that reads the audit log files from the temporal fs directory,
  * compresses them into a zip file, and uploads to the cloud storage.
- *
- * TODO: Implement the actual compression and upload logic in a future task.
- * Currently, this function only notifies a successful export completion.
  */
 export async function compressAndUpload(
   this: IAuditLogBulkExportJobCronService,
+  user: IUser,
   job: AuditLogBulkExportJobDocument,
 ): Promise<void> {
-  await this.notifyExportResultAndCleanUp(
-    SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
-    job,
+  const auditLogArchiver = setUpAuditLogArchiver.bind(this)();
+
+  if (job.filterHash == null) throw new Error('filterHash is not set');
+
+  const originalName = `audit-logs-${job.filterHash}.zip`;
+  const attachment = Attachment.createWithoutSave(
+    null,
+    user,
+    originalName,
+    this.compressFormat,
+    0,
+    AttachmentType.AUDIT_LOG_BULK_EXPORT,
   );
+  const fileUploadService: FileUploader = this.crowi.fileUploadService;
+
+  auditLogArchiver.directory(this.getTmpOutputDir(job), false);
+  auditLogArchiver.finalize();
+
+  this.setStreamInExecution(job._id, auditLogArchiver);
+  try {
+    await fileUploadService.uploadAttachment(auditLogArchiver, attachment);
+  } catch (e) {
+    logger.error(e);
+    await this.handleError(e as Error, job);
+    return;
+  }
+  await postProcess.bind(this)(job, attachment, auditLogArchiver.pointer());
 }

+ 132 - 5
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/steps/exportAuditLogsToFsAsync.ts

@@ -1,17 +1,144 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import { pipeline, Writable } from 'node:stream';
+import type { IUser } from '@growi/core';
+import mongoose, { type FilterQuery } from 'mongoose';
 import { AuditLogBulkExportJobStatus } from '~/features/audit-log-bulk-export/interfaces/audit-log-bulk-export';
+import { SupportedAction } from '~/interfaces/activity';
+import Activity, { type ActivityDocument } from '~/server/models/activity';
 import type { AuditLogBulkExportJobDocument } from '../../../models/audit-log-bulk-export-job';
 import type { IAuditLogBulkExportJobCronService } from '..';
 
+/**
+ * Get a Writable that writes audit logs to JSON files
+ */
+function getAuditLogWritable(
+  this: IAuditLogBulkExportJobCronService,
+  job: AuditLogBulkExportJobDocument,
+): Writable {
+  const outputDir = this.getTmpOutputDir(job);
+  let buffer: ActivityDocument[] = [];
+  let fileIndex = 0;
+  return new Writable({
+    objectMode: true,
+    write: async (log: ActivityDocument, _encoding, callback) => {
+      try {
+        buffer.push(log);
+
+        // Update lastExportedId for resumability
+        job.lastExportedId = log._id.toString();
+        job.totalExportedCount = (job.totalExportedCount || 0) + 1;
+
+        if (buffer.length >= this.maxLogsPerFile) {
+          const filePath = path.join(
+            outputDir,
+            `audit-logs-${job._id.toString()}-${String(fileIndex).padStart(2, '0')}.json`,
+          );
+          await fs.promises.mkdir(path.dirname(filePath), { recursive: true });
+          await fs.promises.writeFile(
+            filePath,
+            JSON.stringify(buffer, null, 2),
+          );
+
+          await job.save();
+
+          buffer = [];
+          fileIndex++;
+        }
+      } catch (err) {
+        callback(err as Error);
+        return;
+      }
+      callback();
+    },
+    final: async (callback) => {
+      try {
+        if (buffer.length > 0) {
+          const filePath = path.join(
+            outputDir,
+            `audit-logs-${job._id.toString()}-${String(fileIndex).padStart(2, '0')}.json`,
+          );
+          await fs.promises.mkdir(path.dirname(filePath), { recursive: true });
+          await fs.promises.writeFile(
+            filePath,
+            JSON.stringify(buffer, null, 2),
+          );
+        }
+        job.status = AuditLogBulkExportJobStatus.uploading;
+        await job.save();
+      } catch (err) {
+        callback(err as Error);
+        return;
+      }
+      callback();
+    },
+  });
+}
+
 /**
  * Export audit logs to the file system before compressing and uploading.
- *
- * TODO: Implement the actual export logic in a later task.
- * For now, this function only updates the job status to `uploading`.
  */
 export async function exportAuditLogsToFsAsync(
   this: IAuditLogBulkExportJobCronService,
   job: AuditLogBulkExportJobDocument,
 ): Promise<void> {
-  job.status = AuditLogBulkExportJobStatus.uploading;
-  await job.save();
+  const filters = job.filters ?? {};
+  const query: FilterQuery<ActivityDocument> = {};
+
+  // Build query filters for searching activity logs based on user-defined filters
+  if (filters.actions && filters.actions.length > 0) {
+    query.action = { $in: filters.actions };
+  }
+  if (filters.dateFrom || filters.dateTo) {
+    query.createdAt = {};
+    if (filters.dateFrom) {
+      query.createdAt.$gte = new Date(filters.dateFrom);
+    }
+    if (filters.dateTo) {
+      query.createdAt.$lte = new Date(filters.dateTo);
+    }
+  }
+  if (filters.users && filters.users.length > 0) {
+    const User = mongoose.model<IUser>('User');
+    const userIds = await User.find({
+      _id: { $in: filters.users },
+    }).distinct('_id');
+    if (userIds.length === 0) {
+      throw new Error(
+        `No users found with userIDs: ${filters.users.join(', ')}`,
+      );
+    }
+    query.user = { $in: userIds };
+  }
+
+  // If the previous export was incomplete, resume from the last exported ID by adding it to the query filter
+  if (job.lastExportedId) {
+    query._id = { $gt: job.lastExportedId };
+  }
+
+  const hasAny = await Activity.exists(query);
+  if (!hasAny) {
+    job.totalExportedCount = 0;
+    job.status = AuditLogBulkExportJobStatus.completed;
+    await job.save();
+
+    await this.notifyExportResultAndCleanUp(
+      SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS,
+      job,
+    );
+    return;
+  }
+
+  const logsCursor = Activity.find(query)
+    .sort({ _id: 1 })
+    .lean()
+    .cursor({ batchSize: this.pageBatchSize });
+
+  const writable = getAuditLogWritable.bind(this)(job);
+
+  this.setStreamInExecution(job._id, logsCursor);
+
+  pipeline(logsCursor, writable, (err) => {
+    this.handleError(err, job);
+  });
 }

+ 4 - 0
apps/app/src/interfaces/activity.ts

@@ -65,6 +65,8 @@ const ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED =
 const ACTION_AUDIT_LOG_BULK_EXPORT_FAILED = 'AUDIT_LOG_BULK_EXPORT_FAILED';
 const ACTION_AUDIT_LOG_BULK_EXPORT_JOB_EXPIRED =
   'AUDIT_LOG_BULK_EXPORT_JOB_EXPIRED';
+const ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS =
+  'ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS';
 const ACTION_TAG_UPDATE = 'TAG_UPDATE';
 const ACTION_IN_APP_NOTIFICATION_ALL_STATUSES_OPEN =
   'IN_APP_NOTIFICATION_ALL_STATUSES_OPEN';
@@ -383,6 +385,7 @@ export const SupportedAction = {
   ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
   ACTION_AUDIT_LOG_BULK_EXPORT_FAILED,
   ACTION_AUDIT_LOG_BULK_EXPORT_JOB_EXPIRED,
+  ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS,
 } as const;
 
 // Action required for notification
@@ -407,6 +410,7 @@ export const EssentialActionGroup = {
   ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
   ACTION_AUDIT_LOG_BULK_EXPORT_FAILED,
   ACTION_AUDIT_LOG_BULK_EXPORT_JOB_EXPIRED,
+  ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS,
 } as const;
 
 export const ActionGroupSize = {

+ 2 - 0
apps/app/src/server/interfaces/attachment.ts

@@ -3,6 +3,7 @@ export const AttachmentType = {
   WIKI_PAGE: 'WIKI_PAGE',
   PROFILE_IMAGE: 'PROFILE_IMAGE',
   PAGE_BULK_EXPORT: 'PAGE_BULK_EXPORT',
+  AUDIT_LOG_BULK_EXPORT: 'AUDIT_LOG_BULK_EXPORT',
 } as const;
 
 export type AttachmentType = typeof AttachmentType[keyof typeof AttachmentType];
@@ -35,4 +36,5 @@ export const FilePathOnStoragePrefix = {
   attachment: 'attachment',
   user: 'user',
   pageBulkExport: 'page-bulk-export',
+  auditLogBulkExport: 'audit-log-bulk-export',
 } as const;