Shun Miyazawa 2 лет назад
Родитель
Сommit
1f61762419

+ 16 - 15
apps/app/src/server/service/search-delegator/elasticsearch-client.ts

@@ -25,8 +25,8 @@ export default class ElasticsearchClient {
     this.client = client;
   }
 
-  bulk(params: ES7RequestParams.Bulk & estypes.BulkRequest): Promise<ES7ApiResponse<BulkResponse> | estypes.BulkResponse> {
-    return this.client instanceof ES7Client ? this.client.bulk(params) : this.client.bulk(params);
+  async bulk(params: ES7RequestParams.Bulk & estypes.BulkRequest): Promise<BulkResponse | estypes.BulkResponse> {
+    return this.client instanceof ES7Client ? (await this.client.bulk(params)).body as BulkResponse : this.client.bulk(params);
   }
 
   // TODO: cat is not used in current Implementation, remove cat?
@@ -50,9 +50,9 @@ export default class ElasticsearchClient {
     // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
     delete: (params: ES7RequestParams.IndicesDelete & estypes.IndicesDeleteRequest) =>
       this.client instanceof ES7Client ? this.client.indices.delete(params) : this.client.indices.delete(params),
-    exists: (params: ES7RequestParams.IndicesExists & estypes.IndicesExistsRequest)
-    : Promise<ES7ApiResponse<IndicesExistsResponse> | estypes.IndicesExistsResponse> =>
-      this.client instanceof ES7Client ? this.client.indices.exists(params) : this.client.indices.exists(params),
+    exists: async(params: ES7RequestParams.IndicesExists & estypes.IndicesExistsRequest)
+    : Promise<IndicesExistsResponse | estypes.IndicesExistsResponse> =>
+      this.client instanceof ES7Client ? (await this.client.indices.exists(params)).body as IndicesExistsResponse : this.client.indices.exists(params),
     existsAlias: (params: ES7RequestParams.IndicesExistsAlias & estypes.IndicesExistsAliasRequest)
     : Promise<ES7ApiResponse<IndicesExistsAliasResponse> | estypes.IndicesExistsAliasResponse> =>
       this.client instanceof ES7Client ? this.client.indices.existsAlias(params) : this.client.indices.existsAlias(params),
@@ -60,17 +60,18 @@ export default class ElasticsearchClient {
     putAlias: (params: ES7RequestParams.IndicesPutAlias & estypes.IndicesPutAliasRequest) =>
       this.client instanceof ES7Client ? this.client.indices.putAlias(params) : this.client.indices.putAlias(params),
     // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
-    getAlias: (params: ES7RequestParams.IndicesGetAlias & estypes.IndicesGetAliasRequest) =>
-      this.client instanceof ES7Client ? this.client.indices.getAlias(params) : this.client.indices.getAlias(params),
+    getAlias: async(params: ES7RequestParams.IndicesGetAlias & estypes.IndicesGetAliasRequest) =>
+      this.client instanceof ES7Client ? (await this.client.indices.getAlias(params)).body : this.client.indices.getAlias(params),
     // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
     updateAliases: (params: ES7RequestParams.IndicesUpdateAliases & estypes.IndicesUpdateAliasesRequest) =>
       this.client instanceof ES7Client ? this.client.indices.updateAliases(params) : this.client.indices.updateAliases(params),
-    validateQuery: (params: ES7RequestParams.IndicesValidateQuery & estypes.IndicesUpdateAliasesRequest)
-    : Promise<ES7ApiResponse<ValidateQueryResponse> | estypes.IndicesValidateQueryResponse> =>
-      this.client instanceof ES7Client ? this.client.indices.validateQuery(params) : this.client.indices.validateQuery(params),
-    stats: (params: ES7RequestParams.IndicesStats & estypes.IndicesStatsRequest)
-    : Promise<ES7ApiResponse<IndicesStatsResponse> | estypes.IndicesStatsResponse> =>
-      this.client instanceof ES7Client ? this.client.indices.stats(params) : this.client.indices.stats(params),
+    validateQuery: async(params: ES7RequestParams.IndicesValidateQuery & estypes.IndicesValidateQueryRequest)
+    : Promise<ValidateQueryResponse | estypes.IndicesValidateQueryResponse> =>
+      // eslint-disable-next-line max-len
+      this.client instanceof ES7Client ? (await this.client.indices.validateQuery(params)).body as ValidateQueryResponse : this.client.indices.validateQuery(params),
+    stats: async(params: ES7RequestParams.IndicesStats & estypes.IndicesStatsRequest)
+    : Promise<IndicesStatsResponse | estypes.IndicesStatsResponse> =>
+      this.client instanceof ES7Client ? (await this.client.indices.stats(params)).body as IndicesStatsResponse : this.client.indices.stats(params),
   };
 
   nodes = {
@@ -87,8 +88,8 @@ export default class ElasticsearchClient {
     return this.client instanceof ES7Client ? this.client.reindex(params) : this.client.reindex(params);
   }
 
-  search(params: ES7RequestParams.Search & estypes.SearchRequest): Promise<ES7ApiResponse<SearchResponse> | estypes.SearchResponse> {
-    return this.client instanceof ES7Client ? this.client.search(params) : this.client.search(params);
+  async search(params: ES7RequestParams.Search & estypes.SearchRequest): Promise<SearchResponse | estypes.SearchResponse> {
+    return this.client instanceof ES7Client ? (await this.client.search(params)).body as SearchResponse : this.client.search(params);
   }
 
 }

+ 12 - 23
apps/app/src/server/service/search-delegator/elasticsearch.ts

@@ -232,11 +232,8 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
     const tmpIndexName = `${indexName}-tmp`;
 
     // check existence
-    const indicesExistsResponse = await client.indices.exists({ index: indexName });
-    const isExistsMainIndex = this.isElasticsearchV7 ? indicesExistsResponse.body : indicesExistsResponse;
-
-    const tmpIndicesExistsResponse = await client.indices.exists({ index: tmpIndexName });
-    const isExistsTmpIndex = this.isElasticsearchV7 ? tmpIndicesExistsResponse.body : tmpIndicesExistsResponse;
+    const isExistsMainIndex = await client.indices.exists({ index: indexName });
+    const isExistsTmpIndex = await client.indices.exists({ index: tmpIndexName });
 
     // create indices name list
     const existingIndices: string[] = [];
@@ -252,12 +249,10 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
       };
     }
 
-    const indicesStatsResponse = await client.indices.stats({ index: existingIndices, metric: ['docs', 'store', 'indexing'] });
-    const indicesStats = this.isElasticsearchV7 ? indicesStatsResponse.body : indicesStatsResponse;
+    const indicesStats = await client.indices.stats({ index: existingIndices, metric: ['docs', 'store', 'indexing'] });
     const { indices } = indicesStats;
 
-    const indicesGetAliasResponse = await client.indices.getAlias({ index: existingIndices });
-    const aliases = this.isElasticsearchV7 ? indicesGetAliasResponse.body : indicesGetAliasResponse;
+    const aliases = await client.indices.getAlias({ index: existingIndices });
 
     const isMainIndexHasAlias = isExistsMainIndex && aliases[indexName].aliases != null && aliases[indexName].aliases[aliasName] != null;
     const isTmpIndexHasAlias = isExistsTmpIndex && aliases[tmpIndexName].aliases != null && aliases[tmpIndexName].aliases[aliasName] != null;
@@ -338,15 +333,13 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
     const tmpIndexName = `${indexName}-tmp`;
 
     // remove tmp index
-    const tmpIndicesExistsResponse = await client.indices.exists({ index: tmpIndexName });
-    const isExistsTmpIndex = this.isElasticsearchV7 ? tmpIndicesExistsResponse.body : tmpIndicesExistsResponse;
+    const isExistsTmpIndex = await client.indices.exists({ index: tmpIndexName });
     if (isExistsTmpIndex) {
       await client.indices.delete({ index: tmpIndexName });
     }
 
     // create index
-    const indicesExistsResponse = await client.indices.exists({ index: indexName });
-    const isExistsIndex = this.isElasticsearchV7 ? indicesExistsResponse.body : indicesExistsResponse;
+    const isExistsIndex = await client.indices.exists({ index: indexName });
     if (!isExistsIndex) {
       await this.createIndex(indexName);
     }
@@ -585,7 +578,6 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
     });
 
     let count = 0;
-    const isElasticsearchV7 = this.isElasticsearchV7;
     const writeStream = new Writable({
       objectMode: true,
       async write(batch, encoding, callback) {
@@ -597,11 +589,10 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
             body,
             // requestTimeout: Infinity,
           });
-          const result = isElasticsearchV7 ? bulkResponse.body : bulkResponse;
 
           count += (bulkResponse.items || []).length;
 
-          logger.info(`Adding pages progressing: (count=${count}, errors=${result.errors}, took=${result.took}ms)`);
+          logger.info(`Adding pages progressing: (count=${count}, errors=${bulkResponse.errors}, took=${bulkResponse.took}ms)`);
 
           if (shouldEmitProgress) {
             socket?.emit('addPageProgress', { totalCount, count, skipped });
@@ -676,22 +667,20 @@ class ElasticsearchDelegator implements SearchDelegator<Data, ESTermsKey, ESQuer
           query: query.body.query,
         },
       });
-      const result = this.isElasticsearchV7 ? validateQueryResponse.body : validateQueryResponse;
 
       // for debug
-      logger.debug('ES result: ', result);
+      logger.debug('ES result: ', validateQueryResponse);
     }
 
     const searchResponse = await this.client.search(query);
-    const result = this.isElasticsearchV7 ? searchResponse.body : searchResponse;
 
     return {
       meta: {
-        took: result.took,
-        total: result.hits.total.value,
-        hitsCount: result.hits.hits.length,
+        took: searchResponse.took,
+        total: searchResponse.hits.total.value,
+        hitsCount: searchResponse.hits.hits.length,
       },
-      data: result.hits.hits.map((elm) => {
+      data: searchResponse.hits.hits.map((elm) => {
         return {
           _id: elm._id,
           _score: elm._score,