Просмотр исходного кода

Merge pull request #10510 from growilabs/feat/94790-172036-add-audit-log-bulk-export-clean-up-cron-service

feat: add audit log bulk export clean up cron service
Yuki Takei 2 месяца назад
Родитель
Commit
b435d393a4
14 изменённых файлов: 1538 добавлений и 19 удалений
  1. 3 1
      apps/app/src/features/audit-log-bulk-export/interfaces/audit-log-bulk-export.ts
  2. 2 0
      apps/app/src/features/audit-log-bulk-export/server/models/audit-log-bulk-export-job.ts
  3. 1 0
      apps/app/src/features/audit-log-bulk-export/server/routes/apiv3/audit-log-bulk-export.ts
  4. 234 0
      apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-clean-up-cron.integ.ts
  5. 155 0
      apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-clean-up-cron.ts
  6. 756 0
      apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/audit-log-bulk-export-job-cron-service.integ.ts
  7. 11 0
      apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/errors.ts
  8. 136 6
      apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/index.ts
  9. 91 6
      apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/steps/compress-and-upload.ts
  10. 136 5
      apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/steps/exportAuditLogsToFsAsync.ts
  11. 1 1
      apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export.integ.ts
  12. 4 0
      apps/app/src/interfaces/activity.ts
  13. 6 0
      apps/app/src/server/crowi/index.js
  14. 2 0
      apps/app/src/server/interfaces/attachment.ts

+ 3 - 1
apps/app/src/features/audit-log-bulk-export/interfaces/audit-log-bulk-export.ts

@@ -1,4 +1,5 @@
-import type { HasObjectId, IUser, Ref } from '@growi/core';
+import type { HasObjectId, IAttachment, IUser, Ref } from '@growi/core';
+
 import type { SupportedActionType } from '~/interfaces/activity';
 
 export const AuditLogBulkExportFormat = {
@@ -41,6 +42,7 @@ export interface IAuditLogBulkExportJob {
   totalExportedCount?: number; // total number of exported audit log entries
   createdAt?: Date;
   updatedAt?: Date;
+  attachment?: Ref<IAttachment>;
 }
 
 export interface IAuditLogBulkExportJobHasId

+ 2 - 0
apps/app/src/features/audit-log-bulk-export/server/models/audit-log-bulk-export-job.ts

@@ -1,5 +1,6 @@
 import type { HydratedDocument } from 'mongoose';
 import { type Model, Schema } from 'mongoose';
+
 import { AllSupportedActions } from '~/interfaces/activity';
 import { getOrCreateModel } from '~/server/util/mongoose-utils';
 
@@ -43,6 +44,7 @@ const auditLogBulkExportJobSchema = new Schema<IAuditLogBulkExportJob>(
     completedAt: { type: Date },
     restartFlag: { type: Boolean, required: true, default: false },
     totalExportedCount: { type: Number, default: 0 },
+    attachment: { type: Schema.Types.ObjectId, ref: 'Attachment' },
   },
   { timestamps: true },
 );

+ 1 - 0
apps/app/src/features/audit-log-bulk-export/server/routes/apiv3/audit-log-bulk-export.ts

@@ -4,6 +4,7 @@ import { ErrorV3 } from '@growi/core/dist/models';
 import type { Request } from 'express';
 import { Router } from 'express';
 import { body } from 'express-validator';
+
 import { AuditLogBulkExportFormat } from '~/features/audit-log-bulk-export/interfaces/audit-log-bulk-export';
 import type { SupportedActionType } from '~/interfaces/activity';
 import { AllSupportedActions } from '~/interfaces/activity';

+ 234 - 0
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-clean-up-cron.integ.ts

@@ -0,0 +1,234 @@
+import type { IUser } from '@growi/core';
+import mongoose from 'mongoose';
+
+import type Crowi from '~/server/crowi';
+import { configManager } from '~/server/service/config-manager';
+
+import {
+  AuditLogBulkExportFormat,
+  AuditLogBulkExportJobStatus,
+} from '../../interfaces/audit-log-bulk-export';
+import AuditLogBulkExportJob from '../models/audit-log-bulk-export-job';
+import instantiateAuditLogBulkExportJobCleanUpCronService, {
+  auditLogBulkExportJobCleanUpCronService,
+} from './audit-log-bulk-export-job-clean-up-cron';
+
+const userSchema = new mongoose.Schema(
+  {
+    name: { type: String },
+    username: { type: String, required: true, unique: true },
+    email: { type: String, unique: true, sparse: true },
+  },
+  {
+    timestamps: true,
+  },
+);
+const User = mongoose.model<IUser>('User', userSchema);
+
+vi.mock('./audit-log-bulk-export-job-cron', () => {
+  return {
+    auditLogBulkExportJobCronService: {
+      cleanUpExportJobResources: vi.fn(() => Promise.resolve()),
+      notifyExportResultAndCleanUp: vi.fn(() => Promise.resolve()),
+    },
+  };
+});
+
+describe('AuditLogBulkExportJobCleanUpCronService', () => {
+  const crowi = {} as Crowi;
+  let user: IUser;
+
+  beforeAll(async () => {
+    await configManager.loadConfigs();
+    user = await User.create({
+      name: 'Example for AuditLogBulkExportJobCleanUpCronService Test',
+      username: 'audit log bulk export job cleanup cron test user',
+      email: 'auditLogBulkExportCleanUpCronTestUser@example.com',
+    });
+    instantiateAuditLogBulkExportJobCleanUpCronService(crowi);
+  });
+
+  beforeEach(async () => {
+    await AuditLogBulkExportJob.deleteMany();
+  });
+
+  describe('deleteExpiredExportJobs', () => {
+    const jobId1 = new mongoose.Types.ObjectId();
+    const jobId2 = new mongoose.Types.ObjectId();
+    const jobId3 = new mongoose.Types.ObjectId();
+    const jobId4 = new mongoose.Types.ObjectId();
+    beforeEach(async () => {
+      await configManager.updateConfig(
+        'app:bulkExportJobExpirationSeconds',
+        86400,
+      );
+
+      await AuditLogBulkExportJob.insertMany([
+        {
+          _id: jobId1,
+          user,
+          filters: {},
+          filterHash: 'hash1',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          restartFlag: false,
+          createdAt: new Date(Date.now()),
+        },
+        {
+          _id: jobId2,
+          user,
+          filters: {},
+          filterHash: 'hash2',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          restartFlag: false,
+          createdAt: new Date(Date.now() - 86400 * 1000 - 1),
+        },
+        {
+          _id: jobId3,
+          user,
+          filters: {},
+          filterHash: 'hash3',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.uploading,
+          restartFlag: false,
+          createdAt: new Date(Date.now() - 86400 * 1000 - 2),
+        },
+        {
+          _id: jobId4,
+          user,
+          filters: {},
+          filterHash: 'hash4',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.failed,
+          restartFlag: false,
+        },
+      ]);
+    });
+
+    test('should delete expired jobs', async () => {
+      expect(await AuditLogBulkExportJob.find()).toHaveLength(4);
+
+      await auditLogBulkExportJobCleanUpCronService?.deleteExpiredExportJobs();
+      const jobs = await AuditLogBulkExportJob.find();
+
+      expect(jobs).toHaveLength(2);
+      expect(jobs.map((job) => job._id).sort()).toStrictEqual(
+        [jobId1, jobId4].sort(),
+      );
+    });
+  });
+
+  describe('deleteDownloadExpiredExportJobs', () => {
+    const jobId1 = new mongoose.Types.ObjectId();
+    const jobId2 = new mongoose.Types.ObjectId();
+    const jobId3 = new mongoose.Types.ObjectId();
+    const jobId4 = new mongoose.Types.ObjectId();
+    beforeEach(async () => {
+      await configManager.updateConfig(
+        'app:bulkExportDownloadExpirationSeconds',
+        86400,
+      );
+
+      await AuditLogBulkExportJob.insertMany([
+        {
+          _id: jobId1,
+          user,
+          filters: {},
+          filterHash: 'hash1',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.completed,
+          restartFlag: false,
+          completedAt: new Date(Date.now()),
+        },
+        {
+          _id: jobId2,
+          user,
+          filters: {},
+          filterHash: 'hash2',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.completed,
+          restartFlag: false,
+          completedAt: new Date(Date.now() - 86400 * 1000 - 1),
+        },
+        {
+          _id: jobId3,
+          user,
+          filters: {},
+          filterHash: 'hash3',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          restartFlag: false,
+        },
+        {
+          _id: jobId4,
+          user,
+          filters: {},
+          filterHash: 'hash4',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.failed,
+          restartFlag: false,
+        },
+      ]);
+    });
+
+    test('should delete download expired jobs', async () => {
+      expect(await AuditLogBulkExportJob.find()).toHaveLength(4);
+
+      await auditLogBulkExportJobCleanUpCronService?.deleteDownloadExpiredExportJobs();
+      const jobs = await AuditLogBulkExportJob.find();
+
+      expect(jobs).toHaveLength(3);
+      expect(jobs.map((job) => job._id).sort()).toStrictEqual(
+        [jobId1, jobId3, jobId4].sort(),
+      );
+    });
+  });
+
+  describe('deleteFailedExportJobs', () => {
+    const jobId1 = new mongoose.Types.ObjectId();
+    const jobId2 = new mongoose.Types.ObjectId();
+    const jobId3 = new mongoose.Types.ObjectId();
+    beforeEach(async () => {
+      await AuditLogBulkExportJob.insertMany([
+        {
+          _id: jobId1,
+          user,
+          filters: {},
+          filterHash: 'hash1',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.failed,
+          restartFlag: false,
+        },
+        {
+          _id: jobId2,
+          user,
+          filters: {},
+          filterHash: 'hash2',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          restartFlag: false,
+        },
+        {
+          _id: jobId3,
+          user,
+          filters: {},
+          filterHash: 'hash3',
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.failed,
+          restartFlag: false,
+        },
+      ]);
+    });
+
+    test('should delete failed export jobs', async () => {
+      expect(await AuditLogBulkExportJob.find()).toHaveLength(3);
+
+      await auditLogBulkExportJobCleanUpCronService?.deleteFailedExportJobs();
+      const jobs = await AuditLogBulkExportJob.find();
+
+      expect(jobs).toHaveLength(1);
+      expect(jobs.map((job) => job._id)).toStrictEqual([jobId2]);
+    });
+  });
+});

+ 155 - 0
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-clean-up-cron.ts

@@ -0,0 +1,155 @@
+import type { HydratedDocument } from 'mongoose';
+
+import type Crowi from '~/server/crowi';
+import { configManager } from '~/server/service/config-manager';
+import CronService from '~/server/service/cron';
+import loggerFactory from '~/utils/logger';
+
+import {
+  AuditLogBulkExportJobInProgressJobStatus,
+  AuditLogBulkExportJobStatus,
+} from '../../interfaces/audit-log-bulk-export';
+import type { AuditLogBulkExportJobDocument } from '../models/audit-log-bulk-export-job';
+import AuditLogBulkExportJob from '../models/audit-log-bulk-export-job';
+import { auditLogBulkExportJobCronService } from './audit-log-bulk-export-job-cron';
+
+const logger = loggerFactory(
+  'growi:service:audit-log-bulk-export-job-clean-up-cron',
+);
+
+/**
+ * Manages cronjob which deletes unnecessary audit log bulk export jobs
+ */
+class AuditLogBulkExportJobCleanUpCronService extends CronService {
+  crowi: Crowi;
+
+  constructor(crowi: Crowi) {
+    super();
+    this.crowi = crowi;
+  }
+
+  override getCronSchedule(): string {
+    return '0 */6 * * *';
+  }
+
+  override async executeJob(): Promise<void> {
+    await this.deleteExpiredExportJobs();
+    await this.deleteDownloadExpiredExportJobs();
+    await this.deleteFailedExportJobs();
+  }
+
+  /**
+   * Delete audit log bulk export jobs which are on-going and has passed the limit time for execution
+   */
+  async deleteExpiredExportJobs() {
+    const exportJobExpirationSeconds = configManager.getConfig(
+      'app:bulkExportJobExpirationSeconds',
+    );
+
+    const thresholdDate = new Date(
+      Date.now() - exportJobExpirationSeconds * 1000,
+    );
+
+    const expiredExportJobs = await AuditLogBulkExportJob.find({
+      $or: Object.values(AuditLogBulkExportJobInProgressJobStatus).map(
+        (status) => ({
+          status,
+        }),
+      ),
+      createdAt: {
+        $lt: thresholdDate,
+      },
+    });
+
+    if (auditLogBulkExportJobCronService != null) {
+      await this.cleanUpAndDeleteBulkExportJobs(
+        expiredExportJobs,
+        auditLogBulkExportJobCronService.cleanUpExportJobResources.bind(
+          auditLogBulkExportJobCronService,
+        ),
+      );
+    }
+  }
+
+  /**
+   * Delete audit log bulk export jobs which have completed but the due time for downloading has passed
+   */
+  async deleteDownloadExpiredExportJobs() {
+    const downloadExpirationSeconds = configManager.getConfig(
+      'app:bulkExportDownloadExpirationSeconds',
+    );
+    const thresholdDate = new Date(
+      Date.now() - downloadExpirationSeconds * 1000,
+    );
+
+    const downloadExpiredExportJobs = await AuditLogBulkExportJob.find({
+      status: AuditLogBulkExportJobStatus.completed,
+      completedAt: { $lt: thresholdDate },
+    });
+
+    const cleanUp = async (job: AuditLogBulkExportJobDocument) => {
+      await auditLogBulkExportJobCronService?.cleanUpExportJobResources(job);
+
+      const hasSameAttachmentAndDownloadNotExpired =
+        await AuditLogBulkExportJob.findOne({
+          attachment: job.attachment,
+          _id: { $ne: job._id },
+          completedAt: { $gte: thresholdDate },
+        });
+      if (hasSameAttachmentAndDownloadNotExpired == null) {
+        await this.crowi.attachmentService?.removeAttachment(job.attachment);
+      }
+    };
+
+    await this.cleanUpAndDeleteBulkExportJobs(
+      downloadExpiredExportJobs,
+      cleanUp,
+    );
+  }
+
+  /**
+   * Delete audit log bulk export jobs which have failed
+   */
+  async deleteFailedExportJobs() {
+    const failedExportJobs = await AuditLogBulkExportJob.find({
+      status: AuditLogBulkExportJobStatus.failed,
+    });
+
+    if (auditLogBulkExportJobCronService != null) {
+      await this.cleanUpAndDeleteBulkExportJobs(
+        failedExportJobs,
+        auditLogBulkExportJobCronService.cleanUpExportJobResources.bind(
+          auditLogBulkExportJobCronService,
+        ),
+      );
+    }
+  }
+
+  async cleanUpAndDeleteBulkExportJobs(
+    auditLogBulkExportJobs: HydratedDocument<AuditLogBulkExportJobDocument>[],
+    cleanUp: (job: AuditLogBulkExportJobDocument) => Promise<void>,
+  ): Promise<void> {
+    const results = await Promise.allSettled(
+      auditLogBulkExportJobs.map((job) => cleanUp(job)),
+    );
+    results.forEach((result) => {
+      if (result.status === 'rejected') logger.error(result.reason);
+    });
+
+    const cleanedUpJobs = auditLogBulkExportJobs.filter(
+      (_, index) => results[index].status === 'fulfilled',
+    );
+    if (cleanedUpJobs.length > 0) {
+      const cleanedUpJobIds = cleanedUpJobs.map((job) => job._id);
+      await AuditLogBulkExportJob.deleteMany({ _id: { $in: cleanedUpJobIds } });
+    }
+  }
+}
+
+export let auditLogBulkExportJobCleanUpCronService:
+  | AuditLogBulkExportJobCleanUpCronService
+  | undefined;
+export default function instantiate(crowi: Crowi): void {
+  auditLogBulkExportJobCleanUpCronService =
+    new AuditLogBulkExportJobCleanUpCronService(crowi);
+}

+ 756 - 0
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/audit-log-bulk-export-job-cron-service.integ.ts

@@ -0,0 +1,756 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import { PassThrough } from 'node:stream';
+import { pipeline } from 'node:stream/promises';
+import type { IUser } from '@growi/core';
+import mongoose from 'mongoose';
+import type { MockedFunction } from 'vitest';
+import {
+  afterAll,
+  afterEach,
+  beforeAll,
+  beforeEach,
+  describe,
+  expect,
+  it,
+  vi,
+} from 'vitest';
+
+import { SupportedAction } from '~/interfaces/activity';
+import type Crowi from '~/server/crowi';
+import { ResponseMode } from '~/server/interfaces/attachment';
+import Activity, { type ActivityDocument } from '~/server/models/activity';
+import type { IAttachmentDocument } from '~/server/models/attachment';
+import { Attachment } from '~/server/models/attachment';
+import { configManager } from '~/server/service/config-manager';
+import type { FileUploader } from '~/server/service/file-uploader/file-uploader';
+import { MultipartUploader } from '~/server/service/file-uploader/multipart-uploader';
+
+import {
+  AuditLogBulkExportFormat,
+  AuditLogBulkExportJobStatus,
+} from '../../../interfaces/audit-log-bulk-export';
+import AuditLogBulkExportJob, {
+  type AuditLogBulkExportJobDocument,
+} from '../../models/audit-log-bulk-export-job';
+import {
+  AuditLogBulkExportJobExpiredError,
+  AuditLogBulkExportJobRestartedError,
+} from './errors';
+import instanciateAuditLogBulkExportJobCronService, {
+  auditLogBulkExportJobCronService,
+} from './index';
+
+type ExportedActivityData = Pick<
+  ActivityDocument,
+  '_id' | 'action' | 'user'
+> & {
+  createdAt: Date;
+};
+
+const userSchema = new mongoose.Schema(
+  {
+    name: { type: String },
+    username: { type: String, required: true, unique: true },
+    email: { type: String, unique: true, sparse: true },
+  },
+  {
+    timestamps: true,
+  },
+);
+const User = mongoose.model<IUser>('User', userSchema);
+
+async function waitForCondition(
+  condition: () => boolean | Promise<boolean>,
+  {
+    timeoutMs = 2000,
+    intervalMs = 50,
+  }: { timeoutMs?: number; intervalMs?: number } = {},
+): Promise<void> {
+  const start = Date.now();
+
+  while (true) {
+    if (await condition()) return;
+
+    if (Date.now() - start > timeoutMs) {
+      throw new Error('waitForCondition: timeout exceeded');
+    }
+
+    await new Promise((resolve) => setTimeout(resolve, intervalMs));
+  }
+}
+
+async function waitForJobStatus(
+  jobId: mongoose.Types.ObjectId,
+  status: AuditLogBulkExportJobStatus,
+): Promise<AuditLogBulkExportJobDocument> {
+  let latest: AuditLogBulkExportJobDocument | null = null;
+
+  await waitForCondition(async () => {
+    latest = await AuditLogBulkExportJob.findById(jobId);
+    return latest?.status === status;
+  });
+
+  if (!latest) {
+    throw new Error('Job not found after waitForCondition succeeded');
+  }
+  return latest;
+}
+
+class MockMultipartUploader extends MultipartUploader {
+  override get uploadId(): string {
+    return 'mock-upload-id';
+  }
+
+  override async initUpload(): Promise<void> {}
+  override async uploadPart(
+    _part: Buffer,
+    _partNumber: number,
+  ): Promise<void> {}
+  override async completeUpload(): Promise<void> {}
+  override async abortUpload(): Promise<void> {}
+  override async getUploadedFileSize(): Promise<number> {
+    return 0;
+  }
+}
+
+const mockFileUploadService: FileUploader = {
+  uploadAttachment: vi.fn(),
+  getIsUploadable: vi.fn(() => true),
+  isWritable: vi.fn(() => Promise.resolve(true)),
+  getIsReadable: vi.fn(() => true),
+  isValidUploadSettings: vi.fn(() => true),
+  getFileUploadEnabled: vi.fn(() => true),
+  listFiles: vi.fn(() => []),
+  saveFile: vi.fn(() => Promise.resolve()),
+  deleteFile: vi.fn(),
+  deleteFiles: vi.fn(),
+  getFileUploadTotalLimit: vi.fn(() => 1024 * 1024 * 1024),
+  getTotalFileSize: vi.fn(() => Promise.resolve(0)),
+  checkLimit: vi.fn(() => Promise.resolve({ isUploadable: true })),
+  determineResponseMode: vi.fn(() => ResponseMode.REDIRECT),
+  respond: vi.fn(),
+  findDeliveryFile: vi.fn(() => Promise.resolve(new PassThrough())),
+  generateTemporaryUrl: vi.fn(() =>
+    Promise.resolve({ url: 'mock-url', lifetimeSec: 3600 }),
+  ),
+  createMultipartUploader: vi.fn(
+    (uploadKey: string, maxPartSize: number) =>
+      new MockMultipartUploader(uploadKey, maxPartSize),
+  ),
+  abortPreviousMultipartUpload: vi.fn(() => Promise.resolve()),
+};
+
+const mockActivityService = {
+  createActivity: vi.fn(() => Promise.resolve({ _id: 'mock-activity-id' })),
+};
+
+const mockEventEmitter = {
+  emit: vi.fn(),
+};
+
+type MockCrowi = Pick<Crowi, 'fileUploadService'> & {
+  event: (eventName: string) => typeof mockEventEmitter;
+  activityService: typeof mockActivityService;
+};
+
+const createMockCrowi = (): MockCrowi => ({
+  fileUploadService: mockFileUploadService,
+  event: vi.fn(() => mockEventEmitter),
+  activityService: mockActivityService,
+});
+
+describe('AuditLogBulkExportJobCronService Integration Test', () => {
+  let cronService: NonNullable<typeof auditLogBulkExportJobCronService>;
+  let crowi: MockCrowi;
+  let testUser: IUser & mongoose.Document;
+  let testTmpDir: string;
+  let uploadAttachmentSpy: MockedFunction<
+    (
+      readable: NodeJS.ReadableStream,
+      attachment: IAttachmentDocument,
+    ) => Promise<void>
+  >;
+
+  const testActivities = [
+    {
+      action: SupportedAction.ACTION_PAGE_CREATE,
+      user: null,
+      createdAt: new Date('2023-01-01T10:00:00Z'),
+      snapshot: { username: 'testuser' },
+    },
+    {
+      action: SupportedAction.ACTION_PAGE_UPDATE,
+      user: null,
+      createdAt: new Date('2023-01-02T10:00:00Z'),
+      snapshot: { username: 'testuser' },
+    },
+    {
+      action: SupportedAction.ACTION_PAGE_DELETE,
+      user: null,
+      createdAt: new Date('2023-01-03T10:00:00Z'),
+      snapshot: { username: 'testuser' },
+    },
+    ...Array.from({ length: 50 }, (_, i) => {
+      const baseDate = new Date('2023-01-04T10:00:00Z');
+      const activityDate = new Date(baseDate.getTime() + i * 60000);
+      return {
+        action: SupportedAction.ACTION_PAGE_VIEW,
+        user: null,
+        createdAt: activityDate,
+        snapshot: { username: 'testuser' },
+      };
+    }),
+  ];
+
+  beforeAll(async () => {
+    await configManager.loadConfigs();
+
+    testUser = await User.create({
+      name: 'Test User for Audit Log Export',
+      username: 'auditlogexportcrontest',
+      email: 'auditlogexportcrontest@example.com',
+    });
+
+    testActivities.forEach((activity) => {
+      activity.user = testUser._id;
+    });
+  });
+
+  beforeEach(async () => {
+    crowi = createMockCrowi();
+    instanciateAuditLogBulkExportJobCronService(crowi as Crowi);
+    if (!auditLogBulkExportJobCronService) {
+      throw new Error('auditLogBulkExportJobCronService was not initialized');
+    }
+    cronService = auditLogBulkExportJobCronService;
+
+    testTmpDir = fs.mkdtempSync(path.join('/tmp', 'audit-log-export-test-'));
+    cronService.tmpOutputRootDir = testTmpDir;
+
+    cronService.maxLogsPerFile = 10;
+    cronService.pageBatchSize = 5;
+
+    uploadAttachmentSpy = vi
+      .fn()
+      .mockImplementation(
+        async (
+          readable: NodeJS.ReadableStream,
+          attachment: IAttachmentDocument,
+        ) => {
+          const passThrough = new PassThrough();
+          let totalSize = 0;
+
+          passThrough.on('data', (chunk) => {
+            totalSize += chunk.length;
+          });
+
+          await pipeline(readable, passThrough);
+
+          attachment.fileSize = totalSize;
+        },
+      );
+    mockFileUploadService.uploadAttachment = uploadAttachmentSpy;
+
+    await Activity.insertMany(testActivities);
+  });
+
+  afterEach(async () => {
+    await Activity.deleteMany({});
+    await AuditLogBulkExportJob.deleteMany({});
+    await Attachment.deleteMany({});
+
+    if (fs.existsSync(testTmpDir)) {
+      fs.rmSync(testTmpDir, { recursive: true, force: true });
+    }
+
+    vi.clearAllMocks();
+  });
+
+  afterAll(async () => {
+    await User.deleteOne({ _id: testUser._id });
+  });
+
+  describe('1. Basic Operations (Happy Path)', () => {
+    describe('1-1. No Filter → Export → ZIP → Upload', () => {
+      it('should export all activities, create JSON files, and upload ZIP', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'test-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        await cronService.proceedBulkExportJob(job);
+        const afterExport = await waitForJobStatus(
+          job._id,
+          AuditLogBulkExportJobStatus.uploading,
+        );
+
+        const outputDir = cronService.getTmpOutputDir(afterExport);
+        let hasFiles = false;
+        let jsonFiles: string[] = [];
+
+        if (fs.existsSync(outputDir)) {
+          const files = fs.readdirSync(outputDir);
+          jsonFiles = files.filter((file) => file.endsWith('.json'));
+          hasFiles = jsonFiles.length > 0;
+        }
+
+        if (hasFiles) {
+          expect(jsonFiles.length).toBeGreaterThan(0);
+
+          const firstFile = path.join(outputDir, jsonFiles[0]);
+          const content = JSON.parse(fs.readFileSync(firstFile, 'utf8'));
+          expect(Array.isArray(content)).toBe(true);
+          expect(content.length).toBeLessThanOrEqual(
+            cronService.maxLogsPerFile,
+          );
+        }
+
+        await cronService.proceedBulkExportJob(afterExport);
+        await waitForCondition(() => uploadAttachmentSpy.mock.calls.length > 0);
+
+        expect(uploadAttachmentSpy).toHaveBeenCalledTimes(1);
+        const [readable, attachment] = uploadAttachmentSpy.mock.calls[0];
+        expect(readable).toBeDefined();
+        expect(attachment.originalName).toMatch(/audit-logs-.*\.zip$/);
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect([
+          AuditLogBulkExportJobStatus.uploading,
+          AuditLogBulkExportJobStatus.completed,
+        ]).toContain(updatedJob?.status);
+        expect(updatedJob?.totalExportedCount).toBeGreaterThan(0);
+      });
+    });
+
+    describe('1-2. With Filters (actions / dateFrom / dateTo / users)', () => {
+      it('should export only filtered activities', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {
+            actions: [
+              SupportedAction.ACTION_PAGE_CREATE,
+              SupportedAction.ACTION_PAGE_UPDATE,
+            ],
+            dateFrom: new Date('2023-01-01T00:00:00Z'),
+            dateTo: new Date('2023-01-02T23:59:59Z'),
+            users: [testUser._id.toString()],
+          },
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'filtered-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        await cronService.proceedBulkExportJob(job);
+        const afterExport = await waitForJobStatus(
+          job._id,
+          AuditLogBulkExportJobStatus.uploading,
+        );
+
+        const outputDir = cronService.getTmpOutputDir(afterExport);
+        const files = fs.readdirSync(outputDir);
+        const jsonFiles = files.filter((file) => file.endsWith('.json'));
+
+        if (jsonFiles.length > 0) {
+          const content = JSON.parse(
+            fs.readFileSync(path.join(outputDir, jsonFiles[0]), 'utf8'),
+          );
+
+          content.forEach((activity: ExportedActivityData) => {
+            expect([
+              SupportedAction.ACTION_PAGE_CREATE,
+              SupportedAction.ACTION_PAGE_UPDATE,
+            ]).toContain(activity.action);
+            expect(new Date(activity.createdAt)).toBeInstanceOf(Date);
+            expect(activity.user).toBe(testUser._id.toString());
+          });
+        }
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect(updatedJob?.totalExportedCount).toBeLessThanOrEqual(2);
+      });
+    });
+
+    describe('1-3. Zero Results', () => {
+      it('should handle cases with no matching activities', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {
+            actions: [SupportedAction.ACTION_USER_LOGOUT],
+          },
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'no-match-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        const notifySpy = vi.spyOn(cronService, 'notifyExportResultAndCleanUp');
+
+        await cronService.proceedBulkExportJob(job);
+        await waitForCondition(async () => {
+          const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+          return updatedJob?.status !== AuditLogBulkExportJobStatus.exporting;
+        });
+
+        const afterExport = await AuditLogBulkExportJob.findById(job._id);
+        if (!afterExport) {
+          throw new Error('Job not found after export phase');
+        }
+
+        const outputDir = cronService.getTmpOutputDir(afterExport);
+        const files = fs.existsSync(outputDir) ? fs.readdirSync(outputDir) : [];
+        const jsonFiles = files.filter((file) => file.endsWith('.json'));
+
+        expect(jsonFiles.length).toBeLessThanOrEqual(1);
+
+        expect(afterExport.totalExportedCount).toBe(0);
+
+        expect(notifySpy).toHaveBeenCalledWith(
+          SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS,
+          expect.objectContaining({ _id: job._id }),
+        );
+      });
+    });
+  });
+
  describe('2. Resumability', () => {
    describe('2-1. Resume from lastExportedId', () => {
      it('should resume export from the last exported ID without duplicates', async () => {
        // Pick the middle activity (sorted by _id) as the simulated resume point
        const activities = await Activity.find({}).sort({ _id: 1 });
        const middleIndex = Math.floor(activities.length / 2);
        const lastExportedId = activities[middleIndex]._id.toString();

        // Create a job that pretends the first half was already exported
        const job = await AuditLogBulkExportJob.create({
          user: testUser._id,
          filters: {},
          format: AuditLogBulkExportFormat.json,
          status: AuditLogBulkExportJobStatus.exporting,
          filterHash: 'resume-hash',
          restartFlag: false,
          totalExportedCount: middleIndex,
          lastExportedId: lastExportedId,
        });

        await cronService.proceedBulkExportJob(job);
        const afterExport = await waitForJobStatus(
          job._id,
          AuditLogBulkExportJobStatus.uploading,
        );

        // Read back every JSON file the export wrote to the tmp output dir
        const outputDir = cronService.getTmpOutputDir(afterExport);
        const files = fs.readdirSync(outputDir);
        const jsonFiles = files.filter((file) => file.endsWith('.json'));

        if (jsonFiles.length > 0) {
          const allExportedActivities: ExportedActivityData[] = [];

          for (const file of jsonFiles) {
            const content = JSON.parse(
              fs.readFileSync(path.join(outputDir, file), 'utf8'),
            );
            allExportedActivities.push(...content);
          }

          // The resume point itself must not be re-exported (query is $gt),
          // and each exported _id must be a valid ObjectId
          allExportedActivities.forEach((activity) => {
            expect(activity._id).not.toBe(lastExportedId);
            expect(
              new mongoose.Types.ObjectId(activity._id).getTimestamp(),
            ).toBeInstanceOf(Date);
          });
        }

        // The counter keeps growing past the pre-seeded value
        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
        expect(updatedJob?.totalExportedCount).toBeGreaterThan(middleIndex);
      });
    });

    describe('2-2. totalExportedCount and lastExportedId Updates', () => {
      it('should properly update totalExportedCount and lastExportedId', async () => {
        const job = await AuditLogBulkExportJob.create({
          user: testUser._id,
          filters: {},
          format: AuditLogBulkExportFormat.json,
          status: AuditLogBulkExportJobStatus.exporting,
          filterHash: 'count-test-hash',
          restartFlag: false,
          totalExportedCount: 0,
        });

        const initialCount = job.totalExportedCount ?? 0;

        await cronService.proceedBulkExportJob(job);
        const updatedJob = await waitForJobStatus(
          job._id,
          AuditLogBulkExportJobStatus.uploading,
        );
        // Progress fields must advance once the export phase has run
        expect(updatedJob?.totalExportedCount).toBeGreaterThan(initialCount);
        expect(updatedJob?.lastExportedId).toBeDefined();

        // Sanity bound: can never export more than exist in the collection
        const totalActivities = await Activity.countDocuments({});
        expect(updatedJob?.totalExportedCount).toBeLessThanOrEqual(
          totalActivities,
        );
      });
    });
  });
+
  describe('3. Upload and Compression', () => {
    describe('3-1. ZIP Content Validity', () => {
      it('should create valid ZIP with JSON files in root', async () => {
        const job = await AuditLogBulkExportJob.create({
          user: testUser._id,
          filters: {},
          format: AuditLogBulkExportFormat.json,
          status: AuditLogBulkExportJobStatus.exporting,
          filterHash: 'zip-test-hash',
          restartFlag: false,
          totalExportedCount: 0,
        });

        // 1st cron pass: exporting -> uploading (logs written to fs)
        await cronService.proceedBulkExportJob(job);
        const afterExport = await waitForJobStatus(
          job._id,
          AuditLogBulkExportJobStatus.uploading,
        );

        // 2nd cron pass: uploading -> compress + upload via fileUploadService
        await cronService.proceedBulkExportJob(afterExport);
        await waitForCondition(() => uploadAttachmentSpy.mock.calls.length > 0);

        expect(uploadAttachmentSpy).toHaveBeenCalledTimes(1);
        const [readable, attachment] = uploadAttachmentSpy.mock.calls[0];
        expect(readable).toBeDefined();
        expect(attachment.fileName).toMatch(/\.zip$/);
      });
    });

    describe('3-2. Upload Failure Handling', () => {
      it('should handle upload failures gracefully', async () => {
        // Drain the archive stream (so archiver can finish), then fail once
        uploadAttachmentSpy.mockImplementationOnce(async (readable) => {
          readable.on('error', () => {});
          readable.resume();
          throw new Error('Upload failed');
        });

        const job = await AuditLogBulkExportJob.create({
          user: testUser._id,
          filters: {},
          format: AuditLogBulkExportFormat.json,
          status: AuditLogBulkExportJobStatus.uploading,
          filterHash: 'upload-fail-hash',
          restartFlag: false,
          totalExportedCount: 10,
        });

        const notifySpy = vi.spyOn(cronService, 'notifyExportResultAndCleanUp');
        const cleanSpy = vi.spyOn(cronService, 'cleanUpExportJobResources');
        const handleSpy = vi.spyOn(cronService, 'handleError');

        // The upload failure must be swallowed, not rethrown to the cron
        await expect(
          cronService.proceedBulkExportJob(job),
        ).resolves.toBeUndefined();

        expect(uploadAttachmentSpy).toHaveBeenCalledTimes(1);
        expect(handleSpy).toHaveBeenCalledTimes(1);
        expect(notifySpy).toHaveBeenCalledWith(
          expect.anything(),
          expect.objectContaining({ _id: job._id }),
        );
        expect(cleanSpy).toHaveBeenCalledWith(
          expect.objectContaining({ _id: job._id }),
        );

        // The job must end up in a terminal failed state
        const reloaded = await AuditLogBulkExportJob.findById(job._id).lean();
        expect(reloaded?.status).toBe(AuditLogBulkExportJobStatus.failed);

        // The archive stream must be deregistered after clean-up
        const s = cronService.getStreamInExecution(job._id);
        expect(s).toBeUndefined();
      });
    });
  });
+
+  describe('4. Error Handling', () => {
+    describe('4-1. Nonexistent Users Filter', () => {
+      it('should throw error for nonexistent users', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {
+            users: [new mongoose.Types.ObjectId().toString()],
+          },
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'bad-user-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        try {
+          await cronService.proceedBulkExportJob(job);
+          await waitForCondition(async () => {
+            const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+            return updatedJob?.status === AuditLogBulkExportJobStatus.failed;
+          });
+        } catch (_error) {}
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect([
+          AuditLogBulkExportJobStatus.exporting,
+          AuditLogBulkExportJobStatus.failed,
+        ]).toContain(updatedJob?.status);
+      });
+    });
+
+    describe('4-2. Stream/FS Errors', () => {
+      it('should handle filesystem errors', async () => {
+        cronService.tmpOutputRootDir = '/invalid/path/that/does/not/exist';
+
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'fs-error-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        await expect(async () => {
+          await cronService.proceedBulkExportJob(job);
+        }).not.toThrow();
+      });
+    });
+
+    describe('4-3. Job Expiry and Restart Errors', () => {
+      it('should handle AuditLogBulkExportJobExpiredError', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'expired-error-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        const expiredError = new AuditLogBulkExportJobExpiredError();
+
+        await cronService.handleError(expiredError, job);
+
+        const updatedJob = await AuditLogBulkExportJob.findById(job._id);
+        expect(updatedJob?.status).toBe(AuditLogBulkExportJobStatus.failed);
+      });
+
+      it('should handle AuditLogBulkExportJobRestartedError', async () => {
+        const job = await AuditLogBulkExportJob.create({
+          user: testUser._id,
+          filters: {},
+          format: AuditLogBulkExportFormat.json,
+          status: AuditLogBulkExportJobStatus.exporting,
+          filterHash: 'restarted-error-hash',
+          restartFlag: false,
+          totalExportedCount: 0,
+        });
+
+        const restartedError = new AuditLogBulkExportJobRestartedError();
+
+        await cronService.handleError(restartedError, job);
+      });
+    });
+  });
+
  describe('5. State Transitions and Execution Control', () => {
    describe('5-1. State Flow', () => {
      it('should follow correct state transitions: exporting → uploading → completed', async () => {
        const job = await AuditLogBulkExportJob.create({
          user: testUser._id,
          filters: {},
          format: AuditLogBulkExportFormat.json,
          status: AuditLogBulkExportJobStatus.exporting,
          filterHash: 'state-flow-hash',
          restartFlag: false,
          totalExportedCount: 0,
        });

        expect(job.status).toBe(AuditLogBulkExportJobStatus.exporting);

        // 1st pass: exporting -> uploading
        await cronService.proceedBulkExportJob(job);
        const afterExport = await waitForJobStatus(
          job._id,
          AuditLogBulkExportJobStatus.uploading,
        );

        expect(afterExport?.status).toBe(AuditLogBulkExportJobStatus.uploading);

        // 2nd pass: uploading -> upload invoked
        await cronService.proceedBulkExportJob(afterExport);
        await waitForCondition(() => uploadAttachmentSpy.mock.calls.length > 0);

        // Notification path finalizes the job as completed
        await cronService.notifyExportResultAndCleanUp(
          SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
          afterExport,
        );

        const finalJob = await AuditLogBulkExportJob.findById(job._id);
        expect(finalJob?.status).toBe(AuditLogBulkExportJobStatus.completed);
      });
    });

    describe('5-2. Stream Lifecycle', () => {
      it('should properly manage stream execution lifecycle', async () => {
        const job = await AuditLogBulkExportJob.create({
          user: testUser._id,
          filters: {},
          format: AuditLogBulkExportFormat.json,
          status: AuditLogBulkExportJobStatus.exporting,
          filterHash: 'stream-lifecycle-hash',
          restartFlag: false,
          totalExportedCount: 0,
        });

        await cronService.proceedBulkExportJob(job);
        const afterExport = await waitForJobStatus(
          job._id,
          AuditLogBulkExportJobStatus.uploading,
        );

        // Clean-up must deregister any stream memoized for this job
        await cronService.cleanUpExportJobResources(afterExport);
        const streamAfterCleanup = cronService.getStreamInExecution(job._id);
        expect(streamAfterCleanup).toBeUndefined();
      });
    });

    describe('5-3. Restart Flag Handling', () => {
      it('should handle restartFlag correctly', async () => {
        // Job with restartFlag set and stale progress fields
        const job = await AuditLogBulkExportJob.create({
          user: testUser._id,
          filters: {},
          format: AuditLogBulkExportFormat.json,
          status: AuditLogBulkExportJobStatus.exporting,
          filterHash: 'restart-flag-hash',
          restartFlag: true,
          totalExportedCount: 50,
          lastExportedId: 'some-previous-id',
        });

        await cronService.proceedBulkExportJob(job);
        await waitForCondition(async () => {
          const updatedJob = await AuditLogBulkExportJob.findById(job._id);
          return updatedJob?.restartFlag === false;
        });

        const updatedJob = await AuditLogBulkExportJob.findById(job._id);

        // A restart resets all progress and re-enters the exporting state
        expect(updatedJob?.restartFlag).toBe(false);
        expect(updatedJob?.totalExportedCount).toBe(0);
        expect(updatedJob?.lastExportedId).toBeUndefined();
        expect(updatedJob?.status).toBe(AuditLogBulkExportJobStatus.exporting);
      });
    });
  });
+});

+ 11 - 0
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/errors.ts

@@ -0,0 +1,11 @@
+export class AuditLogBulkExportJobExpiredError extends Error {
+  constructor() {
+    super('Audit-log-bulk-export job has expired');
+  }
+}
+
+export class AuditLogBulkExportJobRestartedError extends Error {
+  constructor() {
+    super('Audit-log-bulk-export job has restarted');
+  }
+}

+ 136 - 6
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/index.ts

@@ -1,10 +1,15 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import type { Readable } from 'node:stream';
 import type { IUser } from '@growi/core';
 import { getIdForRef, isPopulated } from '@growi/core';
+import type archiver from 'archiver';
 import mongoose from 'mongoose';
 
 import type { SupportedActionType } from '~/interfaces/activity';
 import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
 import type Crowi from '~/server/crowi';
+import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
 import CronService from '~/server/service/cron';
 import loggerFactory from '~/utils/logger';
 
@@ -14,22 +19,45 @@ import {
 } from '../../../interfaces/audit-log-bulk-export';
 import type { AuditLogBulkExportJobDocument } from '../../models/audit-log-bulk-export-job';
 import AuditLogBulkExportJob from '../../models/audit-log-bulk-export-job';
+import {
+  AuditLogBulkExportJobExpiredError,
+  AuditLogBulkExportJobRestartedError,
+} from './errors';
 
 const logger = loggerFactory('growi:service:audit-log-export-job-cron');
 
 export interface IAuditLogBulkExportJobCronService {
   crowi: Crowi;
+  activityEvent: NodeJS.EventEmitter;
+  tmpOutputRootDir: string;
+  pageBatchSize: number;
+  maxLogsPerFile: number;
+  compressFormat: archiver.Format;
+  compressLevel: number;
   proceedBulkExportJob(
     auditLogBulkExportJob: AuditLogBulkExportJobDocument,
-  ): void;
+  ): Promise<void>;
+  getTmpOutputDir(auditLogBulkExportJob: AuditLogBulkExportJobDocument): string;
+  getStreamInExecution(jobId: ObjectIdLike): Readable | undefined;
+  setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void;
+  removeStreamInExecution(jobId: ObjectIdLike): void;
   notifyExportResultAndCleanUp(
     action: SupportedActionType,
     auditLogBulkExportJob: AuditLogBulkExportJobDocument,
   ): Promise<void>;
+  handleError(
+    err: Error | null,
+    auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+  ): Promise<void>;
+  cleanUpExportJobResources(
+    auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+    restarted?: boolean,
+  ): Promise<void>;
 }
 
 import type { ActivityDocument } from '~/server/models/activity';
 import { preNotifyService } from '~/server/service/pre-notify';
+
 import { compressAndUpload } from './steps/compress-and-upload';
 import { exportAuditLogsToFsAsync } from './steps/exportAuditLogsToFsAsync';
 
@@ -47,6 +75,18 @@ class AuditLogBulkExportJobCronService
 
   private parallelExecLimit: number;
 
+  tmpOutputRootDir = '/tmp/audit-log-bulk-export';
+
+  pageBatchSize = 100;
+
+  maxLogsPerFile = 50;
+
+  compressFormat: archiver.Format = 'zip';
+
+  compressLevel = 6;
+
+  private streamInExecutionMemo: { [key: string]: Readable } = {};
+
   constructor(crowi: Crowi) {
     super();
     this.crowi = crowi;
@@ -69,8 +109,8 @@ class AuditLogBulkExportJobCronService
       .sort({ createdAt: 1 })
       .limit(this.parallelExecLimit);
     await Promise.all(
-      auditLogBulkExportJobInProgress.map((job) =>
-        this.proceedBulkExportJob(job),
+      auditLogBulkExportJobInProgress.map((auditLogBulkExportJob) =>
+        this.proceedBulkExportJob(auditLogBulkExportJob),
       ),
     );
   }
@@ -79,6 +119,15 @@ class AuditLogBulkExportJobCronService
     auditLogBulkExportJob: AuditLogBulkExportJobDocument,
   ) {
     try {
+      if (auditLogBulkExportJob.restartFlag) {
+        await this.cleanUpExportJobResources(auditLogBulkExportJob, true);
+        auditLogBulkExportJob.restartFlag = false;
+        auditLogBulkExportJob.status = AuditLogBulkExportJobStatus.exporting;
+        auditLogBulkExportJob.lastExportedId = undefined;
+        auditLogBulkExportJob.totalExportedCount = 0;
+        await auditLogBulkExportJob.save();
+        return;
+      }
       const User = mongoose.model<IUser>('User');
       const user = await User.findById(getIdForRef(auditLogBulkExportJob.user));
 
@@ -95,13 +144,42 @@ class AuditLogBulkExportJobCronService
       } else if (
         auditLogBulkExportJob.status === AuditLogBulkExportJobStatus.uploading
       ) {
-        await compressAndUpload.bind(this)(auditLogBulkExportJob);
+        await compressAndUpload.bind(this)(user, auditLogBulkExportJob);
       }
     } catch (err) {
       logger.error(err);
     }
   }
 
+  getTmpOutputDir(
+    auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+  ): string {
+    const jobId = auditLogBulkExportJob._id.toString();
+    return path.join(this.tmpOutputRootDir, jobId);
+  }
+
+  /**
+   * Get the stream in execution for a job.
+   * A getter method that includes "undefined" in the return type
+   */
+  getStreamInExecution(jobId: ObjectIdLike): Readable | undefined {
+    return this.streamInExecutionMemo[jobId.toString()];
+  }
+
+  /**
+   * Set the stream in execution for a job
+   */
+  setStreamInExecution(jobId: ObjectIdLike, stream: Readable) {
+    this.streamInExecutionMemo[jobId.toString()] = stream;
+  }
+
+  /**
+   * Remove the stream in execution for a job
+   */
+  removeStreamInExecution(jobId: ObjectIdLike) {
+    delete this.streamInExecutionMemo[jobId.toString()];
+  }
+
   async notifyExportResultAndCleanUp(
     action: SupportedActionType,
     auditLogBulkExportJob: AuditLogBulkExportJobDocument,
@@ -117,8 +195,7 @@ class AuditLogBulkExportJobCronService
     } catch (err) {
       logger.error(err);
     }
-    // TODO: Implement cleanup process in a future task.
-    // The following method `cleanUpExportJobResources` will be called here once it's ready.
+    await this.cleanUpExportJobResources(auditLogBulkExportJob);
   }
 
   private async notifyExportResult(
@@ -154,6 +231,59 @@ class AuditLogBulkExportJobCronService
       preNotify,
     );
   }
+
+  async handleError(
+    err: Error | null,
+    auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+  ) {
+    if (err == null) return;
+
+    if (err instanceof AuditLogBulkExportJobExpiredError) {
+      logger.error(err);
+      await this.notifyExportResultAndCleanUp(
+        SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_JOB_EXPIRED,
+        auditLogBulkExportJob,
+      );
+    } else if (err instanceof AuditLogBulkExportJobRestartedError) {
+      logger.info(err.message);
+      await this.cleanUpExportJobResources(auditLogBulkExportJob);
+    } else {
+      logger.error(err);
+      await this.notifyExportResultAndCleanUp(
+        SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_FAILED,
+        auditLogBulkExportJob,
+      );
+    }
+  }
+
+  async cleanUpExportJobResources(
+    auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+    restarted = false,
+  ) {
+    const streamInExecution = this.getStreamInExecution(
+      auditLogBulkExportJob._id,
+    );
+    if (streamInExecution != null) {
+      if (restarted) {
+        streamInExecution.destroy(new AuditLogBulkExportJobRestartedError());
+      } else {
+        streamInExecution.destroy(new AuditLogBulkExportJobExpiredError());
+      }
+      this.removeStreamInExecution(auditLogBulkExportJob._id);
+    }
+
+    const promises = [
+      fs.promises.rm(this.getTmpOutputDir(auditLogBulkExportJob), {
+        recursive: true,
+        force: true,
+      }),
+    ];
+
+    const results = await Promise.allSettled(promises);
+    results.forEach((result) => {
+      if (result.status === 'rejected') logger.error(result.reason);
+    });
+  }
 }
 
 // eslint-disable-next-line import/no-mutable-exports

+ 91 - 6
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/steps/compress-and-upload.ts

@@ -1,19 +1,104 @@
+import type { IUser } from '@growi/core';
+import type { Archiver } from 'archiver';
+import archiver from 'archiver';
+
+import { AuditLogBulkExportJobStatus } from '~/features/audit-log-bulk-export/interfaces/audit-log-bulk-export';
 import { SupportedAction } from '~/interfaces/activity';
+import { AttachmentType } from '~/server/interfaces/attachment';
+import {
+  Attachment,
+  type IAttachmentDocument,
+} from '~/server/models/attachment';
+import type { FileUploader } from '~/server/service/file-uploader';
+import loggerFactory from '~/utils/logger';
+
 import type { AuditLogBulkExportJobDocument } from '../../../models/audit-log-bulk-export-job';
 import type { IAuditLogBulkExportJobCronService } from '..';
+
+const logger = loggerFactory(
+  'growi:service:audit-log-export-job-cron:compress-and-upload-async',
+);
+
+function setUpAuditLogArchiver(
+  this: IAuditLogBulkExportJobCronService,
+): Archiver {
+  const auditLogArchiver = archiver(this.compressFormat, {
+    zlib: { level: this.compressLevel },
+  });
+
+  // good practice to catch warnings (ie stat failures and other non-blocking errors)
+  auditLogArchiver.on('warning', (err) => {
+    if (err.code === 'ENOENT') {
+      logger.error(err);
+    } else {
+      auditLogArchiver.emit('error', err);
+    }
+  });
+
+  return auditLogArchiver;
+}
+
+async function postProcess(
+  this: IAuditLogBulkExportJobCronService,
+  auditLogBulkExportJob: AuditLogBulkExportJobDocument,
+  attachment: IAttachmentDocument,
+  fileSize: number,
+): Promise<void> {
+  attachment.fileSize = fileSize;
+  await attachment.save();
+
+  auditLogBulkExportJob.completedAt = new Date();
+  auditLogBulkExportJob.attachment = attachment._id;
+  auditLogBulkExportJob.status = AuditLogBulkExportJobStatus.completed;
+  await auditLogBulkExportJob.save();
+
+  this.removeStreamInExecution(auditLogBulkExportJob._id);
+  await this.notifyExportResultAndCleanUp(
+    SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
+    auditLogBulkExportJob,
+  );
+}
+
 /**
  * Execute a pipeline that reads the audit log files from the temporal fs directory,
  * compresses them into a zip file, and uploads to the cloud storage.
- *
- * TODO: Implement the actual compression and upload logic in a future task.
- * Currently, this function only notifies a successful export completion.
  */
 export async function compressAndUpload(
   this: IAuditLogBulkExportJobCronService,
+  user: IUser,
   job: AuditLogBulkExportJobDocument,
 ): Promise<void> {
-  await this.notifyExportResultAndCleanUp(
-    SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
-    job,
+  const auditLogArchiver = setUpAuditLogArchiver.bind(this)();
+
+  if (job.filterHash == null) throw new Error('filterHash is not set');
+
+  const originalName = `audit-logs-${job.filterHash}.zip`;
+  const attachment = Attachment.createWithoutSave(
+    null,
+    user,
+    originalName,
+    this.compressFormat,
+    0,
+    AttachmentType.AUDIT_LOG_BULK_EXPORT,
   );
+  const fileUploadService: FileUploader = this.crowi.fileUploadService;
+
+  auditLogArchiver.directory(this.getTmpOutputDir(job), false);
+  auditLogArchiver.finalize();
+
+  this.setStreamInExecution(job._id, auditLogArchiver);
+  try {
+    await fileUploadService.uploadAttachment(auditLogArchiver, attachment);
+  } catch (e) {
+    logger.error(e);
+    try {
+      await this.handleError(e as Error, job);
+    } catch (handleErrorErr) {
+      logger.error('Error in handleError:', handleErrorErr);
+    }
+    job.status = AuditLogBulkExportJobStatus.failed;
+    await job.save();
+    return;
+  }
+  await postProcess.bind(this)(job, attachment, auditLogArchiver.pointer());
 }

+ 136 - 5
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron/steps/exportAuditLogsToFsAsync.ts

@@ -1,17 +1,148 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import { pipeline, Writable } from 'node:stream';
+import type { IUser } from '@growi/core';
+import mongoose, { type FilterQuery } from 'mongoose';
+
 import { AuditLogBulkExportJobStatus } from '~/features/audit-log-bulk-export/interfaces/audit-log-bulk-export';
+import { SupportedAction } from '~/interfaces/activity';
+import Activity, { type ActivityDocument } from '~/server/models/activity';
+
 import type { AuditLogBulkExportJobDocument } from '../../../models/audit-log-bulk-export-job';
 import type { IAuditLogBulkExportJobCronService } from '..';
 
+/**
+ * Get a Writable that writes audit logs to JSON files
+ */
+function getAuditLogWritable(
+  this: IAuditLogBulkExportJobCronService,
+  job: AuditLogBulkExportJobDocument,
+): Writable {
+  const outputDir = this.getTmpOutputDir(job);
+  let buffer: ActivityDocument[] = [];
+  let fileIndex = 0;
+  return new Writable({
+    objectMode: true,
+    write: async (log: ActivityDocument, _encoding, callback) => {
+      try {
+        buffer.push(log);
+
+        // Update lastExportedId for resumability
+        job.lastExportedId = log._id.toString();
+        job.totalExportedCount = (job.totalExportedCount || 0) + 1;
+
+        if (buffer.length >= this.maxLogsPerFile) {
+          const filePath = path.join(
+            outputDir,
+            `audit-logs-${job._id.toString()}-${String(fileIndex).padStart(2, '0')}.json`,
+          );
+          await fs.promises.mkdir(path.dirname(filePath), { recursive: true });
+          await fs.promises.writeFile(
+            filePath,
+            JSON.stringify(buffer, null, 2),
+          );
+
+          await job.save();
+
+          buffer = [];
+          fileIndex++;
+        }
+      } catch (err) {
+        callback(err as Error);
+        return;
+      }
+      callback();
+    },
+    final: async (callback) => {
+      try {
+        if (buffer.length > 0) {
+          const filePath = path.join(
+            outputDir,
+            `audit-logs-${job._id.toString()}-${String(fileIndex).padStart(2, '0')}.json`,
+          );
+          await fs.promises.mkdir(path.dirname(filePath), { recursive: true });
+          await fs.promises.writeFile(
+            filePath,
+            JSON.stringify(buffer, null, 2),
+          );
+        }
+        job.status = AuditLogBulkExportJobStatus.uploading;
+        await job.save();
+      } catch (err) {
+        callback(err as Error);
+        return;
+      }
+      callback();
+    },
+  });
+}
+
 /**
  * Export audit logs to the file system before compressing and uploading.
- *
- * TODO: Implement the actual export logic in a later task.
- * For now, this function only updates the job status to `uploading`.
  */
 export async function exportAuditLogsToFsAsync(
   this: IAuditLogBulkExportJobCronService,
   job: AuditLogBulkExportJobDocument,
 ): Promise<void> {
-  job.status = AuditLogBulkExportJobStatus.uploading;
-  await job.save();
+  const filters = job.filters ?? {};
+  const query: FilterQuery<ActivityDocument> = {};
+
+  // Build query filters for searching activity logs based on user-defined filters
+  if (filters.actions && filters.actions.length > 0) {
+    query.action = { $in: filters.actions };
+  }
+  if (filters.dateFrom || filters.dateTo) {
+    query.createdAt = {};
+    if (filters.dateFrom) {
+      query.createdAt.$gte = new Date(filters.dateFrom);
+    }
+    if (filters.dateTo) {
+      query.createdAt.$lte = new Date(filters.dateTo);
+    }
+  }
+  if (filters.users && filters.users.length > 0) {
+    const User = mongoose.model<IUser>('User');
+    const userIds = await User.find({
+      _id: { $in: filters.users },
+    }).distinct('_id');
+    if (userIds.length === 0) {
+      throw new Error(
+        `No users found with userIDs: ${filters.users.join(', ')}`,
+      );
+    }
+    query.user = { $in: userIds };
+  }
+
+  // If the previous export was incomplete, resume from the last exported ID by adding it to the query filter
+  if (job.lastExportedId) {
+    query._id = { $gt: job.lastExportedId };
+  }
+
+  const hasAny = await Activity.exists(query);
+  if (!hasAny) {
+    job.totalExportedCount = 0;
+    job.status = AuditLogBulkExportJobStatus.completed;
+    job.lastExportedId = undefined;
+    await job.save();
+
+    await this.notifyExportResultAndCleanUp(
+      SupportedAction.ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS,
+      job,
+    );
+    return;
+  }
+
+  const logsCursor = Activity.find(query)
+
+    .sort({ _id: 1 })
+    .lean()
+    .cursor({ batchSize: this.pageBatchSize });
+
+  const writable = getAuditLogWritable.bind(this)(job);
+
+  this.setStreamInExecution(job._id, logsCursor);
+
+  pipeline(logsCursor, writable, (err) => {
+    this.handleError(err, job);
+  });
 }

+ 1 - 1
apps/app/src/features/audit-log-bulk-export/server/service/audit-log-bulk-export.integ.ts

@@ -1,4 +1,5 @@
 import mongoose from 'mongoose';
+
 import type { SupportedActionType } from '~/interfaces/activity';
 import { configManager } from '~/server/service/config-manager';
 
@@ -7,7 +8,6 @@ import {
   AuditLogBulkExportJobStatus,
 } from '../../interfaces/audit-log-bulk-export';
 import AuditLogBulkExportJob from '../models/audit-log-bulk-export-job';
-
 import {
   auditLogBulkExportService,
   DuplicateAuditLogBulkExportJobError,

+ 4 - 0
apps/app/src/interfaces/activity.ts

@@ -73,6 +73,8 @@ const ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED =
 const ACTION_AUDIT_LOG_BULK_EXPORT_FAILED = 'AUDIT_LOG_BULK_EXPORT_FAILED';
 const ACTION_AUDIT_LOG_BULK_EXPORT_JOB_EXPIRED =
   'AUDIT_LOG_BULK_EXPORT_JOB_EXPIRED';
+const ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS =
+  'ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS';
 const ACTION_TAG_UPDATE = 'TAG_UPDATE';
 const ACTION_IN_APP_NOTIFICATION_ALL_STATUSES_OPEN =
   'IN_APP_NOTIFICATION_ALL_STATUSES_OPEN';
@@ -377,6 +379,7 @@ export const SupportedAction = {
   ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
   ACTION_AUDIT_LOG_BULK_EXPORT_FAILED,
   ACTION_AUDIT_LOG_BULK_EXPORT_JOB_EXPIRED,
+  ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS,
 } as const;
 
 // Action required for notification
@@ -402,6 +405,7 @@ export const EssentialActionGroup = {
   ACTION_AUDIT_LOG_BULK_EXPORT_COMPLETED,
   ACTION_AUDIT_LOG_BULK_EXPORT_FAILED,
   ACTION_AUDIT_LOG_BULK_EXPORT_JOB_EXPIRED,
+  ACTION_AUDIT_LOG_BULK_EXPORT_NO_RESULTS,
 } as const;
 
 export const ActionGroupSize = {

+ 6 - 0
apps/app/src/server/crowi/index.js

@@ -6,6 +6,9 @@ import http from 'http';
 import mongoose from 'mongoose';
 import path from 'path';
 
+import instantiateAuditLogBulkExportJobCleanUpCronService, {
+  auditLogBulkExportJobCleanUpCronService,
+} from '~/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-clean-up-cron';
 import instantiateAuditLogBulkExportJobCronService from '~/features/audit-log-bulk-export/server/service/audit-log-bulk-export-job-cron';
 import { checkAuditLogExportJobInProgressCronService } from '~/features/audit-log-bulk-export/server/service/check-audit-log-bulk-export-job-in-progress-cron';
 import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
@@ -370,6 +373,9 @@ Crowi.prototype.setupCron = function () {
   instantiateAuditLogBulkExportJobCronService(this);
   checkAuditLogExportJobInProgressCronService.startCron();
 
+  instantiateAuditLogBulkExportJobCleanUpCronService(this);
+  auditLogBulkExportJobCleanUpCronService.startCron();
+
   startOpenaiCronIfEnabled();
   startAccessTokenCron();
 };

+ 2 - 0
apps/app/src/server/interfaces/attachment.ts

@@ -3,6 +3,7 @@ export const AttachmentType = {
   WIKI_PAGE: 'WIKI_PAGE',
   PROFILE_IMAGE: 'PROFILE_IMAGE',
   PAGE_BULK_EXPORT: 'PAGE_BULK_EXPORT',
+  AUDIT_LOG_BULK_EXPORT: 'AUDIT_LOG_BULK_EXPORT',
 } as const;
 
 export type AttachmentType =
@@ -35,4 +36,5 @@ export const FilePathOnStoragePrefix = {
   attachment: 'attachment',
   user: 'user',
   pageBulkExport: 'page-bulk-export',
+  auditLogBulkExport: 'audit-log-bulk-export',
 } as const;