Parcourir la source

Merge branch 'master' into feat/183351-add-installed-at-metrics

Yuki Takei il y a 2 semaines
Parent
commit
33f4b7ea4e

+ 8 - 0
apps/app/src/features/opentelemetry/server/custom-metrics/index.ts

@@ -1,8 +1,10 @@
 export { addApplicationMetrics } from './application-metrics';
 export { addInstalledAtMetrics } from './installed-at-metrics';
+export { addMongooseConnectionPoolMetrics } from './mongoose-connection-pool-metrics';
 export { addPageCountsMetrics } from './page-counts-metrics';
 export { addSystemMetrics } from './system-metrics';
 export { addUserCountsMetrics } from './user-counts-metrics';
+export { addYjsMetrics } from './yjs-metrics';
 
 export const setupCustomMetrics = async (): Promise<void> => {
   const { addApplicationMetrics } = await import('./application-metrics');
@@ -10,6 +12,10 @@ export const setupCustomMetrics = async (): Promise<void> => {
   const { addUserCountsMetrics } = await import('./user-counts-metrics');
   const { addPageCountsMetrics } = await import('./page-counts-metrics');
   const { addSystemMetrics } = await import('./system-metrics');
+  const { addYjsMetrics } = await import('./yjs-metrics');
+  const { addMongooseConnectionPoolMetrics } = await import(
+    './mongoose-connection-pool-metrics'
+  );
 
   // Add custom metrics
   addApplicationMetrics();
@@ -17,4 +23,6 @@ export const setupCustomMetrics = async (): Promise<void> => {
   addUserCountsMetrics();
   addPageCountsMetrics();
   addSystemMetrics();
+  addYjsMetrics();
+  addMongooseConnectionPoolMetrics();
 };

+ 277 - 0
apps/app/src/features/opentelemetry/server/custom-metrics/mongoose-connection-pool-metrics.spec.ts

@@ -0,0 +1,277 @@
+import { type Meter, metrics, type ObservableGauge } from '@opentelemetry/api';
+import { mock } from 'vitest-mock-extended';
+
+import {
+  addMongooseConnectionPoolMetrics,
+  getPoolStats,
+} from './mongoose-connection-pool-metrics';
+
+vi.mock('~/utils/logger', () => ({
+  default: () => ({
+    info: vi.fn(),
+    warn: vi.fn(),
+  }),
+}));
+
+vi.mock('@opentelemetry/api', () => ({
+  diag: {
+    createComponentLogger: vi.fn(() => ({ error: vi.fn() })),
+  },
+  metrics: {
+    getMeter: vi.fn(),
+  },
+}));
+
+const { mockGetClient } = vi.hoisted(() => ({
+  mockGetClient: vi.fn(),
+}));
+vi.mock('mongoose', () => ({
+  default: {
+    connection: { getClient: mockGetClient },
+  },
+}));
+
+// ---- helpers ----
+
+function makePool(
+  total: number,
+  checkedOut: number,
+  available: number,
+): {
+  totalConnectionCount: number;
+  currentCheckedOutCount: number;
+  availableConnectionCount: number;
+} {
+  return {
+    totalConnectionCount: total,
+    currentCheckedOutCount: checkedOut,
+    availableConnectionCount: available,
+  };
+}
+
+function makeClient(
+  servers: Map<string, { s?: { pool?: ReturnType<typeof makePool> } }>,
+) {
+  return {
+    topology: {
+      s: { servers },
+    },
+  };
+}
+
+// ---- getPoolStats unit tests ----
+
+describe('getPoolStats', () => {
+  it('returns zeros when client has no topology', () => {
+    expect(getPoolStats({})).toEqual({ total: 0, checkedOut: 0, available: 0 });
+  });
+
+  it('returns zeros when topology.s is missing', () => {
+    expect(getPoolStats({ topology: {} })).toEqual({
+      total: 0,
+      checkedOut: 0,
+      available: 0,
+    });
+  });
+
+  it('returns zeros when servers map is empty', () => {
+    const client = makeClient(new Map());
+    expect(getPoolStats(client)).toEqual({
+      total: 0,
+      checkedOut: 0,
+      available: 0,
+    });
+  });
+
+  it('returns pool stats for a single server', () => {
+    const pool = makePool(5, 2, 3);
+    const client = makeClient(new Map([['localhost:27017', { s: { pool } }]]));
+    expect(getPoolStats(client)).toEqual({
+      total: 5,
+      checkedOut: 2,
+      available: 3,
+    });
+  });
+
+  it('sums stats across multiple servers', () => {
+    const pool1 = makePool(3, 1, 2);
+    const pool2 = makePool(4, 2, 2);
+    const client = makeClient(
+      new Map([
+        ['host1:27017', { s: { pool: pool1 } }],
+        ['host2:27017', { s: { pool: pool2 } }],
+      ]),
+    );
+    expect(getPoolStats(client)).toEqual({
+      total: 7,
+      checkedOut: 3,
+      available: 4,
+    });
+  });
+
+  it('skips servers with no pool', () => {
+    const pool = makePool(2, 1, 1);
+    const client = makeClient(
+      new Map([
+        ['host1:27017', { s: { pool } }],
+        ['host2:27017', {}],
+      ]),
+    );
+    expect(getPoolStats(client)).toEqual({
+      total: 2,
+      checkedOut: 1,
+      available: 1,
+    });
+  });
+
+  it('treats undefined pool fields as 0', () => {
+    const client = makeClient(
+      new Map([
+        ['localhost:27017', { s: { pool: {} as ReturnType<typeof makePool> } }],
+      ]),
+    );
+    expect(getPoolStats(client)).toEqual({
+      total: 0,
+      checkedOut: 0,
+      available: 0,
+    });
+  });
+
+  it('returns zeros and does not throw when an error is thrown internally', () => {
+    const badClient = {
+      get topology(): never {
+        throw new Error('unexpected');
+      },
+    };
+    expect(() => getPoolStats(badClient)).not.toThrow();
+    expect(getPoolStats(badClient)).toEqual({
+      total: 0,
+      checkedOut: 0,
+      available: 0,
+    });
+  });
+});
+
+// ---- addMongooseConnectionPoolMetrics unit tests ----
+
+describe('addMongooseConnectionPoolMetrics', () => {
+  const mockMeter = mock<Meter>();
+  const mockPoolSizeGauge = mock<ObservableGauge>();
+  const mockCheckedOutGauge = mock<ObservableGauge>();
+  const mockAvailableGauge = mock<ObservableGauge>();
+
+  beforeEach(() => {
+    vi.clearAllMocks();
+    vi.mocked(metrics.getMeter).mockReturnValue(mockMeter);
+    mockMeter.createObservableGauge
+      .mockReturnValueOnce(mockPoolSizeGauge)
+      .mockReturnValueOnce(mockCheckedOutGauge)
+      .mockReturnValueOnce(mockAvailableGauge);
+  });
+
+  afterEach(() => {
+    vi.restoreAllMocks();
+  });
+
+  it('returns early without registering meters when getClient() returns null', () => {
+    mockGetClient.mockReturnValue(null);
+    addMongooseConnectionPoolMetrics();
+    expect(metrics.getMeter).not.toHaveBeenCalled();
+  });
+
+  it('creates meter with correct name and version', () => {
+    mockGetClient.mockReturnValue({ topology: { s: { servers: new Map() } } });
+    addMongooseConnectionPoolMetrics();
+    expect(metrics.getMeter).toHaveBeenCalledWith(
+      'growi-mongoose-metrics',
+      '1.0.0',
+    );
+  });
+
+  it('creates three ObservableGauges with the correct names', () => {
+    mockGetClient.mockReturnValue({ topology: { s: { servers: new Map() } } });
+    addMongooseConnectionPoolMetrics();
+
+    const names = mockMeter.createObservableGauge.mock.calls.map(
+      ([name]) => name,
+    );
+    expect(names).toEqual([
+      'growi.mongoose.pool.size',
+      'growi.mongoose.pool.checked_out',
+      'growi.mongoose.pool.available',
+    ]);
+  });
+
+  it('creates all gauges with unit {connection}', () => {
+    mockGetClient.mockReturnValue({ topology: { s: { servers: new Map() } } });
+    addMongooseConnectionPoolMetrics();
+
+    for (const [, options] of mockMeter.createObservableGauge.mock.calls) {
+      expect(options).toMatchObject({ unit: '{connection}' });
+    }
+  });
+
+  it('registers a batch callback covering all three gauges', () => {
+    mockGetClient.mockReturnValue({ topology: { s: { servers: new Map() } } });
+    addMongooseConnectionPoolMetrics();
+
+    expect(mockMeter.addBatchObservableCallback).toHaveBeenCalledTimes(1);
+    const [, gauges] = mockMeter.addBatchObservableCallback.mock.calls[0];
+    expect(gauges).toContain(mockPoolSizeGauge);
+    expect(gauges).toContain(mockCheckedOutGauge);
+    expect(gauges).toContain(mockAvailableGauge);
+  });
+
+  it('observes correct pool stats in the callback', async () => {
+    const pool = makePool(10, 3, 7);
+    const client = makeClient(new Map([['localhost:27017', { s: { pool } }]]));
+    mockGetClient.mockReturnValue(client);
+
+    addMongooseConnectionPoolMetrics();
+
+    const mockResult = { observe: vi.fn() };
+    const [callback] = mockMeter.addBatchObservableCallback.mock.calls[0];
+    await callback(mockResult);
+
+    expect(mockResult.observe).toHaveBeenCalledWith(mockPoolSizeGauge, 10);
+    expect(mockResult.observe).toHaveBeenCalledWith(mockCheckedOutGauge, 3);
+    expect(mockResult.observe).toHaveBeenCalledWith(mockAvailableGauge, 7);
+  });
+
+  it('observes zeros when the topology has no servers', async () => {
+    mockGetClient.mockReturnValue(makeClient(new Map()));
+    addMongooseConnectionPoolMetrics();
+
+    const mockResult = { observe: vi.fn() };
+    const [callback] = mockMeter.addBatchObservableCallback.mock.calls[0];
+    await callback(mockResult);
+
+    expect(mockResult.observe).toHaveBeenCalledWith(mockPoolSizeGauge, 0);
+    expect(mockResult.observe).toHaveBeenCalledWith(mockCheckedOutGauge, 0);
+    expect(mockResult.observe).toHaveBeenCalledWith(mockAvailableGauge, 0);
+  });
+
+  it('reflects updated pool stats across multiple callback invocations', async () => {
+    const pool = makePool(2, 1, 1);
+    const servers = new Map([['localhost:27017', { s: { pool } }]]);
+    mockGetClient.mockReturnValue(makeClient(servers));
+
+    addMongooseConnectionPoolMetrics();
+
+    const mockResult = { observe: vi.fn() };
+    const [callback] = mockMeter.addBatchObservableCallback.mock.calls[0];
+
+    await callback(mockResult);
+    expect(mockResult.observe).toHaveBeenCalledWith(mockPoolSizeGauge, 2);
+
+    // Simulate pool growth
+    pool.totalConnectionCount = 8;
+    pool.currentCheckedOutCount = 5;
+    pool.availableConnectionCount = 3;
+
+    await callback(mockResult);
+    expect(mockResult.observe).toHaveBeenCalledWith(mockPoolSizeGauge, 8);
+    expect(mockResult.observe).toHaveBeenCalledWith(mockCheckedOutGauge, 5);
+    expect(mockResult.observe).toHaveBeenCalledWith(mockAvailableGauge, 3);
+  });
+});

+ 119 - 0
apps/app/src/features/opentelemetry/server/custom-metrics/mongoose-connection-pool-metrics.ts

@@ -0,0 +1,119 @@
+import { diag, metrics } from '@opentelemetry/api';
+import mongoose from 'mongoose';
+
+import loggerFactory from '~/utils/logger';
+
+const logger = loggerFactory(
+  'growi:opentelemetry:custom-metrics:mongoose-connection-pool',
+);
+const loggerDiag = diag.createComponentLogger({
+  namespace: 'growi:custom-metrics:mongoose-connection-pool',
+});
+
+// Internal pool shape accessed via topology internals (mongodb driver 4.x).
+// Wrapped in try/catch so metrics degrade gracefully if the driver changes.
+type ServerPool = {
+  totalConnectionCount?: number;
+  currentCheckedOutCount?: number;
+  availableConnectionCount?: number;
+};
+
+type ServerInternal = { s?: { pool?: ServerPool } };
+
+type TopologyInternal = { s?: { servers?: Map<string, ServerInternal> } };
+
+export type PoolStats = {
+  total: number;
+  checkedOut: number;
+  available: number;
+};
+
+/**
+ * Reads current connection pool stats from the mongodb driver topology.
+ * Sums across all servers (typically one in a standalone/replica-set primary scenario).
+ * Returns zeros if the topology internals are not accessible.
+ */
+export function getPoolStats(client: {
+  topology?: TopologyInternal;
+}): PoolStats {
+  try {
+    const servers = client.topology?.s?.servers;
+    if (!servers) return { total: 0, checkedOut: 0, available: 0 };
+
+    let total = 0;
+    let checkedOut = 0;
+    let available = 0;
+
+    for (const server of servers.values()) {
+      const pool = server?.s?.pool;
+      if (pool) {
+        total += pool.totalConnectionCount ?? 0;
+        checkedOut += pool.currentCheckedOutCount ?? 0;
+        available += pool.availableConnectionCount ?? 0;
+      }
+    }
+
+    return { total, checkedOut, available };
+  } catch {
+    return { total: 0, checkedOut: 0, available: 0 };
+  }
+}
+
+export function addMongooseConnectionPoolMetrics(): void {
+  logger.info('Starting mongoose connection pool metrics collection');
+
+  const client = mongoose.connection.getClient();
+  if (client == null) {
+    logger.warn(
+      'Mongoose client not available; skipping connection pool metrics',
+    );
+    return;
+  }
+
+  const meter = metrics.getMeter('growi-mongoose-metrics', '1.0.0');
+
+  const poolSizeGauge = meter.createObservableGauge(
+    'growi.mongoose.pool.size',
+    {
+      description:
+        'Total number of connections in the MongoDB connection pool (available + pending + checked out)',
+      unit: '{connection}',
+    },
+  );
+  const checkedOutGauge = meter.createObservableGauge(
+    'growi.mongoose.pool.checked_out',
+    {
+      description:
+        'Number of MongoDB connections currently checked out (in use)',
+      unit: '{connection}',
+    },
+  );
+  const availableGauge = meter.createObservableGauge(
+    'growi.mongoose.pool.available',
+    {
+      description:
+        'Number of MongoDB connections currently available in the pool',
+      unit: '{connection}',
+    },
+  );
+
+  meter.addBatchObservableCallback(
+    (result) => {
+      try {
+        const stats = getPoolStats(client as { topology?: TopologyInternal });
+        result.observe(poolSizeGauge, stats.total);
+        result.observe(checkedOutGauge, stats.checkedOut);
+        result.observe(availableGauge, stats.available);
+      } catch (error) {
+        loggerDiag.error('Failed to collect mongoose connection pool metrics', {
+          error,
+        });
+      }
+    },
+    [poolSizeGauge, checkedOutGauge, availableGauge],
+  );
+
+  logger.info(
+    'Mongoose connection pool metrics collection started successfully',
+  );
+}

+ 179 - 0
apps/app/src/features/opentelemetry/server/custom-metrics/yjs-metrics.spec.ts

@@ -0,0 +1,179 @@
+import { type Meter, metrics, type ObservableGauge } from '@opentelemetry/api';
+import { mock } from 'vitest-mock-extended';
+
+import { addYjsMetrics, getDocsCount } from './yjs-metrics';
+
+// Mock external dependencies
+vi.mock('~/utils/logger', () => ({
+  default: () => ({
+    info: vi.fn(),
+  }),
+}));
+
+const { mockDiagError } = vi.hoisted(() => ({
+  mockDiagError: vi.fn(),
+}));
+
+vi.mock('@opentelemetry/api', () => ({
+  diag: {
+    createComponentLogger: vi.fn(() => ({ error: mockDiagError })),
+  },
+  metrics: {
+    getMeter: vi.fn(),
+  },
+}));
+
+// Controlled docs Map mock
+const mockDocs = new Map<string, unknown>();
+vi.mock('y-websocket/bin/utils', () => ({
+  get docs() {
+    return mockDocs;
+  },
+}));
+
+describe('addYjsMetrics', () => {
+  const mockMeter = mock<Meter>();
+  const mockGauge = mock<ObservableGauge>();
+
+  beforeEach(() => {
+    vi.clearAllMocks();
+    mockDocs.clear();
+    mockDiagError.mockReset();
+    vi.mocked(metrics.getMeter).mockReturnValue(mockMeter);
+    mockMeter.createObservableGauge.mockReturnValue(mockGauge);
+  });
+
+  afterEach(() => {
+    vi.restoreAllMocks();
+  });
+
+  describe('meter and gauge setup', () => {
+    it('should create meter with correct name and version', () => {
+      addYjsMetrics();
+
+      expect(metrics.getMeter).toHaveBeenCalledWith(
+        'growi-yjs-metrics',
+        '1.0.0',
+      );
+      expect(metrics.getMeter).toHaveBeenCalledTimes(1);
+    });
+
+    it('should create ObservableGauge with name growi.yjs.docs.count (Req 4.1)', () => {
+      addYjsMetrics();
+
+      expect(mockMeter.createObservableGauge).toHaveBeenCalledWith(
+        'growi.yjs.docs.count',
+        expect.objectContaining({ unit: '{document}' }),
+      );
+    });
+
+    it('should create ObservableGauge with unit {document} (Req 4.2)', () => {
+      addYjsMetrics();
+
+      const [, options] = mockMeter.createObservableGauge.mock.calls[0];
+      expect(options).toMatchObject({ unit: '{document}' });
+    });
+
+    it('should create ObservableGauge with an appropriate description', () => {
+      addYjsMetrics();
+
+      const [name, options] = mockMeter.createObservableGauge.mock.calls[0];
+      expect(name).toBe('growi.yjs.docs.count');
+      expect(options?.description).toBeTruthy();
+      expect(typeof options?.description).toBe('string');
+    });
+
+    it('should register a callback via addBatchObservableCallback (Req 4.2)', () => {
+      addYjsMetrics();
+
+      expect(mockMeter.addBatchObservableCallback).toHaveBeenCalledTimes(1);
+      const [, gaugeArray] = mockMeter.addBatchObservableCallback.mock.calls[0];
+      expect(gaugeArray).toContain(mockGauge);
+    });
+  });
+
+  describe('callback behavior — docs.size reflects current count', () => {
+    it('should observe 0 when docs is empty (Req 4.1)', async () => {
+      mockDocs.clear(); // size === 0
+
+      addYjsMetrics();
+
+      const mockResult = { observe: vi.fn() };
+      const callback = mockMeter.addBatchObservableCallback.mock.calls[0][0];
+      await callback(mockResult);
+
+      expect(mockResult.observe).toHaveBeenCalledWith(mockGauge, 0);
+    });
+
+    it('should observe docs.size when docs has N entries (Req 4.1)', async () => {
+      mockDocs.set('doc-1', {});
+      mockDocs.set('doc-2', {});
+      mockDocs.set('doc-3', {});
+
+      addYjsMetrics();
+
+      const mockResult = { observe: vi.fn() };
+      const callback = mockMeter.addBatchObservableCallback.mock.calls[0][0];
+      await callback(mockResult);
+
+      expect(mockResult.observe).toHaveBeenCalledWith(mockGauge, 3);
+    });
+
+    it('should reflect updated docs.size across multiple callback invocations', async () => {
+      addYjsMetrics();
+
+      const mockResult = { observe: vi.fn() };
+      const callback = mockMeter.addBatchObservableCallback.mock.calls[0][0];
+
+      // First invocation — no docs
+      await callback(mockResult);
+      expect(mockResult.observe).toHaveBeenLastCalledWith(mockGauge, 0);
+
+      // Add docs and invoke again
+      mockDocs.set('doc-a', {});
+      mockDocs.set('doc-b', {});
+      await callback(mockResult);
+      expect(mockResult.observe).toHaveBeenLastCalledWith(mockGauge, 2);
+    });
+
+    it('does not propagate errors and logs via diag when observation throws', async () => {
+      addYjsMetrics();
+
+      const throwingResult = {
+        observe: vi.fn().mockImplementation(() => {
+          throw new Error('otel observe failed');
+        }),
+      };
+      const callback = mockMeter.addBatchObservableCallback.mock.calls[0][0];
+
+      await expect(async () => callback(throwingResult)).not.toThrow();
+      expect(mockDiagError).toHaveBeenCalledWith(
+        expect.stringContaining('yjs'),
+        expect.objectContaining({ error: expect.any(Error) }),
+      );
+    });
+  });
+
+  describe('getDocsCount — defensive helper', () => {
+    it('should return 0 when docs is undefined (Req design: defensive check)', () => {
+      expect(getDocsCount(undefined)).toBe(0);
+    });
+
+    it('should return 0 when docs is null (Req design: defensive check)', () => {
+      expect(getDocsCount(null)).toBe(0);
+    });
+
+    it('should return 0 when docs is an empty map', () => {
+      expect(getDocsCount(new Map())).toBe(0);
+    });
+
+    it('should return the map size when docs has entries', () => {
+      const m = new Map<string, unknown>([
+        ['a', {}],
+        ['b', {}],
+        ['c', {}],
+      ]);
+      expect(getDocsCount(m)).toBe(3);
+    });
+  });
+});

+ 47 - 0
apps/app/src/features/opentelemetry/server/custom-metrics/yjs-metrics.ts

@@ -0,0 +1,47 @@
+import { diag, metrics } from '@opentelemetry/api';
+import { docs } from 'y-websocket/bin/utils';
+
+import loggerFactory from '~/utils/logger';
+
+const logger = loggerFactory('growi:opentelemetry:custom-metrics:yjs');
+const loggerDiag = diag.createComponentLogger({
+  namespace: 'growi:custom-metrics:yjs',
+});
+
+/**
+ * Returns the number of documents in the given map.
+ * Returns 0 when the map is undefined or null (y-websocket not yet initialised).
+ */
+export function getDocsCount(
+  d: ReadonlyMap<string, unknown> | undefined | null,
+): number {
+  return d?.size ?? 0;
+}
+
+export function addYjsMetrics(): void {
+  logger.info('Starting yjs metrics collection');
+
+  const meter = metrics.getMeter('growi-yjs-metrics', '1.0.0');
+
+  const yjsDocsCountGauge = meter.createObservableGauge(
+    'growi.yjs.docs.count',
+    {
+      description:
+        'Current number of collaborative documents held by y-websocket',
+      unit: '{document}',
+    },
+  );
+
+  meter.addBatchObservableCallback(
+    (result) => {
+      try {
+        result.observe(yjsDocsCountGauge, getDocsCount(docs));
+      } catch (error) {
+        loggerDiag.error('Failed to collect yjs metrics', { error });
+      }
+    },
+    [yjsDocsCountGauge],
+  );
+
+  logger.info('Yjs metrics collection started successfully');
+}

+ 9 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts

@@ -256,6 +256,15 @@ class PageBulkExportJobCronService
         ? PageBulkExportJobStatus.completed
         : PageBulkExportJobStatus.failed;
 
+    // Guarantee completedAt is set for every completion path (including the
+    // duplicate-reuse path in createPageSnapshotsAsync, which marks the job as
+    // completed without setting completedAt). Without this, the download-expiration
+    // cleanup query `{ completedAt: { $lt } }` never matches such jobs (MongoDB
+    // type bracketing excludes null), so they accumulate forever.
+    if (action === SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED) {
+      pageBulkExportJob.completedAt ??= new Date();
+    }
+
     try {
       await pageBulkExportJob.save();
       await this.notifyExportResult(pageBulkExportJob, action);

+ 150 - 0
apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/notify-export-result-and-clean-up.integ.ts

@@ -0,0 +1,150 @@
+import type { EventEmitter } from 'node:events';
+import mongoose from 'mongoose';
+import { mock } from 'vitest-mock-extended';
+
+import { SupportedAction } from '~/interfaces/activity';
+import type Crowi from '~/server/crowi';
+import { configManager } from '~/server/service/config-manager';
+
+import {
+  PageBulkExportFormat,
+  PageBulkExportJobStatus,
+} from '../../../interfaces/page-bulk-export';
+import PageBulkExportJob from '../../models/page-bulk-export-job';
+import instanciatePageBulkExportJobCronService, {
+  pageBulkExportJobCronService,
+} from './index';
+
+/**
+ * Every completion path must set `completedAt`. The duplicate-reuse path
+ * (createPageSnapshotsAsync) marks a job completed without it, which made the
+ * download-expiration cleanup unable to ever match the job. `notifyExportResultAndCleanUp`
+ * is the single choke point that finalizes the status, so it now backfills `completedAt`.
+ */
+describe('PageBulkExportJobCronService.notifyExportResultAndCleanUp', () => {
+  // `mock<Crowi>` auto-stubs `crowi.activityService.createActivity` (called by the
+  // private `notifyExportResult`). `events.activity` is the source `this.activityEvent`
+  // is bound to in the service constructor, so stubbing it here is enough to silence
+  // `this.activityEvent.emit('updated', ...)` without spying on the private method.
+  const crowi = mock<Crowi>({
+    events: {
+      activity: mock<EventEmitter>(),
+    },
+  });
+
+  beforeAll(async () => {
+    await configManager.loadConfigs();
+    instanciatePageBulkExportJobCronService(crowi);
+    // The fs/resource cleanup step is unrelated to the completedAt contract under test.
+    vi.spyOn(
+      // biome-ignore lint/style/noNonNullAssertion: instanciated above
+      pageBulkExportJobCronService!,
+      'cleanUpExportJobResources',
+    ).mockResolvedValue(undefined);
+  });
+
+  beforeEach(async () => {
+    await PageBulkExportJob.deleteMany();
+  });
+
+  test('should set completedAt when a job is completed without it (duplicate-reuse path)', async () => {
+    // arrange: the only precondition the bug requires is "completedAt is null on entry";
+    // the initial status is irrelevant because notifyExportResultAndCleanUp overwrites it
+    // from the action argument. (Real duplicate-reuse path enters with status=completed,
+    // simplified to status=exporting here.)
+    const job = await PageBulkExportJob.create({
+      user: new mongoose.Types.ObjectId(),
+      page: new mongoose.Types.ObjectId(),
+      format: PageBulkExportFormat.md,
+      status: PageBulkExportJobStatus.exporting,
+    });
+    expect(job.completedAt).toBeUndefined();
+
+    // act
+    const before = Date.now();
+    await pageBulkExportJobCronService?.notifyExportResultAndCleanUp(
+      SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED,
+      job,
+    );
+    const after = Date.now();
+
+    // assert: completedAt is a Date stamped during this call (not a sentinel
+    // like new Date(0) or a stale value), so the download-expiration query
+    // `{ completedAt: { $lt: thresholdDate } }` will correctly include it.
+    const updated = await PageBulkExportJob.findById(job._id);
+    expect(updated?.status).toBe(PageBulkExportJobStatus.completed);
+    const completedAtMs = updated?.completedAt?.getTime();
+    expect(completedAtMs).toBeGreaterThanOrEqual(before);
+    expect(completedAtMs).toBeLessThanOrEqual(after);
+  });
+
+  test('should preserve an already-set completedAt (normal completion path)', async () => {
+    // arrange
+    const originalCompletedAt = new Date('2020-01-01T00:00:00.000Z');
+    const job = await PageBulkExportJob.create({
+      user: new mongoose.Types.ObjectId(),
+      page: new mongoose.Types.ObjectId(),
+      format: PageBulkExportFormat.md,
+      status: PageBulkExportJobStatus.uploading,
+      completedAt: originalCompletedAt,
+    });
+
+    // act
+    await pageBulkExportJobCronService?.notifyExportResultAndCleanUp(
+      SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED,
+      job,
+    );
+
+    // assert
+    const updated = await PageBulkExportJob.findById(job._id);
+    expect(updated?.completedAt?.toISOString()).toBe(
+      originalCompletedAt.toISOString(),
+    );
+  });
+
+  test('should not set completedAt when the job failed', async () => {
+    // arrange
+    const job = await PageBulkExportJob.create({
+      user: new mongoose.Types.ObjectId(),
+      page: new mongoose.Types.ObjectId(),
+      format: PageBulkExportFormat.md,
+      status: PageBulkExportJobStatus.exporting,
+    });
+
+    // act
+    await pageBulkExportJobCronService?.notifyExportResultAndCleanUp(
+      SupportedAction.ACTION_PAGE_BULK_EXPORT_FAILED,
+      job,
+    );
+
+    // assert
+    const updated = await PageBulkExportJob.findById(job._id);
+    expect(updated?.status).toBe(PageBulkExportJobStatus.failed);
+    expect(updated?.completedAt).toBeUndefined();
+  });
+
+  // JOB_EXPIRED is a third action that flows through the same choke point.
+  // Under the current branch (`action === COMPLETED`) it is equivalent to
+  // FAILED, but pinning it ensures a future split between the two does not
+  // silently start backfilling completedAt on expired jobs.
+  test('should not set completedAt when the job expired', async () => {
+    // arrange
+    const job = await PageBulkExportJob.create({
+      user: new mongoose.Types.ObjectId(),
+      page: new mongoose.Types.ObjectId(),
+      format: PageBulkExportFormat.md,
+      status: PageBulkExportJobStatus.exporting,
+    });
+
+    // act
+    await pageBulkExportJobCronService?.notifyExportResultAndCleanUp(
+      SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED,
+      job,
+    );
+
+    // assert
+    const updated = await PageBulkExportJob.findById(job._id);
+    expect(updated?.status).toBe(PageBulkExportJobStatus.failed);
+    expect(updated?.completedAt).toBeUndefined();
+  });
+});