Yujun's Portfolio
生产级智能商品搜索系统
June 10, 2024 (1y ago)
项目亮剑:构建生产级智能商品搜索系统实战 🚀
本项目将深入实战,从零开始构建一个生产级别的智能商品搜索系统。我们将不仅仅停留在理论层面,而是亲手操刀,解决真实世界中会遇到的各种挑战,涵盖从Elasticsearch集群的部署,到中文分词器的深度定制,再到海量数据的高效处理与极致的搜索性能优化。
本项目将实践以下核心技术与经验:
- 坚如磐石:生产环境ES集群部署与运维 🏭
- 告别单机演示,我们将学习如何在多节点环境下规划、部署和配置一个高可用、可扩展的Elasticsearch生产集群。
- 深入探讨节点角色、集群发现、故障转移、监控告警等关键运维知识,确保你的搜索引擎7x24小时稳定运行。
- 独具匠心:IK分词器源码二次开发与Pinyin分词器集成 🔪
- 突破原生IK分词器的局限。我们将亲自动手修改IK源码,实现从数据库动态加载和热更新词典的强大功能,让你的分词器实时掌握网络新词、行业术语。
- 集成Pinyin分词器,完美支持用户通过拼音输入进行商品搜索和智能提示,大幅提升用户体验。
- 双剑合璧:商品主索引与Suggest建议索引的精妙设计 🏛️➕🔮
- 精心设计商品主索引 (Product Index) 的Mapping,优化字段类型和分词策略,为核心的商品搜索、筛选、排序提供坚实基础。
- 独立构建搜索建议索引 (Suggest Index),专门服务于搜索框的自动补全 (Completion Suggester) 和 拼写纠错 (Phrase/Term Suggester) 功能,让用户的每一次输入都充满“智能感”。
- 海量数据处理:生产级全量商品数据写入ES实战与优化 🌊
- 面对百万级甚至亿级的商品数据,如何高效、稳定地将其从数据源(如MySQL)同步到Elasticsearch?我们将编写生产级的Java代码,利用Bulk API进行批量写入。
- 深入探讨数据写入过程中的性能瓶颈分析与优化策略,如调整批处理大小、并发控制、ES集群参数调优等,确保数据同步的效率和成功率。
- 核心引擎:商品核心搜索接口的精细化实现 🔍
- 基于Elasticsearch强大的Query DSL,实现支持多种条件的商品搜索接口,包括:
- 关键词全文检索(跨多个字段,如名称、描述)。
- 结构化筛选(按分类、品牌、价格区间等)。
- 复杂排序(按相关度、价格、销量、新品等)。
- 精准分页。
- 关注搜索结果的相关性排序,确保用户最想要的结果排在最前面。
- 性能极限挑战:亿级商品数据搜索性能测试与极致优化 ⏱️⚡
- 模拟真实用户场景,对包含亿级商品数据的Elasticsearch集群进行压力测试和性能分析。
- 识别搜索瓶颈(慢查询、高CPU/内存消耗、磁盘I/O瓶颈等),并运用各种ES调优技巧(查询改写、缓存策略、索引优化、硬件升级等)进行极致性能优化,追求毫秒级的搜索响应。
- 智能交互:输入框自动补全与拼写纠错功能实战 💡✏️
- 利用Suggest API,为前端搜索框实现流畅的输入即提示 (Search-as-you-type) 功能。
- 当用户输入有误或不完整时,提供智能的拼写纠错建议,引导用户快速找到正确的目标。
通过这个项目,不仅仅是学习Elasticsearch的API调用,更是完整体验一个真实搜索系统的构建、优化和迭代过程,获得宝贵的生产实践经验。
IK分词器开发实战
用过ES做中文搜索的朋友,对IK分词器一定不陌生。它开源、好用,是很多项目中文分词的首选。但有时候,原生的IK分词器直接拿来用,可能会觉得差那么点意思,尤其是在词库更新这块儿。
比如,网络热词、行业新词层出不穷,如果我们的分词器词库还是老古董,那分词效果自然就跟不上时代了。比如用户搜“水池里面银龙鱼”,如果“银龙鱼”不在词库里,很可能就被拆得七零八落,搜索结果自然也就跑偏了。
所以,很多时候,我们需要撸起袖子,对IK分词器的源码进行二开,让它能更智能、更及时地更新词库。这篇文章,我们就来聊聊怎么改造IK分词器,特别是实现一个灵活的词库热刷新机制。
要改造IK,第一步当然是拿到它的源码。analysis-ik。然后用顺手的Java IDE(比如IntelliJ IDEA)打开这个项目。接下来是重要的一步:
我们的目标是改造词典加载逻辑。根据IK的源码结构,我们重点要关注的是负责词典管理和加载的核心类。通常,这个关键角色会落在org.wltea.analyzer.dic.Dictionary
这个类身上。它管理着所有的词典数据(主词典、停用词词典等),并在IK初始化和运行时提供词典查询服务。我们要做的,就是让它学会从数据库“进货”,并且还能定期“更新货架”。
IK分词器的词库热刷新机制
我们要玩中文分词,关键的一点是要有词库,如果词库一直不更新,里面少了一些冷门,网络词语,比如“水池里面银龙鱼”, 分词器就会分词不准确。 IK中文分词器,就必须要支持我们从自定义的数据库中初始化的时候加载词库,开启一个线程,让它定时从数据库中刷新和加载最新的词库。如果我们想要更新词库的话,就可以在web界面手动录入词语,或系统灌入最新的词汇,数据库,ES JVM 进程里 -》IK中文分词器-》不断的自己热加载数据库里最新的词汇和热刷新。
IK分词器源码改造流程讲解
ok,前面我们已经知道了我们要实现什么样的效果,本节我们就来动手改造。
需要分析IK分词器的源码结构,特别是与词库加载相关的部分,以便实现从数据库热加载词库的功能。从项目结构可以看出,是一个Maven项目(存在pom.xml),主要有config
、core
、elasticsearch
和opensearch
等目录。
先了解IK分词器的核心实现在哪里,特别是与词库加载相关的代码。主要代码路径是在 analysis-ik/core/src/main/java/org/wltea/analyzer/dic
目录下。
需要特别关注Dictionary.java
和Monitor.java
文件,它们可能与词典管理和监控有关,我们需要修改代码使其能够从数据库热加载词库。
ok,先在找到了代码,再来分析下需求,是让IK分词器支持从数据库中热加载词库。目前IK分词器支持以下方式加载词库:
- 本地文件加载词典
- 远程HTTP接口获取词典(能够实现热加载)
如何实现从数据库中热加载词库,即实现思路是什么?
- 首先需要,创建数据库连接管理类
- 修改词典加载机制,增加从数据库加载的方法
- 创建定时任务,定期从数据库中检查并更新词库。
// 在org.wltea.analyzer.dic包中创建DBDictSource.java package org.wltea.analyzer.dic; import java.sql.*; import java.util.ArrayList; import java.util.List; import org.apache.logging.log4j.Logger; import org.wltea.analyzer.help.ESPluginLoggerFactory; public class DBDictSource { private static final Logger logger = ESPluginLoggerFactory.getLogger(DBDictSource.class.getName()); private String jdbcUrl; private String username; private String password; private String dictTableName; private String dictColumnName; private String lastUpdateTimeColumnName; private long lastUpdateTime; public DBDictSource(String jdbcUrl, String username, String password, String dictTableName, String dictColumnName, String lastUpdateTimeColumnName) { this.jdbcUrl = jdbcUrl; this.username = username; this.password = password; this.dictTableName = dictTableName; this.dictColumnName = dictColumnName; this.lastUpdateTimeColumnName = lastUpdateTimeColumnName; this.lastUpdateTime = 0L; } public Connection getConnection() throws SQLException { try { Class.forName("com.mysql.jdbc.Driver"); // 根据实际使用的数据库修改驱动 return DriverManager.getConnection(jdbcUrl, username, password); } catch (ClassNotFoundException e) { logger.error("Database driver not found", e); throw new SQLException("Database driver not found", e); } } public List<String> getNewDictWords() { List<String> words = new ArrayList<>(); Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { conn = getConnection(); String sql = "SELECT " + dictColumnName + " FROM " + dictTableName; if (lastUpdateTimeColumnName != null && !lastUpdateTimeColumnName.isEmpty()) { sql += " WHERE " + lastUpdateTimeColumnName + " > ?"; ps = conn.prepareStatement(sql); ps.setTimestamp(1, new Timestamp(lastUpdateTime)); } else { ps = conn.prepareStatement(sql); } rs = ps.executeQuery(); while (rs.next()) { String word = rs.getString(1); if (word != null && !word.trim().isEmpty()) { words.add(word.trim()); } } // 更新最后检查时间 if (lastUpdateTimeColumnName != null && !lastUpdateTimeColumnName.isEmpty()) { sql = "SELECT MAX(" + lastUpdateTimeColumnName + ") FROM " + dictTableName; ps = conn.prepareStatement(sql); rs = ps.executeQuery(); if (rs.next()) { Timestamp maxTime = rs.getTimestamp(1); if (maxTime != null) { lastUpdateTime = maxTime.getTime(); } } } return words; } catch (SQLException e) { logger.error("Failed to load dictionary from database", e); return words; } finally { closeResources(conn, ps, rs); } } public List<String> getAllDictWords() { List<String> words = new ArrayList<>(); Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { conn = getConnection(); String sql = "SELECT " + dictColumnName + " FROM " + dictTableName; ps = conn.prepareStatement(sql); rs = ps.executeQuery(); while (rs.next()) { String word = rs.getString(1); if (word != null && !word.trim().isEmpty()) { words.add(word.trim()); } } return words; } catch (SQLException e) { logger.error("Failed to load dictionary from database", e); return words; } finally { closeResources(conn, ps, rs); } } private void closeResources(Connection conn, PreparedStatement ps, ResultSet rs) { try { if (rs != null) rs.close(); } catch (SQLException e) { logger.error("Failed to close ResultSet", e); } try { if (ps != null) ps.close(); } catch (SQLException e) { logger.error("Failed to close PreparedStatement", e); } try { if (conn != null) conn.close(); } catch (SQLException e) { logger.error("Failed to close Connection", e); } } }
修改配置文件和配置类:
<!-- 在IKAnalyzer.cfg.xml中添加数据库配置 --> <properties> <!-- 现有配置保持不变 --> <!-- 数据库词库配置 --> <entry key="db_enable">true</entry> <entry key="db_url">jdbc:mysql://localhost:3306/dict_db</entry> <entry key="db_username">root</entry> <entry key="db_password">password</entry> <entry key="db_dict_table">custom_dict</entry> <entry key="db_dict_column">word</entry> <entry key="db_time_column">update_time</entry> <entry key="db_poll_time">60</entry> </properties>
// 在org.wltea.analyzer.cfg.Configuration类中添加方法和属性 private boolean dbEnable = false; private String dbUrl; private String dbUsername; private String dbPassword; private String dbDictTable; private String dbDictColumn; private String dbTimeColumn; private int dbPollTime = 60; // 默认60秒 // 在合适的初始化方法中添加 public void loadConfig(Properties props) { // 加载现有配置 // 加载数据库配置 dbEnable = Boolean.valueOf(props.getProperty("db_enable", "false")); if (dbEnable) { dbUrl = props.getProperty("db_url"); dbUsername = props.getProperty("db_username"); dbPassword = props.getProperty("db_password"); dbDictTable = props.getProperty("db_dict_table"); dbDictColumn = props.getProperty("db_dict_column"); dbTimeColumn = props.getProperty("db_time_column"); String pollTimeStr = props.getProperty("db_poll_time"); if (pollTimeStr != null) { try { dbPollTime = Integer.parseInt(pollTimeStr); } catch (NumberFormatException e) { // 使用默认值 } } } } // 添加getter方法 public boolean isDbEnable() { return dbEnable; } public String getDbUrl() { return dbUrl; } public String getDbUsername() { return dbUsername; } public String getDbPassword() { return dbPassword; } public String getDbDictTable() { return dbDictTable; } public String getDbDictColumn() { return dbDictColumn; } public String getDbTimeColumn() { return dbTimeColumn; } public int getDbPollTime() { return dbPollTime; }
创建数据库词典监控类:参考现有的Monitor类,创建一个数据库词典监控类:
// 在org.wltea.analyzer.dic包中创建DBDictMonitor.java package org.wltea.analyzer.dic; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.List; import org.apache.logging.log4j.Logger; import org.wltea.analyzer.cfg.Configuration; import org.wltea.analyzer.help.ESPluginLoggerFactory; public class DBDictMonitor implements Runnable { private static final Logger logger = ESPluginLoggerFactory.getLogger(DBDictMonitor.class.getName()); private Configuration configuration; private DBDictSource dbDictSource; public DBDictMonitor(Configuration cfg) { this.configuration = cfg; if (cfg.isDbEnable()) { this.dbDictSource = new DBDictSource( cfg.getDbUrl(), cfg.getDbUsername(), cfg.getDbPassword(), cfg.getDbDictTable(), cfg.getDbDictColumn(), cfg.getDbTimeColumn() ); } } @Override public void run() { configuration.check(); AccessController.doPrivileged((PrivilegedAction<Void>) () -> { this.runUnprivileged(); return null; }); } public void runUnprivileged() { if (dbDictSource == null) { return; } try { logger.info("Checking database dictionary updates..."); List<String> words = dbDictSource.getNewDictWords(); if (words != null && !words.isEmpty()) { logger.info("Found {} new words in database, updating dictionary...", words.size()); Dictionary.getSingleton().addWords(words); logger.info("Dictionary updated with words from database."); } } catch (Exception e) { logger.error("Error updating dictionary from database", e); } } // 初始化方法,从数据库加载所有词条 public void loadAllDictWords() { if (dbDictSource == null) { return; } try { logger.info("Loading all dictionary words from database..."); List<String> words = dbDictSource.getAllDictWords(); if (words != null && !words.isEmpty()) { logger.info("Loaded {} words from database", words.size()); Dictionary.getSingleton().addWords(words); logger.info("Dictionary initialized with words from database."); } } catch (Exception e) { logger.error("Error loading dictionary from database", e); } } }
添加 MAVEN 依赖:在项目的pom.xml中添加JDBC驱动依赖,比如MySQL的驱动
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency>
需要用户在数据库中创建相应的表结构:
CREATE TABLE custom_dict ( id INT PRIMARY KEY AUTO_INCREMENT, word VARCHAR(100) NOT NULL, update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );
我们需要增加一种新的方式:从数据库中读取并热加载词库。
停用词,stop words 就是没有太大意义的一些词汇,分词可以忽略掉。也有一个
在ES生产集群中安装IK分词器
改造完成后,我们就可以重新打包,注意版本。执行命令:
mvn clean package
就会得到一个 jar包,后面用这个jar包,替换掉IK发行版本中的jar包。
之后建立数据库以及配置文件
最后给3个ES节点都安装IK分词器。 先搞第一台机器。 在elasticsearch的plugins目录下创建一个ik目录:
mkdir -p /app/elasticsearch/elasticsearch-7.9.3/plugins/ik
把下载的发行版压缩包上传到这里。然后解压
用刚刚打包好的jar包替换掉解压得到的同名jar包。
把mysql-connector-java-8.0.20.jar包也上传到ik目录。(为什么) 在ik/config目录下添加一个jdbc.properties文件。
然后搞第二台机器。
重启es的三个节点。
ES生产环境下,IK分词器和pinyin分词器一定会一起使用。