Browse Source

Merge pull request #7623 from weseek/feat/120705-elasticsearchv8-module

feat: Elasticsearchv8 module
Shun Miyazawa 3 years ago
parent
commit
6e07a781ba

+ 2 - 2
apps/app/resource/search/mappings-es7-for-ci.json → apps/app/resource/search/mappings-es8-for-ci.json

@@ -104,11 +104,11 @@
       },
       "created_at": {
         "type": "date",
-        "format": "dateOptionalTime"
+        "format": "date_optional_time"
       },
       "updated_at": {
         "type": "date",
-        "format": "dateOptionalTime"
+        "format": "date_optional_time"
       },
       "tag_names": {
         "type": "keyword"

+ 115 - 0
apps/app/resource/search/mappings-es8.json

@@ -0,0 +1,115 @@
+{
+  "settings": {
+    "analysis": {
+      "filter": {
+        "english_stop": {
+          "type":       "stop",
+          "stopwords":  "_english_"
+        }
+      },
+      "tokenizer": {
+        "edge_ngram_tokenizer": {
+          "type": "edge_ngram",
+          "min_gram": 2,
+          "max_gram": 20,
+          "token_chars": ["letter", "digit"]
+        }
+      },
+      "analyzer": {
+        "japanese": {
+          "tokenizer": "kuromoji_tokenizer",
+          "char_filter" : ["icu_normalizer"]
+        },
+        "english_edge_ngram": {
+          "tokenizer": "edge_ngram_tokenizer",
+          "filter": [
+            "lowercase",
+            "english_stop"
+          ]
+        }
+      }
+    }
+  },
+  "mappings": {
+    "properties" : {
+      "path": {
+        "type": "text",
+        "fields": {
+          "raw": {
+            "type": "text",
+            "analyzer": "keyword"
+          },
+          "ja": {
+            "type": "text",
+            "analyzer": "japanese"
+          },
+          "en": {
+            "type": "text",
+            "analyzer": "english_edge_ngram",
+            "search_analyzer": "standard"
+          }
+        }
+      },
+      "body": {
+        "type": "text",
+        "fields": {
+          "ja": {
+            "type": "text",
+            "analyzer": "japanese"
+          },
+          "en": {
+            "type": "text",
+            "analyzer": "english_edge_ngram",
+            "search_analyzer": "standard"
+          }
+        }
+      },
+      "comments": {
+        "type": "text",
+        "fields": {
+          "ja": {
+            "type": "text",
+            "analyzer": "japanese"
+          },
+          "en": {
+            "type": "text",
+            "analyzer": "english_edge_ngram",
+            "search_analyzer": "standard"
+          }
+        }
+      },
+      "username": {
+        "type": "keyword"
+      },
+      "comment_count": {
+        "type": "integer"
+      },
+      "bookmark_count": {
+        "type": "integer"
+      },
+      "like_count": {
+        "type": "integer"
+      },
+      "grant": {
+        "type": "integer"
+      },
+      "granted_users": {
+        "type": "keyword"
+      },
+      "granted_group": {
+        "type": "keyword"
+      },
+      "created_at": {
+        "type": "date",
+        "format": "date_optional_time"
+      },
+      "updated_at": {
+        "type": "date",
+        "format": "date_optional_time"
+      },
+      "tag_names": {
+        "type": "keyword"
+      }
+    }
+  }
+}

+ 57 - 102
apps/app/src/server/service/search-delegator/elasticsearch-client.ts

@@ -1,6 +1,7 @@
 /* eslint-disable implicit-arrow-linebreak */
 /* eslint-disable no-confusing-arrow */
 import { Client as ES7Client, ApiResponse as ES7ApiResponse, RequestParams as ES7RequestParams } from '@elastic/elasticsearch7';
+import { Client as ES8Client, estypes } from '@elastic/elasticsearch8';
 
 import {
   BulkResponse,
@@ -16,134 +17,88 @@ import {
   ReindexResponse,
 } from './elasticsearch-client-types';
 
-// type ApiResponse<T = any, C = any> = ES6ApiResponse<T, C> | ES7ApiResponse<T, C>
-
-// export default class ElasticsearchClient {
-
-//   client: ES6Client | ES7Client;
-
-//   constructor(client: ES6Client | ES7Client) {
-//     this.client = client;
-//   }
-
-//   bulk(params: ES6RequestParams.Bulk & ES7RequestParams.Bulk): Promise<ApiResponse<BulkResponse>> {
-//     return this.client instanceof ES6Client ? this.client.bulk(params) : this.client.bulk(params);
-//   }
-
-//   // TODO: cat is not used in current Implementation, remove cat?
-//   cat = {
-//     aliases: (params: ES6RequestParams.CatAliases & ES7RequestParams.CatAliases): Promise<ApiResponse<CatAliasesResponse>> =>
-//       this.client instanceof ES6Client ? this.client.cat.aliases(params) : this.client.cat.aliases(params),
-//     indices: (params: ES6RequestParams.CatIndices & ES7RequestParams.CatIndices): Promise<ApiResponse<CatIndicesResponse>> =>
-//       this.client instanceof ES6Client ? this.client.cat.indices(params) : this.client.cat.indices(params),
-//   };
-
-//   cluster = {
-//     health: (params: ES6RequestParams.ClusterHealth & ES7RequestParams.ClusterHealth): Promise<ApiResponse<ClusterHealthResponse>> =>
-//       this.client instanceof ES6Client ? this.client.cluster.health(params) : this.client.cluster.health(params),
-//   };
-
-//   indices = {
-//     create: (params: ES6RequestParams.IndicesCreate & ES7RequestParams.IndicesCreate) =>
-//       this.client instanceof ES6Client ? this.client.indices.create(params) : this.client.indices.create(params),
-//     delete: (params: ES6RequestParams.IndicesDelete & ES7RequestParams.IndicesDelete) =>
-//       this.client instanceof ES6Client ? this.client.indices.delete(params) : this.client.indices.delete(params),
-//     exists: (params: ES6RequestParams.IndicesExists & ES7RequestParams.IndicesExists): Promise<ApiResponse<IndicesExistsResponse>> =>
-//       this.client instanceof ES6Client ? this.client.indices.exists(params) : this.client.indices.exists(params),
-//     existsAlias: (params: ES6RequestParams.IndicesExistsAlias & ES7RequestParams.IndicesExistsAlias): Promise<ApiResponse<IndicesExistsAliasResponse>> =>
-//       this.client instanceof ES6Client ? this.client.indices.existsAlias(params) : this.client.indices.existsAlias(params),
-//     putAlias: (params: ES6RequestParams.IndicesPutAlias & ES7RequestParams.IndicesPutAlias) =>
-//       this.client instanceof ES6Client ? this.client.indices.putAlias(params) : this.client.indices.putAlias(params),
-//     getAlias: (params: ES6RequestParams.IndicesGetAlias & ES7RequestParams.IndicesGetAlias) =>
-//       this.client instanceof ES6Client ? this.client.indices.getAlias(params) : this.client.indices.getAlias(params),
-//     updateAliases: (params: ES6RequestParams.IndicesUpdateAliases & ES7RequestParams.IndicesUpdateAliases) =>
-//       this.client instanceof ES6Client ? this.client.indices.updateAliases(params) : this.client.indices.updateAliases(params),
-//     validateQuery: (params: ES6RequestParams.IndicesValidateQuery & ES7RequestParams.IndicesValidateQuery): Promise<ApiResponse<ValidateQueryResponse>> =>
-//       this.client instanceof ES6Client ? this.client.indices.validateQuery(params) : this.client.indices.validateQuery(params),
-//     stats: (params: ES6RequestParams.IndicesStats & ES7RequestParams.IndicesStats): Promise<ApiResponse<IndicesStatsResponse>> =>
-//       this.client instanceof ES6Client ? this.client.indices.stats(params) : this.client.indices.stats(params),
-//   };
-
-//   nodes = {
-//     info: (): Promise<ApiResponse<NodesInfoResponse>> => (this.client instanceof ES6Client ? this.client.nodes.info() : this.client.nodes.info()),
-//   };
-
-//   ping() {
-//     return this.client instanceof ES6Client ? this.client.ping() : this.client.ping();
-//   }
-
-//   reindex(params: ES6RequestParams.Reindex & ES7RequestParams.Reindex): Promise<ApiResponse<ReindexResponse>> {
-//     return this.client instanceof ES6Client ? this.client.reindex(params) : this.client.reindex(params);
-//   }
-
-//   search(params: ES6RequestParams.Search & ES7RequestParams.Search): Promise<ApiResponse<SearchResponse>> {
-//     return this.client instanceof ES6Client ? this.client.search(params) : this.client.search(params);
-//   }
-
-// }
-
-
-type ApiResponse<T = any, C = any> = ES7ApiResponse<T, C>
 export default class ElasticsearchClient {
 
-  client: ES7Client;
+  client: ES7Client | ES8Client;
 
-  constructor(client: ES7Client) {
+  constructor(client: ES7Client | ES8Client) {
     this.client = client;
   }
 
-  bulk(params: ES7RequestParams.Bulk): Promise<ApiResponse<BulkResponse>> {
-    return this.client.bulk(params);
+  async bulk(params: ES7RequestParams.Bulk & estypes.BulkRequest): Promise<BulkResponse | estypes.BulkResponse> {
+    return this.client instanceof ES7Client ? (await this.client.bulk(params)).body as BulkResponse : this.client.bulk(params);
   }
 
   // TODO: cat is not used in current Implementation, remove cat?
   cat = {
-    aliases: (params: ES7RequestParams.CatAliases): Promise<ApiResponse<CatAliasesResponse>> =>
-      this.client.cat.aliases(params),
-    indices: (params: ES7RequestParams.CatIndices): Promise<ApiResponse<CatIndicesResponse>> =>
-      this.client.cat.indices(params),
+    aliases: (params: ES7RequestParams.CatAliases & estypes.CatAliasesRequest): Promise<ES7ApiResponse<CatAliasesResponse> | estypes.CatAliasesResponse> =>
+      this.client instanceof ES7Client ? this.client.cat.aliases(params) : this.client.cat.aliases(params),
+
+    indices: (params: ES7RequestParams.CatIndices & estypes.CatIndicesRequest): Promise<ES7ApiResponse<CatIndicesResponse> | estypes.CatIndicesResponse> =>
+      this.client instanceof ES7Client ? this.client.cat.indices(params) : this.client.cat.indices(params),
   };
 
   cluster = {
-    health: (params: ES7RequestParams.ClusterHealth): Promise<ApiResponse<ClusterHealthResponse>> =>
-      this.client.cluster.health(params),
+    health: (params: ES7RequestParams.ClusterHealth & estypes.ClusterHealthRequest)
+    : Promise<ES7ApiResponse<ClusterHealthResponse> | estypes.ClusterHealthResponse> =>
+      this.client instanceof ES7Client ? this.client.cluster.health(params) : this.client.cluster.health(params),
   };
 
   indices = {
-    create: (params: ES7RequestParams.IndicesCreate) =>
-      this.client.indices.create(params),
-    delete: (params: ES7RequestParams.IndicesDelete) =>
-      this.client.indices.delete(params),
-    exists: (params: ES7RequestParams.IndicesExists): Promise<ApiResponse<IndicesExistsResponse>> =>
-      this.client.indices.exists(params),
-    existsAlias: (params: ES7RequestParams.IndicesExistsAlias): Promise<ApiResponse<IndicesExistsAliasResponse>> =>
-      this.client.indices.existsAlias(params),
-    putAlias: (params: ES7RequestParams.IndicesPutAlias) =>
-      this.client.indices.putAlias(params),
-    getAlias: (params: ES7RequestParams.IndicesGetAlias) =>
-      this.client.indices.getAlias(params),
-    updateAliases: (params: ES7RequestParams.IndicesUpdateAliases) =>
-      this.client.indices.updateAliases(params),
-    validateQuery: (params: ES7RequestParams.IndicesValidateQuery): Promise<ApiResponse<ValidateQueryResponse>> =>
-      this.client.indices.validateQuery(params),
-    stats: (params: ES7RequestParams.IndicesStats): Promise<ApiResponse<IndicesStatsResponse>> =>
-      this.client.indices.stats(params),
+    // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
+    create: (params: ES7RequestParams.IndicesCreate & estypes.IndicesCreateRequest) =>
+      this.client instanceof ES7Client ? this.client.indices.create(params) : this.client.indices.create(params),
+
+    // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
+    delete: (params: ES7RequestParams.IndicesDelete & estypes.IndicesDeleteRequest) =>
+      this.client instanceof ES7Client ? this.client.indices.delete(params) : this.client.indices.delete(params),
+
+    exists: async(params: ES7RequestParams.IndicesExists & estypes.IndicesExistsRequest)
+    : Promise<IndicesExistsResponse | estypes.IndicesExistsResponse> =>
+      this.client instanceof ES7Client ? (await this.client.indices.exists(params)).body as IndicesExistsResponse : this.client.indices.exists(params),
+
+    existsAlias: (params: ES7RequestParams.IndicesExistsAlias & estypes.IndicesExistsAliasRequest)
+    : Promise<ES7ApiResponse<IndicesExistsAliasResponse> | estypes.IndicesExistsAliasResponse> =>
+      this.client instanceof ES7Client ? this.client.indices.existsAlias(params) : this.client.indices.existsAlias(params),
+
+    // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
+    putAlias: (params: ES7RequestParams.IndicesPutAlias & estypes.IndicesPutAliasRequest) =>
+      this.client instanceof ES7Client ? this.client.indices.putAlias(params) : this.client.indices.putAlias(params),
+
+    // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
+    getAlias: async(params: ES7RequestParams.IndicesGetAlias & estypes.IndicesGetAliasRequest) =>
+      this.client instanceof ES7Client ? (await this.client.indices.getAlias(params)).body : this.client.indices.getAlias(params),
+
+    // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
+    updateAliases: (params: ES7RequestParams.IndicesUpdateAliases & estypes.IndicesUpdateAliasesRequest) =>
+      this.client instanceof ES7Client ? this.client.indices.updateAliases(params) : this.client.indices.updateAliases(params),
+
+    validateQuery: async(params: ES7RequestParams.IndicesValidateQuery & estypes.IndicesValidateQueryRequest)
+    : Promise<ValidateQueryResponse | estypes.IndicesValidateQueryResponse> =>
+      // eslint-disable-next-line max-len
+      this.client instanceof ES7Client ? (await this.client.indices.validateQuery(params)).body as ValidateQueryResponse : this.client.indices.validateQuery(params),
+
+    stats: async(params: ES7RequestParams.IndicesStats & estypes.IndicesStatsRequest)
+    : Promise<IndicesStatsResponse | estypes.IndicesStatsResponse> =>
+      this.client instanceof ES7Client ? (await this.client.indices.stats(params)).body as IndicesStatsResponse : this.client.indices.stats(params),
   };
 
   nodes = {
-    info: (): Promise<ApiResponse<NodesInfoResponse>> => (this.client.nodes.info()),
+    info: (): Promise<ES7ApiResponse<NodesInfoResponse> | estypes.NodesInfoResponse> =>
+      (this.client instanceof ES7Client ? this.client.nodes.info() : this.client.nodes.info()),
   };
 
+  // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
   ping() {
-    return this.client.ping();
+    return this.client instanceof ES7Client ? this.client.ping() : this.client.ping();
   }
 
-  reindex(params: ES7RequestParams.Reindex): Promise<ApiResponse<ReindexResponse>> {
-    return this.client.reindex(params);
+  reindex(params: ES7RequestParams.Reindex & estypes.ReindexRequest): Promise<ES7ApiResponse<ReindexResponse> | estypes.ReindexResponse> {
+    return this.client instanceof ES7Client ? this.client.reindex(params) : this.client.reindex(params);
   }
 
-  search(params: ES7RequestParams.Search): Promise<ApiResponse<SearchResponse>> {
-    return this.client.search(params);
+  async search(params: ES7RequestParams.Search & estypes.SearchRequest): Promise<SearchResponse | estypes.SearchResponse> {
+    return this.client instanceof ES7Client ? (await this.client.search(params)).body as SearchResponse : this.client.search(params);
   }
 
 }

+ 43 - 31
apps/app/src/server/service/search-delegator/elasticsearch.ts

@@ -2,6 +2,7 @@ import { Writable, Transform } from 'stream';
 import { URL } from 'url';
 
 import elasticsearch7 from '@elastic/elasticsearch7';
+import elasticsearch8 from '@elastic/elasticsearch8';
 import gc from 'expose-gc/function';
 import mongoose from 'mongoose';
 import streamToPromise from 'stream-to-promise';
@@ -53,7 +54,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
 
   socketIoService!: any;
 
-  isElasticsearchV6: boolean;
+  isElasticsearchV7: boolean;
 
   isElasticsearchReindexOnBoot: boolean;
 
@@ -74,13 +75,13 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
 
     const elasticsearchVersion: number = this.configManager.getConfig('crowi', 'app:elasticsearchVersion');
 
-    if (elasticsearchVersion !== 6 && elasticsearchVersion !== 7) {
+    if (elasticsearchVersion !== 7 && elasticsearchVersion !== 8) {
       throw new Error('Unsupported Elasticsearch version. Please specify a valid number to \'ELASTICSEARCH_VERSION\'');
     }
 
-    this.isElasticsearchV6 = false;
+    this.isElasticsearchV7 = elasticsearchVersion === 7;
 
-    this.elasticsearch = elasticsearch7;
+    this.elasticsearch = this.isElasticsearchV7 ? elasticsearch7 : elasticsearch8;
     this.isElasticsearchReindexOnBoot = this.configManager.getConfig('crowi', 'app:elasticsearchReindexOnBoot');
     this.client = null;
 
@@ -128,7 +129,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
   }
 
   getType() {
-    return this.isElasticsearchV6 ? 'pages' : '_doc';
+    return this.isElasticsearchV7 ? '_doc' : undefined;
   }
 
   /**
@@ -231,8 +232,8 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
     const tmpIndexName = `${indexName}-tmp`;
 
     // check existence
-    const { body: isExistsMainIndex } = await client.indices.exists({ index: indexName });
-    const { body: isExistsTmpIndex } = await client.indices.exists({ index: tmpIndexName });
+    const isExistsMainIndex = await client.indices.exists({ index: indexName });
+    const isExistsTmpIndex = await client.indices.exists({ index: tmpIndexName });
 
     // create indices name list
     const existingIndices: string[] = [];
@@ -248,9 +249,10 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
       };
     }
 
-    const { body: indicesBody } = await client.indices.stats({ index: existingIndices, metric: ['docs', 'store', 'indexing'] });
-    const { indices } = indicesBody;
-    const { body: aliases } = await client.indices.getAlias({ index: existingIndices });
+    const indicesStats = await client.indices.stats({ index: existingIndices, metric: ['docs', 'store', 'indexing'] });
+    const { indices } = indicesStats;
+
+    const aliases = await client.indices.getAlias({ index: existingIndices });
 
     const isMainIndexHasAlias = isExistsMainIndex && aliases[indexName].aliases != null && aliases[indexName].aliases[aliasName] != null;
     const isTmpIndexHasAlias = isExistsTmpIndex && aliases[tmpIndexName].aliases != null && aliases[tmpIndexName].aliases[aliasName] != null;
@@ -262,6 +264,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
       aliases,
       isNormalized,
     };
+
   }
 
   /**
@@ -272,16 +275,24 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
 
     const tmpIndexName = `${indexName}-tmp`;
 
-    try {
-      // reindex to tmp index
-      await this.createIndex(tmpIndexName);
-      await client.reindex({
+    const reindexRequest = this.isElasticsearchV7
+      ? {
         waitForCompletion: false,
         body: {
           source: { index: indexName },
           dest: { index: tmpIndexName },
         },
-      });
+      }
+      : {
+        wait_for_completion: false,
+        source: { index: indexName },
+        dest: { index: tmpIndexName },
+      };
+
+    try {
+      // reindex to tmp index
+      await this.createIndex(tmpIndexName);
+      await client.reindex(reindexRequest);
 
       // update alias
       await client.indices.updateAliases({
@@ -322,13 +333,13 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
     const tmpIndexName = `${indexName}-tmp`;
 
     // remove tmp index
-    const { body: isExistsTmpIndex } = await client.indices.exists({ index: tmpIndexName });
+    const isExistsTmpIndex = await client.indices.exists({ index: tmpIndexName });
     if (isExistsTmpIndex) {
       await client.indices.delete({ index: tmpIndexName });
     }
 
     // create index
-    const { body: isExistsIndex } = await client.indices.exists({ index: indexName });
+    const isExistsIndex = await client.indices.exists({ index: indexName });
     if (!isExistsIndex) {
       await this.createIndex(indexName);
     }
@@ -344,10 +355,12 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
   }
 
   async createIndex(index) {
-    let mappings = require('^/resource/search/mappings-es7.json');
+    let mappings = this.isElasticsearchV7
+      ? require('^/resource/search/mappings-es7.json')
+      : require('^/resource/search/mappings-es8.json');
 
     if (process.env.CI) {
-      mappings = require('^/resource/search/mappings-es7-for-ci.json');
+      mappings = require('^/resource/search/mappings-es8-for-ci.json');
     }
 
     return this.client.indices.create({
@@ -572,14 +585,14 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
         batch.forEach(doc => prepareBodyForCreate(body, doc));
 
         try {
-          const { body: res } = await bulkWrite({
+          const bulkResponse = await bulkWrite({
             body,
             // requestTimeout: Infinity,
           });
 
-          count += (res.items || []).length;
+          count += (bulkResponse.items || []).length;
 
-          logger.info(`Adding pages progressing: (count=${count}, errors=${res.errors}, took=${res.took}ms)`);
+          logger.info(`Adding pages progressing: (count=${count}, errors=${bulkResponse.errors}, took=${bulkResponse.took}ms)`);
 
           if (shouldEmitProgress) {
             socket?.emit('addPageProgress', { totalCount, count, skipped });
@@ -646,7 +659,7 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
     if (process.env.NODE_ENV === 'development') {
       logger.debug('query: ', JSON.stringify(query, null, 2));
 
-      const { body: result } = await this.client.indices.validateQuery({
+      const validateQueryResponse = await this.client.indices.validateQuery({
         index: query.index,
         type: query.type,
         explain: true,
@@ -654,21 +667,20 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
           query: query.body.query,
         },
       });
+
       // for debug
-      logger.debug('ES result: ', result);
+      logger.debug('ES result: ', validateQueryResponse);
     }
 
-    const { body: result } = await this.client.search(query);
-
-    const totalValue = this.isElasticsearchV6 ? result.hits.total : result.hits.total.value;
+    const searchResponse = await this.client.search(query);
 
     return {
       meta: {
-        took: result.took,
-        total: totalValue,
-        hitsCount: result.hits.hits.length,
+        took: searchResponse.took,
+        total: searchResponse.hits.total.value,
+        hitsCount: searchResponse.hits.hits.length,
       },
-      data: result.hits.hits.map((elm) => {
+      data: searchResponse.hits.hits.map((elm) => {
         return {
           _id: elm._id,
           _score: elm._score,