ELK,萌萌哒
client

client

Spring Boot 集成 Easysearch 完整指南

EasysearchINFINI Labs 小助手 发表了文章 • 0 个评论 • 2727 次浏览 • 2024-12-29 15:41 • 来自相关话题

Easysearch 的很多用户都有这样的需要,之前是用的 ES,现在要迁移到 Easysearch,但是业务方使用的是 Spring Boot 集成的客户端,问是否能平滑迁移。

Easysearch 是完全兼容 Spring Boot 的,完全不用修改,本指南将探讨如何将 Spring Boot 和 ES 的 high-level 客户端 与 Easysearch 进行集成,涵盖从基础设置到实现 CRUD 操作和测试的所有内容。

服务器设置

首先,需要修改 Easysearch 节点的 easysearch.yml 文件,打开并配置这 2 个配置项:

elasticsearch.api_compatibility: true

#根据客户端版本配置版本号,我这里配置成 7.17.18
elasticsearch.api_compatibility_version: "7.17.18"

项目设置

然后,让我们设置 Maven 依赖。以下是 pom.xml 中的基本配置:

<properties>
    <java.version>11</java.version>
    <spring-data-elasticsearch.version>4.4.18</spring-data-elasticsearch.version>
    <elasticsearch.version>7.17.18</elasticsearch.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-elasticsearch</artifactId>
        <version>${spring-data-elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

客户端连接配置

完全和连接 Elasticsearch 的方式一样,不用修改:

配置 src/main/resources/application.yml 文件

spring:
  elasticsearch:
    rest:
      uris: https://localhost:9202
      username: admin
      password: xxxxxxxxxxx
    ssl:
      verification-mode: none

连接配置类

@Configuration
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
    @Value("${spring.elasticsearch.rest.uris}")
    private String elasticsearchUrl;

    @Value("${spring.elasticsearch.rest.username}")
    private String username;

    @Value("${spring.elasticsearch.rest.password}")
    private String password;

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, password));

        SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(null, (x509Certificates, s) -> true)
                .build();

        RestClientBuilder builder = RestClient.builder(HttpHost.create(elasticsearchUrl))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setSSLContext(sslContext)
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE));

        return new RestHighLevelClient(builder);
    }
}

领域模型

使用 Spring 的 Elasticsearch 注解定义领域模型:

@Data
@Document(indexName = "products")
public class Product {
    @Id
    private String id;

    @Field(type = FieldType.Text, name = "name")
    private String name;

    @Field(type = FieldType.Double, name = "price")
    private Double price;
}

仓库层

创建继承 ElasticsearchRepository 的仓库接口:

@Repository
@EnableElasticsearchRepositories
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
}

服务层

实现服务层来处理业务逻辑:

@Service
public class ProductService {
    private final ProductRepository productRepository;

    @Autowired
    public ProductService(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    public Product saveProduct(Product product) {
        return productRepository.save(product);
    }

    public Product findProductById(String id) {
        return productRepository.findById(id).orElse(null);
    }
}

测试

编写集成测试类:

@SpringBootTest
public class ProductServiceIntegrationTest {
    @Autowired
    private ElasticsearchOperations elasticsearchOperations;

    @Autowired
    private ProductService productService;

    private static final String INDEX_NAME = "products";

    @BeforeEach
    public void setUp() {
        IndexOperations indexOperations = elasticsearchOperations.indexOps(IndexCoordinates.of(INDEX_NAME));
        if (indexOperations.exists()) {
            indexOperations.delete();
        }

        // 定义 mapping
        Document mapping = Document.create()
                .append("properties", Document.create()
                        .append("name", Document.create()
                                .append("type", "text")
                                .append("analyzer", "standard"))
                        .append("price", Document.create()
                                .append("type", "double")));

        // 创建索引并应用 mapping
        indexOperations.create(Collections.EMPTY_MAP, mapping);
    }

    @Test
    public void testSaveAndFindProduct() {
         List<Product> products = Arrays.asList(
                new Product("Test Product 1", 99.99),
                new Product("Test Product 2", 199.99),
                new Product("Test Product 3", 299.99)
        );

        List<IndexQuery> queries = products.stream()
            .map(product -> new IndexQueryBuilder()
                .withObject(product)
                .withIndex(INDEX_NAME)
                .build())
            .collect(Collectors.toList());

        List<IndexedObjectInformation> indexedInfos = elasticsearchOperations.bulkIndex(
            queries,
            IndexCoordinates.of(INDEX_NAME)
        );

        // 验证结果
        List<String> ids = indexedInfos.stream()
            .map(IndexedObjectInformation::getId)
            .collect(Collectors.toList());

        assertFalse(ids.isEmpty());
        assertEquals(products.size(), ids.size());
    }
}

结论

本指南展示了 Easysearch 与 Elasticsearch 的高度兼容性:

  1. 配置方式相同,仅需启用 Easysearch 的 API 兼容模式。
  2. 可直接使用现有 Elasticsearch 客户端。
  3. Maven 依赖无需更改。
  4. API、注解和仓库接口完全兼容。
  5. 现有测试代码可直接应用。

这种兼容性使得从 Elasticsearch 迁移到 Easysearch 成为一个简单、低风险的过程。Spring Boot 项目可以几乎无缝地切换到 Easysearch,同时获得其性能和资源利用方面的优势。

关于 Easysearch

INFINI Easysearch 是一个分布式的搜索型数据库,实现非结构化数据检索、全文检索、向量检索、地理位置信息查询、组合索引查询、多语种支持、聚合分析等。Easysearch 可以完美替代 Elasticsearch,同时添加和完善多项企业级功能。Easysearch 助您拥有简洁、高效、易用的搜索体验。

官网文档:https://infinilabs.cn/docs/latest/easysearch

作者:张磊,极限科技(INFINI Labs)搜索引擎研发负责人,对 Elasticsearch 和 Lucene 源码比较熟悉,目前主要负责公司的 Easysearch 产品的研发以及客户服务工作。
原文:https://infinilabs.cn/blog/2024/use-spring-boot-for-easysearch-connection/

Easysearch Java SDK 2.0.x 使用指南(一)

EasysearchINFINI Labs 小助手 发表了文章 • 0 个评论 • 2168 次浏览 • 2024-12-14 17:50 • 来自相关话题

各位 Easysearch 的小伙伴们,我们前一阵刚把 easysearch-client 更新到了 2.0.2 版本!借此详细介绍下新版客户端的使用。

新版客户端和 1.0 版本相比,完全重构,抛弃了旧版客户端的一些历史包袱,从里到外都焕然一新!不管是刚入门的小白还是经验丰富的老司机,2.0.x 客户端都能让你开发效率蹭蹭往上涨!

到底有啥新东西?

  • 更轻更快: 以前的版本依赖了一堆乱七八糟的东西,现在好了,我们把那些没用的都砍掉了,客户端变得更苗条,性能也杠杠的!
  • 类型安全,告别迷糊: 常用的 Easysearch API 现在都配上了强类型的请求和响应对象,再也不用担心写错参数类型了,代码也更好看了,维护起来也更省心!
  • 同步异步,想咋用咋用: 所有 API 都支持同步和异步两种调用方式,不管是啥场景,都能轻松应对!
  • 构建查询,跟搭积木一样简单: 我们用了流式构建器和函数式编程,构建复杂查询的时候,代码写起来那叫一个流畅,看着也舒服!
  • 和 Jackson 无缝对接: 可以轻松地把你的 Java 类和客户端 API 关联起来,数据转换嗖嗖的快!

快速上手

废话不多说,咱们直接上干货!这部分教你怎么快速安装和使用 easysearch-client 2.0.2 客户端,还会演示一些基本操作。

安装

easysearch-client 2.0.2 已经上传到 Maven 中央仓库了,加到你的项目里超级方便。

最低要求: JDK 8 或者更高版本

依赖管理: 客户端内部用 Jackson 来处理对象映射。

Maven 项目

在你的 pom.xml 文件的 <dependencies> 里面加上这段:

<dependencies>
    <dependency>
        <groupId>com.infinilabs</groupId>
        <artifactId>easysearch-client</artifactId>
        <version>2.0.2</version>
    </dependency>
</dependencies>

Gradle 项目

在你的 build.gradle 文件的 dependencies 里面加上这段:

dependencies {
    implementation 'com.infinilabs:easysearch-client:2.0.2'
}

初始化客户端

下面这段代码演示了怎么初始化一个启用了安全通信加密和 security 的 Easysearch 客户端,看起来有点长,别慌,我们一步一步解释!

 public static EasysearchClient create() throws NoSuchAlgorithmException, KeyStoreException,
        KeyManagementException {

        final HttpHost[] hosts = new HttpHost[]{new HttpHost("localhost", 9200, "https")};

        final SSLContext sslContext = SSLContextBuilder.create()
            .loadTrustMaterial(null, (chains, authType) -> true).build();
        SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE);

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("username", "passwowd"));

        RestClient restClient = RestClient.builder(hosts)
            .setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                    .setSSLStrategy(sessionStrategy)
                    .disableAuthCaching()
            ).setRequestConfigCallback(requestConfigCallback ->
            requestConfigCallback.setConnectTimeout(30000).setSocketTimeout(300000))
            .build();

        EasysearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper());
        return new EasysearchClient(transport);
    }

这段代码,简单来说,就是:

  1. 连上 Easysearch: 我们要用 HTTPS 连接到本地的 9200 端口。
  2. 搞定证书: 这里为了方便,我们信任了所有证书(注意!生产环境一定要配置好你们自己的证书)。
  3. 填上用户名密码: 这里需要填上你的用户名和密码。
  4. 设置连接参数: 设置了连接超时时间(30 秒)和读取超时时间(300 秒)。
  5. 创建客户端: 最后,我们就创建好了一个 EasysearchClient 实例,可以开始干活了!

举个栗子:批量操作

下面的例子演示了怎么用 bulk API 来批量索引数据:

 public static void bulk() throws Exception {

        String json2 = "{"
            + "    \"@timestamp\": \"2023-01-08T22:50:13.059Z\","
            + "    \"agent\": {"
            + "      \"version\": \"7.3.2\","
            + "      \"type\": \"filebeat\","
            + "      \"ephemeral_id\": \"3ff1f2c8-1f7f-48c2-b560-4272591b8578\","
            + "      \"hostname\": \"ba-0226-msa-fbl-747db69c8d-ngff6\""
            + "    }"
            + "}";

        EasysearchClient client = create();

        BulkRequest.Builder br = new BulkRequest.Builder();
        br.index("test1");
        for (int i = 0; i < 10; i++) {
            BulkOperation.Builder builder = new BulkOperation.Builder();
            IndexOperation.Builder indexBuilder = new IndexOperation.Builder();
            builder.index(indexBuilder.document(JsonData.fromJson(json2)).build());
            br.operations(builder.build());
        }

        for (int i = 0; i < 10; i++) {
            BulkOperation.Builder builder = new BulkOperation.Builder();
            IndexOperation.Builder indexBuilder = new IndexOperation.Builder();
            indexBuilder.document(JsonData.fromJson(json2)).index("test2");
            builder.index(indexBuilder.build());
            br.operations(builder.build());
        }

        for (int i = 0; i < 10; i++) {
            Map<String, Object> map = new HashMap<>();
            map.put("@timestamp", "2023-01-08T22:50:13.059Z");
            map.put("field1", "value1");
            IndexOperation.Builder indexBuilder = new IndexOperation.Builder();
            indexBuilder.document(map).index("test3");
            br.operations(new BulkOperation(indexBuilder.build()));
        }

        BulkResponse bulkResponse = client.bulk(br.build());
        if (bulkResponse.errors()) {
            for (BulkResponseItem item : bulkResponse.items()) {
                System.out.println(item.toString());
            }
        }
        client._transport().close();

    }

这个例子里,我们一口气把数据批量索引到了 test1test2test3 这三个索引里, 并且展示了三种在 bulk API 中构建 IndexOperation 的方式,虽然它们最终都能实现将文档索引到 Easysearch,但在使用场景和灵活性上还是有一些区别的:

这段代码的核心是利用 BulkRequest.Builder 来构建一个批量请求,并通过 br.operations(...) 方法添加多个操作。而每个操作,在这个例子里,都是一个 IndexOperation,也就是索引一个文档。IndexOperation 可以通过 IndexOperation.Builder 来创建。

三种方式的区别主要体现在如何构建 IndexOperation 里的 document 部分,也就是要索引的文档内容。

第一种方式:使用 JsonData.fromJson(json2) 且不指定索引。

特点:
使用 JsonData.fromJson(json2) 将一个 JSON 字符串直接转换成 JsonData 对象作为文档内容。
这里没有在 IndexOperation.Builder 上调用 index() 方法来指定索引名称。由于没有在每个 IndexOperation 中指定索引,这个索引名称将回退到 BulkRequest.Builder 上设置的索引,即 br.index("test1"),所以这 10 个文档都会被索引到 test1。
当你需要将一批相同结构的 JSON 文档索引到同一个索引时,这种方式比较简洁。

第二种方式:使用 JsonData.fromJson(json2) 并指定索引

特点:
同样使用 JsonData.fromJson(json2) 将 JSON 字符串转换成 JsonData 对象。
关键区别在于,这里在 IndexOperation.Builder 上调用了 index("test2"),为每个操作单独指定了索引名称。
这 10 个文档会被索引到 test2,即使 BulkRequest.Builder 上设置了 index("test1") 也没用,因为 IndexOperation 里的设置优先级更高。
当你需要将一批相同结构的 JSON 文档索引到不同的索引时,就需要使用这种方式来分别指定索引。

第三种方式:使用 Map<String, Object> 并指定索引

特点:
使用 Map<String, Object> 来构建文档内容,这种方式更加灵活,可以构建任意结构的文档。
同样在 IndexOperation.Builder 上调用了 index("test3") 指定了索引名称。 使用 new BulkOperation(indexBuilder.build()) 代替之前的 builder.index(indexBuilder.build()), 这是等价的。 这 10 个文档会被索引到 test3。
当你需要索引的文档结构不固定,或者你需要动态构建文档内容时,使用 Map 是最佳选择。例如,你可以根据不同的业务逻辑,往 Map 里添加不同的字段。

总结

这次 easysearch-client 2.0.x Java 客户端的更新真的很给力,强烈建议大家升级体验!相信我,用了新版客户端,你的开发效率绝对会提升一大截!


想要了解更多?

大家有啥问题或者建议,也欢迎随时反馈!

作者:张磊,极限科技(INFINI Labs)搜索引擎研发负责人,对 Elasticsearch 和 Lucene 源码比较熟悉,目前主要负责公司的 Easysearch 产品的研发以及客户服务工作。

highlevel client(7.10) 查询took>1000时,_shard.successful=0

回复

Elasticsearchaiolos 发起了问题 • 1 人关注 • 0 个回复 • 1693 次浏览 • 2021-02-24 15:59 • 来自相关话题

es client节点设置问题 。。

Elasticsearchrochy 回复了问题 • 3 人关注 • 1 个回复 • 3494 次浏览 • 2019-04-14 21:56 • 来自相关话题

判断TransPortClient是否存活

Elasticsearchrochy 回复了问题 • 3 人关注 • 1 个回复 • 3082 次浏览 • 2019-02-28 15:14 • 来自相关话题

5.1.2 client做成单例的一直不关闭会出问题吗

Elasticsearchnovia 回复了问题 • 2 人关注 • 2 个回复 • 6762 次浏览 • 2017-06-02 10:46 • 来自相关话题

ES5.X之后该如何选择client?

回复

Elasticsearchsailfish 发起了问题 • 1 人关注 • 0 个回复 • 4691 次浏览 • 2017-04-09 12:31 • 来自相关话题

Pandasticsearch: An Elasticsearch client exposing DataFrame API

Elasticsearchonesuper 发表了文章 • 0 个评论 • 6284 次浏览 • 2016-11-08 18:02 • 来自相关话题

https://github.com/onesuper/pandasticsearch  
# Create a DataFrame object
from pandasticsearch import DataFrame
df = DataFrame.from_es('http://localhost:9200', index='people')

# Print the schema(mapping) of the index
df.print_schema()
# company
# |-- employee
#   |-- name: {'index': 'not_analyzed', 'type': 'string'}
#   |-- age: {'type': 'integer'}
#   |-- gender: {'index': 'not_analyzed', 'type': 'string'}

# Inspect the columns
df.columns
#['name', 'age', 'gender']

# Get the column
df.name
# Column('name')

# Filter
df.filter(df.age < 13).collect()
# [Row(age=12,gender='female',name='Alice'), Row(age=11,gender='male',name='Bob')]

# Project
df.filter(df.age < 25).select('name', 'age').collect()
# [Row(age=12,name='Alice'), Row(age=11,name='Bob'), Row(age=13,name='Leo')]

# Print the rows into console
df.filter(df.age < 25).select('name').show(3)
# +------+
# | name |
# +------+
# | Alice|
# | Bob  |
# | Leo  |
# +------+

# Sort
df.sort(df.age.asc).select('name', 'age').collect()
#[Row(age=11,name='Bob'), Row(age=12,name='Alice'), Row(age=13,name='Leo')]

# Aggregate
df[df.gender == 'male'].agg(df.age.avg).collect()
# [Row(avg(age)=12)]

# Groupby
df.groupby('gender').collect()
# [Row(doc_count=1), Row(doc_count=2)]

# Groupby and then aggregate
df.groupby('gender').agg(df.age.max).collect()
# [Row(doc_count=1, max(age)=12), Row(doc_count=2, max(age)=13)]

# Convert to Pandas object for subsequent analysis
df[df.gender == 'male'].agg(df.age.avg).to_pandas()
#    avg(age)
# 0        12

使用node client报以下错误,使用transport client 就没有错误

回复

ElasticsearchJellybean 发起了问题 • 1 人关注 • 0 个回复 • 6719 次浏览 • 2016-01-20 15:24 • 来自相关话题

请问在应用中使用node client还是transport client比较好?

Elasticsearchzttech 回复了问题 • 5 人关注 • 3 个回复 • 6144 次浏览 • 2016-01-19 14:03 • 来自相关话题

highlevel client(7.10) 查询took>1000时,_shard.successful=0

回复

Elasticsearchaiolos 发起了问题 • 1 人关注 • 0 个回复 • 1693 次浏览 • 2021-02-24 15:59 • 来自相关话题

es client节点设置问题 。。

回复

Elasticsearchrochy 回复了问题 • 3 人关注 • 1 个回复 • 3494 次浏览 • 2019-04-14 21:56 • 来自相关话题

判断TransPortClient是否存活

回复

Elasticsearchrochy 回复了问题 • 3 人关注 • 1 个回复 • 3082 次浏览 • 2019-02-28 15:14 • 来自相关话题

5.1.2 client做成单例的一直不关闭会出问题吗

回复

Elasticsearchnovia 回复了问题 • 2 人关注 • 2 个回复 • 6762 次浏览 • 2017-06-02 10:46 • 来自相关话题

ES5.X之后该如何选择client?

回复

Elasticsearchsailfish 发起了问题 • 1 人关注 • 0 个回复 • 4691 次浏览 • 2017-04-09 12:31 • 来自相关话题

使用node client报以下错误,使用transport client 就没有错误

回复

ElasticsearchJellybean 发起了问题 • 1 人关注 • 0 个回复 • 6719 次浏览 • 2016-01-20 15:24 • 来自相关话题

请问在应用中使用node client还是transport client比较好?

回复

Elasticsearchzttech 回复了问题 • 5 人关注 • 3 个回复 • 6144 次浏览 • 2016-01-19 14:03 • 来自相关话题

Spring Boot 集成 Easysearch 完整指南

EasysearchINFINI Labs 小助手 发表了文章 • 0 个评论 • 2727 次浏览 • 2024-12-29 15:41 • 来自相关话题

Easysearch 的很多用户都有这样的需要,之前是用的 ES,现在要迁移到 Easysearch,但是业务方使用的是 Spring Boot 集成的客户端,问是否能平滑迁移。

Easysearch 是完全兼容 Spring Boot 的,完全不用修改,本指南将探讨如何将 Spring Boot 和 ES 的 high-level 客户端 与 Easysearch 进行集成,涵盖从基础设置到实现 CRUD 操作和测试的所有内容。

服务器设置

首先,需要修改 Easysearch 节点的 easysearch.yml 文件,打开并配置这 2 个配置项:

elasticsearch.api_compatibility: true

#根据客户端版本配置版本号,我这里配置成 7.17.18
elasticsearch.api_compatibility_version: "7.17.18"

项目设置

然后,让我们设置 Maven 依赖。以下是 pom.xml 中的基本配置:

<properties>
    <java.version>11</java.version>
    <spring-data-elasticsearch.version>4.4.18</spring-data-elasticsearch.version>
    <elasticsearch.version>7.17.18</elasticsearch.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-elasticsearch</artifactId>
        <version>${spring-data-elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

客户端连接配置

完全和连接 Elasticsearch 的方式一样,不用修改:

配置 src/main/resources/application.yml 文件

spring:
  elasticsearch:
    rest:
      uris: https://localhost:9202
      username: admin
      password: xxxxxxxxxxx
    ssl:
      verification-mode: none

连接配置类

@Configuration
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
    @Value("${spring.elasticsearch.rest.uris}")
    private String elasticsearchUrl;

    @Value("${spring.elasticsearch.rest.username}")
    private String username;

    @Value("${spring.elasticsearch.rest.password}")
    private String password;

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, password));

        SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(null, (x509Certificates, s) -> true)
                .build();

        RestClientBuilder builder = RestClient.builder(HttpHost.create(elasticsearchUrl))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setSSLContext(sslContext)
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE));

        return new RestHighLevelClient(builder);
    }
}

领域模型

使用 Spring 的 Elasticsearch 注解定义领域模型:

@Data
@Document(indexName = "products")
public class Product {
    @Id
    private String id;

    @Field(type = FieldType.Text, name = "name")
    private String name;

    @Field(type = FieldType.Double, name = "price")
    private Double price;
}

仓库层

创建继承 ElasticsearchRepository 的仓库接口:

@Repository
@EnableElasticsearchRepositories
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
}

服务层

实现服务层来处理业务逻辑:

@Service
public class ProductService {
    private final ProductRepository productRepository;

    @Autowired
    public ProductService(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    public Product saveProduct(Product product) {
        return productRepository.save(product);
    }

    public Product findProductById(String id) {
        return productRepository.findById(id).orElse(null);
    }
}

测试

编写集成测试类:

@SpringBootTest
public class ProductServiceIntegrationTest {
    @Autowired
    private ElasticsearchOperations elasticsearchOperations;

    @Autowired
    private ProductService productService;

    private static final String INDEX_NAME = "products";

    @BeforeEach
    public void setUp() {
        IndexOperations indexOperations = elasticsearchOperations.indexOps(IndexCoordinates.of(INDEX_NAME));
        if (indexOperations.exists()) {
            indexOperations.delete();
        }

        // 定义 mapping
        Document mapping = Document.create()
                .append("properties", Document.create()
                        .append("name", Document.create()
                                .append("type", "text")
                                .append("analyzer", "standard"))
                        .append("price", Document.create()
                                .append("type", "double")));

        // 创建索引并应用 mapping
        indexOperations.create(Collections.EMPTY_MAP, mapping);
    }

    @Test
    public void testSaveAndFindProduct() {
         List<Product> products = Arrays.asList(
                new Product("Test Product 1", 99.99),
                new Product("Test Product 2", 199.99),
                new Product("Test Product 3", 299.99)
        );

        List<IndexQuery> queries = products.stream()
            .map(product -> new IndexQueryBuilder()
                .withObject(product)
                .withIndex(INDEX_NAME)
                .build())
            .collect(Collectors.toList());

        List<IndexedObjectInformation> indexedInfos = elasticsearchOperations.bulkIndex(
            queries,
            IndexCoordinates.of(INDEX_NAME)
        );

        // 验证结果
        List<String> ids = indexedInfos.stream()
            .map(IndexedObjectInformation::getId)
            .collect(Collectors.toList());

        assertFalse(ids.isEmpty());
        assertEquals(products.size(), ids.size());
    }
}

结论

本指南展示了 Easysearch 与 Elasticsearch 的高度兼容性:

  1. 配置方式相同,仅需启用 Easysearch 的 API 兼容模式。
  2. 可直接使用现有 Elasticsearch 客户端。
  3. Maven 依赖无需更改。
  4. API、注解和仓库接口完全兼容。
  5. 现有测试代码可直接应用。

这种兼容性使得从 Elasticsearch 迁移到 Easysearch 成为一个简单、低风险的过程。Spring Boot 项目可以几乎无缝地切换到 Easysearch,同时获得其性能和资源利用方面的优势。

关于 Easysearch

INFINI Easysearch 是一个分布式的搜索型数据库,实现非结构化数据检索、全文检索、向量检索、地理位置信息查询、组合索引查询、多语种支持、聚合分析等。Easysearch 可以完美替代 Elasticsearch,同时添加和完善多项企业级功能。Easysearch 助您拥有简洁、高效、易用的搜索体验。

官网文档:https://infinilabs.cn/docs/latest/easysearch

作者:张磊,极限科技(INFINI Labs)搜索引擎研发负责人,对 Elasticsearch 和 Lucene 源码比较熟悉,目前主要负责公司的 Easysearch 产品的研发以及客户服务工作。
原文:https://infinilabs.cn/blog/2024/use-spring-boot-for-easysearch-connection/

Easysearch Java SDK 2.0.x 使用指南(一)

EasysearchINFINI Labs 小助手 发表了文章 • 0 个评论 • 2168 次浏览 • 2024-12-14 17:50 • 来自相关话题

各位 Easysearch 的小伙伴们,我们前一阵刚把 easysearch-client 更新到了 2.0.2 版本!借此详细介绍下新版客户端的使用。

新版客户端和 1.0 版本相比,完全重构,抛弃了旧版客户端的一些历史包袱,从里到外都焕然一新!不管是刚入门的小白还是经验丰富的老司机,2.0.x 客户端都能让你开发效率蹭蹭往上涨!

到底有啥新东西?

  • 更轻更快: 以前的版本依赖了一堆乱七八糟的东西,现在好了,我们把那些没用的都砍掉了,客户端变得更苗条,性能也杠杠的!
  • 类型安全,告别迷糊: 常用的 Easysearch API 现在都配上了强类型的请求和响应对象,再也不用担心写错参数类型了,代码也更好看了,维护起来也更省心!
  • 同步异步,想咋用咋用: 所有 API 都支持同步和异步两种调用方式,不管是啥场景,都能轻松应对!
  • 构建查询,跟搭积木一样简单: 我们用了流式构建器和函数式编程,构建复杂查询的时候,代码写起来那叫一个流畅,看着也舒服!
  • 和 Jackson 无缝对接: 可以轻松地把你的 Java 类和客户端 API 关联起来,数据转换嗖嗖的快!

快速上手

废话不多说,咱们直接上干货!这部分教你怎么快速安装和使用 easysearch-client 2.0.2 客户端,还会演示一些基本操作。

安装

easysearch-client 2.0.2 已经上传到 Maven 中央仓库了,加到你的项目里超级方便。

最低要求: JDK 8 或者更高版本

依赖管理: 客户端内部用 Jackson 来处理对象映射。

Maven 项目

在你的 pom.xml 文件的 <dependencies> 里面加上这段:

<dependencies>
    <dependency>
        <groupId>com.infinilabs</groupId>
        <artifactId>easysearch-client</artifactId>
        <version>2.0.2</version>
    </dependency>
</dependencies>

Gradle 项目

在你的 build.gradle 文件的 dependencies 里面加上这段:

dependencies {
    implementation 'com.infinilabs:easysearch-client:2.0.2'
}

初始化客户端

下面这段代码演示了怎么初始化一个启用了安全通信加密和 security 的 Easysearch 客户端,看起来有点长,别慌,我们一步一步解释!

 public static EasysearchClient create() throws NoSuchAlgorithmException, KeyStoreException,
        KeyManagementException {

        final HttpHost[] hosts = new HttpHost[]{new HttpHost("localhost", 9200, "https")};

        final SSLContext sslContext = SSLContextBuilder.create()
            .loadTrustMaterial(null, (chains, authType) -> true).build();
        SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE);

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("username", "passwowd"));

        RestClient restClient = RestClient.builder(hosts)
            .setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                    .setSSLStrategy(sessionStrategy)
                    .disableAuthCaching()
            ).setRequestConfigCallback(requestConfigCallback ->
            requestConfigCallback.setConnectTimeout(30000).setSocketTimeout(300000))
            .build();

        EasysearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper());
        return new EasysearchClient(transport);
    }

这段代码,简单来说,就是:

  1. 连上 Easysearch: 我们要用 HTTPS 连接到本地的 9200 端口。
  2. 搞定证书: 这里为了方便,我们信任了所有证书(注意!生产环境一定要配置好你们自己的证书)。
  3. 填上用户名密码: 这里需要填上你的用户名和密码。
  4. 设置连接参数: 设置了连接超时时间(30 秒)和读取超时时间(300 秒)。
  5. 创建客户端: 最后,我们就创建好了一个 EasysearchClient 实例,可以开始干活了!

举个栗子:批量操作

下面的例子演示了怎么用 bulk API 来批量索引数据:

 public static void bulk() throws Exception {

        String json2 = "{"
            + "    \"@timestamp\": \"2023-01-08T22:50:13.059Z\","
            + "    \"agent\": {"
            + "      \"version\": \"7.3.2\","
            + "      \"type\": \"filebeat\","
            + "      \"ephemeral_id\": \"3ff1f2c8-1f7f-48c2-b560-4272591b8578\","
            + "      \"hostname\": \"ba-0226-msa-fbl-747db69c8d-ngff6\""
            + "    }"
            + "}";

        EasysearchClient client = create();

        BulkRequest.Builder br = new BulkRequest.Builder();
        br.index("test1");
        for (int i = 0; i < 10; i++) {
            BulkOperation.Builder builder = new BulkOperation.Builder();
            IndexOperation.Builder indexBuilder = new IndexOperation.Builder();
            builder.index(indexBuilder.document(JsonData.fromJson(json2)).build());
            br.operations(builder.build());
        }

        for (int i = 0; i < 10; i++) {
            BulkOperation.Builder builder = new BulkOperation.Builder();
            IndexOperation.Builder indexBuilder = new IndexOperation.Builder();
            indexBuilder.document(JsonData.fromJson(json2)).index("test2");
            builder.index(indexBuilder.build());
            br.operations(builder.build());
        }

        for (int i = 0; i < 10; i++) {
            Map<String, Object> map = new HashMap<>();
            map.put("@timestamp", "2023-01-08T22:50:13.059Z");
            map.put("field1", "value1");
            IndexOperation.Builder indexBuilder = new IndexOperation.Builder();
            indexBuilder.document(map).index("test3");
            br.operations(new BulkOperation(indexBuilder.build()));
        }

        BulkResponse bulkResponse = client.bulk(br.build());
        if (bulkResponse.errors()) {
            for (BulkResponseItem item : bulkResponse.items()) {
                System.out.println(item.toString());
            }
        }
        client._transport().close();

    }

这个例子里,我们一口气把数据批量索引到了 test1test2test3 这三个索引里, 并且展示了三种在 bulk API 中构建 IndexOperation 的方式,虽然它们最终都能实现将文档索引到 Easysearch,但在使用场景和灵活性上还是有一些区别的:

这段代码的核心是利用 BulkRequest.Builder 来构建一个批量请求,并通过 br.operations(...) 方法添加多个操作。而每个操作,在这个例子里,都是一个 IndexOperation,也就是索引一个文档。IndexOperation 可以通过 IndexOperation.Builder 来创建。

三种方式的区别主要体现在如何构建 IndexOperation 里的 document 部分,也就是要索引的文档内容。

第一种方式:使用 JsonData.fromJson(json2) 且不指定索引。

特点:
使用 JsonData.fromJson(json2) 将一个 JSON 字符串直接转换成 JsonData 对象作为文档内容。
这里没有在 IndexOperation.Builder 上调用 index() 方法来指定索引名称。由于没有在每个 IndexOperation 中指定索引,这个索引名称将回退到 BulkRequest.Builder 上设置的索引,即 br.index("test1"),所以这 10 个文档都会被索引到 test1。
当你需要将一批相同结构的 JSON 文档索引到同一个索引时,这种方式比较简洁。

第二种方式:使用 JsonData.fromJson(json2) 并指定索引

特点:
同样使用 JsonData.fromJson(json2) 将 JSON 字符串转换成 JsonData 对象。
关键区别在于,这里在 IndexOperation.Builder 上调用了 index("test2"),为每个操作单独指定了索引名称。
这 10 个文档会被索引到 test2,即使 BulkRequest.Builder 上设置了 index("test1") 也没用,因为 IndexOperation 里的设置优先级更高。
当你需要将一批相同结构的 JSON 文档索引到不同的索引时,就需要使用这种方式来分别指定索引。

第三种方式:使用 Map<String, Object> 并指定索引

特点:
使用 Map<String, Object> 来构建文档内容,这种方式更加灵活,可以构建任意结构的文档。
同样在 IndexOperation.Builder 上调用了 index("test3") 指定了索引名称。 使用 new BulkOperation(indexBuilder.build()) 代替之前的 builder.index(indexBuilder.build()), 这是等价的。 这 10 个文档会被索引到 test3。
当你需要索引的文档结构不固定,或者你需要动态构建文档内容时,使用 Map 是最佳选择。例如,你可以根据不同的业务逻辑,往 Map 里添加不同的字段。

总结

这次 easysearch-client 2.0.x Java 客户端的更新真的很给力,强烈建议大家升级体验!相信我,用了新版客户端,你的开发效率绝对会提升一大截!


想要了解更多?

大家有啥问题或者建议,也欢迎随时反馈!

作者:张磊,极限科技(INFINI Labs)搜索引擎研发负责人,对 Elasticsearch 和 Lucene 源码比较熟悉,目前主要负责公司的 Easysearch 产品的研发以及客户服务工作。

Pandasticsearch: An Elasticsearch client exposing DataFrame API

Elasticsearchonesuper 发表了文章 • 0 个评论 • 6284 次浏览 • 2016-11-08 18:02 • 来自相关话题

https://github.com/onesuper/pandasticsearch  
# Create a DataFrame object
from pandasticsearch import DataFrame
df = DataFrame.from_es('http://localhost:9200', index='people')

# Print the schema(mapping) of the index
df.print_schema()
# company
# |-- employee
#   |-- name: {'index': 'not_analyzed', 'type': 'string'}
#   |-- age: {'type': 'integer'}
#   |-- gender: {'index': 'not_analyzed', 'type': 'string'}

# Inspect the columns
df.columns
#['name', 'age', 'gender']

# Get the column
df.name
# Column('name')

# Filter
df.filter(df.age < 13).collect()
# [Row(age=12,gender='female',name='Alice'), Row(age=11,gender='male',name='Bob')]

# Project
df.filter(df.age < 25).select('name', 'age').collect()
# [Row(age=12,name='Alice'), Row(age=11,name='Bob'), Row(age=13,name='Leo')]

# Print the rows into console
df.filter(df.age < 25).select('name').show(3)
# +------+
# | name |
# +------+
# | Alice|
# | Bob  |
# | Leo  |
# +------+

# Sort
df.sort(df.age.asc).select('name', 'age').collect()
#[Row(age=11,name='Bob'), Row(age=12,name='Alice'), Row(age=13,name='Leo')]

# Aggregate
df[df.gender == 'male'].agg(df.age.avg).collect()
# [Row(avg(age)=12)]

# Groupby
df.groupby('gender').collect()
# [Row(doc_count=1), Row(doc_count=2)]

# Groupby and then aggregate
df.groupby('gender').agg(df.age.max).collect()
# [Row(doc_count=1, max(age)=12), Row(doc_count=2, max(age)=13)]

# Convert to Pandas object for subsequent analysis
df[df.gender == 'male'].agg(df.age.avg).to_pandas()
#    avg(age)
# 0        12