|
@@ -1,49 +1,47 @@
|
|
|
-import type { IPage, IUserHasId } from '@growi/core';
|
|
|
|
|
-import { YDocStatus } from '@growi/core/dist/consts';
|
|
|
|
|
-import type { IncomingMessage } from 'http';
|
|
|
|
|
|
|
+import type http from 'node:http';
|
|
|
|
|
+import { YDocStatus, YJS_WEBSOCKET_BASE_PATH } from '@growi/core/dist/consts';
|
|
|
import mongoose from 'mongoose';
|
|
import mongoose from 'mongoose';
|
|
|
import type { Server } from 'socket.io';
|
|
import type { Server } from 'socket.io';
|
|
|
-import type { Document } from 'y-socket.io/dist/server';
|
|
|
|
|
-import { type Document as Ydoc, YSocketIO } from 'y-socket.io/dist/server';
|
|
|
|
|
|
|
+import { WebSocketServer } from 'ws';
|
|
|
|
|
+import type { WSSharedDoc } from 'y-websocket/bin/utils';
|
|
|
|
|
+import { docs, setPersistence, setupWSConnection } from 'y-websocket/bin/utils';
|
|
|
|
|
|
|
|
-import { SocketEventName } from '~/interfaces/websocket';
|
|
|
|
|
|
|
+import type { SessionConfig } from '~/interfaces/session-config';
|
|
|
import type { SyncLatestRevisionBody } from '~/interfaces/yjs';
|
|
import type { SyncLatestRevisionBody } from '~/interfaces/yjs';
|
|
|
-import {
|
|
|
|
|
- getRoomNameWithId,
|
|
|
|
|
- RoomPrefix,
|
|
|
|
|
-} from '~/server/service/socket-io/helper';
|
|
|
|
|
import loggerFactory from '~/utils/logger';
|
|
import loggerFactory from '~/utils/logger';
|
|
|
|
|
|
|
|
-import type { PageModel } from '../../models/page';
|
|
|
|
|
import { Revision } from '../../models/revision';
|
|
import { Revision } from '../../models/revision';
|
|
|
import { normalizeLatestRevisionIfBroken } from '../revision/normalize-latest-revision-if-broken';
|
|
import { normalizeLatestRevisionIfBroken } from '../revision/normalize-latest-revision-if-broken';
|
|
|
import { createIndexes } from './create-indexes';
|
|
import { createIndexes } from './create-indexes';
|
|
|
import { createMongoDBPersistence } from './create-mongodb-persistence';
|
|
import { createMongoDBPersistence } from './create-mongodb-persistence';
|
|
|
import { MongodbPersistence } from './extended/mongodb-persistence';
|
|
import { MongodbPersistence } from './extended/mongodb-persistence';
|
|
|
|
|
+import { guardSocket } from './guard-socket';
|
|
|
import { syncYDoc } from './sync-ydoc';
|
|
import { syncYDoc } from './sync-ydoc';
|
|
|
|
|
+import { createUpgradeHandler } from './upgrade-handler';
|
|
|
|
|
|
|
|
const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';
|
|
const MONGODB_PERSISTENCE_COLLECTION_NAME = 'yjs-writings';
|
|
|
const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
|
|
const MONGODB_PERSISTENCE_FLUSH_SIZE = 100;
|
|
|
|
|
+const YJS_PATH_PREFIX = `${YJS_WEBSOCKET_BASE_PATH}/`;
|
|
|
|
|
|
|
|
const logger = loggerFactory('growi:service:yjs');
|
|
const logger = loggerFactory('growi:service:yjs');
|
|
|
|
|
|
|
|
-type RequestWithUser = IncomingMessage & { user: IUserHasId };
|
|
|
|
|
-
|
|
|
|
|
export interface IYjsService {
|
|
export interface IYjsService {
|
|
|
getYDocStatus(pageId: string): Promise<YDocStatus>;
|
|
getYDocStatus(pageId: string): Promise<YDocStatus>;
|
|
|
syncWithTheLatestRevisionForce(
|
|
syncWithTheLatestRevisionForce(
|
|
|
pageId: string,
|
|
pageId: string,
|
|
|
editingMarkdownLength?: number,
|
|
editingMarkdownLength?: number,
|
|
|
): Promise<SyncLatestRevisionBody>;
|
|
): Promise<SyncLatestRevisionBody>;
|
|
|
- getCurrentYdoc(pageId: string): Ydoc | undefined;
|
|
|
|
|
|
|
+ getCurrentYdoc(pageId: string): WSSharedDoc | undefined;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
class YjsService implements IYjsService {
|
|
class YjsService implements IYjsService {
|
|
|
- private ysocketio: YSocketIO;
|
|
|
|
|
-
|
|
|
|
|
private mdb: MongodbPersistence;
|
|
private mdb: MongodbPersistence;
|
|
|
|
|
|
|
|
- constructor(io: Server) {
|
|
|
|
|
|
|
+ constructor(
|
|
|
|
|
+ httpServer: http.Server,
|
|
|
|
|
+ io: Server,
|
|
|
|
|
+ sessionConfig: SessionConfig,
|
|
|
|
|
+ ) {
|
|
|
const mdb = new MongodbPersistence(
|
|
const mdb = new MongodbPersistence(
|
|
|
{
|
|
{
|
|
|
// TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
|
|
// TODO: Required upgrading mongoose and unifying the versions of mongodb to omit 'as any'
|
|
@@ -57,80 +55,62 @@ class YjsService implements IYjsService {
|
|
|
);
|
|
);
|
|
|
this.mdb = mdb;
|
|
this.mdb = mdb;
|
|
|
|
|
|
|
|
- // initialize YSocketIO
|
|
|
|
|
- const ysocketio = new YSocketIO(io);
|
|
|
|
|
- this.injectPersistence(ysocketio, mdb);
|
|
|
|
|
- ysocketio.initialize();
|
|
|
|
|
- this.ysocketio = ysocketio;
|
|
|
|
|
-
|
|
|
|
|
// create indexes
|
|
// create indexes
|
|
|
createIndexes(MONGODB_PERSISTENCE_COLLECTION_NAME);
|
|
createIndexes(MONGODB_PERSISTENCE_COLLECTION_NAME);
|
|
|
|
|
|
|
|
- // register middlewares
|
|
|
|
|
- this.registerAccessiblePageChecker(ysocketio);
|
|
|
|
|
-
|
|
|
|
|
- ysocketio.on('document-loaded', async (doc: Document) => {
|
|
|
|
|
- const pageId = doc.name;
|
|
|
|
|
-
|
|
|
|
|
- const ydocStatus = await this.getYDocStatus(pageId);
|
|
|
|
|
-
|
|
|
|
|
- syncYDoc(mdb, doc, { ydocStatus });
|
|
|
|
|
- });
|
|
|
|
|
-
|
|
|
|
|
- ysocketio.on('awareness-update', async (doc: Document) => {
|
|
|
|
|
- const pageId = doc.name;
|
|
|
|
|
-
|
|
|
|
|
- if (pageId == null) return;
|
|
|
|
|
-
|
|
|
|
|
- const awarenessStateSize = doc.awareness.states.size;
|
|
|
|
|
|
|
+ // setup y-websocket persistence (includes awareness bridge and sync-on-load)
|
|
|
|
|
+ const persistence = createMongoDBPersistence(mdb, io, syncYDoc, (pageId) =>
|
|
|
|
|
+ this.getYDocStatus(pageId),
|
|
|
|
|
+ );
|
|
|
|
|
+ setPersistence(persistence);
|
|
|
|
|
|
|
|
- // Triggered when awareness changes
|
|
|
|
|
- io.in(getRoomNameWithId(RoomPrefix.PAGE, pageId)).emit(
|
|
|
|
|
- SocketEventName.YjsAwarenessStateSizeUpdated,
|
|
|
|
|
- awarenessStateSize,
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ // setup WebSocket server
|
|
|
|
|
+ const wss = new WebSocketServer({ noServer: true });
|
|
|
|
|
+ const handleUpgrade = createUpgradeHandler(sessionConfig);
|
|
|
|
|
|
|
|
- // Triggered when the last user leaves the editor
|
|
|
|
|
- if (awarenessStateSize === 0) {
|
|
|
|
|
- const ydocStatus = await this.getYDocStatus(pageId);
|
|
|
|
|
- const hasYdocsNewerThanLatestRevision =
|
|
|
|
|
- ydocStatus === YDocStatus.DRAFT || ydocStatus === YDocStatus.ISOLATED;
|
|
|
|
|
|
|
+ httpServer.on('upgrade', async (request, socket, head) => {
|
|
|
|
|
+ const url = request.url ?? '';
|
|
|
|
|
|
|
|
- io.in(getRoomNameWithId(RoomPrefix.PAGE, pageId)).emit(
|
|
|
|
|
- SocketEventName.YjsHasYdocsNewerThanLatestRevisionUpdated,
|
|
|
|
|
- hasYdocsNewerThanLatestRevision,
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ // Only handle /yjs/ paths; let Socket.IO and others pass through
|
|
|
|
|
+ if (!url.startsWith(YJS_PATH_PREFIX)) {
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
- });
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- private injectPersistence(
|
|
|
|
|
- ysocketio: YSocketIO,
|
|
|
|
|
- mdb: MongodbPersistence,
|
|
|
|
|
- ): void {
|
|
|
|
|
- const persistece = createMongoDBPersistence(mdb);
|
|
|
|
|
-
|
|
|
|
|
- // foce set to private property
|
|
|
|
|
- // biome-ignore lint/complexity/useLiteralKeys: ignore
|
|
|
|
|
- ysocketio['persistence'] = persistece;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- private registerAccessiblePageChecker(ysocketio: YSocketIO): void {
|
|
|
|
|
- // check accessible page
|
|
|
|
|
- ysocketio.nsp?.use(async (socket, next) => {
|
|
|
|
|
- // extract page id from namespace
|
|
|
|
|
- const pageId = socket.nsp.name.replace(/\/yjs\|/, '');
|
|
|
|
|
- const user = (socket.request as RequestWithUser).user; // should be injected by SocketIOService
|
|
|
|
|
|
|
|
|
|
- const Page = mongoose.model<IPage, PageModel>('Page');
|
|
|
|
|
- const isAccessible = await Page.isAccessiblePageByViewer(pageId, user);
|
|
|
|
|
-
|
|
|
|
|
- if (!isAccessible) {
|
|
|
|
|
- return next(new Error('Forbidden'));
|
|
|
|
|
|
|
+ // Guard the socket against being closed by other upgrade handlers
|
|
|
|
|
+ // (e.g. Next.js's NextCustomServer.upgradeHandler) that run synchronously
|
|
|
|
|
+ // after this async handler yields at the first await.
|
|
|
|
|
+ const guard = guardSocket(socket);
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ const result = await handleUpgrade(request, socket, head);
|
|
|
|
|
+
|
|
|
|
|
+ // Restore original socket methods now that all synchronous
|
|
|
|
|
+ // upgrade handlers have finished
|
|
|
|
|
+ guard.restore();
|
|
|
|
|
+
|
|
|
|
|
+ if (!result.authorized) {
|
|
|
|
|
+ // rejectUpgrade already wrote the HTTP error response but
|
|
|
|
|
+ // socket.destroy() was a no-op during the guard; clean up now
|
|
|
|
|
+ socket.destroy();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ wss.handleUpgrade(result.request, socket, head, (ws) => {
|
|
|
|
|
+ wss.emit('connection', ws, result.request);
|
|
|
|
|
+ setupWSConnection(ws, result.request, { docName: result.pageId });
|
|
|
|
|
+ });
|
|
|
|
|
+ } catch (err) {
|
|
|
|
|
+ guard.restore();
|
|
|
|
|
+
|
|
|
|
|
+ logger.error('Yjs upgrade handler failed unexpectedly', { url, err });
|
|
|
|
|
+ if (socket.writable) {
|
|
|
|
|
+ socket.write('HTTP/1.1 500 Internal Server Error\r\n\r\n');
|
|
|
|
|
+ }
|
|
|
|
|
+ socket.destroy();
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- return next();
|
|
|
|
|
});
|
|
});
|
|
|
|
|
+
|
|
|
|
|
+ logger.info('YjsService initialized with y-websocket');
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public async getYDocStatus(pageId: string): Promise<YDocStatus> {
|
|
public async getYDocStatus(pageId: string): Promise<YDocStatus> {
|
|
@@ -187,14 +167,14 @@ class YjsService implements IYjsService {
|
|
|
pageId: string,
|
|
pageId: string,
|
|
|
editingMarkdownLength?: number,
|
|
editingMarkdownLength?: number,
|
|
|
): Promise<SyncLatestRevisionBody> {
|
|
): Promise<SyncLatestRevisionBody> {
|
|
|
- const doc = this.ysocketio.documents.get(pageId);
|
|
|
|
|
|
|
+ const doc = docs.get(pageId);
|
|
|
|
|
|
|
|
if (doc == null) {
|
|
if (doc == null) {
|
|
|
return { synced: false };
|
|
return { synced: false };
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- const ytextLength = doc?.getText('codemirror').length;
|
|
|
|
|
- syncYDoc(this.mdb, doc, true);
|
|
|
|
|
|
|
+ const ytextLength = doc.getText('codemirror').length;
|
|
|
|
|
+ await syncYDoc(this.mdb, doc, true);
|
|
|
|
|
|
|
|
return {
|
|
return {
|
|
|
synced: true,
|
|
synced: true,
|
|
@@ -205,24 +185,23 @@ class YjsService implements IYjsService {
|
|
|
};
|
|
};
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- public getCurrentYdoc(pageId: string): Ydoc | undefined {
|
|
|
|
|
- const currentYdoc = this.ysocketio.documents.get(pageId);
|
|
|
|
|
- return currentYdoc;
|
|
|
|
|
|
|
+ public getCurrentYdoc(pageId: string): WSSharedDoc | undefined {
|
|
|
|
|
+ return docs.get(pageId);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
let _instance: YjsService;
|
|
let _instance: YjsService;
|
|
|
|
|
|
|
|
-export const initializeYjsService = (io: Server): void => {
|
|
|
|
|
|
|
+export const initializeYjsService = (
|
|
|
|
|
+ httpServer: http.Server,
|
|
|
|
|
+ io: Server,
|
|
|
|
|
+ sessionConfig: SessionConfig,
|
|
|
|
|
+): void => {
|
|
|
if (_instance != null) {
|
|
if (_instance != null) {
|
|
|
throw new Error('YjsService is already initialized');
|
|
throw new Error('YjsService is already initialized');
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if (io == null) {
|
|
|
|
|
- throw new Error("'io' is required if initialize YjsService");
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- _instance = new YjsService(io);
|
|
|
|
|
|
|
+ _instance = new YjsService(httpServer, io, sessionConfig);
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
export const getYjsService = (): YjsService => {
|
|
export const getYjsService = (): YjsService => {
|