spring boot +springboot集成es7.9.1+canal同步到es

Spring Boot 集成 Elasticsearch 7.9.1 + Canal 同步数据到 ES

  • 前言
    • 参考资料来源
    • rocketmq
    • elasticsearch
    • canal
    • 消费MQ订阅的canal信息,进行elasticsearch的同步以及搜索

未经许可,请勿转载。

前言

  1. 其实大部分的代码是来源于参考资料来源主要代码实现,我只是在他的基础上增加自定义注解,自定义分词器等。需要看详细源码的可以去看主要代码实现,结合我的来使用。
  2. 有人会问为什么需要自定义注解,因为elasticsearch7.6 索引将去除type 没有类型的概念了。所以我自己自定义数据类型,有需要的可以自己拓展自己需要的类型。
  3. 我这里主要写的是代码实现,没有涉及到中间件的搭建,因为真的没有时间,哈哈。

参考资料来源

主要实现代码:=gitee_search
自定义注解:
自定义分词器:
Canal胶水层:

rocketmq

  1. mq maven依赖
            <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
  1. mq 适配器
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;/*** @author gideon*/
/**
 * Factory for {@link RocketMQTemplate} instances that share the application's
 * name-server address and message converter.
 */
@Configuration
@RequiredArgsConstructor
public class RocketMqAdapter {

    /** Supplies the message converter installed on every template built here. */
    private final RocketMQMessageConverter rocketMqMessageConverter;

    /** Name-server address read from {@code rocketmq.name-server}; empty when the property is absent. */
    @Value("${rocketmq.name-server:}")
    private String nameServer;

    /**
     * Builds a new template backed by a new producer.
     *
     * @param topic value handed to the {@link DefaultMQProducer} constructor
     *              (presumably used as the producer group name — confirm against the RocketMQ API)
     * @return a freshly configured template; a new one is created on every call
     */
    public RocketMQTemplate getTemplateByTopicName(String topic) {
        DefaultMQProducer mqProducer = new DefaultMQProducer(topic);
        mqProducer.setNamesrvAddr(nameServer);
        mqProducer.setRetryTimesWhenSendFailed(2);
        mqProducer.setSendMsgTimeout((int) RocketMqConstant.TIMEOUT);

        RocketMQTemplate template = new RocketMQTemplate();
        template.setProducer(mqProducer);
        template.setMessageConverter(rocketMqMessageConverter.getMessageConverter());
        return template;
    }
}
  1. mq的一些常量信息RocketMqConstant
/**
 * RocketMQ related constants.
 *
 * <p>Multiple name-server addresses are separated by {@code ;}.
 * (The original note about synchronous sends was truncated and is not reproduced here.)
 */
public class RocketMqConstant {

    // RocketMQ delay levels (1-18):
    // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    /**
     * Automatic order-receipt delay, effectively 7 days.
     * NOTE(review): the value equals 7 days expressed in minutes, which is outside the
     * 1-18 delay-level range listed above — confirm how callers interpret it.
     */
    public static final int ORDER_AUTO_RECEIPT_DELAY_LEVEL = 60 * 24 * 7;

    /** Default send timeout (used by RocketMqAdapter as the producer send-message timeout). */
    public static final long TIMEOUT = 3000;

    /** Topic: order cancellation / refund. */
    public static final String ORDER_REFUND_TOPIC = "order-refund-topic";

    /** Topic: automatic order receipt. */
    public static final String AUTO_RECEIPT_TOPIC = "auto-receipt-topic";

    /** Topic: service order paid successfully. */
    public static final String ORDER_NOTIFY_TOPIC = "order-notify-topic";

    /** Topic that carries canal binlog messages. */
    public static final String CANAL_TOPIC = "canal-topic";
}
  1. mq的配置类
import com.onecode.dtg.basic.RocketMqAdapter;
import com.onecode.dtg.basic.RocketMqConstant;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;/*** @author gideon*/
/**
 * Declares the {@link RocketMQTemplate} beans used by the application.
 */
@Configuration
@RequiredArgsConstructor
public class RocketMqConfig {

    /** Builds templates that share the common name-server and converter settings. */
    private final RocketMqAdapter rocketMqAdapter;

    /**
     * Lazily created template for the auto-receipt topic; {@code destroy} is
     * registered as the bean's destroy method.
     *
     * @return template built for {@link RocketMqConstant#AUTO_RECEIPT_TOPIC}
     */
    @Lazy
    @Bean(destroyMethod = "destroy")
    public RocketMQTemplate autoReceiptTemplate() {
        final String topicName = RocketMqConstant.AUTO_RECEIPT_TOPIC;
        return rocketMqAdapter.getTemplateByTopicName(topicName);
    }
}
  1. mq 的配置文件信息
rocketmq:
  name-server: 127.0.0.1:9876

elasticsearch

elasticsearch的搭建我在这里就不多说了,你们可以自行百度,下面是资料。

  1. maven所需依赖
<properties>
    <elasticsearch.version>7.9.1</elasticsearch.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>${elasticsearch.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>commons-logging</artifactId>
                <groupId>commons-logging</groupId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
  1. 分词所需资料 elasticsearch搭建的资料,点击这里。
  2. elasticsearch的yml
# elastic的地址
elastic:
  hostname: 127.0.0.1
  port: 9200
  1. elasticsearch 启动配置类
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author gideon*/
/**
 * Creates the Elasticsearch high-level REST client from the
 * {@code elastic.hostname} / {@code elastic.port} properties.
 */
@Configuration
@RequiredArgsConstructor
public class ElasticConfig {

    /** Elasticsearch host, from {@code elastic.hostname}. */
    @Value("${elastic.hostname}")
    private String hostname;

    /** Elasticsearch HTTP port, from {@code elastic.port}. */
    @Value("${elastic.port}")
    private int port;

    /**
     * @return a client connected to the single configured host
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port)));
    }
}
  1. elasticsearch 自定义注解,AnalyzerType在下面。
import java.lang.annotation.*;/*** @author gideon* @date 2022/9/8*/
/**
 * Marks an entity field that should appear in the generated Elasticsearch mapping.
 */
@Documented
@Inherited
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface EsField {

    /**
     * Elasticsearch data type of the field.
     *
     * @return the field type, {@link FieldType#TEXT} by default
     */
    FieldType type() default FieldType.TEXT;

    /**
     * Analyzer to use for the field.
     *
     * @return the analyzer, {@link AnalyzerType#STANDARD} by default
     */
    AnalyzerType analyzer() default AnalyzerType.STANDARD;
}
  1. elasticsearch 自定义AnalyzerType,因为我自己的业务需要所以我加了一个自定义分词器comma
import lombok.Getter;/*** @author gideon* @date 2022/9/8*/
/**
 * Analyzers that can be assigned to a field through {@code EsField#analyzer()}.
 */
@Getter
public enum AnalyzerType {

    /**
     * No analysis.
     * NOTE(review): the stored value is a description, not an analyzer name —
     * confirm it is never written into a mapping.
     */
    NO("不使用分词"),

    /** Standard analyzer, the default. */
    STANDARD("standard"),

    /** ik_smart: coarsest-grained split; a token already produced is not reused by other tokens. */
    IK_SMART("ik_smart"),

    /** ik_max_word: finest-grained split, produces as many tokens as possible. */
    IK_MAX_WORD("ik_max_word"),

    /** comma: custom analyzer that splits text on commas. */
    COMMA("comma");

    /** Value stored for this analyzer (see the constants above). */
    private final String type;

    AnalyzerType(String type) {
        this.type = type;
    }
}
  1. elasticsearch 自定义FieldType
import lombok.Getter;/*** @author gideon* @date 2022/9/8*/
/**
 * Elasticsearch field data types supported by {@code EsField#type()}.
 */
@Getter
public enum FieldType {

    TEXT("text"),
    KEYWORD("keyword"),
    INTEGER("integer"),
    DOUBLE("double"),
    DATE("date"),
    LONG("long"),

    /** A single embedded object. */
    OBJECT("object"),

    /** An array of nested objects. */
    NESTED("nested");

    /** Type name written into the mapping's {@code type} attribute. */
    private final String type;

    FieldType(String type) {
        this.type = type;
    }
}
  1. elasticsearch索引名称枚举
/**
 * Names of the Elasticsearch indices used by the application.
 *
 * @author gideon
 */
public enum EsIndexEnum {

    /** Caregiver index. */
    SERVER("server");

    /** Index name as stored in Elasticsearch. */
    private final String value;

    EsIndexEnum(String value) {
        this.value = value;
    }

    /**
     * @return the Elasticsearch index name
     */
    public String value() {
        return this.value;
    }
}
  1. elasticsearch 创建索引代码EsIndexCreateService,CommonBizException是我自定义的异常,你们可以使用自己自定义的异常类。
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.dtg.basic.common.es.annotation.EsField;
import com.onecode.dtg.basic.common.es.enums.FieldType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.lang.reflect.Field;/*** @author gideon* @date 2022/9/8*/
@Slf4j
@Service
@RequiredArgsConstructor
public class EsIndexCreateService {private final RestHighLevelClient restHighLevelClient;/*** 不需要逗号分词器索引** @param indexName 索引名称* @param clazz     同步到es的实体类* @return boolean*/public boolean createIndex(String indexName, Class<?> clazz) {return createIndex(indexName, clazz, false);}/*** 建立索引** @param indexName 索引名称* @param comma     是否需要逗号分词器* @return boolean*/public boolean createIndex(String indexName, Class<?> clazz, Boolean comma) {try {
//            判断索引是否存在GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);boolean exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);if (exists) {return true;}CreateIndexRequest request = new CreateIndexRequest(indexName);if (comma) {XContentBuilder settingsBuilder = XContentFactory.jsonBuilder().startObject().startObject("analysis").startObject("analyzer").startObject("comma").field("type", "pattern")
//                        将分词器规则定义为按照","进行分词.field("pattern", ",").endObject().endObject().endObject().endObject();request.settings(settingsBuilder);}//            这里创建索引结构request.mapping(generateBuilder(clazz));CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
//            指示是否所有节点都已确认请求boolean acknowledged = response.isAcknowledged();
//            指示是否在超时之前为索引中的每个分片启动了必需的分片副本数boolean shardsAcknowledged = response.isShardsAcknowledged();if (acknowledged || shardsAcknowledged) {log.info("创建索引成功!索引名称为{}", indexName);return true;}return false;} catch (IOException e) {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + indexName + "失败。");}}/*** 生成es索引** @param clazz 对于的es实体* @return XContentBuilder*/public static XContentBuilder generateBuilder(Class<?> clazz) {try {
//        获取索引名称及类型XContentBuilder builder = XContentFactory.jsonBuilder();builder.startObject();builder.startObject("properties");Field[] declaredFields = clazz.getDeclaredFields();for (Field declaredField : declaredFields) {if (declaredField.isAnnotationPresent(EsField.class)) {
//                获取注解EsField declaredAnnotation = declaredField.getDeclaredAnnotation(EsField.class);
//                        如果嵌套对象/*** {*   "mappings": {*     "properties": {*       "region": {*         "type": "keyword"*       },*       "manager": {*         "properties": {*           "age":  { "type": "integer" },*           "name": {*             "properties": {*               "first": { "type": "text" },*               "last":  { "type": "text" }*             }*           }*         }*       }*     }*   }* }*/if (declaredAnnotation.type() == FieldType.OBJECT) {
//                    获取当前类的对象-- ActionClass<?> type = declaredField.getType();Field[] typeDeclaredFields = type.getDeclaredFields();builder.startObject(declaredField.getName());builder.startObject("properties");
//                    遍历该对象中的所有属性for (Field field : typeDeclaredFields) {if (field.isAnnotationPresent(EsField.class)) {
//                            获取注解EsField fieldDeclaredAnnotation = field.getDeclaredAnnotation(EsField.class);builder.startObject(field.getName());builder.field("type", fieldDeclaredAnnotation.type().getType());
//                            keyword不需要分词if (fieldDeclaredAnnotation.type() == FieldType.TEXT) {builder.field("analyzer", fieldDeclaredAnnotation.analyzer().getType());}builder.endObject();}}builder.endObject();builder.endObject();} else {builder.startObject(declaredField.getName());builder.field("type", declaredAnnotation.type().getType());
//                        keyword不需要分词if (declaredAnnotation.type() == FieldType.TEXT) {builder.field("analyzer", declaredAnnotation.analyzer().getType());}builder.endObject();}}}
//            对应propertybuilder.endObject();builder.endObject();return builder;} catch (IOException e) {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引失败。");}}
}

canal

因为我这里使用的是第三方的canal jar包,也就是上面说到的Canal胶水层
获取地址:
引入的maven

		<dependency><groupId>cn.throwx</groupId><artifactId>canal-glue-core</artifactId><version>1.0</version><scope>system</scope><systemPath>${pom.basedir}/lib/canal-glue-core.jar</systemPath></dependency>

类似于下面这样放

  1. canal的canal.properties配置文件信息,这里主要看你使用什么消息队列就配置什么。我使用的是RocketMQ,所以canal.serverMode设置为rocketMQ,并配置rocketmq.namesrv.addr。注意rocketmq.producer.group = canal-topic配置的是生产者组名称,监听数据库操作日志所用的topic是在下面instance.properties的canal.mq.topic中配置的。
#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### 		destinations		#############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### 		     Kafka 		     #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = canal-topic
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 192.168.1.46:9876
rocketmq.retry.times.when.send.failed = 3
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
  1. canal的instance.properties配置文件信息,canal.instance.filter.regex这个参数可以指定监听的数据库->表
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.1.46:3306
canal.instance.master.journal.name=mysql-binlog.000001
canal.instance.master.position=0
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=mp_biz_service.server:*,mp_biz_service.shop_service_server:*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=canal-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
  1. canal自定义处理器NdCanalBinLogEventParser
import cn.throwx.canal.gule.common.BinLogEventType;
import cn.throwx.canal.gule.common.OperationType;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.parser.BaseCommonEntryFunction;
import cn.throwx.canal.gule.support.parser.BasePrimaryKeyTupleFunction;
import cn.throwx.canal.gule.support.parser.CanalBinLogEventParser;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;import java.util.*;/** * @author gideon*/
@Slf4j
public class NdCanalBinLogEventParser implements CanalBinLogEventParser {@Overridepublic <T> List<CanalBinLogResult<T>> parse(CanalBinLogEvent event, Class<T> klass, BasePrimaryKeyTupleFunction primaryKeyFunction, BaseCommonEntryFunction<T> commonEntryFunction) {BinLogEventType eventType = BinLogEventType.fromType(event.getType());if (Objects.equals(BinLogEventType.CREATE, eventType) || Objects.equals(BinLogEventType.ALTER, eventType)) {if (log.isDebugEnabled()) {log.debug("监听到不需要处理或者未知的binlog事件类型[{}],将忽略解析过程返回空列表,binlog事件:{}", eventType, JSON.toJSONString(event));}return Collections.emptyList();}if (BinLogEventType.UNKNOWN != eventType && BinLogEventType.QUERY != eventType) {if (Boolean.TRUE.equals(event.getIsDdl())) {CanalBinLogResult<T> entry = new CanalBinLogResult<>();entry.setOperationType(OperationType.DDL);entry.setBinLogEventType(eventType);entry.setDatabaseName(event.getDatabase());entry.setTableName(event.getTable());entry.setSql(event.getSql());return Collections.singletonList(entry);} else {Optional.ofNullable(event.getPkNames()).filter((x) -> x.size() == 1).orElseThrow(() -> new IllegalArgumentException("DML类型binlog事件主键列数量不为1"));String primaryKeyName = event.getPkNames().get(0);List<CanalBinLogResult<T>> entryList = new LinkedList<>();List<Map<String, String>> data = event.getData();List<Map<String, String>> old = event.getOld();int dataSize = null != data ? data.size() : 0;int oldSize = null != old ? 
old.size() : 0;if (dataSize > 0) {for(int index = 0; index < dataSize; ++index) {CanalBinLogResult<T> entry = new CanalBinLogResult<>();entryList.add(entry);entry.setSql(event.getSql());entry.setOperationType(OperationType.DML);entry.setBinLogEventType(eventType);entry.setTableName(event.getTable());entry.setDatabaseName(event.getDatabase());Map<String, String> item = data.get(index);entry.setAfterData(commonEntryFunction.apply(item));Map<String, String> oldItem = null;if (oldSize > 0 && index <= oldSize) {oldItem = old.get(index);entry.setBeforeData(commonEntryFunction.apply(oldItem));}entry.setPrimaryKey(primaryKeyFunction.apply(oldItem, item, primaryKeyName));}}return entryList;}} else {if (log.isDebugEnabled()) {log.debug("监听到不需要处理或者未知的binlog事件类型[{}],将忽略解析过程返回空列表,binlog事件:{}", eventType, JSON.toJSONString(event));}return Collections.emptyList();}}private NdCanalBinLogEventParser() {}public static NdCanalBinLogEventParser of() {return new NdCanalBinLogEventParser();}
}
  1. canal自定义处理器NdCanalBinlogEventProcessorFactory
import cn.throwx.canal.gule.model.ModelTable;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** @author gideon*/
public class NdCanalBinlogEventProcessorFactory implements CanalBinlogEventProcessorFactory {private final ConcurrentMap<ModelTable, List<BaseCanalBinlogEventProcessor<?>>> cache = new ConcurrentHashMap<>(16);@Overridepublic void register(ModelTable modelTable, BaseCanalBinlogEventProcessor<?> processor) {synchronized(this.cache) {this.cache.putIfAbsent(modelTable, new LinkedList<>());this.cache.get(modelTable).add(processor);}}@Overridepublic List<BaseCanalBinlogEventProcessor<?>> get(ModelTable modelTable) {return this.cache.get(modelTable);}private NdCanalBinlogEventProcessorFactory() {}public static NdCanalBinlogEventProcessorFactory of() {return new NdCanalBinlogEventProcessorFactory();}
}
  1. canal自定义处理器NdCanalGlue
import cn.throwx.canal.gule.CanalGlue;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.ModelTable;
import cn.throwx.canal.gule.support.adapter.SourceAdapterFacade;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;import java.util.List;/*** @author gideon*/
public class NdCanalGlue implements CanalGlue {private final CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory;@Overridepublic void process(String content) {CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent.class, content);ModelTable modelTable = ModelTable.of(event.getDatabase(), event.getTable());List<BaseCanalBinlogEventProcessor<?>> baseCanalBinlogEventProcessors = this.canalBinlogEventProcessorFactory.get(modelTable);if (baseCanalBinlogEventProcessors.isEmpty()) {return;}baseCanalBinlogEventProcessors.forEach((processor) -> processor.process(event));}private NdCanalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {this.canalBinlogEventProcessorFactory = canalBinlogEventProcessorFactory;}public static NdCanalGlue of(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {return new NdCanalGlue(canalBinlogEventProcessorFactory);}
}
  1. canal配置类
import cn.throwx.canal.gule.CanalGlue;
import cn.throwx.canal.gule.support.parser.*;
import cn.throwx.canal.gule.support.parser.converter.CanalFieldConverterFactory;
import cn.throwx.canal.gule.support.parser.converter.InMemoryCanalFieldConverterFactory;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;
import com.onecode.middle.search.service.canal.NdCanalBinLogEventParser;
import com.onecode.middle.search.service.canal.NdCanalBinlogEventProcessorFactory;
import com.onecode.middle.search.service.canal.NdCanalGlue;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;import java.util.Map;/*** @author gideon*/
@Configuration
public class CanalGlueAutoConfiguration implements SmartInitializingSingleton, BeanFactoryAware {private ConfigurableListableBeanFactory configurableListableBeanFactory;@Bean@ConditionalOnMissingBeanpublic CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory() {return NdCanalBinlogEventProcessorFactory.of();}@Bean@ConditionalOnMissingBeanpublic ModelTableMetadataManager modelTableMetadataManager(CanalFieldConverterFactory canalFieldConverterFactory) {return InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);}@Bean@ConditionalOnMissingBeanpublic CanalFieldConverterFactory canalFieldConverterFactory() {return InMemoryCanalFieldConverterFactory.of();}@Bean@ConditionalOnMissingBeanpublic CanalBinLogEventParser canalBinLogEventParser() {return NdCanalBinLogEventParser.of();}@Bean@ConditionalOnMissingBeanpublic ParseResultInterceptorManager parseResultInterceptorManager(ModelTableMetadataManager modelTableMetadataManager) {return InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);}@Bean@Primarypublic CanalGlue canalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {return NdCanalGlue.of(canalBinlogEventProcessorFactory);}@Overridepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException {this.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;}@SuppressWarnings({"rawtypes", "unchecked"})@Overridepublic void afterSingletonsInstantiated() {ParseResultInterceptorManager parseResultInterceptorManager= configurableListableBeanFactory.getBean(ParseResultInterceptorManager.class);ModelTableMetadataManager modelTableMetadataManager= configurableListableBeanFactory.getBean(ModelTableMetadataManager.class);CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory= configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.class);CanalBinLogEventParser canalBinLogEventParser= 
configurableListableBeanFactory.getBean(CanalBinLogEventParser.class);Map<String, BaseParseResultInterceptor> interceptors= configurableListableBeanFactory.getBeansOfType(BaseParseResultInterceptor.class);interceptors.forEach((k, interceptor) -> parseResultInterceptorManager.registerParseResultInterceptor(interceptor));Map<String, BaseCanalBinlogEventProcessor> processors= configurableListableBeanFactory.getBeansOfType(BaseCanalBinlogEventProcessor.class);processors.forEach((k, processor) -> processor.init(canalBinLogEventParser, modelTableMetadataManager,canalBinlogEventProcessorFactory, parseResultInterceptorManager));}
}
  1. ServerBO canal转换的实体类,@CanalModel 的参数database是对应的数据库,table是对应数据库下的数据表,因为这个数据是需要同步到es的所以设置了类型。
import cn.throwx.canal.gule.annotation.CanalModel;
import cn.throwx.canal.gule.common.FieldNamingPolicy;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.onecode.dtg.basic.common.es.annotation.EsField;
import com.onecode.dtg.basic.common.es.enums.AnalyzerType;
import com.onecode.dtg.basic.common.es.enums.FieldType;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import java.time.LocalDate;
import java.time.LocalDateTime;/*** @author gideon*/
/**
 * Model that canal rows of {@code mp_biz_service.server} are converted into.
 * The {@link EsField} annotations describe how each field is mapped in Elasticsearch.
 */
@Data
@CanalModel(database = "mp_biz_service", table = "server", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public class ServerBO {

    /** Primary key. */
    @ApiModelProperty("id")
    @EsField(type = FieldType.LONG)
    private Long id;

    /** User identifier. */
    @ApiModelProperty("用户标识")
    @EsField(type = FieldType.LONG)
    private Long userId;

    /** Type. */
    @ApiModelProperty("类型")
    @EsField(type = FieldType.KEYWORD)
    private String type;

    /** Name. */
    @ApiModelProperty("姓名")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String name;

    /** Gender: 1 = male, 2 = female, 0 = unknown. */
    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    @EsField(type = FieldType.KEYWORD)
    private String gender;

    /** Date of birth. */
    @ApiModelProperty("出生年月日")
    @JsonFormat(pattern = "yyyy-MM-dd")
    @EsField(type = FieldType.DATE)
    private LocalDate birthday;

    /** Education. */
    @ApiModelProperty("学历")
    @EsField(type = FieldType.KEYWORD)
    private String education;

    /** Date the person started practising. */
    @ApiModelProperty("从业时间")
    @JsonFormat(pattern = "yyyy-MM-dd")
    @EsField(type = FieldType.DATE)
    private LocalDate practiceDate;

    /** Rating level. */
    @ApiModelProperty("评级")
    @EsField(type = FieldType.KEYWORD)
    private String level;

    /** Certification labels. */
    @ApiModelProperty("认证标签")
    @EsField(type = FieldType.TEXT)
    private String authLabel;

    /** Medals, comma separated. */
    @ApiModelProperty("勋章(逗号隔开)")
    @EsField(type = FieldType.TEXT)
    private String medal;

    /** Service score. */
    @ApiModelProperty("服务评分")
    @EsField(type = FieldType.INTEGER)
    private Integer serviceScore;

    /** Real-name authentication flag. */
    @ApiModelProperty("已实名认证")
    @EsField(type = FieldType.INTEGER)
    private Integer realNameAuth;

    /** Identity card number. */
    @ApiModelProperty("身份证号")
    @EsField(type = FieldType.TEXT)
    private String idCardNo;

    /** Registered residence: province. */
    @ApiModelProperty("户籍-省")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String idCardProvince;

    /** Registered residence: city. */
    @ApiModelProperty("户籍-市")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String idCardCity;

    /** Registered residence: district. */
    @ApiModelProperty("户籍-区")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String idCardRegion;

    /** Mobile phone number. */
    @ApiModelProperty("手机号")
    @EsField(type = FieldType.TEXT)
    private String phone;

    /** Current residence: province. */
    @ApiModelProperty("现住-省")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentProvince;

    /** Current residence: city. */
    @ApiModelProperty("现住-市")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentCity;

    /** Current residence: district. */
    @ApiModelProperty("现住-区")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentRegion;

    /** Current residence: address. */
    @ApiModelProperty("现住-地址")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentAddress;

    /** Avatar. */
    @ApiModelProperty("头像")
    @EsField(type = FieldType.TEXT)
    private String head;

    /** Usage status: init, normal or ban. */
    @ApiModelProperty("使用状态(初始化:init;正常:normal,禁用:ban)")
    @EsField(type = FieldType.KEYWORD)
    private String useStatus;

    /** Audit status: await, pass or reject. */
    @ApiModelProperty("审核状态 (待审核:await;通过:pass,驳回:reject)")
    @EsField(type = FieldType.KEYWORD)
    private String auditStatus;

    /** Reason for rejection. */
    @ApiModelProperty("驳回理由")
    @EsField(type = FieldType.TEXT)
    private String rejectReason;

    /** Registration time. */
    @ApiModelProperty("注册时间")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @EsField(type = FieldType.DATE)
    private LocalDateTime regDate;

    /** Introduction: content. */
    @ApiModelProperty("介绍-内容")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String introContent;

    /** Introduction: video. */
    @ApiModelProperty("介绍-视频")
    @EsField(type = FieldType.TEXT)
    private String introVideo;

    /** Introduction: labels. */
    @ApiModelProperty("介绍-标签")
    @EsField(type = FieldType.TEXT)
    private String introLabel;

    /** Merchant identifier. */
    @ApiModelProperty("商家标识")
    @EsField(type = FieldType.LONG)
    private Long merchantId;

    /** Organisation identifier. */
    @ApiModelProperty("组织标识")
    @EsField(type = FieldType.LONG)
    private Long orgId;

    /** SKUs the caregiver applied for, comma separated. */
    @ApiModelProperty("护理员申请sku,多个逗号隔开")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.COMMA)
    private String skuId;

    /** Caregiver schedule data, comma separated. */
    @ApiModelProperty("护理员排班数据,多个逗号隔开")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.COMMA)
    private String schedule;

    /** Logical-delete flag. */
    @EsField(type = FieldType.INTEGER)
    private Integer del;

    /** Creator. */
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String createBy;

    /** Creation time. */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @EsField(type = FieldType.DATE)
    private LocalDateTime createTime;

    /** Last updater. */
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String updateBy;

    /** Last update time. */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @EsField(type = FieldType.DATE)
    private LocalDateTime updateTime;
}
  1. ShopServiceServer canal转换的实体类,@CanalModel 的参数database是对应的数据库,table是对应数据库下的数据表
import cn.throwx.canal.gule.annotation.CanalModel;
import cn.throwx.canal.gulemon.FieldNamingPolicy;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import java.time.LocalDateTime;/*** @author gideon*/
/**
 * Row model for the {@code shop_service_server} table of the {@code mp_biz_service}
 * database, as declared by {@link CanalModel}. Column names are mapped with the
 * {@code LOWER_UNDERSCORE} naming policy.
 */
@Data
@CanalModel(
        database = "mp_biz_service",
        table = "shop_service_server",
        fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public class ShopServiceServerBO {

    @ApiModelProperty("ID")
    private Long id;

    @ApiModelProperty("产品标识")
    private Long productId;

    @ApiModelProperty("服务者用户标识")
    private Long serverUserId;

    @ApiModelProperty("产品sku标识")
    private Long productSkuId;

    @ApiModelProperty(value = "盈利")
    private Integer profit;

    @ApiModelProperty("商家标识")
    private Long merchantId;

    @ApiModelProperty("组织标识")
    private Long orgId;

    /** Logical-delete flag. */
    private Integer del;

    /** Creator of the row. */
    private String createBy;

    /** Creation time. */
    private LocalDateTime createTime;

    /** Last updater of the row. */
    private String updateBy;

    /** Last update time. */
    private LocalDateTime updateTime;
}
  1. 至此canal的基础代码就完成。

消费MQ订阅的canal信息,进行elasticsearch的同步以及搜索

  1. 监听我们上面canal-topic订阅的消息然后进行同步数据CanalListener
import cn.throwx.canal.gule.CanalGlue;
import com.onecode.dtg.basic.RocketMqConstant;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** canal消费数据库操作日志mq** @author gideon*/
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC, consumerGroup = RocketMqConstant.CANAL_TOPIC)
public class CanalListener implements RocketMQListener<String> {private final CanalGlue canalGlue;@Overridepublic void onMessage(String message) {canalGlue.process(message);}
}
  1. 对我们需要监听的表进行处理ServerCanalListener,这里面的hutool是一个工具类有需要的可以自行引入
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.6</version>
</dependency>
import cn.hutool.json.JSONUtil;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.ExceptionHandler;
import com.onecode.dtg.basicmon.core.exception.CommonBizException;
import com.onecode.dtg.basicmon.enums.ResultCode;
import com.onecode.middle.search.service.bo.ServerBO;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.middle.search.service.util.EsIndexCreateService;
import com.onecode.service.feign.ServerFeignClient;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearchmon.xcontent.XContentType;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @author gideon*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ServerCanalListener extends BaseCanalBinlogEventProcessor<ServerBO> {private final EsIndexCreateService esIndexCreateService;private final ServerFeignClient serverFeignClient;private final RestHighLevelClient restHighLevelClient;/*** 插入护理员,此时插入es*/@Overrideprotected void processInsertInternal(CanalBinLogResult<ServerBO> result) {Long serverId = result.getPrimaryKey();EsServerBO esServerBO = serverFeignClient.loadEsServerBO(serverId);if (esServerBO == null) {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引异常");}//        创建索引boolean index = esIndexCreateService.createIndex(EsIndexEnum.SERVER.value(), ServerBO.class, true);if (!index) {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + EsIndexEnum.SERVER.value() + "失败。");}IndexRequest request = new IndexRequest(EsIndexEnum.SERVER.value());request.id(String.valueOf(serverId));request.source(JSONUtil.toJsonStr(esServerBO), XContentType.JSON);try {IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);log.info(indexResponse.toString());} catch (IOException e) {log.error(e.toString());throw new CommonBizException(ResultCode.FAIL.getModel(), "保存es信息异常:" + e);}}/*** 更新护理员,删除护理员索引,再重新构建一个*/@Overrideprotected void processUpdateInternal(CanalBinLogResult<ServerBO> result) {Long spuId = result.getPrimaryKey();EsServerBO esServerBO = serverFeignClient.loadEsServerBO(spuId);String source = JSONUtil.toJsonStr(esServerBO);//        创建索引boolean index = esIndexCreateService.createIndex(EsIndexEnum.SERVER.value(), ServerBO.class, true);if (!index) {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + EsIndexEnum.SERVER.value() + "失败。");}UpdateRequest request = new UpdateRequest(EsIndexEnum.SERVER.value(), String.valueOf(spuId));request.doc(source, XContentType.JSON);request.docAsUpsert(true);try {UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);log.info(updateResponse.toString());} catch (IOException 
e) {log.error(e.toString());throw new CommonBizException(ResultCode.FAIL.getModel(), "删除es信息异常:" + e);}}@Overrideprotected ExceptionHandler exceptionHandler() {return (CanalBinLogEvent event, Throwable throwable) -> {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引异常:" + throwable);};}}
  1. 这个表的监听,是因为我的业务需求:shop_service_server表新增数据时需要把skuId加到server索引的skuId字段里,删除(包括逻辑删除,即更新del字段)时需要把skuId从该字段中移除,所以新增、修改、删除都需要监听。ShopServiceServerCanalListener
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import com.onecode.dtg.basicmon.core.exception.CommonBizException;
import com.onecode.dtg.basicmon.enums.ResultCode;
import com.onecode.middle.search.service.bo.ShopServiceServerBO;
import com.onecode.middle.search.service.manager.ServerUpdateManager;
import com.onecode.service.feign.ServerFeignClient;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.List;/*** @author gideon*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ShopServiceServerCanalListener extends BaseCanalBinlogEventProcessor<ShopServiceServerBO> {private final ServerFeignClient serverFeignClient;private final ServerUpdateManager serverUpdateManager;/*** 新增商品服务者数据** @param result result*/@Overrideprotected void processInsertInternal(CanalBinLogResult<ShopServiceServerBO> result) {//数据库操作后的数据ShopServiceServerBO afterData = result.getAfterData();EsServerBO loadServerBO = loadServerBO(afterData.getServerUserId());List<String> skuIdList = StrUtil.split(loadServerBO.getSkuId(), ",");skuIdList.add(afterData.getProductSkuId().toString());EsServerBO esServerBO = new EsServerBO();esServerBO.setSkuId(StrUtil.join(",", skuIdList));serverUpdateManager.esUpdateServerByServerId(loadServerBO.getId(), esServerBO);}/*** 更新商品服务者数据** @param result result*/@Overrideprotected void processUpdateInternal(CanalBinLogResult<ShopServiceServerBO> result) {//数据库执行操作后的数据ShopServiceServerBO afterData = result.getAfterData();//del字段是我的表是否逻辑删除的判断,大家根据自己需要去掉if ("1".equals(afterData.getDel())) {return;}//微服务项目调用接口查询数据EsServerBO loadEsServerBO = loadServerBO(afterData.getServerUserId());//处理修改后的数据EsServerBO esServerBO = dealWithData(afterData, loadEsServerBO);serverUpdateManager.esUpdateServerByServerId(loadEsServerBO.getId(), esServerBO);}/*** 删除商品服务者数据** @param result result*/@Overrideprotected void processDeleteInternal(CanalBinLogResult<ShopServiceServerBO> result) {//数据库操作前的数据ShopServiceServerBO beforeData = result.getBeforeData();EsServerBO loadServerBO = loadServerBO(beforeData.getServerUserId());EsServerBO esServerBO = dealWithData(beforeData, loadServerBO);serverUpdateManager.esUpdateServerByServerId(loadServerBO.getId(), esServerBO);}/*** 处理数据** @param data 数据库操作数据* @return EsServerBO*/private EsServerBO dealWithData(ShopServiceServerBO data, EsServerBO loadEsServerBO) {List<String> skuIdList = StrUtil.split(loadEsServerBO.getSkuId(), ",");CollUtil.removeAny(skuIdList, data.getProductSkuId().toString());EsServerBO esServerBO = new 
EsServerBO();esServerBO.setSkuId(StrUtil.join(",", skuIdList));return esServerBO;}/*** 获取护理员书信息** @param serverUserId 护理员用户标识* @return EsServerBO*/private EsServerBO loadServerBO(Long serverUserId) {EsServerBO loadEsServerBO = serverFeignClient.loadEsServerBoByServerUserId(serverUserId);if (loadEsServerBO == null) {throw new CommonBizException(ResultCode.FAIL.getModel(),"es数据同步失败:无法通过护工用户标识:" + serverUserId + "找到护理员信息。");}return loadEsServerBO;}
}
  1. ServerUpdateManager,这个是ShopServiceServerCanalListener中更新es数据的处理实现类
import cn.hutool.json.JSONUtil;
import com.onecode.dtg.basicmon.core.exception.CommonBizException;
import com.onecode.dtg.basicmon.enums.ResultCode;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearchmon.xcontent.XContentType;
import org.springframework.stereotype.Component;/*** @author gideon*/
@Component
@RequiredArgsConstructor
public class ServerUpdateManager {private final RestHighLevelClient restHighLevelClient;/*** 批量更新es中的商品信息** @param serverId      护理员标识* @param esServerBO 更新的数据*/public void esUpdateServerByServerId(Long serverId, EsServerBO esServerBO) {String source = JSONUtil.toJsonStr(esServerBO);try {BulkRequest request = new BulkRequest();// 准备更新的数据request.add(new UpdateRequest(EsIndexEnum.SERVER.value(), String.valueOf(serverId)).doc(source, XContentType.JSON));//更新BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);if (bulkResponse.hasFailures()) {throw new CommonBizException(ResultCode.FAIL.getModel(), bulkResponse.buildFailureMessage());}} catch (Exception e) {throw new CommonBizException(ResultCode.FAIL.getModel(), e.getMessage());}}
}
  1. ServerSearchManager是搜索接口实现
import cn.hutool.json.JSONUtil;
import com.onecode.dtg.basicmon.core.exception.CommonBizException;
import com.onecode.dtg.basicmon.enums.ResultCode;
import com.onecode.dtg.basicmon.util.ColumnUtil;
import com.onecode.dtg.basicmon.util.LocalDateUtil;
import com.onecode.middle.search.service.bo.ServerBO;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.middle.search.service.dto.ServerSearchDTO;
import com.onecode.middle.search.service.vo.EsPageVO;
import com.onecode.middle.search.service.vo.search.EsServerVO;
import com.onecode.service.feign.constant.AuditStatus;
import com.onecode.service.feign.constant.UseStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;/*** @author gideon*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ServerSearchManager {private final RestHighLevelClient restHighLevelClient;/*** 通过搜索信息分页搜索es数据的信息** @param serverSearchDTO 护理员搜索条件* @return 搜索结果*/public EsPageVO<EsServerVO> pageSearchResult(ServerSearchDTO serverSearchDTO) {//1、动态构建出查询需要的DSL语句EsPageVO<EsServerVO> result;//1、准备检索请求SearchRequest searchRequest = buildSearchRequest(serverSearchDTO);try {//2、执行检索请求SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);log.info("搜索返回结果:" + response.toString());//3、分析响应数据,封装成我们需要的格式result = buildSearchResult(serverSearchDTO, response);} catch (IOException e) {log.error(e.toString());throw new CommonBizException(ResultCode.FAIL.getModel(), "搜索服务出了点小差,请稍后再试:" + e);}return result;}/*** 构建结果数据*/private EsPageVO<EsServerVO> buildSearchResult(ServerSearchDTO dto, SearchResponse response) {EsPageVO<EsServerVO> esPageVO = new EsPageVO<>();//1、返回的所有查询到的商品SearchHits hits = response.getHits();List<EsServerVO> productSearchs = getEsOrderBOList(response);esPageVO.setList(productSearchs);//===============分页信息====================////总记录数long total = hits.getTotalHits().value;esPageVO.setTotal(total);// 总页码int totalPages = (int) total % dto.getPageSize() == 0 ?(int) total / dto.getPageSize() : ((int) total / dto.getPageSize() + 1);esPageVO.setPages(totalPages);return esPageVO;}private List<EsServerVO> getEsOrderBOList(SearchResponse response) {return getOrderListByResponse(response.getHits().getHits());}/*** 从es返回的数据中获取spu列表** @param hits es返回的数据* @return*/private List<EsServerVO> getOrderListByResponse(SearchHit[] hits) {List<EsServerVO> esOrders = new ArrayList<>();for (SearchHit hit : hits) {EsServerVO esOrder = JSONUtil.toBean(hit.getSourceAsString(), EsServerVO.class);esOrders.add(esOrder);}return esOrders;}/*** 准备检索请求** @param param 搜索参数* @return*/private SearchRequest buildSearchRequest(ServerSearchDTO param) {SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();// 构建bool-queryBoolQueryBuilder boolQueryBuilder = new 
BoolQueryBuilder();// 过滤filterQueryIfNecessary(param, boolQueryBuilder);// 关键字搜索keywordSearch(param, boolQueryBuilder);// 排序sort(searchSourceBuilder, boolQueryBuilder);//分页searchSourceBuilder.from((param.getPageNum() - 1) * param.getPageSize());searchSourceBuilder.size(param.getPageSize());log.info("构建的DSL语句 {}", searchSourceBuilder);return new SearchRequest(new String[]{EsIndexEnum.SERVER.value()}, searchSourceBuilder);}/*** 关键字搜索*/private void keywordSearch(ServerSearchDTO param, BoolQueryBuilder boolQueryBuilder) {BoolQueryBuilder keywordShouldQuery = QueryBuilders.boolQuery();
//        现住-省if (Objects.nonNull(param.getPresentProvince())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentProvince), param.getPresentProvince()).operator(Operator.AND));}
//        现住-市if (Objects.nonNull(param.getPresentCity())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentCity), param.getPresentCity()).operator(Operator.AND));}
//        现住-区if (Objects.nonNull(param.getPresentRegion())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentRegion), param.getPresentRegion()).operator(Operator.AND));}
//        户籍-省if (Objects.nonNull(param.getIdCardProvince())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardProvince), param.getIdCardProvince()).operator(Operator.AND));}
//        户籍-市if (Objects.nonNull(param.getIdCardCity())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardCity), param.getIdCardCity()).operator(Operator.AND));}
//        户籍-区if (Objects.nonNull(param.getIdCardRegion())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardRegion), param.getIdCardRegion()).operator(Operator.AND));}//        标签if (Objects.nonNull(param.getIntroLabels())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIntroLabel), param.getIntroLabels()).operator(Operator.AND));}
//        排班,使用了我的自定义逗号分词器,所以是根据逗号分隔后进行匹配的,但是需要多个匹配,我就使用了for循环,应该是有优化的地方,暂时没处理if (param.getServiceStartDate() != null && param.getServiceEndDate() != null) {List<String> scheduleList = LocalDateUtil.getContinuousTime(param.getServiceStartDate(), param.getServiceEndDate(), DateTimeFormatter.ofPattern("yyyyMMdd"));for (String schedule : scheduleList) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getSchedule), schedule).operator(Operator.AND));}}
//        skuId,使用了我的自定义逗号分词器,所以是根据逗号分隔后进行匹配的if (Objects.nonNull(param.getSkuId())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getSkuId), param.getSkuId()).operator(Operator.AND));}boolQueryBuilder.must(keywordShouldQuery);}/*** 进行排序*/private void sort(SearchSourceBuilder searchSourceBuilder, BoolQueryBuilder boolQueryBuilder) {searchSourceBuilder.sort(ColumnUtil.getName(ServerBO::getCreateTime), SortOrder.DESC);searchSourceBuilder.query(boolQueryBuilder);}/*** 过滤查询条件,如果有必要的话** @param param            查询条件* @param boolQueryBuilder 组合进boolQueryBuilder*/private void filterQueryIfNecessary(ServerSearchDTO param, BoolQueryBuilder boolQueryBuilder) {
//        类型if (Objects.nonNull(param.getType())) {boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getType), param.getType()));}
//        性别if (Objects.nonNull(param.getGender())) {boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getGender), param.getGender()));}
//        学历if (Objects.nonNull(param.getEducation())) {boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getEducation), param.getEducation()));}boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getAuditStatus), AuditStatus.PASS.getStatus()));boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getUseStatus), UseStatus.NORMAL.getStatus()));boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getDel), 0));}
}
  1. ServerSearchController
import com.onecode.dtg.basicmon.model.ResultBean;
import com.onecode.middle.search.service.dto.ServerSearchDTO;
import com.onecode.middle.search.service.manager.ServerSearchManager;
import com.onecode.middle.search.service.vo.EsPageVO;
import com.onecode.middle.search.service.vo.search.EsServerVO;
import io.swagger.annotations.Api;
import lombok.AllArgsConstructor;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;/*** @author gideon* @date 2022/9/6*/
/**
 * REST endpoint for searching servers in Elasticsearch.
 */
@Validated
@AllArgsConstructor
@RestController
@RequestMapping("/search/server/")
@Api(tags = "api-服务者搜索接口")
public class ServerSearchController {

    private final ServerSearchManager serverSearchManager;

    /**
     * Runs a paged server search.
     *
     * <p>The body is annotated with {@code @Validated} so the bean-validation
     * constraints declared on {@link ServerSearchDTO} are enforced. The class-level
     * annotation alone does not validate a {@code @RequestBody} argument, and the
     * search manager unboxes the paging values without null checks.
     *
     * @param dto search conditions including paging
     * @return the matching page wrapped in a {@link ResultBean}
     */
    @PostMapping("/page")
    public ResultBean<EsPageVO<EsServerVO>> page(@Validated @RequestBody ServerSearchDTO dto) {
        return new ResultBean<>(serverSearchManager.pageSearchResult(dto));
    }
}
  1. 分页参数EsPageDTO实体类
import com.onecode.dtg.basicmon.util.PrincipalUtil;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import javax.validation.constraints.NotNull;
import java.util.Arrays;/*** @author gideon*/
@Data
public class EsPageDTO {public static final String ASC = "ASC";public static final String DESC = "DESC";/*** 最大分页大小,如果分页大小大于500,则用500作为分页的大小。防止有人直接传入一个较大的数,导致服务器内存溢出宕机*/public static final Integer MAX_PAGE_SIZE = 500;/*** 当前页*/@NotNull(message = "pageNum 不能为空")@ApiModelProperty(value = "当前页", required = true)private Integer pageNum;@NotNull(message = "pageSize 不能为空")@ApiModelProperty(value = "每页大小", required = true)private Integer pageSize;@ApiModelProperty(value = "排序字段数组,用逗号分割")private String[] columns;@ApiModelProperty(value = "排序字段方式,用逗号分割,ASC正序,DESC倒序")private String[] orders;public Integer getPageNum() {return pageNum;}public void setPageNum(Integer pageNum) {this.pageNum = pageNum;}public Integer getPageSize() {return pageSize;}public void setPageSize(Integer pageSize) {if (pageSize > MAX_PAGE_SIZE) {this.pageSize = MAX_PAGE_SIZE;return;}this.pageSize = pageSize;}public String getOrderBy() {return order(this.columns, this.orders);}public String[] getColumns() {return columns;}public void setColumns(String[] columns) {this.columns = columns;}public String[] getOrders() {return orders;}public void setOrders(String[] orders) {this.orders = orders;}public static String order(String[] columns, String[] orders) {if (columns == null || columns.length == 0) {return "";}StringBuilder stringBuilder = new StringBuilder();for (int x = 0; x < columns.length; x++) {String column = columns[x];String order;if (orders != null && orders.length > x) {order = orders[x].toUpperCase();if (!(order.equals(ASC) || order.equals(DESC))) {throw new IllegalArgumentException("非法的排序策略:" + column);}} else {order = ASC;}// 判断列名称的合法性,防止SQL注入。只能是【字母,数字,下划线】if (PrincipalUtil.isField(column)) {throw new IllegalArgumentException("非法的排序字段名称:" + column);}// 驼峰转换为下划线column = humpConversionUnderscore(column);if (x != 0) {stringBuilder.append(", ");}stringBuilder.append("`").append(column).append("` ").append(order);}return stringBuilder.toString();}public static String 
humpConversionUnderscore(String value) {StringBuilder stringBuilder = new StringBuilder();char[] chars = value.toCharArray();for (char character : chars) {if (Character.isUpperCase(character)) {stringBuilder.append("_");character = Character.toLowerCase(character);}stringBuilder.append(character);}return stringBuilder.toString();}@Overridepublic String toString() {return "EsPageDTO{" +"pageNum=" + pageNum +", pageSize=" + pageSize +", columns=" + Arrays.toString(columns) +", orders=" + Arrays.toString(orders) +'}';}
}
  1. 查询参数实体类ServerSearchDTO
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;import javax.validation.constraints.NotNull;
import java.time.LocalDate;/*** @author gideon*/
/**
 * Search conditions for the server search endpoint. Paging and ordering fields are
 * inherited from {@link EsPageDTO}.
 */
@EqualsAndHashCode(callSuper = true)
@Data
public class ServerSearchDTO extends EsPageDTO {

    @ApiModelProperty("类型")
    private String type;

    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    private String gender;

    @ApiModelProperty("学历")
    private String education;

    @ApiModelProperty("户籍-省")
    private String idCardProvince;

    @ApiModelProperty("户籍-市")
    private String idCardCity;

    @ApiModelProperty("户籍-区")
    private String idCardRegion;

    @ApiModelProperty("现住-省")
    private String presentProvince;

    @ApiModelProperty("现住-市")
    private String presentCity;

    @ApiModelProperty("现住-区")
    private String presentRegion;

    @ApiModelProperty("介绍-标签(多个值需要使用逗号分割)")
    private String introLabels;

    @ApiModelProperty("服务开始时间")
    @NotNull(message = "服务开始时间不能为空")
    private LocalDate serviceStartDate;

    @ApiModelProperty("服务结束时间")
    @NotNull(message = "服务结束时间不能为空")
    private LocalDate serviceEndDate;

    @ApiModelProperty("skuId")
    @NotNull(message = "skuId不能为空。")
    private Long skuId;
}
  1. 返回值EsServerVO参数
/*** @author gideon* @date 2022/9/5*/
/**
 * Value object returned to callers for each server document found by a search.
 */
@Data
public class EsServerVO {

    @ApiModelProperty("id")
    private Long id;

    @ApiModelProperty("用户标识")
    private Long userId;

    @ApiModelProperty("类型")
    private String type;

    @ApiModelProperty("姓名")
    private String name;

    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    private String gender;

    @ApiModelProperty("出生年月日")
    @JsonFormat(pattern = "yyyy-MM-dd")
    private LocalDate birthday;

    @ApiModelProperty("学历")
    private String education;

    @ApiModelProperty("从业时间")
    @JsonFormat(pattern = "yyyy-MM-dd")
    private LocalDate practiceDate;

    @ApiModelProperty("评级")
    private String level;

    @ApiModelProperty("认证标签")
    private String authLabel;

    @ApiModelProperty("勋章(逗号隔开)")
    private String medal;

    @ApiModelProperty("服务评分")
    private Integer serviceScore;

    @ApiModelProperty("已实名认证")
    private Integer realNameAuth;

    @ApiModelProperty("身份证号")
    private String idCardNo;

    @ApiModelProperty("户籍-省")
    private String idCardProvince;

    @ApiModelProperty("户籍-市")
    private String idCardCity;

    @ApiModelProperty("户籍-区")
    private String idCardRegion;

    @ApiModelProperty("手机号")
    private String phone;

    @ApiModelProperty("现住-省")
    private String presentProvince;

    @ApiModelProperty("现住-市")
    private String presentCity;

    @ApiModelProperty("现住-区")
    private String presentRegion;

    @ApiModelProperty("现住-地址")
    private String presentAddress;

    @ApiModelProperty("头像")
    private String head;

    @ApiModelProperty("使用状态(初始化:init;正常:normal,禁用:ban)")
    private String useStatus;

    @ApiModelProperty("审核状态 (待审核:await;通过:pass,驳回:reject)")
    private String auditStatus;

    @ApiModelProperty("驳回理由")
    private String rejectReason;

    @ApiModelProperty("注册时间")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime regDate;

    @ApiModelProperty("介绍-内容")
    private String introContent;

    @ApiModelProperty("介绍-视频")
    private String introVideo;

    @ApiModelProperty("介绍-标签")
    private String introLabel;

    @ApiModelProperty("商家标识")
    private Long merchantId;

    @ApiModelProperty("组织标识")
    private Long orgId;

    @ApiModelProperty("护理员申请sku,多个逗号隔开")
    private String skuId;

    @ApiModelProperty("护理员排班数据,多个逗号隔开")
    private String schedule;

    /** Logical-delete flag. */
    private Integer del;

    /** Creator of the record. */
    private String createBy;

    /** Creation time. */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime;

    /** Last updater of the record. */
    private String updateBy;

    /** Last update time. */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime updateTime;
}
  1. 分页返回值EsPageVO
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import java.util.List;/*** @author gideon* @date 2022/9/5*/
/**
 * Generic page of search results.
 *
 * @param <T> type of the entries in the page
 */
@Data
public class EsPageVO<T> {

    @ApiModelProperty("总页数")
    private Integer pages;

    @ApiModelProperty("总条目数")
    private Long total;

    @ApiModelProperty("结果集")
    private List<T> list;
}

spring boot +springboot集成es7.9.1+canal同步到es

spring boot +springboot集成es7.9.1+canal同步到es

  • 前言
    • 参考资料来源
    • rocketmq
    • elasticsearch
    • canal
    • 消费MQ订阅的canal信息,进行elasticsearch的同步以及搜索

未经许可,请勿转载。

前言

  1. 其实大部分的代码是来源于参考资料来源主要代码实现,我只是在他的基础上增加自定义注解,自定义分词器等。需要看详细源码的可以去看主要代码实现,结合我的来使用。
  2. 有人会问为什么需要自定义注解,因为elasticsearch7.6 索引将去除type 没有类型的概念了。所以我自己自定义数据类型,有需要的可以自己拓展自己需要的类型。
  3. 我这里主要写的是代码实现,没有涉及到中间件的搭建,因为真的没有时间,哈哈。

参考资料来源

主要实现代码:=gitee_search
自定义注解:
自定义分词器:
Canal胶水层:

rocketmq

  1. mq maven依赖
            <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
  1. mq 适配器
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;/*** @author gideon*/
/**
 * Factory for {@link RocketMQTemplate} instances that each own a dedicated producer.
 */
@Configuration
@RequiredArgsConstructor
public class RocketMqAdapter {

    private final RocketMQMessageConverter rocketMqMessageConverter;

    /** Name-server address read from {@code rocketmq.name-server}; empty when unset. */
    @Value("${rocketmq.name-server:}")
    private String nameServer;

    /**
     * Creates a template backed by a new producer. The given topic is passed to the
     * producer constructor, i.e. it is used as the producer group name.
     *
     * @param topic topic name, also used as the producer group
     * @return a template configured with the producer and the shared message converter
     */
    public RocketMQTemplate getTemplateByTopicName(String topic) {
        DefaultMQProducer topicProducer = new DefaultMQProducer(topic);
        topicProducer.setNamesrvAddr(nameServer);
        topicProducer.setRetryTimesWhenSendFailed(2);
        topicProducer.setSendMsgTimeout((int) RocketMqConstant.TIMEOUT);

        RocketMQTemplate template = new RocketMQTemplate();
        template.setProducer(topicProducer);
        template.setMessageConverter(rocketMqMessageConverter.getMessageConverter());
        return template;
    }
}
  1. mq的一些常量信息RocketMqConstant
/**
 * Shared RocketMQ constants: topic names and send settings.
 *
 * <p>Multiple name-server addresses are separated by {@code ;}.
 */
public class RocketMqConstant {

    // RocketMQ delay levels (1-18):
    // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    /**
     * Auto-receipt delay; the value is the number of minutes in 7 days.
     * NOTE(review): the value is outside the 1-18 delay-level range listed above, so
     * it is presumably not passed to RocketMQ as a delay level - confirm with callers.
     */
    public static final int ORDER_AUTO_RECEIPT_DELAY_LEVEL = 60 * 24 * 7;

    /** Default timeout for sending a message. */
    public static final long TIMEOUT = 3000;

    /** Topic for order cancellation refunds. */
    public static final String ORDER_REFUND_TOPIC = "order-refund-topic";

    /** Topic for automatic order receipt. */
    public static final String AUTO_RECEIPT_TOPIC = "auto-receipt-topic";

    /** Topic for successful payment of service orders. */
    public static final String ORDER_NOTIFY_TOPIC = "order-notify-topic";

    /** Topic that carries the canal binlog messages. */
    public static final String CANAL_TOPIC = "canal-topic";
}
  1. mq的配置类
import com.onecode.dtg.basic.RocketMqAdapter;
import com.onecode.dtg.basic.RocketMqConstant;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;/*** @author gideon*/
/**
 * Declares the RocketMQ template beans used by the application.
 */
@Configuration
@RequiredArgsConstructor
public class RocketMqConfig {

    private final RocketMqAdapter rocketMqAdapter;

    /**
     * Lazily created template for {@link RocketMqConstant#AUTO_RECEIPT_TOPIC}; its
     * {@code destroy} method is registered as the bean's destroy callback.
     *
     * @return template obtained from the adapter for the auto-receipt topic
     */
    @Lazy
    @Bean(destroyMethod = "destroy")
    public RocketMQTemplate autoReceiptTemplate() {
        String topic = RocketMqConstant.AUTO_RECEIPT_TOPIC;
        return rocketMqAdapter.getTemplateByTopicName(topic);
    }
}
  1. mq 的配置文件信息
rocketmq:
  name-server: 127.0.0.1:9876

elasticsearch

elasticsearch的搭建我在这里就不多bb了,你们自行百度,下面是资料。

  1. maven所需依赖
 	<properties><elasticsearch.version>7.9.1</elasticsearch.version></properties><dependencies><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>${elasticsearch.version}</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>${elasticsearch.version}</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>${elasticsearch.version}</version><exclusions><exclusion><artifactId>commons-logging</artifactId><groupId>commons-logging</groupId></exclusion></exclusions></dependency></dependencies>
  1. 分词所需资料 elasticsearch搭建的资料,点击这里。
  2. elasticsearch的yml
# elastic的地址
elastic:
  hostname: 127.0.0.1
  port: 9200
  1. elasticsearch 启动配置类
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author gideon*/
/**
 * Creates the Elasticsearch high-level REST client from the {@code elastic.hostname}
 * and {@code elastic.port} properties.
 */
@Configuration
@RequiredArgsConstructor
public class ElasticConfig {

    @Value("${elastic.hostname}")
    private String hostname;

    @Value("${elastic.port}")
    private int port;

    /**
     * @return a client connected to the configured host and port
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        HttpHost elasticHost = new HttpHost(hostname, port);
        return new RestHighLevelClient(RestClient.builder(elasticHost));
    }
}
  1. elasticsearch 自定义注解,AnalyzerType在下面。
import java.lang.annotation.*;/*** @author gideon* @date 2022/9/8*/
/**
 * Field-level annotation that declares the Elasticsearch field type and analyzer of
 * an entity field. It is retained at runtime.
 */
@Documented
@Inherited
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface EsField {

    /**
     * Field type.
     *
     * @return the declared type, {@link FieldType#TEXT} by default
     */
    FieldType type() default FieldType.TEXT;

    /**
     * Analyzer to use for the field.
     *
     * @return the declared analyzer, {@link AnalyzerType#STANDARD} by default
     */
    AnalyzerType analyzer() default AnalyzerType.STANDARD;
}
  1. elasticsearch 自定义AnalyzerType,因为我自己的业务需要所以我加了一个自定义分词器comma
import lombok.Getter;/*** @author gideon* @date 2022/9/8*/
/**
 * Analyzers that can be declared on an {@link EsField}; each constant carries the
 * string exposed through {@code getType()}.
 */
@Getter
public enum AnalyzerType {

    /**
     * No analysis.
     * NOTE(review): the carried value is a description, not an analyzer name - verify
     * how the index builder treats this constant.
     */
    NO("不使用分词"),

    /** Standard analyzer, the default. */
    STANDARD("standard"),

    /** ik_smart: coarsest-grained split; a token is not reused by other tokens. */
    IK_SMART("ik_smart"),

    /** ik_max_word: finest-grained split, producing as many tokens as possible. */
    IK_MAX_WORD("ik_max_word"),

    /** comma: custom analyzer that splits the text on commas. */
    COMMA("comma");

    private final String type;

    AnalyzerType(String type) {
        this.type = type;
    }
}
  1. elasticsearch 自定义FieldType
import lombok.Getter;/*** @author gideon* @date 2022/9/8*/
/**
 * Elasticsearch field types that can be declared on an {@link EsField}; each constant
 * carries the type name exposed through {@code getType()}.
 */
@Getter
public enum FieldType {

    TEXT("text"),

    KEYWORD("keyword"),

    INTEGER("integer"),

    DOUBLE("double"),

    DATE("date"),

    LONG("long"),

    /** Single object. */
    OBJECT("object"),

    /** Nested array. */
    NESTED("nested");

    private final String type;

    FieldType(String type) {
        this.type = type;
    }
}
  1. elasticsearch索引名称枚举
/**
 * Names of the Elasticsearch indices used by the application.
 *
 * @author gideon
 */
public enum EsIndexEnum {

    /** Index that stores servers (caregivers). */
    SERVER("server");

    private final String value;

    EsIndexEnum(String value) {
        this.value = value;
    }

    /**
     * @return the index name
     */
    public String value() {
        return this.value;
    }
}
  1. elasticsearch 创建索引代码EsIndexCreateService,CommonBizException是我自定义的异常,你们可以使用自己自定义的异常类。
import com.onecode.dtg.basicmon.enums.ResultCode;
import com.onecode.dtg.basicmon.es.annotation.EsField;
import com.onecode.dtg.basicmon.es.enums.FieldType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.lang.reflect.Field;/*** @author gideon* @date 2022/9/8*/
@Slf4j
@Service
@RequiredArgsConstructor
public class EsIndexCreateService {private final RestHighLevelClient restHighLevelClient;/*** 不需要逗号分词器索引** @param indexName 索引名称* @param clazz     同步到es的实体类* @return boolean*/public boolean createIndex(String indexName, Class<?> clazz) {return createIndex(indexName, clazz, false);}/*** 建立索引** @param indexName 索引名称* @param comma     是否需要逗号分词器* @return boolean*/public boolean createIndex(String indexName, Class<?> clazz, Boolean comma) {try {
//            判断索引是否存在GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);boolean exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);if (exists) {return true;}CreateIndexRequest request = new CreateIndexRequest(indexName);if (comma) {XContentBuilder settingsBuilder = XContentFactory.jsonBuilder().startObject().startObject("analysis").startObject("analyzer").startObject("comma").field("type", "pattern")
//                        将分词器规则定义为按照","进行分词.field("pattern", ",").endObject().endObject().endObject().endObject();request.settings(settingsBuilder);}//            这里创建索引结构request.mapping(generateBuilder(clazz));CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
//            指示是否所有节点都已确认请求boolean acknowledged = response.isAcknowledged();
//            指示是否在超时之前为索引中的每个分片启动了必需的分片副本数boolean shardsAcknowledged = response.isShardsAcknowledged();if (acknowledged || shardsAcknowledged) {log.info("创建索引成功!索引名称为{}", indexName);return true;}return false;} catch (IOException e) {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + indexName + "失败。");}}/*** 生成es索引** @param clazz 对于的es实体* @return XContentBuilder*/public static XContentBuilder generateBuilder(Class<?> clazz) {try {
//        获取索引名称及类型XContentBuilder builder = XContentFactory.jsonBuilder();builder.startObject();builder.startObject("properties");Field[] declaredFields = clazz.getDeclaredFields();for (Field declaredField : declaredFields) {if (declaredField.isAnnotationPresent(EsField.class)) {
//                获取注解EsField declaredAnnotation = declaredField.getDeclaredAnnotation(EsField.class);
//                        如果嵌套对象/*** {*   "mappings": {*     "properties": {*       "region": {*         "type": "keyword"*       },*       "manager": {*         "properties": {*           "age":  { "type": "integer" },*           "name": {*             "properties": {*               "first": { "type": "text" },*               "last":  { "type": "text" }*             }*           }*         }*       }*     }*   }* }*/if (declaredAnnotation.type() == FieldType.OBJECT) {
//                    获取当前类的对象-- ActionClass<?> type = declaredField.getType();Field[] typeDeclaredFields = type.getDeclaredFields();builder.startObject(declaredField.getName());builder.startObject("properties");
//                    遍历该对象中的所有属性for (Field field : typeDeclaredFields) {if (field.isAnnotationPresent(EsField.class)) {
//                            获取注解EsField fieldDeclaredAnnotation = field.getDeclaredAnnotation(EsField.class);builder.startObject(field.getName());builder.field("type", fieldDeclaredAnnotation.type().getType());
//                            keyword不需要分词if (fieldDeclaredAnnotation.type() == FieldType.TEXT) {builder.field("analyzer", fieldDeclaredAnnotation.analyzer().getType());}builder.endObject();}}builder.endObject();builder.endObject();} else {builder.startObject(declaredField.getName());builder.field("type", declaredAnnotation.type().getType());
//                        keyword不需要分词if (declaredAnnotation.type() == FieldType.TEXT) {builder.field("analyzer", declaredAnnotation.analyzer().getType());}builder.endObject();}}}
//            对应propertybuilder.endObject();builder.endObject();return builder;} catch (IOException e) {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引失败。");}}
}

canal

因为我这里使用的是第三方的canal jar包,也就是上面说到的Canal胶水层
获取地址:
引入的maven

		<dependency><groupId>cn.throwx</groupId><artifactId>canal-glue-core</artifactId><version>1.0</version><scope>system</scope><systemPath>${pom.basedir}/lib/canal-glue-core.jar</systemPath></dependency>

类似于下面这样放

  1. canal的canal.properties配置文件信息,这里主要看你使用什么消息队列就配置什么。我使用的是RocketMQ(canal.serverMode = rocketMQ),然后需要创建一个topic去接收数据库的操作日志。注意:topic是在instance.properties的canal.mq.topic = canal-topic中配置的,这里的rocketmq.producer.group = canal-topic只是生产者组的名称
#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### 		destinations		#############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### 		     Kafka 		     #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = canal-topic
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 192.168.1.46:9876
rocketmq.retry.times.when.send.failed = 3
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
  1. canal的instance.properties配置文件信息,canal.instance.filter.regex这个参数可以指定监听的数据库->表
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.1.46:3306
canal.instance.master.journal.name=mysql-binlog.000001
canal.instance.master.position=0
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=mp_biz_service.server:*,mp_biz_service.shop_service_server:*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=canal-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
  1. canal自定义处理器NdCanalBinLogEventParser
import cn.throwx.canal.gule.common.BinLogEventType;
import cn.throwx.canal.gule.common.OperationType;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.parser.BaseCommonEntryFunction;
import cn.throwx.canal.gule.support.parser.BasePrimaryKeyTupleFunction;
import cn.throwx.canal.gule.support.parser.CanalBinLogEventParser;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;import java.util.*;/** * @author gideon*/
@Slf4j
public class NdCanalBinLogEventParser implements CanalBinLogEventParser {@Overridepublic <T> List<CanalBinLogResult<T>> parse(CanalBinLogEvent event, Class<T> klass, BasePrimaryKeyTupleFunction primaryKeyFunction, BaseCommonEntryFunction<T> commonEntryFunction) {BinLogEventType eventType = BinLogEventType.fromType(event.getType());if (Objects.equals(BinLogEventType.CREATE, eventType) || Objects.equals(BinLogEventType.ALTER, eventType)) {if (log.isDebugEnabled()) {log.debug("监听到不需要处理或者未知的binlog事件类型[{}],将忽略解析过程返回空列表,binlog事件:{}", eventType, JSON.toJSONString(event));}return Collections.emptyList();}if (BinLogEventType.UNKNOWN != eventType && BinLogEventType.QUERY != eventType) {if (Boolean.TRUE.equals(event.getIsDdl())) {CanalBinLogResult<T> entry = new CanalBinLogResult<>();entry.setOperationType(OperationType.DDL);entry.setBinLogEventType(eventType);entry.setDatabaseName(event.getDatabase());entry.setTableName(event.getTable());entry.setSql(event.getSql());return Collections.singletonList(entry);} else {Optional.ofNullable(event.getPkNames()).filter((x) -> x.size() == 1).orElseThrow(() -> new IllegalArgumentException("DML类型binlog事件主键列数量不为1"));String primaryKeyName = event.getPkNames().get(0);List<CanalBinLogResult<T>> entryList = new LinkedList<>();List<Map<String, String>> data = event.getData();List<Map<String, String>> old = event.getOld();int dataSize = null != data ? data.size() : 0;int oldSize = null != old ? 
old.size() : 0;if (dataSize > 0) {for(int index = 0; index < dataSize; ++index) {CanalBinLogResult<T> entry = new CanalBinLogResult<>();entryList.add(entry);entry.setSql(event.getSql());entry.setOperationType(OperationType.DML);entry.setBinLogEventType(eventType);entry.setTableName(event.getTable());entry.setDatabaseName(event.getDatabase());Map<String, String> item = data.get(index);entry.setAfterData(commonEntryFunction.apply(item));Map<String, String> oldItem = null;if (oldSize > 0 && index <= oldSize) {oldItem = old.get(index);entry.setBeforeData(commonEntryFunction.apply(oldItem));}entry.setPrimaryKey(primaryKeyFunction.apply(oldItem, item, primaryKeyName));}}return entryList;}} else {if (log.isDebugEnabled()) {log.debug("监听到不需要处理或者未知的binlog事件类型[{}],将忽略解析过程返回空列表,binlog事件:{}", eventType, JSON.toJSONString(event));}return Collections.emptyList();}}private NdCanalBinLogEventParser() {}public static NdCanalBinLogEventParser of() {return new NdCanalBinLogEventParser();}
}
  1. canal自定义处理器NdCanalBinlogEventProcessorFactory
import cn.throwx.canal.gule.model.ModelTable;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** @author gideon*/
public class NdCanalBinlogEventProcessorFactory implements CanalBinlogEventProcessorFactory {private final ConcurrentMap<ModelTable, List<BaseCanalBinlogEventProcessor<?>>> cache = new ConcurrentHashMap<>(16);@Overridepublic void register(ModelTable modelTable, BaseCanalBinlogEventProcessor<?> processor) {synchronized(this.cache) {this.cache.putIfAbsent(modelTable, new LinkedList<>());this.cache.get(modelTable).add(processor);}}@Overridepublic List<BaseCanalBinlogEventProcessor<?>> get(ModelTable modelTable) {return this.cache.get(modelTable);}private NdCanalBinlogEventProcessorFactory() {}public static NdCanalBinlogEventProcessorFactory of() {return new NdCanalBinlogEventProcessorFactory();}
}
  1. canal自定义处理器NdCanalGlue
import cn.throwx.canal.gule.CanalGlue;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.ModelTable;
import cn.throwx.canal.gule.support.adapter.SourceAdapterFacade;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;import java.util.List;/*** @author gideon*/
public class NdCanalGlue implements CanalGlue {private final CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory;@Overridepublic void process(String content) {CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent.class, content);ModelTable modelTable = ModelTable.of(event.getDatabase(), event.getTable());List<BaseCanalBinlogEventProcessor<?>> baseCanalBinlogEventProcessors = this.canalBinlogEventProcessorFactory.get(modelTable);if (baseCanalBinlogEventProcessors.isEmpty()) {return;}baseCanalBinlogEventProcessors.forEach((processor) -> processor.process(event));}private NdCanalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {this.canalBinlogEventProcessorFactory = canalBinlogEventProcessorFactory;}public static NdCanalGlue of(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {return new NdCanalGlue(canalBinlogEventProcessorFactory);}
}
  1. canal配置类
import cn.throwx.canal.gule.CanalGlue;
import cn.throwx.canal.gule.support.parser.*;
import cn.throwx.canal.gule.support.parser.converter.CanalFieldConverterFactory;
import cn.throwx.canal.gule.support.parser.converter.InMemoryCanalFieldConverterFactory;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;
import com.onecode.middle.search.service.canal.NdCanalBinLogEventParser;
import com.onecode.middle.search.service.canal.NdCanalBinlogEventProcessorFactory;
import com.onecode.middle.search.service.canal.NdCanalGlue;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;import java.util.Map;/*** @author gideon*/
@Configuration
public class CanalGlueAutoConfiguration implements SmartInitializingSingleton, BeanFactoryAware {private ConfigurableListableBeanFactory configurableListableBeanFactory;@Bean@ConditionalOnMissingBeanpublic CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory() {return NdCanalBinlogEventProcessorFactory.of();}@Bean@ConditionalOnMissingBeanpublic ModelTableMetadataManager modelTableMetadataManager(CanalFieldConverterFactory canalFieldConverterFactory) {return InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);}@Bean@ConditionalOnMissingBeanpublic CanalFieldConverterFactory canalFieldConverterFactory() {return InMemoryCanalFieldConverterFactory.of();}@Bean@ConditionalOnMissingBeanpublic CanalBinLogEventParser canalBinLogEventParser() {return NdCanalBinLogEventParser.of();}@Bean@ConditionalOnMissingBeanpublic ParseResultInterceptorManager parseResultInterceptorManager(ModelTableMetadataManager modelTableMetadataManager) {return InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);}@Bean@Primarypublic CanalGlue canalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {return NdCanalGlue.of(canalBinlogEventProcessorFactory);}@Overridepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException {this.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;}@SuppressWarnings({"rawtypes", "unchecked"})@Overridepublic void afterSingletonsInstantiated() {ParseResultInterceptorManager parseResultInterceptorManager= configurableListableBeanFactory.getBean(ParseResultInterceptorManager.class);ModelTableMetadataManager modelTableMetadataManager= configurableListableBeanFactory.getBean(ModelTableMetadataManager.class);CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory= configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.class);CanalBinLogEventParser canalBinLogEventParser= 
configurableListableBeanFactory.getBean(CanalBinLogEventParser.class);Map<String, BaseParseResultInterceptor> interceptors= configurableListableBeanFactory.getBeansOfType(BaseParseResultInterceptor.class);interceptors.forEach((k, interceptor) -> parseResultInterceptorManager.registerParseResultInterceptor(interceptor));Map<String, BaseCanalBinlogEventProcessor> processors= configurableListableBeanFactory.getBeansOfType(BaseCanalBinlogEventProcessor.class);processors.forEach((k, processor) -> processor.init(canalBinLogEventParser, modelTableMetadataManager,canalBinlogEventProcessorFactory, parseResultInterceptorManager));}
}
  1. ServerBO canal转换的实体类,@CanalModel 的参数database是对应的数据库,table是对应数据库下的数据表,因为这个数据是需要同步到es的所以设置了类型。
import cn.throwx.canal.gule.annotation.CanalModel;
import cn.throwx.canal.gule.common.FieldNamingPolicy;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.onecode.dtg.basic.common.es.annotation.EsField;
import com.onecode.dtg.basic.common.es.enums.AnalyzerType;
import com.onecode.dtg.basic.common.es.enums.FieldType;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import java.time.LocalDate;
import java.time.LocalDateTime;

/**
 * Model bound to table {@code mp_biz_service.server} via {@link CanalModel}.
 * <p>
 * The {@link EsField} annotations declare the Elasticsearch type (and analyzer, for text fields)
 * of each field.
 *
 * @author gideon
 */
@Data
@CanalModel(database = "mp_biz_service", table = "server", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public class ServerBO {

    @EsField(type = FieldType.LONG)
    @ApiModelProperty("id")
    private Long id;

    @EsField(type = FieldType.LONG)
    @ApiModelProperty("用户标识")
    private Long userId;

    @EsField(type = FieldType.KEYWORD)
    @ApiModelProperty("类型")
    private String type;

    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    @ApiModelProperty("姓名")
    private String name;

    @EsField(type = FieldType.KEYWORD)
    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    private String gender;

    @EsField(type = FieldType.DATE)
    @JsonFormat(pattern = "yyyy-MM-dd")
    @ApiModelProperty("出生年月日")
    private LocalDate birthday;

    @EsField(type = FieldType.KEYWORD)
    @ApiModelProperty("学历")
    private String education;

    @EsField(type = FieldType.DATE)
    @JsonFormat(pattern = "yyyy-MM-dd")
    @ApiModelProperty("从业时间")
    private LocalDate practiceDate;

    @EsField(type = FieldType.KEYWORD)
    @ApiModelProperty("评级")
    private String level;

    @EsField(type = FieldType.TEXT)
    @ApiModelProperty("认证标签")
    private String authLabel;

    @EsField(type = FieldType.TEXT)
    @ApiModelProperty("勋章(逗号隔开)")
    private String medal;

    @EsField(type = FieldType.INTEGER)
    @ApiModelProperty("服务评分")
    private Integer serviceScore;

    @EsField(type = FieldType.INTEGER)
    @ApiModelProperty("已实名认证")
    private Integer realNameAuth;

    @EsField(type = FieldType.TEXT)
    @ApiModelProperty("身份证号")
    private String idCardNo;

    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    @ApiModelProperty("户籍-省")
    private String idCardProvince;

    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    @ApiModelProperty("户籍-市")
    private String idCardCity;

    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    @ApiModelProperty("户籍-区")
    private String idCardRegion;

    @EsField(type = FieldType.TEXT)
    @ApiModelProperty("手机号")
    private String phone;

    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    @ApiModelProperty("现住-省")
    private String presentProvince;

    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    @ApiModelProperty("现住-市")
    private String presentCity;

    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    @ApiModelProperty("现住-区")
    private String presentRegion;

    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    @ApiModelProperty("现住-地址")
    private String presentAddress;

    @EsField(type = FieldType.TEXT)
    @ApiModelProperty("头像")
    private String head;

    @EsField(type = FieldType.KEYWORD)
    @ApiModelProperty("使用状态(初始化:init;正常:normal,禁用:ban)")
    private String useStatus;

    @EsField(type = FieldType.KEYWORD)
    @ApiModelProperty("审核状态 (待审核:await;通过:pass,驳回:reject)")
    private String auditStatus;

    @EsField(type = FieldType.TEXT)
    @ApiModelProperty("驳回理由")
    private String rejectReason;

    @EsField(type = FieldType.DATE)
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @ApiModelProperty("注册时间")
    private LocalDateTime regDate;

    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    @ApiModelProperty("介绍-内容")
    private String introContent;

    @EsField(type = FieldType.TEXT)
    @ApiModelProperty("介绍-视频")
    private String introVideo;

    @EsField(type = FieldType.TEXT)
    @ApiModelProperty("介绍-标签")
    private String introLabel;

    @EsField(type = FieldType.LONG)
    @ApiModelProperty("商家标识")
    private Long merchantId;

    @EsField(type = FieldType.LONG)
    @ApiModelProperty("组织标识")
    private Long orgId;

    // Comma-separated values; analyzed with the custom comma analyzer.
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.COMMA)
    @ApiModelProperty("护理员申请sku,多个逗号隔开")
    private String skuId;

    // Comma-separated values; analyzed with the custom comma analyzer.
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.COMMA)
    @ApiModelProperty("护理员排班数据,多个逗号隔开")
    private String schedule;

    /** Logical-delete flag. */
    @EsField(type = FieldType.INTEGER)
    private Integer del;

    /** Creator. */
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String createBy;

    /** Creation time. */
    @EsField(type = FieldType.DATE)
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime;

    /** Last updater. */
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String updateBy;

    /** Last update time. */
    @EsField(type = FieldType.DATE)
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime updateTime;
}
  1. ShopServiceServerBO canal转换的实体类,@CanalModel 的参数database是对应的数据库,table是对应数据库下的数据表
import cn.throwx.canal.gule.annotation.CanalModel;
import cn.throwx.canal.gule.common.FieldNamingPolicy;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * Model bound to table {@code mp_biz_service.shop_service_server} via {@link CanalModel}.
 *
 * @author gideon
 */
@Data
@CanalModel(database = "mp_biz_service", table = "shop_service_server", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public class ShopServiceServerBO {

    @ApiModelProperty("ID")
    private Long id;

    @ApiModelProperty("产品标识")
    private Long productId;

    @ApiModelProperty("服务者用户标识")
    private Long serverUserId;

    @ApiModelProperty("产品sku标识")
    private Long productSkuId;

    @ApiModelProperty(value = "盈利")
    private Integer profit;

    @ApiModelProperty("商家标识")
    private Long merchantId;

    @ApiModelProperty("组织标识")
    private Long orgId;

    /** Logical-delete flag. */
    private Integer del;

    /** Creator. */
    private String createBy;

    /** Creation time. */
    private LocalDateTime createTime;

    /** Last updater. */
    private String updateBy;

    /** Last update time. */
    private LocalDateTime updateTime;
}
  1. 至此canal的基础代码就完成。

消费MQ订阅的canal信息,进行elasticsearch的同步以及搜索

  1. 监听我们上面canal-topic订阅的消息然后进行同步数据CanalListener
import cn.throwx.canal.gule.CanalGlue;
import com.onecode.dtg.basic.RocketMqConstant;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** canal消费数据库操作日志mq** @author gideon*/
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC, consumerGroup = RocketMqConstant.CANAL_TOPIC)
public class CanalListener implements RocketMQListener<String> {private final CanalGlue canalGlue;@Overridepublic void onMessage(String message) {canalGlue.process(message);}
}
  1. 对我们需要监听的表进行处理ServerCanalListener,这里面的hutool是一个工具类有需要的可以自行引入
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.6</version>
</dependency>
import cn.hutool.json.JSONUtil;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.ExceptionHandler;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.middle.search.service.bo.ServerBO;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.middle.search.service.util.EsIndexCreateService;
import com.onecode.service.feign.ServerFeignClient;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @author gideon*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ServerCanalListener extends BaseCanalBinlogEventProcessor<ServerBO> {private final EsIndexCreateService esIndexCreateService;private final ServerFeignClient serverFeignClient;private final RestHighLevelClient restHighLevelClient;/*** 插入护理员,此时插入es*/@Overrideprotected void processInsertInternal(CanalBinLogResult<ServerBO> result) {Long serverId = result.getPrimaryKey();EsServerBO esServerBO = serverFeignClient.loadEsServerBO(serverId);if (esServerBO == null) {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引异常");}//        创建索引boolean index = esIndexCreateService.createIndex(EsIndexEnum.SERVER.value(), ServerBO.class, true);if (!index) {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + EsIndexEnum.SERVER.value() + "失败。");}IndexRequest request = new IndexRequest(EsIndexEnum.SERVER.value());request.id(String.valueOf(serverId));request.source(JSONUtil.toJsonStr(esServerBO), XContentType.JSON);try {IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);log.info(indexResponse.toString());} catch (IOException e) {log.error(e.toString());throw new CommonBizException(ResultCode.FAIL.getModel(), "保存es信息异常:" + e);}}/*** 更新护理员,删除护理员索引,再重新构建一个*/@Overrideprotected void processUpdateInternal(CanalBinLogResult<ServerBO> result) {Long spuId = result.getPrimaryKey();EsServerBO esServerBO = serverFeignClient.loadEsServerBO(spuId);String source = JSONUtil.toJsonStr(esServerBO);//        创建索引boolean index = esIndexCreateService.createIndex(EsIndexEnum.SERVER.value(), ServerBO.class, true);if (!index) {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + EsIndexEnum.SERVER.value() + "失败。");}UpdateRequest request = new UpdateRequest(EsIndexEnum.SERVER.value(), String.valueOf(spuId));request.doc(source, XContentType.JSON);request.docAsUpsert(true);try {UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);log.info(updateResponse.toString());} catch (IOException 
e) {log.error(e.toString());throw new CommonBizException(ResultCode.FAIL.getModel(), "删除es信息异常:" + e);}}@Overrideprotected ExceptionHandler exceptionHandler() {return (CanalBinLogEvent event, Throwable throwable) -> {throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引异常:" + throwable);};}}
  1. 这个表的监听是因为我的业务需求:shop_service_server表新增或者删除数据时,需要同步更新server表的skuId字段(新增时把skuId加进去,删除时把skuId移除),所以需要监听该表的变更。对应的处理类是ShopServiceServerCanalListener
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import com.onecode.dtg.basicmon.core.exception.CommonBizException;
import com.onecode.dtg.basicmon.enums.ResultCode;
import com.onecode.middle.search.service.bo.ShopServiceServerBO;
import com.onecode.middle.search.service.manager.ServerUpdateManager;
import com.onecode.service.feign.ServerFeignClient;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.List;/*** @author gideon*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ShopServiceServerCanalListener extends BaseCanalBinlogEventProcessor<ShopServiceServerBO> {private final ServerFeignClient serverFeignClient;private final ServerUpdateManager serverUpdateManager;/*** 新增商品服务者数据** @param result result*/@Overrideprotected void processInsertInternal(CanalBinLogResult<ShopServiceServerBO> result) {//数据库操作后的数据ShopServiceServerBO afterData = result.getAfterData();EsServerBO loadServerBO = loadServerBO(afterData.getServerUserId());List<String> skuIdList = StrUtil.split(loadServerBO.getSkuId(), ",");skuIdList.add(afterData.getProductSkuId().toString());EsServerBO esServerBO = new EsServerBO();esServerBO.setSkuId(StrUtil.join(",", skuIdList));serverUpdateManager.esUpdateServerByServerId(loadServerBO.getId(), esServerBO);}/*** 更新商品服务者数据** @param result result*/@Overrideprotected void processUpdateInternal(CanalBinLogResult<ShopServiceServerBO> result) {//数据库执行操作后的数据ShopServiceServerBO afterData = result.getAfterData();//del字段是我的表是否逻辑删除的判断,大家根据自己需要去掉if ("1".equals(afterData.getDel())) {return;}//微服务项目调用接口查询数据EsServerBO loadEsServerBO = loadServerBO(afterData.getServerUserId());//处理修改后的数据EsServerBO esServerBO = dealWithData(afterData, loadEsServerBO);serverUpdateManager.esUpdateServerByServerId(loadEsServerBO.getId(), esServerBO);}/*** 删除商品服务者数据** @param result result*/@Overrideprotected void processDeleteInternal(CanalBinLogResult<ShopServiceServerBO> result) {//数据库操作前的数据ShopServiceServerBO beforeData = result.getBeforeData();EsServerBO loadServerBO = loadServerBO(beforeData.getServerUserId());EsServerBO esServerBO = dealWithData(beforeData, loadServerBO);serverUpdateManager.esUpdateServerByServerId(loadServerBO.getId(), esServerBO);}/*** 处理数据** @param data 数据库操作数据* @return EsServerBO*/private EsServerBO dealWithData(ShopServiceServerBO data, EsServerBO loadEsServerBO) {List<String> skuIdList = StrUtil.split(loadEsServerBO.getSkuId(), ",");CollUtil.removeAny(skuIdList, data.getProductSkuId().toString());EsServerBO esServerBO = new 
EsServerBO();esServerBO.setSkuId(StrUtil.join(",", skuIdList));return esServerBO;}/*** 获取护理员书信息** @param serverUserId 护理员用户标识* @return EsServerBO*/private EsServerBO loadServerBO(Long serverUserId) {EsServerBO loadEsServerBO = serverFeignClient.loadEsServerBoByServerUserId(serverUserId);if (loadEsServerBO == null) {throw new CommonBizException(ResultCode.FAIL.getModel(),"es数据同步失败:无法通过护工用户标识:" + serverUserId + "找到护理员信息。");}return loadEsServerBO;}
}
  1. ServerUpdateManager,这个是ShopServiceServerCanalListener更新es数据时使用的处理实现类
import cn.hutool.json.JSONUtil;
import com.onecode.dtg.basicmon.core.exception.CommonBizException;
import com.onecode.dtg.basicmon.enums.ResultCode;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearchmon.xcontent.XContentType;
import org.springframework.stereotype.Component;/*** @author gideon*/
@Component
@RequiredArgsConstructor
public class ServerUpdateManager {private final RestHighLevelClient restHighLevelClient;/*** 批量更新es中的商品信息** @param serverId      护理员标识* @param esServerBO 更新的数据*/public void esUpdateServerByServerId(Long serverId, EsServerBO esServerBO) {String source = JSONUtil.toJsonStr(esServerBO);try {BulkRequest request = new BulkRequest();// 准备更新的数据request.add(new UpdateRequest(EsIndexEnum.SERVER.value(), String.valueOf(serverId)).doc(source, XContentType.JSON));//更新BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);if (bulkResponse.hasFailures()) {throw new CommonBizException(ResultCode.FAIL.getModel(), bulkResponse.buildFailureMessage());}} catch (Exception e) {throw new CommonBizException(ResultCode.FAIL.getModel(), e.getMessage());}}
}
  1. ServerSearchManager是搜索接口实现
import cn.hutool.json.JSONUtil;
import com.onecode.dtg.basicmon.core.exception.CommonBizException;
import com.onecode.dtg.basicmon.enums.ResultCode;
import com.onecode.dtg.basicmon.util.ColumnUtil;
import com.onecode.dtg.basicmon.util.LocalDateUtil;
import com.onecode.middle.search.service.bo.ServerBO;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.middle.search.service.dto.ServerSearchDTO;
import com.onecode.middle.search.service.vo.EsPageVO;
import com.onecode.middle.search.service.vo.search.EsServerVO;
import com.onecode.service.feign.constant.AuditStatus;
import com.onecode.service.feign.constant.UseStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;/*** @author gideon*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ServerSearchManager {private final RestHighLevelClient restHighLevelClient;/*** 通过搜索信息分页搜索es数据的信息** @param serverSearchDTO 护理员搜索条件* @return 搜索结果*/public EsPageVO<EsServerVO> pageSearchResult(ServerSearchDTO serverSearchDTO) {//1、动态构建出查询需要的DSL语句EsPageVO<EsServerVO> result;//1、准备检索请求SearchRequest searchRequest = buildSearchRequest(serverSearchDTO);try {//2、执行检索请求SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);log.info("搜索返回结果:" + response.toString());//3、分析响应数据,封装成我们需要的格式result = buildSearchResult(serverSearchDTO, response);} catch (IOException e) {log.error(e.toString());throw new CommonBizException(ResultCode.FAIL.getModel(), "搜索服务出了点小差,请稍后再试:" + e);}return result;}/*** 构建结果数据*/private EsPageVO<EsServerVO> buildSearchResult(ServerSearchDTO dto, SearchResponse response) {EsPageVO<EsServerVO> esPageVO = new EsPageVO<>();//1、返回的所有查询到的商品SearchHits hits = response.getHits();List<EsServerVO> productSearchs = getEsOrderBOList(response);esPageVO.setList(productSearchs);//===============分页信息====================////总记录数long total = hits.getTotalHits().value;esPageVO.setTotal(total);// 总页码int totalPages = (int) total % dto.getPageSize() == 0 ?(int) total / dto.getPageSize() : ((int) total / dto.getPageSize() + 1);esPageVO.setPages(totalPages);return esPageVO;}private List<EsServerVO> getEsOrderBOList(SearchResponse response) {return getOrderListByResponse(response.getHits().getHits());}/*** 从es返回的数据中获取spu列表** @param hits es返回的数据* @return*/private List<EsServerVO> getOrderListByResponse(SearchHit[] hits) {List<EsServerVO> esOrders = new ArrayList<>();for (SearchHit hit : hits) {EsServerVO esOrder = JSONUtil.toBean(hit.getSourceAsString(), EsServerVO.class);esOrders.add(esOrder);}return esOrders;}/*** 准备检索请求** @param param 搜索参数* @return*/private SearchRequest buildSearchRequest(ServerSearchDTO param) {SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();// 构建bool-queryBoolQueryBuilder boolQueryBuilder = new 
BoolQueryBuilder();// 过滤filterQueryIfNecessary(param, boolQueryBuilder);// 关键字搜索keywordSearch(param, boolQueryBuilder);// 排序sort(searchSourceBuilder, boolQueryBuilder);//分页searchSourceBuilder.from((param.getPageNum() - 1) * param.getPageSize());searchSourceBuilder.size(param.getPageSize());log.info("构建的DSL语句 {}", searchSourceBuilder);return new SearchRequest(new String[]{EsIndexEnum.SERVER.value()}, searchSourceBuilder);}/*** 关键字搜索*/private void keywordSearch(ServerSearchDTO param, BoolQueryBuilder boolQueryBuilder) {BoolQueryBuilder keywordShouldQuery = QueryBuilders.boolQuery();
//        现住-省if (Objects.nonNull(param.getPresentProvince())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentProvince), param.getPresentProvince()).operator(Operator.AND));}
//        现住-市if (Objects.nonNull(param.getPresentCity())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentCity), param.getPresentCity()).operator(Operator.AND));}
//        现住-区if (Objects.nonNull(param.getPresentRegion())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentRegion), param.getPresentRegion()).operator(Operator.AND));}
//        户籍-省if (Objects.nonNull(param.getIdCardProvince())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardProvince), param.getIdCardProvince()).operator(Operator.AND));}
//        户籍-市if (Objects.nonNull(param.getIdCardCity())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardCity), param.getIdCardCity()).operator(Operator.AND));}
//        户籍-区if (Objects.nonNull(param.getIdCardRegion())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardRegion), param.getIdCardRegion()).operator(Operator.AND));}//        标签if (Objects.nonNull(param.getIntroLabels())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIntroLabel), param.getIntroLabels()).operator(Operator.AND));}
//        排班,使用了我的自定义逗号分词器,所以是根据逗号分隔后进行匹配的,但是需要多个匹配,我就使用了for循环,应该是有优化的地方,暂时没处理if (param.getServiceStartDate() != null && param.getServiceEndDate() != null) {List<String> scheduleList = LocalDateUtil.getContinuousTime(param.getServiceStartDate(), param.getServiceEndDate(), DateTimeFormatter.ofPattern("yyyyMMdd"));for (String schedule : scheduleList) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getSchedule), schedule).operator(Operator.AND));}}
//        skuId,使用了我的自定义逗号分词器,所以是根据逗号分隔后进行匹配的if (Objects.nonNull(param.getSkuId())) {keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getSkuId), param.getSkuId()).operator(Operator.AND));}boolQueryBuilder.must(keywordShouldQuery);}/*** 进行排序*/private void sort(SearchSourceBuilder searchSourceBuilder, BoolQueryBuilder boolQueryBuilder) {searchSourceBuilder.sort(ColumnUtil.getName(ServerBO::getCreateTime), SortOrder.DESC);searchSourceBuilder.query(boolQueryBuilder);}/*** 过滤查询条件,如果有必要的话** @param param            查询条件* @param boolQueryBuilder 组合进boolQueryBuilder*/private void filterQueryIfNecessary(ServerSearchDTO param, BoolQueryBuilder boolQueryBuilder) {
//        类型if (Objects.nonNull(param.getType())) {boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getType), param.getType()));}
//        性别if (Objects.nonNull(param.getGender())) {boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getGender), param.getGender()));}
//        学历if (Objects.nonNull(param.getEducation())) {boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getEducation), param.getEducation()));}boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getAuditStatus), AuditStatus.PASS.getStatus()));boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getUseStatus), UseStatus.NORMAL.getStatus()));boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getDel), 0));}
}
  1. ServerSearchController
import com.onecode.dtg.basicmon.model.ResultBean;
import com.onecode.middle.search.service.dto.ServerSearchDTO;
import com.onecode.middle.search.service.manager.ServerSearchManager;
import com.onecode.middle.search.service.vo.EsPageVO;
import com.onecode.middle.search.service.vo.search.EsServerVO;
import io.swagger.annotations.Api;
import lombok.AllArgsConstructor;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;/*** @author gideon* @date 2022/9/6*/
/**
 * REST endpoint for searching servers (caregivers).
 *
 * @author gideon
 * @date 2022/9/6
 */
@Validated
@AllArgsConstructor
@RestController
@RequestMapping("/search/server/")
@Api(tags = "api-服务者搜索接口")
public class ServerSearchController {

    private final ServerSearchManager serverSearchManager;

    /**
     * Returns one page of servers matching the given criteria.
     *
     * <p>The request body is annotated with {@code @Validated} so that the bean-validation
     * constraints declared on {@link ServerSearchDTO} and its superclass are enforced; the
     * class-level annotation alone does not validate the fields of a {@code @RequestBody}
     * argument.
     *
     * @param dto search criteria and paging parameters
     * @return the page wrapped in a {@link ResultBean}
     */
    @PostMapping("/page")
    public ResultBean<EsPageVO<EsServerVO>> page(@Validated @RequestBody ServerSearchDTO dto) {
        return new ResultBean<>(serverSearchManager.pageSearchResult(dto));
    }
}
  1. 分页参数EsPageDTO实体类
import com.onecode.dtg.basicmon.util.PrincipalUtil;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import javax.validation.constraints.NotNull;
import java.util.Arrays;/*** @author gideon*/
@Data
public class EsPageDTO {public static final String ASC = "ASC";public static final String DESC = "DESC";/*** 最大分页大小,如果分页大小大于500,则用500作为分页的大小。防止有人直接传入一个较大的数,导致服务器内存溢出宕机*/public static final Integer MAX_PAGE_SIZE = 500;/*** 当前页*/@NotNull(message = "pageNum 不能为空")@ApiModelProperty(value = "当前页", required = true)private Integer pageNum;@NotNull(message = "pageSize 不能为空")@ApiModelProperty(value = "每页大小", required = true)private Integer pageSize;@ApiModelProperty(value = "排序字段数组,用逗号分割")private String[] columns;@ApiModelProperty(value = "排序字段方式,用逗号分割,ASC正序,DESC倒序")private String[] orders;public Integer getPageNum() {return pageNum;}public void setPageNum(Integer pageNum) {this.pageNum = pageNum;}public Integer getPageSize() {return pageSize;}public void setPageSize(Integer pageSize) {if (pageSize > MAX_PAGE_SIZE) {this.pageSize = MAX_PAGE_SIZE;return;}this.pageSize = pageSize;}public String getOrderBy() {return order(this.columns, this.orders);}public String[] getColumns() {return columns;}public void setColumns(String[] columns) {this.columns = columns;}public String[] getOrders() {return orders;}public void setOrders(String[] orders) {this.orders = orders;}public static String order(String[] columns, String[] orders) {if (columns == null || columns.length == 0) {return "";}StringBuilder stringBuilder = new StringBuilder();for (int x = 0; x < columns.length; x++) {String column = columns[x];String order;if (orders != null && orders.length > x) {order = orders[x].toUpperCase();if (!(order.equals(ASC) || order.equals(DESC))) {throw new IllegalArgumentException("非法的排序策略:" + column);}} else {order = ASC;}// 判断列名称的合法性,防止SQL注入。只能是【字母,数字,下划线】if (PrincipalUtil.isField(column)) {throw new IllegalArgumentException("非法的排序字段名称:" + column);}// 驼峰转换为下划线column = humpConversionUnderscore(column);if (x != 0) {stringBuilder.append(", ");}stringBuilder.append("`").append(column).append("` ").append(order);}return stringBuilder.toString();}public static String 
humpConversionUnderscore(String value) {StringBuilder stringBuilder = new StringBuilder();char[] chars = value.toCharArray();for (char character : chars) {if (Character.isUpperCase(character)) {stringBuilder.append("_");character = Character.toLowerCase(character);}stringBuilder.append(character);}return stringBuilder.toString();}@Overridepublic String toString() {return "EsPageDTO{" +"pageNum=" + pageNum +", pageSize=" + pageSize +", columns=" + Arrays.toString(columns) +", orders=" + Arrays.toString(orders) +'}';}
}
  1. 查询参数实体类ServerSearchDTO
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;import javax.validation.constraints.NotNull;
import java.time.LocalDate;/*** @author gideon*/
/**
 * Search criteria for the server (caregiver) search, on top of the paging parameters
 * inherited from {@link EsPageDTO}.
 *
 * @author gideon
 */
@Data
@EqualsAndHashCode(callSuper = true)
public class ServerSearchDTO extends EsPageDTO {

    /** Server type. */
    @ApiModelProperty("类型")
    private String type;

    /** Gender code: 1 = male, 2 = female, 0 = unknown. */
    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    private String gender;

    /** Education level. */
    @ApiModelProperty("学历")
    private String education;

    /** Registered residence: province. */
    @ApiModelProperty("户籍-省")
    private String idCardProvince;

    /** Registered residence: city. */
    @ApiModelProperty("户籍-市")
    private String idCardCity;

    /** Registered residence: region. */
    @ApiModelProperty("户籍-区")
    private String idCardRegion;

    /** Current residence: province. */
    @ApiModelProperty("现住-省")
    private String presentProvince;

    /** Current residence: city. */
    @ApiModelProperty("现住-市")
    private String presentCity;

    /** Current residence: region. */
    @ApiModelProperty("现住-区")
    private String presentRegion;

    /** Introduction labels; several values are separated by commas. */
    @ApiModelProperty("介绍-标签(多个值需要使用逗号分割)")
    private String introLabels;

    /** First day of the requested service period. */
    @NotNull(message = "服务开始时间不能为空")
    @ApiModelProperty("服务开始时间")
    private LocalDate serviceStartDate;

    /** Last day of the requested service period. */
    @NotNull(message = "服务结束时间不能为空")
    @ApiModelProperty("服务结束时间")
    private LocalDate serviceEndDate;

    /** Id of the sku the server must offer. */
    @NotNull(message = "skuId不能为空。")
    @ApiModelProperty("skuId")
    private Long skuId;
}
  1. 返回值EsServerVO参数
/**
 * Server (caregiver) document as returned by the search API.
 *
 * @author gideon
 * @date 2022/9/5
 */
@Data
public class EsServerVO {

    /** Server id. */
    @ApiModelProperty("id")
    private Long id;

    /** Id of the user that owns this server record. */
    @ApiModelProperty("用户标识")
    private Long userId;

    /** Server type. */
    @ApiModelProperty("类型")
    private String type;

    /** Name. */
    @ApiModelProperty("姓名")
    private String name;

    /** Gender code: 1 = male, 2 = female, 0 = unknown. */
    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    private String gender;

    /** Date of birth. */
    @JsonFormat(pattern = "yyyy-MM-dd")
    @ApiModelProperty("出生年月日")
    private LocalDate birthday;

    /** Education level. */
    @ApiModelProperty("学历")
    private String education;

    /** Date the server started practising. */
    @JsonFormat(pattern = "yyyy-MM-dd")
    @ApiModelProperty("从业时间")
    private LocalDate practiceDate;

    /** Rating level. */
    @ApiModelProperty("评级")
    private String level;

    /** Certification label. */
    @ApiModelProperty("认证标签")
    private String authLabel;

    /** Medals, comma separated. */
    @ApiModelProperty("勋章(逗号隔开)")
    private String medal;

    /** Service score. */
    @ApiModelProperty("服务评分")
    private Integer serviceScore;

    /** Real-name authentication flag. */
    @ApiModelProperty("已实名认证")
    private Integer realNameAuth;

    /** Identity card number. */
    @ApiModelProperty("身份证号")
    private String idCardNo;

    /** Registered residence: province. */
    @ApiModelProperty("户籍-省")
    private String idCardProvince;

    /** Registered residence: city. */
    @ApiModelProperty("户籍-市")
    private String idCardCity;

    /** Registered residence: region. */
    @ApiModelProperty("户籍-区")
    private String idCardRegion;

    /** Mobile phone number. */
    @ApiModelProperty("手机号")
    private String phone;

    /** Current residence: province. */
    @ApiModelProperty("现住-省")
    private String presentProvince;

    /** Current residence: city. */
    @ApiModelProperty("现住-市")
    private String presentCity;

    /** Current residence: region. */
    @ApiModelProperty("现住-区")
    private String presentRegion;

    /** Current residence: street address. */
    @ApiModelProperty("现住-地址")
    private String presentAddress;

    /** Avatar. */
    @ApiModelProperty("头像")
    private String head;

    /** Use status: init, normal or ban. */
    @ApiModelProperty("使用状态(初始化:init;正常:normal,禁用:ban)")
    private String useStatus;

    /** Audit status: await, pass or reject. */
    @ApiModelProperty("审核状态 (待审核:await;通过:pass,驳回:reject)")
    private String auditStatus;

    /** Reason given when the audit was rejected. */
    @ApiModelProperty("驳回理由")
    private String rejectReason;

    /** Registration time. */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @ApiModelProperty("注册时间")
    private LocalDateTime regDate;

    /** Introduction: text content. */
    @ApiModelProperty("介绍-内容")
    private String introContent;

    /** Introduction: video. */
    @ApiModelProperty("介绍-视频")
    private String introVideo;

    /** Introduction: labels. */
    @ApiModelProperty("介绍-标签")
    private String introLabel;

    /** Merchant id. */
    @ApiModelProperty("商家标识")
    private Long merchantId;

    /** Organisation id. */
    @ApiModelProperty("组织标识")
    private Long orgId;

    /** Skus the server applied for, comma separated. */
    @ApiModelProperty("护理员申请sku,多个逗号隔开")
    private String skuId;

    /** Schedule data of the server, comma separated. */
    @ApiModelProperty("护理员排班数据,多个逗号隔开")
    private String schedule;

    /** Logical-delete flag. */
    private Integer del;

    /** Creator. */
    private String createBy;

    /** Creation time. */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime;

    /** Last updater. */
    private String updateBy;

    /** Last update time. */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime updateTime;
}
  1. 分页返回值EsPageVO
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;import java.util.List;/*** @author gideon* @date 2022/9/5*/
/**
 * Generic page of search results.
 *
 * @param <T> element type of the result list
 * @author gideon
 * @date 2022/9/5
 */
@Data
public class EsPageVO<T> {

    /** Total number of pages. */
    @ApiModelProperty("总页数")
    private Integer pages;

    /** Total number of matching entries. */
    @ApiModelProperty("总条目数")
    private Long total;

    /** Entries of the current page. */
    @ApiModelProperty("结果集")
    private List<T> list;
}