Yujun's Blog
ElasticSearch(二):分库分表下的准实时搜索
ElasticSearch(二):分库分表下的准实时搜索🚀
在上一篇我们学习了如何在单体MySQL环境下集成Elasticsearch构建基本的商品搜索功能。然而,在真实的C端(面向消费者)场景中,我们经常会采用分库分表的方案来承载巨大的数据流量和存储压力。例如,我们会以用户ID为切分键,将用户的抽奖订单、签到记录等行为数据分散到各个物理库表中。
这样做极大地提升了C端用户服务的性能和可扩展性。但是,新的挑战也随之而来:运营团队需要对全量数据进行分析、统计和查询,以制定营销策略、监控活动效果。分库分表后,数据散落在各个库中,传统的聚合查询变得异常复杂和低效。我们总不能轮询每一个分库分表去捞数据吧,那样效率就很慢了。😫
为了解决这个问题,我们需要一套能够将这些分散数据重新聚合的方案。而Elasticsearch凭借其强大的聚合分析和全文检索能力,成为了理想的数据归宿。关键在于,如何高效、准实时地将分库分表MySQL中的数据同步到Elasticsearch呢?
就是引入变更数据捕获 (Change Data Capture, CDC) 技术,而 Canal 正是阿里巴巴开源的一款优秀的基于MySQL数据库binlog的增量订阅与消费组件。更进一步,我们将利用 Canal Adapter 来简化数据同步到Elasticsearch的过程。
本篇博客,我们将深入探讨如何在一个营销平台的背景下,构建一个 分库分表MySQL -> Canal Server -> Canal Adapter -> Elasticsearch 的数据同步链路,实现准实时的运营数据查询和分析能力。
你将从本文学到:
- C端场景下分库分表(按用户ID切分)的典型应用及其带来的数据聚合难题。
 - Canal的核心工作原理及其在CDC中的关键角色。
 - Canal Adapter如何简化将binlog变更同步到Elasticsearch的过程。
 - 针对营销平台特定库表(如
raffle_activity_order,user_raffle_order)的Canal及Adapter配置实战。 - 如何在Elasticsearch中为同步过来的数据创建合适的索引和Mapping。
 - 通过Kibana进行数据验证和索引模式管理。
 - 如何通过实际业务操作(如执行抽奖、签到)来触发数据同步并验证其效果。
 
🏛️ 组件介绍与整体架构
Canal:数据的“搬运工”与“翻译官”
Canal,译为水道/管道/沟渠,顾名思义,它的主要用途就是基于MySQL数据库的增量日志(binlog)进行解析,提供增量数据的订阅和消费服务。
早在阿里巴巴初期,由于杭州和美国双机房部署的需求,跨机房数据同步主要依赖于业务层面的trigger来获取增量变更。从2010年开始,阿里逐步转向通过解析数据库日志来获取增量变更,从而催生了大量的数据库增量订阅和消费业务,Canal便是这一背景下的杰出产物。
它的工作原理其实很巧妙:
Canal会模拟一个MySQL的从库(slave)的交互协议,向MySQL主库(master)发送dump协议请求。当MySQL主库收到这个请求后,便会开始将二进制日志(binary log)推送给这个“伪装”的从库,也就是Canal Server。Canal Server接收到binlog后,会对其进行解析,提取出结构化的数据变更事件(INSERT, UPDATE, DELETE),然后根据配置将这些变更分发出去。
Canal Adapter:直达ES的“特快专递”
虽然Canal Server负责捕获和解析binlog,但如何将这些变更数据高效、正确地写入目标存储(如Elasticsearch)呢?这时,Canal Adapter就派上用场了。Canal Adapter是Canal生态中的一个重要组件,它可以消费Canal Server解析后的数据,并根据预设的适配器(Adapter)配置,将数据写入到各种目标数据源,包括Elasticsearch、HBase、Kafka、RocketMQ等。
对于我们的场景,我们将使用Canal Adapter的ES适配器,它可以直接将数据变更同步到Elasticsearch中,省去了我们自己编写复杂消费端逻辑的麻烦。
整体架构图
+---------------------+     +---------------------+     +-----------------------+     +-------------------+
|                     |     |                     |     |                       |     |                   |
|  MySQL分片集群      | --> |    Canal Server     | --> |     Canal Adapter     | --> |   Elasticsearch   |
| (big_market_01,    |     | (订阅Binlog)        |     | (ES适配器, 同步数据)  |     | (统一索引)        |
|  big_market_02)     |     |                     |     |                       |     |                   |
+---------------------+     +---------------------+     +-----------------------+     +-------------------+
        ^                                                                                       |
        |                                                                                       |
        |  (应用层通过分库分表中间件写入/读取 - 用户行为)                                           | (运营侧查询/分析)
        +---------------------------------------------------------------------------------------+
以上数据流转的核心逻辑:
- 用户在营销平台进行抽奖、签到等操作,数据通过分库分表规则写入到例如 
big_market_01.raffle_activity_order_00或big_market_02.user_raffle_order_01等具体的MySQL分片表中。 - MySQL产生binlog记录这些数据变更。
 - Canal Server订阅这些MySQL实例的binlog,解析出结构化的变更事件。
 - Canal Adapter连接到Canal Server,获取这些变更事件。
 - Canal Adapter根据其ES适配器的配置(比如哪个库的哪个表对应ES的哪个索引,字段如何映射等),将数据自动同步到Elasticsearch中。
 - 运营人员通过Kibana或内部数据平台查询Elasticsearch,获取聚合后的全量数据进行分析。
 
🎯 功能预期
本次实战,我们将重点选大营销平台的 big_market_01 和 big_market_02 两个分库中的 raffle_activity_order 和 user_raffle_order 相关分片表(假设它们是按某种规则分片的,例如 raffle_activity_order_xx)进行Elasticsearch同步配置。通过学习这个过程,你可以触类旁通,为项目中其他的库表配置同步。
🛠️ 环境安装与配置
环境脚本说明: 假设我们已经准备好了包含Canal、Canal Adapter、Kibana、Logstash(如果用到)以及Docker Compose 脚本文件的工程环境。我们这里重点关注与数据同步相关的配置。
1. MySQL侧准备
- 
开启Binlog: 确保你的所有MySQL分片实例(例如
big_market_01和big_market_02所在的MySQL服务)都已经开启了binlog,并且binlog格式为ROW。 在MySQL的配置文件 (my.cnf或my.ini) 中检查或添加:[mysqld] log-bin=mysql-bin binlog_format=ROW server-id=XXX # 每个MySQL实例的server-id必须全局唯一重启MySQL使配置生效。
 - 
创建Canal连接账户: 为Canal创建一个专用的MySQL账户,并授予必要的复制权限 (REPLICATION SLAVE, REPLICATION CLIENT)。
CREATE USER 'canal'@'%' IDENTIFIED BY 'your_canal_password'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES; 
2. Canal Server 配置
Canal Server需要知道它应该去监听哪些MySQL实例。这通常在 canal.properties 和各个 destination 的 instance.properties 中配置。
- 
conf/canal.properties: 确保canal.serverMode = tcp(因为Adapter会通过TCP连接Server)。canal.port = 11111 canal.serverMode = tcp # ... 其他配置 - 
conf/<destination_name>/instance.properties: 我们需要为每个要监听的MySQL实例(不是每个库,如果多个库在同一个MySQL实例上,一个destination就够了,通过filter来区分库表)配置一个destination。 例如,如果big_market_01和big_market_02是两个不同的MySQL实例,你需要两个destination。如果它们在同一个MySQL实例上,但你想分开管理或过滤,也可以创建两个。假设为
big_market_01创建conf/bm_db01/instance.properties:canal.instance.master.address = host_of_big_market_01_mysql:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = your_canal_password # 监听big_market_01库下所有raffle_activity_order_xx 和 user_raffle_order_xx 表 canal.instance.filter.regex = big_market_01\\.raffle_activity_order_\\d+,big_market_01\\.user_raffle_order_\\d+ # 如果表名没有规律的后缀,而是完全不同的表名,可以用逗号分隔 # canal.instance.filter.regex = big_market_01\\.tableA,big_market_01\\.tableB类似地,为
big_market_02创建conf/bm_db02/instance.properties。 
3. Canal Adapter 配置 (核心!)
Canal Adapter负责将从Canal Server获取的数据同步到Elasticsearch。它的配置主要涉及:
- 
conf/application.yml(Adapter的主配置文件): 这里需要配置Adapter要连接的Canal Server地址、要处理的destination,以及全局的ES连接信息。canalAdapters: - instance: bm_db01 # 对应Canal Server中的destination名称 # sourceConnector: # Canal Server连接信息,如果Adapter和Server在同一部署,很多可以默认 # type: canal # 或 rds ... # properties: # canal.server.host: localhost # Canal Server的主机,如果是Docker网络,可能是canal-server容器名 # canal.server.port: 11111 # canal.destination: bm_db01 # canal.user: "" # canal.passwd: "" groups: - groupId: g1 outerAdapters: # 配置外部适配器,即目标数据源 - name: es7 # 指定使用es7的适配器 (确保你的Adapter版本支持es7) mode: sync # 同步模式 properties: # ES连接信息 elasticsearch.hosts: elasticsearch:9200 # Docker网络中的ES服务名和端口 # elasticsearch.username: # 如果ES有认证 # elasticsearch.password: # elasticsearch.cluster.name: # 如果有集群名 - instance: bm_db02 # 第二个destination的配置,类似bm_db01 # ... groups: - groupId: g1 outerAdapters: - name: es7 mode: sync properties: elasticsearch.hosts: elasticsearch:9200 # ... 其他Adapter全局配置 - 
库表同步映射配置 (YML文件): 根据你的博文描述,需要在Canal Adapter的
conf/es7/目录下(或者你指定的ES适配器对应的配置目录)为每个需要同步的逻辑表(即使它在MySQL中是分片表)创建一个YML配置文件。这个YML文件定义了从MySQL表到ES索引的映射关系、主键、字段映射等。例如,为
raffle_activity_order(逻辑表名,它可能对应MySQL中的big_market_01.raffle_activity_order_xx和big_market_02.raffle_activity_order_yy) 创建conf/es7/big_market_raffle_activity_order.yml:# 数据源信息 (Adapter会根据这个信息匹配Canal Server传来的数据) dataSourceKey: defaultDS # 默认,或与application.yml中配置的对应 destination: bm_db01 # 这个配置属于哪个destination,或者可以配置为监听所有 groupId: g1 # 逻辑表名,Adapter会用这个名字去匹配Canal Server事件中的表名(可能需要正则或通配符) # 或者,更常见的是,这里的 table 字段直接指定了要同步到ES的索引名, # 而具体的MySQL表名匹配规则在更上层的Adapter配置或通过canal.instance.filter.regex来处理。 # Canal Adapter的YML配置细节较多,具体字段名和结构请参考官方文档。 # 这里是一个简化的概念,实际YML可能更复杂。 # 简化的YML示例,更准确的配置请查阅Canal Adapter文档 # 通常YML文件名会是 database-table.yml 或类似格式,Adapter会加载它们 # 假设YML文件名直接对应ES的索引名,或者在内部指定 # File: conf/es7/big_market_raffle_activity_order.yml elasticsearch: # ES索引名,将所有分片表的raffle_activity_order数据同步到这个统一索引 index: big_market.raffle_activity_order # id对应MySQL表中的主键字段名,用于ES文档的_id # 如果是联合主键,可能需要特殊处理或自定义生成策略 pk: _order_id # 假设MySQL表中的主键是 order_id,同步到ES时字段名为 _order_id # (注意:Adapter的YML配置中,字段名可能不需要前缀 "_" , 它会自动处理或根据配置添加) # mapping: # 可选,如果ES索引的mapping不是自动创建或需要更精细控制 # fields: // - column: order_id // name: _order_id // type: keyword // - column: user_id // name: _user_id // type: keyword // ... 其他字段映射需要为
user_raffle_order也创建一个类似的YML配置文件。 重要:Canal Adapter的YML配置文件格式和具体字段非常关键,强烈建议查阅你所使用的Canal Adapter版本的官方文档来获取准确的配置方法。 上述YML仅为示意。 
4. 运行状态检查
- 确保MySQL、Canal Server、Canal Adapter、Elasticsearch、Kibana都已启动。
 - 使用Portainer或Docker日志命令 (
docker logs <container_name>) 查看各个组件的运行日志,确保没有错误,特别是:- Canal Server是否成功连接到所有MySQL实例并开始拉取binlog。
 - Canal Adapter是否成功连接到Canal Server并获取到
destination的数据。 - Canal Adapter是否成功连接到Elasticsearch。
 - Canal Adapter在处理数据时是否有错误(例如YML配置错误、字段映射问题、ES写入权限问题等)。
 
 
🔮 ES索引创建与数据验证
1. 创建ES索引 (手动或通过Adapter自动创建)
- 
手动创建 (通过cURL或Kibana Dev Tools): 在数据同步开始前,或者如果Adapter配置为不自动创建索引,你需要手动创建ES索引并定义好Mapping。这能让你对字段类型(
text,keyword,date,long等)有更精确的控制,特别是对于需要分词的文本字段和需要精确匹配或聚合的keyword字段。big_market.raffle_activity_order 索引创建 (使用提供的cURL):
curl -X PUT "http://127.0.0.1:9200/big_market.raffle_activity_order" -H 'Content-Type: application/json' -d' { "mappings": { "properties": { "_user_id":{"type": "keyword"}, "_sku":{"type": "keyword"}, "_activity_id":{"type": "keyword"}, "_activity_name":{"type": "text", "analyzer": "ik_smart"}, "_strategy_id":{"type": "keyword"}, "_order_id":{"type": "keyword"}, "_order_time":{"type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"}, "_total_count":{"type": "integer"}, "_day_count":{"type": "integer"}, "_month_count":{"type": "integer"}, "_pay_amount":{"type": "double"}, "_state":{"type": "keyword"}, "_out_business_no":{"type": "keyword"}, "_create_time":{"type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"}, "_update_time":{"type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"} } } }'big_market.user_raffle_order 索引创建 (使用提供的cURL):
curl -X PUT "http://127.0.0.1:9200/big_market.user_raffle_order" -H 'Content-Type: application/json' -d' { "mappings": { "properties": { "_user_id":{"type": "keyword"}, "_activity_id":{"type": "keyword"}, "_activity_name":{"type": "text", "analyzer": "ik_smart"}, "_strategy_id":{"type": "keyword"}, "_order_id":{"type": "keyword"}, "_order_time":{"type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"}, "_order_state":{"type": "keyword"}, "_create_time":{"type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"}, "_update_time":{"type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"} } } }'注意我对Mapping的建议性修改:
_user_id,_activity_id,_order_id等ID类字段通常用keyword类型,因为它们用于精确匹配、过滤或聚合,不需要分词。_activity_name这种可能需要模糊搜索的文本字段用text类型,并指定一个合适的分词器 (如ik_smart或ik_max_word,前提是你的ES已安装IK分词器)。_order_time,_create_time,_update_time等时间字段用date类型,并可以指定format(多种格式用||分隔,epoch_millis表示毫秒时间戳)。- 数值型字段(如count, amount)应使用 
integer,long,double,float等数值类型。 - 状态、类型等枚举性质的字段用 
keyword。 - 你提供的Mapping中所有字段都是 
text,这对于非文本字段(如ID、日期、数字)来说是不合适的,会影响搜索、排序和聚合的准确性和性能。请务必根据字段的实际含义调整类型。 
 - 
Adapter自动创建: 某些版本的Canal Adapter或其ES适配器可能支持根据YML配置或源表结构自动创建索引和基础的Mapping。但这通常不如手动精细控制Mapping来得好。如果依赖自动创建,请务必检查生成的Mapping是否符合你的预期。
 
2. 在Kibana中创建索引模式 (Index Pattern / Data View)
- 打开Kibana (
http://127.0.0.1:5601)。 - 导航到 "Stack Management" -> "Index Patterns" (或 "Data Views")。
 - 点击 "Create index pattern"。
 - 输入索引模式名称:
- 对于 
big_market.raffle_activity_order索引,就输入big_market.raffle_activity_order。 - 对于 
big_market.user_raffle_order索引,就输入big_market.user_raffle_order。 - (或者,如果你想用一个模式匹配多个相关的索引,可以使用通配符,例如 
big_market.*_order。) 
 - 对于 
 - 选择时间字段: 选择一个合适的时间字段,如 
_create_time或_order_time。 - 点击 "Create index pattern"。
 
3. 触发数据变更并验证同步
现在,环境和配置都已就绪,就可以验证我们的数据同步链路了。
- 
执行业务操作: 运行你提供的测试方法(或其他能触发相关表数据变更的业务操作):
// 在你的测试类或服务中执行 @Test public void test_draw_for_es_sync() { // 给测试方法一个更明确的名称 for (int i = 0; i < 10; i++) { // 可以先少量测试,比如1-2次 ActivityDrawRequestDTO request = new ActivityDrawRequestDTO(); request.setActivityId(100301L); // 确保这个活动ID存在且会操作你监听的表 request.setUserId("xiaofuge_es_test_" + i); // 使用不同的用户ID,避免主键冲突 Response<ActivityDrawResponseDTO> response = raffleActivityService.draw(request); log.info("ES同步测试 - 抽奖请求参数:{}", JSON.toJSONString(request)); log.info("ES同步测试 - 抽奖测试结果:{}", JSON.toJSONString(response)); // 可以在这里加个短暂的延时,等待Canal同步 try { Thread.sleep(500); } catch (InterruptedException ignored) {} } } @Test public void test_calendarSignRebate_for_es_sync() throws InterruptedException { String userIdForSign = "xiaofuge_sign_test_01"; Response<Boolean> response = raffleActivityService.calendarSignRebate(userIdForSign); log.info("ES同步测试 - 签到返利用户ID:{}", userIdForSign); log.info("ES同步测试 - 签到返利测试结果:{}", JSON.toJSONString(response)); // new CountDownLatch(1).await(); // 如果是想在测试方法结束前阻塞,可以保留,否则去掉 }确保这些操作会修改你通过Canal监听的表 (例如
raffle_activity_order_xx或user_raffle_order_xx)。 - 
观察日志:
- MySQL的binlog是否产生新条目。
 - Canal Server的对应destination日志是否有新的数据拉取和解析记录。
 - Canal Adapter的日志是否有接收到数据并尝试写入ES的记录。注意是否有错误信息。
 
 - 
在Kibana Discover页面查看数据:
- 导航到 "Discover"页面 (
http://127.0.0.1:5601/app/discover)。 - 选择你刚刚创建的索引模式 (例如 
big_market.raffle_activity_order)。 - 调整时间范围到“最近几分钟”或“今天”。
 - 你应该能看到由 
test_draw()或test_calendarSignRebate()操作产生的新的数据记录已经同步到了Elasticsearch中!🎉 - 尝试搜索这些新记录,例如按 
_user_id或_order_id搜索。 
 - 导航到 "Discover"页面 (
 
💡 关键挑战与思考
- 分片键的处理: Canal Adapter通常需要知道如何从源数据中提取ES文档的 
_id。如果你的分片键(如用户ID)本身不是MySQL表的主键,或者ES的_id需要由多个字段组合而成,你可能需要在Adapter的YML配置中进行更复杂的pk定义或使用脚本进行转换。 - 一致性与延迟: 这是一套准实时同步系统,数据从MySQL到ES可被搜索之间存在一定的延迟。需要监控这个延迟,确保其在可接受范围内。
 - Mapping的重要性: 再次强调,在ES中为字段设置正确的类型(
keyword,text,date,integer等)对于后续的搜索、聚合、排序至关重要。直接使用text类型作为所有字段的默认类型是不可取的。 - Canal Adapter的深入学习: Canal Adapter的配置(特别是YML映射文件)有很多细节和高级功能(如字段过滤、重命名、ETL脚本等),值得深入研究其官方文档。
 - 全量与增量的配合: 对于历史数据的初始化,仍然需要一个全量同步的方案。Canal主要解决的是增量数据的同步。
 
🏁 总结
通过整合营销平台的分库分表MySQL、强大的CDC工具Canal以及便捷的Canal Adapter,我们成功地为运营团队构建了一套能够将分散数据聚合到Elasticsearch,并实现准实时查询和分析的解决方案。解决了分库分表带来的数据聚合难题。
这套架构在处理C端高并发写入和运营侧复杂查询的场景中非常实用。希望本篇实战分享,能帮助你更好地理解和应用这套技术栈。💪