愚者求师之过,智者从师之长。

【搜索客社区日报】第2087期 (2025-08-04)

社区日报Muses 发表了文章 • 0 个评论 • 1436 次浏览 • 6 天前 • 来自相关话题

1、IK 字段级别词典升级:IK reload API
https://infinilabs.cn/blog/202 ... ys-2/

2、IK 字段级别词典的升级之路
https://infinilabs.cn/blog/202 ... ys-3/

3、使用 Elasticsearch 和 AI 构建智能重复项检测
https://elasticstack.blog.csdn ... 96938

4、上下文工程(Context Engineering)综述:大模型的下一个前沿
https://mp.weixin.qq.com/s/hsalUdlx0KMtQh8U2gvnnw

5、Elasticsearch:在 Elastic 中玩转 DeepSeek R1 来实现 RAG 应用
https://elasticstack.blog.csdn ... 32862

编辑:Muse
更多资讯:http://news.searchkit.cn

【搜索客社区日报】第2088期 (2025-08-05)

社区日报God_lockin 发表了文章 • 0 个评论 • 1156 次浏览 • 4 天前 • 来自相关话题

1. ids/ips 路由快到碗里来(需要梯子)
https://blog.devgenius.io/buil ... 0d79c
2. mongodb和ES的左右互搏之术(需要梯子)
https://medium.com/%40kumarjai ... 5284e
3. opensearch 里的性能瓶颈?不能忍(需要梯子)
https://medium.com/collaborne- ... 71ace
编辑:斯蒂文
更多资讯:http://news.searchkit.cn
 

【搜索客社区日报】第2089期 (2025-08-06)

社区日报kin122 发表了文章 • 0 个评论 • 986 次浏览 • 4 天前 • 来自相关话题

1.Kafka, A2A, MCP, 和 Flink:AI 代理的新栈(搭梯)
https://seanfalconer.medium.co ... 85b72

2.Text-to-SQL的自动优化:性能分析阶段(搭梯)
https://medium.com/%40DaveLumA ... f6b43

3.Elasticsearch 插件用于 UBI:分析用户搜索行为
https://cloud.tencent.com/deve ... 51491


编辑:kin122 
更多资讯:http://news.searchkit.cn

ES 调优帖:Gateway 批量写入性能优化实践

ElasticsearchINFINI Labs 小助手 发表了文章 • 0 个评论 • 1084 次浏览 • 3 天前 • 来自相关话题


背景:bulk 优化的应用


在 ES 的写入优化里,bulk 操作被广泛地用于批量处理数据。bulk 操作允许用户一次提交多个数据操作,如索引、更新、删除等,从而提高数据处理效率。bulk 操作的实现原理是,将数据操作请求打包成 HTTP 请求,并批量提交给 Elasticsearch 服务器。这样,Elasticsearch 服务器就可以一次处理多个数据操作,从而提高处理效率。

这种优化的核心价值在于减少了网络往返的次数和连接建立的开销。每一次单独的写入操作都需要经历完整的请求-响应周期,而批量写入则是将多个操作打包在一起,用一次通信完成原本需要多次交互的工作。这不仅仅节省了时间,更重要的是释放了系统资源,让服务器能够专注于真正的数据处理,而不是频繁的协议握手和状态维护。

这样的批量请求的确是可以优化写入请求的效率,让 ES 集群获得更多的资源去做写入请求的集中处理。但是除了客户端与 ES 集群的通讯效率优化,还有其他中间过程能优化么?

Gateway 的优化点


bulk 的优化理念是将日常零散的写入需求做集中化的处理,尽量减低日常请求的损耗,完成资源最大化的利用。简而言之就是“好钢用在刀刃上”。

但是 ES 在收到 bulk 写入请求后,也是需要协调节点根据文档的 id 计算所属的分片来将数据分发到对应的数据节点的。这个过程也是有一定损耗的,如果 bulk 请求中数据分布的很散,每个分片都需要进行写入,原本 bulk 集中写入的需求优势则还是没有得到最理想化的提升。

[gateway](https://docs.infinilabs.com/gateway/main/zh/) 的写入加速则对 bulk 的优化理念的最大化补全。

gateway 可以本地计算每个索引文档对应后端 Elasticsearch 集群的目标存放位置,从而能够精准的进行写入请求定位

在一批 bulk 请求中,可能存在多个后端节点的数据,bulk_reshuffle 过滤器用来将正常的 bulk 请求打散,按照目标节点或者分片进行拆分重新组装,避免 Elasticsearch 节点收到请求之后再次进行请求分发, 从而降低 Elasticsearch 集群间的流量和负载,也能避免单个节点成为热点瓶颈,确保各个数据节点的处理均衡,从而提升集群总体的索引吞吐能力。

整理的优化思路如下图:

![](https://infinilabs.cn/img/blog ... k1.png)

优化实践


那我们来实践一下,看看 gateway 能提升多少的写入。

这里我们分 2 个测试场景:

  1. 基础集中写入测试,不带文档 id,直接批量写入。这个场景更像是日志或者监控数据采集的场景。
  2. 带文档 id 的写入测试,更偏向搜索场景或者大数据批同步的场景。

    2 个场景都进行直接写入 ES 和 gateway 转发 ES 的效率比对。

    测试材料除了需要备一个网关和一套 es 外,其余的内容如下:

    测试索引 mapping 一致,名称区分:

    <br /> PUT gateway_bulk_test<br /> {<br /> "settings": {<br /> "number_of_shards": 6,<br /> "number_of_replicas": 0<br /> },<br /> "mappings": {<br /> "properties": {<br /> "timestamp": {<br /> "type": "date",<br /> "format": "strict_date_optional_time"<br /> },<br /> "field1": {<br /> "type": "keyword"<br /> },<br /> "field2": {<br /> "type": "keyword"<br /> },<br /> "field3": {<br /> "type": "keyword"<br /> },<br /> "field4": {<br /> "type": "integer"<br /> },<br /> "field5": {<br /> "type": "keyword"<br /> },<br /> "field6": {<br /> "type": "float"<br /> }<br /> }<br /> }<br /> }<br /> <br /> PUT bulk_test<br /> {<br /> "settings": {<br /> "number_of_shards": 6,<br /> "number_of_replicas": 0<br /> },<br /> "mappings": {<br /> "properties": {<br /> "timestamp": {<br /> "type": "date",<br /> "format": "strict_date_optional_time"<br /> },<br /> "field1": {<br /> "type": "keyword"<br /> },<br /> "field2": {<br /> "type": "keyword"<br /> },<br /> "field3": {<br /> "type": "keyword"<br /> },<br /> "field4": {<br /> "type": "integer"<br /> },<br /> "field5": {<br /> "type": "keyword"<br /> },<br /> "field6": {<br /> "type": "float"<br /> }<br /> }<br /> }<br /> }<br />

    gateway 的配置文件如下:

    ```yaml
    path.data: data
    path.logs: log

    entry:
    • name: my_es_entry
      enabled: true
      router: my_router
      max_concurrency: 200000
      network:
      binding: 0.0.0.0:8000

      flow:
    • name: async_bulk
      filter:
      • bulk_reshuffle:
        when:
        contains:
        _ctx.request.path: /_bulk
        elasticsearch: prod
        level: node
        partition_size: 1
        fix_null_id: true
      • elasticsearch:
        elasticsearch: prod #elasticsearch configure reference name
        max_connection_per_node: 1000 #max tcp connection to upstream, default for all nodes
        max_response_size: -1 #default for all nodes
        balancer: weight
        refresh: # refresh upstream nodes list, need to enable this feature to use elasticsearch nodes auto discovery
        enabled: true
        interval: 60s
        filter:
        roles:
        exclude:
        • master

          router:
    • name: my_router
      default_flow: async_bulk

      elasticsearch:
    • name: prod
      enabled: true
      endpoints:
    • name: bulk_request_ingest
      auto_start: true
      keep_running: true
      retry_delay_in_ms: 1000
      processor:
      • bulk_indexing:
        max_connection_per_node: 100
        num_of_slices: 3
        max_worker_size: 30
        idle_timeout_in_seconds: 10
        bulk:
        compress: false
        batch_size_in_mb: 10
        batch_size_in_docs: 10000
        consumer:
        fetch_max_messages: 100
        queue_selector:
        labels:
        type: bulk_reshuffle
        <br /> <br /> 测试脚本如下:<br /> <br /> python

        !/usr/bin/env python3

        """
        ES Bulk写入性能测试脚本

        """

        import hashlib
        import json
        import random
        import string
        import time
        from typing import List, Dict, Any

        import requests
        from concurrent.futures import ThreadPoolExecutor
        from datetime import datetime
        import urllib3

        禁用SSL警告

        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


        class ESBulkTester:
        def init(self):

        配置变量 - 可修改

        self.es_configs = [
        {
        "name": "ES直连",
        "url": "<a href="https://127.0.0.1:9221"" rel="nofollow" target="_blank">https://127.0.0.1:9221",
        "index": "bulk_test",
        "username": "admin", # 修改为实际用户名
        "password": "admin", # 修改为实际密码
        "verify_ssl": False # HTTPS需要SSL验证
        },
        {
        "name": "Gateway代理",
        "url": "<a href="http://localhost:8000"" rel="nofollow" target="_blank">http://localhost:8000",
        "index": "gateway_bulk_test",
        "username": None, # 无需认证
        "password": None,
        "verify_ssl": False
        }
        ]
        self.batch_size = 10000 # 每次bulk写入条数
        self.log_interval = 100000 # 每多少次bulk写入输出日志

        ID生成规则配置 - 前2位后5位

        self.id_prefix_start = 1
        self.id_prefix_end = 999 # 前3位: 01-999
        self.id_suffix_start = 1
        self.id_suffix_end = 9999 # 后4位: 0001-9999

        当前ID计数器

        self.current_prefix = self.id_prefix_start
        self.current_suffix = self.id_suffix_start

        def generate_id(self) -> str:
        """生成固定规则的ID - 前2位后5位"""
        id_str = f"{self.current_prefix:02d}{self.current_suffix:05d}"

        更新计数器

        self.current_suffix += 1
        if self.current_suffix > self.id_suffix_end:
        self.current_suffix = self.id_suffix_start
        self.current_prefix += 1
        if self.current_prefix > self.id_prefix_end:
        self.current_prefix = self.id_prefix_start

        return id_str

        def generate_random_hash(self, length: int = 32) -> str:
        """生成随机hash值"""
        random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
        return hashlib.md5(random_string.encode()).hexdigest()

        def generate_document(self) -> Dict[str, Any]:
        """生成随机文档内容"""
        return {
        "timestamp": datetime.now().isoformat(),
        "field1": self.generate_random_hash(),
        "field2": self.generate_random_hash(),
        "field3": self.generate_random_hash(),
        "field4": random.randint(1, 1000),
        "field5": random.choice(["A", "B", "C", "D"]),
        "field6": random.uniform(0.1, 100.0)
        }

        def create_bulk_payload(self, index_name: str) -> str:
        """创建bulk写入payload"""
        bulkdata = []

        for
        in range(self.batch_size):

        doc_id = self.generate_id()

        doc = self.generate_document()

        添加index操作

        bulk_data.append(json.dumps({
        "index": {
        "_index": index_name,

        "_id": doc_id

        }<br />

        }))
        bulk_data.append(json.dumps(doc))

        return "\n".join(bulk_data) + "\n"

        def bulk_index(self, config: Dict[str, Any], payload: str) -> bool:
        """执行bulk写入"""
        url = f"{config['url']}/_bulk"
        headers = {
        "Content-Type": "application/x-ndjson"
        }

        设置认证信息

        auth = None
        if config.get('username') and config.get('password'):
        auth = (config['username'], config['password'])

        try:
        response = requests.post(
        url,
        data=payload,
        headers=headers,
        auth=auth,
        verify=config.get('verify_ssl', True),
        timeout=30
        )
        return response.status_code == 200
        except Exception as e:
        print(f"Bulk写入失败: {e}")
        return False

        def refresh_index(self, config: Dict[str, Any]) -> bool:
        """刷新索引"""
        url = f"{config['url']}/{config['index']}/_refresh"

        设置认证信息

        auth = None
        if config.get('username') and config.get('password'):
        auth = (config['username'], config['password'])

        try:
        response = requests.post(
        url,
        auth=auth,
        verify=config.get('verify_ssl', True),
        timeout=10
        )
        success = response.status_code == 200
        print(f"索引刷新{'成功' if success else '失败'}: {config['index']}")
        return success
        except Exception as e:
        print(f"索引刷新失败: {e}")
        return False

        def run_test(self, config: Dict[str, Any], round_num: int, total_iterations: int = 100000):
        """运行性能测试"""
        test_name = f"{config['name']}-第{round_num}轮"
        print(f"\n开始测试: {test_name}")
        print(f"ES地址: {config['url']}")
        print(f"索引名称: {config['index']}")
        print(f"认证: {'是' if config.get('username') else '否'}")
        print(f"每次bulk写入: {self.batch_size}条")
        print(f"总计划写入: {total_iterations self.batch_size}条")
        print("-"
        50)

        start_time = time.time()
        success_count = 0
        error_count = 0

        for i in range(1, total_iterations + 1):
        payload = self.create_bulk_payload(config['index'])

        if self.bulk_index(config, payload):
        success_count += 1
        else:
        error_count += 1

        每N次输出日志

        if i % self.log_interval == 0:
        elapsed_time = time.time() - start_time
        rate = i / elapsed_time if elapsed_time > 0 else 0
        print(f"已完成 {i:,} 次bulk写入, 耗时: {elapsed_time:.2f}秒, 速率: {rate:.2f} bulk/秒")

        end_time = time.time()
        total_time = end_time - start_time
        total_docs = total_iterations self.batch_size

        print(f"\n{test_name} 测试完成!")
        print(f"总耗时: {total_time:.2f}秒")
        print(f"成功bulk写入: {success_count:,}次")
        print(f"失败bulk写入: {error_count:,}次")
        print(f"总文档数: {total_docs:,}条")
        print(f"平均速率: {success_count/total_time:.2f} bulk/秒")
        print(f"文档写入速率: {total_docs/total_time:.2f} docs/秒")
        print("="
        60)

        return {
        "test_name": test_name,
        "config_name": config['name'],
        "round": round_num,
        "es_url": config['url'],
        "index": config['index'],
        "total_time": total_time,
        "success_count": success_count,
        "error_count": error_count,
        "total_docs": total_docs,
        "bulk_rate": success_count/total_time,
        "doc_rate": total_docs/total_time
        }

        def run_comparison_test(self, total_iterations: int = 10000):
        """运行双地址对比测试"""
        print("ES Bulk写入性能测试开始")
        print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)

        results = []
        rounds = 2 # 每个地址测试2轮

        循环测试所有配置

        for config in self.es_configs:
        print(f"\n开始测试配置: {config['name']}")
        print("" 40)

        for round_num in range(1, rounds + 1):

        运行测试

        result = self.run_test(config, round_num, total_iterations)<br />
        results.append(result)<br />


        每轮结束后刷新索引

        print(f"\n第{round_num}轮测试完成,执行索引刷新...")<br />
        self.refresh_index(config)<br />


        重置ID计数器

        if round_num == 1:<br />
            # 第1轮:使用初始ID范围(新增数据)<br />
            print("第1轮:新增数据模式")<br />
        else:<br />
            # 第2轮:重复使用相同ID(更新数据模式)<br />
            print("第2轮:数据更新模式,复用第1轮ID")<br />
            self.current_prefix = self.id_prefix_start<br />
            self.current_suffix = self.id_suffix_start<br />


        print(f"{config['name']} 第{round_num}轮测试结束\n")

        输出对比结果

        print("\n性能对比结果:")
        print("=" * 80)

        按配置分组显示结果

        config_results = {}
        for result in results:
        config_name = result['config_name']
        if config_name not in config_results:
        config_results[config_name] = []
        config_results[config_name].append(result)

        for config_name, rounds_data in config_results.items():
        print(f"\n{config_name}:")
        total_time = 0
        total_bulk_rate = 0
        total_doc_rate = 0

        for round_data in rounds_data:
        print(f" 第{round_data['round']}轮:")
        print(f" 耗时: {round_data['total_time']:.2f}秒")
        print(f" Bulk速率: {round_data['bulk_rate']:.2f} bulk/秒")
        print(f" 文档速率: {round_data['doc_rate']:.2f} docs/秒")
        print(f" 成功率: {round_data['success_count']/(round_data['success_count']+round_data['error_count'])*100:.2f}%")

        total_time += round_data['total_time']
        total_bulk_rate += round_data['bulk_rate']
        total_doc_rate += round_data['doc_rate']

        avg_bulk_rate = total_bulk_rate / len(rounds_data)
        avg_doc_rate = total_doc_rate / len(rounds_data)

        print(f" 平均性能:")
        print(f" 总耗时: {total_time:.2f}秒")
        print(f" 平均Bulk速率: {avg_bulk_rate:.2f} bulk/秒")
        print(f" 平均文档速率: {avg_doc_rate:.2f} docs/秒")

        整体对比

        if len(config_results) >= 2:
        config_names = list(config_results.keys())
        config1_avg = sum([r['bulk_rate'] for r in config_results[config_names[0]]]) / len(config_results[config_names[0]])
        config2_avg = sum([r['bulk_rate'] for r in config_results[config_names[1]]]) / len(config_results[config_names[1]])

        if config1_avg > config2_avg:
        faster = config_names[0]
        rate_diff = config1_avg - config2_avg
        else:
        faster = config_names[1]
        rate_diff = config2_avg - config1_avg

        print(f"\n整体性能对比:")
        print(f"{faster} 平均性能更好,bulk速率高 {rate_diff:.2f} bulk/秒")
        print(f"性能提升: {(rate_diff/min(config1_avg, config2_avg)*100):.1f}%")


        def main():
        """主函数"""
        tester = ESBulkTester()

        运行测试(每次bulk 1万条,300次bulk = 300万条文档)

        tester.run_comparison_test(total_iterations=300)


        if name == "main":
        main()
        ```

        1. 日志场景:不带 id 写入


        测试条件:

  3. bulk 写入数据不带文档 id
  4. 每批次 bulk 10000 条数据,总共写入 30w 数据

    这里把

    反馈结果:

    <br /> 性能对比结果:<br /> ================================================================================<br /> <br /> ES直连:<br /> 第1轮:<br /> 耗时: 152.29秒<br /> Bulk速率: 1.97 bulk/秒<br /> 文档速率: 19699.59 docs/秒<br /> 成功率: 100.00%<br /> 平均性能:<br /> 总耗时: 152.29秒<br /> 平均Bulk速率: 1.97 bulk/秒<br /> 平均文档速率: 19699.59 docs/秒<br /> <br /> Gateway代理:<br /> 第1轮:<br /> 耗时: 115.63秒<br /> Bulk速率: 2.59 bulk/秒<br /> 文档速率: 25944.35 docs/秒<br /> 成功率: 100.00%<br /> 平均性能:<br /> 总耗时: 115.63秒<br /> 平均Bulk速率: 2.59 bulk/秒<br /> 平均文档速率: 25944.35 docs/秒<br /> <br /> 整体性能对比:<br /> Gateway代理 平均性能更好,bulk速率高 0.62 bulk/秒<br /> 性能提升: 31.7%<br />

    2. 业务场景:带文档 id 的写入


    测试条件:

  5. bulk 写入数据带有文档 id,两次测试写入的文档 id 生成规则一致且重复。
  6. 每批次 bulk 10000 条数据,总共写入 30w 数据

    这里把 py 脚本中 第 99 行 和 第 107 行的注释打开。

    反馈结果:

    <br /> 性能对比结果:<br /> ================================================================================<br /> <br /> ES直连:<br /> 第1轮:<br /> 耗时: 155.30秒<br /> Bulk速率: 1.93 bulk/秒<br /> 文档速率: 19317.39 docs/秒<br /> 成功率: 100.00%<br /> 平均性能:<br /> 总耗时: 155.30秒<br /> 平均Bulk速率: 1.93 bulk/秒<br /> 平均文档速率: 19317.39 docs/秒<br /> <br /> Gateway代理:<br /> 第1轮:<br /> 耗时: 116.73秒<br /> Bulk速率: 2.57 bulk/秒<br /> 文档速率: 25700.06 docs/秒<br /> 成功率: 100.00%<br /> 平均性能:<br /> 总耗时: 116.73秒<br /> 平均Bulk速率: 2.57 bulk/秒<br /> 平均文档速率: 25700.06 docs/秒<br /> <br /> 整体性能对比:<br /> Gateway代理 平均性能更好,bulk速率高 0.64 bulk/秒<br /> 性能提升: 33.0%<br />

    小结


    不管是日志场景还是业务价值更重要的大数据或者搜索数据同步场景, gateway 的写入加速都能平稳的节省 25%-30% 的写入耗时。

    关于极限网关(INFINI Gateway)


    ![](https://infinilabs.cn/img/blog ... 2x.png)

    INFINI Gateway 是一个开源的面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。

    官网文档:<https://docs.infinilabs.com/gateway>;
    开源地址:<https://github.com/infinilabs/gateway>;

    作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
    原文:https://infinilabs.cn/blog/202 ... tion/

喜报!极限科技 Coco AI 荣获 2025 首届人工智能应用创新大赛全国一等奖

资讯动态INFINI Labs 小助手 发表了文章 • 0 个评论 • 849 次浏览 • 3 天前 • 来自相关话题


由中国技术经济学会主办的 2025 首届全国人工智能应用创新大赛 总决赛于 6 月 22 日在湖南大学圆满落幕。该赛事以“场景驱动・创新创业”为主题,旨在考察参赛选手设计 AI 智能体(AI Agent)、应用人工智能大模型技术解决实际问题的能力。本届大赛吸引了全国 632 所高校及 210 家企业的 4913 支团队参赛,经过层层选拔,最终 689 支团队晋级全国总决赛。

![](https://infinilabs.cn/img/blog ... /1.jpg)

在激烈的竞争中,极限数据(北京)科技有限公司 Coco AI 团队凭借作品《Coco AI - 为现代团队打造的统一搜索与 AI 智能助手》,一路过关斩将,成功晋级全国总决赛,并在总决赛中脱颖而出,荣获一等奖。这一成绩的取得,不仅是对极限科技技术实力和创新能力的高度认可,更是对 Coco AI 产品价值的有力证明。

![](https://infinilabs.cn/img/blog ... /2.png)

[Coco AI](https://coco.rs) 产品以“让搜索更简单”为使命,通过先进的 AI 技术,为现代团队提供高效、智能的统一搜索解决方案。在大赛中,Coco AI 团队凭借其扎实的技术功底、创新的解决方案以及出色的现场表现,赢得了评委和观众的一致好评。

![](https://infinilabs.cn/img/blog ... /3.jpg)

此次获奖,是极限科技在人工智能领域取得的又一重要成果。我们将以此为契机,在人工智能快速发展的大背景下,持续优化 Coco AI 产品,不断提升其性能和用户体验。未来,极限科技将继续加大在人工智能领域的研发投入,助力企业释放数据价值,推动智能化搜索迈向新的未来。

关于 Coco AI


Coco AI 是一款完全开源的统一搜索与 AI 助手平台,专为现代企业设计。它通过统一搜索入口,连接企业内外部的异构数据源,融合大模型能力,帮助团队高效访问知识,智能决策协作。

针对企业数据“分散、敏感、难利用”的三大痛点,Coco AI 提供了全面解决方案:

  1. 统一搜索入口,连接所有数据源

    支持集成 S3、Notion、Google Workspace、语雀、GitHub 等,实现跨平台聚合搜索,打破信息孤岛。

  2. 开源可私有部署,保障数据安全

    平台完全开源,支持本地部署与离线运行,数据不出企业,满足高安全与合规要求。

  3. 融合大模型能力,激活沉睡数据

    支持接入多种主流大模型,实现自然语言问答、语义理解与智能推荐,为个人和组织构建个性化知识图谱。

  4. 模块化扩展,灵活接入外部工具

    通过 MCP 架构,模型可动态调用外部能力,无需修改平台代码,轻松适配多样业务场景。

    Coco AI,助力企业构建智能知识中枢,释放数据价值,优化决策与协作流程。

    官网:[https://coco.rs](https://coco.rs)
    GitHub:[https://github.com/infinilabs/coco-app](https://github.com/infinilabs/coco-app)

    关于极限科技(INFINI Labs)


    ![INFINI Labs](https://infinilabs.cn/img/blog ... bs.png)

    极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

    极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

    官网:<https://infinilabs.cn>;

【搜索客社区日报】第2090期 (2025-08-07)

社区日报Se7en 发表了文章 • 0 个评论 • 725 次浏览 • 3 天前 • 来自相关话题

1.vLLM北京Meetup回顾,加速AI生产落地,共话推理前沿
https://mp.weixin.qq.com/s/7n8OYNrCC_I9SJaybHA_-Q
2.告别文字乱码!全新文生图模型Qwen-Image来咯
https://mp.weixin.qq.com/s/CdLk_CwFk5Ui-JAvHV9-8Q
3.理解 AI on K8S
https://mp.weixin.qq.com/s/PHVfC3hPgTz6nUR7OUic_g
4.AIBrix v0.4.0 版本发布:支持 P/D 分离与专家并行、KVCache v1 连接器、KV 事件同步及多引擎支持
https://aibrix.github.io/posts ... ease/

编辑:Se7en
更多资讯:http://news.searchkit.cn

【搜索客社区日报】第2091期 (2025-08-08)

社区日报Fred2000 发表了文章 • 0 个评论 • 479 次浏览 • 2 天前 • 来自相关话题

1、OpenAI 正式发布 GPT-5
https://www.oschina.net/news/364960/openai-gpt-5

2、Easysearch 集成阿里云与 Ollama Embedding API,构建端到端的语义搜索系统
https://searchkit.cn/article/15520

3、ES 调优帖:Gateway 批量写入性能优化实践
https://infinilabs.cn/blog/202 ... tion/

4、Context Engineering: 基于 OceanBase 的代码文档检索引擎
https://mp.weixin.qq.com/s/GPK_U7rc1PHUhcx5Ipnx6A

5、得物向量数据库落地实践
https://my.oschina.net/u/5783135/blog/18686778

编辑:Fred
更多资讯:http://news.searchkit.cn