
社区日报 第110期 (2017-11-24)
http://t.cn/RYzDIQw
2、ES冷热数据分离实践
http://t.cn/RjeBOfl
3、ELK搭建GPE监控预警系统
http://t.cn/RYzDusL
编辑:laoyang360
归档:https://elasticsearch.cn/article/392
订阅:https://tinyletter.com/elastic-daily
http://t.cn/RYzDIQw
2、ES冷热数据分离实践
http://t.cn/RjeBOfl
3、ELK搭建GPE监控预警系统
http://t.cn/RYzDusL
编辑:laoyang360
归档:https://elasticsearch.cn/article/392
订阅:https://tinyletter.com/elastic-daily
收起阅读 »

社区日报 第109期 (2017-11-23)
http://t.cn/R6SGWTP
2.一个数据精度引发的血案
http://t.cn/RY75Xu8
3.关于Elasticsearch部署中的内存注意事项
http://t.cn/RY75Oy6
编辑:金桥
归档:https://elasticsearch.cn/article/391
订阅:https://tinyletter.com/elastic-daily
http://t.cn/R6SGWTP
2.一个数据精度引发的血案
http://t.cn/RY75Xu8
3.关于Elasticsearch部署中的内存注意事项
http://t.cn/RY75Oy6
编辑:金桥
归档:https://elasticsearch.cn/article/391
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第108期 (2017-11-22)
http://t.cn/RYPBZVw
2.如何使用Elastic APM
http://t.cn/RYPrtHJ
3.理解Elasticsearch缓存从下面开始
http://t.cn/RYh7cbu
编辑:wt
归档:https://elasticsearch.cn/article/390
订阅:https://tinyletter.com/elastic-daily
http://t.cn/RYPBZVw
2.如何使用Elastic APM
http://t.cn/RYPrtHJ
3.理解Elasticsearch缓存从下面开始
http://t.cn/RYh7cbu
编辑:wt
归档:https://elasticsearch.cn/article/390
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第107期 (2017-11-21)
http://t.cn/RjdGkzi
2.图文详解如何部署一套线上的高商用ELK集群。
http://t.cn/RjdbuAb
3.使用ELK处理OSS访问日志详解。
http://t.cn/RjecYmf
编辑:叮咚光军
归档:https://elasticsearch.cn/article/389
订阅:https://tinyletter.com/elastic-daily
http://t.cn/RjdGkzi
2.图文详解如何部署一套线上的高商用ELK集群。
http://t.cn/RjdbuAb
3.使用ELK处理OSS访问日志详解。
http://t.cn/RjecYmf
编辑:叮咚光军
归档:https://elasticsearch.cn/article/389
订阅:https://tinyletter.com/elastic-daily
收起阅读 »

请问一下 集群的 heap总是不平均 这是不是有问题呢
在日常使用中
即使没有负载的情况下
es集群的各个机器的Heap总是相差比较大
请问这是不是有问题
在日常使用中
即使没有负载的情况下
es集群的各个机器的Heap总是相差比较大
请问这是不是有问题

社区日报 第106期 (2017-11-20)
http://t.cn/RjualK1
2.来看看logstash 6.0令人激动的改进:可视化、性能分析以及多管道支持。
http://t.cn/RjuoRuT
3.深入理解直方图
http://t.cn/RKgkOo1
编辑:cyberdak
归档:https://elasticsearch.cn/article/387
订阅:https://tinyletter.com/elastic-daily
http://t.cn/RjualK1
2.来看看logstash 6.0令人激动的改进:可视化、性能分析以及多管道支持。
http://t.cn/RjuoRuT
3.深入理解直方图
http://t.cn/RKgkOo1
编辑:cyberdak
归档:https://elasticsearch.cn/article/387
订阅:https://tinyletter.com/elastic-daily
收起阅读 »

社区日报 第105期 (2017-11-19)
http://t.cn/RjQuE5x
2.深入理解cluster allocation API,准确找到shard未正确分配的原因。
http://t.cn/RlrzTsD
3.(自备梯子)项目经理和程序猿如何能够愉快的达成一致的项目周期
http://t.cn/RjQueTd
编辑:至尊宝
归档:https://elasticsearch.cn/article/386
订阅:https://tinyletter.com/elastic-daily
http://t.cn/RjQuE5x
2.深入理解cluster allocation API,准确找到shard未正确分配的原因。
http://t.cn/RlrzTsD
3.(自备梯子)项目经理和程序猿如何能够愉快的达成一致的项目周期
http://t.cn/RjQueTd
编辑:至尊宝
归档:https://elasticsearch.cn/article/386
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第104期 (2017-11-18)
http://t.cn/RjOc0Rh
2、ES6.0只支持单个mapping type了
http://t.cn/RjOgAPZ
3、利用haystack库,用python对ES进行索引和查询操作
http://t.cn/RjO9yvi
4、只等你来 | Elastic Meetup 广州交流会
https://elasticsearch.cn/article/364
编辑:bsll
归档:https://elasticsearch.cn/article/385
订阅:https://tinyletter.com/elastic-daily
http://t.cn/RjOc0Rh
2、ES6.0只支持单个mapping type了
http://t.cn/RjOgAPZ
3、利用haystack库,用python对ES进行索引和查询操作
http://t.cn/RjO9yvi
4、只等你来 | Elastic Meetup 广州交流会
https://elasticsearch.cn/article/364
编辑:bsll
归档:https://elasticsearch.cn/article/385
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第103期 (2017-11-17)
http://t.cn/RS2kgfB
2、索引膨胀的原因大讨论
http://t.cn/RjxtXPs
3、旧闻新读 | IK分词作者林良益访谈实录
http://t.cn/RjomL5I
4、只等你来 | Elastic Meetup 广州交流会
https://elasticsearch.cn/article/364
编辑:laoyang360
归档:https://elasticsearch.cn/article/384
订阅:https://tinyletter.com/elastic-daily
http://t.cn/RS2kgfB
2、索引膨胀的原因大讨论
http://t.cn/RjxtXPs
3、旧闻新读 | IK分词作者林良益访谈实录
http://t.cn/RjomL5I
4、只等你来 | Elastic Meetup 广州交流会
https://elasticsearch.cn/article/364
编辑:laoyang360
归档:https://elasticsearch.cn/article/384
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第102期 (2017-11-16)
http://t.cn/RjSSp1T
2.通过Search Guard 为Elasticsearch 进行安全加固。
https://elasticsearch.cn/article/350
3.一个将ES查询结果以PDF,HTML或CSV形式导出的ES插件
http://t.cn/RjJsItO
4、Elastic Meetup 广州交流会
https://elasticsearch.cn/article/364
编辑:金桥
归档:https://elasticsearch.cn/article/382
订阅:https://tinyletter.com/elastic-daily
http://t.cn/RjSSp1T
2.通过Search Guard 为Elasticsearch 进行安全加固。
https://elasticsearch.cn/article/350
3.一个将ES查询结果以PDF,HTML或CSV形式导出的ES插件
http://t.cn/RjJsItO
4、Elastic Meetup 广州交流会
https://elasticsearch.cn/article/364
编辑:金桥
归档:https://elasticsearch.cn/article/382
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

Elasticsearch Java API 索引的增删改查(二)
本节介绍以下 CRUD API:
单文档 APIs
多文档 APIs
Multi Get API Bulk API
注意:所有的单文档的CRUD API,index参数只能接受单一的索引库名称,或者是一个指向单一索引库的alias。
Index API
Index API 允许我们存储一个JSON格式的文档,使数据可以被搜索。文档通过index、type、id唯一确定。我们可以自己提供一个id,或者也使用Index API 为我们自动生成一个。
这里有几种不同的方式来产生JSON格式的文档(document):
- 手动方式,使用原生的byte[]或者String
- 使用Map方式,会自动转换成与之等价的JSON
- 使用第三方库来序列化beans,如Jackson
- 使用内置的帮助类 XContentFactory.jsonBuilder()
手动方式
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
实例
/**
* 手动生成JSON
*/
@Test
public void CreateJSON(){
String json = "{" +
"\"user\":\"fendo\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"Hell word\"" +
"}";
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
Map方式
Map是key:value数据类型,可以代表json结构.
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
实例
/**
* 使用集合
*/
@Test
public void CreateList(){
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate","2013-01-30");
json.put("message","trying out Elasticsearch");
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
序列化方式
ElasticSearch已经使用了jackson,可以直接使用它把javabean转为json.
import com.fasterxml.jackson.databind.*;
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
实例
/**
* 使用JACKSON序列化
* @throws Exception
*/
@Test
public void CreateJACKSON() throws Exception{
CsdnBlog csdn=new CsdnBlog();
csdn.setAuthor("fendo");
csdn.setContent("这是JAVA书籍");
csdn.setTag("C");
csdn.setView("100");
csdn.setTitile("编程");
csdn.setDate(new Date().toString());
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(csdn);
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
XContentBuilder帮助类方式
ElasticSearch提供了一个内置的帮助类XContentBuilder来产生JSON文档
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
实例
/**
* 使用ElasticSearch 帮助类
* @throws IOException
*/
@Test
public void CreateXContentBuilder() throws IOException{
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "ccse")
.field("postDate", new Date())
.field("message", "this is Elasticsearch")
.endObject();
IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
System.out.println("创建成功!");
}
综合实例
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CreateIndex {
private TransportClient client;
@Before
public void getClient() throws Exception{
//设置集群名称
Settings settings = Settings.builder().put("cluster.name", "my-application").build();// 集群名
//创建client
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
}
/**
* 手动生成JSON
*/
@Test
public void CreateJSON(){
String json = "{" +
"\"user\":\"fendo\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"Hell word\"" +
"}";
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
/**
* 使用集合
*/
@Test
public void CreateList(){
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate","2013-01-30");
json.put("message","trying out Elasticsearch");
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
/**
* 使用JACKSON序列化
* @throws Exception
*/
@Test
public void CreateJACKSON() throws Exception{
CsdnBlog csdn=new CsdnBlog();
csdn.setAuthor("fendo");
csdn.setContent("这是JAVA书籍");
csdn.setTag("C");
csdn.setView("100");
csdn.setTitile("编程");
csdn.setDate(new Date().toString());
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(csdn);
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
/**
* 使用ElasticSearch 帮助类
* @throws IOException
*/
@Test
public void CreateXContentBuilder() throws IOException{
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "ccse")
.field("postDate", new Date())
.field("message", "this is Elasticsearch")
.endObject();
IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
System.out.println("创建成功!");
}
}
你还可以通过startArray(string)和endArray()方法添加数组。.field()方法可以接受多种对象类型。你可以给它传递数字、日期、甚至其他XContentBuilder对象。
Get API
根据id查看文档:
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
更多请查看 rest get API 文档
配置线程
operationThreaded
设置为 true
是在不同的线程里执行此次操作
下面的例子是operationThreaded
设置为 false
:
GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
Delete API
根据ID删除:
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
更多请查看 delete API 文档
配置线程
operationThreaded
设置为 true
是在不同的线程里执行此次操作
下面的例子是operationThreaded
设置为 false
:
GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
Delete By Query API
通过查询条件删除
BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //查询条件
.source("persons") //index(索引名)
.get(); //执行
long deleted = response.getDeleted(); //删除文档的数量
如果需要执行的时间比较长,可以使用异步的方式处理,结果在回调里面获取
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //查询
.source("persons") //index(索引名)
.execute(new ActionListener<BulkByScrollResponse>() { //回调监听
@Override
public void onResponse(BulkByScrollResponse response) {
long deleted = response.getDeleted(); //删除文档的数量
}
@Override
public void onFailure(Exception e) {
// Handle the exception
}
});
Update API
有两种方式更新索引:
- 创建
UpdateRequest
,通过client发送; - 使用
prepareUpdate()
方法;
使用UpdateRequest
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
使用 prepareUpdate()
方法
这里官方的示例有问题,new Script()参数错误,所以一下代码是我自己写的(2017/11/10)
client.prepareUpdate("ttl", "doc", "1")
.setScript(new Script("ctx._source.gender = \"male\"" ,ScriptService.ScriptType.INLINE, null, null))//脚本可以是本地文件存储的,如果使用文件存储的脚本,需要设置 ScriptService.ScriptType.FILE
.get();
client.prepareUpdate("ttl", "doc", "1")
.setDoc(jsonBuilder() //合并到现有文档
.startObject()
.field("gender", "male")
.endObject())
.get();
Update by script
使用脚本更新文档
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
.script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();
Update by merging documents
合并文档
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
Upsert
更新插入,如果存在文档就更新,如果不存在就插入
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
.source(jsonBuilder()
.startObject()
.field("name", "Joe Smith")
.field("gender", "male")
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.upsert(indexRequest); //如果不存在此文档 ,就增加 `indexRequest`
client.update(updateRequest).get();
如果 index/type/1
存在,类似下面的文档:
{
"name" : "Joe Dalton",
"gender": "male"
}
如果不存在,会插入新的文档:
{
"name" : "Joe Smith",
"gender": "male"
}
Multi Get API
一次获取多个文档
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("twitter", "tweet", "1") //一个id的方式
.add("twitter", "tweet", "2", "3", "4") //多个id的方式
.add("another", "type", "foo") //可以从另外一个索引获取
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //迭代返回值
GetResponse response = itemResponse.getResponse();
if (response.isExists()) { //判断是否存在
String json = response.getSourceAsString(); //_source 字段
}
}
更多请浏览REST multi get 文档
Bulk API
Bulk API,批量插入:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
//处理失败
}
使用 Bulk Processor
BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求
创建BulkProcessor
实例
首先创建BulkProcessor
实例
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
client, //增加elasticsearch客户端
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... } //调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... } //调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... } //调用失败抛 Throwable
})
.setBulkActions(10000) //每次10000请求
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //拆成5mb一块
.setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求数量多少,每5秒钟请求一次。
.setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
.build();
BulkProcessor 默认设置
- bulkActions 1000
- bulkSize 5mb
- 不设置flushInterval
- concurrentRequests 为 1 ,异步执行
- backoffPolicy 重试 8次,等待50毫秒
增加requests
然后增加requests
到BulkProcessor
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
关闭 Bulk Processor
当所有文档都处理完成,使用awaitClose
或 close
方法关闭BulkProcessor
:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或
bulkProcessor.close();
在测试中使用Bulk Processor
如果你在测试种使用Bulk Processor
可以执行同步方法
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
// Add your requests
bulkProcessor.add(/* Your requests */);
// Flush any remaining requests
bulkProcessor.flush();
// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();
// Refresh your indices
client.admin().indices().prepareRefresh().get();
// Now you can start searching!
client.prepareSearch().get();
所有实例 已经上传到Git
更多请浏览 spring-boot-starter-es 开源项目
如何有任何问题请关注微信公众号给我留言
本节介绍以下 CRUD API:
单文档 APIs
多文档 APIs
Multi Get API Bulk API
注意:所有的单文档的CRUD API,index参数只能接受单一的索引库名称,或者是一个指向单一索引库的alias。
Index API
Index API 允许我们存储一个JSON格式的文档,使数据可以被搜索。文档通过index、type、id唯一确定。我们可以自己提供一个id,或者也使用Index API 为我们自动生成一个。
这里有几种不同的方式来产生JSON格式的文档(document):
- 手动方式,使用原生的byte[]或者String
- 使用Map方式,会自动转换成与之等价的JSON
- 使用第三方库来序列化beans,如Jackson
- 使用内置的帮助类 XContentFactory.jsonBuilder()
手动方式
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
实例
/**
* 手动生成JSON
*/
@Test
public void CreateJSON(){
String json = "{" +
"\"user\":\"fendo\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"Hell word\"" +
"}";
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
Map方式
Map是key:value数据类型,可以代表json结构.
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
实例
/**
* 使用集合
*/
@Test
public void CreateList(){
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate","2013-01-30");
json.put("message","trying out Elasticsearch");
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
序列化方式
ElasticSearch已经使用了jackson,可以直接使用它把javabean转为json.
import com.fasterxml.jackson.databind.*;
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
实例
/**
* 使用JACKSON序列化
* @throws Exception
*/
@Test
public void CreateJACKSON() throws Exception{
CsdnBlog csdn=new CsdnBlog();
csdn.setAuthor("fendo");
csdn.setContent("这是JAVA书籍");
csdn.setTag("C");
csdn.setView("100");
csdn.setTitile("编程");
csdn.setDate(new Date().toString());
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(csdn);
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
XContentBuilder帮助类方式
ElasticSearch提供了一个内置的帮助类XContentBuilder来产生JSON文档
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
实例
/**
* 使用ElasticSearch 帮助类
* @throws IOException
*/
@Test
public void CreateXContentBuilder() throws IOException{
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "ccse")
.field("postDate", new Date())
.field("message", "this is Elasticsearch")
.endObject();
IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
System.out.println("创建成功!");
}
综合实例
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CreateIndex {
private TransportClient client;
@Before
public void getClient() throws Exception{
//设置集群名称
Settings settings = Settings.builder().put("cluster.name", "my-application").build();// 集群名
//创建client
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
}
/**
* 手动生成JSON
*/
@Test
public void CreateJSON(){
String json = "{" +
"\"user\":\"fendo\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"Hell word\"" +
"}";
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
/**
* 使用集合
*/
@Test
public void CreateList(){
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate","2013-01-30");
json.put("message","trying out Elasticsearch");
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
/**
* 使用JACKSON序列化
* @throws Exception
*/
@Test
public void CreateJACKSON() throws Exception{
CsdnBlog csdn=new CsdnBlog();
csdn.setAuthor("fendo");
csdn.setContent("这是JAVA书籍");
csdn.setTag("C");
csdn.setView("100");
csdn.setTitile("编程");
csdn.setDate(new Date().toString());
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(csdn);
IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}
/**
* 使用ElasticSearch 帮助类
* @throws IOException
*/
@Test
public void CreateXContentBuilder() throws IOException{
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "ccse")
.field("postDate", new Date())
.field("message", "this is Elasticsearch")
.endObject();
IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
System.out.println("创建成功!");
}
}
你还可以通过startArray(string)和endArray()方法添加数组。.field()方法可以接受多种对象类型。你可以给它传递数字、日期、甚至其他XContentBuilder对象。
Get API
根据id查看文档:
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
更多请查看 rest get API 文档
配置线程
operationThreaded
设置为 true
是在不同的线程里执行此次操作
下面的例子是operationThreaded
设置为 false
:
GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
Delete API
根据ID删除:
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
更多请查看 delete API 文档
配置线程
operationThreaded
设置为 true
是在不同的线程里执行此次操作
下面的例子是operationThreaded
设置为 false
:
GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
Delete By Query API
通过查询条件删除
BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //查询条件
.source("persons") //index(索引名)
.get(); //执行
long deleted = response.getDeleted(); //删除文档的数量
如果需要执行的时间比较长,可以使用异步的方式处理,结果在回调里面获取
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //查询
.source("persons") //index(索引名)
.execute(new ActionListener<BulkByScrollResponse>() { //回调监听
@Override
public void onResponse(BulkByScrollResponse response) {
long deleted = response.getDeleted(); //删除文档的数量
}
@Override
public void onFailure(Exception e) {
// Handle the exception
}
});
Update API
有两种方式更新索引:
- 创建
UpdateRequest
,通过client发送; - 使用
prepareUpdate()
方法;
使用UpdateRequest
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
使用 prepareUpdate()
方法
这里官方的示例有问题,new Script()参数错误,所以一下代码是我自己写的(2017/11/10)
client.prepareUpdate("ttl", "doc", "1")
.setScript(new Script("ctx._source.gender = \"male\"" ,ScriptService.ScriptType.INLINE, null, null))//脚本可以是本地文件存储的,如果使用文件存储的脚本,需要设置 ScriptService.ScriptType.FILE
.get();
client.prepareUpdate("ttl", "doc", "1")
.setDoc(jsonBuilder() //合并到现有文档
.startObject()
.field("gender", "male")
.endObject())
.get();
Update by script
使用脚本更新文档
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
.script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();
Update by merging documents
合并文档
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
Upsert
更新插入,如果存在文档就更新,如果不存在就插入
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
.source(jsonBuilder()
.startObject()
.field("name", "Joe Smith")
.field("gender", "male")
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.upsert(indexRequest); //如果不存在此文档 ,就增加 `indexRequest`
client.update(updateRequest).get();
如果 index/type/1
存在,类似下面的文档:
{
"name" : "Joe Dalton",
"gender": "male"
}
如果不存在,会插入新的文档:
{
"name" : "Joe Smith",
"gender": "male"
}
Multi Get API
一次获取多个文档
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("twitter", "tweet", "1") //一个id的方式
.add("twitter", "tweet", "2", "3", "4") //多个id的方式
.add("another", "type", "foo") //可以从另外一个索引获取
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //迭代返回值
GetResponse response = itemResponse.getResponse();
if (response.isExists()) { //判断是否存在
String json = response.getSourceAsString(); //_source 字段
}
}
更多请浏览REST multi get 文档
Bulk API
Bulk API,批量插入:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
//处理失败
}
使用 Bulk Processor
BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求
创建BulkProcessor
实例
首先创建BulkProcessor
实例
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
client, //增加elasticsearch客户端
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... } //调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... } //调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... } //调用失败抛 Throwable
})
.setBulkActions(10000) //每次10000请求
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //拆成5mb一块
.setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求数量多少,每5秒钟请求一次。
.setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
.build();
BulkProcessor 默认设置
- bulkActions 1000
- bulkSize 5mb
- 不设置flushInterval
- concurrentRequests 为 1 ,异步执行
- backoffPolicy 重试 8次,等待50毫秒
增加requests
然后增加requests
到BulkProcessor
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
关闭 Bulk Processor
当所有文档都处理完成,使用awaitClose
或 close
方法关闭BulkProcessor
:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或
bulkProcessor.close();
在测试中使用Bulk Processor
如果你在测试种使用Bulk Processor
可以执行同步方法
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
// Add your requests
bulkProcessor.add(/* Your requests */);
// Flush any remaining requests
bulkProcessor.flush();
// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();
// Refresh your indices
client.admin().indices().prepareRefresh().get();
// Now you can start searching!
client.prepareSearch().get();
所有实例 已经上传到Git
更多请浏览 spring-boot-starter-es 开源项目
如何有任何问题请关注微信公众号给我留言
收起阅读 »

Elasticsearch Java API - 客户端连接(TransportClient,PreBuiltXPackTransportClient)(一)
Elasticsearch Java API 客户端连接
一个是TransportClient
,一个是NodeClient
,还有一个XPackTransportClient
- TransportClient:
作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。
- NodeClient
作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的。
- XPackTransportClient:
服务安装了 x-pack
插件
重要:客户端版本应该和服务端版本保持一致
TransportClient旨在被Java高级REST客户端取代,该客户端执行HTTP请求而不是序列化的Java请求。 在即将到来的Elasticsearch版本中将不赞成使用TransportClient,建议使用Java高级REST客户端。
上面的警告比较尴尬,但是在 5xx版本中使用还是没有问题的,可能使用rest 客户端兼容性更好做一些。
Elasticsearch Java Rest API 手册
Maven Repository
Elasticsearch Java API包已经上传到 Maven Central
在pom.xml
文件中增加:
transport 版本号最好就是与Elasticsearch版本号一致。
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.6.3</version>
</dependency>
Transport Client
不设置集群名称
// on startup
//此步骤添加IP,至少一个,如果设置了"client.transport.sniff"= true 一个就够了,因为添加了自动嗅探配置
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
// on shutdown 关闭client
client.close();
设置集群名称
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build(); //设置ES实例的名称
TransportClient client = new PreBuiltTransportClient(settings); //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
//Add transport addresses and do something with the client...
增加自动嗅探配置
Settings settings = Settings.builder()
.put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);
其他配置
client.transport.ignore_cluster_name //设置 true ,忽略连接节点集群名验证
client.transport.ping_timeout //ping一个节点的响应时间 默认5秒
client.transport.nodes_sampler_interval //sample/ping 节点的时间间隔,默认是5s
对于ES Client,有两种形式,一个是TransportClient,一个是NodeClient。两个的区别为: TransportClient作为一个外部访问者,通过HTTP去请求ES的集群,对于集群而言,它是一个外部因素。 NodeClient顾名思义,是作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的,不像TransportClient那样,ES集群对它一无所知。NodeClient通信的性能会更好,但是因为是ES的一环,所以它出问题,也会给ES集群带来问题。NodeClient可以设置不作为数据节点,在elasticsearch.yml中设置,这样就不会在此节点上分配数据。
如果用ES的节点,仁者见仁智者见智。
实例
package name.quanke.es.study;
import name.quanke.es.study.util.Utils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.Before;
import java.net.InetAddress;
/**
* Elasticsearch 5.5.1 的client 和 ElasticsearchTemplate的初始化
* 作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。
* Created by http://quanke.name on 2017/11/10.
*/
public class ElasticsearchClient {
protected TransportClient client;
@Before
public void setUp() throws Exception {
Settings esSettings = Settings.builder()
.put("cluster.name", "utan-es") //设置ES实例的名称
.put("client.transport.sniff", true) //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
.build();
/**
* 这里的连接方式指的是没有安装x-pack插件,如果安装了x-pack则参考{@link ElasticsearchXPackClient}
* 1. java客户端的方式是以tcp协议在9300端口上进行通信
* 2. http客户端的方式是以http协议在9200端口上进行通信
*/
client = new PreBuiltTransportClient(esSettings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));
System.out.println("ElasticsearchClient 连接成功");
}
@After
public void tearDown() throws Exception {
if (client != null) {
client.close();
}
}
protected void println(SearchResponse searchResponse) {
Utils.println(searchResponse);
}
}
本实例代码已经上传到 Git ElasticsearchClient.java
所有实例 已经上传到Git
XPackTransportClient
如果 ElasticSearch
服务安装了 x-pack
插件,需要PreBuiltXPackTransportClient
实例才能访问
使用Maven管理项目,把下面代码增加到pom.xml
;
一定要修改默认仓库地址为https://artifacts.elastic.co/maven ,因为这个库没有上传到Maven中央仓库,如果有自己的 maven ,请配置代理
<project ...>
<repositories>
<!-- add the elasticsearch repo -->
<repository>
<id>elasticsearch-releases</id>
<url>https://artifacts.elastic.co/maven</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
...
</repositories>
...
<dependencies>
<!-- add the x-pack jar as a dependency -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>x-pack-transport</artifactId>
<version>5.6.3</version>
</dependency>
...
</dependencies>
...
</project>
实例
/**
* Elasticsearch XPack Client
* Created by http://quanke.name on 2017/11/10.
*/
public class ElasticsearchXPackClient {
protected TransportClient client;
@Before
public void setUp() throws Exception {
/**
* 如果es集群安装了x-pack插件则以此种方式连接集群
* 1. java客户端的方式是以tcp协议在9300端口上进行通信
* 2. http客户端的方式是以http协议在9200端口上进行通信
*/
Settings settings = Settings.builder()
.put("xpack.security.user", "elastic:utan100")
.put("cluster.name", "utan-es")
.build();
client = new PreBuiltXPackTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));
// final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY,
// new UsernamePasswordCredentials("elastic", "utan100"));
System.out.println("ElasticsearchXPackClient 启动成功");
}
@Test
public void testClientConnection() throws Exception {
System.out.println("--------------------------");
}
@After
public void tearDown() throws Exception {
if (client != null) {
client.close();
}
}
protected void println(SearchResponse searchResponse) {
Utils.println(searchResponse);
}
}
本实例代码已经上传到 Git ElasticsearchXPackClient.java
所有实例 已经上传到Git
更多请浏览 spring-boot-starter-es 开源项目
如何有任何问题请关注微信公众号给我留言
Elasticsearch Java API 客户端连接
一个是TransportClient
,一个是NodeClient
,还有一个XPackTransportClient
- TransportClient:
作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。
- NodeClient
作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的。
- XPackTransportClient:
服务安装了 x-pack
插件
重要:客户端版本应该和服务端版本保持一致
TransportClient旨在被Java高级REST客户端取代,该客户端执行HTTP请求而不是序列化的Java请求。 在即将到来的Elasticsearch版本中将不赞成使用TransportClient,建议使用Java高级REST客户端。
上面的警告比较尴尬,但是在 5xx版本中使用还是没有问题的,可能使用rest 客户端兼容性更好做一些。
Elasticsearch Java Rest API 手册
Maven Repository
Elasticsearch Java API包已经上传到 Maven Central
在pom.xml
文件中增加:
transport 版本号最好就是与Elasticsearch版本号一致。
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.6.3</version>
</dependency>
Transport Client
不设置集群名称
// on startup
//此步骤添加IP,至少一个,如果设置了"client.transport.sniff"= true 一个就够了,因为添加了自动嗅探配置
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
// on shutdown 关闭client
client.close();
设置集群名称
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build(); //设置ES实例的名称
TransportClient client = new PreBuiltTransportClient(settings); //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
//Add transport addresses and do something with the client...
增加自动嗅探配置
Settings settings = Settings.builder()
.put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);
其他配置
client.transport.ignore_cluster_name //设置 true ,忽略连接节点集群名验证
client.transport.ping_timeout //ping一个节点的响应时间 默认5秒
client.transport.nodes_sampler_interval //sample/ping 节点的时间间隔,默认是5s
对于ES Client,有两种形式,一个是TransportClient,一个是NodeClient。两个的区别为: TransportClient作为一个外部访问者,通过HTTP去请求ES的集群,对于集群而言,它是一个外部因素。 NodeClient顾名思义,是作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的,不像TransportClient那样,ES集群对它一无所知。NodeClient通信的性能会更好,但是因为是ES的一环,所以它出问题,也会给ES集群带来问题。NodeClient可以设置不作为数据节点,在elasticsearch.yml中设置,这样就不会在此节点上分配数据。
如果用ES的节点,仁者见仁智者见智。
实例
package name.quanke.es.study;
import name.quanke.es.study.util.Utils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.Before;
import java.net.InetAddress;
/**
* Elasticsearch 5.5.1 的client 和 ElasticsearchTemplate的初始化
* 作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。
* Created by http://quanke.name on 2017/11/10.
*/
public class ElasticsearchClient {
protected TransportClient client;
@Before
public void setUp() throws Exception {
Settings esSettings = Settings.builder()
.put("cluster.name", "utan-es") //设置ES实例的名称
.put("client.transport.sniff", true) //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
.build();
/**
* 这里的连接方式指的是没有安装x-pack插件,如果安装了x-pack则参考{@link ElasticsearchXPackClient}
* 1. java客户端的方式是以tcp协议在9300端口上进行通信
* 2. http客户端的方式是以http协议在9200端口上进行通信
*/
client = new PreBuiltTransportClient(esSettings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));
System.out.println("ElasticsearchClient 连接成功");
}
@After
public void tearDown() throws Exception {
if (client != null) {
client.close();
}
}
protected void println(SearchResponse searchResponse) {
Utils.println(searchResponse);
}
}
本实例代码已经上传到 Git ElasticsearchClient.java
所有实例 已经上传到Git
XPackTransportClient
如果 ElasticSearch
服务安装了 x-pack
插件,需要PreBuiltXPackTransportClient
实例才能访问
使用Maven管理项目,把下面代码增加到pom.xml
;
一定要修改默认仓库地址为https://artifacts.elastic.co/maven ,因为这个库没有上传到Maven中央仓库,如果有自己的 maven ,请配置代理
<project ...>
<repositories>
<!-- add the elasticsearch repo -->
<repository>
<id>elasticsearch-releases</id>
<url>https://artifacts.elastic.co/maven</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
...
</repositories>
...
<dependencies>
<!-- add the x-pack jar as a dependency -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>x-pack-transport</artifactId>
<version>5.6.3</version>
</dependency>
...
</dependencies>
...
</project>
实例
/**
* Elasticsearch XPack Client
* Created by http://quanke.name on 2017/11/10.
*/
public class ElasticsearchXPackClient {
protected TransportClient client;
@Before
public void setUp() throws Exception {
/**
* 如果es集群安装了x-pack插件则以此种方式连接集群
* 1. java客户端的方式是以tcp协议在9300端口上进行通信
* 2. http客户端的方式是以http协议在9200端口上进行通信
*/
Settings settings = Settings.builder()
.put("xpack.security.user", "elastic:utan100")
.put("cluster.name", "utan-es")
.build();
client = new PreBuiltXPackTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));
// final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY,
// new UsernamePasswordCredentials("elastic", "utan100"));
System.out.println("ElasticsearchXPackClient 启动成功");
}
@Test
public void testClientConnection() throws Exception {
System.out.println("--------------------------");
}
@After
public void tearDown() throws Exception {
if (client != null) {
client.close();
}
}
protected void println(SearchResponse searchResponse) {
Utils.println(searchResponse);
}
}
本实例代码已经上传到 Git ElasticsearchXPackClient.java
所有实例 已经上传到Git
更多请浏览 spring-boot-starter-es 开源项目
收起阅读 »如何有任何问题请关注微信公众号给我留言

Elastic Stack 全新推出 6.0.0
https://www.elastic.co/cn/blog/elastic-stack-6-0-0-released
全新推出 6.0.0。
无需多说。你应该立即下载试用,或者通过你最喜欢的托管式 Elasticsearch 和 Kibana 提供平台 Elastic Cloud 亲身体验。
如果你在过去几个月没有跟上我们的发布节奏,可能会对今天的公告感到意外。今天标志着成千上万的 pull 请求和成百上千位代码提交者的努力终见成效。期间共有两个 alpha 版本、两个 beta 版本、两个候选版本以及最终的通用版本 (GA)。这个里程碑离不开 Elastic 各路团队的努力。还要感谢参与先锋计划的用户提出的意见和反馈。
今天,我们不仅发布了整套 Elastic Stack,还发布了 Elastic Cloud Enterprise 1.1,其中包括 6.0 支持、离线安装,并且对用户体验进行了一系列改进,旨在简化集群的配置、管理和监控。同天发布多款产品的正式版本还不够……还有仍是 Alpha 版本的 APM ,我们邀请大家在 6.0.0 中对它进行测试。
一个版本有如此多的亮点,该从哪里说起呢?你们撰文细述也好,提供详情链接也好,祝你们有愉快的阅读体验……更重要的是……祝你们有愉快的搜索、分析和可视化体验。
Elasticsearch
全新零停机升级体验,增加了序列 ID、改进了对稀疏数据的处理、加快了查询速度、分布式执行 watch 等等。功能摘要请查看详情。
Kibana
支持 “Dashboard Only” 模式,支持 “全屏” 模式,能够将保存的搜索结果导出到 .csv,X-Pack 黄金版及以上版本支持通过 UI 创建告警,X-Pack 基础版提供迁移助手,我们还通过调整对比度、支持快捷键来产品易用性,让用户使用起来更方便。数据交互的未来详见此贴。
Logstash
单一 Logstash 实例中可存在多个自成体系的管道,另有新增 UI 组件 - X-Pack 基础版中的管道查看器,以及 X-Pack 黄金版中的 Logstash 管道管理。了解详情,点这里。
Beats
Beats <3 容器以及 Beats <3 模块(并且改进了适用于这些模块的仪表板)。再结合全新命令和配置布局,在 Metricbeat 实现更高效的存储。此外,全新推出 Auditbeat。细节详见这里。
ES-Hadoop
对Spark的结构化数据流的一流支持已经降落到了 6.0,并重新编写了连接器映射代码以更好地支持多个映射。支持读写新的连接字段也被添加了。用户现在也可以利用非内联脚本类型的更新操作。详细信息。
立即获取!
https://www.elastic.co/cn/blog/elastic-stack-6-0-0-released
全新推出 6.0.0。
无需多说。你应该立即下载试用,或者通过你最喜欢的托管式 Elasticsearch 和 Kibana 提供平台 Elastic Cloud 亲身体验。
如果你在过去几个月没有跟上我们的发布节奏,可能会对今天的公告感到意外。今天标志着成千上万的 pull 请求和成百上千位代码提交者的努力终见成效。期间共有两个 alpha 版本、两个 beta 版本、两个候选版本以及最终的通用版本 (GA)。这个里程碑离不开 Elastic 各路团队的努力。还要感谢参与先锋计划的用户提出的意见和反馈。
今天,我们不仅发布了整套 Elastic Stack,还发布了 Elastic Cloud Enterprise 1.1,其中包括 6.0 支持、离线安装,并且对用户体验进行了一系列改进,旨在简化集群的配置、管理和监控。同天发布多款产品的正式版本还不够……还有仍是 Alpha 版本的 APM ,我们邀请大家在 6.0.0 中对它进行测试。
一个版本有如此多的亮点,该从哪里说起呢?你们撰文细述也好,提供详情链接也好,祝你们有愉快的阅读体验……更重要的是……祝你们有愉快的搜索、分析和可视化体验。
Elasticsearch
全新零停机升级体验,增加了序列 ID、改进了对稀疏数据的处理、加快了查询速度、分布式执行 watch 等等。功能摘要请查看详情。
Kibana
支持 “Dashboard Only” 模式,支持 “全屏” 模式,能够将保存的搜索结果导出到 .csv,X-Pack 黄金版及以上版本支持通过 UI 创建告警,X-Pack 基础版提供迁移助手,我们还通过调整对比度、支持快捷键来产品易用性,让用户使用起来更方便。数据交互的未来详见此贴。
Logstash
单一 Logstash 实例中可存在多个自成体系的管道,另有新增 UI 组件 - X-Pack 基础版中的管道查看器,以及 X-Pack 黄金版中的 Logstash 管道管理。了解详情,点这里。
Beats
Beats <3 容器以及 Beats <3 模块(并且改进了适用于这些模块的仪表板)。再结合全新命令和配置布局,在 Metricbeat 实现更高效的存储。此外,全新推出 Auditbeat。细节详见这里。
ES-Hadoop
对Spark的结构化数据流的一流支持已经降落到了 6.0,并重新编写了连接器映射代码以更好地支持多个映射。支持读写新的连接字段也被添加了。用户现在也可以利用非内联脚本类型的更新操作。详细信息。
立即获取!
收起阅读 »
上海普翔招聘 Elastic技术支持工程师
招聘职位:Elastic技术支持工程师
工作地点:上海
薪资待遇:18k ~ 25k
公司介绍:上海普翔是 elastic 目前在中国的合作伙伴,负责 X-Pack、ECE 产品销售以及技术咨询等 elastic相关的业务,详情可以查看 http://elastictech.cn 。
工作内容:
1、与 Elastic 公司一起挖掘国内的付费用户,拜访客户并介绍 X-Pack 、ECE 等产品。
2、参与国内付费用户的 elastic 产品实践,帮助他们解决实际中遇到的问题。
3、参与国内 elastic 产品的推广工作,比如录制教学视频、直播等。
职位要求:
1、本科以上学历,3年以上工作经验。
2、熟悉 Elastic 产品(如 Elasticsearch、Kibana、Logstash、Beats )的使用方法,了解常见优化方案,可以解决社区和客户中用户遇到的问题。
3、需要极强的学习和研究能力,面对一个新产品或者特性时,可以在极短的时间内掌握并通过自己的语言给客户讲解和演示。
4、有良好的沟通和表达能力,擅长倾听客户的问题并快速定位解决问题的关键点。
我们是 elastic 的官方合作伙伴,所以会有很多最新的信息与资料,对 elastic 在中国的发展有信心的同学不要错过机会哦!
欢迎投递简历至:weibinway@elastictech.cn
招聘职位:Elastic技术支持工程师
工作地点:上海
薪资待遇:18k ~ 25k
公司介绍:上海普翔是 elastic 目前在中国的合作伙伴,负责 X-Pack、ECE 产品销售以及技术咨询等 elastic相关的业务,详情可以查看 http://elastictech.cn 。
工作内容:
1、与 Elastic 公司一起挖掘国内的付费用户,拜访客户并介绍 X-Pack 、ECE 等产品。
2、参与国内付费用户的 elastic 产品实践,帮助他们解决实际中遇到的问题。
3、参与国内 elastic 产品的推广工作,比如录制教学视频、直播等。
职位要求:
1、本科以上学历,3年以上工作经验。
2、熟悉 Elastic 产品(如 Elasticsearch、Kibana、Logstash、Beats )的使用方法,了解常见优化方案,可以解决社区和客户中用户遇到的问题。
3、需要极强的学习和研究能力,面对一个新产品或者特性时,可以在极短的时间内掌握并通过自己的语言给客户讲解和演示。
4、有良好的沟通和表达能力,擅长倾听客户的问题并快速定位解决问题的关键点。
我们是 elastic 的官方合作伙伴,所以会有很多最新的信息与资料,对 elastic 在中国的发展有信心的同学不要错过机会哦!
欢迎投递简历至:weibinway@elastictech.cn
收起阅读 »