Elasticsearch
# 简介
Elasticsearch是Java语言开发的,基于Lucene实现,是一款非常强大的开源搜索引擎,elasticsearch结合kibana、Logstash、Beats,也就是elastic stack(ELK)。被广泛应用在日志数据分析、实时监控等领域。
Lucene是一个Java语言的搜索引擎类库,是Apache公司的顶级项目,由DougCutting于1999年研发。
Lucene的优势:
- 易扩展
- 高性能(基于倒排索引)
Lucene的缺点:
- 只限于Java语言开发
- 学习曲线陡峭
- 不支持水平扩展
2004年Shay Banon基于Lucene开发了Compass,2010年Shay Banon重写了Compass命名为Elasticsearch
相比Lucene,Elasticsearch具备下列优势:
- 支持分布式,可水平扩展
- 提供Restful接口,可被任何语言调用
应用场景
- 海量数据的查询(京东,淘宝商品搜索)
- 日志数据分析 (ELK ELasticsearch + Logstach + Kibna 搭建日志监控平台)
- 实时数据分析
# 倒排索引
ES的底层原理使用的是倒排索引,也是ES为什么查询海量数据时为什么这么快功能这么强大的原因。
数据库,例如MySQL使用的是正排索引,正排索引存在一个问题:
- 在使用模糊查询时,会使索引失效,导致全表扫描,一条条记录比较,筛选出包含关键字的记录,性能低下。
- 关系型数据库提供的查询功能太弱,比如我有一个商品是'华为手机',我搜'我想买台华为手机',就搜不到了。
elasticsearch基于Lucene使用的是倒排索引,倒排索引表的构建过程:
- 对所有的数据进行拆分(分词),拆分成唯一的一个个词语(词条term)
- 然后建立词条与每条数据的文档ID的对应关系出现频率
比如mysql中有这么两条数据,id是索引字段
| id | name |
|---|---|
| 1 | 华为手机 |
| 2 | 小米手机 |
在ES建立的索引表示例如下:
| (Term 词条) | (Doc ID,Freq 频率) | (Pos 位置) |
|---|---|---|
| 华为 | (1,1) | (1,0) |
| 小米 | (2,1) | (2,0) |
| 手机 | (1,1)(2,1) | (1,1)(2,1) |
# 存储和搜索原理
首先ES没有数据库和表的概念,对比一下MySQL:
| MySQL | Elasticsearch | 说明 |
|---|---|---|
| Table | Index | 索引(index),就是文档的集合,类似数据库的表(table) |
| Row | Document | 文档(Document),就是一条条的数据,类似数据库中的行(Row),文档都是JSON格式 |
| Column | Field | 字段(Field),就是JSON文档中的字段,类似数据库中的列(Column) |
| Schema | Mapping | Mapping(映射)是索引中文档的约束,例如字段类型约束。类似数据库的表结构(Schema) |
| SQL | DSL(Domain Specification Lanuage) | DSL是elasticsearch提供的JSON风格的请求语句,用来操作elasticsearch,实现CRUD |
索引库中包含文档
概念理解:
- 索引库(index)=数据库
- 类型(type)=表
- 映射(mapping)=表结构
- 文档(document)=一行数据
type在ES7.0已经存在默认值,一般不需要设置,在8.0移除了这个概念
假设MySQL数据库存在以下几条数据:
ES中存储及搜索原理图:
MySQL的表导入ES后会将每行的数据转化成文档,ES在存储文档的时候,会使用分词器对它需要分词的字段内容进行切分,切分成一个个词条,再建立每个词条与文档唯一标识(id)的对应关系,即倒排索引,最后生成索引库。
在搜索时,同样也会使用分词器对用户输入的检索词进行分词,然后将分词后的结果根据这个索引库的倒排索引匹配对应的文档id,最后再通过这个id去查询对应的文档数据。
MySQL中查询存在的问题ES是怎么解决的?
- 性能低:使用模糊查询,左边有通配符,不会走索引,会全表扫描,性能低
- ES解决方案:如果使用"手机"作为关键字查询,ES生成的倒排索引中,词条会排序,形成一颗树形结构,提升词条的查询速度。
- 功能弱:如果以"华为手机" 作为条件,查询不出来数据
- ES解决方案:如果使用"华为手机"作为关键字查询,ES也可以对搜索的关键字进行分词,比如将华为手机拆分成"华为"、"手机",然后根据两个词分词去倒排索引中进行查询,然后取结果的并集。
MySQL和ES在企业中往往是结合使用的:
- Mysql:擅长事务类型操作,可以确保数据的安全和一致性,进行复杂的多表查询
- Elasticsearch:擅长海量数据的搜索、分析、计算
- 对安全性要求较高的写操作,使用mysql实现
- 对查询性能要求较高的搜索需求,使用elasticsearch实现
- 两者再基于某种方式,实现数据的同步,保证一致性

# Docker安装和启动ES与Kibana
docker拉取ES与Kibana的镜像
创建并运行ES容器
docker run -id --name elasticsearch -d -p 9200:9200 -v /usr/share/elasticsearch/plugins:/usr/share/elasticsearch/plugins -e "discovery.type=single-node" elasticsearch:7.4.0
9200端口:http端口
创建并运行Kibana容器
docker run -d -p 5601:5601 --link elasticsearch --name=kibana -e "ELASTICSEARCH_URL=http://192.168.66.133:9200" kibana:7.4.0
6501端口:http端口
# DSL操作
# 索引库操作
# 操作索引库
# 新增索引库
PUT person
# 查询索引库
GET person
# 删除索引库
DELETE person
# 关闭索引库(注意:删除索引库会删除数据,但关闭索引库不会删除数据,只是暂时无法修改数据)
POST person/_close
# 打开索引库
POST person/_open
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 映射操作
# 添加映射/创建索引,并设置映射
PUT person
{
"mappings": {
"properties": {
"name":{
"type": "text"
},
"age":{
"type": "integer"
},
"pic":{
"type": "keyword",
"index": false // 不作为查询条件参与查询,参与查询条件会报错
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
**注意:**在ES中,一旦索引库的映射创建成功,字段不能修改或删除。如果修改字段,只能重构索引库,重后索引库后需要全量同步索引库(每天定时做)
ES中包含两类数据类型:简单数据类型和复杂数据类型
1) 简单数据类型
- 字符串
- text:可以分词,不支持聚合(统计)
- keyword:不会分词,将全部内容作为一个词条,支持聚合(统计)
例如:有个文档(相当于数据库一条数据),其中一个字段的值是华为手机
text: 华为、手机
keyword: 华为手机
2
3
- 数值 一定不分词

- 布尔(boolean)
- 二进制(binary)
- 范围类型(integer_range, float_range, long_range, double_range, date_range)(了解即可)
- 日期(date)
2)复杂数据类型
- 数组 []:没有专用的array数据类型,任何一个字段的值,都可以被添加0个到多个,但要求他们的类型必须一致,当类型一直含有多个值存储到ES中会自动转化成数组类型 ["eric","jack"] List Set
- 对象 {} Map User {key:value,key:value}

面试问题:你在使用ES时有没有什么心得?
可以说一下创建索引库导入MySQL数据时,对于字段类型的考究,比如尽量的减少分词字段的出现,避免不必要的分词,减少分词开销,加快ES的检索效率。以及可以维护一个冗余字段,利用copy_to来整合需要参与查询的分词字段,之后查询直接查询这个冗余字段即可,就不需要分开查询多个字段了。
# 操作文档
# 文档操作
# 新增文档(没有分配_id值)
POST person/_doc
{
"name":"翠花",
"age":18,
"sex":"女"
}
# 新增文档(自行分配_id值)id存在=覆盖
POST person/_doc/2
{
"name":"如花",
"age":18,
"sex":"男"
}
# 修改文档
PUT person/_doc/2
{
"name":"如花",
"age":20,
"sex":"男"
}
# 删除文档
DELETE person/_doc/2
# 查询所有文档
GET person/_search
# 根据_id查询文档
GET person/_doc/DWI8sYABW73hEu2kmHr-
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
关于Elasticsearch的type类型问题
type是什么? 类型,把文档进行分类
在ES6.x以前,必须自定义type
在ES7.x时,必须设置一个_doc类型(ES分配了默认_doc)
**注意:**ES每个文档必须有一个_id字段(类型keyword)
如果不分配_id值,ES会自动生成随机ID值,如果自行分配_id值,则使用我们的值填充_id值,为了方便数据操作和维护,一般会将_id值与MySQL导入的文档数据的主键id绑定。
# 建库示例
PUT /hotel
{
"settings": {
"analysis": {
"analyzer": {
"ik_py":{
"tokenizer":"ik_smart",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"id":{
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "ik_py",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address":{
"type": "keyword",
"index": false
},
"price":{
"type": "integer"
},
"score":{
"type": "integer"
},
"brand":{
"type": "keyword",
"copy_to": "all"
},
"city":{
"type": "keyword"
},
"starName":{
"type": "keyword"
},
"business":{
"type": "keyword",
"copy_to": "all"
},
"location":{
"type": "geo_point"
},
"pic":{
"type": "keyword",
"index": false
},
"all":{
"type": "text",
"analyzer": "ik_py",
"search_analyzer": "ik_smart"
},
"suggestion":{
"type": "completion",
"analyzer": "ik_py",
"search_analyzer": "ik_smart"
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 分词器
分词器(Analyzer)是将一段文本,按照一定逻辑,拆分成多个词语的一种工具,如:华为手机 ---> 华为、手、手机,ElasticSearch 内置分词器有以下几种:
- Standard Analyzer - 默认分词器,按词/字切分,小写处理 (英文)
- Simple Analyzer - 按照非字母切分(符号被过滤),小写处理
- Stop Analyzer - 小写处理,停用词过滤(the,a,is)
- Whitespace Analyzer - 按照空格切分,不转小写
- Keyword Analyzer - 不分词,直接将输入当作输出
- Patter Analyzer - 正则表达式,默认\W+(非字符分割) (中文会被去掉)
- Language - 提供了30多种常见语言的分词器
ES提供了一个接口给我们来验证分词效果,如下所示:
# 分词效果验证
GET _analyze
{
"text": "我爱黑马程序员",
"analyzer": "standard"
}
2
3
4
5
6
**注意:**ElasticSearch 内置分词器对中文很不友好,处理方式为:一个字一个词。
#
IK分词器
IKAnalyzer是一个开源的,基于java语言开发的轻量级的中文分词工具包,是一个基于Maven构建的项目,具有60万字/秒的高速处理能力,并且支持用户词典扩展定义。下载地址:https://github.com/medcl/elasticsearch-analysis-ik/releases (opens new window)
分词器的核心:
- 词库
- 分词算法
IK分词器提供了两种分词算法:
- ik_smart:最小分词法 我是程序员 -> 我 是 程序员
- ik_max_word:最细分词法 我是程序员 -> 我 是 程序员 程序 员
一般最小分词就够用了,过细反而会影响性能
# 安装
登录Elasticsearch容器,拷贝文件
docker exec -it elasticsearch /usr/share/elasticsearch/plugins
或者上传ik分词器目录到linux(放在/usr/share/elasticsearch/plugins目录下)
重启ES容器
docker restart elasticsearch
# 扩展词典
1)打开IK分词器config目录:
2)在IKAnalyzer.cfg.xml配置文件内容添加:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 *** 添加扩展词典-->
<entry key="ext_dict">ext.dic</entry>
</properties>
2
3
4
5
6
7
3)新建一个 ext.dic,可以参考config目录下复制一个配置文件进行修改
传智播客
奥力给
2
**注意:**ext.dic 文件的编码必须是 UTF-8 格式,严禁使用Windows记事本编辑
4)重启elasticsearch
docker restart es
# 查看 日志
docker logs -f elasticsearch
2
3
4

# 停用词词典
IK分词器也提供了强大的停用词功能,让我们在索引时就直接忽略当前的停用词汇表中的内容。
1)IKAnalyzer.cfg.xml配置文件内容添加:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典-->
<entry key="ext_dict">ext.dic</entry>
<!--用户可以在这里配置自己的扩展停止词字典 *** 添加停用词词典-->
<entry key="ext_stopwords">stopword.dic</entry>
</properties>
2
3
4
5
6
7
8
9
3)在 stopword.dic 添加停用词
就业
**注意:**stopword.dic 文件的编码必须是 UTF-8 格式,严禁使用Windows记事本编辑
4)重启elasticsearch
# 重启服务
docker restart elasticsearch
docker restart kibana
# 查看 日志
docker logs -f elasticsearch
2
3
4
5
6
日志中已经成功加载stopword.dic配置文件
# 拼音分词器
要实现拼音分词检索,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin (opens new window)
详细安装步骤可以参考IK分词器的安装过程。
测试如下
POST /_analyze
{
"text": "如家酒店还不错",
"analyzer": "pinyin"
}
2
3
4
5

# 自定义分词器
默认的拼音分词器会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音,需要对拼音分词器做个性化定制,形成自定义分词器。
elasticsearch中分词器(analyzer)的组成包含三部分:
- character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
- tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
- tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

声明自定义分词器的语法如下:
PUT /test
{
"settings": {
"analysis": {
"analyzer": { // 自定义分词器
"my_analyzer": { // 分词器名称
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": { // 自定义tokenizer filter
"py": { // 过滤器名称
"type": "pinyin", // 过滤器类型,这里是pinyin
// 设置分词器参数
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_max_word"
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# HighLevelAPI
ES官方提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是组装DSL语句,通过http请求发送给ES。官方文档地址:https://www.elastic.co/guide/en/elasticsearch/client/index.html (opens new window)
其中的Java Rest Client又包括两种:
- Java Low Level Rest Client
- Java High Level Rest Client

# SpringBoot整合HighLevelAPI
1)导入相关依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.itcast.demo</groupId>
<artifactId>hotel-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>hotel-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- 导入elasticsearch的RestAPI依赖 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
2)配置application.yml
spring:
elasticsearch:
rest:
uris:
- http://192.168.66.133:9200
2
3
4
5
3)编写测试类
package cn.itcast.hotel;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* 演示HighLevelRestClient的操作
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = HotelDemoApplication.class )
public class HotelIndexTest {
/**
* 注入RestHighLevelClient对象
*/
@Autowired
private RestHighLevelClient highLevelClient;
/**
* 测试环境是否可用
*/
@Test
public void testConn(){
System.out.println(highLevelClient);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 建立索引
package cn.itcast.hotel;
import org.elasticsearch.client.IndicesClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
/**
* 演示HighLevelRestClient的操作
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = HotelDemoApplication.class )
public class HotelIndexTest {
/**
* 注入RestHighLevelClient对象
*/
@Autowired
private RestHighLevelClient highLevelClient;
/**
* 测试连接
*/
@Test
public void testConn(){
System.out.println(highLevelClient);
}
/**
* 创建索引库
*/
@Test
public void testCreataIndex() throws Exception {
//1.创建索引操作对象
IndicesClient indicesClient = highLevelClient.indices();
//2.创建请求(创建索引): 用于DSL语句
CreateIndexRequest request = new CreateIndexRequest("hotel"); //hotel: 就是索引库名
//3.执行请求:发送DSL
CreateIndexResponse response = indicesClient.create(request, RequestOptions.DEFAULT);
System.out.println(response.isAcknowledged());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 创建索引并添加映射
在Kibana上测试编写DSL语句
# index属性:该字段是否索引,默认true
# all 为了把用户搜索条件聚合在一起设计的,方便搜索
# copy_to: 把字段值复制到另一个字段上面去
GET hotel/_mapping
PUT hotel
{
"mappings": {
"properties": {
"id":{
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "ik_smart",
"copy_to": "all"
},
"address":{
"type": "keyword"
},
"price":{
"type": "integer"
},
"score":{
"type": "integer"
},
"brand":{
"type": "keyword"
},
"city":{
"type": "keyword"
},
"starName":{
"type": "keyword"
},
"business":{
"type": "text",
"analyzer": "ik_smart",
"copy_to": "all"
},
"location":{
"type": "geo_point"
},
"pic":{
"type": "keyword",
"index": false
},
"all":{
"type": "text",
"analyzer": "ik_smart"
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 创建索引库并设置映射
*/
@Test
public void testCreateIndexAndMaping() throws IOException {
//获取客户端对象
IndicesClient indices = highLevelClient.indices();
//创建操作对象
CreateIndexRequest request = new CreateIndexRequest("hotel");
//设置索引库的映射信息
String mapping = "{\n" +
" \"properties\": {\n" +
" \"id\":{\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"name\":{\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_smart\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"address\":{\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"price\":{\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"score\":{\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"brand\":{\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"city\":{\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"starName\":{\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"business\":{\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_smart\",\n" +
" \"copy_to\": \"all\"\n" +
" },\n" +
" \"location\":{\n" +
" \"type\": \"geo_point\"\n" +
" },\n" +
" \"pic\":{\n" +
" \"type\": \"keyword\",\n" +
" \"index\": false\n" +
" },\n" +
" \"all\":{\n" +
" \"type\": \"text\",\n" +
" \"analyzer\": \"ik_smart\"\n" +
" }\n" +
" }\n" +
" }";
request.mapping(mapping, XContentType.JSON);
//执行操作
CreateIndexResponse response = indices.create(request, RequestOptions.DEFAULT);
System.out.println(response.isAcknowledged());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 删除索引
/**
* 删除索引库
*/
@Test
public void testDeleteIndex() throws IOException {
//获取客户端对象
IndicesClient indices = highLevelClient.indices();
//创建操作对象
DeleteIndexRequest request = new DeleteIndexRequest("hotel"); //索引名
//执行操作
AcknowledgedResponse response = indices.delete(request, RequestOptions.DEFAULT);
System.out.println(response.isAcknowledged());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 添加文档
/**
* 新增文档
*/
@Test
public void testAddDoc() throws IOException {
//根据id查询酒店信息
Hotel hotel = hotelMapper.selectById(36934L);
//把酒店存入ES中
HotelDoc hotelDoc = new HotelDoc(hotel);
//创建请求对象
IndexRequest indexRequest = new IndexRequest("hotel").id(hotelDoc.getId().toString());
//转换json
String json = objectMapper.writeValueAsString(hotelDoc);
indexRequest.source(json,XContentType.JSON);//设置内容
//执行操作
IndexResponse index = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
System.out.println(index.getId());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 修改文档
/**
* 修改文档
*/
@Test
public void testUpdateDoc() throws Exception{
//1.查询一个酒店信息
Hotel hotel = hotelService.getById(36934L);
//2.把Hotel转换为HotelDoc
HotelDoc hotelDoc = new HotelDoc(hotel);
//修改价格
hotelDoc.setPrice(386);
//3.转换成json字符串
String hotelJson = mapper.writeValueAsString(hotelDoc);
//4.创建操作请求
IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString()).source(hotelJson,XContentType.JSON);
//5.执行请求
IndexResponse response = highLevelClient.index(request, RequestOptions.DEFAULT);
System.out.println(response.getId());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 查询文档
/**
* 查询一个文档
*/
@Test
public void testFindDoc() throws Exception{
Long id = 36934L;
//1.创建请求
GetRequest request = new GetRequest("hotel").id(id.toString());
//2.执行请求
GetResponse response = highLevelClient.get(request, RequestOptions.DEFAULT);
//3.取出结果
String hotelJson = response.getSourceAsString();
HotelDoc hotelDoc = mapper.readValue(hotelJson, HotelDoc.class);
System.out.println(hotelDoc);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 删除文档
/**
* 删除文档
*/
@Test
public void testDeleteDoc() throws IOException {
Long id = 36934L;
//1.创建请求.
DeleteRequest request = new DeleteRequest("hotel").id(id.toString());
//2.执行请求
DeleteResponse response = highLevelClient.delete(request, RequestOptions.DEFAULT);
System.out.println(response.getId());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 批量添加
Bulk 批量操作是将文档的增删改查一些列操作,通过一次请求全都做完。减少网络传输次数。
**应用场景:**ES索引库数据初始化的时候,可以将数据库的数据查询出来通过批量操作导入到索引库中
/**
* 批量新增文档
*/
@Test
public void testBulkAddDoc() throws IOException {
//全部查询
List<Hotel> hotelList = hotelMapper.selectList(null);
//创建批量操作的DSL请求对象
BulkRequest bulkRequest = new BulkRequest(); //本质上就是内存缓存区
for(Hotel hotel:hotelList){
//把酒店存入ES中
HotelDoc hotelDoc = new HotelDoc(hotel);
//创建请求对象
IndexRequest indexRequest = new IndexRequest("hotel").id(hotelDoc.getId().toString());
//转换json
String json = objectMapper.writeValueAsString(hotelDoc);
indexRequest.source(json,XContentType.JSON);//设置内容
//加入批量缓存对象(在ES没有执行DSL)
bulkRequest.add(indexRequest);
}
//最后执行批量操作(把DSL发送给ES执行)
highLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 异步批量添加
@Test
public void addAll() throws InterruptedException {
// 88475
int totalPage = (88475 / 7000) + 1;
long start = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(totalPage);
for (int i = 1; i <= totalPage; i++) {
// 查询数据
PageDTO<Item> pageDTO = itemClient.findByPage(i, 7000);
List<Item> list = pageDTO.getList();
CompletableFuture.runAsync(() -> {
try {
// 创建批量操作的DSL请求对象
BulkRequest bulkRequest = new BulkRequest(); //本质上就是内存缓存区
for (Item item : list) {
// 创建请求对象
IndexRequest request = new IndexRequest("item").id(item.getId().toString());
// 转json
String json = objectMapper.writeValueAsString(item);
// 放入请求
request.source(json, XContentType.JSON);
// 加入请求缓存
bulkRequest.add(request);
}
highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
countDownLatch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}, Executors.newFixedThreadPool(totalPage));
}
countDownLatch.await();
log.info("[耗时]:" + (System.currentTimeMillis() - start));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# ES高级搜索
Elasticsearch提供了基于JSON的DSL(Domain Specific Language (opens new window))来定义查询。常见的查询类型包括:
- 查询所有:查询出所有数据,一般测试用。例如:match_all
- 全文检索(full text)查询:利用分词器对用户输入内容分词,然后去倒排索引库中匹配。例如:
- match_query
- multi_match_query
- 精确查询:根据精确词条值查找数据,一般是查找keyword、数值、日期、boolean等类型字段。例如:
- ids
- range
- term
- 地理(geo)查询:根据经纬度查询。例如:
- geo_distance
- geo_bounding_box
- 复合(compound)查询:复合查询可以将上述各种查询条件组合起来,合并查询条件。例如:
- bool
- function_score
# 全文检索
常见的全文检索查询包括:
- match查询:单字段查询
- multi_match查询:多字段查询,任意一个字段符合条件就算符合查询条件
会对用户输入查询进行分词
match查询语法如下:
GET /indexName/_search
{
"query": {
"match": {
"FIELD": "TEXT"
}
}
}
2
3
4
5
6
7
8
mulit_match语法如下:
GET /indexName/_search
{
"query": {
"multi_match": {
"query": "TEXT",
"fields": ["FIELD1", " FIELD12"]
}
}
}
# multi_match 多字段全文检索
GET hotel/_search
{
"query": {
"multi_match": {
"query": "虹桥如家",
"fields": ["name","brand","business"]
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
交并集
# 操作方法(operator):
# or 默认值 把所有词条结果取并集
# and 把所有词条结果取交集
GET hotel/_search
{
"query": {
"match": {
"all": {
"operator": "and",
"query": "虹桥如家"
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
RestAPI
/**
* 全文检索
*/
@Test
public void testMatch() throws Exception {
//创建请求对象
SearchRequest request = new SearchRequest("hotel"); // hotel:索引库名称
//构建条件(拼接DSL语句)
//query里面的条件,使用QueryBuilders
//sort里面的条件,使用SortBuilders
//request.source().query(QueryBuilders.matchQuery("all","虹桥如家"));
//修改operator属性值
request.source().query(QueryBuilders.matchQuery("all","虹桥如家").operator(Operator.AND));
//执行条件,获取结果
SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
//处理搜索的结果
//取出命中结果
SearchHits searchHits = response.getHits();
//取出命中总数
long total = searchHits.getTotalHits().value;
System.out.println("命中数:"+total);
//遍历命中结果列表
List<HotelDoc> hotelDocList = new ArrayList<>();
for(SearchHit searchHit : searchHits){
//获取结果json字符串
String json = searchHit.getSourceAsString();
//转换为对象
HotelDoc hotelDoc = mapper.readValue(json, HotelDoc.class);
hotelDocList.add(hotelDoc);
}
//打印结果
if(CollectionUtils.isNotEmpty(hotelDocList)){
hotelDocList.forEach(System.out::println);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 精确查询
精确查询一般是查找keyword、数值、日期、boolean等类型字段。所以不会对搜索条件分词。常见的有:
- term:根据词条精确值查询(针对非数值类型)
- 根据词条精确匹配,一般搜索keyword类型、数值类型、布尔类型、日期类型字段
- range:根据值的范围查询(针对数值类型)
- 根据数值范围查询,可以是数值、日期的范围
term查询语法
// term查询
GET /indexName/_search
{
"query": {
"term": {
"FIELD": {
"value": "VALUE"
}
}
}
}
# 需求:查询上海的酒店
GET hotel/_search
{
"query": {
"term": {
"city": {
"value": "上海"
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
range查询语法
// range查询
GET /indexName/_search
{
"query": {
"range": {
"FIELD": {
"gte": 10, // 这里的gte代表大于等于,gt则代表大于
"lte": 20 // lte代表小于等于,lt则代表小于
}
}
}
}
# 需求:查询价格500-100的酒店
GET hotel/_search
{
"query": {
"range": {
"price": {
"gte": 500,
"lte": 1000
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
RestAPI
/**
* 精确检索
*/
@Test
public void testTerm() throws Exception {
//创建请求对象
SearchRequest request = new SearchRequest("hotel"); // hotel:索引库名称
//构建条件(拼接DSL语句)
//request.source().query(QueryBuilders.termQuery("city","上海"));
request.source().query(QueryBuilders.rangeQuery("price").gte(500).lte(1000));
//执行条件,获取结果
SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
handleResponse(response);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 地理坐标查询
所谓的地理坐标查询,其实就是根据经纬度查询,官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/geo-queries.html (opens new window)
常见的使用场景包括:
- 携程:搜索我附近的酒店
- 滴滴:搜索我附近的出租车
- 微信:搜索我附近的人
矩形范围查询,也就是geo_bounding_box查询,查询坐标落在某个矩形范围的所有文档:
查询时,需要指定矩形的左上、右下两个点的坐标,然后画出一个矩形,落在该矩形内的都是符合条件的点。
// geo_bounding_box查询
GET /indexName/_search
{
"query": {
"geo_bounding_box": {
"FIELD": {
"top_left": { // 左上点
"lat": 31.1,
"lon": 121.5
},
"bottom_right": { // 右下点
"lat": 30.9,
"lon": 121.7
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
附近查询
附近查询,也叫做距离查询(geo_distance):查询到指定中心点小于某个距离值的所有文档。
换句话来说,在地图上找一个点作为圆心,以指定距离为半径,画一个圆,落在圆内的坐标都算符合条件:
// geo_distance 查询
GET /indexName/_search
{
"query": {
"geo_distance": {
"FIELD": "31.21,121.5",// 圆心
"distance": "15km" // 半径
}
}
}
# 距离位置检索(圆形位置检索)
GET hotel/_search
{
"query": {
"geo_distance":{
"location":"31.1,121.5",
"distance": "10km"
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 地理位置检索
*/
@Test
public void testGeo() throws Exception {
//创建请求对象
SearchRequest request = new SearchRequest("hotel"); // hotel:索引库名称
//构建条件(拼接DSL语句)
request.source().query(QueryBuilders
.geoDistanceQuery("location").point(new GeoPoint("31.1,121.5"))
//.distance("10km"));
.distance("15",DistanceUnit.KILOMETERS));
//执行条件,获取结果
SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
handleResponse(response);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 复合查询-算分查询
当我们利用match查询时,文档结果会根据与搜索词条的关联度打分(_score),返回结果时按照分值降序排列。
在elasticsearch中,早期使用的打分算法是TF-IDF算法,公式如下:
TF (词条频率): 描述某一词在一篇文档中出现的频繁程度。 IDF(逆文档频率): 降低所有文档中几乎都会出现的关键词的权重 。
在后来的5.1版本升级中,elasticsearch将算法改进为BM25算法,公式如下:
TF-IDF算法有一个缺陷,就是词条频率越高,文档得分也会越高,单个词条对文档影响较大。而BM25则会让单个词条的算分有一个上限,曲线更加平滑:
DSL语句
function score 查询中包含四部分内容:
- 原始查询条件:query部分,基于这个条件搜索文档,并且基于BM25算法给文档打分,原始算分(query score)
- 过滤条件:filter部分,符合该条件的文档才会重新算分
- 算分函数:符合filter条件的文档要根据这个函数做运算,得到的函数算分(function score),有四种函数
- weight:函数结果是常量
- field_value_factor:以文档中的某个字段值作为函数结果
- random_score:以随机数作为函数结果
- script_score:自定义算分函数算法
- 运算模式:算分函数的结果、原始查询的相关性算分,两者之间的运算方式,包括:
- multiply:相乘
- replace:用function score替换query score
- 其它,例如:sum、avg、max、min
function score的运行流程如下:
- 1)根据原始条件查询搜索文档,并且计算相关性算分,称为原始算分(query score)
- 2)根据过滤条件,过滤文档
- 3)符合过滤条件的文档,基于算分函数运算,得到函数算分(function score)
- 4)将原始算分(query score)和函数算分(function score)基于运算模式做运算,得到最终结果,作为相关性算分。
其中的关键点是:
- 过滤条件:决定哪些文档的算分被修改
- 算分函数:决定函数算分的算法
- 运算模式:决定最终算分结果
示例
GET /hotel/_search
{
"query": {
"function_score": {
"query": { .... }, // 原始查询,可以是任意条件
"functions": [ // 算分函数
{
"filter": { // 满足的条件,品牌必须是如家
"term": {
"brand": "如家"
}
},
"weight": 2 // 算分权重为2
}
],
"boost_mode": "sum" // 加权模式,求和
}
}
}
# 优先显示"北京"地区的 "如家"酒店
GET hotel/_search
{
"query": {
"function_score": {
"query": {
"match": {
"all": "如家"
}
},
"functions": [
{
"filter": {
"term": {
"city": "北京"
}
},
"weight": 20
}
],
"boost_mode": "multiply"
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 复合查询-布尔查询
布尔查询是一个或多个查询子句的组合,每一个子句就是一个子查询。子查询的组合方式有:
- must:必须匹配每个子查询,类似“与”,参与算分
- should:选择性匹配子查询,类似“或”,参与算分
- must_not:必须不匹配,不参与算分,类似“非”
- filter:必须匹配,不参与算分
比如在搜索酒店时,除了关键字搜索外,我们还可能根据品牌、价格、城市等字段做过滤,每一个不同的字段,其查询的条件、方式都不一样,必须是多个不同的查询,而要组合这些查询,就必须用bool查询了。
需要注意的是,搜索时,参与打分的字段越多,查询的性能也越差。因此这种多条件查询时,建议这样做:
- 搜索框的关键字搜索,是全文检索查询,使用must查询,参与算分
- 其它过滤条件,采用filter查询。不参与算分
# 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店
GET hotel/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"all": "如家"
}
}
],
"must_not": [
{
"range": {
"price": {
"gt": 400
}
}
}
],
"filter": {
"geo_distance": {
"distance": "10km",
"location": "31.21,121.5"
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
RestAPI
/**
* 复合查询-算法函数检索
*/
@Test
public void testScore() throws Exception {
//创建请求对象
SearchRequest request = new SearchRequest("hotel"); // hotel:索引库名称
// 布尔查询
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 关键字
if (StringUtils.isEmpty(params.getKey())) {
boolQueryBuilder.must(QueryBuilders.matchAllQuery());
} else {
boolQueryBuilder.must(QueryBuilders.matchQuery("all", params.getKey()));
}
// 设置权重排名
FunctionScoreQueryBuilder functionScoreQueryBuilder = QueryBuilders.functionScoreQuery(
boolQueryBuilder,
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
new FunctionScoreQueryBuilder
.FilterFunctionBuilder(QueryBuilders.termQuery("isAD",true),
ScoreFunctionBuilders.weightFactorFunction(10))
}).boostMode(CombineFunction.MULTIPLY);
// 设置进请求中
request.source().query(functionScoreQueryBuilder);
handleResponse(response);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
通用布尔方法示例
private BoolQueryBuilder buildBasicBoolQuery(RequestParams params) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 关键字
if (StringUtils.isEmpty(params.getKey())) {
boolQueryBuilder.must(QueryBuilders.matchAllQuery());
} else {
boolQueryBuilder.must(QueryBuilders.matchQuery("all", params.getKey()));
}
// 城市
if (StringUtils.isNotEmpty(params.getCity())) {
boolQueryBuilder.filter(QueryBuilders.termQuery("city", params.getCity()));
}
// 星级
if (StringUtils.isNotEmpty(params.getStarName())) {
boolQueryBuilder.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
// 品牌
if (StringUtils.isNotEmpty(params.getBrand())) {
boolQueryBuilder.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
// 价格
if (params.getMinPrice() != null && params.getMaxPrice() != null) {
boolQueryBuilder.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
return boolQueryBuilder;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 查询结果处理
elasticsearch默认是根据相关度算分(_score)来排序,但是也支持自定义方式对搜索结果排序 (opens new window)。可以排序字段类型有:keyword类型、数值类型、地理坐标类型、日期类型等。
# 普通字段排序
GET /indexName/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"FIELD": "desc" // 排序字段、排序方式ASC、DESC
}
]
}
# 需求:对“如家”酒店价格进行倒序排序
GET hotel/_search
{
"query": {
"match": {
"all": "如家"
}
},
"sort": [
{
"price": {
"order": "asc"
}
}
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
需求描述:酒店数据按照用户评价(score)降序排序,评价相同的按照价格(price)升序排序
# 地理坐标排序
GET /indexName/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance" : {
"FIELD" : "纬度,经度", // 文档中geo_point类型的字段名、目标坐标点
"order" : "asc", // 排序方式
"unit" : "km" // 排序的距离单位
}
}
]
}
# 需求:我的坐标是31.034661,121.612282,寻找我周围距离最近的酒店(由近到远)
GET hotel/_search
{
"sort": [
{
"_geo_distance": {
"location": "31.034661,121.612282",
"order": "asc",
"unit": "km"
}
}
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
这个查询的含义是:
- 指定一个坐标,作为目标点
- 计算每一个文档中,指定字段(必须是geo_point类型)的坐标 到目标点的距离是多少
- 根据距离排序
RestAPI
/**
* 结果排序
*/
@Test
public void testSort() throws Exception {
//创建请求对象
SearchRequest request = new SearchRequest("hotel"); // hotel:索引库名称
//request.source().query(QueryBuilders.matchQuery("all","如家"));
//普通字段排序
//request.source().sort(SortBuilders.fieldSort("price").order(SortOrder.ASC));
//地理位置排序
request.source().sort(SortBuilders
.geoDistanceSort("location",new GeoPoint("31.034661,121.612282"))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS));
//执行条件,获取结果
SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
handleResponse(response);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 分页
elasticsearch 默认情况下只返回top10的数据。而如果要查询更多数据就需要修改分页参数了。elasticsearch中通过修改from、size参数来控制要返回的分页结果:
- from:从第几个文档开始,从0开始的
- size:总共查询几个文档
类似于mysql中的limit ?, ?
# 需求:查询第1页,每页显示20条
# from值算法=(页码-1)* size
GET hotel/_search
{
"query": {
"match": {
"all": "虹桥如家"
}
},
"from": 0,
"size": 20
}
2
3
4
5
6
7
8
9
10
11
12
13
RestAPI
/**
* 结果分页
*/
@Test
public void testPage() throws Exception {
//创建请求对象
SearchRequest request = new SearchRequest("hotel"); // hotel:索引库名称
request.source().query(QueryBuilders.matchQuery("all","虹桥如家"));
//设置from和size值进行分页
request.source().from(20).size(20);
//执行条件,获取结果
SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
handleResponse(response);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 高亮显示
高亮显示的实现分为两步:
- 给文档中的所有关键字都添加一个标签,例如
<em>标签 - 页面给
<em>标签编写CSS样式
GET /hotel/_search
{
"query": {
"match": {
"FIELD": "TEXT" // 查询条件,高亮一定要使用全文检索查询
}
},
"highlight": {
"fields": { // 指定要高亮的字段
"FIELD": {
"pre_tags": "<em>", // 用来标记高亮字段的前置标签,默认拼接
"post_tags": "</em>" // 用来标记高亮字段的后置标签,默认拼接
}
}
}
}
# 对name字段内容进行高亮
# 高亮属性配置
# require_field_match: 该高亮字段是否必须参与搜索条件,true:必须参数(默认值),false: 可以不参与
GET hotel/_search
{
"query": {
"match": {
"all": "虹桥如家"
}
},
"highlight": {
"fields": {
"name": {
"require_field_match": "false"
}
},
"pre_tags": "<font color='red'>",
"post_tags": "</font>"
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
注意:
- 高亮是对关键字高亮,因此搜索条件必须带有关键字,而不能是范围这样的查询。
- 默认情况下,高亮的字段,必须与搜索指定的字段一致,否则无法高亮
- 如果要对非搜索字段高亮,则需要添加一个属性:required_field_match=false
/**
* 结果高亮
*/
@Test
public void testHighlight() throws Exception {
//创建请求对象
SearchRequest request = new SearchRequest("hotel"); // hotel:索引库名称
request.source().query(QueryBuilders.matchQuery("all","虹桥如家"));
//添加高亮处理
HighlightBuilder highlightBuilder = new HighlightBuilder()
.field("name").requireFieldMatch(false)
.preTags("<font color='red'>")
.postTags("</font>");
request.source().highlighter(highlightBuilder);
//执行条件,获取结果
SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
handleResponse(response);
}
/**
* 搜索结果处理
* @param response
* @throws com.fasterxml.jackson.core.JsonProcessingException
*/
private void handleResponse(SearchResponse response) throws com.fasterxml.jackson.core.JsonProcessingException {
//处理搜索的结果
//取出命中结果
SearchHits searchHits = response.getHits();
//取出命中总数
long total = searchHits.getTotalHits().value;
System.out.println("命中数:"+total);
//遍历命中结果列表
List<HotelDoc> hotelDocList = new ArrayList<>();
for(SearchHit searchHit : searchHits){
//获取结果json字符串
String json = searchHit.getSourceAsString();
//转换为对象
HotelDoc hotelDoc = mapper.readValue(json, HotelDoc.class);
// 取出距离
if (sortValues != null && sortValues.length > 0){
System.out.println(hotelDoc.getId()+"距离您"+sortValues[0]+"公里");
}
//取出高亮的结果
HighlightField highlightField = searchHit.getHighlightFields().get("name");
if(highlightField!=null){
//覆盖原有的name值
String name = highlightField.getFragments()[0].toString();
hotelDoc.setName(name);
}
hotelDocList.add(hotelDoc);
}
//打印结果
if(CollectionUtils.isNotEmpty(hotelDocList)){
hotelDocList.forEach(System.out::println);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# 聚合搜索
聚合常见的有三类:
- **桶(Bucket)**聚合:用来对文档做分组 类似mysql的group by
- TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- **度量(Metric)**聚合:用以计算一些值,比如:最大值、最小值、平均值等
- 类似于mysql统计函数 count,sum,max,min
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Stats:同时求max、min、avg、sum等
- **管道(pipeline)**聚合:其它聚合的结果为基础做聚合
**注意:**参加聚合的字段必须是keyword、日期、数值、布尔类型
# Bucket聚合(分组)
基本语法
GET /hotel/_search
{
"size": 0, // 设置分页size为0,不查询hit结果,只包含聚合结果
"aggs": { // 定义聚合
"brandAgg": { //给聚合起个名字
"terms": { // 聚合的类型,按照品牌值聚合,所以选择term
"field": "brand", // 参与聚合的字段
"size": 20 // 希望获取的聚合结果数量
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
聚合结果排序
默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序。
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "asc" // 按照_count升序排列
},
"size": 20
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
限定聚合范围
默认情况下,Bucket聚合是对索引库的所有文档做聚合,但真实场景下,用户会输入搜索条件,因此聚合必须是对搜索结果聚合。那么聚合必须添加限定条件。
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200 // 只对200元以下的文档聚合
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Metric聚合(度量聚合)
统计每个品牌酒店的用户评分的min、max、avg等值
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
},
"aggs": { // 是brands聚合的子聚合,也就是分组后对每组分别计算
"score_stats": { // 聚合名称
"stats": { // 聚合类型,这里stats可以计算min、max、avg等
"field": "score" // 聚合字段,这里是score
}
}
}
}
}
}
# 需求:统计每个酒店品牌的平均得分,根据平均分倒序
GET hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"scoreAgg.avg": "desc"
}
},
"aggs": {
"scoreAgg": {
"stats": {
"field": "score"
}
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
RestAPI
/**
* 聚合查询
*/
@Test
public void testAgg() throws Exception {
//创建请求对象
SearchRequest request = new SearchRequest("hotel"); // hotel:索引库名称
request.source().size(0);
request.source().aggregation(AggregationBuilders
.terms("brandAgg").field("brand").size(20).order(BucketOrder.aggregation("scoreAgg.avg",false))
.subAggregation(
AggregationBuilders.stats("scoreAgg").field("score")
));
//执行条件,获取结果
SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);
//获取聚合结果
Aggregations aggregations = response.getAggregations();
Terms terms = aggregations.get("brandAgg");
List<? extends Terms.Bucket> buckets = terms.getBuckets();
for(Terms.Bucket bucket:buckets){
String brandName = bucket.getKeyAsString();
Stats stats = bucket.getAggregations().get("scoreAgg");
double sum = stats.getSum();
double max = stats.getMax();
double min = stats.getMin();
double avg = stats.getAvg();
System.out.println("品牌:"+brandName+",min:"+min+",max:"+max);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
**注意:**bucket聚合的结果在Terms对象中获取,Metric聚合的结果在度量对象中获取,如Stats、Avg。
桶数据获取公用方法示例
private List<String> getAggByName(Terms terms) {
ArrayList<String> list = new ArrayList<>();
List<? extends Terms.Bucket> buckets = terms.getBuckets();
for (Terms.Bucket bucket : buckets) {
list.add(bucket.getKeyAsString());
}
return list;
}
2
3
4
5
6
7
8
# 自动补全查询
Elasticsearch提供了Completion Suggester (opens new window)查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:
- 参与补全查询的字段必须是completion类型。
- 字段的内容一般是用来补全的多个词条形成的数组。["如家酒店","如家宾馆"]
比如,一个这样的索引库:
PUT test
{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}
2
3
4
5
6
7
8
9
10
然后插入下面的数据:
# 示例数据
POST test/_doc
{
"title": ["Sony","WH-1000XM3"]
}
POST test/_doc
{
"title":["SK-II","PITERA"]
}
POST test/_doc
{
"title":["Nintendo","switch"]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
查询的DSL语句如下:
// 自动补全查询
GET /test/_search
{
"suggest":{
"title_suggest":{
"text":"s",
"completion":{
"field":"title",
"skip_duplicates":true, // 去重
"size":10
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
示例
public List<String> suggestion(String key) {
try {
SearchRequest request = new SearchRequest("hotel");
request.source().size(0);
request.source().suggest(new SuggestBuilder().addSuggestion("sug",
SuggestBuilders.completionSuggestion("suggestion")
.prefix(key)
.skipDuplicates(true)
.size(100)));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 获取响应结果
Suggest suggest = response.getSuggest();
CompletionSuggestion suggestion = suggest.getSuggestion("sug");
List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions();
ArrayList<String> list = new ArrayList<>();
for (CompletionSuggestion.Entry.Option option : options) {
list.add(option.getText().toString());
}
return list;
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 搜索结果封装

/**
* 搜索结果处理
* @param response
* @throws com.fasterxml.jackson.core.JsonProcessingException
*/
private void handleResponse(SearchResponse response) throws com.fasterxml.jackson.core.JsonProcessingException {
//处理搜索的结果
//取出命中结果
SearchHits searchHits = response.getHits();
//取出命中总数
long total = searchHits.getTotalHits().value;
System.out.println("命中数:"+total);
//遍历命中结果列表
List<HotelDoc> hotelDocList = new ArrayList<>();
for(SearchHit searchHit : searchHits){
//获取结果json字符串
String json = searchHit.getSourceAsString();
//转换为对象
HotelDoc hotelDoc = mapper.readValue(json, HotelDoc.class);
// 取出距离
if (sortValues != null && sortValues.length > 0){
System.out.println(hotelDoc.getId()+"距离您"+sortValues[0]+"公里");
}
//取出高亮的结果
HighlightField highlightField = searchHit.getHighlightFields().get("name");
if(highlightField!=null){
//覆盖原有的name值
String name = highlightField.getFragments()[0].toString();
hotelDoc.setName(name);
}
hotelDocList.add(hotelDoc);
}
//打印结果
if(CollectionUtils.isNotEmpty(hotelDocList)){
hotelDocList.forEach(System.out::println);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 业务DEMO扒取
import com.alibaba.cloud.commons.lang.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hemall.search.pojo.RequestParams;
import com.hemall.search.service.ESService;
import com.hmall.common.dto.PageDTO;
import com.hmall.common.pojo.ItemDoc;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class ESServiceImpl implements ESService {
@Autowired
private RestHighLevelClient highLevelClient;
private ObjectMapper objectMapper = new ObjectMapper();
/**
* 自动补全
*
* @param key
* @return
*/
@Override
public List<String> getSuggestList(String key) {
try {
// 创建请求
SearchRequest searchRequest = new SearchRequest("item");
// 构建DSL
searchRequest.source().size(0);
searchRequest.source().suggest(new SuggestBuilder().addSuggestion("sug",
SuggestBuilders.completionSuggestion("suggestion")
.prefix(key)
.skipDuplicates(true)
.size(13)
));
SearchResponse response = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 获取响应结果
Suggest suggest = response.getSuggest();
CompletionSuggestion suggestion = suggest.getSuggestion("sug");
List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions();
ArrayList<String> list = new ArrayList<>();
for (CompletionSuggestion.Entry.Option option : options) {
list.add(option.getText().toString());
}
return list;
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* 过滤聚合项
*
* @param params
* @return
*/
@Override
public Map<String, List<String>> filters(RequestParams params) {
// 检查参数
checkParams(params);
try {
// 获取请求
SearchRequest searchRequest = new SearchRequest("item");
// 获取布尔查询结果
BoolQueryBuilder boolQueryBuilder = BascBoolQuery(params);
// 构建DSL
searchRequest.source().query(boolQueryBuilder);
searchRequest.source().size(0);
searchRequest.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(10));
searchRequest.source().aggregation(AggregationBuilders.terms("categoryAgg").field("category").size(10));
SearchResponse response = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//获取聚合结果
Aggregations aggregations = response.getAggregations();
Terms brandAgg = aggregations.get("brandAgg");
Terms categoryAgg = aggregations.get("categoryAgg");
List<String> brandList = getAggByName(brandAgg);
List<String> categoryList = getAggByName(categoryAgg);
Map<String, List<String>> resultMap = new HashMap<>();
resultMap.put("brand", brandList);
resultMap.put("category", categoryList);
return resultMap;
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* 查询列表
*
* @param params
* @return
*/
@Override
public PageDTO<ItemDoc> getlist(RequestParams params) {
checkParams(params);
try {
// 创建请求对象
SearchRequest searchRequest = new SearchRequest("item");
// 构建DSL
BoolQueryBuilder boolQueryBuilder = BascBoolQuery(params);
// 算分
FunctionScoreQueryBuilder functionScoreQueryBuilder = QueryBuilders.functionScoreQuery(
boolQueryBuilder,
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
new FunctionScoreQueryBuilder
.FilterFunctionBuilder(QueryBuilders.termQuery("isAD", true),
ScoreFunctionBuilders.weightFactorFunction(10)
)});
searchRequest.source().query(functionScoreQueryBuilder);
Integer page = (params.getPage() - 1) * params.getSize();
searchRequest.source().from(page);
searchRequest.source().size(params.getSize());
searchRequest.source().trackTotalHits(true);
// 设置高亮
searchRequest.source().highlighter(new HighlightBuilder()
.field("name").requireFieldMatch(false)
.field("brand").requireFieldMatch(false)
.field("category").requireFieldMatch(false)
);
// 排序
if (!"default".equals(params.getSortBy()) && StringUtils.isNotEmpty(params.getSortBy())) {
searchRequest.source().sort(SortBuilders.fieldSort(params.getSortBy()).order(SortOrder.ASC));
}
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(functionScoreQueryBuilder);
System.out.println("拼接的查询请求======");
System.out.println(searchSourceBuilder.toString());
// 发送请求
SearchResponse response = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 取出命中结果
SearchHits searchHits = response.getHits();
// 取出命中总数
long total = searchHits.getTotalHits().value;
// 取出
List<ItemDoc> itemDocs = handleResponse(searchHits);
return new PageDTO<>(total, itemDocs);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* ====================== 公用方法 ==========================
*/
private List<String> getAggByName(Terms terms) {
ArrayList<String> list = new ArrayList<>();
List<? extends Terms.Bucket> buckets = terms.getBuckets();
for (Terms.Bucket bucket : buckets) {
list.add(bucket.getKeyAsString());
}
return list;
}
private BoolQueryBuilder BascBoolQuery(RequestParams params) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 关键字
if (StringUtils.isEmpty(params.getKey())) {
boolQueryBuilder.must(QueryBuilders.matchAllQuery());
} else {
boolQueryBuilder.must(QueryBuilders.matchQuery("all", params.getKey()));
}
// 品牌
if (StringUtils.isNotEmpty(params.getBrand())) {
boolQueryBuilder.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
// 星级
if (StringUtils.isNotEmpty(params.getCategory())) {
boolQueryBuilder.filter(QueryBuilders.termQuery("category", params.getCategory()));
}
// 价格
if (params.getMinPrice() != null && params.getMaxPrice() != null) {
boolQueryBuilder.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
}
return boolQueryBuilder;
}
/**
* 搜索结果处理
*
* @param
* @throws com.fasterxml.jackson.core.JsonProcessingException
*/
private List<ItemDoc> handleResponse(SearchHits searchHits) throws com.fasterxml.jackson.core.JsonProcessingException {
//处理搜索的结果
//遍历命中结果列表
List<ItemDoc> itemDocList = new ArrayList<>();
for (SearchHit searchHit : searchHits) {
//获取结果json字符串
String json = searchHit.getSourceAsString();
//转换为对象
ItemDoc itemDoc = objectMapper.readValue(json, ItemDoc.class);
//取出高亮的结果
HighlightField name = searchHit.getHighlightFields().get("name");
if (name != null) {
itemDoc.setName(name.getFragments()[0].toString());
}
HighlightField brand = searchHit.getHighlightFields().get("brand");
if (brand != null) {
itemDoc.setBrand(brand.getFragments()[0].toString());
}
HighlightField category = searchHit.getHighlightFields().get("category");
if (category != null) {
itemDoc.setCategory(category.getFragments()[0].toString());
}
itemDocList.add(itemDoc);
}
return itemDocList;
}
private void checkParams(RequestParams params) {
if (params.getPage() == null) params.setPage(1);
if (params.getSize() == null) params.setSize(20);
if (params.getMinPrice() == null) {
params.setMinPrice(0);
} else {
params.setMinPrice(params.getMinPrice() * 100);
}
if (params.getMaxPrice() == null) {
params.setMaxPrice(99999900);
} else {
params.setMaxPrice(params.getMaxPrice() * 100);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# 数据同步方案
常见的数据同步方案有三种:
- 同步调用
- 异步通知
- 监听binlog
# 同步调用

- **优点:**实时性高,实现简单,粗暴
- **缺点:**会阻塞主进程,效率低,耦合度高
# 异步通知
通过MQ实现消息通知
- **优点:**效率高,低耦合,实现难度一般
- **缺点:**实时性会略差一些,依赖MQ的可用性
binlog监听
流程如下:
给mysql开启binlog功能
mysql完成增、删、改操作都会记录在binlog中
hotel-demo基于canal监听binlog变化,实时更新elasticsearch中的内容
**优点:**完全接触服务间耦合
**缺点:**开启binlog增加数据库负担、实现复杂度高,只能在MySQL使用
示例代码
@PostMapping
public void save(@RequestBody Item item) {
itemService.save(item);
rabbitTemplate.convertAndSend(ITEM_EXCHANGE, INSERT_KEY, item.getId());
}
@PutMapping
public void update(@RequestBody Item item) {
if (item.getStatus() == 2) {
itemService.updateById(item);
rabbitTemplate.convertAndSend(ITEM_EXCHANGE, INSERT_KEY, item.getId());
}
}
@DeleteMapping("/{id}")
public void delete(@PathVariable Long id) {
itemService.delete(id);
rabbitTemplate.convertAndSend(ITEM_EXCHANGE, DELETE_KEY, id);
}
@PutMapping("/status/{id}/{status}")
public void changeShelf(@PathVariable("id") Long id, @PathVariable("status") Integer status) {
itemService.changeShelf(id, status);
// 上架操作则添加
if (status == 1) {
rabbitTemplate.convertAndSend(ITEM_EXCHANGE, INSERT_KEY, id);
}
// 下架操作则删除
if (status == 2) {
rabbitTemplate.convertAndSend(ITEM_EXCHANGE, DELETE_KEY, id);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
@Component
public class ItemListener {
private final static String ITEM_INDEX = "item";
@Autowired
private ItemClient itemClient;
@Autowired
private RestHighLevelClient restHighLevelClient;
private ObjectMapper objectMapper = new ObjectMapper();
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = ITEM_INSERT_QUEUE),
exchange = @Exchange(name = ITEM_EXCHANGE, type = ExchangeTypes.TOPIC),
key = {INSERT_KEY}
))
public void insertListener(Long id) {
try {
Item item = itemClient.findById(id);
// 如果为空或者为下架商品则不添加
if (item == null || item.getStatus() == 2) {
return;
}
// 转换json
ItemDoc itemDoc = new ItemDoc(item);
String json = objectMapper.writeValueAsString(itemDoc);
// 创建请求对象
IndexRequest request = new IndexRequest(ITEM_INDEX).id(id.toString());
request.source(json, XContentType.JSON);
restHighLevelClient.index(request, RequestOptions.DEFAULT);
log.info("文档写入ES成功");
} catch (IOException e) {
e.printStackTrace();
log.error("文档写入失败" + e.getMessage());
}
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = ITEM_DELETE_QUEUE),
exchange = @Exchange(name = ITEM_EXCHANGE, type = ExchangeTypes.TOPIC),
key = {DELETE_KEY}
))
public void deleteListener(Long id) {
try {
DeleteRequest request = new DeleteRequest(ITEM_INDEX).id(id.toString());
restHighLevelClient.delete(request, RequestOptions.DEFAULT);
log.info("文档删除成功");
} catch (IOException e) {
e.printStackTrace();
log.error("文档删除失败" + e.getMessage());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# ES集群

**注意:**ES中的分布式是指分布式存储,就是所谓ES的分片存储,上图其实就是一个ES集群,只是将一个索引库的文档数据分片存储在了集群中的各个节点。
- 节点(node) :集群中的一个 Elasticearch 服务实例。在Elasticsearch中,节点的类型主要分为如下三种:
- master eligible节点:有资格参加选举成为Master的节点,默认为true(可以通过node.master: false设置)。
- data节点:保存数据的节点,默认为true(可以通过node.data: false设置)。
- Coordinating 节点:客户端节点。负责接收客户端请求,将请求发送到合适的节点,最终把结果汇集到一起返回,默认为true。
- 集群(cluster):一组拥有相同集群名称的节点,集群名称默认是elasticsearch。
- 索引(index) :es存储数据的地方,相当于关系数据库中的database。
- 分片(shard):索引库可以被拆分为不同的部分进行存储,称为分片。在集群环境下,一个索引库的不同分片可以拆分到放到不同的节点中,分片的好处有如下两点。
- 提高查询性能(多个节点并行查询)
- 提高数据安全性(鸡蛋不要放在一个篮子里)
- 主分片(Primary shard):相对于副本分片的定义。
- 副本分片(Replica shard):即对主分片数据的备份,每个主分片可以有一个或者多个副本,数据和主分片一样,副本的好处有如下两点:
- 数据备份,防止数据丢失
- 一定程度提高查询的并发能力(同一份完整的索引库的数据,分成了两份,都可以查询)

上图的主分片和副本分片的配置可以描述为:3个主分片,一份副本
主分片与副本3:1的比例也是一个比较标准的比例,一般开发中若是需要伸缩集群也是以3的倍数进行伸缩
# ES集群搭建步骤
为了方便搭建ES集群,我们采用docker-compose方式
# 1)创建相应目录
在/root/es-cluster目录下创建以下目录
mkdir -p es01/data
mkdir -p es01/logs
mkdir -p es02/data
mkdir -p es02/logs
mkdir -p es03/data
mkdir -p es03/logs
mkdir -p kibana_config
2
3
4
5
6
7
8
9
10
# 2)创建配置文件(项目配置)
创建docker-compose.yml
version: '3'
services:
es01:
image: elasticsearch:7.4.0
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,es03
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- ./es01/data:/usr/share/elasticsearch/data
- ./es01/logs:/usr/share/elasticsearch/logs
- ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
ports:
- 9201:9200
networks:
- elastic
es02:
image: elasticsearch:7.4.0
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- ./es02/data:/usr/share/elasticsearch/data
- ./es02/logs:/usr/share/elasticsearch/logs
- ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
ports:
- 9202:9200
networks:
- elastic
es03:
image: elasticsearch:7.4.0
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- ./es03/data:/usr/share/elasticsearch/data
- ./es03/logs:/usr/share/elasticsearch/logs
- ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
ports:
- 9203:9200
networks:
- elastic
kibana01:
image: kibana:7.4.0
container_name: kibana01
links:
- es01
- es02
- es03
ports:
- 5602:5601
volumes:
- ./kibana_config/:/usr/local/kibana/config/
environment:
ELASTICSEARCH_HOSTS: http://es01:9200
networks:
- elastic
networks:
elastic:
driver: bridge
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
创建elasticsearch.yml文件
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
2
3
4
# 3)修改目录文件权限
sudo chown -R 1000:1000 /root/es-cluster/
# 4)运行docker-compose命令
进入/root/es-cluster目录下,执行以下名称
docker-compose up -d
如果需要停止则输入
docker-compose down
# 5)安装cerebro监控ES集群
docker search cerebro
docker pull yannart/cerebro
docker run -d --name cerebro -p 9000:9000 yannart/cerebro
2
3
4
5
访问:http://192.168.66.133:9000/ (opens new window)
# 6)创建索引库
创建一个test索引库,指定分片数为3,副本数为1
PUT test
{
"settings": {
"number_of_shards": 3
, "number_of_replicas": 1
}
}
2
3
4
5
6
7
# JavaAPI操作ES集群
修改yml配置即可
spring:
elasticsearch:
rest:
uris:
- http://192.168.66.133:9201
- http://192.168.66.133:9202
- http://192.168.66.133:9203
2
3
4
5
6
7
将单节点配置为多节点,其余操作与单节点的使用一致
# ES集群路由原理
文档存入对应的分片,ES计算分片编号的过程,称为路由,具体路由到哪一个节点,是经过一个路由算法实现的,并且路由是主节点进行的。
**路由算法 :**shard_index(分片编号) = hash(文档id) % number_of_primary_shards(主分片个数)
简单说就是在存储文档时,对文档id进行hash计算,最后再模与节点数,以得到路由的节点号数。
存储数据路由过程:
最终计算出2,将文档路由至节点3进行存储,并且在节点2的分片副本中也存储一份
检索数据路由过程:
当要查询 id = 5 的文档,同样也要先进行hash计算,计算分片位置,路由到对应的分片进行数据查询。
# ES脑裂问题
一个正常es集群中只有一个主节点(Master),主节点负责管理整个集群。如创建或删除索引,并决定哪些分片分配给哪些节点。此外还跟踪哪些节点是集群的一部分。
脑裂,指的是一个集群中出现一个以上的主节点,导致集群分裂,使得集群处于异常状态,简单说就是一个集群应该只能有一个老大,当出现了两个老大后就不知道该听谁的,于是出现了异常。
以下就是一个脑裂的情况,这八个节点都处于一个集群中。
**后果:**可能会出现脏数据,比如存在多个重复的数据,只不过脑裂的情况比较少,发生后需要运维人员手动关闭多余的主节点,并且校对数据删除脏数据。
脑裂原因
- 网络原因:网络延迟
一般es集群会在内网部署,也可能在外网部署,比如阿里云。
内网一般不会出现此问题,外网的网络出现问题的可能性大些。 - 节点负载
主节点的角色既为master又为data。数据访问量较大时,可能会导致Master节点停止响应(假死状态)。 - JVM内存回收
当Master节点设置的JVM内存较小时,引发JVM的大规模内存回收,造成ES进程失去响应
避免脑裂:
- 网络原因:discovery.zen.ping.timeout 超时时间配置大一点。默认是3S
- 节点负载:角色分离策略
- 主节点配置:
node.master: true # 是否有资格参加选举成为master
node.data: false # 是否存储数据
2
- 数据节点配置:
node.master: false # 是否有资格参加选举成为master
node.data: true # 是否存储数据
2
- JVM内存回收:修改 config/jvm.options 文件的 -Xms 和 -Xmx 为服务器的物理内存一半。
- 还可以在选举层面解决脑裂问题(即不让第二个老大产生):
# 声明获4得大于几票,主节点才有效,请设置为(master eligble nodes / 2) + 1
discovery.zen.minimum_master_nodes: 5
2

- 比如上面存在8个节点(假如都是master eligble节点),那需要设置discovery.zen.minimum_master_nodes: 5,代表至少5票投某个节点,才有效。如果某个时刻两个机房网络中断了,右边的机房里四个节点揭竿而起从新选举,也不够票数。
# ES集群故障迁移
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。
1)例如一个集群结构如图:
现在,node1是主节点,其它两个节点是从节点。
2)突然,node1发生了故障
宕机后的第一件事,需要重新选主,例如选中了node2:
node2成为主节点后,会检测集群监控状态,发现:shard-1、shard-0没有副本节点。因此需要将node1上的数据迁移到node2、node3: