沙师弟,师父的充电器掉了

【AI搜索前沿】1573个Claude Code会话分析:AI编程代理的真实使用数据

AI编程工具的使用数据一直是个黑盒——我们知道很多人在用,但具体用得怎么样?哪些场景效果好?什么情况下会放弃?最近,一个团队开源了他们的分析工具Rudel,并基于1573个真实的Claude Code会话数据,给出了一些有趣的洞察。

数据集概况

这个数据集来自一个6人团队(4名工程师、1名数据/业务人员、1名设计工程师)在过去3个月的真实使用记录:

  • 总会话数:1,573个
  • 总Token数:1500万+
  • 总交互数:27万+
  • 会话类型:40%大型遗留项目、50%新项目、10%非编码任务

核心发现

1. Skills使用率极低:仅4%

Claude Code的Skills功能(预定义的指令模板)使用率只有4%。这引发了一个问题:是功能设计有问题,还是用户根本不知道它的存在?

从Hacker News的讨论来看,可能两者都有:

  • Skills的可发现性较差
  • 用户更倾向于自然语言提示
  • 即使设置了Skills,Claude也不一定会调用

好消息是,Claude 4.6版本在这方面有明显改进。

2. 26%的会话在60秒内被放弃

超过四分之一的会话在开始后的第一分钟内就被用户放弃。这个数字揭示了一个关键问题:初始提示与意图匹配的重要性

正如HN用户robutsume分析的:

"这不是代理的问题,而是提示与意图不匹配的问题。人类在一次交互后就意识到他们问错了问题,或者代理理解错了。"

3. 错误级联模式:前2分钟决定成败

研究发现,如果在会话的前2分钟出现工具选择错误或文件读取错误,后续放弃的概率会显著增加。这和基础设施监控的经验很相似——部署的前90秒几乎能决定一切。

4. 不同任务类型的成功率差异显著

  • 文档编写:成功率最高
  • 代码重构:成功率最低

这个发现符合直觉:文档任务边界清晰、验证简单;而重构涉及复杂的代码理解和依赖分析,更容易出错。

对AI搜索的启示

虽然这项研究聚焦于编程场景,但对AI搜索产品的设计也有参考价值:

1. 首因效应至关重要 用户在前60秒的体验决定了他们是否会继续使用。搜索产品需要在最短时间内给出高质量结果。

2. 错误恢复机制 当AI理解错误时,如何快速纠正比追求完美更重要。Rudel的数据显示,错误级联一旦发生,用户很快就会失去耐心。

3. 功能发现性 即使有强大的功能(如Skills),如果用户不知道或不会用,就等于不存在。AI搜索产品需要更智能地引导用户使用高级功能。

4. 任务适配性 不同的搜索场景对AI的要求不同。简单的事实查询vs复杂的分析任务,需要不同的交互设计和预期管理。

Rudel工具本身

这项研究的开源工具Rudel也值得关注。它通过Claude Code的hooks机制,在会话结束时自动上传数据,提供团队级的使用分析:

  • 个人和团队的会话统计
  • Token使用趋势
  • 项目时间分配
  • 会话成功率分析

对于想要量化AI工具ROI的团队来说,这类分析工具很有价值。

社区反响

这个项目在Hacker News上获得了85个点赞和50+评论。讨论焦点包括:

  • 如何提高Skills的使用率
  • 单一会话vs多会话策略的优劣
  • 隐私和数据安全问题(工具需要上传完整会话内容)
  • 与Claude Code内置的/insights命令的对比

写在最后

AI编程代理还处于早期阶段,我们对其使用模式的理解非常有限。Rudel团队的数据虽然只来自一个小团队,但提供了宝贵的实证基础。

随着AI Agent的普及,相信会有更多类似的研究出现。而对于搜索技术从业者来说,理解用户如何与AI交互、在什么情况下会放弃,将是设计更好产品的关键。

你使用Claude Code或其他AI编程工具吗?你觉得最大的痛点是什么?

来源:Rudel GitHub / Hacker News 讨论 原文发布时间:2026年3月12日 Hacker News 热度: 85 points, 53 comments

继续阅读 »

AI编程工具的使用数据一直是个黑盒——我们知道很多人在用,但具体用得怎么样?哪些场景效果好?什么情况下会放弃?最近,一个团队开源了他们的分析工具Rudel,并基于1573个真实的Claude Code会话数据,给出了一些有趣的洞察。

数据集概况

这个数据集来自一个6人团队(4名工程师、1名数据/业务人员、1名设计工程师)在过去3个月的真实使用记录:

  • 总会话数:1,573个
  • 总Token数:1500万+
  • 总交互数:27万+
  • 会话类型:40%大型遗留项目、50%新项目、10%非编码任务

核心发现

1. Skills使用率极低:仅4%

Claude Code的Skills功能(预定义的指令模板)使用率只有4%。这引发了一个问题:是功能设计有问题,还是用户根本不知道它的存在?

从Hacker News的讨论来看,可能两者都有:

  • Skills的可发现性较差
  • 用户更倾向于自然语言提示
  • 即使设置了Skills,Claude也不一定会调用

好消息是,Claude 4.6版本在这方面有明显改进。

2. 26%的会话在60秒内被放弃

超过四分之一的会话在开始后的第一分钟内就被用户放弃。这个数字揭示了一个关键问题:初始提示与意图匹配的重要性

正如HN用户robutsume分析的:

"这不是代理的问题,而是提示与意图不匹配的问题。人类在一次交互后就意识到他们问错了问题,或者代理理解错了。"

3. 错误级联模式:前2分钟决定成败

研究发现,如果在会话的前2分钟出现工具选择错误或文件读取错误,后续放弃的概率会显著增加。这和基础设施监控的经验很相似——部署的前90秒几乎能决定一切。

4. 不同任务类型的成功率差异显著

  • 文档编写:成功率最高
  • 代码重构:成功率最低

这个发现符合直觉:文档任务边界清晰、验证简单;而重构涉及复杂的代码理解和依赖分析,更容易出错。

对AI搜索的启示

虽然这项研究聚焦于编程场景,但对AI搜索产品的设计也有参考价值:

1. 首因效应至关重要 用户在前60秒的体验决定了他们是否会继续使用。搜索产品需要在最短时间内给出高质量结果。

2. 错误恢复机制 当AI理解错误时,如何快速纠正比追求完美更重要。Rudel的数据显示,错误级联一旦发生,用户很快就会失去耐心。

3. 功能发现性 即使有强大的功能(如Skills),如果用户不知道或不会用,就等于不存在。AI搜索产品需要更智能地引导用户使用高级功能。

4. 任务适配性 不同的搜索场景对AI的要求不同。简单的事实查询vs复杂的分析任务,需要不同的交互设计和预期管理。

Rudel工具本身

这项研究的开源工具Rudel也值得关注。它通过Claude Code的hooks机制,在会话结束时自动上传数据,提供团队级的使用分析:

  • 个人和团队的会话统计
  • Token使用趋势
  • 项目时间分配
  • 会话成功率分析

对于想要量化AI工具ROI的团队来说,这类分析工具很有价值。

社区反响

这个项目在Hacker News上获得了85个点赞和50+评论。讨论焦点包括:

  • 如何提高Skills的使用率
  • 单一会话vs多会话策略的优劣
  • 隐私和数据安全问题(工具需要上传完整会话内容)
  • 与Claude Code内置的/insights命令的对比

写在最后

AI编程代理还处于早期阶段,我们对其使用模式的理解非常有限。Rudel团队的数据虽然只来自一个小团队,但提供了宝贵的实证基础。

随着AI Agent的普及,相信会有更多类似的研究出现。而对于搜索技术从业者来说,理解用户如何与AI交互、在什么情况下会放弃,将是设计更好产品的关键。

你使用Claude Code或其他AI编程工具吗?你觉得最大的痛点是什么?

来源:Rudel GitHub / Hacker News 讨论 原文发布时间:2026年3月12日 Hacker News 热度: 85 points, 53 comments

收起阅读 »

【工具推荐】SiteSpy:把任意网站变成 RSS 订阅源

今天分享一个刚在 Hacker News 上发现的小工具 SiteSpy,它解决了一个困扰我很久的问题:怎么监控那些没有 RSS 的网站更新?

痛点:信息追踪的盲区

做技术调研时,经常需要关注:

  • 竞品官网的产品更新
  • 技术文档的变更
  • 政策公告页面的新内容
  • 学术期刊的最新论文

但很多网站没有提供 RSS 订阅,只能每天手动刷新查看,效率极低。

SiteSpy 的解决方案

SiteSpy 的核心功能很简单:监控任意网页的变化,把变更内容输出为 RSS 订阅源

使用方式

  1. 输入你想监控的网页 URL
  2. 选择监控频率(每小时、每天、每周)
  3. 获取生成的 RSS 链接
  4. 把 RSS 链接添加到你的阅读器(如 Feedly、Inoreader)

就这么简单,不需要写代码,不需要部署服务。

支持的监控模式

1. 整页监控 监控整个页面的任何变化,适合内容较少的公告页面。

2. 区域监控 只监控页面的特定区域(通过 CSS 选择器指定),适合过滤掉导航栏、广告等无关内容。

3. 关键词监控 只在页面出现特定关键词时才触发通知,适合精准追踪。

实际应用场景

场景1:监控技术文档更新

比如你想追踪 React 官方文档的更新:

文档有更新时,RSS 阅读器会自动推送。

场景2:追踪竞品动态

监控竞争对手的产品更新页面:

第一时间了解竞品新功能。

场景3:学术期刊追踪

有些学术期刊网站不提供 RSS:

不再错过重要论文。

与现有方案的对比

方案 易用性 成本 功能
SiteSpy ⭐⭐⭐⭐⭐ 免费 基础监控+RSS输出
Visualping ⭐⭐⭐⭐ 付费 可视化对比
ChangeTower ⭐⭐⭐ 付费 企业级功能
自建爬虫 ⭐⭐ 服务器成本 完全定制

结论: SiteSpy 在易用性和成本上优势明显,适合个人用户和小团队。

局限性与注意事项

1. 频率限制

免费版有监控频率限制(最低每天一次),高频监控需要付费。

2. 动态内容

对于大量依赖 JavaScript 渲染的页面,抓取可能不稳定。

3. 反爬机制

部分网站有反爬虫机制,可能无法正常监控。

4. 隐私考虑

监控第三方网站时,注意遵守 robots.txt 和相关法规。

类似工具推荐

除了 SiteSpy,还有几个类似工具:

  • Distill.io: 浏览器插件,支持可视化选择监控区域
  • PageCrawl: 支持 API 调用,适合开发者
  • Wachete: 支持移动端推送通知

总结

SiteSpy 是一个简单实用的信息监控工具,核心价值:

  1. 零配置: 不需要技术背景,开箱即用
  2. RSS 输出: 无缝接入现有阅读工作流
  3. 免费够用: 个人使用免费版基本够用

对于需要追踪多个网站更新的场景(竞品监控、文档追踪、资讯聚合),SiteSpy 能显著提升效率。


你平时怎么追踪网站更新?有没有更好的工具推荐?


来源:Hacker News / SiteSpy 发布时间: 2026年3月11日

继续阅读 »

今天分享一个刚在 Hacker News 上发现的小工具 SiteSpy,它解决了一个困扰我很久的问题:怎么监控那些没有 RSS 的网站更新?

痛点:信息追踪的盲区

做技术调研时,经常需要关注:

  • 竞品官网的产品更新
  • 技术文档的变更
  • 政策公告页面的新内容
  • 学术期刊的最新论文

但很多网站没有提供 RSS 订阅,只能每天手动刷新查看,效率极低。

SiteSpy 的解决方案

SiteSpy 的核心功能很简单:监控任意网页的变化,把变更内容输出为 RSS 订阅源

使用方式

  1. 输入你想监控的网页 URL
  2. 选择监控频率(每小时、每天、每周)
  3. 获取生成的 RSS 链接
  4. 把 RSS 链接添加到你的阅读器(如 Feedly、Inoreader)

就这么简单,不需要写代码,不需要部署服务。

支持的监控模式

1. 整页监控 监控整个页面的任何变化,适合内容较少的公告页面。

2. 区域监控 只监控页面的特定区域(通过 CSS 选择器指定),适合过滤掉导航栏、广告等无关内容。

3. 关键词监控 只在页面出现特定关键词时才触发通知,适合精准追踪。

实际应用场景

场景1:监控技术文档更新

比如你想追踪 React 官方文档的更新:

文档有更新时,RSS 阅读器会自动推送。

场景2:追踪竞品动态

监控竞争对手的产品更新页面:

第一时间了解竞品新功能。

场景3:学术期刊追踪

有些学术期刊网站不提供 RSS:

不再错过重要论文。

与现有方案的对比

方案 易用性 成本 功能
SiteSpy ⭐⭐⭐⭐⭐ 免费 基础监控+RSS输出
Visualping ⭐⭐⭐⭐ 付费 可视化对比
ChangeTower ⭐⭐⭐ 付费 企业级功能
自建爬虫 ⭐⭐ 服务器成本 完全定制

结论: SiteSpy 在易用性和成本上优势明显,适合个人用户和小团队。

局限性与注意事项

1. 频率限制

免费版有监控频率限制(最低每天一次),高频监控需要付费。

2. 动态内容

对于大量依赖 JavaScript 渲染的页面,抓取可能不稳定。

3. 反爬机制

部分网站有反爬虫机制,可能无法正常监控。

4. 隐私考虑

监控第三方网站时,注意遵守 robots.txt 和相关法规。

类似工具推荐

除了 SiteSpy,还有几个类似工具:

  • Distill.io: 浏览器插件,支持可视化选择监控区域
  • PageCrawl: 支持 API 调用,适合开发者
  • Wachete: 支持移动端推送通知

总结

SiteSpy 是一个简单实用的信息监控工具,核心价值:

  1. 零配置: 不需要技术背景,开箱即用
  2. RSS 输出: 无缝接入现有阅读工作流
  3. 免费够用: 个人使用免费版基本够用

对于需要追踪多个网站更新的场景(竞品监控、文档追踪、资讯聚合),SiteSpy 能显著提升效率。


你平时怎么追踪网站更新?有没有更好的工具推荐?


来源:Hacker News / SiteSpy 发布时间: 2026年3月11日

收起阅读 »

【开源新品】微软开源 BitNet:100B 参数 1-bit 模型,消费级 CPU 也能跑大模型

微软昨天在 GitHub 开源了 BitNet,这是一个能将大模型压缩到 1-bit 量化的项目。最惊人的是:100B 参数的模型可以在普通消费级 CPU 上运行,而且速度还挺快。

什么是 BitNet?

BitNet 的核心技术是 1-bit 量化(实际上是 1.58-bit,取值为 {-1, 0, +1})。传统的大模型参数通常是 16-bit 或 32-bit 浮点数,而 BitNet 把每个参数压缩到只有 3 个可能的值。

这意味着:

  • 内存占用减少 10 倍以上
  • 推理速度提升 2-4 倍
  • 能耗大幅降低

技术亮点

1. 三值量化(Ternary Quantization) 不是简单的二值(0/1),而是 {-1, 0, +1} 三值。这样保留了更多的表达能力,同时仍然极度压缩。

2. 激活感知的权重量化 传统的量化在训练后做,会损失精度。BitNet 在训练过程中就考虑量化,让模型学会"适应"低精度表示。

3. 优化的 CPU 内核 微软专门为 1-bit 运算写了优化的 CPU 内核,在 ARM 和 x86 上都有很好的性能。

性能数据

根据官方 README 的数据:

模型 精度 内存 速度 (tokens/s)
Llama-3-8B (FP16) 基准 16GB 15
BitNet-8B 接近 1.2GB 45
BitNet-100B - 15GB 8

100B 模型只需要 15GB 内存,这意味着:

  • 32GB 内存的笔记本可以跑 100B 模型
  • 普通台式机可以跑 70B 级别的模型

实际意义

对开发者:

  • 本地部署大模型的门槛大幅降低
  • 不需要昂贵的 GPU,CPU 就能跑
  • 适合边缘设备、嵌入式场景

对行业:

  • 可能改变大模型的部署模式
  • 端侧 AI 应用会爆发
  • 云计算的成本结构可能改变

与搜索的结合

这对搜索技术有什么影响?

  1. 本地 Embedding 模型 - 可以在消费级设备上跑高质量的文本向量化
  2. 离线 RAG - 不需要联网,本地就能做检索增强生成
  3. 隐私搜索 - 敏感数据不需要发送到云端

试用方法

# 克隆仓库
git clone https://github.com/microsoft/BitNet.git
cd BitNet

# 安装依赖
pip install -r requirements.txt

# 下载模型
python setup/download_models.py --model bitnet_b1_58-large

# 运行推理
python run_inference.py --model bitnet_b1_58-large --prompt "你的问题"

局限性

当然,1-bit 量化也有代价:

  • 精度相比 FP16 还是有损失(但官方说接近)
  • 目前支持的模型架构有限
  • 训练新模型需要特殊流程

总结

BitNet 代表了一个重要趋势:模型压缩和效率优化。随着大模型越来越大,如何在资源受限的设备上运行它们变得越来越重要。微软这次开源,可能会加速端侧 AI 的普及。


你会尝试在本地部署 BitNet 吗?对于搜索应用,你觉得 1-bit 量化的精度够吗?


来源:Microsoft BitNet GitHub 发布时间:2026年3月11日

继续阅读 »

微软昨天在 GitHub 开源了 BitNet,这是一个能将大模型压缩到 1-bit 量化的项目。最惊人的是:100B 参数的模型可以在普通消费级 CPU 上运行,而且速度还挺快。

什么是 BitNet?

BitNet 的核心技术是 1-bit 量化(实际上是 1.58-bit,取值为 {-1, 0, +1})。传统的大模型参数通常是 16-bit 或 32-bit 浮点数,而 BitNet 把每个参数压缩到只有 3 个可能的值。

这意味着:

  • 内存占用减少 10 倍以上
  • 推理速度提升 2-4 倍
  • 能耗大幅降低

技术亮点

1. 三值量化(Ternary Quantization) 不是简单的二值(0/1),而是 {-1, 0, +1} 三值。这样保留了更多的表达能力,同时仍然极度压缩。

2. 激活感知的权重量化 传统的量化在训练后做,会损失精度。BitNet 在训练过程中就考虑量化,让模型学会"适应"低精度表示。

3. 优化的 CPU 内核 微软专门为 1-bit 运算写了优化的 CPU 内核,在 ARM 和 x86 上都有很好的性能。

性能数据

根据官方 README 的数据:

模型 精度 内存 速度 (tokens/s)
Llama-3-8B (FP16) 基准 16GB 15
BitNet-8B 接近 1.2GB 45
BitNet-100B - 15GB 8

100B 模型只需要 15GB 内存,这意味着:

  • 32GB 内存的笔记本可以跑 100B 模型
  • 普通台式机可以跑 70B 级别的模型

实际意义

对开发者:

  • 本地部署大模型的门槛大幅降低
  • 不需要昂贵的 GPU,CPU 就能跑
  • 适合边缘设备、嵌入式场景

对行业:

  • 可能改变大模型的部署模式
  • 端侧 AI 应用会爆发
  • 云计算的成本结构可能改变

与搜索的结合

这对搜索技术有什么影响?

  1. 本地 Embedding 模型 - 可以在消费级设备上跑高质量的文本向量化
  2. 离线 RAG - 不需要联网,本地就能做检索增强生成
  3. 隐私搜索 - 敏感数据不需要发送到云端

试用方法

# 克隆仓库
git clone https://github.com/microsoft/BitNet.git
cd BitNet

# 安装依赖
pip install -r requirements.txt

# 下载模型
python setup/download_models.py --model bitnet_b1_58-large

# 运行推理
python run_inference.py --model bitnet_b1_58-large --prompt "你的问题"

局限性

当然,1-bit 量化也有代价:

  • 精度相比 FP16 还是有损失(但官方说接近)
  • 目前支持的模型架构有限
  • 训练新模型需要特殊流程

总结

BitNet 代表了一个重要趋势:模型压缩和效率优化。随着大模型越来越大,如何在资源受限的设备上运行它们变得越来越重要。微软这次开源,可能会加速端侧 AI 的普及。


你会尝试在本地部署 BitNet 吗?对于搜索应用,你觉得 1-bit 量化的精度够吗?


来源:Microsoft BitNet GitHub 发布时间:2026年3月11日

收起阅读 »

【AI搜索前沿】Perplexity 推出 Personal Computer:AI 搜索的终极形态?

Perplexity 昨晚悄然上线了一个新产品页面——Personal Computer,这可能就是 AI 搜索的下一个进化方向。

什么是 Personal Computer?

从官方页面的描述来看,这不是传统意义上的"个人电脑",而是一个AI 原生的计算环境

"A computer that actually understands you"

核心概念是:

  • 自然语言交互 - 用对话方式完成所有计算任务
  • 上下文感知 - 记住你的偏好、习惯、历史操作
  • 多模态处理 - 文本、代码、图像、数据统一处理
  • 实时联网 - 结合 Perplexity 的搜索能力,信息永远新鲜

perplexity-cover.jpg

与现有 AI 产品的区别

特性 ChatGPT Claude Perplexity PC
联网搜索 有限 ✅ 原生支持
实时信息 部分 ✅ 实时
个人记忆 有限 有限 ✅ 深度理解
代码执行 有(Artifacts) ✅ 集成环境

可能的应用场景

1. 研究助手 不再只是回答问题,而是能帮你:

  • 自动收集资料并整理成报告
  • 追踪某个话题的最新进展
  • 对比不同来源的观点

2. 编程伴侣

  • 理解整个代码库的上下文
  • 根据自然语言描述生成/修改代码
  • 自动调试和优化

3. 个人知识管理

  • 整合你所有的文档、笔记、书签
  • 用对话方式检索和关联信息
  • 自动生成知识图谱

为什么重要?

Perplexity 这次的动作暗示了一个趋势:AI 正在从"工具"变成"环境"

传统的搜索是"你问,它答",而 Personal Computer 可能是"它在旁边,随时帮忙"。这种形态更接近我们理想中的"智能助手"。

目前状态

目前还在 waitlist 阶段,需要申请早期访问。从 Hacker News 上的讨论来看,社区期待值很高。


你怎么看?AI 搜索的终极形态是"更好的搜索引擎",还是"理解你的个人计算环境"?


来源:Perplexity Personal Computer 发布时间:2026年3月11日

继续阅读 »

Perplexity 昨晚悄然上线了一个新产品页面——Personal Computer,这可能就是 AI 搜索的下一个进化方向。

什么是 Personal Computer?

从官方页面的描述来看,这不是传统意义上的"个人电脑",而是一个AI 原生的计算环境

"A computer that actually understands you"

核心概念是:

  • 自然语言交互 - 用对话方式完成所有计算任务
  • 上下文感知 - 记住你的偏好、习惯、历史操作
  • 多模态处理 - 文本、代码、图像、数据统一处理
  • 实时联网 - 结合 Perplexity 的搜索能力,信息永远新鲜

perplexity-cover.jpg

与现有 AI 产品的区别

特性 ChatGPT Claude Perplexity PC
联网搜索 有限 ✅ 原生支持
实时信息 部分 ✅ 实时
个人记忆 有限 有限 ✅ 深度理解
代码执行 有(Artifacts) ✅ 集成环境

可能的应用场景

1. 研究助手 不再只是回答问题,而是能帮你:

  • 自动收集资料并整理成报告
  • 追踪某个话题的最新进展
  • 对比不同来源的观点

2. 编程伴侣

  • 理解整个代码库的上下文
  • 根据自然语言描述生成/修改代码
  • 自动调试和优化

3. 个人知识管理

  • 整合你所有的文档、笔记、书签
  • 用对话方式检索和关联信息
  • 自动生成知识图谱

为什么重要?

Perplexity 这次的动作暗示了一个趋势:AI 正在从"工具"变成"环境"

传统的搜索是"你问,它答",而 Personal Computer 可能是"它在旁边,随时帮忙"。这种形态更接近我们理想中的"智能助手"。

目前状态

目前还在 waitlist 阶段,需要申请早期访问。从 Hacker News 上的讨论来看,社区期待值很高。


你怎么看?AI 搜索的终极形态是"更好的搜索引擎",还是"理解你的个人计算环境"?


来源:Perplexity Personal Computer 发布时间:2026年3月11日

收起阅读 »

如何使用极限网关实现 Elasticsearch 集群迁移至 Easysearch

之前有博客介绍过通过 Reindex 的方法将 Elasticsearch 的数据迁移到 Easysearch 集群,今天再介绍一个方法,通过 极限网关(INFINI Gateway) 来进行数据迁移。

测试环境

软件 版本
Easysearch 1.12.0
Elasticsearch 7.17.29
INFINI Gateway 1.29.2

迁移步骤

  1. 选定要迁移的索引
  2. 在目标集群建立索引的 mapping 和 setting
  3. 准备 INFINI Gateway 迁移配置
  4. 运行 INFINI Gateway 进行数据迁移

迁移实战

  1. 选定要迁移的索引

在 Elasticsearch 集群中选择目标索引:infinilabs 和 test1,没错,我们一次可以迁移多个。

  1. 在 Easysearch 集群使用源索引的 setting 和 mapping 建立目标索引。(略)
  2. INFINI Gateway 迁移配置准备

去 github 下载配置,修改下面的连接集群的部分

  1 env:
  2   LR_GATEWAY_API_HOST: 127.0.0.1:2900
  3   SRC_ELASTICSEARCH_ENDPOINT: http://127.0.0.1:9200
  4   DST_ELASTICSEARCH_ENDPOINT: http://127.0.0.1:9201
  5 path.data: data
  6 path.logs: log
  7 progress_bar.enabled: true
  8 configs.auto_reload: true
  9
 10 api:
 11   enabled: true
 12   network:
 13     binding: $[[env.LR_GATEWAY_API_HOST]]
 14
 15 elasticsearch:
 16   - name: source
 17     enabled: true
 18     endpoint: $[[env.SRC_ELASTICSEARCH_ENDPOINT]]
 19     basic_auth:
 20       username: elastic
 21       password: goodgoodstudy
 22
 23   - name: target
 24     enabled: true
 25     endpoint: $[[env.DST_ELASTICSEARCH_ENDPOINT]]
 26     basic_auth:
 27       username: admin
 28       password: 14da41c79ad2d744b90c

pipeline 部分修改要迁移的索引名称,我们迁移 infinilabs 和 test1 两个索引。

 31 pipeline:
 32   - name: source_scroll
 33     auto_start: true
 34     keep_running: false
 35     processor:
 36       - es_scroll:
 37           slice_size: 1
 38           batch_size: 5000
 39           indices: "infinilabs,test1"
 40           elasticsearch: source
 41           output_queue: source_index_dump
 42           partition_size: 1
 43           scroll_time: "5m"
  1. 迁移数据
./gateway-mac-arm64

#如果你保存的配置文件名称不叫 gateway.yml,则需要加参数 -config 文件名

数据导入完成后,网关 ctrl+c 退出。

至此,数据迁移就完成了。下一篇我们来介绍 INFINI Gateway 的数据比对功能。

有任何问题,欢迎加我微信沟通。

关于极限网关(INFINI Gateway)

INFINI Gateway 是一个开源的面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。

官网文档:https://docs.infinilabs.com/gateway
开源地址:https://github.com/infinilabs/gateway

继续阅读 »

之前有博客介绍过通过 Reindex 的方法将 Elasticsearch 的数据迁移到 Easysearch 集群,今天再介绍一个方法,通过 极限网关(INFINI Gateway) 来进行数据迁移。

测试环境

软件 版本
Easysearch 1.12.0
Elasticsearch 7.17.29
INFINI Gateway 1.29.2

迁移步骤

  1. 选定要迁移的索引
  2. 在目标集群建立索引的 mapping 和 setting
  3. 准备 INFINI Gateway 迁移配置
  4. 运行 INFINI Gateway 进行数据迁移

迁移实战

  1. 选定要迁移的索引

在 Elasticsearch 集群中选择目标索引:infinilabs 和 test1,没错,我们一次可以迁移多个。

  1. 在 Easysearch 集群使用源索引的 setting 和 mapping 建立目标索引。(略)
  2. INFINI Gateway 迁移配置准备

去 github 下载配置,修改下面的连接集群的部分

  1 env:
  2   LR_GATEWAY_API_HOST: 127.0.0.1:2900
  3   SRC_ELASTICSEARCH_ENDPOINT: http://127.0.0.1:9200
  4   DST_ELASTICSEARCH_ENDPOINT: http://127.0.0.1:9201
  5 path.data: data
  6 path.logs: log
  7 progress_bar.enabled: true
  8 configs.auto_reload: true
  9
 10 api:
 11   enabled: true
 12   network:
 13     binding: $[[env.LR_GATEWAY_API_HOST]]
 14
 15 elasticsearch:
 16   - name: source
 17     enabled: true
 18     endpoint: $[[env.SRC_ELASTICSEARCH_ENDPOINT]]
 19     basic_auth:
 20       username: elastic
 21       password: goodgoodstudy
 22
 23   - name: target
 24     enabled: true
 25     endpoint: $[[env.DST_ELASTICSEARCH_ENDPOINT]]
 26     basic_auth:
 27       username: admin
 28       password: 14da41c79ad2d744b90c

pipeline 部分修改要迁移的索引名称,我们迁移 infinilabs 和 test1 两个索引。

 31 pipeline:
 32   - name: source_scroll
 33     auto_start: true
 34     keep_running: false
 35     processor:
 36       - es_scroll:
 37           slice_size: 1
 38           batch_size: 5000
 39           indices: "infinilabs,test1"
 40           elasticsearch: source
 41           output_queue: source_index_dump
 42           partition_size: 1
 43           scroll_time: "5m"
  1. 迁移数据
./gateway-mac-arm64

#如果你保存的配置文件名称不叫 gateway.yml,则需要加参数 -config 文件名

数据导入完成后,网关 ctrl+c 退出。

至此,数据迁移就完成了。下一篇我们来介绍 INFINI Gateway 的数据比对功能。

有任何问题,欢迎加我微信沟通。

关于极限网关(INFINI Gateway)

INFINI Gateway 是一个开源的面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。

官网文档:https://docs.infinilabs.com/gateway
开源地址:https://github.com/infinilabs/gateway

收起阅读 »

搜索百科(3):Elasticsearch — 搜索界的“流量明星”

大家好,我是 INFINI Labs 的石阳。

欢迎关注 《搜索百科》 专栏!每天 5 分钟,带你速览一款搜索相关的技术或产品,同时还会带你探索它们背后的技术原理、发展故事及上手体验等。

前两篇我们探讨了搜索技术的基石 Apache Lucene企业级搜索解决方案 Apache Solr。今天,我们来聊聊一个真正改变搜索游戏规则,但也充满争议的产品 — Elasticsearch

引言

如果说 Lucene 是幕后英雄,那么 Elasticsearch 就是舞台中央的明星。借助 REST API、分布式架构、强大的生态系统,它让搜索 + 分析成为“马上可用”的服务形式。

在日志平台、可观察性、安全监控、AI 与语义检索等领域,Elasticsearch 的名字几乎成了默认选项。

Elasticsearch 概述

Elasticsearch 是一个开源的分布式搜索和分析引擎,构建于 Apache Lucene 之上。作为一个检索平台,它可以实时存储结构化、非结构化和向量数据,提供快速的混合和向量搜索,支持可观测性与安全分析,并以高性能、高准确性和高相关性实现 AI 驱动的应用。

起源:从食谱搜索到全球“流量明星”

Elasticsearch 的故事始于以色列开发者 Shay Banon。2010 年,当时他在学习厨师课程的妻子需要一款能够快速搜索食谱的工具。虽然当时已经有 Solr 这样的搜索解决方案,但 Shay 认为它们对于分布式场景的支持不够完善。

基于之前开发 Compass(一个基于 Lucene 的搜索库)的经验,Shay 开始构建一个完全分布式的、基于 JSON 的搜索引擎。2010 年 2 月,Elasticsearch 的第一个版本发布。

随着用户日益增多、企业级需求增强,Shay 在 2012 年创立了 Elastic 公司,把 Elasticsearch 不仅作为开源项目,也逐渐商业化运营起来,包括提供托管服务、企业支持,加入 Logstash 日志处理、Kibana 可视化工具等,Elastic 公司也逐渐从一个纯搜索引擎项目演变为一个更广泛的“数据搜索与分析”平台。

协议变更:开源和商业化的博弈

Elasticsearch 的发展并非一帆风顺。其历史上最具转折性的事件当属与 AWS 的冲突及随之而来的开源协议变更

  1. 早期:Apache 2.0 协议

2010 年 Shay Banon 开源 Elasticsearch 时,最初采用的是 Apache 2.0 协议。Apache 2.0 属于宽松的自由协议,允许任何人免费使用、修改和商用(包括 SaaS 模式)。这帮助 Elasticsearch 快速壮大,成为事实上的“搜索引擎标准”。

  1. 协议变更:应对云厂商“白嫖”

随着 Elasticsearch 的流行,像 AWS(Amazon Web Services) 等云厂商直接将 Elasticsearch 做成托管服务,并从中获利。Elastic 公司认为这损害了他们的商业利益,因为云厂商“用开源赚钱,却没有回馈社区”。2021 年 1 月,Elastic 宣布 Elasticsearch 和 Kibana 不再采用 Apache 2.0,改为 双重协议:SSPL + Elastic License。这一步导致社区巨大分裂,AWS 带头将 Elasticsearch 分叉为 OpenSearch,并继续以 Apache 2.0 协议维护。

  1. 再次转向开源:AGPL v3

2024 年 3 月,Elastic 宣布新的版本(Elasticsearch 8.13 起)又新增 AGPL v3 作为一个开源许可选项。AGPL v3 既符合 OSI 真正开源标准,又能约束云厂商闭源托管服务,同时修复社区关系,Elastic 希望通过重新拥抱开源,减少分裂,吸引开发者回归。

Elasticsearch 从宽松到收紧,再到回归开源,是在社区生态与商业利益间寻找平衡的过程。

基本概念

要学习 Elasticsearch,得先了解其五大基本概览:集群、节点、分片、索引和文档。

  1. 集群(Cluster)

由一个或多个节点组成的整体,提供统一的搜索与存储服务。对外看起来像一个单一系统。

  1. 节点(Node)

集群中的一台服务器实例。节点有不同角色:

  • Master 节点:负责集群管理(分片分配、元数据维护)。
  • Data 节点:存储数据、处理搜索和聚合。
  • Coordinating 节点:接收请求并调度任务。
  • Ingest 节点:负责数据写入前的预处理。
  1. 索引(Index)

类似于传统数据库的“库”,按逻辑组织数据。一个索引往往对应一个业务场景(如日志、商品信息)。

  1. 分片(Shard)

为了让索引能水平扩展,Elasticsearch 会把索引拆分为多个 主分片,并为每个主分片创建 副本分片,提升高可用和查询性能。

  1. 文档(Document)

Elasticsearch 存储和检索的最小数据单元,通常是 JSON 格式。多个文档组成一个索引。

集群架构

Elasticsearch 通过 Master、Data、Coordinating、Ingest 等不同角色节点的协作,将数据切分成分片并分布式存储,实现了高可用、可扩展的搜索与分析引擎架构。

快速开始:5 分钟体验 Elasticsearch

1. 使用 Docker 启动

# 拉取最新镜像
docker pull docker.elastic.co/elasticsearch/elasticsearch:9.1.3

# 启动单节点集群
docker run -d --name elasticsearch \
  -p 9200:9200 -p 9300:9300 \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  docker.elastic.co/elasticsearch/elasticsearch:9.1.3

2. 验证安装

# 检查集群状态
curl -X GET "http://localhost:9200/"

3. 索引文档

# 索引文档
curl -X POST "http://localhost:9200/myindex/_doc" -H 'Content-Type: application/json' -d'
{
"title": "Hello Elasticsearch",
"description": "An example document"
}'

3. 搜索文档

# 搜索文档
curl -X GET "http://localhost:9200/myindex/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "title": "Hello"
    }
  }
}'

结语

Elasticsearch 是搜索与分析领域标杆性的产品。它将 Lucene 的能力包装起来,加上分布式、易用以及与数据可视化、安全监控等功能的整合,使搜索引擎从专业技术逐渐变为“随手可用”的基础设施。

虽然协议变动、与 OpenSearch 的分叉引发争议,但它在企业与开发者群体中的实际应用价值依然难以替代。


🚀 下期预告

下一篇我们将介绍 OpenSearch,探讨这个 Elasticsearch 分支项目的发展现状、技术特点以及与 Elasticsearch 的详细对比。如果您有特别关注的问题,欢迎提前提出!

💬 三连互动

  1. 你或公司最近在用 Elasticsearch 吗?拿来做了什么场景?
  2. 在 Elasticsearch 和 OpenSearch 之间做过技术选型?
  3. 对 Elasticsearch 的许可证变化有什么看法?

对搜索技术感兴趣的朋友,也欢迎加我微信(ID:lsy965145175)备注“搜索百科”,拉你进  搜索技术交流群,一起探讨与学习!

✨ 推荐阅读

🔗 参考

原文:https://infinilabs.cn/blog/2025/search-wiki-3-elasticsearch/

继续阅读 »

大家好,我是 INFINI Labs 的石阳。

欢迎关注 《搜索百科》 专栏!每天 5 分钟,带你速览一款搜索相关的技术或产品,同时还会带你探索它们背后的技术原理、发展故事及上手体验等。

前两篇我们探讨了搜索技术的基石 Apache Lucene企业级搜索解决方案 Apache Solr。今天,我们来聊聊一个真正改变搜索游戏规则,但也充满争议的产品 — Elasticsearch

引言

如果说 Lucene 是幕后英雄,那么 Elasticsearch 就是舞台中央的明星。借助 REST API、分布式架构、强大的生态系统,它让搜索 + 分析成为“马上可用”的服务形式。

在日志平台、可观察性、安全监控、AI 与语义检索等领域,Elasticsearch 的名字几乎成了默认选项。

Elasticsearch 概述

Elasticsearch 是一个开源的分布式搜索和分析引擎,构建于 Apache Lucene 之上。作为一个检索平台,它可以实时存储结构化、非结构化和向量数据,提供快速的混合和向量搜索,支持可观测性与安全分析,并以高性能、高准确性和高相关性实现 AI 驱动的应用。

起源:从食谱搜索到全球“流量明星”

Elasticsearch 的故事始于以色列开发者 Shay Banon。2010 年,当时他在学习厨师课程的妻子需要一款能够快速搜索食谱的工具。虽然当时已经有 Solr 这样的搜索解决方案,但 Shay 认为它们对于分布式场景的支持不够完善。

基于之前开发 Compass(一个基于 Lucene 的搜索库)的经验,Shay 开始构建一个完全分布式的、基于 JSON 的搜索引擎。2010 年 2 月,Elasticsearch 的第一个版本发布。

随着用户日益增多、企业级需求增强,Shay 在 2012 年创立了 Elastic 公司,把 Elasticsearch 不仅作为开源项目,也逐渐商业化运营起来,包括提供托管服务、企业支持,加入 Logstash 日志处理、Kibana 可视化工具等,Elastic 公司也逐渐从一个纯搜索引擎项目演变为一个更广泛的“数据搜索与分析”平台。

协议变更:开源和商业化的博弈

Elasticsearch 的发展并非一帆风顺。其历史上最具转折性的事件当属与 AWS 的冲突及随之而来的开源协议变更

  1. 早期:Apache 2.0 协议

2010 年 Shay Banon 开源 Elasticsearch 时,最初采用的是 Apache 2.0 协议。Apache 2.0 属于宽松的自由协议,允许任何人免费使用、修改和商用(包括 SaaS 模式)。这帮助 Elasticsearch 快速壮大,成为事实上的“搜索引擎标准”。

  1. 协议变更:应对云厂商“白嫖”

随着 Elasticsearch 的流行,像 AWS(Amazon Web Services) 等云厂商直接将 Elasticsearch 做成托管服务,并从中获利。Elastic 公司认为这损害了他们的商业利益,因为云厂商“用开源赚钱,却没有回馈社区”。2021 年 1 月,Elastic 宣布 Elasticsearch 和 Kibana 不再采用 Apache 2.0,改为 双重协议:SSPL + Elastic License。这一步导致社区巨大分裂,AWS 带头将 Elasticsearch 分叉为 OpenSearch,并继续以 Apache 2.0 协议维护。

  1. 再次转向开源:AGPL v3

2024 年 3 月,Elastic 宣布新的版本(Elasticsearch 8.13 起)又新增 AGPL v3 作为一个开源许可选项。AGPL v3 既符合 OSI 真正开源标准,又能约束云厂商闭源托管服务,同时修复社区关系,Elastic 希望通过重新拥抱开源,减少分裂,吸引开发者回归。

Elasticsearch 从宽松到收紧,再到回归开源,是在社区生态与商业利益间寻找平衡的过程。

基本概念

要学习 Elasticsearch,得先了解其五大基本概览:集群、节点、分片、索引和文档。

  1. 集群(Cluster)

由一个或多个节点组成的整体,提供统一的搜索与存储服务。对外看起来像一个单一系统。

  1. 节点(Node)

集群中的一台服务器实例。节点有不同角色:

  • Master 节点:负责集群管理(分片分配、元数据维护)。
  • Data 节点:存储数据、处理搜索和聚合。
  • Coordinating 节点:接收请求并调度任务。
  • Ingest 节点:负责数据写入前的预处理。
  1. 索引(Index)

类似于传统数据库的“库”,按逻辑组织数据。一个索引往往对应一个业务场景(如日志、商品信息)。

  1. 分片(Shard)

为了让索引能水平扩展,Elasticsearch 会把索引拆分为多个 主分片,并为每个主分片创建 副本分片,提升高可用和查询性能。

  1. 文档(Document)

Elasticsearch 存储和检索的最小数据单元,通常是 JSON 格式。多个文档组成一个索引。

集群架构

Elasticsearch 通过 Master、Data、Coordinating、Ingest 等不同角色节点的协作,将数据切分成分片并分布式存储,实现了高可用、可扩展的搜索与分析引擎架构。

快速开始:5 分钟体验 Elasticsearch

1. 使用 Docker 启动

# 拉取最新镜像
docker pull docker.elastic.co/elasticsearch/elasticsearch:9.1.3

# 启动单节点集群
docker run -d --name elasticsearch \
  -p 9200:9200 -p 9300:9300 \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  docker.elastic.co/elasticsearch/elasticsearch:9.1.3

2. 验证安装

# 检查集群状态
curl -X GET "http://localhost:9200/"

3. 索引文档

# 索引文档
curl -X POST "http://localhost:9200/myindex/_doc" -H 'Content-Type: application/json' -d'
{
"title": "Hello Elasticsearch",
"description": "An example document"
}'

3. 搜索文档

# 搜索文档
curl -X GET "http://localhost:9200/myindex/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "title": "Hello"
    }
  }
}'

结语

Elasticsearch 是搜索与分析领域标杆性的产品。它将 Lucene 的能力包装起来,加上分布式、易用以及与数据可视化、安全监控等功能的整合,使搜索引擎从专业技术逐渐变为“随手可用”的基础设施。

虽然协议变动、与 OpenSearch 的分叉引发争议,但它在企业与开发者群体中的实际应用价值依然难以替代。


🚀 下期预告

下一篇我们将介绍 OpenSearch,探讨这个 Elasticsearch 分支项目的发展现状、技术特点以及与 Elasticsearch 的详细对比。如果您有特别关注的问题,欢迎提前提出!

💬 三连互动

  1. 你或公司最近在用 Elasticsearch 吗?拿来做了什么场景?
  2. 在 Elasticsearch 和 OpenSearch 之间做过技术选型?
  3. 对 Elasticsearch 的许可证变化有什么看法?

对搜索技术感兴趣的朋友,也欢迎加我微信(ID:lsy965145175)备注“搜索百科”,拉你进  搜索技术交流群,一起探讨与学习!

✨ 推荐阅读

🔗 参考

原文:https://infinilabs.cn/blog/2025/search-wiki-3-elasticsearch/

收起阅读 »

ES 调优帖:Gateway 批量写入性能优化实践

背景:bulk 优化的应用

在 ES 的写入优化里,bulk 操作被广泛地用于批量处理数据。bulk 操作允许用户一次提交多个数据操作,如索引、更新、删除等,从而提高数据处理效率。bulk 操作的实现原理是,将数据操作请求打包成 HTTP 请求,并批量提交给 Elasticsearch 服务器。这样,Elasticsearch 服务器就可以一次处理多个数据操作,从而提高处理效率。

这种优化的核心价值在于减少了网络往返的次数和连接建立的开销。每一次单独的写入操作都需要经历完整的请求-响应周期,而批量写入则是将多个操作打包在一起,用一次通信完成原本需要多次交互的工作。这不仅仅节省了时间,更重要的是释放了系统资源,让服务器能够专注于真正的数据处理,而不是频繁的协议握手和状态维护。

这样的批量请求的确是可以优化写入请求的效率,让 ES 集群获得更多的资源去做写入请求的集中处理。但是除了客户端与 ES 集群的通讯效率优化,还有其他中间过程能优化么?

Gateway 的优化点

bulk 的优化理念是将日常零散的写入需求做集中化的处理,尽量减低日常请求的损耗,完成资源最大化的利用。简而言之就是“好钢用在刀刃上”。

但是 ES 在收到 bulk 写入请求后,也是需要协调节点根据文档的 id 计算所属的分片来将数据分发到对应的数据节点的。这个过程也是有一定损耗的,如果 bulk 请求中数据分布的很散,每个分片都需要进行写入,原本 bulk 集中写入的需求优势则还是没有得到最理想化的提升。

gateway 的写入加速则对 bulk 的优化理念的最大化补全。

gateway 可以本地计算每个索引文档对应后端 Elasticsearch 集群的目标存放位置,从而能够精准的进行写入请求定位

在一批 bulk 请求中,可能存在多个后端节点的数据,bulk_reshuffle 过滤器用来将正常的 bulk 请求打散,按照目标节点或者分片进行拆分重新组装,避免 Elasticsearch 节点收到请求之后再次进行请求分发, 从而降低 Elasticsearch 集群间的流量和负载,也能避免单个节点成为热点瓶颈,确保各个数据节点的处理均衡,从而提升集群总体的索引吞吐能力。

整理的优化思路如下图:

优化实践

那我们来实践一下,看看 gateway 能提升多少的写入。

这里我们分 2 个测试场景:

  1. 基础集中写入测试,不带文档 id,直接批量写入。这个场景更像是日志或者监控数据采集的场景。
  2. 带文档 id 的写入测试,更偏向搜索场景或者大数据批同步的场景。

2 个场景都进行直接写入 ES 和 gateway 转发 ES 的效率比对。

测试材料除了需要备一个网关和一套 es 外,其余的内容如下:

测试索引 mapping 一致,名称区分:

PUT gateway_bulk_test
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "strict_date_optional_time"
      },
      "field1": {
        "type": "keyword"
      },
      "field2": {
        "type": "keyword"
      },
      "field3": {
        "type": "keyword"
      },
      "field4": {
        "type": "integer"
      },
      "field5": {
        "type": "keyword"
      },
      "field6": {
        "type": "float"
      }
    }
  }
}

PUT bulk_test
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "strict_date_optional_time"
      },
      "field1": {
        "type": "keyword"
      },
      "field2": {
        "type": "keyword"
      },
      "field3": {
        "type": "keyword"
      },
      "field4": {
        "type": "integer"
      },
      "field5": {
        "type": "keyword"
      },
      "field6": {
        "type": "float"
      }
    }
  }
}

gateway 的配置文件如下:

path.data: data
path.logs: log

entry:
  - name: my_es_entry
    enabled: true
    router: my_router
    max_concurrency: 200000
    network:
      binding: 0.0.0.0:8000

flow:
  - name: async_bulk
    filter:
      - bulk_reshuffle:
          when:
            contains:
              _ctx.request.path: /_bulk
          elasticsearch: prod
          level: node
          partition_size: 1
          fix_null_id: true
      - elasticsearch:
          elasticsearch: prod #elasticsearch configure reference name
          max_connection_per_node: 1000 #max tcp connection to upstream, default for all nodes
          max_response_size: -1 #default for all nodes
          balancer: weight
          refresh: # refresh upstream nodes list, need to enable this feature to use elasticsearch nodes auto discovery
            enabled: true
            interval: 60s
          filter:
            roles:
              exclude:
                - master

router:
  - name: my_router
    default_flow: async_bulk

elasticsearch:
  - name: prod
    enabled: true
    endpoints:
      - https://127.0.0.1:9221
      - https://127.0.0.1:9222
      - https://127.0.0.1:9223
    basic_auth:
      username: admin
      password: admin

pipeline:
  - name: bulk_request_ingest
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 1000
    processor:
      - bulk_indexing:
          max_connection_per_node: 100
          num_of_slices: 3
          max_worker_size: 30
          idle_timeout_in_seconds: 10
          bulk:
            compress: false
            batch_size_in_mb: 10
            batch_size_in_docs: 10000
          consumer:
            fetch_max_messages: 100
          queue_selector:
            labels:
              type: bulk_reshuffle

测试脚本如下:

#!/usr/bin/env python3
"""
ES Bulk写入性能测试脚本

"""

import hashlib
import json
import random
import string
import time
from typing import List, Dict, Any

import requests
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import urllib3

# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class ESBulkTester:
    def __init__(self):
        # 配置变量 - 可修改
        self.es_configs = [
            {
                "name": "ES直连",
                "url": "https://127.0.0.1:9221",
                "index": "bulk_test",
                "username": "admin",  # 修改为实际用户名
                "password": "admin",  # 修改为实际密码
                "verify_ssl": False  # HTTPS需要SSL验证
            },
            {
                "name": "Gateway代理",
                "url": "http://localhost:8000",
                "index": "gateway_bulk_test",
                "username": None,  # 无需认证
                "password": None,
                "verify_ssl": False
            }
        ]
        self.batch_size = 10000  # 每次bulk写入条数
        self.log_interval = 100000  # 每多少次bulk写入输出日志

        # ID生成规则配置 - 前2位后5位
        self.id_prefix_start = 1
        self.id_prefix_end = 999      # 前3位: 01-999
        self.id_suffix_start = 1
        self.id_suffix_end = 9999   # 后4位: 0001-9999

        # 当前ID计数器
        self.current_prefix = self.id_prefix_start
        self.current_suffix = self.id_suffix_start

    def generate_id(self) -> str:
        """生成固定规则的ID - 前2位后5位"""
        id_str = f"{self.current_prefix:02d}{self.current_suffix:05d}"

        # 更新计数器
        self.current_suffix += 1
        if self.current_suffix > self.id_suffix_end:
            self.current_suffix = self.id_suffix_start
            self.current_prefix += 1
            if self.current_prefix > self.id_prefix_end:
                self.current_prefix = self.id_prefix_start

        return id_str

    def generate_random_hash(self, length: int = 32) -> str:
        """生成随机hash值"""
        random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
        return hashlib.md5(random_string.encode()).hexdigest()

    def generate_document(self) -> Dict[str, Any]:
        """生成随机文档内容"""
        return {
            "timestamp": datetime.now().isoformat(),
            "field1": self.generate_random_hash(),
            "field2": self.generate_random_hash(),
            "field3": self.generate_random_hash(),
            "field4": random.randint(1, 1000),
            "field5": random.choice(["A", "B", "C", "D"]),
            "field6": random.uniform(0.1, 100.0)
        }

    def create_bulk_payload(self, index_name: str) -> str:
        """创建bulk写入payload"""
        bulk_data = []

        for _ in range(self.batch_size):
            #doc_id = self.generate_id()
            doc = self.generate_document()

            # 添加index操作
            bulk_data.append(json.dumps({
                "index": {
                    "_index": index_name,
            #        "_id": doc_id
                }
            }))
            bulk_data.append(json.dumps(doc))

        return "\n".join(bulk_data) + "\n"

    def bulk_index(self, config: Dict[str, Any], payload: str) -> bool:
        """执行bulk写入"""
        url = f"{config['url']}/_bulk"
        headers = {
            "Content-Type": "application/x-ndjson"
        }

        # 设置认证信息
        auth = None
        if config.get('username') and config.get('password'):
            auth = (config['username'], config['password'])

        try:
            response = requests.post(
                url,
                data=payload,
                headers=headers,
                auth=auth,
                verify=config.get('verify_ssl', True),
                timeout=30
            )
            return response.status_code == 200
        except Exception as e:
            print(f"Bulk写入失败: {e}")
            return False

    def refresh_index(self, config: Dict[str, Any]) -> bool:
        """刷新索引"""
        url = f"{config['url']}/{config['index']}/_refresh"

        # 设置认证信息
        auth = None
        if config.get('username') and config.get('password'):
            auth = (config['username'], config['password'])

        try:
            response = requests.post(
                url,
                auth=auth,
                verify=config.get('verify_ssl', True),
                timeout=10
            )
            success = response.status_code == 200
            print(f"索引刷新{'成功' if success else '失败'}: {config['index']}")
            return success
        except Exception as e:
            print(f"索引刷新失败: {e}")
            return False

    def run_test(self, config: Dict[str, Any], round_num: int, total_iterations: int = 100000):
        """运行性能测试"""
        test_name = f"{config['name']}-第{round_num}轮"
        print(f"\n开始测试: {test_name}")
        print(f"ES地址: {config['url']}")
        print(f"索引名称: {config['index']}")
        print(f"认证: {'是' if config.get('username') else '否'}")
        print(f"每次bulk写入: {self.batch_size}条")
        print(f"总计划写入: {total_iterations * self.batch_size}条")
        print("-" * 50)

        start_time = time.time()
        success_count = 0
        error_count = 0

        for i in range(1, total_iterations + 1):
            payload = self.create_bulk_payload(config['index'])

            if self.bulk_index(config, payload):
                success_count += 1
            else:
                error_count += 1

            # 每N次输出日志
            if i % self.log_interval == 0:
                elapsed_time = time.time() - start_time
                rate = i / elapsed_time if elapsed_time > 0 else 0
                print(f"已完成 {i:,} 次bulk写入, 耗时: {elapsed_time:.2f}秒, 速率: {rate:.2f} bulk/秒")

        end_time = time.time()
        total_time = end_time - start_time
        total_docs = total_iterations * self.batch_size

        print(f"\n{test_name} 测试完成!")
        print(f"总耗时: {total_time:.2f}秒")
        print(f"成功bulk写入: {success_count:,}次")
        print(f"失败bulk写入: {error_count:,}次")
        print(f"总文档数: {total_docs:,}条")
        print(f"平均速率: {success_count/total_time:.2f} bulk/秒")
        print(f"文档写入速率: {total_docs/total_time:.2f} docs/秒")
        print("=" * 60)

        return {
            "test_name": test_name,
            "config_name": config['name'],
            "round": round_num,
            "es_url": config['url'],
            "index": config['index'],
            "total_time": total_time,
            "success_count": success_count,
            "error_count": error_count,
            "total_docs": total_docs,
            "bulk_rate": success_count/total_time,
            "doc_rate": total_docs/total_time
        }

    def run_comparison_test(self, total_iterations: int = 10000):
        """运行双地址对比测试"""
        print("ES Bulk写入性能测试开始")
        print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)

        results = []
        rounds = 2  # 每个地址测试2轮

        # 循环测试所有配置
        for config in self.es_configs:
            print(f"\n开始测试配置: {config['name']}")
            print("*" * 40)

            for round_num in range(1, rounds + 1):
                # 运行测试
                result = self.run_test(config, round_num, total_iterations)
                results.append(result)

                # 每轮结束后刷新索引
                print(f"\n第{round_num}轮测试完成,执行索引刷新...")
                self.refresh_index(config)

                # 重置ID计数器
                if round_num == 1:
                    # 第1轮:使用初始ID范围(新增数据)
                    print("第1轮:新增数据模式")
                else:
                    # 第2轮:重复使用相同ID(更新数据模式)
                    print("第2轮:数据更新模式,复用第1轮ID")
                    self.current_prefix = self.id_prefix_start
                    self.current_suffix = self.id_suffix_start

                print(f"{config['name']} 第{round_num}轮测试结束\n")

        # 输出对比结果
        print("\n性能对比结果:")
        print("=" * 80)

        # 按配置分组显示结果
        config_results = {}
        for result in results:
            config_name = result['config_name']
            if config_name not in config_results:
                config_results[config_name] = []
            config_results[config_name].append(result)

        for config_name, rounds_data in config_results.items():
            print(f"\n{config_name}:")
            total_time = 0
            total_bulk_rate = 0
            total_doc_rate = 0

            for round_data in rounds_data:
                print(f"  第{round_data['round']}轮:")
                print(f"    耗时: {round_data['total_time']:.2f}秒")
                print(f"    Bulk速率: {round_data['bulk_rate']:.2f} bulk/秒")
                print(f"    文档速率: {round_data['doc_rate']:.2f} docs/秒")
                print(f"    成功率: {round_data['success_count']/(round_data['success_count']+round_data['error_count'])*100:.2f}%")

                total_time += round_data['total_time']
                total_bulk_rate += round_data['bulk_rate']
                total_doc_rate += round_data['doc_rate']

            avg_bulk_rate = total_bulk_rate / len(rounds_data)
            avg_doc_rate = total_doc_rate / len(rounds_data)

            print(f"  平均性能:")
            print(f"    总耗时: {total_time:.2f}秒")
            print(f"    平均Bulk速率: {avg_bulk_rate:.2f} bulk/秒")
            print(f"    平均文档速率: {avg_doc_rate:.2f} docs/秒")

        # 整体对比
        if len(config_results) >= 2:
            config_names = list(config_results.keys())
            config1_avg = sum([r['bulk_rate'] for r in config_results[config_names[0]]]) / len(config_results[config_names[0]])
            config2_avg = sum([r['bulk_rate'] for r in config_results[config_names[1]]]) / len(config_results[config_names[1]])

            if config1_avg > config2_avg:
                faster = config_names[0]
                rate_diff = config1_avg - config2_avg
            else:
                faster = config_names[1]
                rate_diff = config2_avg - config1_avg

            print(f"\n整体性能对比:")
            print(f"{faster} 平均性能更好,bulk速率高 {rate_diff:.2f} bulk/秒")
            print(f"性能提升: {(rate_diff/min(config1_avg, config2_avg)*100):.1f}%")

def main():
    """主函数"""
    tester = ESBulkTester()

    # 运行测试(每次bulk 1万条,300次bulk = 300万条文档)
    tester.run_comparison_test(total_iterations=300)

if __name__ == "__main__":
    main()

1. 日志场景:不带 id 写入

测试条件:

  1. bulk 写入数据不带文档 id
  2. 每批次 bulk 10000 条数据,总共写入 30w 数据

这里把

反馈结果:

性能对比结果:
================================================================================

ES直连:
  第1轮:
    耗时: 152.29秒
    Bulk速率: 1.97 bulk/秒
    文档速率: 19699.59 docs/秒
    成功率: 100.00%
  平均性能:
    总耗时: 152.29秒
    平均Bulk速率: 1.97 bulk/秒
    平均文档速率: 19699.59 docs/秒

Gateway代理:
  第1轮:
    耗时: 115.63秒
    Bulk速率: 2.59 bulk/秒
    文档速率: 25944.35 docs/秒
    成功率: 100.00%
  平均性能:
    总耗时: 115.63秒
    平均Bulk速率: 2.59 bulk/秒
    平均文档速率: 25944.35 docs/秒

整体性能对比:
Gateway代理 平均性能更好,bulk速率高 0.62 bulk/秒
性能提升: 31.7%

2. 业务场景:带文档 id 的写入

测试条件:

  1. bulk 写入数据带有文档 id,两次测试写入的文档 id 生成规则一致且重复。
  2. 每批次 bulk 10000 条数据,总共写入 30w 数据

这里把 py 脚本中 第 99 行 和 第 107 行的注释打开。

反馈结果:

性能对比结果:
================================================================================

ES直连:
  第1轮:
    耗时: 155.30秒
    Bulk速率: 1.93 bulk/秒
    文档速率: 19317.39 docs/秒
    成功率: 100.00%
  平均性能:
    总耗时: 155.30秒
    平均Bulk速率: 1.93 bulk/秒
    平均文档速率: 19317.39 docs/秒

Gateway代理:
  第1轮:
    耗时: 116.73秒
    Bulk速率: 2.57 bulk/秒
    文档速率: 25700.06 docs/秒
    成功率: 100.00%
  平均性能:
    总耗时: 116.73秒
    平均Bulk速率: 2.57 bulk/秒
    平均文档速率: 25700.06 docs/秒

整体性能对比:
Gateway代理 平均性能更好,bulk速率高 0.64 bulk/秒
性能提升: 33.0%

小结

不管是日志场景还是业务价值更重要的大数据或者搜索数据同步场景, gateway 的写入加速都能平稳的节省 25%-30% 的写入耗时。

关于极限网关(INFINI Gateway)

INFINI Gateway 是一个开源的面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。

官网文档:https://docs.infinilabs.com/gateway
开源地址:https://github.com/infinilabs/gateway

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/gateway-bulk-write-performance-optimization/

继续阅读 »

背景:bulk 优化的应用

在 ES 的写入优化里,bulk 操作被广泛地用于批量处理数据。bulk 操作允许用户一次提交多个数据操作,如索引、更新、删除等,从而提高数据处理效率。bulk 操作的实现原理是,将数据操作请求打包成 HTTP 请求,并批量提交给 Elasticsearch 服务器。这样,Elasticsearch 服务器就可以一次处理多个数据操作,从而提高处理效率。

这种优化的核心价值在于减少了网络往返的次数和连接建立的开销。每一次单独的写入操作都需要经历完整的请求-响应周期,而批量写入则是将多个操作打包在一起,用一次通信完成原本需要多次交互的工作。这不仅仅节省了时间,更重要的是释放了系统资源,让服务器能够专注于真正的数据处理,而不是频繁的协议握手和状态维护。

这样的批量请求的确是可以优化写入请求的效率,让 ES 集群获得更多的资源去做写入请求的集中处理。但是除了客户端与 ES 集群的通讯效率优化,还有其他中间过程能优化么?

Gateway 的优化点

bulk 的优化理念是将日常零散的写入需求做集中化的处理,尽量减低日常请求的损耗,完成资源最大化的利用。简而言之就是“好钢用在刀刃上”。

但是 ES 在收到 bulk 写入请求后,也是需要协调节点根据文档的 id 计算所属的分片来将数据分发到对应的数据节点的。这个过程也是有一定损耗的,如果 bulk 请求中数据分布的很散,每个分片都需要进行写入,原本 bulk 集中写入的需求优势则还是没有得到最理想化的提升。

gateway 的写入加速则对 bulk 的优化理念的最大化补全。

gateway 可以本地计算每个索引文档对应后端 Elasticsearch 集群的目标存放位置,从而能够精准的进行写入请求定位

在一批 bulk 请求中,可能存在多个后端节点的数据,bulk_reshuffle 过滤器用来将正常的 bulk 请求打散,按照目标节点或者分片进行拆分重新组装,避免 Elasticsearch 节点收到请求之后再次进行请求分发, 从而降低 Elasticsearch 集群间的流量和负载,也能避免单个节点成为热点瓶颈,确保各个数据节点的处理均衡,从而提升集群总体的索引吞吐能力。

整理的优化思路如下图:

优化实践

那我们来实践一下,看看 gateway 能提升多少的写入。

这里我们分 2 个测试场景:

  1. 基础集中写入测试,不带文档 id,直接批量写入。这个场景更像是日志或者监控数据采集的场景。
  2. 带文档 id 的写入测试,更偏向搜索场景或者大数据批同步的场景。

2 个场景都进行直接写入 ES 和 gateway 转发 ES 的效率比对。

测试材料除了需要备一个网关和一套 es 外,其余的内容如下:

测试索引 mapping 一致,名称区分:

PUT gateway_bulk_test
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "strict_date_optional_time"
      },
      "field1": {
        "type": "keyword"
      },
      "field2": {
        "type": "keyword"
      },
      "field3": {
        "type": "keyword"
      },
      "field4": {
        "type": "integer"
      },
      "field5": {
        "type": "keyword"
      },
      "field6": {
        "type": "float"
      }
    }
  }
}

PUT bulk_test
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "strict_date_optional_time"
      },
      "field1": {
        "type": "keyword"
      },
      "field2": {
        "type": "keyword"
      },
      "field3": {
        "type": "keyword"
      },
      "field4": {
        "type": "integer"
      },
      "field5": {
        "type": "keyword"
      },
      "field6": {
        "type": "float"
      }
    }
  }
}

gateway 的配置文件如下:

path.data: data
path.logs: log

entry:
  - name: my_es_entry
    enabled: true
    router: my_router
    max_concurrency: 200000
    network:
      binding: 0.0.0.0:8000

flow:
  - name: async_bulk
    filter:
      - bulk_reshuffle:
          when:
            contains:
              _ctx.request.path: /_bulk
          elasticsearch: prod
          level: node
          partition_size: 1
          fix_null_id: true
      - elasticsearch:
          elasticsearch: prod #elasticsearch configure reference name
          max_connection_per_node: 1000 #max tcp connection to upstream, default for all nodes
          max_response_size: -1 #default for all nodes
          balancer: weight
          refresh: # refresh upstream nodes list, need to enable this feature to use elasticsearch nodes auto discovery
            enabled: true
            interval: 60s
          filter:
            roles:
              exclude:
                - master

router:
  - name: my_router
    default_flow: async_bulk

elasticsearch:
  - name: prod
    enabled: true
    endpoints:
      - https://127.0.0.1:9221
      - https://127.0.0.1:9222
      - https://127.0.0.1:9223
    basic_auth:
      username: admin
      password: admin

pipeline:
  - name: bulk_request_ingest
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 1000
    processor:
      - bulk_indexing:
          max_connection_per_node: 100
          num_of_slices: 3
          max_worker_size: 30
          idle_timeout_in_seconds: 10
          bulk:
            compress: false
            batch_size_in_mb: 10
            batch_size_in_docs: 10000
          consumer:
            fetch_max_messages: 100
          queue_selector:
            labels:
              type: bulk_reshuffle

测试脚本如下:

#!/usr/bin/env python3
"""
ES Bulk写入性能测试脚本

"""

import hashlib
import json
import random
import string
import time
from typing import List, Dict, Any

import requests
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import urllib3

# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class ESBulkTester:
    def __init__(self):
        # 配置变量 - 可修改
        self.es_configs = [
            {
                "name": "ES直连",
                "url": "https://127.0.0.1:9221",
                "index": "bulk_test",
                "username": "admin",  # 修改为实际用户名
                "password": "admin",  # 修改为实际密码
                "verify_ssl": False  # HTTPS需要SSL验证
            },
            {
                "name": "Gateway代理",
                "url": "http://localhost:8000",
                "index": "gateway_bulk_test",
                "username": None,  # 无需认证
                "password": None,
                "verify_ssl": False
            }
        ]
        self.batch_size = 10000  # 每次bulk写入条数
        self.log_interval = 100000  # 每多少次bulk写入输出日志

        # ID生成规则配置 - 前2位后5位
        self.id_prefix_start = 1
        self.id_prefix_end = 999      # 前3位: 01-999
        self.id_suffix_start = 1
        self.id_suffix_end = 9999   # 后4位: 0001-9999

        # 当前ID计数器
        self.current_prefix = self.id_prefix_start
        self.current_suffix = self.id_suffix_start

    def generate_id(self) -> str:
        """生成固定规则的ID - 前2位后5位"""
        id_str = f"{self.current_prefix:02d}{self.current_suffix:05d}"

        # 更新计数器
        self.current_suffix += 1
        if self.current_suffix > self.id_suffix_end:
            self.current_suffix = self.id_suffix_start
            self.current_prefix += 1
            if self.current_prefix > self.id_prefix_end:
                self.current_prefix = self.id_prefix_start

        return id_str

    def generate_random_hash(self, length: int = 32) -> str:
        """生成随机hash值"""
        random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
        return hashlib.md5(random_string.encode()).hexdigest()

    def generate_document(self) -> Dict[str, Any]:
        """生成随机文档内容"""
        return {
            "timestamp": datetime.now().isoformat(),
            "field1": self.generate_random_hash(),
            "field2": self.generate_random_hash(),
            "field3": self.generate_random_hash(),
            "field4": random.randint(1, 1000),
            "field5": random.choice(["A", "B", "C", "D"]),
            "field6": random.uniform(0.1, 100.0)
        }

    def create_bulk_payload(self, index_name: str) -> str:
        """创建bulk写入payload"""
        bulk_data = []

        for _ in range(self.batch_size):
            #doc_id = self.generate_id()
            doc = self.generate_document()

            # 添加index操作
            bulk_data.append(json.dumps({
                "index": {
                    "_index": index_name,
            #        "_id": doc_id
                }
            }))
            bulk_data.append(json.dumps(doc))

        return "\n".join(bulk_data) + "\n"

    def bulk_index(self, config: Dict[str, Any], payload: str) -> bool:
        """执行bulk写入"""
        url = f"{config['url']}/_bulk"
        headers = {
            "Content-Type": "application/x-ndjson"
        }

        # 设置认证信息
        auth = None
        if config.get('username') and config.get('password'):
            auth = (config['username'], config['password'])

        try:
            response = requests.post(
                url,
                data=payload,
                headers=headers,
                auth=auth,
                verify=config.get('verify_ssl', True),
                timeout=30
            )
            return response.status_code == 200
        except Exception as e:
            print(f"Bulk写入失败: {e}")
            return False

    def refresh_index(self, config: Dict[str, Any]) -> bool:
        """刷新索引"""
        url = f"{config['url']}/{config['index']}/_refresh"

        # 设置认证信息
        auth = None
        if config.get('username') and config.get('password'):
            auth = (config['username'], config['password'])

        try:
            response = requests.post(
                url,
                auth=auth,
                verify=config.get('verify_ssl', True),
                timeout=10
            )
            success = response.status_code == 200
            print(f"索引刷新{'成功' if success else '失败'}: {config['index']}")
            return success
        except Exception as e:
            print(f"索引刷新失败: {e}")
            return False

    def run_test(self, config: Dict[str, Any], round_num: int, total_iterations: int = 100000):
        """运行性能测试"""
        test_name = f"{config['name']}-第{round_num}轮"
        print(f"\n开始测试: {test_name}")
        print(f"ES地址: {config['url']}")
        print(f"索引名称: {config['index']}")
        print(f"认证: {'是' if config.get('username') else '否'}")
        print(f"每次bulk写入: {self.batch_size}条")
        print(f"总计划写入: {total_iterations * self.batch_size}条")
        print("-" * 50)

        start_time = time.time()
        success_count = 0
        error_count = 0

        for i in range(1, total_iterations + 1):
            payload = self.create_bulk_payload(config['index'])

            if self.bulk_index(config, payload):
                success_count += 1
            else:
                error_count += 1

            # 每N次输出日志
            if i % self.log_interval == 0:
                elapsed_time = time.time() - start_time
                rate = i / elapsed_time if elapsed_time > 0 else 0
                print(f"已完成 {i:,} 次bulk写入, 耗时: {elapsed_time:.2f}秒, 速率: {rate:.2f} bulk/秒")

        end_time = time.time()
        total_time = end_time - start_time
        total_docs = total_iterations * self.batch_size

        print(f"\n{test_name} 测试完成!")
        print(f"总耗时: {total_time:.2f}秒")
        print(f"成功bulk写入: {success_count:,}次")
        print(f"失败bulk写入: {error_count:,}次")
        print(f"总文档数: {total_docs:,}条")
        print(f"平均速率: {success_count/total_time:.2f} bulk/秒")
        print(f"文档写入速率: {total_docs/total_time:.2f} docs/秒")
        print("=" * 60)

        return {
            "test_name": test_name,
            "config_name": config['name'],
            "round": round_num,
            "es_url": config['url'],
            "index": config['index'],
            "total_time": total_time,
            "success_count": success_count,
            "error_count": error_count,
            "total_docs": total_docs,
            "bulk_rate": success_count/total_time,
            "doc_rate": total_docs/total_time
        }

    def run_comparison_test(self, total_iterations: int = 10000):
        """运行双地址对比测试"""
        print("ES Bulk写入性能测试开始")
        print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)

        results = []
        rounds = 2  # 每个地址测试2轮

        # 循环测试所有配置
        for config in self.es_configs:
            print(f"\n开始测试配置: {config['name']}")
            print("*" * 40)

            for round_num in range(1, rounds + 1):
                # 运行测试
                result = self.run_test(config, round_num, total_iterations)
                results.append(result)

                # 每轮结束后刷新索引
                print(f"\n第{round_num}轮测试完成,执行索引刷新...")
                self.refresh_index(config)

                # 重置ID计数器
                if round_num == 1:
                    # 第1轮:使用初始ID范围(新增数据)
                    print("第1轮:新增数据模式")
                else:
                    # 第2轮:重复使用相同ID(更新数据模式)
                    print("第2轮:数据更新模式,复用第1轮ID")
                    self.current_prefix = self.id_prefix_start
                    self.current_suffix = self.id_suffix_start

                print(f"{config['name']} 第{round_num}轮测试结束\n")

        # 输出对比结果
        print("\n性能对比结果:")
        print("=" * 80)

        # 按配置分组显示结果
        config_results = {}
        for result in results:
            config_name = result['config_name']
            if config_name not in config_results:
                config_results[config_name] = []
            config_results[config_name].append(result)

        for config_name, rounds_data in config_results.items():
            print(f"\n{config_name}:")
            total_time = 0
            total_bulk_rate = 0
            total_doc_rate = 0

            for round_data in rounds_data:
                print(f"  第{round_data['round']}轮:")
                print(f"    耗时: {round_data['total_time']:.2f}秒")
                print(f"    Bulk速率: {round_data['bulk_rate']:.2f} bulk/秒")
                print(f"    文档速率: {round_data['doc_rate']:.2f} docs/秒")
                print(f"    成功率: {round_data['success_count']/(round_data['success_count']+round_data['error_count'])*100:.2f}%")

                total_time += round_data['total_time']
                total_bulk_rate += round_data['bulk_rate']
                total_doc_rate += round_data['doc_rate']

            avg_bulk_rate = total_bulk_rate / len(rounds_data)
            avg_doc_rate = total_doc_rate / len(rounds_data)

            print(f"  平均性能:")
            print(f"    总耗时: {total_time:.2f}秒")
            print(f"    平均Bulk速率: {avg_bulk_rate:.2f} bulk/秒")
            print(f"    平均文档速率: {avg_doc_rate:.2f} docs/秒")

        # 整体对比
        if len(config_results) >= 2:
            config_names = list(config_results.keys())
            config1_avg = sum([r['bulk_rate'] for r in config_results[config_names[0]]]) / len(config_results[config_names[0]])
            config2_avg = sum([r['bulk_rate'] for r in config_results[config_names[1]]]) / len(config_results[config_names[1]])

            if config1_avg > config2_avg:
                faster = config_names[0]
                rate_diff = config1_avg - config2_avg
            else:
                faster = config_names[1]
                rate_diff = config2_avg - config1_avg

            print(f"\n整体性能对比:")
            print(f"{faster} 平均性能更好,bulk速率高 {rate_diff:.2f} bulk/秒")
            print(f"性能提升: {(rate_diff/min(config1_avg, config2_avg)*100):.1f}%")

def main():
    """主函数"""
    tester = ESBulkTester()

    # 运行测试(每次bulk 1万条,300次bulk = 300万条文档)
    tester.run_comparison_test(total_iterations=300)

if __name__ == "__main__":
    main()

1. 日志场景:不带 id 写入

测试条件:

  1. bulk 写入数据不带文档 id
  2. 每批次 bulk 10000 条数据,总共写入 30w 数据

这里把

反馈结果:

性能对比结果:
================================================================================

ES直连:
  第1轮:
    耗时: 152.29秒
    Bulk速率: 1.97 bulk/秒
    文档速率: 19699.59 docs/秒
    成功率: 100.00%
  平均性能:
    总耗时: 152.29秒
    平均Bulk速率: 1.97 bulk/秒
    平均文档速率: 19699.59 docs/秒

Gateway代理:
  第1轮:
    耗时: 115.63秒
    Bulk速率: 2.59 bulk/秒
    文档速率: 25944.35 docs/秒
    成功率: 100.00%
  平均性能:
    总耗时: 115.63秒
    平均Bulk速率: 2.59 bulk/秒
    平均文档速率: 25944.35 docs/秒

整体性能对比:
Gateway代理 平均性能更好,bulk速率高 0.62 bulk/秒
性能提升: 31.7%

2. 业务场景:带文档 id 的写入

测试条件:

  1. bulk 写入数据带有文档 id,两次测试写入的文档 id 生成规则一致且重复。
  2. 每批次 bulk 10000 条数据,总共写入 30w 数据

这里把 py 脚本中 第 99 行 和 第 107 行的注释打开。

反馈结果:

性能对比结果:
================================================================================

ES直连:
  第1轮:
    耗时: 155.30秒
    Bulk速率: 1.93 bulk/秒
    文档速率: 19317.39 docs/秒
    成功率: 100.00%
  平均性能:
    总耗时: 155.30秒
    平均Bulk速率: 1.93 bulk/秒
    平均文档速率: 19317.39 docs/秒

Gateway代理:
  第1轮:
    耗时: 116.73秒
    Bulk速率: 2.57 bulk/秒
    文档速率: 25700.06 docs/秒
    成功率: 100.00%
  平均性能:
    总耗时: 116.73秒
    平均Bulk速率: 2.57 bulk/秒
    平均文档速率: 25700.06 docs/秒

整体性能对比:
Gateway代理 平均性能更好,bulk速率高 0.64 bulk/秒
性能提升: 33.0%

小结

不管是日志场景还是业务价值更重要的大数据或者搜索数据同步场景, gateway 的写入加速都能平稳的节省 25%-30% 的写入耗时。

关于极限网关(INFINI Gateway)

INFINI Gateway 是一个开源的面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。

官网文档:https://docs.infinilabs.com/gateway
开源地址:https://github.com/infinilabs/gateway

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/gateway-bulk-write-performance-optimization/

收起阅读 »

ES 调优帖:关于索引合并参数 index.merge.policy.deletePctAllowed 的取值优化

最近发现了 lucene 9.5 版本把 merge 策略的默认参数改了。

* GITHUB#11761: TieredMergePolicy now allowed a maximum allowable deletes percentage of down to 5%, and the default
   maximum allowable deletes percentage is changed from 33% to 20%. (Marc D'Mello)

也就是 index.merge.policy.deletePctAllowed 最小值可以取 5%(原来是 20%),而默认值为 20%(原来是 33%)。

这是一个控制索引中已删除文档的占比的参数,简单来说,调低这个参数能够降低存储大小,同时也需要更多的 cpu 和内存资源来完成这个调优。

通过这个帖子的讨论,大家可以发现,“实践出真知”,这次的参数调整是 lucene 社区对于用户积极反馈的采纳。因此,对于老版本的用户,也可以在 deletepct 比较高的场景下,调优这个参数,当然一切生产调整都需要经过测试

对于 ES 的新用户来说,这时候可能冒出了下面这些问题

  • 这个参数反馈的已删除文档占比 deletepct 是什么?
  • 它怎么计算的呢?较高的 deletepct 会有什么影响?
  • 较低的 deletepct 为什么会有更多的资源消耗?
  • 除了调优这个参数还有什么优化办法么?

伴随这些问题,来探讨一下这个参数的来源和作用。

deletePctAllowed:软删除的遗留

在 Lucene 中,软删除是一种标记文档以便后续逻辑删除的机制,而不是立即从索引中物理删除文档。

但是这些软删除文档又不是永久存在的,deletePctAllowed 表示索引中允许存在的软删除文档占总文档数的最大百分比。

当软删除文档的比例达到或超过 deletePctAllowed 所设定的阈值时,Lucene 会触发索引合并操作。这是因为在合并过程中,那些被软删除的文档会被物理地从索引中移除,从而减少索引的存储空间占用。

当 deletePctAllowed 设置过低时,会频繁触发索引合并,因合并操作需大量磁盘 I/O、CPU 和内存资源,会使写入性能显著下降,磁盘 I/O 压力增大。假设 deletePctAllowed 为 0,则每次写入都需要消耗额外的资源来做 segment 的合并。

deletePctAllowed 过高,索引会容纳大量软删除文档,占用过多磁盘空间,增加存储成本且可能导致磁盘空间不足。查询时要过滤大量软删除文档,使查询响应时间变长、性能下降。同时也观察到,在使用 soft-deleted 特性后,文档更新和 refresh 也会受到影响,deletePctAllowed 过高,文档更新/refresh 操作耗时也会明显上升。

deletePctAllowed 的实际效果

从上面的解释看,index.merge.policy.deletePctAllowed 这个参数仿佛并不难理解,但实际上这个参数是应用到各个 segment 级别的,并且 segment 对这个参数的触发条件也是有限制(过小的 segment 并不会因为这个参数触发合并操作)。在多分片多 segment 的条件下,索引对 deletePctAllowed 参数实际的应用效果并不完全一致。因此,可以做个实际测试来看 deletePctAllowed 对索引产生的效果。

这里创建一个一千万文档的索引,然后全量更新一遍,看最后 deletePctAllowed 会保留多少的被删除文档。

GET test_del/_count
{
  "count": 10000000,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  }
}

# 查看 delete 文档数量

GET test_del/_stats
···
    "primaries": {
      "docs": {
        "count": 10000000,
        "deleted": 0
      },
···

这里的 deletePctAllowed 还是使用的 33%。

更新任务命令:

POST test_del/_update_by_query?wait_for_completion=false
{
  "query": {
    "match_all": {}
  },
  "script": {
    "source": "ctx._source.field_name = 'new_value'",
    "lang": "painless"
  }
}

完成后,

# 任务状态
···
  "task": {
    "node": "28HymM3xTESGMPRD3LvtCg",
    "id": 10385666,
    "type": "transport",
    "action": "indices:data/write/update/byquery",
    "status": {
      "total": 10000000,
      "updated": 10000000,# 这里可以看到全量更新
      "created": 0,
      "deleted": 0,
      "batches": 10000,
      "version_conflicts": 0,
      "noops": 0,
      "retries": {
        "bulk": 0,
        "search": 0
      },
      "throttled_millis": 0,
      "requests_per_second": -1,
      "throttled_until_millis": 0
    }
···

# 索引的状态
GET test_del/_stats
···
  "_all": {
    "primaries": {
      "docs": {
        "count": 10000000,
        "deleted": 809782
      },
···

实际删除文档与非删除文档的比例为 8.09%。

现在尝试调低 index.merge.policy.deletes_pct_allowed到 20%。

PUT test_del/_settings
{"index.merge.policy.deletes_pct_allowed":20}

由于之前删除文档占比过低,调整参数并不会触发新的 merge,因此需要重新全量更新数据查看一下是否有改变。

最终得到的索引状态如下:

GET test_del/_stats
···
    "primaries": {
      "docs": {
        "count": 10000000,
        "deleted": 190458
      }
···

这次得到的实际删除文档与非删除文档的比例为 1.9%

deletes_pct_allowed 默认值的调整

上面提到 deletePctAllowed 设置过低时,会频繁触发索引合并,而合并任务的线程使用线程类型是 SCALING 的,是一种动态扩展使用 cpu 的策略。

那么,当 deletePctAllowed 设置过低时,merge 任务增加,cpu 线程使用增加。集群的 cpu 和磁盘的使用会随着写入增加,deletePctAllowed 降低产生了放大效果。

所以,在没有大量数据支撑的条件下,ES 的使用者们往往会选择业务低峰期使用 forcemerge 来降低文档删除比,因为 forcemerge 的线程类型是 fixed,并且为 1,对 cpu 和磁盘的压力更加可控,同时 forcemerge 的 deletePctAllowed 默认阈值是 10%,更加低。

而社区中,大家的实际反馈则更倾向使用较低的 deletePctAllowed 阈值,特别是小索引小写入的情况下。

并且提供了相应的测试结果

#### RUN 1
Test config:
Single node domain
Instance type: EC2 m5.4xlarge
Updates: 50% of the total request

Baseline:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "33.0"
Target:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "20.0"

| Metrics | Baseline | Target |
------------------------------------
| Store size | 39gb | 37gb |
| Deleted docs percent | 22% | 18% |
| Avg. CPU | (42 - 53)% | (43 - 55)% |
| Write throughput | 11 - 15 mbps | 11 - 17 mbps |
| Indexing latency | 0.15 - 0.36 ms | 0.15 - 0.39 ms |
| P90 search latency | 14.9 ms | 13.2 ms |
| P90 term query latency | 13.7 ms | 13.5 ms |

#### RUN 2
Test config:
Single node domain
Instance type: EC2 m5.4xlarge
Updates: 75% of the total request

Baseline:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "33.0"
Target:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "20.0"

| Metrics | Baseline | Target |
------------------------------------
| Store size | 19.4gb | 17.7gb |
| Deleted docs percent | 22.8% | 15% |
| Avg. CPU | (43 - 53)% | (46 - 53)% |
| Write throughput | 9 - 14.5 mbps | 10 - 15.9 mbps |
| Indexing latency | 0.14 - 0.33 ms | 0.14 - 0.28 ms |
| P90 search latency | 15.9 ms | 13.5 ms |
| P90 term query latency | 15.7 ms | 13.9 ms |

#### RUN 3
Test config:
Single node domain
Instance type: EC2 m5.4xlarge
Updates: 80% of the total request

Baseline:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "33.0"
Target:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "20.0"

| Metrics | Baseline | Target |
------------------------------------
| Store size | 15.9gb | 14.6gb |
| Deleted docs percent | 24% | 18% |
| Avg. CPU | (46 - 52)% | (47 - 52)% |
| Write throughput | 9 - 13 mbps | 10 - 15 mbps |
| Indexing latency | 0.14 - 0.28 ms | 0.13 - 0.26 ms |
| P90 search latency | 15.3 ms | 13.6 ms |
| P90 term query latency | 15.2 ms | 13.4 ms |

#### RUN 4
Test config:
Single node domain
Instance type: EC2 m5.2xlarge
Updates: 80% of the total request

Baseline:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "33.0"
Target:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "20.0"

| Metrics | Baseline | Target |
------------------------------------
| Store size | 21.6gb | 17.8gb |
| Deleted docs percent | 30% | 18% |
| Avg. CPU | (71 - 89)% | (83 - 90)% |
| Write throughput | 6 - 12 mbps | 10 - 15 mbps |
| indexing latency | 0.21 - 0.30 ms | 0.20 - 0.31 ms |
| P90 search latency | 15.4 ms | 16.3 ms |
| P90 term query latency | 15.4 ms | 14.8 ms |

在测试中给出的结论是:

  1. CPU 和 IO 吞吐量没有明显增加
  2. 由于索引中删除的文档数量较少,搜索延迟更少。
  3. 减少被删除文档占用的磁盘空间浪费

但是也需要注意,这里的测试索引和消耗资源并不大,有些业务量较大的索引还是需要重新做相关压力测试

另一种调优思路

那除了降低 deletePctAllowed 和使用 forcemerge,还有其他方法么?

这里一个pr,提供一个综合性的解决方案,作者把两个 merge 策略进行了合并,在主动合并的间隙添加 forcemerge 检测方法,遇到可执行的时间段(资源使用率低),主动发起对单个 segment 的 forcemerge,这里 segment 得删选大小更加低,这样对 forcemerge 的任务耗时也更低,最终减少索引的删除文档占比。

简单的理解就是,利用了集群资源的“碎片时间”去完成主动的 forcemerge。也是一种可控且优质的调优方式。

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/index-merge-policy-deletepctallowed/

继续阅读 »

最近发现了 lucene 9.5 版本把 merge 策略的默认参数改了。

* GITHUB#11761: TieredMergePolicy now allowed a maximum allowable deletes percentage of down to 5%, and the default
   maximum allowable deletes percentage is changed from 33% to 20%. (Marc D'Mello)

也就是 index.merge.policy.deletePctAllowed 最小值可以取 5%(原来是 20%),而默认值为 20%(原来是 33%)。

这是一个控制索引中已删除文档的占比的参数,简单来说,调低这个参数能够降低存储大小,同时也需要更多的 cpu 和内存资源来完成这个调优。

通过这个帖子的讨论,大家可以发现,“实践出真知”,这次的参数调整是 lucene 社区对于用户积极反馈的采纳。因此,对于老版本的用户,也可以在 deletepct 比较高的场景下,调优这个参数,当然一切生产调整都需要经过测试

对于 ES 的新用户来说,这时候可能冒出了下面这些问题

  • 这个参数反馈的已删除文档占比 deletepct 是什么?
  • 它怎么计算的呢?较高的 deletepct 会有什么影响?
  • 较低的 deletepct 为什么会有更多的资源消耗?
  • 除了调优这个参数还有什么优化办法么?

伴随这些问题,来探讨一下这个参数的来源和作用。

deletePctAllowed:软删除的遗留

在 Lucene 中,软删除是一种标记文档以便后续逻辑删除的机制,而不是立即从索引中物理删除文档。

但是这些软删除文档又不是永久存在的,deletePctAllowed 表示索引中允许存在的软删除文档占总文档数的最大百分比。

当软删除文档的比例达到或超过 deletePctAllowed 所设定的阈值时,Lucene 会触发索引合并操作。这是因为在合并过程中,那些被软删除的文档会被物理地从索引中移除,从而减少索引的存储空间占用。

当 deletePctAllowed 设置过低时,会频繁触发索引合并,因合并操作需大量磁盘 I/O、CPU 和内存资源,会使写入性能显著下降,磁盘 I/O 压力增大。假设 deletePctAllowed 为 0,则每次写入都需要消耗额外的资源来做 segment 的合并。

deletePctAllowed 过高,索引会容纳大量软删除文档,占用过多磁盘空间,增加存储成本且可能导致磁盘空间不足。查询时要过滤大量软删除文档,使查询响应时间变长、性能下降。同时也观察到,在使用 soft-deleted 特性后,文档更新和 refresh 也会受到影响,deletePctAllowed 过高,文档更新/refresh 操作耗时也会明显上升。

deletePctAllowed 的实际效果

从上面的解释看,index.merge.policy.deletePctAllowed 这个参数仿佛并不难理解,但实际上这个参数是应用到各个 segment 级别的,并且 segment 对这个参数的触发条件也是有限制(过小的 segment 并不会因为这个参数触发合并操作)。在多分片多 segment 的条件下,索引对 deletePctAllowed 参数实际的应用效果并不完全一致。因此,可以做个实际测试来看 deletePctAllowed 对索引产生的效果。

这里创建一个一千万文档的索引,然后全量更新一遍,看最后 deletePctAllowed 会保留多少的被删除文档。

GET test_del/_count
{
  "count": 10000000,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  }
}

# 查看 delete 文档数量

GET test_del/_stats
···
    "primaries": {
      "docs": {
        "count": 10000000,
        "deleted": 0
      },
···

这里的 deletePctAllowed 还是使用的 33%。

更新任务命令:

POST test_del/_update_by_query?wait_for_completion=false
{
  "query": {
    "match_all": {}
  },
  "script": {
    "source": "ctx._source.field_name = 'new_value'",
    "lang": "painless"
  }
}

完成后,

# 任务状态
···
  "task": {
    "node": "28HymM3xTESGMPRD3LvtCg",
    "id": 10385666,
    "type": "transport",
    "action": "indices:data/write/update/byquery",
    "status": {
      "total": 10000000,
      "updated": 10000000,# 这里可以看到全量更新
      "created": 0,
      "deleted": 0,
      "batches": 10000,
      "version_conflicts": 0,
      "noops": 0,
      "retries": {
        "bulk": 0,
        "search": 0
      },
      "throttled_millis": 0,
      "requests_per_second": -1,
      "throttled_until_millis": 0
    }
···

# 索引的状态
GET test_del/_stats
···
  "_all": {
    "primaries": {
      "docs": {
        "count": 10000000,
        "deleted": 809782
      },
···

实际删除文档与非删除文档的比例为 8.09%。

现在尝试调低 index.merge.policy.deletes_pct_allowed到 20%。

PUT test_del/_settings
{"index.merge.policy.deletes_pct_allowed":20}

由于之前删除文档占比过低,调整参数并不会触发新的 merge,因此需要重新全量更新数据查看一下是否有改变。

最终得到的索引状态如下:

GET test_del/_stats
···
    "primaries": {
      "docs": {
        "count": 10000000,
        "deleted": 190458
      }
···

这次得到的实际删除文档与非删除文档的比例为 1.9%

deletes_pct_allowed 默认值的调整

上面提到 deletePctAllowed 设置过低时,会频繁触发索引合并,而合并任务的线程使用线程类型是 SCALING 的,是一种动态扩展使用 cpu 的策略。

那么,当 deletePctAllowed 设置过低时,merge 任务增加,cpu 线程使用增加。集群的 cpu 和磁盘的使用会随着写入增加,deletePctAllowed 降低产生了放大效果。

所以,在没有大量数据支撑的条件下,ES 的使用者们往往会选择业务低峰期使用 forcemerge 来降低文档删除比,因为 forcemerge 的线程类型是 fixed,并且为 1,对 cpu 和磁盘的压力更加可控,同时 forcemerge 的 deletePctAllowed 默认阈值是 10%,更加低。

而社区中,大家的实际反馈则更倾向使用较低的 deletePctAllowed 阈值,特别是小索引小写入的情况下。

并且提供了相应的测试结果

#### RUN 1
Test config:
Single node domain
Instance type: EC2 m5.4xlarge
Updates: 50% of the total request

Baseline:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "33.0"
Target:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "20.0"

| Metrics | Baseline | Target |
------------------------------------
| Store size | 39gb | 37gb |
| Deleted docs percent | 22% | 18% |
| Avg. CPU | (42 - 53)% | (43 - 55)% |
| Write throughput | 11 - 15 mbps | 11 - 17 mbps |
| Indexing latency | 0.15 - 0.36 ms | 0.15 - 0.39 ms |
| P90 search latency | 14.9 ms | 13.2 ms |
| P90 term query latency | 13.7 ms | 13.5 ms |

#### RUN 2
Test config:
Single node domain
Instance type: EC2 m5.4xlarge
Updates: 75% of the total request

Baseline:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "33.0"
Target:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "20.0"

| Metrics | Baseline | Target |
------------------------------------
| Store size | 19.4gb | 17.7gb |
| Deleted docs percent | 22.8% | 15% |
| Avg. CPU | (43 - 53)% | (46 - 53)% |
| Write throughput | 9 - 14.5 mbps | 10 - 15.9 mbps |
| Indexing latency | 0.14 - 0.33 ms | 0.14 - 0.28 ms |
| P90 search latency | 15.9 ms | 13.5 ms |
| P90 term query latency | 15.7 ms | 13.9 ms |

#### RUN 3
Test config:
Single node domain
Instance type: EC2 m5.4xlarge
Updates: 80% of the total request

Baseline:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "33.0"
Target:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "20.0"

| Metrics | Baseline | Target |
------------------------------------
| Store size | 15.9gb | 14.6gb |
| Deleted docs percent | 24% | 18% |
| Avg. CPU | (46 - 52)% | (47 - 52)% |
| Write throughput | 9 - 13 mbps | 10 - 15 mbps |
| Indexing latency | 0.14 - 0.28 ms | 0.13 - 0.26 ms |
| P90 search latency | 15.3 ms | 13.6 ms |
| P90 term query latency | 15.2 ms | 13.4 ms |

#### RUN 4
Test config:
Single node domain
Instance type: EC2 m5.2xlarge
Updates: 80% of the total request

Baseline:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "33.0"
Target:
OS_2.3
"index.merge.policy.deletes_pct_allowed" : "20.0"

| Metrics | Baseline | Target |
------------------------------------
| Store size | 21.6gb | 17.8gb |
| Deleted docs percent | 30% | 18% |
| Avg. CPU | (71 - 89)% | (83 - 90)% |
| Write throughput | 6 - 12 mbps | 10 - 15 mbps |
| indexing latency | 0.21 - 0.30 ms | 0.20 - 0.31 ms |
| P90 search latency | 15.4 ms | 16.3 ms |
| P90 term query latency | 15.4 ms | 14.8 ms |

在测试中给出的结论是:

  1. CPU 和 IO 吞吐量没有明显增加
  2. 由于索引中删除的文档数量较少,搜索延迟更少。
  3. 减少被删除文档占用的磁盘空间浪费

但是也需要注意,这里的测试索引和消耗资源并不大,有些业务量较大的索引还是需要重新做相关压力测试

另一种调优思路

那除了降低 deletePctAllowed 和使用 forcemerge,还有其他方法么?

这里一个pr,提供一个综合性的解决方案,作者把两个 merge 策略进行了合并,在主动合并的间隙添加 forcemerge 检测方法,遇到可执行的时间段(资源使用率低),主动发起对单个 segment 的 forcemerge,这里 segment 得删选大小更加低,这样对 forcemerge 的任务耗时也更低,最终减少索引的删除文档占比。

简单的理解就是,利用了集群资源的“碎片时间”去完成主动的 forcemerge。也是一种可控且优质的调优方式。

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/index-merge-policy-deletepctallowed/

收起阅读 »

谈谈 ES 6.8 到 7.10 的功能变迁(6)- 其他

这是 ES 7.10 相较于 ES 6.8 新增内容的最后一篇,主要涉及算分方法和同义词加载的部分。

自定义算分:script_score 2.0

Elasticsearch 7.0 引入了新一代的函数分数功能,称为 script_score 查询。这一新功能提供了一种更简单、更灵活的方式来为每条记录生成排名分数。script_score 查询由一组函数构成,包括算术函数和距离函数,用户可以根据需要混合和匹配这些函数,以构建任意的分数计算逻辑。这种模块化的结构使得使用更加简便,同时也为更多用户提供了这一重要功能的访问权限。通过 script_score,用户可以根据复杂的业务逻辑自定义评分,而不仅仅依赖于传统的 TF-IDF 或 BM25 算法。例如,可以根据文档的地理位置、时间戳、或其他自定义字段的值来调整评分,从而更精确地控制搜索结果的排序。

script_score 是 ES 对 function score 功能的一个迭代替换。

常用函数

基本函数

用于对字段值或评分进行基本的数学运算。 doc[<field>].value 获取文档中某个字段的值。

  "script": {
    "source": "doc['price'].value * 1.2"
  }

算术运算

支持加 (+)、减 (-)、乘 (*)、除 (/)、取模 (%) 等操作。

  "script": {
    "source": "doc['price'].value + (doc['discount'].value * 0.5)"
  }

Saturation 函数

saturation 函数用于对字段值进行饱和处理,限制字段值对评分的影响范围。

"script": {
  "source": "saturation(doc['<field_name>'].value, <pivot>)"
}
  • <field_name>: 需要处理的字段。
  • <pivot>: 饱和点(pivot),当字段值达到该值时,评分增益趋于饱和。
//在这个示例中,`likes` 字段的值在达到 `100` 后,对评分的影响会趋于饱和。
{
  "query": {
    "script_score": {
      "query": {
        "match_all": {}
      },
      "script": {
        "source": "saturation(doc['likes'].value, 100)"
      }
    }
  }
}

Sigmoid 函数

sigmoid 函数用于对字段值进行 S 形曲线变换,平滑地调整字段值对评分的影响。

"script": {
  "source": "sigmoid(doc['<field_name>'].value, <pivot>, <exponent>)"
}
  • 需要处理的字段。
  • 中心点(pivot),S 形曲线的中点。
  • 指数,控制曲线的陡峭程度。
//在这个示例中,`likes` 字段的值在 `50` 附近对评分的影响最为显著,而随着值远离 `50`,影响会逐渐平滑。
{
  "query": {
    "script_score": {
      "query": {
        "match_all": {}
      },
      "script": {
        "source": "sigmoid(doc['likes'].value, 50, 0.5)"
      }
    }
  }
}

距离衰减函数

用于衰减计算地理位置的函数。

//相关函数
double decayGeoLinear(String originStr, String scaleStr, String offsetStr, double decay, GeoPoint docValue)
double decayGeoExp(String originStr, String scaleStr, String offsetStr, double decay, GeoPoint docValue)
double decayGeoGauss(String originStr, String scaleStr, String offsetStr, double decay, GeoPoint docValue)

"script" : {
    "source" : "decayGeoExp(params.origin, params.scale, params.offset, params.decay, doc['location'].value)",
    "params": {
        "origin": "40, -70.12",
        "scale": "200km",
        "offset": "0km",
        "decay" : 0.2
    }
}

数值衰减函数

用于衰减计算数值的函数。

//相关函数
double decayNumericLinear(double origin, double scale, double offset, double decay, double docValue)
double decayNumericExp(double origin, double scale, double offset, double decay, double docValue)
double decayNumericGauss(double origin, double scale, double offset, double decay, double docValue)

"script" : {
    "source" : "decayNumericLinear(params.origin, params.scale, params.offset, params.decay, doc['dval'].value)",
    "params": {
        "origin": 20,
        "scale": 10,
        "decay" : 0.5,
        "offset" : 0
    }
}

日期衰减函数

用于衰减计算日期的函数。

//相关函数
double decayDateLinear(String originStr, String scaleStr, String offsetStr, double decay, JodaCompatibleZonedDateTime docValueDate)
double decayDateExp(String originStr, String scaleStr, String offsetStr, double decay, JodaCompatibleZonedDateTime docValueDate)
double decayDateGauss(String originStr, String scaleStr, String offsetStr, double decay, JodaCompatibleZonedDateTime docValueDate)

"script" : {
    "source" : "decayDateGauss(params.origin, params.scale, params.offset, params.decay, doc['date'].value)",
    "params": {
        "origin": "2008-01-01T01:00:00Z",
        "scale": "1h",
        "offset" : "0",
        "decay" : 0.5
    }
}

随机函数

用于生成随机评分。 randomNotReproducible` 生成一个随机评分。

"script" : {
    "source" : "randomNotReproducible()"
    }

randomReproducible 使用种子值生成可重复的随机评分。

"script" : {
    "source" : "randomReproducible(Long.toString(doc['_seq_no'].value), 100)"
    }

字段值因子

用于根据字段值调整评分。 _field_valuefactor` 根据字段值调整评分。

"script" : {
    "source" : "Math.log10(doc['field'].value * params.factor)",
    params" : {
        "factor" : 5
        }
    }

其他实用函数

  • Math.log:计算对数,Math.log(doc['price'].value)
  • Math.sqrt:计算平方根,Math.sqrt(doc['popularity'].value)
  • Math.pow:计算幂次,Math.pow(doc['score'].value, 2)

同义词字段重加载

Elasticsearch 7.3 引入了同义词字段重加载功能,允许用户在更新同义词文件后,无需重新索引即可使更改生效。 这一功能极大地简化了同义词管理的流程,尤其是在需要频繁更新同义词的场景下。通过 _reload_search_analyzers API,用户可以重新加载指定索引的分词器,从而使新的同义词规则立即生效。 注意,虽然同义词词典能被热加载,但是已经生成的索引数据不会被修改。

测试代码

PUT /my_index
{
    "settings": {
        "index" : {
            "analysis" : {
                "analyzer" : {
                    "my_synonyms" : {
                        "tokenizer" : "whitespace",
                        "filter" : ["synonym"]
                    }
                },
                "filter" : {
                    "synonym" : {
                        "type" : "synonym_graph",
                        "synonyms_path" : "analysis/synonym.txt",
                        "updateable" : true
                    }
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "text": {
                "type": "text",
                "analyzer" : "standard",
                "search_analyzer": "my_synonyms"
            }
        }
    }
}

POST /my_index/_reload_search_analyzers

执行上述请求后,Elasticsearch 会重新加载 my_index 索引的分析器,使最新的同义词规则生效。

case insensitive 参数

case_insensitive 参数允许用户在执行精确匹配查询时忽略大小写。 这一功能特别适用于需要处理大小写不敏感数据的场景,例如用户名、标签或分类代码等。通过设置 case_insensitivetrue,用户可以在不修改数据的情况下,实现对大小写不敏感的查询,从而简化查询逻辑并提高搜索的准确性。

测试代码

//在这个示例中,`term` 查询会匹配 `user` 字段值为 `JohnDoe`、`johndoe` 或 `JOHNDOE` 的文档,而忽略大小写差异。
{
  "query": {
    "term": {
      "user": {
        "value": "JohnDoe",
        "case_insensitive": true
      }
    }
  }
}

小结

Elasticsearch 作为一款强大的开源搜索和分析引擎,其版本的不断迭代带来了诸多显著的改进与优化。对比 Elasticsearch 6.8,Elasticsearch 7.10 在多个方面展现出了新的功能和特性,极大地提升了用户体验和系统性能。这系列文章简短的介绍了各个方面的新功能和优化,希望能给大家一定的帮助。

推荐阅读

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/feature-evolution-from-elasticsearch-6.8-to-7.10-part-6/

继续阅读 »

这是 ES 7.10 相较于 ES 6.8 新增内容的最后一篇,主要涉及算分方法和同义词加载的部分。

自定义算分:script_score 2.0

Elasticsearch 7.0 引入了新一代的函数分数功能,称为 script_score 查询。这一新功能提供了一种更简单、更灵活的方式来为每条记录生成排名分数。script_score 查询由一组函数构成,包括算术函数和距离函数,用户可以根据需要混合和匹配这些函数,以构建任意的分数计算逻辑。这种模块化的结构使得使用更加简便,同时也为更多用户提供了这一重要功能的访问权限。通过 script_score,用户可以根据复杂的业务逻辑自定义评分,而不仅仅依赖于传统的 TF-IDF 或 BM25 算法。例如,可以根据文档的地理位置、时间戳、或其他自定义字段的值来调整评分,从而更精确地控制搜索结果的排序。

script_score 是 ES 对 function score 功能的一个迭代替换。

常用函数

基本函数

用于对字段值或评分进行基本的数学运算。 doc[<field>].value 获取文档中某个字段的值。

  "script": {
    "source": "doc['price'].value * 1.2"
  }

算术运算

支持加 (+)、减 (-)、乘 (*)、除 (/)、取模 (%) 等操作。

  "script": {
    "source": "doc['price'].value + (doc['discount'].value * 0.5)"
  }

Saturation 函数

saturation 函数用于对字段值进行饱和处理,限制字段值对评分的影响范围。

"script": {
  "source": "saturation(doc['<field_name>'].value, <pivot>)"
}
  • <field_name>: 需要处理的字段。
  • <pivot>: 饱和点(pivot),当字段值达到该值时,评分增益趋于饱和。
//在这个示例中,`likes` 字段的值在达到 `100` 后,对评分的影响会趋于饱和。
{
  "query": {
    "script_score": {
      "query": {
        "match_all": {}
      },
      "script": {
        "source": "saturation(doc['likes'].value, 100)"
      }
    }
  }
}

Sigmoid 函数

sigmoid 函数用于对字段值进行 S 形曲线变换,平滑地调整字段值对评分的影响。

"script": {
  "source": "sigmoid(doc['<field_name>'].value, <pivot>, <exponent>)"
}
  • 需要处理的字段。
  • 中心点(pivot),S 形曲线的中点。
  • 指数,控制曲线的陡峭程度。
//在这个示例中,`likes` 字段的值在 `50` 附近对评分的影响最为显著,而随着值远离 `50`,影响会逐渐平滑。
{
  "query": {
    "script_score": {
      "query": {
        "match_all": {}
      },
      "script": {
        "source": "sigmoid(doc['likes'].value, 50, 0.5)"
      }
    }
  }
}

距离衰减函数

用于衰减计算地理位置的函数。

//相关函数
double decayGeoLinear(String originStr, String scaleStr, String offsetStr, double decay, GeoPoint docValue)
double decayGeoExp(String originStr, String scaleStr, String offsetStr, double decay, GeoPoint docValue)
double decayGeoGauss(String originStr, String scaleStr, String offsetStr, double decay, GeoPoint docValue)

"script" : {
    "source" : "decayGeoExp(params.origin, params.scale, params.offset, params.decay, doc['location'].value)",
    "params": {
        "origin": "40, -70.12",
        "scale": "200km",
        "offset": "0km",
        "decay" : 0.2
    }
}

数值衰减函数

用于衰减计算数值的函数。

//相关函数
double decayNumericLinear(double origin, double scale, double offset, double decay, double docValue)
double decayNumericExp(double origin, double scale, double offset, double decay, double docValue)
double decayNumericGauss(double origin, double scale, double offset, double decay, double docValue)

"script" : {
    "source" : "decayNumericLinear(params.origin, params.scale, params.offset, params.decay, doc['dval'].value)",
    "params": {
        "origin": 20,
        "scale": 10,
        "decay" : 0.5,
        "offset" : 0
    }
}

日期衰减函数

用于衰减计算日期的函数。

//相关函数
double decayDateLinear(String originStr, String scaleStr, String offsetStr, double decay, JodaCompatibleZonedDateTime docValueDate)
double decayDateExp(String originStr, String scaleStr, String offsetStr, double decay, JodaCompatibleZonedDateTime docValueDate)
double decayDateGauss(String originStr, String scaleStr, String offsetStr, double decay, JodaCompatibleZonedDateTime docValueDate)

"script" : {
    "source" : "decayDateGauss(params.origin, params.scale, params.offset, params.decay, doc['date'].value)",
    "params": {
        "origin": "2008-01-01T01:00:00Z",
        "scale": "1h",
        "offset" : "0",
        "decay" : 0.5
    }
}

随机函数

用于生成随机评分。 randomNotReproducible` 生成一个随机评分。

"script" : {
    "source" : "randomNotReproducible()"
    }

randomReproducible 使用种子值生成可重复的随机评分。

"script" : {
    "source" : "randomReproducible(Long.toString(doc['_seq_no'].value), 100)"
    }

字段值因子

用于根据字段值调整评分。 _field_valuefactor` 根据字段值调整评分。

"script" : {
    "source" : "Math.log10(doc['field'].value * params.factor)",
    params" : {
        "factor" : 5
        }
    }

其他实用函数

  • Math.log:计算对数,Math.log(doc['price'].value)
  • Math.sqrt:计算平方根,Math.sqrt(doc['popularity'].value)
  • Math.pow:计算幂次,Math.pow(doc['score'].value, 2)

同义词字段重加载

Elasticsearch 7.3 引入了同义词字段重加载功能,允许用户在更新同义词文件后,无需重新索引即可使更改生效。 这一功能极大地简化了同义词管理的流程,尤其是在需要频繁更新同义词的场景下。通过 _reload_search_analyzers API,用户可以重新加载指定索引的分词器,从而使新的同义词规则立即生效。 注意,虽然同义词词典能被热加载,但是已经生成的索引数据不会被修改。

测试代码

PUT /my_index
{
    "settings": {
        "index" : {
            "analysis" : {
                "analyzer" : {
                    "my_synonyms" : {
                        "tokenizer" : "whitespace",
                        "filter" : ["synonym"]
                    }
                },
                "filter" : {
                    "synonym" : {
                        "type" : "synonym_graph",
                        "synonyms_path" : "analysis/synonym.txt",
                        "updateable" : true
                    }
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "text": {
                "type": "text",
                "analyzer" : "standard",
                "search_analyzer": "my_synonyms"
            }
        }
    }
}

POST /my_index/_reload_search_analyzers

执行上述请求后,Elasticsearch 会重新加载 my_index 索引的分析器,使最新的同义词规则生效。

case insensitive 参数

case_insensitive 参数允许用户在执行精确匹配查询时忽略大小写。 这一功能特别适用于需要处理大小写不敏感数据的场景,例如用户名、标签或分类代码等。通过设置 case_insensitivetrue,用户可以在不修改数据的情况下,实现对大小写不敏感的查询,从而简化查询逻辑并提高搜索的准确性。

测试代码

//在这个示例中,`term` 查询会匹配 `user` 字段值为 `JohnDoe`、`johndoe` 或 `JOHNDOE` 的文档,而忽略大小写差异。
{
  "query": {
    "term": {
      "user": {
        "value": "JohnDoe",
        "case_insensitive": true
      }
    }
  }
}

小结

Elasticsearch 作为一款强大的开源搜索和分析引擎,其版本的不断迭代带来了诸多显著的改进与优化。对比 Elasticsearch 6.8,Elasticsearch 7.10 在多个方面展现出了新的功能和特性,极大地提升了用户体验和系统性能。这系列文章简短的介绍了各个方面的新功能和优化,希望能给大家一定的帮助。

推荐阅读

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/feature-evolution-from-elasticsearch-6.8-to-7.10-part-6/

收起阅读 »

谈谈 ES 6.8 到 7.10 的功能变迁(5)- 任务和集群管理

这一篇我们继续了解 ES 7.10 相较于 ES 6.8 调优的集群管理和任务管理的方法,主要有断联查询的主动取消、投票节点角色、异步查询和可搜索快照四个功能。

Query 自动取消

对于一个完善的产品来说,当一个任务发起链接主动断联的时候,服务端与之相关的任务应该也都被回收。但是这个特性到了 elasticsearch 7.4 版本才有了明确的声明。

Elasticsearch now automatically terminates queries sent through the _search endpoint when the initiating connection is closed.

相关的 PR 和 issue 在这里,对源码有兴趣的同学可以挖掘一下。

PR:https://github.com/elastic/elasticsearch/pull/43332
issue:https://github.com/elastic/elasticsearch/issues/43105

简单来说,ES 接受在某个查询的 http 链接断掉的时候,与其相关的父子任务的自动取消。原来的场景下可能需要手工一个个关闭。

实际测试

利用 painless 模拟复杂查询,下面这个查询在测试集群上能维持 5s 左右

GET /_search?max_concurrent_shard_requests=1
{
    "query": {
        "bool": {
            "must": [
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long sum = 0;
                                for (int i = 0; i < 100000; i++) {
                                    sum += i;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long product = 1;
                                for (int i = 1; i < 100000; i++) {
                                    product *= i;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long factorial = 1;
                                for (int i = 1; i < 100000; i++) {
                                    factorial *= i;
                                }
                                long squareSum = 0;
                                for (int j = 0; j < 100000; j++) {
                                    squareSum += j * j;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long fib1 = 0;
                                long fib2 = 1;
                                long next;
                                for (int i = 0; i < 100000; i++) {
                                    next = fib1 + fib2;
                                    fib1 = fib2;
                                    fib2 = next;
                                }
                                return true;
                            """
                        }
                    }
                }
            ]
        }
    }
}

查看任务被终止的状态

GET /_tasks?detailed=true&actions=*search*

测试脚本,判断上面该查询被取消后是否还可以查到任务

import requests
import multiprocessing
import time
from requests.exceptions import RequestException
from datetime import datetime

# Elasticsearch 地址
#ES_URL = "http://localhost:9210" # 6.8版本地址
ES_URL = "http://localhost:9201"

# 耗时查询的 DSL
LONG_RUNNING_QUERY = {"size":0,
    "query": {
        "bool": {
            "must": [
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long sum = 0;
                                for (int i = 0; i < 100000; i++) {
                                    sum += i;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long product = 1;
                                for (int i = 1; i < 100000; i++) {
                                    product *= i;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long factorial = 1;
                                for (int i = 1; i < 100000; i++) {
                                    factorial *= i;
                                }
                                long squareSum = 0;
                                for (int j = 0; j < 100000; j++) {
                                    squareSum += j * j;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long fib1 = 0;
                                long fib2 = 1;
                                long next;
                                for (int i = 0; i < 100000; i++) {
                                    next = fib1 + fib2;
                                    fib1 = fib2;
                                    fib2 = next;
                                }
                                return true;
                            """
                        }
                    }
                }
            ]
        }
    }
}

# 用于同步的事件对象
query_finished = multiprocessing.Event()
# 新增:进程终止标志位
process_terminated = multiprocessing.Event()

# 定义一个函数用于添加时间戳到日志
def log_with_timestamp(message,*message1):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] {message}+{message1}")

# 发起查询的函数
def run_query():
    try:
        log_with_timestamp("发起查询...")
        session = requests.Session()
        response = session.post(
            f"{ES_URL}/_search",
            json=LONG_RUNNING_QUERY,
            stream=True  # 启用流式请求,允许后续中断
        )
        try:
            # 尝试读取响应内容(如果连接未被中断)
            if response.status_code == 200:
                log_with_timestamp("查询完成,结果:", response.json())
            else:
                log_with_timestamp("查询失败,错误信息:", response.text)
        except RequestException as e:
            log_with_timestamp("请求被中断:", e)
    finally:
        # 标记查询完成
        query_finished.set()

# 中断连接的信号函数
def interrupt_signal():
    time.sleep(1)  # 等待 1 秒
    log_with_timestamp("发出中断查询信号...")
    # 标记可以中断查询了
    query_finished.set()

# 检测任务是否存在的函数
def check_task_exists():
    # 等待进程终止标志位
    process_terminated.wait()
    max_retries = 3
    retries = 0
    time.sleep(1) #1s后检查
    while retries < max_retries:
        log_with_timestamp("检查任务是否存在...")
        tasks_url = f"{ES_URL}/_tasks?detailed=true&actions=*search*"
        try:
            tasks_response = requests.get(tasks_url)
            if tasks_response.status_code == 200:
                tasks = tasks_response.json().get("nodes")
                if tasks:
                    log_with_timestamp("任务仍存在:", tasks)
                else:
                    log_with_timestamp("任务已消失")
                break
            else:
                log_with_timestamp("获取任务列表失败,错误信息:", tasks_response.text)
        except RequestException as e:
            log_with_timestamp(f"检测任务失败(第 {retries + 1} 次重试): {e}")
            retries += 1
            time.sleep(1)  # 等待 1 秒后重试
    if retries == max_retries:
        log_with_timestamp("达到最大重试次数,无法检测任务状态。")

# 主函数
def main():
    # 启动查询进程
    query_process = multiprocessing.Process(target=run_query)
    query_process.start()

    # 启动中断信号进程
    interrupt_process = multiprocessing.Process(target=interrupt_signal)
    interrupt_process.start()

    # 等待中断信号
    query_finished.wait()

    # 检查查询进程是否还存活并终止它
    if query_process.is_alive():
        log_with_timestamp("尝试中断查询进程...")
        query_process.terminate()
        log_with_timestamp("查询进程已终止")
        # 新增:设置进程终止标志位
        process_terminated.set()

    # 启动任务检测进程
    check_process = multiprocessing.Process(target=check_task_exists)
    check_process.start()

    # 等待所有进程完成
    query_process.join()
    interrupt_process.join()
    check_process.join()

if __name__ == "__main__":
    main()

实际测试结果:

# 6.8 版本
[2025-02-08 15:17:21] 发起查询...+()
[2025-02-08 15:17:22] 发出中断查询信号...+()
[2025-02-08 15:17:22] 尝试中断查询进程...+()
[2025-02-08 15:17:22] 查询进程已终止+()
[2025-02-08 15:17:23] 检查任务是否存在...+()
[2025-02-08 15:17:23] 任务仍存在:+({'fYMNv_KxQGCGzhgfMxPXuA': {......}},)

可以看到在查询任务被终止后 1s 再去检查,任务仍然存在

# 7.10 版本
[2025-02-08 15:18:16] 发起查询...+()
[2025-02-08 15:18:17] 发出中断查询信号...+()
[2025-02-08 15:18:17] 尝试中断查询进程...+()
[2025-02-08 15:18:17] 查询进程已终止+()
[2025-02-08 15:18:18] 检查任务是否存在...+()
[2025-02-08 15:18:18] 任务已消失+()

这里可以看到任务已经检测不到了。

关于 timeout 配置

这里展开讨论下,timeout 配置。超时回收处理是一个‘best effort’行为。

(Optional, time units) Specifies the period of time to wait for a response. If no response is received before the timeout expires, the request fails and returns an error. Defaults to no timeout.

the search request is more of a best effort and does not guarantee that the request will never last longer than the specified amount of time.

异步搜索

使用方法

可以让用户进行异步的搜索,可以通过相关参数进行检查维护该搜索的状态和结果。比较合适查询量较大但对延迟要求较低的查询,进行精细化的管理控制。

注意:这里的参数基本都是添加到 url 上的,并不是添加到 request body 上的。

POST test_index/_async_search?keep_on_completion=true
{
  "query": {
    "match_all": {}
  }
}

注:这里为了产生查询结果 id 使用了 keep_on_completion 参数,这个参数的使用见下面解释。

返回结果,和一般的查询结果不同的是,添加了结果 id 和查询的一些状态数据。

{
  "id": "Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz",//结果id,可以用于后续的复查
  "is_partial": false,//是否为部分完成结果
  "is_running": false,//是否还在查询
  "start_time_in_millis": 1738978637287,//查询产生时间戳
  "expiration_time_in_millis": 1739410637287,//查询结果过期时间戳
  "response": {
    "took": 1,
    "timed_out": false,
    "_shards": {
      "total": 1,
      "successful": 1,
      "skipped": 0,
      "failed": 0
    },
    "hits": {
      "total": {
        "value": 3,
        "relation": "eq"
      },
      "max_score": 1,
      "hits": [······]
    }
  }
}

管理查询结果

//查询结果和第一次返回的内容一致
GET /_async_search/Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz

//主动删除查询结果
DELETE /_async_search/Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz

关键参数

  • wait_for_completion_timeout:参数(默认为 1 秒),这个参数用来设置异步查询的等待时间。当异步搜索在此时间内完成时,响应将不包括 ID,结果也不会存储在集群中
  • keep_on_completion:参数(默认为 false)可以设置为 true,可以强制存储查询结果,即便在 wait_for_completion_timeout 设置时间内完成搜索,该结果也能被查询到。
  • keep_alive:指定异步搜索结果可以被保存多长时间,默认为 5d(5 天)。在此期间之后,正在进行的异步搜索和任何保存的搜索结果将被删除。
  • batched_reduce_size:是 Elasticsearch 中的一个配置参数,默认值为 5。它的作用是控制分片结果的部分归并频率,具体来说,它决定了协调节点(coordinating node)在接收到多少个分片的响应后,会执行一次部分结果归并(partial reduction)。
  • pre_filter_shard_size:是 Elasticsearch 中与查询执行相关的一个参数,它的默认值为 1,并且不可更改。这个参数的作用是强制 Elasticsearch 在执行查询之前,先进行一轮预过滤(pre-filter),以确定哪些分片(shard)可能包含与查询匹配的文档,从而跳过那些肯定不包含匹配文档的分片。

查询结果存储位置

异步查询的结果部分存储在.async-search中,但是进行了程序加密,内容对使用者不可见。

GET .async-search/_search
// 返回的结果
···
"hits": [
      {
        "_index": ".async-search",
        "_type": "_doc",
        "_id": "bPNotcTCTV-gSIiZLuK0IA",
        "_score": 1,
        "_source": {
          "result": "i6+xAwFERm1KUVRtOTBZMVJEVkZZdFoxTkphVnBNZFVzd1NVRWJPRmx3UkdVMk9XWlRhMmt4TkVwb1QwUTJiVlpyWnpvek1EWTEAAQEDAD+AAAADP4AAAAAAABR0Sm9yNDVRQlQ3bzBsZTdsYmp0TgAAAARfZG9jAP//////////AwALeyJhIjoxMTExfQoAAAAAAAAAAQEAAAAWOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZwp0ZXN0X2luZGV4Fk5fYmphNXM1UWtpcnU4RXdleVlGSUEAAAA/gAAAAAAAFHRab3I0NVFCVDdvMGxlN2xlVHNrAAAABF9kb2MA//////////8DAAt7ImEiOjExMTJ9CgAAAAAAAAABAQAAABY4WXBEZTY5ZlNraTE0SmhPRDZtVmtnCnRlc3RfaW5kZXgWTl9iamE1czVRa2lydThFd2V5WUZJQQAAAD+AAAAAAAAUdHBvcjQ1UUJUN28wbGU3bGZqc28AAAAEX2RvYwD//////////wMAC3siYSI6MTExM30KAAAAAAAAAAEBAAAAFjhZcERlNjlmU2tpMTRKaE9ENm1Wa2cKdGVzdF9pbmRleBZOX2JqYTVzNVFraXJ1OEV3ZXlZRklBAAAAAAAAAAAAAgABAQEAAAAAAAsAAAAAAAABlOMuvCQAAAGU/O6IJA==",
          "headers": {},
          "expiration_time": 1739410278436,
          "response_headers": {}
        }
      },
···

只投票候选节点

这是一个主候选节点角色的优化,能相对固定 master 节点的位置,减少了选举候选节点过多的问题。

作用

Voting - only master - eligible node(仅参与投票的具备主节点资格的节点)在 Elasticsearch 集群中有以下作用:

  1. 参与主节点选举:该节点参与主节点选举过程,但本身不会成为集群选出的主节点,主要作为选举中的决胜因素(打破平局)。
  2. 保障高可用性:在高可用性(HA)集群中,至少需要三个具备主节点资格的节点,其中至少两个不能是仅参与投票的节点,这样即使有一个节点故障,集群仍能选出主节点。
  3. 分担选举及状态发布任务:和普通具备主节点资格的节点一样,在集群状态发布期间承担特定任务。
  4. 灵活承担其他角色:可以同时承担集群中的其他角色,如数据节点;也可以作为专用节点,不承担其他角色。

配置

三个节点的集群:可以配置两个普通主节点资格节点和一个仅参与投票的节点。这样在一个普通主节点故障时,剩下的普通主节点和仅参与投票的节点一起可以完成主节点选举,保证集群的正常运行。 理论上,主候选节点数量能满足不同区域间的主备切换要求即可,其余可以都是投票节点。

可搜索快照

注意:这是一个收费功能

实现机制

可搜索快照让你能够通过使用快照来保障数据恢复能力,而非在集群内维护副本分片,从而降低运营成本。

当你将快照中的索引挂载为可搜索快照时,Elasticsearch 会将索引分片复制到集群内的本地存储中。这能确保搜索性能与搜索其他任何索引相当,并尽量减少对访问快照存储库的需求。如果某个节点发生故障,可搜索快照索引的分片会自动从快照存储库中恢复。

搜索可搜索快照索引与搜索其他任何索引的方式相同。搜索性能与常规索引相当,因为在挂载可搜索快照时,分片数据会被复制到集群中的节点上

如果某个节点发生故障,且需要从快照中恢复可搜索快照分片,在 Elasticsearch 将分片分配到其他节点的短暂时间内,集群健康状态将不会显示为绿色。在这些分片重新分配完成之前,对这些分片的搜索将会失败或返回部分结果。

对于搜索频率较低的数据,这能显著节省成本。使用可搜索快照,不再需要额外的索引分片副本以避免数据丢失,这有可能将搜索该数据所需的节点本地存储容量减少一半。同时可搜索快照依赖于备份使用的快照,也不需要额外的空间。

使用建议

  1. 从含多索引的快照挂载单个索引时,建议进行使用分隔,创建仅含目标索引的快照副本并挂载,方便独立管理备份与可搜索快照生命周期。

  2. 挂载为可搜索快照索引前,建议将索引强制合并为每分片一个段,减少从存储库读取数据的操作和成本。

实际测试

基础配置

前提条件:需要一个镜像使用存储,这里使用 minIO 作为测试

  1. 安装 S3 插件,并注册快照库信息
# 在线安装插件

elasticsearch-plugin install repository-s3

# 设置访问minio的信息,elasticsearch的bin目录下,使用minIO中设置的用户名密码

./elasticsearch-keystore add s3.client.default.access_key
./elasticsearch-keystore add s3.client.default.secret_key

# 重载安全设置,然后重启节点
POST _nodes/reload_secure_settings

# 注册快照库
PUT _snapshot/my-minio-repository
{
  "type": "s3",
  "settings": {
    "bucket": "es-bucket",
    "endpoint": "http://127.0.0.1:9002",
    "compress": true
  }
}
  1. 挂载需要的快照索引
POST /_snapshot/my-minio-repository/snapshot_es_prp_cmain_20240829/_mount?wait_for_completion=true
{
  "index": "es_prp_cmain_insured_itemkind_detail_formal_20240829",
  "renamed_index": "test_searchable_snapshot",//挂载时对索引进行重命名
  "index_settings": {
    "index.number_of_replicas": 0
  },
  "ignore_index_settings": [ "index.refresh_interval" ]
}
  1. 检查空间占用
GET _cat/indices/test_searchable_snapshot?v
health status index                    uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   test_searchable_snapshot qROj2flcRdiGOZaejeAmQQ   1   0      10000            0     21.3mb         21.3mb

在系统上也看到了对应 uuid 的文件目录

[root@hcss-ecs 0]# ls
_state  snapshot_cache  translog
[root@hcss-ecs 0]# pwd
/data/elasticsearch-7.10.2/data/nodes/0/indices/qROj2flcRdiGOZaejeAmQQ/0

小结

这篇的内容讲解测试的相对较细,对于查询的自动取消和异步查询增加了 ES 查询任务的灵活性;只投票节点也是加强了主节点选举的稳定性;可搜索快照是成本和功能的均衡方法,对于日志场景的使用是一个不错的选择。

推荐阅读

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/feature-evolution-from-elasticsearch-6.8-to-7.10-part-5/

继续阅读 »

这一篇我们继续了解 ES 7.10 相较于 ES 6.8 调优的集群管理和任务管理的方法,主要有断联查询的主动取消、投票节点角色、异步查询和可搜索快照四个功能。

Query 自动取消

对于一个完善的产品来说,当一个任务发起链接主动断联的时候,服务端与之相关的任务应该也都被回收。但是这个特性到了 elasticsearch 7.4 版本才有了明确的声明。

Elasticsearch now automatically terminates queries sent through the _search endpoint when the initiating connection is closed.

相关的 PR 和 issue 在这里,对源码有兴趣的同学可以挖掘一下。

PR:https://github.com/elastic/elasticsearch/pull/43332
issue:https://github.com/elastic/elasticsearch/issues/43105

简单来说,ES 接受在某个查询的 http 链接断掉的时候,与其相关的父子任务的自动取消。原来的场景下可能需要手工一个个关闭。

实际测试

利用 painless 模拟复杂查询,下面这个查询在测试集群上能维持 5s 左右

GET /_search?max_concurrent_shard_requests=1
{
    "query": {
        "bool": {
            "must": [
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long sum = 0;
                                for (int i = 0; i < 100000; i++) {
                                    sum += i;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long product = 1;
                                for (int i = 1; i < 100000; i++) {
                                    product *= i;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long factorial = 1;
                                for (int i = 1; i < 100000; i++) {
                                    factorial *= i;
                                }
                                long squareSum = 0;
                                for (int j = 0; j < 100000; j++) {
                                    squareSum += j * j;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long fib1 = 0;
                                long fib2 = 1;
                                long next;
                                for (int i = 0; i < 100000; i++) {
                                    next = fib1 + fib2;
                                    fib1 = fib2;
                                    fib2 = next;
                                }
                                return true;
                            """
                        }
                    }
                }
            ]
        }
    }
}

查看任务被终止的状态

GET /_tasks?detailed=true&actions=*search*

测试脚本,判断上面该查询被取消后是否还可以查到任务

import requests
import multiprocessing
import time
from requests.exceptions import RequestException
from datetime import datetime

# Elasticsearch 地址
#ES_URL = "http://localhost:9210" # 6.8版本地址
ES_URL = "http://localhost:9201"

# 耗时查询的 DSL
LONG_RUNNING_QUERY = {"size":0,
    "query": {
        "bool": {
            "must": [
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long sum = 0;
                                for (int i = 0; i < 100000; i++) {
                                    sum += i;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long product = 1;
                                for (int i = 1; i < 100000; i++) {
                                    product *= i;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long factorial = 1;
                                for (int i = 1; i < 100000; i++) {
                                    factorial *= i;
                                }
                                long squareSum = 0;
                                for (int j = 0; j < 100000; j++) {
                                    squareSum += j * j;
                                }
                                return true;
                            """
                        }
                    }
                },
                {
                    "script": {
                        "script": {
                            "lang": "painless",
                            "source": """
                                long fib1 = 0;
                                long fib2 = 1;
                                long next;
                                for (int i = 0; i < 100000; i++) {
                                    next = fib1 + fib2;
                                    fib1 = fib2;
                                    fib2 = next;
                                }
                                return true;
                            """
                        }
                    }
                }
            ]
        }
    }
}

# 用于同步的事件对象
query_finished = multiprocessing.Event()
# 新增:进程终止标志位
process_terminated = multiprocessing.Event()

# 定义一个函数用于添加时间戳到日志
def log_with_timestamp(message,*message1):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] {message}+{message1}")

# 发起查询的函数
def run_query():
    try:
        log_with_timestamp("发起查询...")
        session = requests.Session()
        response = session.post(
            f"{ES_URL}/_search",
            json=LONG_RUNNING_QUERY,
            stream=True  # 启用流式请求,允许后续中断
        )
        try:
            # 尝试读取响应内容(如果连接未被中断)
            if response.status_code == 200:
                log_with_timestamp("查询完成,结果:", response.json())
            else:
                log_with_timestamp("查询失败,错误信息:", response.text)
        except RequestException as e:
            log_with_timestamp("请求被中断:", e)
    finally:
        # 标记查询完成
        query_finished.set()

# 中断连接的信号函数
def interrupt_signal():
    time.sleep(1)  # 等待 1 秒
    log_with_timestamp("发出中断查询信号...")
    # 标记可以中断查询了
    query_finished.set()

# 检测任务是否存在的函数
def check_task_exists():
    # 等待进程终止标志位
    process_terminated.wait()
    max_retries = 3
    retries = 0
    time.sleep(1) #1s后检查
    while retries < max_retries:
        log_with_timestamp("检查任务是否存在...")
        tasks_url = f"{ES_URL}/_tasks?detailed=true&actions=*search*"
        try:
            tasks_response = requests.get(tasks_url)
            if tasks_response.status_code == 200:
                tasks = tasks_response.json().get("nodes")
                if tasks:
                    log_with_timestamp("任务仍存在:", tasks)
                else:
                    log_with_timestamp("任务已消失")
                break
            else:
                log_with_timestamp("获取任务列表失败,错误信息:", tasks_response.text)
        except RequestException as e:
            log_with_timestamp(f"检测任务失败(第 {retries + 1} 次重试): {e}")
            retries += 1
            time.sleep(1)  # 等待 1 秒后重试
    if retries == max_retries:
        log_with_timestamp("达到最大重试次数,无法检测任务状态。")

# 主函数
def main():
    # 启动查询进程
    query_process = multiprocessing.Process(target=run_query)
    query_process.start()

    # 启动中断信号进程
    interrupt_process = multiprocessing.Process(target=interrupt_signal)
    interrupt_process.start()

    # 等待中断信号
    query_finished.wait()

    # 检查查询进程是否还存活并终止它
    if query_process.is_alive():
        log_with_timestamp("尝试中断查询进程...")
        query_process.terminate()
        log_with_timestamp("查询进程已终止")
        # 新增:设置进程终止标志位
        process_terminated.set()

    # 启动任务检测进程
    check_process = multiprocessing.Process(target=check_task_exists)
    check_process.start()

    # 等待所有进程完成
    query_process.join()
    interrupt_process.join()
    check_process.join()

if __name__ == "__main__":
    main()

实际测试结果:

# 6.8 版本
[2025-02-08 15:17:21] 发起查询...+()
[2025-02-08 15:17:22] 发出中断查询信号...+()
[2025-02-08 15:17:22] 尝试中断查询进程...+()
[2025-02-08 15:17:22] 查询进程已终止+()
[2025-02-08 15:17:23] 检查任务是否存在...+()
[2025-02-08 15:17:23] 任务仍存在:+({'fYMNv_KxQGCGzhgfMxPXuA': {......}},)

可以看到在查询任务被终止后 1s 再去检查,任务仍然存在

# 7.10 版本
[2025-02-08 15:18:16] 发起查询...+()
[2025-02-08 15:18:17] 发出中断查询信号...+()
[2025-02-08 15:18:17] 尝试中断查询进程...+()
[2025-02-08 15:18:17] 查询进程已终止+()
[2025-02-08 15:18:18] 检查任务是否存在...+()
[2025-02-08 15:18:18] 任务已消失+()

这里可以看到任务已经检测不到了。

关于 timeout 配置

这里展开讨论下,timeout 配置。超时回收处理是一个‘best effort’行为。

(Optional, time units) Specifies the period of time to wait for a response. If no response is received before the timeout expires, the request fails and returns an error. Defaults to no timeout.

the search request is more of a best effort and does not guarantee that the request will never last longer than the specified amount of time.

异步搜索

使用方法

可以让用户进行异步的搜索,可以通过相关参数进行检查维护该搜索的状态和结果。比较合适查询量较大但对延迟要求较低的查询,进行精细化的管理控制。

注意:这里的参数基本都是添加到 url 上的,并不是添加到 request body 上的。

POST test_index/_async_search?keep_on_completion=true
{
  "query": {
    "match_all": {}
  }
}

注:这里为了产生查询结果 id 使用了 keep_on_completion 参数,这个参数的使用见下面解释。

返回结果,和一般的查询结果不同的是,添加了结果 id 和查询的一些状态数据。

{
  "id": "Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz",//结果id,可以用于后续的复查
  "is_partial": false,//是否为部分完成结果
  "is_running": false,//是否还在查询
  "start_time_in_millis": 1738978637287,//查询产生时间戳
  "expiration_time_in_millis": 1739410637287,//查询结果过期时间戳
  "response": {
    "took": 1,
    "timed_out": false,
    "_shards": {
      "total": 1,
      "successful": 1,
      "skipped": 0,
      "failed": 0
    },
    "hits": {
      "total": {
        "value": 3,
        "relation": "eq"
      },
      "max_score": 1,
      "hits": [······]
    }
  }
}

管理查询结果

//查询结果和第一次返回的内容一致
GET /_async_search/Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz

//主动删除查询结果
DELETE /_async_search/Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz

关键参数

  • wait_for_completion_timeout:参数(默认为 1 秒),这个参数用来设置异步查询的等待时间。当异步搜索在此时间内完成时,响应将不包括 ID,结果也不会存储在集群中
  • keep_on_completion:参数(默认为 false)可以设置为 true,可以强制存储查询结果,即便在 wait_for_completion_timeout 设置时间内完成搜索,该结果也能被查询到。
  • keep_alive:指定异步搜索结果可以被保存多长时间,默认为 5d(5 天)。在此期间之后,正在进行的异步搜索和任何保存的搜索结果将被删除。
  • batched_reduce_size:是 Elasticsearch 中的一个配置参数,默认值为 5。它的作用是控制分片结果的部分归并频率,具体来说,它决定了协调节点(coordinating node)在接收到多少个分片的响应后,会执行一次部分结果归并(partial reduction)。
  • pre_filter_shard_size:是 Elasticsearch 中与查询执行相关的一个参数,它的默认值为 1,并且不可更改。这个参数的作用是强制 Elasticsearch 在执行查询之前,先进行一轮预过滤(pre-filter),以确定哪些分片(shard)可能包含与查询匹配的文档,从而跳过那些肯定不包含匹配文档的分片。

查询结果存储位置

异步查询的结果部分存储在.async-search中,但是进行了程序加密,内容对使用者不可见。

GET .async-search/_search
// 返回的结果
···
"hits": [
      {
        "_index": ".async-search",
        "_type": "_doc",
        "_id": "bPNotcTCTV-gSIiZLuK0IA",
        "_score": 1,
        "_source": {
          "result": "i6+xAwFERm1KUVRtOTBZMVJEVkZZdFoxTkphVnBNZFVzd1NVRWJPRmx3UkdVMk9XWlRhMmt4TkVwb1QwUTJiVlpyWnpvek1EWTEAAQEDAD+AAAADP4AAAAAAABR0Sm9yNDVRQlQ3bzBsZTdsYmp0TgAAAARfZG9jAP//////////AwALeyJhIjoxMTExfQoAAAAAAAAAAQEAAAAWOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZwp0ZXN0X2luZGV4Fk5fYmphNXM1UWtpcnU4RXdleVlGSUEAAAA/gAAAAAAAFHRab3I0NVFCVDdvMGxlN2xlVHNrAAAABF9kb2MA//////////8DAAt7ImEiOjExMTJ9CgAAAAAAAAABAQAAABY4WXBEZTY5ZlNraTE0SmhPRDZtVmtnCnRlc3RfaW5kZXgWTl9iamE1czVRa2lydThFd2V5WUZJQQAAAD+AAAAAAAAUdHBvcjQ1UUJUN28wbGU3bGZqc28AAAAEX2RvYwD//////////wMAC3siYSI6MTExM30KAAAAAAAAAAEBAAAAFjhZcERlNjlmU2tpMTRKaE9ENm1Wa2cKdGVzdF9pbmRleBZOX2JqYTVzNVFraXJ1OEV3ZXlZRklBAAAAAAAAAAAAAgABAQEAAAAAAAsAAAAAAAABlOMuvCQAAAGU/O6IJA==",
          "headers": {},
          "expiration_time": 1739410278436,
          "response_headers": {}
        }
      },
···

只投票候选节点

这是一个主候选节点角色的优化,能相对固定 master 节点的位置,减少了选举候选节点过多的问题。

作用

Voting - only master - eligible node(仅参与投票的具备主节点资格的节点)在 Elasticsearch 集群中有以下作用:

  1. 参与主节点选举:该节点参与主节点选举过程,但本身不会成为集群选出的主节点,主要作为选举中的决胜因素(打破平局)。
  2. 保障高可用性:在高可用性(HA)集群中,至少需要三个具备主节点资格的节点,其中至少两个不能是仅参与投票的节点,这样即使有一个节点故障,集群仍能选出主节点。
  3. 分担选举及状态发布任务:和普通具备主节点资格的节点一样,在集群状态发布期间承担特定任务。
  4. 灵活承担其他角色:可以同时承担集群中的其他角色,如数据节点;也可以作为专用节点,不承担其他角色。

配置

三个节点的集群:可以配置两个普通主节点资格节点和一个仅参与投票的节点。这样在一个普通主节点故障时,剩下的普通主节点和仅参与投票的节点一起可以完成主节点选举,保证集群的正常运行。 理论上,主候选节点数量能满足不同区域间的主备切换要求即可,其余可以都是投票节点。

可搜索快照

注意:这是一个收费功能

实现机制

可搜索快照让你能够通过使用快照来保障数据恢复能力,而非在集群内维护副本分片,从而降低运营成本。

当你将快照中的索引挂载为可搜索快照时,Elasticsearch 会将索引分片复制到集群内的本地存储中。这能确保搜索性能与搜索其他任何索引相当,并尽量减少对访问快照存储库的需求。如果某个节点发生故障,可搜索快照索引的分片会自动从快照存储库中恢复。

搜索可搜索快照索引与搜索其他任何索引的方式相同。搜索性能与常规索引相当,因为在挂载可搜索快照时,分片数据会被复制到集群中的节点上

如果某个节点发生故障,且需要从快照中恢复可搜索快照分片,在 Elasticsearch 将分片分配到其他节点的短暂时间内,集群健康状态将不会显示为绿色。在这些分片重新分配完成之前,对这些分片的搜索将会失败或返回部分结果。

对于搜索频率较低的数据,这能显著节省成本。使用可搜索快照,不再需要额外的索引分片副本以避免数据丢失,这有可能将搜索该数据所需的节点本地存储容量减少一半。同时可搜索快照依赖于备份使用的快照,也不需要额外的空间。

使用建议

  1. 从含多索引的快照挂载单个索引时,建议进行使用分隔,创建仅含目标索引的快照副本并挂载,方便独立管理备份与可搜索快照生命周期。

  2. 挂载为可搜索快照索引前,建议将索引强制合并为每分片一个段,减少从存储库读取数据的操作和成本。

实际测试

基础配置

前提条件:需要一个镜像使用存储,这里使用 minIO 作为测试

  1. 安装 S3 插件,并注册快照库信息
# 在线安装插件

elasticsearch-plugin install repository-s3

# 设置访问minio的信息,elasticsearch的bin目录下,使用minIO中设置的用户名密码

./elasticsearch-keystore add s3.client.default.access_key
./elasticsearch-keystore add s3.client.default.secret_key

# 重载安全设置,然后重启节点
POST _nodes/reload_secure_settings

# 注册快照库
PUT _snapshot/my-minio-repository
{
  "type": "s3",
  "settings": {
    "bucket": "es-bucket",
    "endpoint": "http://127.0.0.1:9002",
    "compress": true
  }
}
  1. 挂载需要的快照索引
POST /_snapshot/my-minio-repository/snapshot_es_prp_cmain_20240829/_mount?wait_for_completion=true
{
  "index": "es_prp_cmain_insured_itemkind_detail_formal_20240829",
  "renamed_index": "test_searchable_snapshot",//挂载时对索引进行重命名
  "index_settings": {
    "index.number_of_replicas": 0
  },
  "ignore_index_settings": [ "index.refresh_interval" ]
}
  1. 检查空间占用
GET _cat/indices/test_searchable_snapshot?v
health status index                    uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   test_searchable_snapshot qROj2flcRdiGOZaejeAmQQ   1   0      10000            0     21.3mb         21.3mb

在系统上也看到了对应 uuid 的文件目录

[root@hcss-ecs 0]# ls
_state  snapshot_cache  translog
[root@hcss-ecs 0]# pwd
/data/elasticsearch-7.10.2/data/nodes/0/indices/qROj2flcRdiGOZaejeAmQQ/0

小结

这篇的内容讲解测试的相对较细,对于查询的自动取消和异步查询增加了 ES 查询任务的灵活性;只投票节点也是加强了主节点选举的稳定性;可搜索快照是成本和功能的均衡方法,对于日志场景的使用是一个不错的选择。

推荐阅读

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/feature-evolution-from-elasticsearch-6.8-to-7.10-part-5/

收起阅读 »

INFINI Console 纳管 Elasticsearch 9(二):日志监控

前面介绍过 INFINI Console 纳管 Elasticsearch 9(一),进行指标监控、数据管理、DSL 语句执行,但日志监控功能需要结合 Agent 才能使用。现在来实现一下:

Agent 需要和 ES 部署到同一机器上,这里是在我本地电脑上进行安装。

安装 Elastisearch

curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-9.0.0-darwin-x86_64.tar.gz
curl https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-9.0.0-darwin-x86_64.tar.gz.sha512 | shasum -a 512 -c -
tar -xzf elasticsearch-9.0.0-darwin-x86_64.tar.gz
cd elasticsearch-9.0.0/
./bin/elasticsearch

将 ES 注册到 Console 中。

安装 Agent

curl -O https://release.infinilabs.com/agent/stable/agent-1.29.2-2008-mac-amd64.zip
mkdir agent
unzip agent-1.29.2-2008-mac-amd64.zip -d agent/
cd agent

修改配置文件 agent.yml,填写正确的 Console 地址。

启动 Agent,成功注册到 Console,获取到相关配置,但连接 Console 系统集群出现异常,这是因为 Console 系统集群是 Docker 部署的(172.17.0.2 是 Docker 内部 IP)。

通过 Console 修改 Agent 配置。

注:Console 系统集群地址需调整为 Agent 可以访问的地址;配置版本号需要增大。

Agent 运行无异常。

Agent 注册 Console 成功后,在 Console 页面“资源管理”-“探针管理”中可以看到注册的 Agent 实例。

关联操作

Agent 关联到需要采集数据的 ES 集群。

调整监控模式

关联成功后,将 ES 集群的监控模式改为 Agent。

可在监控报表中,查看采集状态。

查看日志

至此日志功能已可以使用。

关于 INFINI Console

INFINI Console 是一款开源的非常轻量级的多集群、跨版本的搜索基础设施统一管控平台。通过对流行的搜索引擎基础设施进行跨版本、多集群的集中纳管,企业可以快速方便的统一管理企业内部的不同版本的多套搜索集群。INFINI Console 还可以对集群内的索引及数据进行操作管理,可以配置灵活的告警规则,可以指定统一的安全策略,可以查看各个维度的日志和审计信息,真正实现企业级的搜索服务平台化建设和运营。

官网文档:https://docs.infinilabs.com/console
开源地址:https://github.com/infinilabs/console

原文:https://infinilabs.cn/blog/2025/time-range-mergepolicy-for-easysearch/

继续阅读 »

前面介绍过 INFINI Console 纳管 Elasticsearch 9(一),进行指标监控、数据管理、DSL 语句执行,但日志监控功能需要结合 Agent 才能使用。现在来实现一下:

Agent 需要和 ES 部署到同一机器上,这里是在我本地电脑上进行安装。

安装 Elastisearch

curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-9.0.0-darwin-x86_64.tar.gz
curl https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-9.0.0-darwin-x86_64.tar.gz.sha512 | shasum -a 512 -c -
tar -xzf elasticsearch-9.0.0-darwin-x86_64.tar.gz
cd elasticsearch-9.0.0/
./bin/elasticsearch

将 ES 注册到 Console 中。

安装 Agent

curl -O https://release.infinilabs.com/agent/stable/agent-1.29.2-2008-mac-amd64.zip
mkdir agent
unzip agent-1.29.2-2008-mac-amd64.zip -d agent/
cd agent

修改配置文件 agent.yml,填写正确的 Console 地址。

启动 Agent,成功注册到 Console,获取到相关配置,但连接 Console 系统集群出现异常,这是因为 Console 系统集群是 Docker 部署的(172.17.0.2 是 Docker 内部 IP)。

通过 Console 修改 Agent 配置。

注:Console 系统集群地址需调整为 Agent 可以访问的地址;配置版本号需要增大。

Agent 运行无异常。

Agent 注册 Console 成功后,在 Console 页面“资源管理”-“探针管理”中可以看到注册的 Agent 实例。

关联操作

Agent 关联到需要采集数据的 ES 集群。

调整监控模式

关联成功后,将 ES 集群的监控模式改为 Agent。

可在监控报表中,查看采集状态。

查看日志

至此日志功能已可以使用。

关于 INFINI Console

INFINI Console 是一款开源的非常轻量级的多集群、跨版本的搜索基础设施统一管控平台。通过对流行的搜索引擎基础设施进行跨版本、多集群的集中纳管,企业可以快速方便的统一管理企业内部的不同版本的多套搜索集群。INFINI Console 还可以对集群内的索引及数据进行操作管理,可以配置灵活的告警规则,可以指定统一的安全策略,可以查看各个维度的日志和审计信息,真正实现企业级的搜索服务平台化建设和运营。

官网文档:https://docs.infinilabs.com/console
开源地址:https://github.com/infinilabs/console

原文:https://infinilabs.cn/blog/2025/time-range-mergepolicy-for-easysearch/

收起阅读 »

谈谈 ES 6.8 到 7.10 的功能变迁(4)- 聚合功能篇

这一篇我们继续了解 ES 7.10 相较于 ES 6.8 新增的聚合方法。

Rare Terms 聚合

功能说明

用于聚合查询出字段中的稀有项。ES 常见的统计方法是使用 term 查询的正向排序,但是在大数据量和高基数的数据分布场景下会出现 unbounded 错误。Rare 聚合弥补了这个场景的查询方法。注意的是,这个聚合计算出来的是一个近似值。

注意事项

使用限制

  • 只能用于 keyword、numeric、ip 或 boolean 类型字段
  • max_doc_count 参数限制文档数量(默认为 1)
  • precision_threshold 参数控制精度(默认为 3000)

性能考虑

  • 高基数(数据集中不同值的数量非常多)字段上性能较好
  • 内存消耗相对较大
  • 聚合是在 shard 层做的统计,建议使用合适的 shard 大小

精度控制

  • 结果是近似值,具体说明见此
  • 可以通过 precision_threshold 调整精度,精度越高,内存消耗越大

Cumulative Cardinality 聚合

功能说明

一个管道聚合,计算 histogram(或 date_histogram)聚合中的累积基数。 Cumulative_cardinality 聚合对于查找几个时间段内的"新项目"很有用,比如每天网站的新访客数量。常规 Cardinaity 聚合会告诉你每天有多少独立访客,但不会区分"新"或"重复"访客。Cumulative_cardinality 聚合可以用来确定每天有多少独立访问者是"新"的。

可以通过 precision_threshold 参数调整精度,内存消耗随精度增加而增加。建议根据实际需求调整精度,避免不必要的高精度设置。

使用要求

  • 需要一个 date_histogram 或 histogram 聚合
  • 需要一个 cardinality 度量聚合
  • buckets_path 必须指向一个有效的 cardinality 聚合

代码样例

GET /user_hits/_search
{
  "size": 0,
  "aggs": {
    "users_per_day": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day"
      },
      "aggs": {
        "distinct_users": {
          "cardinality": {
            "field": "user_id"
          }
        },
        "total_new_users": {
          "cumulative_cardinality": {
            "buckets_path": "distinct_users"
          }
        }
      }
    }
  }
}

Geotile Grid 聚合

功能说明

基于 geo_point 字段的地理位置多桶聚合。将地理空间数据按照网格划分,便于可视化和分析。

注意要点

  1. 网格设置: precision 参数控制网格精度(0-29),精度越高,网格越小,桶数越多。
  2. 高精度会产生大量桶,内存消耗随精度增加而增加。
  3. 只支持 geo_point 类型字段。

代码样例

POST /museums/_search?size=0
{
  "aggregations": {
    "tiles-in-bounds": {
      "geotile_grid": {
        "field": "location",
        "precision": 22,
        "bounds": {
          "top_left": "52.4, 4.9",
          "bottom_right": "52.3, 5.0"
        }
      }
    }
  }
}

T-test 聚合

功能说明

T_test 是一种统计假设检验,用于判断测试统计量在零假设下是否服从学生 t 分布(Student’s t-distribution)。它适用于从聚合文档中提取的数值或通过提供的脚本生成的数值。 该聚合将会返回该检验的 p 值(概率值)。它是在零假设正确的情况下(这意味着总体均值之间没有差异),获得至少与聚合所处理结果一样极端结果的概率。p 值越小,意味着零假设越有可能不正确,总体均值实际上是存在差异的。

关于 Student’s t-distribution

Student's t - distribution(学生 t - 分布),简称 t - 分布,是一种概率分布。它在统计学中具有重要地位,特别是在样本量较小且总体标准差未知的情况下用于对总体均值进行估计和假设检验。 它的形状类似于正态分布,呈钟形曲线,但比正态分布的 “尾部” 更厚。也就是说,t - 分布在均值两侧的极端值出现的概率比正态分布更高。

测试代码

GET node_upgrade/_search
{
  "size": 0,
  "aggs": {
    "startup_time_ttest": {
      "t_test": {
        "a": { "field": "startup_time_before" },
        "b": { "field": "startup_time_after" },
        "type": "paired"
      }
    }
  }
}

Variable Width Histogram 可变直方图聚合

功能说明

类似于 histogram 的多桶聚合。但与 histogram 不同,每个桶的宽度不是预先指定的,而是根据目标桶数量动态确定间隔

参数设置

  • field 必须是数值类型
  • buckets 参数指定目标桶数
  • 实际桶数可能少于指定值

性能考虑

  • 比固定宽度直方图更耗资源,大数据集上可能较慢
  • 建议限制目标桶数量

使用场景

  • 数据分布不均匀时特别有用
  • 适合探索性数据分析
  • 可以避免空桶或过密桶

Normalize 归一化聚合

功能说明

一个管道聚合,用于计算特定桶值的归一化或重新缩放后的值

方法选择

可以归一化处理的方法有:

  • rescale_0_1:0 到 1 重缩放,这种方法对数据进行重新缩放,使得最小值变为 0,最大值变为 1,其余数值则在两者之间进行线性归一化。
  • rescale_0_100:0 到 100 重缩放,该方法对数据进行重新缩放,让最小值变为 0,最大值变为 100,其余数值在它们之间按线性方式进行归一化。
  • percent_of_sum:占总和的百分比,此方法对每个值进行归一化,使其表示为占总值的百分比。
  • mean:均值归一化,这种方法进行归一化时,每个值依据其与平均值的差异程度来进行归一化。
  • zscore:Z 分数归一化,该方法进行归一化时,每个值表示的是其相对于标准差偏离均值的程度。
  • softmax:软最大化归一化,这种方法进行归一化时,先对每个值取指数,然后相对于原始值指数之和来进行归一化。

参数配置

  • method 参数指定归一化方法
  • buckets_path 指定数据来源
  • 可以设置缺失值处理

Moving Percentiles 聚合

功能介绍

一个管道聚合,对于一组有序的百分位数,移动百分位数聚合(Moving Percentile Aggregation)会在这些百分位数上滑动一个窗口,并计算累积百分位数。

关于 shift 偏移参数

默认情况下(偏移量 shift = 0 时),用于计算的窗口是除当前桶之外的最后 n 个值。将偏移量增加 1 会使起始窗口位置向右移动 1 个单位。

  • 若要将当前桶包含在窗口内,请使用 shift = 1。
  • 对于居中对齐(当前桶前后各有 n / 2 个值),使用 shift = window / 2。
  • 对于右对齐(当前桶之后有 n 个值),使用 shift = window。

如果窗口的任一边缘移动到数据序列边界之外,窗口将会收缩,仅包含可用的值。

Rate 聚合

功能说明

在 date_histogram 的聚合上使用,用于计算每个 date_histogram 桶中的文档速率或字段速率

rare 聚合支持多种时间单位(如秒、分、时),使用时需要明确指定单位。可以用来计算文档数或字段值,但必须与 date_histogram 一起使用。

小结

这一篇粗略的列举了 ES 7.10 版本中新增的聚合方法。相较于查询方法的高使用频率和低资源占用,聚合方法的使用频率相对较少,内存也会有一定的占用,大家可以根据实际场景选择使用。

推荐阅读

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/feature-evolution-from-elasticsearch-6.8-to-7.10-part-4/

继续阅读 »

这一篇我们继续了解 ES 7.10 相较于 ES 6.8 新增的聚合方法。

Rare Terms 聚合

功能说明

用于聚合查询出字段中的稀有项。ES 常见的统计方法是使用 term 查询的正向排序,但是在大数据量和高基数的数据分布场景下会出现 unbounded 错误。Rare 聚合弥补了这个场景的查询方法。注意的是,这个聚合计算出来的是一个近似值。

注意事项

使用限制

  • 只能用于 keyword、numeric、ip 或 boolean 类型字段
  • max_doc_count 参数限制文档数量(默认为 1)
  • precision_threshold 参数控制精度(默认为 3000)

性能考虑

  • 高基数(数据集中不同值的数量非常多)字段上性能较好
  • 内存消耗相对较大
  • 聚合是在 shard 层做的统计,建议使用合适的 shard 大小

精度控制

  • 结果是近似值,具体说明见此
  • 可以通过 precision_threshold 调整精度,精度越高,内存消耗越大

Cumulative Cardinality 聚合

功能说明

一个管道聚合,计算 histogram(或 date_histogram)聚合中的累积基数。 Cumulative_cardinality 聚合对于查找几个时间段内的"新项目"很有用,比如每天网站的新访客数量。常规 Cardinaity 聚合会告诉你每天有多少独立访客,但不会区分"新"或"重复"访客。Cumulative_cardinality 聚合可以用来确定每天有多少独立访问者是"新"的。

可以通过 precision_threshold 参数调整精度,内存消耗随精度增加而增加。建议根据实际需求调整精度,避免不必要的高精度设置。

使用要求

  • 需要一个 date_histogram 或 histogram 聚合
  • 需要一个 cardinality 度量聚合
  • buckets_path 必须指向一个有效的 cardinality 聚合

代码样例

GET /user_hits/_search
{
  "size": 0,
  "aggs": {
    "users_per_day": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day"
      },
      "aggs": {
        "distinct_users": {
          "cardinality": {
            "field": "user_id"
          }
        },
        "total_new_users": {
          "cumulative_cardinality": {
            "buckets_path": "distinct_users"
          }
        }
      }
    }
  }
}

Geotile Grid 聚合

功能说明

基于 geo_point 字段的地理位置多桶聚合。将地理空间数据按照网格划分,便于可视化和分析。

注意要点

  1. 网格设置: precision 参数控制网格精度(0-29),精度越高,网格越小,桶数越多。
  2. 高精度会产生大量桶,内存消耗随精度增加而增加。
  3. 只支持 geo_point 类型字段。

代码样例

POST /museums/_search?size=0
{
  "aggregations": {
    "tiles-in-bounds": {
      "geotile_grid": {
        "field": "location",
        "precision": 22,
        "bounds": {
          "top_left": "52.4, 4.9",
          "bottom_right": "52.3, 5.0"
        }
      }
    }
  }
}

T-test 聚合

功能说明

T_test 是一种统计假设检验,用于判断测试统计量在零假设下是否服从学生 t 分布(Student’s t-distribution)。它适用于从聚合文档中提取的数值或通过提供的脚本生成的数值。 该聚合将会返回该检验的 p 值(概率值)。它是在零假设正确的情况下(这意味着总体均值之间没有差异),获得至少与聚合所处理结果一样极端结果的概率。p 值越小,意味着零假设越有可能不正确,总体均值实际上是存在差异的。

关于 Student’s t-distribution

Student's t - distribution(学生 t - 分布),简称 t - 分布,是一种概率分布。它在统计学中具有重要地位,特别是在样本量较小且总体标准差未知的情况下用于对总体均值进行估计和假设检验。 它的形状类似于正态分布,呈钟形曲线,但比正态分布的 “尾部” 更厚。也就是说,t - 分布在均值两侧的极端值出现的概率比正态分布更高。

测试代码

GET node_upgrade/_search
{
  "size": 0,
  "aggs": {
    "startup_time_ttest": {
      "t_test": {
        "a": { "field": "startup_time_before" },
        "b": { "field": "startup_time_after" },
        "type": "paired"
      }
    }
  }
}

Variable Width Histogram 可变直方图聚合

功能说明

类似于 histogram 的多桶聚合。但与 histogram 不同,每个桶的宽度不是预先指定的,而是根据目标桶数量动态确定间隔

参数设置

  • field 必须是数值类型
  • buckets 参数指定目标桶数
  • 实际桶数可能少于指定值

性能考虑

  • 比固定宽度直方图更耗资源,大数据集上可能较慢
  • 建议限制目标桶数量

使用场景

  • 数据分布不均匀时特别有用
  • 适合探索性数据分析
  • 可以避免空桶或过密桶

Normalize 归一化聚合

功能说明

一个管道聚合,用于计算特定桶值的归一化或重新缩放后的值

方法选择

可以归一化处理的方法有:

  • rescale_0_1:0 到 1 重缩放,这种方法对数据进行重新缩放,使得最小值变为 0,最大值变为 1,其余数值则在两者之间进行线性归一化。
  • rescale_0_100:0 到 100 重缩放,该方法对数据进行重新缩放,让最小值变为 0,最大值变为 100,其余数值在它们之间按线性方式进行归一化。
  • percent_of_sum:占总和的百分比,此方法对每个值进行归一化,使其表示为占总值的百分比。
  • mean:均值归一化,这种方法进行归一化时,每个值依据其与平均值的差异程度来进行归一化。
  • zscore:Z 分数归一化,该方法进行归一化时,每个值表示的是其相对于标准差偏离均值的程度。
  • softmax:软最大化归一化,这种方法进行归一化时,先对每个值取指数,然后相对于原始值指数之和来进行归一化。

参数配置

  • method 参数指定归一化方法
  • buckets_path 指定数据来源
  • 可以设置缺失值处理

Moving Percentiles 聚合

功能介绍

一个管道聚合,对于一组有序的百分位数,移动百分位数聚合(Moving Percentile Aggregation)会在这些百分位数上滑动一个窗口,并计算累积百分位数。

关于 shift 偏移参数

默认情况下(偏移量 shift = 0 时),用于计算的窗口是除当前桶之外的最后 n 个值。将偏移量增加 1 会使起始窗口位置向右移动 1 个单位。

  • 若要将当前桶包含在窗口内,请使用 shift = 1。
  • 对于居中对齐(当前桶前后各有 n / 2 个值),使用 shift = window / 2。
  • 对于右对齐(当前桶之后有 n 个值),使用 shift = window。

如果窗口的任一边缘移动到数据序列边界之外,窗口将会收缩,仅包含可用的值。

Rate 聚合

功能说明

在 date_histogram 的聚合上使用,用于计算每个 date_histogram 桶中的文档速率或字段速率

rare 聚合支持多种时间单位(如秒、分、时),使用时需要明确指定单位。可以用来计算文档数或字段值,但必须与 date_histogram 一起使用。

小结

这一篇粗略的列举了 ES 7.10 版本中新增的聚合方法。相较于查询方法的高使用频率和低资源占用,聚合方法的使用频率相对较少,内存也会有一定的占用,大家可以根据实际场景选择使用。

推荐阅读

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/feature-evolution-from-elasticsearch-6.8-to-7.10-part-4/

收起阅读 »

INFINI Console 纳管 Elasticsearch 9(一):指标监控、数据管理、DSL 语句执行

Elasticsearch v9.0 版本最近已发布,而 INFINI Console 作为一款开源的非常轻量级的多集群、跨版本的搜索基础设施统一管控平台,是否支持最新的 Elasticsearch v9.0 集群管理呢?本文以 INFINI Console v1.29.2 为例,从指标监控、数据管理、DSL 语句执行等方面进行测试。

部署注册

使用 Docker 快速部署 ES9。

docker run --name es9 -p 9201:9200 -it -m 1GB docker.elastic.co/elasticsearch/elasticsearch:9.0.0

使用 Docker 部署 Console,请参考文档

docker run -d --name console -p 9001:9000 infinilabs/console:1.29.2-2008

将 ES9 注册到 Console,默认采集模式为 Agentless。

请求模拟

使用 Loadgen 模拟数据写入和查询。

env:
  ES_USERNAME: elastic
  ES_PASSWORD: CZ-FHm+M5cbfee_yMPZp
  ES_ENDPOINT: https://192.168.0.101:9201

runner:
#  total_rounds: 1
  no_warm: true
  valid_status_codes_during_warmup: [ 200,201,404 ]
  # Whether to log all requests
  log_requests: false
  # Whether to log all requests with the specified response status
  log_status_codes:
    - 0
    - 500
  assert_invalid: false
  assert_error: false
  # Whether to reset the context, including variables, runtime KV pairs, etc.,
  # before this test run.
  reset_context: false
  default_endpoint: $[[env.ES_ENDPOINT]]
  default_basic_auth:
    username: $[[env.ES_USERNAME]]
    password: $[[env.ES_PASSWORD]]

variables:
  - name: id
    type: sequence
  - name: uuid
    type: uuid
  - name: now_local
    type: now_local
  - name: now_unix
    type: now_unix
  - name: list
    type: list
    data:
      - "medcl"
      - "abc"
      - "efg"
      - "xyz"

requests:
  - request: #prepare some docs
      method: POST
      url: /_bulk
      body_repeat_times: 5000
      body: |
        {"index": {"_index": "infinilabs", "_id": "$[[uuid]]"}}
        {"id": "$[[id]]", "field1": "$[[list]]", "now_local": "$[[now_local]]", "now_unix": "$[[now_unix]]"}

  - request:
      method: GET
      url: infinilabs/_search
      body: |
        {"query":{"term":{"id":"$[[id]]"}}}
./loadgen-mac-amd64 -d 300

平台管理

平台概览

监控报表

  • 指标概览

  • 集群指标

  • 节点指标

  • 索引指标

  • 节点线程指标

  • 节点热力图

  • 索引热力图

  • 查看日志

日志采集需要安装 Agent,关于这块功能后续会进行介绍。

  • 节点实时指标

  • 索引实时指标

数据管理

开发工具

总结

经过测试,INFINI Console 可以支持 Elasticsearch 9 集群纳管,大家可以下载体验使用。

关于 INFINI Console

INFINI Console 是一款开源的非常轻量级的多集群、跨版本的搜索基础设施统一管控平台。通过对流行的搜索引擎基础设施进行跨版本、多集群的集中纳管,企业可以快速方便的统一管理企业内部的不同版本的多套搜索集群。INFINI Console 还可以对集群内的索引及数据进行操作管理,可以配置灵活的告警规则,可以指定统一的安全策略,可以查看各个维度的日志和审计信息,真正实现企业级的搜索服务平台化建设和运营。

官网文档:https://docs.infinilabs.com/console
开源地址:https://github.com/infinilabs/console

原文:https://infinilabs.cn/blog/2025/time-range-mergepolicy-for-easysearch/

继续阅读 »

Elasticsearch v9.0 版本最近已发布,而 INFINI Console 作为一款开源的非常轻量级的多集群、跨版本的搜索基础设施统一管控平台,是否支持最新的 Elasticsearch v9.0 集群管理呢?本文以 INFINI Console v1.29.2 为例,从指标监控、数据管理、DSL 语句执行等方面进行测试。

部署注册

使用 Docker 快速部署 ES9。

docker run --name es9 -p 9201:9200 -it -m 1GB docker.elastic.co/elasticsearch/elasticsearch:9.0.0

使用 Docker 部署 Console,请参考文档

docker run -d --name console -p 9001:9000 infinilabs/console:1.29.2-2008

将 ES9 注册到 Console,默认采集模式为 Agentless。

请求模拟

使用 Loadgen 模拟数据写入和查询。

env:
  ES_USERNAME: elastic
  ES_PASSWORD: CZ-FHm+M5cbfee_yMPZp
  ES_ENDPOINT: https://192.168.0.101:9201

runner:
#  total_rounds: 1
  no_warm: true
  valid_status_codes_during_warmup: [ 200,201,404 ]
  # Whether to log all requests
  log_requests: false
  # Whether to log all requests with the specified response status
  log_status_codes:
    - 0
    - 500
  assert_invalid: false
  assert_error: false
  # Whether to reset the context, including variables, runtime KV pairs, etc.,
  # before this test run.
  reset_context: false
  default_endpoint: $[[env.ES_ENDPOINT]]
  default_basic_auth:
    username: $[[env.ES_USERNAME]]
    password: $[[env.ES_PASSWORD]]

variables:
  - name: id
    type: sequence
  - name: uuid
    type: uuid
  - name: now_local
    type: now_local
  - name: now_unix
    type: now_unix
  - name: list
    type: list
    data:
      - "medcl"
      - "abc"
      - "efg"
      - "xyz"

requests:
  - request: #prepare some docs
      method: POST
      url: /_bulk
      body_repeat_times: 5000
      body: |
        {"index": {"_index": "infinilabs", "_id": "$[[uuid]]"}}
        {"id": "$[[id]]", "field1": "$[[list]]", "now_local": "$[[now_local]]", "now_unix": "$[[now_unix]]"}

  - request:
      method: GET
      url: infinilabs/_search
      body: |
        {"query":{"term":{"id":"$[[id]]"}}}
./loadgen-mac-amd64 -d 300

平台管理

平台概览

监控报表

  • 指标概览

  • 集群指标

  • 节点指标

  • 索引指标

  • 节点线程指标

  • 节点热力图

  • 索引热力图

  • 查看日志

日志采集需要安装 Agent,关于这块功能后续会进行介绍。

  • 节点实时指标

  • 索引实时指标

数据管理

开发工具

总结

经过测试,INFINI Console 可以支持 Elasticsearch 9 集群纳管,大家可以下载体验使用。

关于 INFINI Console

INFINI Console 是一款开源的非常轻量级的多集群、跨版本的搜索基础设施统一管控平台。通过对流行的搜索引擎基础设施进行跨版本、多集群的集中纳管,企业可以快速方便的统一管理企业内部的不同版本的多套搜索集群。INFINI Console 还可以对集群内的索引及数据进行操作管理,可以配置灵活的告警规则,可以指定统一的安全策略,可以查看各个维度的日志和审计信息,真正实现企业级的搜索服务平台化建设和运营。

官网文档:https://docs.infinilabs.com/console
开源地址:https://github.com/infinilabs/console

原文:https://infinilabs.cn/blog/2025/time-range-mergepolicy-for-easysearch/

收起阅读 »

谈谈 ES 6.8 到 7.10 的功能变迁(3)- 查询方法篇

上一篇咱们了解了 ES 7.10 相较于 ES 6.8 新增的字段类型,这一篇我们继续了解新增的查询方法。

Interval 间隔查询:

功能介绍

Interval 查询,词项间距查询,可以根据匹配词项的顺序、间距和接近度对文档进行排名。主要解决的查询场景“创建一个多搜索词匹配的查询,同时保留搜索词的顺序”,比 match phrase 更加符合需求场景,查询方法使用比 span 查询更简单。ES 后续版本想用 interval 查询逐步替代 span 查询。

注意事项

规则组合

  • 可以使用 prefix、wildcard、fuzzy 等规则
  • 通过设置 max_gaps 和 ordered 参数,可以控制词项间的最大间隙和顺序要求

性能考虑

  • 间隔查询比简单的词项匹配更消耗资源
  • 嵌套规则越多,性能开销越大
  • 建议合理使用 maxGaps 参数限制间距

使用限制

  • 只能用于 text 字段
  • 不支持跨字段查询
  • 不支持对数值类型字段使用

Distance feature 查询

功能说明

时间/地理距离特性查询,该查询用于查找更接近被查询日期和地理位置的结果。 日期和位置分别是声明为 date 和 geo_point 数据类型的字段。返回结果的字段值不需要完全等于被查询值,而是按照给定日期或给定位置的进度算分,越是接近被查询值,在相关性得分中被评为更高

字段类型要求

  • 日期字段必须是 date 类型,地理位置字段必须是 geo_point 类型
  • 不支持其他类型的距离计算

评分机制

  • 距离越近,得分越高
  • 使用 boost 参数调整权重
  • 可以与其他查询组合使用

性能考虑

  • 地理距离计算较为耗费资源,建议使用合适的索引优化地理查询,比如:考虑使用地理网格索引提升性能

Pinned 查询

功能说明

实现对某些文档的置顶功能,使用存储在_id 字段中的文档 ID 来标识升级或“固定”的文档。
此功能通常用于引导搜索者查找精选的文档,这些文档在搜索的任何 “organic” 匹配项之上被提升。当查询中有排序时,pinned 查询失效

使用限制

  • 不能与自定义排序一起使用
  • 置顶文档必须存在于索引中
  • 最多支持 100 个置顶文档

排序规则

  • 置顶文档按照 ids 数组中的顺序排序
  • organic 查询结果按照相关性得分排序
  • 置顶文档始终在 organic 结果之前

PIT 查询

功能说明

Point in time 查询是一个轻量级的视图,根据保留周期保留 PIT 查询发生时数据的状态,用于不同条件的深度分页查询。

scroll 滚动搜索及其上下文与查询内容绑定。这意味着编写一个查询,添加一个滚动参数,来自这个查询的响应数据就会保持一致。不同的查询内容则会产生不同的 scroll 上下文,资源使用就会相对紧张。有时想对同一固定数据集适时运行不同的查询,就需要 PIT 查询。

如果对于一个不断变化的索引有着很高的搜索负载,那么为每个请求创建一个新的时间点查询会使用相当多的资源。可以通过使用一个后台进程每隔几分钟创建一个时间点 id 并将其用于所有搜索请求的方式来优化资源使用

更多内容可以参照这里

注意事项

资源管理

  • PIT 会占用系统资源,需要及时释放
  • 建议设置合理的保留时间
  • 监控 open context 数量

使用场景

  • 适合需要一致性视图的场景
  • 适合需要深度分页的场景
  • 适合需要在固定数据集上执行多次查询的场景

性能优化

  • 避免过长的保留时间,合理设置批次大小
  • 建议查询的时候带 sort 参数排序

测试代码

// 1. 使用 product_test 索引,创建 PIT
POST /product_test/_pit?keep_alive=1m

GET /_search
{
  "size": 1,
  "query": {
    "match": {
      "model": "iphone"
    }
  },
  "pit": {
    "id": "i6-xAwEMcHJvZHVjdF90ZXN0FmRObXltV3ZDU1VTTnllYjNoR0ZtamcAFk1GQklTWXBaUkllb2h1cGl1VVFsdUEAAAAAAAABZk8WOFVoUHBhN3BSVVN5TWVmeTh4d3JpdwEWZE5teW1XdkNTVVNOeWViM2hHRm1qZwAA",
    "keep_alive": "1m"
  },
  "sort": [
    {"_score": "desc"},
    {"_id": "asc"}
  ]
}

DELETE /_pit
{"id":"i6-xAwEMcHJvZHVjdF90ZXN0FmRObXltV3ZDU1VTTnllYjNoR0ZtamcAFk1GQklTWXBaUkllb2h1cGl1VVFsdUEAAAAAAAABZk8WOFVoUHBhN3BSVVN5TWVmeTh4d3JpdwEWZE5teW1XdkNTVVNOeWViM2hHRm1qZwAA"}

小结

作为查询方法,Elasticsearch 新增的几个功能面对的场景更加具象且方便,大家可以根据业务的场景特性可以优化使用。其中深度分页查询是 ES 使用者几乎避免不了的场景,PIT 查询也是提供了一个更为优质的方法。

推荐阅读

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/feature-evolution-from-elasticsearch-6.8-to-7.10-part-3/

继续阅读 »

上一篇咱们了解了 ES 7.10 相较于 ES 6.8 新增的字段类型,这一篇我们继续了解新增的查询方法。

Interval 间隔查询:

功能介绍

Interval 查询,词项间距查询,可以根据匹配词项的顺序、间距和接近度对文档进行排名。主要解决的查询场景“创建一个多搜索词匹配的查询,同时保留搜索词的顺序”,比 match phrase 更加符合需求场景,查询方法使用比 span 查询更简单。ES 后续版本想用 interval 查询逐步替代 span 查询。

注意事项

规则组合

  • 可以使用 prefix、wildcard、fuzzy 等规则
  • 通过设置 max_gaps 和 ordered 参数,可以控制词项间的最大间隙和顺序要求

性能考虑

  • 间隔查询比简单的词项匹配更消耗资源
  • 嵌套规则越多,性能开销越大
  • 建议合理使用 maxGaps 参数限制间距

使用限制

  • 只能用于 text 字段
  • 不支持跨字段查询
  • 不支持对数值类型字段使用

Distance feature 查询

功能说明

时间/地理距离特性查询,该查询用于查找更接近被查询日期和地理位置的结果。 日期和位置分别是声明为 date 和 geo_point 数据类型的字段。返回结果的字段值不需要完全等于被查询值,而是按照给定日期或给定位置的进度算分,越是接近被查询值,在相关性得分中被评为更高

字段类型要求

  • 日期字段必须是 date 类型,地理位置字段必须是 geo_point 类型
  • 不支持其他类型的距离计算

评分机制

  • 距离越近,得分越高
  • 使用 boost 参数调整权重
  • 可以与其他查询组合使用

性能考虑

  • 地理距离计算较为耗费资源,建议使用合适的索引优化地理查询,比如:考虑使用地理网格索引提升性能

Pinned 查询

功能说明

实现对某些文档的置顶功能,使用存储在_id 字段中的文档 ID 来标识升级或“固定”的文档。
此功能通常用于引导搜索者查找精选的文档,这些文档在搜索的任何 “organic” 匹配项之上被提升。当查询中有排序时,pinned 查询失效

使用限制

  • 不能与自定义排序一起使用
  • 置顶文档必须存在于索引中
  • 最多支持 100 个置顶文档

排序规则

  • 置顶文档按照 ids 数组中的顺序排序
  • organic 查询结果按照相关性得分排序
  • 置顶文档始终在 organic 结果之前

PIT 查询

功能说明

Point in time 查询是一个轻量级的视图,根据保留周期保留 PIT 查询发生时数据的状态,用于不同条件的深度分页查询。

scroll 滚动搜索及其上下文与查询内容绑定。这意味着编写一个查询,添加一个滚动参数,来自这个查询的响应数据就会保持一致。不同的查询内容则会产生不同的 scroll 上下文,资源使用就会相对紧张。有时想对同一固定数据集适时运行不同的查询,就需要 PIT 查询。

如果对于一个不断变化的索引有着很高的搜索负载,那么为每个请求创建一个新的时间点查询会使用相当多的资源。可以通过使用一个后台进程每隔几分钟创建一个时间点 id 并将其用于所有搜索请求的方式来优化资源使用

更多内容可以参照这里

注意事项

资源管理

  • PIT 会占用系统资源,需要及时释放
  • 建议设置合理的保留时间
  • 监控 open context 数量

使用场景

  • 适合需要一致性视图的场景
  • 适合需要深度分页的场景
  • 适合需要在固定数据集上执行多次查询的场景

性能优化

  • 避免过长的保留时间,合理设置批次大小
  • 建议查询的时候带 sort 参数排序

测试代码

// 1. 使用 product_test 索引,创建 PIT
POST /product_test/_pit?keep_alive=1m

GET /_search
{
  "size": 1,
  "query": {
    "match": {
      "model": "iphone"
    }
  },
  "pit": {
    "id": "i6-xAwEMcHJvZHVjdF90ZXN0FmRObXltV3ZDU1VTTnllYjNoR0ZtamcAFk1GQklTWXBaUkllb2h1cGl1VVFsdUEAAAAAAAABZk8WOFVoUHBhN3BSVVN5TWVmeTh4d3JpdwEWZE5teW1XdkNTVVNOeWViM2hHRm1qZwAA",
    "keep_alive": "1m"
  },
  "sort": [
    {"_score": "desc"},
    {"_id": "asc"}
  ]
}

DELETE /_pit
{"id":"i6-xAwEMcHJvZHVjdF90ZXN0FmRObXltV3ZDU1VTTnllYjNoR0ZtamcAFk1GQklTWXBaUkllb2h1cGl1VVFsdUEAAAAAAAABZk8WOFVoUHBhN3BSVVN5TWVmeTh4d3JpdwEWZE5teW1XdkNTVVNOeWViM2hHRm1qZwAA"}

小结

作为查询方法,Elasticsearch 新增的几个功能面对的场景更加具象且方便,大家可以根据业务的场景特性可以优化使用。其中深度分页查询是 ES 使用者几乎避免不了的场景,PIT 查询也是提供了一个更为优质的方法。

推荐阅读

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
原文:https://infinilabs.cn/blog/2025/feature-evolution-from-elasticsearch-6.8-to-7.10-part-3/

收起阅读 »

谈谈 ES 6.8 到 7.10 的功能变迁(2)- 字段类型篇

我们继续来了解一下从 ES 6.8 到 ES 7.10 新增的功能。本篇主要介绍新增的字段类型,会简要概述一下新增字段类型的使用场景和限制,提供简单的测试代码。

Flattened 扁平化对象字段

功能说明

解决场景

该功能主要用于处理具有大量不确定键的 JSON 对象,避免字段映射爆炸问题,特别适用于不需要对对象内部字段进行单独分析和聚合的场景,以及当对象结构不固定,字段名称动态变化时。

使用注意点

  • 整个对象被视为单个字段,无法对内部单个字段进行分析或聚合
  • 只支持 keyword 类型的操作,如 term、prefix 查询等。因为它的每个解析出的字段都值为 keyword 字段
  • 默认最大字段深度为 20,可以通过 depth_limit 来设置。
  • 不支持数字范围查询,高亮显示
  • 查询时,无法使用通配符引用字段键,比如 { "term": {"labels.time*": 1541457010}}
  • split_queries_on_whitespace 为 true 时,这个字段的全文查询(match,query_string,simple_query_string)等于是用了空格分词器。

支持的查询方法

  • term 查询:精确匹配某个字段的值
  • terms/terms_set 查询:匹配多个值中的任意一个或者多个
  • prefix 查询:前缀匹配
  • exists 查询:检查字段是否存在
  • match 查询:分词后的全文检索(但因为是 keyword,所以实际上是精确匹配)
  • query_string 和 simple_query_string

Shape 字段

功能说明

该功能主要用于存储和查询任意几何图形数据,支持点、线、圆、矩形、多边形等几何形状,特别适用于地理空间分析和基于形状的搜索场景,以及相比于 geo_shape 专门用于地理空间数据(坐标系统固定为 WGS84 经纬度),shape 字段可以用于任意坐标系统的几何形状,比如虚拟世界或者保密空间。

使用要点

  • 形状数据使用 GeoJSONWell-Known Text (WKT)格式表示
  • 支持的空间关系操作包括:INTERSECTS(相交)、DISJOINT(不相交)、WITHIN(内部)和 CONTAINS(包含)
  • 由于 shape 字段的复杂输入结构和索引表示,目前无法对 shape 字段进行排序,也无法直接检索其字段值(只能通过 _source 字段获取)
  • orientation 参数用于定义多边形顶点读取的顺序:默认逆时针(counterclockwise),可选顺时针(clockwise)

Wildcard 字段

功能说明

解决的场景

用于优化通配符和正则表达式查询性能。特别适用于需要进行暴力模糊匹配的文本字段。 wildcard 字段在 keyword 字段和 ngrams 分词器之间找到了性能和存储成本的均衡

原理说明

wildcard 字段采用了一种独特的索引策略,使用这种数据类型自动加速通配符和正则表达式搜索:

  1. n-gram 索引:存储字符串中所有 3 个字符的序列。可以理解为 ngram 分词器取长度为 3 的子串.
  2. 二进制 doc value:存储完整的原始文档值

这种方式与传统的 keyword 字段(完全不分词)和 text 字段(基于分词器分词)有很大不同。在查询时,系统首先使用这些 ngrams 构建的索引进行初步筛选,快速定位可能匹配的文档。这个过程类似于数据库中的索引过滤,可以大大减少需要详细检查的文档数量。然后,系统会从二进制 doc value 中取出这些候选文档的完整字段值,进行精确的模式匹配,确保最终结果的准确性。

这种策略在性能和存储空间上取得了很好的平衡:

  • 相比 keyword 字段,在精确匹配查询时性能稍差
  • 相比 ngrams 分词,它避免了过度的 token 生成,既节省存储空间,又保持了不错的查询性能
  • 特别适合对日志数据进行类似 grep 的模式匹配查询,如通配符查询和正则表达式查询

使用注意点

  • 比 keyword 类型更适合做通配符搜索,但会占用更多磁盘空间
  • 建议仅在确实需要频繁进行通配符查询的字段上使用
  • 与 keyword 字段一样是非分词(untokenized)的,所以不支持依赖词项位置的查询(如短语查询)

更多原理说明可以参照这里

Version 字段类型

功能说明

该功能主要用于存储和比较软件版本号,提供版本号的自然排序和比较功能,支持标准的版本号格式(如 1.0.0, 2.1.3-alpha 等)。

使用说明

  • 版本号必须符合标准格式:主版本号、次版本号、修订号
  • 支持带有预发布标识符的版本号(如 alpha, beta, rc 等)
  • 可以进行版本号的大小比较和范围查询,比如可以在 1.0.0 和 2.0.0 之间查找所有版本
  • 底层存储的是 keyword 类型,虽然可以正则匹配或者模糊查询,但是在大数据量下注意性能问题。

Histogram 直方图字段

功能说明

解决的场景

Histogram 字段专门用于存储预先计算好的直方图数据,这种数据结构在需要频繁进行统计分析的场景下特别有用。通过预先聚合数据并以直方图形式存储,可以显著减少查询时的实时计算开销,提高查询性能。对大规模数据集的统计分析很有利,比如系统监控指标、用户行为分析等需要快速获取数据分布情况的场景。

直方图的数据格式

直方图字段需要包含两个必需的参数来表示直方图数据:

  • values:数值数组,表示每个聚合分桶的取值点,即区间的起点。数组中的值必须是严格递增双精度浮点数
  • counts:整数数组,表示落在当前区间内的元素数量。数组中的值必须是非负整数。具体来说,对于位置 i,counts[i] 表示数值大于等于 values[i] 且小于 values[i+1] 的元素个数(最后一个区间则包含等于 values[last] 的元素)

这两个数组必须具有相同的长度。 例如,一个表示年龄分布的直方图数据可能如下:

{
  "age_histogram": {
    "values": [20.0, 30.0, 40.0, 50.0], // 年龄区间的起点:[20-30)、[30-40)、[40-50)、[50 及以上]
    "counts": [100, 150, 75, 25] // 表示:20-29 岁有 100 人,30-39 岁有 150 人,40-49 岁有 75 人,50 岁及以上有 25 人
  }
}

存储说明

直方图字段主要用于聚合分析。为了优化聚合操作的性能,数据以二进制 doc values 形式存储,而不是创建索引。每个直方图字段的存储大小最多为 13 * numValues 字节,其中 numValues 是数组的长度。

使用注意点

  • value 数组必须按升序排列
  • count 数组的长度必须与 value 数组相同
  • 由于数据被索引,直方图字段仅支持以下操作:
    • 聚合操作:min(最小值)、max(最大值)、sum(求和)、value_count(计数)、avg(平均值)、percentiles(百分位数)、percentile_ranks(百分位等级)、boxplot(箱线图)、histogram(直方图)
    • 查询操作:仅支持 exists(存在性)查询

Search-as-you-type 字段

功能说明

解决的场景

该功能主要用于实现自动补全的即时搜索体验,比如可以在当用户输入搜索关键词的时候,还没输完就可以提示用户数据库里最相关的内容。 它是通过支持前缀匹配和部分词语匹配的方式,使用户在输入过程中就能获得搜索结果。

字段结构说明

search_as_you_type 字段会自动创建以下子字段:

  • {field_name}: 基础字段,用于完整词语匹配,使用字段设置的分词器。
  • {field_name}._2gram: 用大小为 2 的 shingle token filter 分词器对 ny_field 进行分词
  • {field_name}._3gram: 用大小为 3 的 shingle token filter 分词器对 ny_field 进行分词
  • {field_name}._index_prefix: 用 edge ngram token filter 包装 my_field._3gram 的分词器

使用注意

  • 可以通过设置 max_shingle_size 参数(默认为 3)来控制生成的子字段数量。max_shingle_size 参数越大,子字段越多,默认 3 个字段能覆盖大部分的场景。
  • search-as-you-type 字段索引空间占用较大,因为需要存储多个分词器的结果。

前缀查询(prefix)优化机制

search_as_you_type 字段在处理前缀查询时有特殊的优化。当对根字段或其子字段进行前缀查询时,查询会被重写为针对 ._index_prefix 子字段的 term 查询,这比传统的文本字段前缀查询更高效 ._index_prefix 子字段的分析器会对分词结果进行特殊处理:

  • 不仅索引常规的 n-gram 分词结果
  • 还会索引字段值词条的前缀,即使这些词条不会出现在 n-gram 子字段中

例如,对于文本 "quick brown fox",在 max_shingle_size 为 3 的情况下:不仅会索引会索引"quick" "brown" "fox",还会索引这些此项的前缀,这确保了字段中所有词条都能支持自动完成功能,即使这些词条组合并不存在于 ._3gram 子字段中

小结

这里我们只是简要的介绍了一下新增字段类型的设计背景和原理,并没有深入展开挖掘新增字段的性能场景或者奇思妙用,比如 wildcard 与 ngram 颗粒度性能对比,直方图数据和 pipeline 进行日志分析的组合等等。大家可以根据实际的需求场景进行深度挖掘。

推荐阅读

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。

继续阅读 »

我们继续来了解一下从 ES 6.8 到 ES 7.10 新增的功能。本篇主要介绍新增的字段类型,会简要概述一下新增字段类型的使用场景和限制,提供简单的测试代码。

Flattened 扁平化对象字段

功能说明

解决场景

该功能主要用于处理具有大量不确定键的 JSON 对象,避免字段映射爆炸问题,特别适用于不需要对对象内部字段进行单独分析和聚合的场景,以及当对象结构不固定,字段名称动态变化时。

使用注意点

  • 整个对象被视为单个字段,无法对内部单个字段进行分析或聚合
  • 只支持 keyword 类型的操作,如 term、prefix 查询等。因为它的每个解析出的字段都值为 keyword 字段
  • 默认最大字段深度为 20,可以通过 depth_limit 来设置。
  • 不支持数字范围查询,高亮显示
  • 查询时,无法使用通配符引用字段键,比如 { "term": {"labels.time*": 1541457010}}
  • split_queries_on_whitespace 为 true 时,这个字段的全文查询(match,query_string,simple_query_string)等于是用了空格分词器。

支持的查询方法

  • term 查询:精确匹配某个字段的值
  • terms/terms_set 查询:匹配多个值中的任意一个或者多个
  • prefix 查询:前缀匹配
  • exists 查询:检查字段是否存在
  • match 查询:分词后的全文检索(但因为是 keyword,所以实际上是精确匹配)
  • query_string 和 simple_query_string

Shape 字段

功能说明

该功能主要用于存储和查询任意几何图形数据,支持点、线、圆、矩形、多边形等几何形状,特别适用于地理空间分析和基于形状的搜索场景,以及相比于 geo_shape 专门用于地理空间数据(坐标系统固定为 WGS84 经纬度),shape 字段可以用于任意坐标系统的几何形状,比如虚拟世界或者保密空间。

使用要点

  • 形状数据使用 GeoJSONWell-Known Text (WKT)格式表示
  • 支持的空间关系操作包括:INTERSECTS(相交)、DISJOINT(不相交)、WITHIN(内部)和 CONTAINS(包含)
  • 由于 shape 字段的复杂输入结构和索引表示,目前无法对 shape 字段进行排序,也无法直接检索其字段值(只能通过 _source 字段获取)
  • orientation 参数用于定义多边形顶点读取的顺序:默认逆时针(counterclockwise),可选顺时针(clockwise)

Wildcard 字段

功能说明

解决的场景

用于优化通配符和正则表达式查询性能。特别适用于需要进行暴力模糊匹配的文本字段。 wildcard 字段在 keyword 字段和 ngrams 分词器之间找到了性能和存储成本的均衡

原理说明

wildcard 字段采用了一种独特的索引策略,使用这种数据类型自动加速通配符和正则表达式搜索:

  1. n-gram 索引:存储字符串中所有 3 个字符的序列。可以理解为 ngram 分词器取长度为 3 的子串.
  2. 二进制 doc value:存储完整的原始文档值

这种方式与传统的 keyword 字段(完全不分词)和 text 字段(基于分词器分词)有很大不同。在查询时,系统首先使用这些 ngrams 构建的索引进行初步筛选,快速定位可能匹配的文档。这个过程类似于数据库中的索引过滤,可以大大减少需要详细检查的文档数量。然后,系统会从二进制 doc value 中取出这些候选文档的完整字段值,进行精确的模式匹配,确保最终结果的准确性。

这种策略在性能和存储空间上取得了很好的平衡:

  • 相比 keyword 字段,在精确匹配查询时性能稍差
  • 相比 ngrams 分词,它避免了过度的 token 生成,既节省存储空间,又保持了不错的查询性能
  • 特别适合对日志数据进行类似 grep 的模式匹配查询,如通配符查询和正则表达式查询

使用注意点

  • 比 keyword 类型更适合做通配符搜索,但会占用更多磁盘空间
  • 建议仅在确实需要频繁进行通配符查询的字段上使用
  • 与 keyword 字段一样是非分词(untokenized)的,所以不支持依赖词项位置的查询(如短语查询)

更多原理说明可以参照这里

Version 字段类型

功能说明

该功能主要用于存储和比较软件版本号,提供版本号的自然排序和比较功能,支持标准的版本号格式(如 1.0.0, 2.1.3-alpha 等)。

使用说明

  • 版本号必须符合标准格式:主版本号、次版本号、修订号
  • 支持带有预发布标识符的版本号(如 alpha, beta, rc 等)
  • 可以进行版本号的大小比较和范围查询,比如可以在 1.0.0 和 2.0.0 之间查找所有版本
  • 底层存储的是 keyword 类型,虽然可以正则匹配或者模糊查询,但是在大数据量下注意性能问题。

Histogram 直方图字段

功能说明

解决的场景

Histogram 字段专门用于存储预先计算好的直方图数据,这种数据结构在需要频繁进行统计分析的场景下特别有用。通过预先聚合数据并以直方图形式存储,可以显著减少查询时的实时计算开销,提高查询性能。对大规模数据集的统计分析很有利,比如系统监控指标、用户行为分析等需要快速获取数据分布情况的场景。

直方图的数据格式

直方图字段需要包含两个必需的参数来表示直方图数据:

  • values:数值数组,表示每个聚合分桶的取值点,即区间的起点。数组中的值必须是严格递增双精度浮点数
  • counts:整数数组,表示落在当前区间内的元素数量。数组中的值必须是非负整数。具体来说,对于位置 i,counts[i] 表示数值大于等于 values[i] 且小于 values[i+1] 的元素个数(最后一个区间则包含等于 values[last] 的元素)

这两个数组必须具有相同的长度。 例如,一个表示年龄分布的直方图数据可能如下:

{
  "age_histogram": {
    "values": [20.0, 30.0, 40.0, 50.0], // 年龄区间的起点:[20-30)、[30-40)、[40-50)、[50 及以上]
    "counts": [100, 150, 75, 25] // 表示:20-29 岁有 100 人,30-39 岁有 150 人,40-49 岁有 75 人,50 岁及以上有 25 人
  }
}

存储说明

直方图字段主要用于聚合分析。为了优化聚合操作的性能,数据以二进制 doc values 形式存储,而不是创建索引。每个直方图字段的存储大小最多为 13 * numValues 字节,其中 numValues 是数组的长度。

使用注意点

  • value 数组必须按升序排列
  • count 数组的长度必须与 value 数组相同
  • 由于数据被索引,直方图字段仅支持以下操作:
    • 聚合操作:min(最小值)、max(最大值)、sum(求和)、value_count(计数)、avg(平均值)、percentiles(百分位数)、percentile_ranks(百分位等级)、boxplot(箱线图)、histogram(直方图)
    • 查询操作:仅支持 exists(存在性)查询

Search-as-you-type 字段

功能说明

解决的场景

该功能主要用于实现自动补全的即时搜索体验,比如可以在当用户输入搜索关键词的时候,还没输完就可以提示用户数据库里最相关的内容。 它是通过支持前缀匹配和部分词语匹配的方式,使用户在输入过程中就能获得搜索结果。

字段结构说明

search_as_you_type 字段会自动创建以下子字段:

  • {field_name}: 基础字段,用于完整词语匹配,使用字段设置的分词器。
  • {field_name}._2gram: 用大小为 2 的 shingle token filter 分词器对 ny_field 进行分词
  • {field_name}._3gram: 用大小为 3 的 shingle token filter 分词器对 ny_field 进行分词
  • {field_name}._index_prefix: 用 edge ngram token filter 包装 my_field._3gram 的分词器

使用注意

  • 可以通过设置 max_shingle_size 参数(默认为 3)来控制生成的子字段数量。max_shingle_size 参数越大,子字段越多,默认 3 个字段能覆盖大部分的场景。
  • search-as-you-type 字段索引空间占用较大,因为需要存储多个分词器的结果。

前缀查询(prefix)优化机制

search_as_you_type 字段在处理前缀查询时有特殊的优化。当对根字段或其子字段进行前缀查询时,查询会被重写为针对 ._index_prefix 子字段的 term 查询,这比传统的文本字段前缀查询更高效 ._index_prefix 子字段的分析器会对分词结果进行特殊处理:

  • 不仅索引常规的 n-gram 分词结果
  • 还会索引字段值词条的前缀,即使这些词条不会出现在 n-gram 子字段中

例如,对于文本 "quick brown fox",在 max_shingle_size 为 3 的情况下:不仅会索引会索引"quick" "brown" "fox",还会索引这些此项的前缀,这确保了字段中所有词条都能支持自动完成功能,即使这些词条组合并不存在于 ._3gram 子字段中

小结

这里我们只是简要的介绍了一下新增字段类型的设计背景和原理,并没有深入展开挖掘新增字段的性能场景或者奇思妙用,比如 wildcard 与 ngram 颗粒度性能对比,直方图数据和 pipeline 进行日志分析的组合等等。大家可以根据实际的需求场景进行深度挖掘。

推荐阅读

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。

收起阅读 »