| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- import type { IncomingMessage } from 'http';
- import type { IPage, IUserHasId } from '@growi/core';
- import { YDocStatus } from '@growi/core/dist/consts';
- import mongoose from 'mongoose';
- import type { Server } from 'socket.io';
- import { MongodbPersistence } from 'y-mongodb-provider';
- import type { Document, Persistence } from 'y-socket.io/dist/server';
- import { YSocketIO, type Document as Ydoc } from 'y-socket.io/dist/server';
- import * as Y from 'yjs';
- import loggerFactory from '~/utils/logger';
- import type { PageModel } from '../models/page';
- import { Revision } from '../models/revision';
/** Collection name handed to `MongodbPersistence` (and used when creating indexes). */
const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';

/** Value of the `flushSize` option handed to `MongodbPersistence`. */
const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;

const logger = loggerFactory('growi:service:yjs');

/** A single operation of a delta: an insert, a delete, or a retain. */
type DeltaOperation = {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  insert?: any[] | string,
  delete?: number,
  retain?: number,
};

/** Ordered list of operations, in the shape passed to `applyDelta` on a Yjs text. */
type Delta = DeltaOperation[];

/** An incoming HTTP request that carries the `user` it belongs to. */
type RequestWithUser = IncomingMessage & { user: IUserHasId };
/**
 * Contract of the Yjs service: status lookup, server-side content replacement,
 * and access to the in-memory Y.Doc of a page.
 */
export interface IYjsService {

  /**
   * Resolves the `YDocStatus` of the page identified by `pageId`.
   */
  getYDocStatus(pageId: string): Promise<YDocStatus>;

  // handleYDocSync(pageId: string, initialValue: string): Promise<void>;

  /**
   * Applies `newValue` as the new content of the page's Y.Doc.
   */
  handleYDocUpdate(pageId: string, newValue: string): Promise<void>;

  /**
   * Returns the Y.Doc held for `pageId`, or `undefined` when there is none.
   */
  getCurrentYdoc(pageId: string): Ydoc | undefined;

}
/**
 * Server-side Yjs service.
 *
 * Wires a `YSocketIO` server to a `MongodbPersistence` provider so that Y.Doc updates
 * are stored in MongoDB, seeds freshly loaded documents from the latest page revision,
 * and exposes helpers to inspect and overwrite the in-memory document of a page.
 */
class YjsService implements IYjsService {

  private ysocketio: YSocketIO;

  private mdb: MongodbPersistence;

  /**
   * @param io socket.io server the YSocketIO instance is attached to
   */
  constructor(io: Server) {
    const mdb = new MongodbPersistence(
      // ignore TS2345: Argument of type '{ client: any; db: any; }' is not assignable to parameter of type 'string'.
      // eslint-disable-next-line @typescript-eslint/ban-ts-comment
      // @ts-ignore
      {
        // TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        client: mongoose.connection.getClient() as any,
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        db: mongoose.connection.db as any,
      },
      {
        collectionName: MONGODB_PERSISTENCE_COLLECTION_NAME,
        flushSize: MONGODB_PERSISTENCE_FLUSH_SIZE,
      },
    );
    this.mdb = mdb;

    const ysocketio = new YSocketIO(io);
    this.injectPersistence(ysocketio, mdb);
    ysocketio.initialize();
    this.ysocketio = ysocketio;

    // A constructor cannot await. createIndexes() logs its own failure and rethrows,
    // so the rejection is consumed here to avoid an unhandled promise rejection.
    this.createIndexes().catch(() => { /* already logged in createIndexes() */ });

    ysocketio.on('document-loaded', async(doc: Document) => {
      const pageId = doc.name;
      if (pageId == null) {
        return;
      }

      // The emitter does not observe the returned promise, so errors must be handled here.
      try {
        await this.initializeYdocWithLatestRevision(doc, pageId);
      }
      catch (err) {
        logger.error(`Failed to initialize the ydoc of the page ('${pageId}')`, err);
      }
    });

    // io.on('connection', (socket) => {
    //   ysocketio.on('awareness-update', async(doc: Document) => {
    //     const pageId = extractPageIdFromYdocId(doc.name);
    //     if (pageId == null) return;
    //     const awarenessStateSize = doc.awareness.states.size;
    //     // Triggered when awareness changes
    //     io
    //       .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
    //       .emit(SocketEventName.YjsAwarenessStateSizeUpdated, awarenessStateSize);
    //     // Triggered when the last user leaves the editor
    //     if (awarenessStateSize === 0) {
    //       const hasYdocsNewerThanLatestRevision = await this.hasYdocsNewerThanLatestRevision(pageId);
    //       io
    //         .in(getRoomNameWithId(RoomPrefix.PAGE, pageId))
    //         .emit(SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated, hasYdocsNewerThanLatestRevision);
    //     }
    //   });
    //   socket.on(GlobalSocketEventName.YDocSync, async({ pageId, initialValue }) => {
    //     try {
    //       await this.handleYDocSync(pageId, initialValue);
    //     }
    //     catch (error) {
    //       logger.warn(error.message);
    //       socket.emit(GlobalSocketEventName.YDocSyncError, 'An error occurred during YDoc synchronization.');
    //     }
    //   });
    // });
  }

  /**
   * Fills a just-loaded document with the body of the latest revision when its status
   * is NEW or OUTDATED, then records the revision's creation time as the ydoc's 'updatedAt'.
   */
  private async initializeYdocWithLatestRevision(doc: Document, pageId: string): Promise<void> {
    const ydocStatus = await this.getYDocStatus(pageId);

    const shouldSync = ydocStatus === YDocStatus.NEW || ydocStatus === YDocStatus.OUTDATED;
    if (!shouldSync) {
      return;
    }

    logger.debug(`Initialize the page ('${pageId}') with the latest revision body`);

    const revision = await Revision
      .findOne({ pageId })
      .sort({ createdAt: -1 })
      .lean();

    if (revision?.body != null) {
      const ytext = doc.getText('codemirror');

      // An OUTDATED doc that still has text is wiped before the revision body is inserted
      const delta: Delta = (ydocStatus === YDocStatus.OUTDATED && ytext.length > 0)
        ? [
          { delete: ytext.length },
          { insert: revision.body },
        ]
        : [
          { insert: revision.body },
        ];

      ytext.applyDelta(delta, { sanitize: false });
    }

    // Issued after applyDelta() so that this value is written after the 'updatedAt'
    // set by the 'update' listener that applyDelta() triggers.
    await this.mdb.setMeta(doc.name, 'updatedAt', revision?.createdAt.getTime() ?? Date.now());
  }

  /**
   * Stores an update and refreshes the 'updatedAt' meta of the document.
   *
   * Both writes are issued synchronously (before the first await) and in this order,
   * so callers keep the same write ordering as issuing the two calls back to back.
   * Failures are logged instead of being left as floating rejections.
   */
  private async persistUpdate(mdb: MongodbPersistence, docName: string, update: Uint8Array): Promise<void> {
    try {
      await Promise.all([
        mdb.storeUpdate(docName, update),
        mdb.setMeta(docName, 'updatedAt', Date.now()),
      ]);
    }
    catch (err) {
      logger.error(`Failed to persist the update of the ydoc ('${docName}')`, err);
    }
  }

  /**
   * Replaces the persistence of the given YSocketIO instance with one backed by `mdb`.
   */
  private injectPersistence(ysocketio: YSocketIO, mdb: MongodbPersistence): void {
    const persistence: Persistence = {
      provider: mdb,
      bindState: async(docName, ydoc) => {
        logger.debug('bindState', { docName });

        const persistedYdoc = await mdb.getYDoc(docName);

        // get the state vector so we can just store the diffs between client and server
        const persistedStateVector = Y.encodeStateVector(persistedYdoc);
        const diff = Y.encodeStateAsUpdate(ydoc, persistedStateVector);

        // store the new data in db (if there is any: empty update is an array of 0s)
        if (diff.reduce((previousValue, currentValue) => previousValue + currentValue, 0) > 0) {
          await this.persistUpdate(mdb, docName, diff);
        }

        // send the persisted data to clients
        Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));

        // store updates of the document in db
        ydoc.on('update', (update) => {
          // not awaited on purpose: persistUpdate() never rejects
          void this.persistUpdate(mdb, docName, update);
        });

        // cleanup some memory
        persistedYdoc.destroy();
      },
      writeState: async(docName) => {
        logger.debug('writeState', { docName });
        // This is called when all connections to the document are closed.

        // flush document on close to have the smallest possible database
        await mdb.flushDocument(docName);
      },
    };

    // force-set the private property
    // eslint-disable-next-line dot-notation
    ysocketio['persistence'] = persistence;
  }

  /**
   * Creates the indexes on the persistence collection.
   *
   * @throws rethrows the error of `createIndexes` after logging it
   */
  private async createIndexes(): Promise<void> {
    const collection = mongoose.connection.collection(MONGODB_PERSISTENCE_COLLECTION_NAME);

    try {
      await collection.createIndexes([
        {
          key: {
            version: 1,
            docName: 1,
            action: 1,
            clock: 1,
            part: 1,
          },
        },
        // for metaKey
        {
          key: {
            version: 1,
            docName: 1,
            metaKey: 1,
          },
        },
        // for flushDocument / clearDocument
        {
          key: {
            docName: 1,
            clock: 1,
          },
        },
      ]);
    }
    catch (err) {
      logger.error('Failed to create Index', err);
      throw err;
    }
  }

  /**
   * Determines the status of the page's ydoc by comparing the creation time of the
   * latest revision with the 'updatedAt' meta stored for the ydoc.
   *
   * - ISOLATED: the page has no revision
   * - NEW:      no 'updatedAt' meta is stored for the ydoc
   * - DRAFT:    the ydoc was updated after the latest revision was created
   * - SYNCED:   both timestamps are equal
   * - OUTDATED: the latest revision is newer than the ydoc
   */
  public async getYDocStatus(pageId: string): Promise<YDocStatus> {
    const dumpLog = (status: YDocStatus, args?: { [key: string]: number }) => {
      logger.debug(`getYDocStatus('${pageId}') detected '${status}'`, args ?? {});
    };

    // get the latest revision createdAt
    const result = await Revision
      .findOne(
        // filter
        { pageId },
        // projection
        { createdAt: 1 },
        { sort: { createdAt: -1 } },
      );

    if (result == null) {
      dumpLog(YDocStatus.ISOLATED);
      return YDocStatus.ISOLATED;
    }

    // get the time of the last update stored for the ydoc
    const ydocUpdatedAt: number | undefined = await this.mdb.getMeta(pageId, 'updatedAt');

    if (ydocUpdatedAt == null) {
      dumpLog(YDocStatus.NEW);
      return YDocStatus.NEW;
    }

    const { createdAt } = result;
    const lastRevisionCreatedAt = createdAt.getTime();

    if (lastRevisionCreatedAt < ydocUpdatedAt) {
      dumpLog(YDocStatus.DRAFT, { lastRevisionCreatedAt, ydocUpdatedAt });
      return YDocStatus.DRAFT;
    }

    if (lastRevisionCreatedAt === ydocUpdatedAt) {
      dumpLog(YDocStatus.SYNCED, { lastRevisionCreatedAt, ydocUpdatedAt });
      return YDocStatus.SYNCED;
    }

    dumpLog(YDocStatus.OUTDATED, { lastRevisionCreatedAt, ydocUpdatedAt });
    return YDocStatus.OUTDATED;
  }

  // public async handleYDocSync(pageId: string, initialValue: string): Promise<void> {
  //   const currentYdoc = this.getCurrentYdoc(pageId);
  //   if (currentYdoc == null) {
  //     return;
  //   }
  //   const persistedYdoc = await this.getPersistedYdoc(pageId);
  //   const persistedStateVector = Y.encodeStateVector(persistedYdoc);
  //   await this.mdb.flushDocument(pageId);
  //   // If no write operation has been performed, insert initial value
  //   const clientsSize = persistedYdoc.store.clients.size;
  //   if (clientsSize === 0) {
  //     currentYdoc.getText('codemirror').insert(0, initialValue);
  //   }
  //   const diff = Y.encodeStateAsUpdate(currentYdoc, persistedStateVector);
  //   if (diff.reduce((prev, curr) => prev + curr, 0) > 0) {
  //     this.mdb.storeUpdate(pageId, diff);
  //     this.mdb.setMeta(pageId, 'updatedAt', Date.now());
  //   }
  //   Y.applyUpdate(currentYdoc, Y.encodeStateAsUpdate(persistedYdoc));
  //   currentYdoc.on('update', async(update) => {
  //     this.mdb.storeUpdate(pageId, update);
  //     this.mdb.setMeta(pageId, 'updatedAt', Date.now());
  //   });
  //   currentYdoc.on('destroy', async() => {
  //     this.mdb.flushDocument(pageId);
  //   });
  //   persistedYdoc.destroy();
  // }

  /**
   * Replaces the whole 'codemirror' text of the page's in-memory ydoc with `newValue`.
   * Does nothing when no ydoc is currently held for the page.
   */
  public async handleYDocUpdate(pageId: string, newValue: string): Promise<void> {
    // TODO: https://redmine.weseek.co.jp/issues/132775
    // It's necessary to confirm that the user is not editing the target page in the Editor
    const currentYdoc = this.getCurrentYdoc(pageId);
    if (currentYdoc == null) {
      return;
    }

    const ytext = currentYdoc.getText('codemirror');

    // Delete and insert in a single transaction so that observers receive one update
    // and never see the intermediate, emptied document.
    currentYdoc.transact(() => {
      ytext.delete(0, ytext.length);
      ytext.insert(0, newValue);
    });
  }

  /**
   * Returns the document YSocketIO currently holds for `pageId`, if any.
   */
  public getCurrentYdoc(pageId: string): Ydoc | undefined {
    const currentYdoc = this.ysocketio.documents.get(pageId);
    return currentYdoc;
  }

}
// Module-level singleton; stays undefined until initializeYjsService() has run.
let _instance: YjsService | undefined;

/**
 * Creates the singleton YjsService bound to the given socket.io server.
 *
 * @param io socket.io server to attach the service to
 * @throws Error when the service has already been initialized
 * @throws Error when `io` is null or undefined
 */
export const initializeYjsService = (io: Server): void => {
  if (_instance != null) {
    throw new Error('YjsService is already initialized');
  }

  if (io == null) {
    throw new Error("'io' is required if initialize YjsService");
  }

  _instance = new YjsService(io);
};
/**
 * Returns the singleton YjsService.
 *
 * @throws Error when the service has not been initialized yet
 */
export const getYjsService = (): YjsService => {
  if (_instance != null) {
    return _instance;
  }

  throw new Error('YjsService is not initialized yet');
};
|