Yujun's Portfolio

生产级智能商品搜索系统

June 10, 2024 (1y ago)

项目亮剑:构建生产级智能商品搜索系统实战 🚀

本项目将深入实战,从零开始构建一个生产级别的智能商品搜索系统。我们将不仅仅停留在理论层面,而是亲手操刀,解决真实世界中会遇到的各种挑战,涵盖从Elasticsearch集群的部署,到中文分词器的深度定制,再到海量数据的高效处理极致的搜索性能优化

本项目将实践以下核心技术与经验:

  1. 坚如磐石:生产环境ES集群部署与运维 🏭
  • 告别单机演示,我们将学习如何在多节点环境下规划、部署和配置一个高可用、可扩展的Elasticsearch生产集群。
  • 深入探讨节点角色、集群发现、故障转移、监控告警等关键运维知识,确保你的搜索引擎7x24小时稳定运行。
  1. 独具匠心:IK分词器源码二次开发与Pinyin分词器集成 🔪
  • 突破原生IK分词器的局限。我们将亲自动手修改IK源码,实现从数据库动态加载和热更新词典的强大功能,让你的分词器实时掌握网络新词、行业术语。
  • 集成Pinyin分词器,完美支持用户通过拼音输入进行商品搜索和智能提示,大幅提升用户体验。
  1. 双剑合璧:商品主索引与Suggest建议索引的精妙设计 🏛️➕🔮
  • 精心设计商品主索引 (Product Index) 的Mapping,优化字段类型和分词策略,为核心的商品搜索、筛选、排序提供坚实基础。
  • 独立构建搜索建议索引 (Suggest Index),专门服务于搜索框的自动补全 (Completion Suggester)拼写纠错 (Phrase/Term Suggester) 功能,让用户的每一次输入都充满“智能感”。
  1. 海量数据处理:生产级全量商品数据写入ES实战与优化 🌊
  • 面对百万级甚至亿级的商品数据,如何高效、稳定地将其从数据源(如MySQL)同步到Elasticsearch?我们将编写生产级的Java代码,利用Bulk API进行批量写入
  • 深入探讨数据写入过程中的性能瓶颈分析优化策略,如调整批处理大小、并发控制、ES集群参数调优等,确保数据同步的效率和成功率。
  1. 核心引擎:商品核心搜索接口的精细化实现 🔍
  • 基于Elasticsearch强大的Query DSL,实现支持多种条件的商品搜索接口,包括:
  • 关键词全文检索(跨多个字段,如名称、描述)。
  • 结构化筛选(按分类、品牌、价格区间等)。
  • 复杂排序(按相关度、价格、销量、新品等)。
  • 精准分页。
  • 关注搜索结果的相关性排序,确保用户最想要的结果排在最前面。
  1. 性能极限挑战:亿级商品数据搜索性能测试与极致优化 ⏱️⚡
  • 模拟真实用户场景,对包含亿级商品数据的Elasticsearch集群进行压力测试和性能分析
  • 识别搜索瓶颈(慢查询、高CPU/内存消耗、磁盘I/O瓶颈等),并运用各种ES调优技巧(查询改写、缓存策略、索引优化、硬件升级等)进行极致性能优化,追求毫秒级的搜索响应。
  1. 智能交互:输入框自动补全与拼写纠错功能实战 💡✏️
  • 利用Suggest API,为前端搜索框实现流畅的输入即提示 (Search-as-you-type) 功能。
  • 当用户输入有误或不完整时,提供智能的拼写纠错建议,引导用户快速找到正确的目标。

通过这个项目,不仅仅是学习Elasticsearch的API调用,更是完整体验一个真实搜索系统的构建、优化和迭代过程,获得宝贵的生产实践经验。

IK分词器开发实战

用过ES做中文搜索的朋友,对IK分词器一定不陌生。它开源、好用,是很多项目中文分词的首选。但有时候,原生的IK分词器直接拿来用,可能会觉得差那么点意思,尤其是在词库更新这块儿。

比如,网络热词、行业新词层出不穷,如果我们的分词器词库还是老古董,那分词效果自然就跟不上时代了。比如用户搜“水池里面银龙鱼”,如果“银龙鱼”不在词库里,很可能就被拆得七零八落,搜索结果自然也就跑偏了。

所以,很多时候,我们需要撸起袖子,对IK分词器的源码进行二开,让它能更智能、更及时地更新词库。这篇文章,我们就来聊聊怎么改造IK分词器,特别是实现一个灵活的词库热刷新机制。

要改造IK,第一步当然是拿到它的源码。analysis-ik。然后用顺手的Java IDE(比如IntelliJ IDEA)打开这个项目。接下来是重要的一步:

我们的目标是改造词典加载逻辑。根据IK的源码结构,我们重点要关注的是负责词典管理和加载的核心类。通常,这个关键角色会落在org.wltea.analyzer.dic.Dictionary这个类身上。它管理着所有的词典数据(主词典、停用词词典等),并在IK初始化和运行时提供词典查询服务。我们要做的,就是让它学会从数据库“进货”,并且还能定期“更新货架”。

IK分词器的词库热刷新机制

我们要玩中文分词,关键的一点是要有词库,如果词库一直不更新,里面少了一些冷门,网络词语,比如“水池里面银龙鱼”, 分词器就会分词不准确。 IK中文分词器,就必须要支持我们从自定义的数据库中初始化的时候加载词库,开启一个线程,让它定时从数据库中刷新和加载最新的词库。如果我们想要更新词库的话,就可以在web界面手动录入词语,或系统灌入最新的词汇,数据库,ES JVM 进程里 -》IK中文分词器-》不断的自己热加载数据库里最新的词汇和热刷新。

IK分词器源码改造流程讲解

ok,前面我们已经知道了我们要实现什么样的效果,本节我们就来动手改造。

需要分析IK分词器的源码结构,特别是与词库加载相关的部分,以便实现从数据库热加载词库的功能。从项目结构可以看出,是一个Maven项目(存在pom.xml),主要有configcoreelasticsearchopensearch等目录。

先了解IK分词器的核心实现在哪里,特别是与词库加载相关的代码。主要代码路径是在 analysis-ik/core/src/main/java/org/wltea/analyzer/dic 目录下。

需要特别关注Dictionary.javaMonitor.java文件,它们可能与词典管理和监控有关,我们需要修改代码使其能够从数据库热加载词库。

ok,先在找到了代码,再来分析下需求,是让IK分词器支持从数据库中热加载词库。目前IK分词器支持以下方式加载词库:

  • 本地文件加载词典
  • 远程HTTP接口获取词典(能够实现热加载)

如何实现从数据库中热加载词库,即实现思路是什么?

  • 首先需要,创建数据库连接管理类
  • 修改词典加载机制,增加从数据库加载的方法
  • 创建定时任务,定期从数据库中检查并更新词库。
// 在org.wltea.analyzer.dic包中创建DBDictSource.java
package org.wltea.analyzer.dic;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.Logger;
import org.wltea.analyzer.help.ESPluginLoggerFactory;

public class DBDictSource {

    private static final Logger logger = ESPluginLoggerFactory.getLogger(DBDictSource.class.getName());
    private String jdbcUrl;
    private String username;
    private String password;
    private String dictTableName;
    private String dictColumnName;
    private String lastUpdateTimeColumnName;
    private long lastUpdateTime;

    public DBDictSource(String jdbcUrl, String username, String password, String dictTableName, String dictColumnName, String lastUpdateTimeColumnName) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
        this.dictTableName = dictTableName;
        this.dictColumnName = dictColumnName;
        this.lastUpdateTimeColumnName = lastUpdateTimeColumnName;
        this.lastUpdateTime = 0L;
    }

    public Connection getConnection() throws SQLException {
        try {
            Class.forName("com.mysql.jdbc.Driver"); // 根据实际使用的数据库修改驱动
            return DriverManager.getConnection(jdbcUrl, username, password);
        } catch (ClassNotFoundException e) {
            logger.error("Database driver not found", e);
            throw new SQLException("Database driver not found", e);
        }
    }

    public List<String> getNewDictWords() {
        List<String> words = new ArrayList<>();
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;

        try {
            conn = getConnection();
            String sql = "SELECT " + dictColumnName + " FROM " + dictTableName;
            if (lastUpdateTimeColumnName != null && !lastUpdateTimeColumnName.isEmpty()) {
                sql += " WHERE " + lastUpdateTimeColumnName + " > ?";
                ps = conn.prepareStatement(sql);
                ps.setTimestamp(1, new Timestamp(lastUpdateTime));
            } else {
                ps = conn.prepareStatement(sql);
            }

            rs = ps.executeQuery();
            while (rs.next()) {
                String word = rs.getString(1);
                if (word != null && !word.trim().isEmpty()) {
                    words.add(word.trim());
                }
            }

            // 更新最后检查时间
            if (lastUpdateTimeColumnName != null && !lastUpdateTimeColumnName.isEmpty()) {
                sql = "SELECT MAX(" + lastUpdateTimeColumnName + ") FROM " + dictTableName;
                ps = conn.prepareStatement(sql);
                rs = ps.executeQuery();
                if (rs.next()) {
                    Timestamp maxTime = rs.getTimestamp(1);
                    if (maxTime != null) {
                        lastUpdateTime = maxTime.getTime();
                    }
                }
            }

            return words;
        } catch (SQLException e) {
            logger.error("Failed to load dictionary from database", e);
            return words;
        } finally {
            closeResources(conn, ps, rs);
        }
    }

    public List<String> getAllDictWords() {
        List<String> words = new ArrayList<>();
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;

        try {
            conn = getConnection();
            String sql = "SELECT " + dictColumnName + " FROM " + dictTableName;
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();

            while (rs.next()) {
                String word = rs.getString(1);
                if (word != null && !word.trim().isEmpty()) {
                    words.add(word.trim());
                }
            }

            return words;
        } catch (SQLException e) {
            logger.error("Failed to load dictionary from database", e);
            return words;
        } finally {
            closeResources(conn, ps, rs);
        }
    }

    private void closeResources(Connection conn, PreparedStatement ps, ResultSet rs) {
        try {
            if (rs != null) rs.close();
        } catch (SQLException e) {
            logger.error("Failed to close ResultSet", e);
        }

        try {
            if (ps != null) ps.close();
        } catch (SQLException e) {
            logger.error("Failed to close PreparedStatement", e);
        }

        try {
            if (conn != null) conn.close();
        } catch (SQLException e) {
            logger.error("Failed to close Connection", e);
        }
    }
}

修改配置文件和配置类:

<!-- 在IKAnalyzer.cfg.xml中添加数据库配置 -->
<properties>
    <!-- 现有配置保持不变 -->
    <!-- 数据库词库配置 -->
    <entry key="db_enable">true</entry>
    <entry key="db_url">jdbc:mysql://localhost:3306/dict_db</entry>
    <entry key="db_username">root</entry>
    <entry key="db_password">password</entry>
    <entry key="db_dict_table">custom_dict</entry>
    <entry key="db_dict_column">word</entry>
    <entry key="db_time_column">update_time</entry>
    <entry key="db_poll_time">60</entry>
</properties>
// 在org.wltea.analyzer.cfg.Configuration类中添加方法和属性
private boolean dbEnable = false;
private String dbUrl;
private String dbUsername;
private String dbPassword;
private String dbDictTable;
private String dbDictColumn;
private String dbTimeColumn;
private int dbPollTime = 60; // 默认60秒

// 在合适的初始化方法中添加
public void loadConfig(Properties props) {
    // 加载现有配置

    // 加载数据库配置
    dbEnable = Boolean.valueOf(props.getProperty("db_enable", "false"));
    if (dbEnable) {
        dbUrl = props.getProperty("db_url");
        dbUsername = props.getProperty("db_username");
        dbPassword = props.getProperty("db_password");
        dbDictTable = props.getProperty("db_dict_table");
        dbDictColumn = props.getProperty("db_dict_column");
        dbTimeColumn = props.getProperty("db_time_column");
        String pollTimeStr = props.getProperty("db_poll_time");
        if (pollTimeStr != null) {
            try {
                dbPollTime = Integer.parseInt(pollTimeStr);
            } catch (NumberFormatException e) {
                // 使用默认值
            }
        }
    }
}

// 添加getter方法
public boolean isDbEnable() {
    return dbEnable;
}

public String getDbUrl() {
    return dbUrl;
}

public String getDbUsername() {
    return dbUsername;
}

public String getDbPassword() {
    return dbPassword;
}

public String getDbDictTable() {
    return dbDictTable;
}

public String getDbDictColumn() {
    return dbDictColumn;
}

public String getDbTimeColumn() {
    return dbTimeColumn;
}

public int getDbPollTime() {
    return dbPollTime;
}

创建数据库词典监控类:参考现有的Monitor类,创建一个数据库词典监控类:

// 在org.wltea.analyzer.dic包中创建DBDictMonitor.java
package org.wltea.analyzer.dic;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;

import org.apache.logging.log4j.Logger;
import org.wltea.analyzer.cfg.Configuration;
import org.wltea.analyzer.help.ESPluginLoggerFactory;

public class DBDictMonitor implements Runnable {

    private static final Logger logger = ESPluginLoggerFactory.getLogger(DBDictMonitor.class.getName());

    private Configuration configuration;
    private DBDictSource dbDictSource;

    public DBDictMonitor(Configuration cfg) {
        this.configuration = cfg;
        if (cfg.isDbEnable()) {
            this.dbDictSource = new DBDictSource(
                cfg.getDbUrl(),
                cfg.getDbUsername(),
                cfg.getDbPassword(),
                cfg.getDbDictTable(),
                cfg.getDbDictColumn(),
                cfg.getDbTimeColumn()
            );
        }
    }

    @Override
    public void run() {
        configuration.check();
        AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
            this.runUnprivileged();
            return null;
        });
    }

    public void runUnprivileged() {
        if (dbDictSource == null) {
            return;
        }

        try {
            logger.info("Checking database dictionary updates...");
            List<String> words = dbDictSource.getNewDictWords();

            if (words != null && !words.isEmpty()) {
                logger.info("Found {} new words in database, updating dictionary...", words.size());
                Dictionary.getSingleton().addWords(words);
                logger.info("Dictionary updated with words from database.");
            }
        } catch (Exception e) {
            logger.error("Error updating dictionary from database", e);
        }
    }

    // 初始化方法,从数据库加载所有词条
    public void loadAllDictWords() {
        if (dbDictSource == null) {
            return;
        }

        try {
            logger.info("Loading all dictionary words from database...");
            List<String> words = dbDictSource.getAllDictWords();

            if (words != null && !words.isEmpty()) {
                logger.info("Loaded {} words from database", words.size());
                Dictionary.getSingleton().addWords(words);
                logger.info("Dictionary initialized with words from database.");
            }
        } catch (Exception e) {
            logger.error("Error loading dictionary from database", e);
        }
    }
}

添加 MAVEN 依赖:在项目的pom.xml中添加JDBC驱动依赖,比如MySQL的驱动

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>

需要用户在数据库中创建相应的表结构:

CREATE TABLE custom_dict (
    id INT PRIMARY KEY AUTO_INCREMENT,
    word VARCHAR(100) NOT NULL,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

我们需要增加一种新的方式:从数据库中读取并热加载词库。

停用词,stop words 就是没有太大意义的一些词汇,分词可以忽略掉。也有一个

在ES生产集群中安装IK分词器

改造完成后,我们就可以重新打包,注意版本。执行命令:

mvn clean package

就会得到一个 jar包,后面用这个jar包,替换掉IK发行版本中的jar包。

之后建立数据库以及配置文件

最后给3个ES节点都安装IK分词器。 先搞第一台机器。 在elasticsearch的plugins目录下创建一个ik目录:

mkdir -p /app/elasticsearch/elasticsearch-7.9.3/plugins/ik

把下载的发行版压缩包上传到这里。然后解压

用刚刚打包好的jar包替换掉解压得到的同名jar包。

把mysql-connector-java-8.0.20.jar包也上传到ik目录。(为什么) 在ik/config目录下添加一个jdbc.properties文件。

然后搞第二台机器。

重启es的三个节点。

ES生产环境下,IK分词器和pinyin分词器一定会一起使用。