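Multi-condition course filtering and data synchronization on the home page with Elasticsearch + RabbitMQ + Canal

Let's start with Elastic itself; see the official Elastic website.

Elastic offers a complete product line: Elasticsearch, Kibana, Logstash, and more. The first three are collectively known as the ELK stack.

Elasticsearch:

It has the following characteristics:

① Distributed, with no need to build the cluster by hand

② RESTful API, which makes it fairly easy to get started

③ Near-real-time search: a data update becomes searchable in Elasticsearch almost immediately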

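Inverted index:

Unlike a traditional lookup that goes from a record to its contents, an inverted index maps each term (the key) to the documents that contain it: a search looks up the term first and then uses it to fetch the matching documents (the values).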
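To make the idea concrete, here is a minimal sketch of an inverted index in plain Java. It is only a toy under simple assumptions (whitespace tokenization, lower-casing, no scoring); the class name InvertedIndexSketch is made up for illustration, and this is not how Lucene is implemented internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Toy inverted index (hypothetical class): term -> sorted set of ids of documents containing it.
class InvertedIndexSketch {
    private final Map<String, TreeSet<Integer>> postings = new HashMap<>();

    // Split the text on whitespace and record the document id under each lower-cased term.
    void add(int docId, String text) {
        for (String term : text.toLowerCase().split("\\s+")) {
            if (!term.isEmpty()) {
                postings.computeIfAbsent(term, k -> new TreeSet<>()).add(docId);
            }
        }
    }

    // Look the term up first, then return the ids of the documents that contain it.
    List<Integer> search(String term) {
        TreeSet<Integer> ids = postings.get(term.toLowerCase());
        return ids == null ? new ArrayList<>() : new ArrayList<>(ids);
    }

    public static void main(String[] args) {
        InvertedIndexSketch index = new InvertedIndexSketch();
        index.add(1, "Java basics course");
        index.add(2, "Advanced Java concurrency");
        index.add(3, "Python basics");
        System.out.println(index.search("java"));   // ids of documents 1 and 2
        System.out.println(index.search("basics")); // ids of documents 1 and 3
    }
}
```

The lookup cost depends on the number of distinct terms rather than on the number of documents, which is why this structure suits full-text search.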

Lucene:

Lucene is a library for building inverted indexes; Elasticsearch is an open-source, near-real-time, distributed search and analytics engine built on top of it.

Installing the engine is fairly involved, so I won't go into it here. Let's get straight to how it is used in the project.

First, create a new search service:

Because this is a microservice project, the dependencies of my parent project are:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>
    <modules>
        <module>common_api</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.blb</groupId>
    <artifactId>wisdom_education</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>wisdom_education</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <spring.cloud-version>Hoxton.SR8</spring.cloud-version>
        <spring-cloud-alibaba.version>2.2.5.RELEASE</spring-cloud-alibaba.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

To use the engine in the search service, first add its dependency:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.blb</groupId>
        <artifactId>wisdom_education</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.blb</groupId>
    <artifactId>wisdom_education_elasticsearch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>wisdom_education_elasticsearch</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.blb</groupId>
            <artifactId>common_api</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

These are all the dependencies my service needs; add or drop them as your needs dictate.

Next, configure the URI in the configuration file:

First rename the configuration file so that it ends in .yaml.

spring:
  application:
    name: wisdom-education-elasticsearch
  cloud:
    nacos:
      server-addr: localhost:8848
  elasticsearch:
    rest:
      uris: 192.168.64.200:9200
  rabbitmq:
    host: 192.168.64.200
    port: 5672
    virtual-host: myhost
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual # the consumer acknowledges messages manually
  redis:
    host: 192.168.64.200
    port: 6379
server:
  port: 8801

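The IP depends on where the services are installed: for a Windows install it is the local machine's IP; for a Linux install it is the virtual machine's IP.

Then the configuration I keep in Nacos:

spring:
  application:
    name: wisdom-education-elasticsearch
  cloud:
    nacos:
      server-addr: localhost:8848
  elasticsearch:
    rest:
      uris: 192.168.64.200:9200
server:
  port: 8801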

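The two configuration files above also contain other settings, such as the Nacos registry and config center. Anything you don't need can be removed, and settings such as redis can also be kept locally, though I recommend putting as much as possible in Nacos. RabbitMQ is required: the data synchronization later depends on it, so it might as well be configured now. Next, we load the data to be filtered into Elasticsearch:

We call the course service's endpoint through Feign, the Spring Cloud component;

This is the controller endpoint of the course data service

Then we call it from the search service:

Note that the request path must be correct, and the service name in @FeignClient("wisdom-education-course") must match the name the course service registers under; otherwise the call fails.

Next we call the index initialization method to create the index.

Start with the DAO layer:

These are the imports. Importing a same-named class from the wrong package causes problems, so if in doubt, copy them from here: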

import com.alibaba.fastjson.JSON;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.entity.ElasticEntity;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

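The entity package contains one custom entity class:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ElasticEntity {
    // Document id in Elasticsearch
    private String id;
    // The object that is serialized to JSON and stored as the document source
    private Object data;
}

Then comes the DAO class itself, which covers index CRUD plus course sorting and highlighting: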

@Slf4j
@Repository
public class ElasticsearchDAO {

    @Autowired
    private RestHighLevelClient client;

    /**
     * Check whether an index exists.
     * @param indexName index name
     * @return true if the index exists
     * @throws IOException
     */
    public boolean existIndex(String indexName) throws IOException {
        // Build the index lookup request
        GetIndexRequest request = new GetIndexRequest(indexName);
        // Ask the cluster whether the index exists
        boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
        log.info("index {} exists: {}", indexName, exists);
        return exists;
    }

    /**
     * Delete an index.
     * @param indexName index name
     * @throws IOException
     */
    public void deleteIndex(String indexName) throws IOException {
        // Build and send the delete request
        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
        AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
        log.info("index {} deleted, acknowledged: {}", indexName, response.isAcknowledged());
    }

    /**
     * Create an index.
     * @param indexName index name
     * @throws IOException
     */
    public void createIndex(String indexName) throws IOException {
        // Build and send the create request
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        // The original log call had a placeholder but no argument
        log.info("index {} created, acknowledged: {}", indexName, response.isAcknowledged());
    }

    /**
     * Bulk insert.
     * @param indexName index name
     * @param list entities to insert
     * @throws IOException
     */
    public void insertBatch(String indexName, List<ElasticEntity> list) throws IOException {
        // The bulk request carries the index name for all of its items
        BulkRequest request = new BulkRequest(indexName);
        // Add one index request per entity, setting its id and JSON source
        list.forEach(entity -> request.add(new IndexRequest()
                .id(entity.getId())
                .source(JSON.toJSONString(entity.getData()), XContentType.JSON)));
        // Execute the bulk operation
        client.bulk(request, RequestOptions.DEFAULT);
        log.info("bulk insert finished");
    }

    /**
     * Search by multiple conditions.
     * @param indexName index name
     * @param from start offset
     * @param size page size
     * @param map filter conditions (field name to value)
     * @param sort sort settings, with the keys "field" and "type"
     * @param clazz type of the results
     * @param highlightFields fields to highlight
     * @param <T>
     * @return one page of results
     */
    public <T> PageEntity<T> searchPageByMap(String indexName, int from, int size,
                                             Map<String, String> map, Map<String, String> sort,
                                             Class<T> clazz, String... highlightFields) throws IOException {
        // Search request and its source builder
        SearchRequest request = new SearchRequest(indexName);
        SearchSourceBuilder builder = new SearchSourceBuilder();
        // Bool query that combines all filter conditions
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // Add one filter per non-empty condition
        if (map != null && map.size() > 0) {
            for (String key : map.keySet()) {
                String value = map.get(key);
                if (!StringUtils.isEmpty(value)) {
                    boolQuery.filter(QueryBuilders.matchPhraseQuery(key, value));
                }
            }
        }
        // Only apply the bool query when at least one filter was set
        if (boolQuery.filter().size() > 0) {
            builder.query(boolQuery);
        }
        // Paging
        builder.from(from);
        builder.size(size);
        // Sorting: "field" is the field name, "type" is asc or desc
        if (sort != null && sort.size() > 0) {
            builder.sort(sort.get("field"), SortOrder.fromString(sort.get("type")));
        }
        // Highlighting: fields plus the tags wrapped around each match
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        for (String field : highlightFields) {
            highlightBuilder.field(field);
        }
        highlightBuilder.preTags("<span style='color:red'>").postTags("</span>");
        builder.highlighter(highlightBuilder);
        // Run the search
        request.source(builder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        List<T> data = new ArrayList<>();
        // Convert each JSON hit into a Java object
        for (SearchHit hit : hits) {
            T res = JSON.parseObject(hit.getSourceAsString(), clazz);
            data.add(res);
            // Replace each highlighted field with its first highlighted fragment
            Map<String, HighlightField> hFields = hit.getHighlightFields();
            for (String hField : hFields.keySet()) {
                try {
                    Field declaredField = clazz.getDeclaredField(hField);
                    declaredField.setAccessible(true);
                    Text[] fragments = hFields.get(hField).fragments();
                    if (fragments.length > 0) {
                        declaredField.set(res, fragments[0].string());
                    }
                } catch (Exception e) {
                    log.warn("could not apply highlight to field {}", hField, e);
                }
            }
        }
        // Return the page
        return new PageEntity<>(from, response.getHits().getTotalHits().value, size, data);
    }

    /**
     * Add or update a document.
     */
    public void saveOrUpdate(String indexName, ElasticEntity entity) throws IOException {
        IndexRequest request = new IndexRequest(indexName);
        request.id(entity.getId()).source(JSON.toJSONString(entity.getData()), XContentType.JSON);
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        log.info("{} document saved or updated: {}", indexName, JSON.toJSONString(response));
    }

    /**
     * Delete documents that match a query.
     */
    public void deleteByQuery(String indexName, QueryBuilder builder) throws IOException {
        DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
        request.setQuery(builder);
        BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
        log.info("{} delete-by-query finished: {}", indexName, JSON.toJSONString(response));
    }
}
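The reflection step inside searchPageByMap is the least obvious part, so here is a minimal stand-alone sketch of it. It makes some simplifying assumptions: CourseDoc is a hypothetical stand-in for the real Course entity, and the highlighted fragments are passed in as a plain map instead of coming from Elasticsearch's HighlightField objects.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Sketch of overwriting entity fields with highlighted fragments via reflection.
class HighlightMergeSketch {

    // Hypothetical stand-in for the Course entity; only two fields are modelled.
    static class CourseDoc {
        private String courseName;
        private String brief;

        CourseDoc(String courseName, String brief) {
            this.courseName = courseName;
            this.brief = brief;
        }

        String getCourseName() { return courseName; }
        String getBrief() { return brief; }
    }

    // For every (field name -> fragment) pair, set the matching field on the target.
    // Names that do not correspond to a declared field are skipped.
    static <T> T applyHighlights(T target, Map<String, String> fragments) {
        for (Map.Entry<String, String> entry : fragments.entrySet()) {
            try {
                Field field = target.getClass().getDeclaredField(entry.getKey());
                field.setAccessible(true);
                field.set(target, entry.getValue());
            } catch (NoSuchFieldException | IllegalAccessException e) {
                // Unknown or inaccessible field: leave the original value untouched
            }
        }
        return target;
    }

    public static void main(String[] args) {
        CourseDoc doc = new CourseDoc("Java basics", "An introduction");
        Map<String, String> fragments = new HashMap<>();
        fragments.put("courseName", "<span style='color:red'>Java</span> basics");
        applyHighlights(doc, fragments);
        System.out.println(doc.getCourseName());
        System.out.println(doc.getBrief());
    }
}
```

Because the field is looked up with getDeclaredField, this only works for fields declared directly on the class, not for inherited ones; that limitation applies to the DAO method as well.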

Add a method to the interface in the search service's Service layer:

    /**
     * Initialize the course index.
     */
    void initCourseIndex();

Then implement it in the implementation class:

    @Override
    public void initCourseIndex() {
        try {
            // Drop the course index if it already exists
            boolean exist = elasticsearchDAO.existIndex(INDEX_NAME);
            if (exist) {
                elasticsearchDAO.deleteIndex(INDEX_NAME);
            }
            // Create the index
            elasticsearchDAO.createIndex(INDEX_NAME);
            // Fetch all courses from the course service
            List<Course> courses = courseFeignClient.getAllCourses();
            List<ElasticEntity> entities = new ArrayList<>();
            courses.forEach(course -> {
                entities.add(new ElasticEntity(String.valueOf(course.getId()), course));
                log.info("course:{}", course);
            });
            // Bulk insert into ES
            elasticsearchDAO.insertBatch(INDEX_NAME, entities);
            log.info("courses:{},entities:{}", courses.size(), entities.size());
        } catch (IOException e) {
            log.error("failed to initialize the course index", e);
        }
    }

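We use this method to initialize the index, running it from a test.

Output like the following means the initialization succeeded. Also remember that Elasticsearch has to be running, or you will get a connection timeout:

The search service fetches the data by calling the course endpoint through Feign; this is the Feign call mentioned above

A result like the following means the data was imported successfully.

With the data in place, we can write the Service and Controller layers;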

Service:

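public interface ICourseService {

    /**
     * Initialize the course index.
     */
    void initCourseIndex();

    /**
     * Search courses with paging.
     * @param args request arguments
     * @return one page of courses
     */
    PageEntity<Course> searchCoursePage(Map<String, String> args);

    /**
     * Add or update a course.
     * @param course the course
     */
    void saveOrUpdate(Course course);

    /**
     * Delete a course by id.
     * @param id course id
     */
    void removeById(String id);
}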

Implementation class:

import com.alibaba.fastjson.JSON;
import com.blb.common.entity.Course;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.dao.ElasticsearchDAO;
import com.blb.wisdom_education_elasticsearch.entity.ElasticEntity;
import com.blb.wisdom_education_elasticsearch.feign.CourseFeign;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
public class CourseServiceImpl implements ICourseService {

    public static final String INDEX_NAME = "course";

    @Autowired
    private CourseFeign courseFeignClient;

    @Autowired
    private ElasticsearchDAO elasticsearchDAO;

    @Override
    public void initCourseIndex() {
        try {
            // Drop the course index if it already exists
            boolean exist = elasticsearchDAO.existIndex(INDEX_NAME);
            if (exist) {
                elasticsearchDAO.deleteIndex(INDEX_NAME);
            }
            // Create the index
            elasticsearchDAO.createIndex(INDEX_NAME);
            // Fetch all courses from the course service
            List<Course> courses = courseFeignClient.getAllCourses();
            List<ElasticEntity> entities = new ArrayList<>();
            courses.forEach(course -> {
                entities.add(new ElasticEntity(String.valueOf(course.getId()), course));
                log.info("course:{}", course);
            });
            // Bulk insert into ES
            elasticsearchDAO.insertBatch(INDEX_NAME, entities);
            log.info("courses:{},entities:{}", courses.size(), entities.size());
        } catch (IOException e) {
            log.error("failed to initialize the course index", e);
        }
    }

    @Override
    public PageEntity<Course> searchCoursePage(Map<String, String> args) {
        // Defaults used when the request does not carry paging arguments
        int current = 1;
        int size = 1;
        Map<String, String> search = null;
        Map<String, String> sort = null;
        if (args.containsKey("current")) {
            current = Integer.parseInt(args.get("current"));
        }
        if (args.containsKey("size")) {
            size = Integer.parseInt(args.get("size"));
        }
        // "search" and "sort" arrive as JSON strings
        if (args.containsKey("search")) {
            search = JSON.parseObject(args.get("search"), Map.class);
        }
        if (args.containsKey("sort")) {
            sort = JSON.parseObject(args.get("sort"), Map.class);
        }
        try {
            // Convert the 1-based page number into a 0-based offset
            return elasticsearchDAO.searchPageByMap(INDEX_NAME, (current - 1) * size, size,
                    search, sort, Course.class, "courseName", "brief");
        } catch (IOException e) {
            log.error("search failed", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Add or update a course.
     * @param course the course
     */
    @Override
    public void saveOrUpdate(Course course) {
        try {
            elasticsearchDAO.saveOrUpdate(INDEX_NAME,
                    new ElasticEntity(String.valueOf(course.getId()), course));
        } catch (IOException e) {
            log.error("save or update failed", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Delete a course by id.
     * @param id course id
     */
    @Override
    public void removeById(String id) {
        try {
            elasticsearchDAO.deleteByQuery(INDEX_NAME, QueryBuilders.termQuery("id", id));
        } catch (IOException e) {
            log.error("delete failed", e);
            throw new RuntimeException(e);
        }
    }
}

Then the Controller layer:

import com.blb.common.entity.Course;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@RestController
@Slf4j
public class CourseController {

    @Autowired
    private ICourseService service;

    @RequestMapping("/search-courses")
    public ResponseEntity<PageEntity<Course>> searchCoursePage(@RequestBody Map<String, String> map) {
        PageEntity<Course> page = service.searchCoursePage(map);
        // Pass the values as arguments so the {} placeholders are actually filled in
        log.info("map:{}", map);
        log.info("page:{}", page);
        return ResponseEntity.ok(page);
    }
}

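At this point the search service is more or less complete. Leaving data synchronization aside, it already supports multi-condition queries, sorting, and highlighting of the search text: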
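As a rough illustration of what the /search-courses endpoint expects, the sketch below builds a request map of the shape that searchCoursePage reads and applies the same page-to-offset formula. The helper names (intArg, toOffset) and the sort field "price" are assumptions made up for this example; only the keys current, size, search and sort come from the service code.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the request arguments and the paging arithmetic used by the search service.
class PageArgsSketch {

    // 1-based page number -> 0-based start offset, the same formula as in searchCoursePage
    static int toOffset(int current, int size) {
        return (current - 1) * size;
    }

    // Read an integer argument, falling back to a default when the key is absent (hypothetical helper)
    static int intArg(Map<String, String> args, String key, int defaultValue) {
        String raw = args.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw);
    }

    public static void main(String[] args) {
        Map<String, String> request = new HashMap<>();
        request.put("current", "3");
        request.put("size", "10");
        // "search" and "sort" are JSON strings; "price" is an assumed field name
        request.put("search", "{\"courseName\":\"Java\"}");
        request.put("sort", "{\"field\":\"price\",\"type\":\"desc\"}");

        int current = intArg(request, "current", 1);
        int size = intArg(request, "size", 1);
        System.out.println("from=" + toOffset(current, size) + ", size=" + size);
    }
}
```

Note that every value in the map is a string, including the nested search and sort objects, because the controller binds the body to Map<String, String>.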

That raises a question: we copied the data over in a single step, so when the data in the database changes, how do we keep the index up to date?

This is where the next tools come in: a RabbitMQ message queue plus the Canal synchronization tool, which together give us real-time monitoring of database changes. I won't go into installing them, since there are plenty of tutorials. Let's go straight to the implementation.

First we add a new data synchronization service that inherits from the parent project. Its dependencies are:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.blb</groupId>
        <artifactId>wisdom_education</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.blb</groupId>
    <artifactId>wisdom_education_datasync</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>wisdom_education_datasync</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.xpand</groupId>
            <artifactId>starter-canal</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>com.blb</groupId>
            <artifactId>common_api</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

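Then comes some RabbitMQ setup.

After logging in to RabbitMQ, create an admin account:

Create two exchanges:

Pick sensible names and click Add:

You also need to create two queues:

Canal is easiest to install using the Windows build. If it fails on startup, the fix is a one-line change in one of its configuration files; I no longer remember which line, so hopefully you won't run into it.

First check the binlog configuration in the database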

SHOW VARIABLES LIKE '%log_bin%'

ON means it is enabled; if it is OFF, you need to enable it:

  On Windows the configuration file is my.ini in the MySQL installation directory

  On Linux it is /etc/my.cnf

  Change:

  [mysqld]
  log-bin=mysql-bin
  binlog-format=ROW
  server_id=1

Log in to MySQL, create the canal user and grant it privileges:

  CREATE USER canal@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%';
  FLUSH PRIVILEGES;

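Some further configuration is needed:

Go to the canal directory and edit the configuration file:

First run this command in the database:

show master status

Then edit the canal configuration:

vim conf/example/instance.properties

On Windows, just open the file from the directory:

Then start canal:

The main problems you may hit are:

      1. An "authentication error" exception: the database account or password is configured incorrectly
      2. A "can't find position" exception: check the configured binlog file name and position, then delete conf/example/meta.dat and restart
      3. Client compatibility problems: the canal version and the client version must match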
 

Then configure them in the service. This is the configuration file of my data synchronization service:

canal.client.instances.example.host=127.0.0.1
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000

spring.application.name=wisdom-education-datasync
# Register with nacos
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
# Address of the nacos config center
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
# Prefix of the configuration file
spring.cloud.nacos.config.prefix=wisdom-education-datasync
# File extension
spring.cloud.nacos.config.file-extension=properties
# profile
spring.profiles.active=dev
# Expose all actuator endpoints
management.endpoints.web.exposure.include=*

spring.rabbitmq.host=192.168.64.200
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=myhost

The Nacos part:

server.port=8812
spring.application.name=wisdom-education-datasync

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/wisdom_education_advertisement?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root

mybatis-plus.mapper-locations=classpath:mapper/*.xml
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.type-aliases-package=com.blb.common_api.entity

spring.redis.host=192.168.64.200
spring.redis.port=6379
spring.redis.database=0
spring.redis.jedis.pool.max-active=100
spring.redis.jedis.pool.max-wait=100ms
spring.redis.jedis.pool.max-idle=100
spring.redis.jedis.pool.min-idle=10

We need an endpoint in the course service that looks up course data by id:

I won't go into detail, since it is a very simple endpoint. As before, it is called through Feign

Next is the RabbitMQ configuration class;

package com.blb.wisdom_education_datasync.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_SAVE = "edu.course.save.queue";
    public static final String QUEUE_REMOVE = "edu.course.remove.queue";
    public static final String EXCHANGE_COURSE = "edu.course.exchange";
    public static final String KEY_SAVE = "edu.course.save.key";
    public static final String KEY_REMOVE = "edu.course.remove.key";

    // Declare the queues
    @Bean
    public Queue queueSave() {
        return new Queue(QUEUE_SAVE);
    }

    @Bean
    public Queue queueRemove() {
        return new Queue(QUEUE_REMOVE);
    }

    // Declare the topic exchange
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_COURSE);
    }

    // Bind each queue to the exchange with its routing key
    @Bean
    public Binding bindingSave() {
        return BindingBuilder.bind(queueSave()).to(exchange()).with(KEY_SAVE);
    }

    @Bean
    public Binding bindingRemove() {
        return BindingBuilder.bind(queueRemove()).to(exchange()).with(KEY_REMOVE);
    }
}

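To synchronize the data, the overall flow is roughly this: Canal picks up row changes from the MySQL binlog, the data synchronization service turns each change into a RabbitMQ message, and the search service consumes the message and updates Elasticsearch;

So the search service also needs a listener:

package com.blb.wisdom_education_elasticsearch.listener;

import com.alibaba.fastjson.JSON;
import com.blb.common.entity.Course;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Course message listener.
 */
@Slf4j
@Component
public class CourseMsgListener {

    public static final String QUEUE_SAVE = "edu.course.save.queue";
    public static final String QUEUE_REMOVE = "edu.course.remove.queue";
    public static final String EXCHANGE_COURSE = "edu.course.exchange";
    public static final String KEY_SAVE = "edu.course.save.key";
    public static final String KEY_REMOVE = "edu.course.remove.key";

    @Autowired
    private ICourseService courseService;

    // Listen on the queue for add and update operations
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_SAVE, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_COURSE, ignoreDeclarationExceptions = "true"),
            key = KEY_SAVE))
    public void receiveSaveMsg(String json, Message message, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            // The sync service sends the course object as JSON
            Course course = JSON.parseObject(json, Course.class);
            courseService.saveOrUpdate(course);
            log.info("received course data: {}", course);
            // acknowledge-mode is manual, so the message must be acknowledged explicitly
            channel.basicAck(tag, false);
        } catch (RuntimeException e) {
            log.error("failed to handle save message", e);
            // Reject without requeueing so a bad message is not redelivered forever
            channel.basicNack(tag, false, false);
        }
    }

    // Listen on the queue for delete operations
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_REMOVE, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_COURSE, ignoreDeclarationExceptions = "true"),
            key = KEY_REMOVE))
    public void receiveRemoveMsg(String id, Message message, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            courseService.removeById(id);
            log.info("received id to delete: {}", id);
            channel.basicAck(tag, false);
        } catch (RuntimeException e) {
            log.error("failed to handle remove message", e);
            channel.basicNack(tag, false, false);
        }
    }
}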

Then the listener in the data synchronization service:

package com.blb.wisdom_education_datasync.listener;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.blb.common.entity.Course;
import com.blb.wisdom_education_datasync.config.RabbitMQConfig;
import com.blb.wisdom_education_datasync.feign.CourseFeignClient;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/**
 * Listener for changes to the course table.
 */
@CanalEventListener
public class CourseCanalListener {

    @Autowired
    private CourseFeignClient courseFeignClient;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @ListenPoint(schema = "wisdom_education_course", table = "course")
    public void courseChange(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        // Type of the operation
        String type = eventType.toString();
        if ("INSERT".equals(type) || "UPDATE".equals(type)) {
            // For inserts and updates, read the id from the row after the change
            String id = findId(rowData.getAfterColumnsList());
            if (id == null) {
                return;
            }
            // Ask the course service for the full course
            Course course = courseFeignClient.getCourseById(Long.valueOf(id));
            System.out.println(type + ":" + course);
            // Send it to the search service
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_COURSE,
                    RabbitMQConfig.KEY_SAVE, JSON.toJSONString(course));
        } else if ("DELETE".equals(type)) {
            // For deletes, read the id from the row before the change
            String id = findId(rowData.getBeforeColumnsList());
            if (id == null) {
                return;
            }
            // Send the id to the search service
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_COURSE,
                    RabbitMQConfig.KEY_REMOVE, id);
        }
    }

    // Return the value of the "id" column, or null if the row has no such column
    private String findId(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            if ("id".equals(column.getName())) {
                return column.getValue();
            }
        }
        return null;
    }
}
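The routing decision in the listener can be looked at in isolation. The following is a small stand-alone sketch, assuming a hypothetical ChangeType enum in place of CanalEntry.EventType so that it runs without the Canal client on the classpath; the routing key values are the ones from RabbitMQConfig.

```java
import java.util.Optional;

// Sketch of mapping a database change type to the RabbitMQ routing key used for it.
class CanalRoutingSketch {

    // Hypothetical stand-in for CanalEntry.EventType; OTHER covers everything we ignore
    enum ChangeType { INSERT, UPDATE, DELETE, OTHER }

    static final String KEY_SAVE = "edu.course.save.key";
    static final String KEY_REMOVE = "edu.course.remove.key";

    // Inserts and updates go to the save queue, deletes to the remove queue,
    // and any other event produces no message at all.
    static Optional<String> routingKeyFor(ChangeType type) {
        switch (type) {
            case INSERT:
            case UPDATE:
                return Optional.of(KEY_SAVE);
            case DELETE:
                return Optional.of(KEY_REMOVE);
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        for (ChangeType type : ChangeType.values()) {
            System.out.println(type + " -> " + routingKeyFor(type).orElse("(no message)"));
        }
    }
}
```

Treating insert and update the same way works here because the search service's saveOrUpdate indexes the document under the course id, so an update simply overwrites the earlier version.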

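With that, plus the front-end code, you have multi-condition filtering and data synchronization built on Elasticsearch + RabbitMQ + Canal. If you run into bugs, good luck tracking them down. I won't cover RabbitMQ message delivery guarantees here; a few small configuration changes take care of them.

The end. Thanks.

You've read this far, so how about a like or a subscribe?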

通过Elasticsearch+RabbitMQ+Canal实现对首页的课程的多条件过滤和数据同步

首先我们先看看Elastic的官网:/

Elastic有一条完整的产品线:Elasticsearch、Kibana、Logstash等,前三个简称为ELK技术栈。

Elasticsearch:

 

它有以下的特点:

①分布式,不需要人工去搭建集群

②Restful风格,比较容易上手

③搜索时,数据更新在Elasticsearch中几乎是完全同步的

 倒排索引:

 与传统的按照索引去查询数据不同,它是先查询键,再根据键去查询值。

Lucene:

它是一个库,用来创建倒排索引的,Elasticsearch是基于它的开源实时分布式搜索和分析引擎。

这个引擎安装起来比较复杂,就不细说了,接下来直接进入正题,在项目中具体的使用。

首先我们新建一个搜索服务:

因为是微服务,所以我的主项目的依赖是:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns=".0.0" xmlns:xsi=""xsi:schemaLocation=".0.0 .0.0.xsd"><modelVersion>4.0.0</modelVersion><packaging>pom</packaging><modules><module>common_api</module></modules><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.blb</groupId><artifactId>wisdom_education</artifactId><version>0.0.1-SNAPSHOT</version><name>wisdom_education</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version><spring.cloud-version>Hoxton.SR8</spring.cloud-version><spring-cloud-alibaba.version>2.2.5.RELEASE</spring-cloud-alibaba.version></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring.cloud-version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

想要在搜索服务中添加这个引擎,得先导入它依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns=".0.0" xmlns:xsi=""xsi:schemaLocation=".0.0 .0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.blb</groupId><artifactId>wisdom_education</artifactId><version>0.0.1-SNAPSHOT</version></parent><groupId>com.blb</groupId><artifactId>wisdom_education_elasticsearch</artifactId><version>0.0.1-SNAPSHOT</version><name>wisdom_education_elasticsearch</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.blb</groupId><artifactId>common_api</artifactId><version>0.0.1-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

这是我整个服务所需要的所有依赖,可以根据需求添加;

接着在配置文件中配置uri:

先把配置文件修改一下把后缀名改成yaml结尾。

spring:application:name: wisdom-education-elasticsearchcloud:nacos:server-addr: localhost:8848elasticsearch:rest:uris: 192.168.64.200:9200rabbitmq:host: 192.168.64.200port: 5672virtual-host: myhostusername: guestpassword: guestlistener:simple:acknowledge-mode: manual #消费者手动确认redis:host: 192.168.64.200port: 6379
server:port: 8801

这里的IP根据系统来,安装的是windows版本的话,IP就是本机IP,Linux系统安装的话,IP就是虚拟机的IP。

然后是我的Nacos上面的配置:

spring:application:name: wisdom-education-elasticsearchcloud:nacos:server-addr: localhost:8848elasticsearch:rest:uris: 192.168.64.200:9200
server:port: 8801

上面的两个配置文件里的配置包含了其它的配置比如Nacos注册配置中心,不需要可以删掉在本地配置也行(比如redis),推荐尽量配置Nacos中。RabbitMQ是必要的,后面将数据同步需要配置,所以提前配置也行。接着我们将需要进行过滤的数据同步到Elasticsearch服务中:

通过SpringCloud的组件feign调用课程服务的接口;

这人是课程数据服务的controller的接口

接着我们在搜索服务中调用它:

值得注意的是,调用的路径不能出错,以及@FeignClient("wisdom-education-course") 括号里的服务名称要和课程服务里的服务名一致,不然调用会失败。

 接着我们调用初始化键的方法,去创建索引。

首先写DAO层:

这是导的包,有些包导错的话,会出问题,不确定可以对着导:

import com.alibaba.fastjson.JSON;
import com.blbmon.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.entity.ElasticEntity;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

The entity package contains one custom entity class:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ElasticEntity {
    private String id;
    private Object data;
}

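Then comes the DAO class itself. It covers checking, creating and deleting the index, writing and deleting documents, and a paged course search with sorting and highlighting: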

@Slf4j
@Repository
public class ElasticsearchDAO {

    @Autowired
    private RestHighLevelClient client;

    /**
     * Check whether an index exists.
     *
     * @param indexName index name
     * @return true if the index exists
     * @throws IOException if the request fails
     */
    public boolean existIndex(String indexName) throws IOException {
        // Build the request that looks up the index
        GetIndexRequest request = new GetIndexRequest(indexName);
        // Check whether the index exists
        boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
        log.info("Index {} exists: {}", indexName, exists);
        return exists;
    }

    /**
     * Delete an index.
     *
     * @param indexName index name
     * @throws IOException if the request fails
     */
    public void deleteIndex(String indexName) throws IOException {
        // Build the delete-index request
        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
        // Send the delete request
        AcknowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);
        log.info("Index {} deleted", indexName);
    }

    /**
     * Create an index.
     *
     * @param indexName index name
     * @throws IOException if the request fails
     */
    public void createIndex(String indexName) throws IOException {
        // Build the create-index request
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        // Send the create request
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        // The original log call had a {} placeholder but no argument
        log.info("Index {} created", indexName);
    }

    /**
     * Bulk insert.
     *
     * @param indexName index name
     * @param list      documents to insert
     * @throws IOException if the request fails
     */
    public void insertBatch(String indexName, List<ElasticEntity> list) throws IOException {
        // Build the bulk request; indexName is the default index for every item
        BulkRequest request = new BulkRequest(indexName);
        // Add one index request per document, setting its id and JSON source
        list.forEach(entity -> {
            request.add(new IndexRequest()
                    .id(entity.getId())
                    .source(JSON.toJSONString(entity.getData()), XContentType.JSON));
        });
        // Execute the bulk operation
        client.bulk(request, RequestOptions.DEFAULT);
        log.info("Bulk insert finished");
    }

    /**
     * Search by multiple conditions.
     *
     * @param indexName       index name
     * @param from            start offset
     * @param size            page size
     * @param map             filter conditions (field name to value)
     * @param sort            sort settings, with the keys "field" and "type"
     * @param clazz           type of the search results
     * @param highlightFields fields to highlight
     * @param <T>             result type
     * @return one page of results
     */
    public <T> PageEntity<T> searchPageByMap(String indexName, int from, int size,
                                             Map<String, String> map, Map<String, String> sort,
                                             Class<T> clazz, String... highlightFields) throws IOException {
        // Build the search request
        SearchRequest request = new SearchRequest(indexName);
        // Source builder that holds the query, paging, sorting and highlighting
        SearchSourceBuilder builder = new SearchSourceBuilder();
        // Bool query that combines all filter conditions
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // Add one filter per non-empty condition
        if (map != null && map.size() > 0) {
            for (String key : map.keySet()) {
                String value = map.get(key);
                if (!StringUtils.isEmpty(value)) {
                    boolQuery.filter(QueryBuilders.matchPhraseQuery(key, value));
                }
            }
        }
        // Only use the bool query when at least one filter was set
        if (boolQuery.filter().size() > 0) {
            builder.query(boolQuery);
        }
        // Paging
        builder.from(from);
        builder.size(size);
        // Sorting: "field" is the field name, "type" is asc or desc
        if (sort != null && sort.size() > 0) {
            builder.sort(sort.get("field"), SortOrder.fromString(sort.get("type")));
        }
        // Highlighting
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        for (String field : highlightFields) {
            highlightBuilder.field(field);
        }
        // Tags wrapped around each highlighted match
        highlightBuilder.preTags("<span style='color:red'>").postTags("</span>");
        builder.highlighter(highlightBuilder);
        // Run the search
        request.source(builder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        List<T> data = new ArrayList<>();
        // Convert each JSON hit into a Java object
        for (SearchHit hit : hits) {
            T res = JSON.parseObject(hit.getSourceAsString(), clazz);
            data.add(res);
            // Replace each highlighted field with its highlighted fragment
            Map<String, HighlightField> hFields = hit.getHighlightFields();
            for (String hField : hFields.keySet()) {
                try {
                    // Look up the field by reflection
                    Field declaredField = clazz.getDeclaredField(hField);
                    declaredField.setAccessible(true);
                    Text[] fragments = hFields.get(hField).fragments();
                    if (fragments.length > 0) {
                        declaredField.set(res, fragments[0].string());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        // Return the page
        return new PageEntity<>(from, response.getHits().getTotalHits().value, size, data);
    }

    /**
     * Add or update a document.
     */
    public void saveOrUpdate(String indexName, ElasticEntity entity) throws IOException {
        IndexRequest request = new IndexRequest(indexName);
        request.id(entity.getId()).source(JSON.toJSONString(entity.getData()), XContentType.JSON);
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        log.info("{}: document added or updated {}", indexName, JSON.toJSONString(response));
    }

    /**
     * Delete documents that match a query.
     */
    public void deleteByQuery(String indexName, QueryBuilder builder) throws IOException {
        DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
        request.setQuery(builder);
        BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
        log.info("{}: delete by query finished {}", indexName, JSON.toJSONString(response));
    }
}
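
The highlight step in searchPageByMap can be tried on its own. The sketch below is only an illustration of the idea, not the DAO itself: CourseDoc is a hypothetical class standing in for the real Course entity, and plain strings stand in for Elasticsearch's Text fragments. It shows how reflection overwrites a field with its highlighted fragment and skips names that are not fields of the class.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical document class standing in for the real Course entity.
class CourseDoc {
    private String courseName;
    private String brief;

    CourseDoc(String courseName, String brief) {
        this.courseName = courseName;
        this.brief = brief;
    }

    String getCourseName() { return courseName; }

    String getBrief() { return brief; }
}

final class HighlightSketch {

    /**
     * Overwrites each field named in highlights with its highlighted fragment.
     * Keys that do not match a declared field are skipped.
     */
    static <T> T applyHighlights(T target, Map<String, String> highlights) {
        for (Map.Entry<String, String> entry : highlights.entrySet()) {
            try {
                Field field = target.getClass().getDeclaredField(entry.getKey());
                field.setAccessible(true);
                field.set(target, entry.getValue());
            } catch (NoSuchFieldException | IllegalAccessException e) {
                // Unknown or inaccessible field: keep the original value.
            }
        }
        return target;
    }

    public static void main(String[] args) {
        CourseDoc doc = new CourseDoc("Java basics", "An introductory course");
        Map<String, String> highlights = new LinkedHashMap<>();
        highlights.put("courseName", "<span style='color:red'>Java</span> basics");
        applyHighlights(doc, highlights);
        System.out.println(doc.getCourseName()); // now carries the span tags
        System.out.println(doc.getBrief());      // unchanged
    }
}
```

Because the field is looked up by name, the highlighted field names passed to the search must match the entity's Java field names exactly.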

Add a method to the search service's Service interface:

    /**
     * Initialize the course index.
     */
    void initCourseIndex();

Then write the implementation class that implements it:

    @Override
    public void initCourseIndex() {
        try {
            // Check whether the course index already exists
            boolean exist = elasticsearchDAO.existIndex(INDEX_NAME);
            if (exist) {
                // Delete the old index
                elasticsearchDAO.deleteIndex(INDEX_NAME);
            }
            // Create the index
            elasticsearchDAO.createIndex(INDEX_NAME);
            // Fetch all courses from the course service
            List<Course> courses = courseFeignClient.getAllCourses();
            List<ElasticEntity> entities = new ArrayList<>();
            courses.forEach(course -> {
                entities.add(new ElasticEntity(String.valueOf(course.getId()), course));
                log.info("course:{}", course);
            });
            // Bulk insert into Elasticsearch
            elasticsearchDAO.insertBatch(INDEX_NAME, entities);
            log.info("courses:{},entities:{}", courses.size(), entities.size());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

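We use this method to initialize the index, running it from a test class.

Output like the following means the initialization succeeded. Remember that Elasticsearch must be running, otherwise the call fails with a connection timeout:

The search service gets its data by calling the course endpoint through Feign, which is the Feign call mentioned above:

A result like the following means the data was imported successfully.

Now that the data is in place, we can write the Service and Controller layers;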

Service:

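public interface ICourseService {

    /**
     * Initialize the course index.
     */
    void initCourseIndex();

    /**
     * Search courses page by page.
     *
     * @param args request parameters: current, size, search, sort
     * @return one page of courses
     */
    PageEntity<Course> searchCoursePage(Map<String, String> args);

    /**
     * Add or update a course.
     *
     * @param course the course to save
     */
    void saveOrUpdate(Course course);

    /**
     * Delete a course by id.
     *
     * @param id course id
     */
    void removeById(String id);
}

Implementation class: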


import com.alibaba.fastjson.JSON;
import com.blb.common.entity.Course;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.dao.ElasticsearchDAO;
import com.blb.wisdom_education_elasticsearch.entity.ElasticEntity;
import com.blb.wisdom_education_elasticsearch.feign.CourseFeign;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
public class CourseServiceImpl implements ICourseService {

    public static final String INDEX_NAME = "course";

    @Autowired
    private CourseFeign courseFeignClient;

    @Autowired
    private ElasticsearchDAO elasticsearchDAO;

    @Override
    public void initCourseIndex() {
        try {
            // Check whether the course index already exists
            boolean exist = elasticsearchDAO.existIndex(INDEX_NAME);
            if (exist) {
                // Delete the old index
                elasticsearchDAO.deleteIndex(INDEX_NAME);
            }
            // Create the index
            elasticsearchDAO.createIndex(INDEX_NAME);
            // Fetch all courses from the course service
            List<Course> courses = courseFeignClient.getAllCourses();
            List<ElasticEntity> entities = new ArrayList<>();
            courses.forEach(course -> {
                entities.add(new ElasticEntity(String.valueOf(course.getId()), course));
                log.info("course:{}", course);
            });
            // Bulk insert into Elasticsearch
            elasticsearchDAO.insertBatch(INDEX_NAME, entities);
            log.info("courses:{},entities:{}", courses.size(), entities.size());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public PageEntity<Course> searchCoursePage(Map<String, String> args) {
        int current = 1;
        int size = 1;
        Map<String, String> search = null;
        Map<String, String> sort = null;
        if (args.containsKey("current")) {
            current = Integer.valueOf(args.get("current"));
        }
        if (args.containsKey("size")) {
            size = Integer.valueOf(args.get("size"));
        }
        // "search" and "sort" arrive as JSON strings and are parsed into maps
        if (args.containsKey("search")) {
            search = JSON.parseObject(args.get("search"), Map.class);
        }
        if (args.containsKey("sort")) {
            sort = JSON.parseObject(args.get("sort"), Map.class);
        }
        try {
            // Page numbers start at 1, so the offset is (current - 1) * size
            return elasticsearchDAO.searchPageByMap(INDEX_NAME, (current - 1) * size, size,
                    search, sort, Course.class, "courseName", "brief");
        } catch (IOException e) {
            log.error("Search failed", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Add or update a course.
     *
     * @param course the course to save
     */
    @Override
    public void saveOrUpdate(Course course) {
        try {
            elasticsearchDAO.saveOrUpdate(INDEX_NAME,
                    new ElasticEntity(String.valueOf(course.getId()), course));
        } catch (IOException e) {
            log.error("Save or update failed", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Delete a course by id.
     *
     * @param id course id
     */
    @Override
    public void removeById(String id) {
        try {
            elasticsearchDAO.deleteByQuery(INDEX_NAME, QueryBuilders.termQuery("id", id));
        } catch (IOException e) {
            log.error("Delete failed", e);
            throw new RuntimeException(e);
        }
    }
}
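
The paging arithmetic in searchCoursePage is easy to get wrong by one page, so here is a small sketch of it on its own. PageArgs and its resolve method are hypothetical names used only for this illustration; the defaults (page 1, size 1) and the (current - 1) * size offset are taken from the service code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: turns the request's "current"/"size" strings into the
// from/size pair that the search uses.
final class PageArgs {
    final int from;
    final int size;

    private PageArgs(int from, int size) {
        this.from = from;
        this.size = size;
    }

    static PageArgs resolve(Map<String, String> args) {
        int current = 1; // default page number, as in the service
        int size = 1;    // default page size, as in the service
        if (args.containsKey("current")) {
            current = Integer.parseInt(args.get("current"));
        }
        if (args.containsKey("size")) {
            size = Integer.parseInt(args.get("size"));
        }
        // Page numbers start at 1, search offsets start at 0.
        return new PageArgs((current - 1) * size, size);
    }

    public static void main(String[] ignored) {
        Map<String, String> args = new HashMap<>();
        args.put("current", "3");
        args.put("size", "10");
        PageArgs page = resolve(args);
        System.out.println("from=" + page.from + ", size=" + page.size); // from=20, size=10
    }
}
```

Since the default size is 1, a front end that forgets to send size gets a single course per page, so it is worth always sending both values.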

Then the Controller layer:

import com.blb.common.entity.Course;
import com.blb.common.entity.PageEntity;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@RestController
@Slf4j
public class CourseController {

    @Autowired
    private ICourseService service;

    @RequestMapping("/search-courses")
    public ResponseEntity<PageEntity<Course>> searchCoursePage(@RequestBody Map<String, String> map) {
        PageEntity<Course> page = service.searchCoursePage(map);
        // Pass the values as arguments so the {} placeholders are filled in
        log.info("map:{}", map);
        log.info("page:{}", page);
        return ResponseEntity.ok(page);
    }
}

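At this point the search service is more or less complete. Leaving data synchronization aside, it already supports multi-condition filtering, sorting, and highlighting of the text that was typed in:

That raises a question: we copied the data over in one pass, so when the data in the database changes, how do we keep the index up to date?

This is where the next tools come in: the RabbitMQ message queue plus the Canal synchronization tool, which together let us watch the database for changes in near real time. I won't go into installing them, since there are plenty of tutorials; we'll go straight to the implementation.

First we add a new data synchronization service that inherits from the parent project. Its dependencies are: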

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.blb</groupId>
        <artifactId>wisdom_education</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.blb</groupId>
    <artifactId>wisdom_education_datasync</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>wisdom_education_datasync</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- fastjson was declared twice in the original; one declaration is enough -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.xpand</groupId>
            <artifactId>starter-canal</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>com.blb</groupId>
            <artifactId>common_api</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

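Then comes some RabbitMQ setup.

After logging in to RabbitMQ, create a new admin account:

Create two exchanges:

Choose meaningful names, then click Add:

You also need to create two queues:

As for Canal, installing the Windows build directly is the more convenient option. If it fails at startup, the fix is a one-line change in one of its configuration files; I've forgotten exactly which line, so hopefully you won't run into it, haha.

First, check the binlog configuration in the database: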

SHOW VARIABLES LIKE '%log_bin%'

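ON means the binlog is enabled; if it is OFF you need to turn it on:

  On Windows the configuration file is my.ini in the MySQL installation directory

   On Linux it is /etc/my.cnf

   Change it to:

   [mysqld]
   log-bin=mysql-bin
   binlog-format=ROW
   server_id=1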

Log in to MySQL, then create the canal user and grant it privileges:

  CREATE USER canal@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%';
  FLUSH PRIVILEGES;

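Some further configuration is needed:

Go to the canal directory and edit its configuration file:

First run this command in the database:

show master status

Then edit the canal configuration:

vim conf/example/instance.properties

On Windows, just open the file from that directory:

Then start canal:

The most common problems are:

      1. Exception "authentication error": the database username or password is configured incorrectly
      2. Exception "can't find position": check the configured binlog file name and position, then delete conf/example/meta.dat and restart
      3. Client version compatibility: the canal version and the client version must match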
 

Then configure them in the service. This is the local configuration file of my data synchronization service:

canal.client.instances.example.host=127.0.0.1
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000

spring.application.name=wisdom-education-datasync
# Register with Nacos
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
# Nacos config center address
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
# Prefix of the configuration file
spring.cloud.nacos.config.prefix=wisdom-education-datasync
# File extension
spring.cloud.nacos.config.file-extension=properties
# profile
spring.profiles.active=dev
# Expose all actuator endpoints
management.endpoints.web.exposure.include=*

spring.rabbitmq.host=192.168.64.200
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=myhost

 The Nacos part:

server.port=8812

spring.application.name=wisdom-education-datasync

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/wisdom_education_advertisement?serverTimeZone=UTC&useUnicode=true&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root

mybatis-plus.mapper-locations=classpath:mapper/*.xml
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.type-aliases-package=com.blb.common_api.entity

spring.redis.host=192.168.64.200
spring.redis.port=6379
spring.redis.database=0
spring.redis.jedis.pool.max-active=100
spring.redis.jedis.pool.max-wait=100ms
spring.redis.jedis.pool.max-idle=100
spring.redis.jedis.pool.min-idle=10

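We need to write an endpoint that looks up the course data by the id of the advertised course record:

I won't go into detail, since it is a very simple endpoint. As before, we call it through Feign.

Next is the RabbitMQ configuration class;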

package com.blb.wisdom_education_datasync.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_SAVE = "edu.course.save.queue";
    public static final String QUEUE_REMOVE = "edu.course.remove.queue";
    public static final String EXCHANGE_COURSE = "edu.course.exchange";
    public static final String KEY_SAVE = "edu.course.save.key";
    public static final String KEY_REMOVE = "edu.course.remove.key";

    // Declare the queues
    @Bean
    public Queue queueSave() {
        return new Queue(QUEUE_SAVE);
    }

    @Bean
    public Queue queueRemove() {
        return new Queue(QUEUE_REMOVE);
    }

    // Declare the topic exchange
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_COURSE);
    }

    // Declare the bindings
    @Bean
    public Binding bindingSave() {
        return BindingBuilder.bind(queueSave()).to(exchange()).with(KEY_SAVE);
    }

    @Bean
    public Binding bindingRemove() {
        return BindingBuilder.bind(queueRemove()).to(exchange()).with(KEY_REMOVE);
    }
}
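
The configuration class binds both queues to a topic exchange with two literal routing keys, so each message reaches exactly one of the two queues. To make the routing rule concrete, here is a plain-Java sketch of how topic binding keys are matched. TopicMatchSketch is a made-up class for this illustration and is not part of RabbitMQ or Spring AMQP, and the wildcard patterns in the example are not used by this project. It follows RabbitMQ's documented rule that * stands for exactly one word and # for zero or more words.

```java
final class TopicMatchSketch {

    /** Returns true if a topic binding key accepts the given routing key. */
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern used up: it matches only if the routing key is used up too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may consume zero or more words; try every possibility.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // Pattern still expects a word but the routing key has none left.
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            // '*' or a literal match consumes exactly one word.
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // The literal keys from the configuration class only match themselves.
        System.out.println(matches("edu.course.save.key", "edu.course.save.key"));   // true
        System.out.println(matches("edu.course.save.key", "edu.course.remove.key")); // false
        // A hypothetical wildcard binding would accept both keys.
        System.out.println(matches("edu.course.*.key", "edu.course.remove.key"));    // true
    }
}
```

With literal keys like these the topic exchange behaves the same as a direct exchange; a topic exchange leaves room to add wildcard bindings later.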

 To synchronize the data, the overall flow looks roughly like this;

 So the search service also needs a listener:

package com.blb.wisdom_education_elasticsearch.listener;

import com.alibaba.fastjson.JSON;
import com.blb.common.entity.Course;
import com.blb.wisdom_education_elasticsearch.service.ICourseService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Listener for course messages.
 */
@Slf4j
@Component
public class CourseMsgListener {

    public static final String QUEUE_SAVE = "edu.course.save.queue";
    public static final String QUEUE_REMOVE = "edu.course.remove.queue";
    public static final String EXCHANGE_COURSE = "edu.course.exchange";
    public static final String KEY_SAVE = "edu.course.save.key";
    public static final String KEY_REMOVE = "edu.course.remove.key";

    @Autowired
    private ICourseService courseService;

    // Listen on the queue for add and update operations
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_SAVE, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_COURSE, ignoreDeclarationExceptions = "true"),
            key = KEY_SAVE))
    public void receiveSaveMsg(String json, Message message, Channel channel) throws IOException {
        // The sync service sends the course object as JSON
        Course course = JSON.parseObject(json, Course.class);
        courseService.saveOrUpdate(course);
        log.info("Received course data: {}", course);
        // acknowledge-mode is manual in the configuration, so confirm the message explicitly
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    // Listen on the queue for delete operations
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_REMOVE, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_COURSE, ignoreDeclarationExceptions = "true"),
            key = KEY_REMOVE))
    public void receiveRemoveMsg(String id, Message message, Channel channel) throws IOException {
        courseService.removeById(id);
        log.info("Received id to delete: {}", id);
        // Confirm the message, as above
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

Then the listener in the data synchronization service:

package com.blb.wisdom_education_datasync.listener;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.blb.common.entity.Course;
import com.blb.wisdom_education_datasync.config.RabbitMQConfig;
import com.blb.wisdom_education_datasync.feign.CourseFeignClient;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/**
 * Listener for changes to the course table.
 */
@CanalEventListener
public class CourseCanalListener {

    @Autowired
    private CourseFeignClient courseFeignClient;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @ListenPoint(schema = "wisdom_education_course", table = "course")
    public void courseChange(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        // Type of the operation that changed the row
        String type = eventType.toString();
        if ("INSERT".equals(type) || "UPDATE".equals(type)) {
            // For inserts and updates, read the column values after the change
            List<CanalEntry.Column> list = rowData.getAfterColumnsList();
            // Find the course id
            String id = null;
            for (CanalEntry.Column column : list) {
                if ("id".equals(column.getName())) {
                    id = column.getValue();
                    break;
                }
            }
            // Ask the course service for the full course record
            Course course = courseFeignClient.getCourseById(Long.valueOf(id));
            // Send the course to the search service
            System.out.println(type + ":" + course);
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_COURSE,
                    RabbitMQConfig.KEY_SAVE, JSON.toJSONString(course));
        } else if ("DELETE".equals(type)) {
            // For deletes, read the column values from before the change
            List<CanalEntry.Column> list = rowData.getBeforeColumnsList();
            // Find the course id
            String id = null;
            for (CanalEntry.Column column : list) {
                if ("id".equals(column.getName())) {
                    id = column.getValue();
                    break;
                }
            }
            // Send the id to the search service
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_COURSE,
                    RabbitMQConfig.KEY_REMOVE, id);
        }
    }
}
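
The decision logic inside courseChange can be exercised without Canal or RabbitMQ running. The sketch below is a simplified stand-in under stated assumptions: SyncRouterSketch is a hypothetical class, event types are plain strings, and a row is a plain map of column name to value instead of Canal's RowData. It only shows which routing key each event type maps to and how the id column is picked out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class SyncRouterSketch {
    // Same routing keys as in RabbitMQConfig.
    static final String KEY_SAVE = "edu.course.save.key";
    static final String KEY_REMOVE = "edu.course.remove.key";

    /**
     * Picks the routing key for a row change: inserts and updates go to the
     * save key, deletes to the remove key, anything else is ignored.
     */
    static Optional<String> routingKeyFor(String eventType) {
        switch (eventType) {
            case "INSERT":
            case "UPDATE":
                return Optional.of(KEY_SAVE);
            case "DELETE":
                return Optional.of(KEY_REMOVE);
            default:
                return Optional.empty();
        }
    }

    /** Returns the value of the "id" column, if the row has one. */
    static Optional<String> idOf(Map<String, String> columns) {
        return Optional.ofNullable(columns.get("id"));
    }

    public static void main(String[] args) {
        // Hypothetical row; the column names are illustrative only.
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "42");
        row.put("course_name", "Java basics");
        System.out.println(routingKeyFor("UPDATE").orElse("ignored")
                + " -> " + idOf(row).orElse("no id"));
    }
}
```

Returning an Optional for the id makes the missing-column case explicit; in the listener above, a row without an id column would reach Long.valueOf(null) and throw.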

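With that, plus the front-end code, you have a working Elasticsearch + RabbitMQ + Canal setup for data filtering and data synchronization. If you hit bugs when running it, keep at it. I won't cover RabbitMQ's message delivery guarantees here; a few small configuration additions take care of them.

The end, thanks.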
