Elasticsearch Java API: Index CRUD (Part 2)
Article by quanke • 1 comment • 9080 views • 2017-11-16 09:53
This section covers the following CRUD APIs:

Single-document APIs
- Index API
- Get API
- Delete API
- Delete By Query API
- Update API

Multi-document APIs
- Multi Get API
- Bulk API
- Using Bulk Processor

Note: for every single-document CRUD API, the index parameter accepts only a single index name, or an alias that points to a single index.
Index API
The Index API lets you store a JSON document so that it becomes searchable. A document is uniquely identified by its index, type, and id. You can supply your own id, or let the Index API generate one for you.

There are several ways to produce the JSON document:
- Manually, using a raw byte[] or String
- Using a Map, which is automatically converted to the equivalent JSON
- Using a third-party library such as Jackson to serialize beans
- Using the built-in helper XContentFactory.jsonBuilder()

Manual approach
[Data format](https://www.elastic.co/guide/e ... t.html)
```java
String json = "{" +
    "\"user\":\"kimchy\"," +
    "\"postDate\":\"2013-01-30\"," +
    "\"message\":\"trying out Elasticsearch\"" +
    "}";
```

Example

```java
/**
 * Build the JSON by hand
 */
@Test
public void CreateJSON(){

    String json = "{" +
        "\"user\":\"fendo\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"Hello world\"" +
        "}";

    IndexResponse response = client.prepareIndex("fendo", "fendodate")
        .setSource(json)
        .get();
    System.out.println(response.getResult());
}
```
Map approach
A Map is a key/value structure and maps naturally onto a JSON object.

```java
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
```

Example

```java
/**
 * Use a Map
 */
@Test
public void CreateList(){

    Map<String, Object> json = new HashMap<String, Object>();
    json.put("user","kimchy");
    json.put("postDate","2013-01-30");
    json.put("message","trying out Elasticsearch");

    IndexResponse response = client.prepareIndex("fendo", "fendodate")
        .setSource(json)
        .get();
    System.out.println(response.getResult());
}
```
Serialization approach
Elasticsearch already depends on Jackson, so you can use it directly to turn a Java bean into JSON.

```java
import com.fasterxml.jackson.databind.*;

// instantiate a JSON mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate JSON
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
```

Example

```java
/**
 * Serialize with Jackson
 * @throws Exception
 */
@Test
public void CreateJACKSON() throws Exception{

    CsdnBlog csdn = new CsdnBlog();
    csdn.setAuthor("fendo");
    csdn.setContent("This is a Java book");
    csdn.setTag("C");
    csdn.setView("100");
    csdn.setTitile("Programming");
    csdn.setDate(new Date().toString());

    // instantiate a JSON mapper
    ObjectMapper mapper = new ObjectMapper(); // create once, reuse

    // generate JSON
    byte[] json = mapper.writeValueAsBytes(csdn);

    IndexResponse response = client.prepareIndex("fendo", "fendodate")
        .setSource(json)
        .get();
    System.out.println(response.getResult());
}
```
XContentBuilder helper approach
Elasticsearch provides a built-in helper class, XContentBuilder, for producing JSON documents. Whichever approach you use, the IndexResponse returned by the index call exposes the following:

```java
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (the first time you index a document, you get: 1)
long _version = response.getVersion();
// status holds the result of the current operation
RestStatus status = response.status();
```
Example

```java
/**
 * Use the Elasticsearch helper class
 * @throws IOException
 */
@Test
public void CreateXContentBuilder() throws IOException{

    XContentBuilder builder = XContentFactory.jsonBuilder()
        .startObject()
            .field("user", "ccse")
            .field("postDate", new Date())
            .field("message", "this is Elasticsearch")
        .endObject();

    IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
    System.out.println("Created: " + response.getResult());
}
```
Complete example

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CreateIndex {

    private TransportClient client;

    @Before
    public void getClient() throws Exception{
        // set the cluster name
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
        // create the client
        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
    }

    /**
     * Build the JSON by hand
     */
    @Test
    public void CreateJSON(){

        String json = "{" +
            "\"user\":\"fendo\"," +
            "\"postDate\":\"2013-01-30\"," +
            "\"message\":\"Hello world\"" +
            "}";

        IndexResponse response = client.prepareIndex("fendo", "fendodate")
            .setSource(json)
            .get();
        System.out.println(response.getResult());
    }

    /**
     * Use a Map
     */
    @Test
    public void CreateList(){

        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user","kimchy");
        json.put("postDate","2013-01-30");
        json.put("message","trying out Elasticsearch");

        IndexResponse response = client.prepareIndex("fendo", "fendodate")
            .setSource(json)
            .get();
        System.out.println(response.getResult());
    }

    /**
     * Serialize with Jackson
     * @throws Exception
     */
    @Test
    public void CreateJACKSON() throws Exception{

        CsdnBlog csdn = new CsdnBlog();
        csdn.setAuthor("fendo");
        csdn.setContent("This is a Java book");
        csdn.setTag("C");
        csdn.setView("100");
        csdn.setTitile("Programming");
        csdn.setDate(new Date().toString());

        // instantiate a JSON mapper
        ObjectMapper mapper = new ObjectMapper(); // create once, reuse

        // generate JSON
        byte[] json = mapper.writeValueAsBytes(csdn);

        IndexResponse response = client.prepareIndex("fendo", "fendodate")
            .setSource(json)
            .get();
        System.out.println(response.getResult());
    }

    /**
     * Use the Elasticsearch helper class
     * @throws IOException
     */
    @Test
    public void CreateXContentBuilder() throws IOException{

        XContentBuilder builder = XContentFactory.jsonBuilder()
            .startObject()
                .field("user", "ccse")
                .field("postDate", new Date())
                .field("message", "this is Elasticsearch")
            .endObject();

        IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
        System.out.println("Created: " + response.getResult());
    }
}
```
You can also add arrays with the startArray(String) and endArray() methods. The .field() method accepts many value types: you can pass it numbers, dates, even other XContentBuilder objects.
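As a plain-Java illustration (no Elasticsearch dependency; the field names are made up for the example), the document produced by a startArray("tags") … endArray() sequence would serialize to JSON like this:

```java
public class ArrayFieldSketch {
    // Hand-written JSON equivalent of:
    //   jsonBuilder().startObject()
    //       .field("user", "kimchy")
    //       .startArray("tags").value("search").value("java").endArray()
    //   .endObject()
    static String buildJson() {
        return "{\"user\":\"kimchy\",\"tags\":[\"search\",\"java\"]}";
    }

    public static void main(String[] args) {
        System.out.println(buildJson());
    }
}
```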
Get API
Fetch a document by id:

```java
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
```

For more, see the [REST get API](https://www.elastic.co/guide/e ... t.html) documentation.
Thread configuration
When operationThreaded is set to true, the operation executes on a different thread. The example below sets operationThreaded to false:

```java
GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
```
Delete API
Delete a document by id:

```java
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
```

For more, see the [delete API](https://www.elastic.co/guide/e ... e.html) documentation.
Thread configuration
As with the Get API, when operationThreaded is set to true, the operation executes on a different thread. The example below sets operationThreaded to false:

```java
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
```
Delete By Query API
Delete all documents that match a query:

```java
BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male")) // query
        .source("persons")                                  // index name
        .get();                                             // execute

long deleted = response.getDeleted();                       // number of documents deleted
```
If the operation may take a long time, you can run it asynchronously and read the result in a callback:

```java
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))   // query
    .source("persons")                                    // index name
    .execute(new ActionListener<BulkByScrollResponse>() { // callback listener
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();         // number of documents deleted
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });
```
Update API
There are two ways to update a document:
- create an UpdateRequest and send it via the client;
- use the prepareUpdate() method.

Using UpdateRequest

```java
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();
```
Using prepareUpdate()
The official example here is broken (the arguments to new Script() are wrong), so the code below is my own (2017/11/10):

```java
client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\"", ScriptService.ScriptType.INLINE, null, null)) // the script can also be stored in a file; for file-based scripts use ScriptService.ScriptType.FILE
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder() // merged into the existing document
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();
```
Update by script
Update a document with a script:

```java
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();
```
Update by merging documents
Merge a partial document into the existing one:

```java
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();
```
Upsert
Upsert means update-or-insert: if the document exists it is updated, otherwise it is inserted.

```java
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest); // if the document does not exist, `indexRequest` is used to create it
client.update(updateRequest).get();
```

If index/type/1 already exists, the document after the update looks like:

```json
{
    "name" : "Joe Dalton",
    "gender": "male"
}
```

If it does not exist, a new document is inserted:

```json
{
    "name" : "Joe Smith",
    "gender": "male"
}
```
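To make the branching explicit, here is a hypothetical in-memory model of upsert semantics (the "index" is just a Map; the names are made up for illustration and are not Elasticsearch API):

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertSketch {
    // index: document id -> document fields
    static void upsert(Map<String, Map<String, String>> index, String id,
                       Map<String, String> partialDoc, Map<String, String> upsertDoc) {
        if (index.containsKey(id)) {
            index.get(id).putAll(partialDoc);        // exists: merge the partial doc
        } else {
            index.put(id, new HashMap<>(upsertDoc)); // missing: insert the upsert doc
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> index = new HashMap<>();
        Map<String, String> partial = Map.of("gender", "male");
        Map<String, String> full = Map.of("name", "Joe Smith", "gender", "male");

        upsert(index, "1", partial, full);              // doc 1 missing -> inserted as full doc
        System.out.println(index.get("1").get("name")); // Joe Smith

        index.put("2", new HashMap<>(Map.of("name", "Joe Dalton")));
        upsert(index, "2", partial, full);              // doc 2 exists -> partial doc merged in
        System.out.println(index.get("2").get("name")); // Joe Dalton
    }
}
```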
Multi Get API
Fetch several documents in a single call:

```java
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")           // a single id
    .add("twitter", "tweet", "2", "3", "4") // several ids
    .add("another", "type", "foo")          // you can also fetch from another index
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { // iterate over the results
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      // check the document exists
        String json = response.getSourceAsString(); // the _source field
    }
}
```

For more, see the REST [multi get](https://www.elastic.co/guide/e ... t.html) documentation.
Bulk API
The Bulk API indexes documents in batches:

```java
import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
            .startObject()
                .field("user", "kimchy")
                .field("postDate", new Date())
                .field("message", "trying out Elasticsearch")
            .endObject()
        )
);

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
            .startObject()
                .field("user", "kimchy")
                .field("postDate", new Date())
                .field("message", "another post")
            .endObject()
        )
);

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
```
Using Bulk Processor
BulkProcessor offers a simple interface that flushes bulk requests automatically based on the number or size of buffered requests, or after a given time interval.
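The core idea can be sketched in plain Java (a made-up buffer class for illustration, not the Elasticsearch API): requests accumulate and are flushed automatically once a count threshold is reached:

```java
import java.util.ArrayList;
import java.util.List;

public class AutoFlushBufferSketch {
    private final int bulkActions;              // flush after this many requests
    private final List<String> buffer = new ArrayList<>();
    private int flushes = 0;

    AutoFlushBufferSketch(int bulkActions) { this.bulkActions = bulkActions; }

    void add(String request) {
        buffer.add(request);
        if (buffer.size() >= bulkActions) {     // threshold reached -> flush automatically
            flush();
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            flushes++;                          // a real processor would send one bulk request here
            buffer.clear();
        }
    }

    int flushCount() { return flushes; }

    public static void main(String[] args) {
        AutoFlushBufferSketch p = new AutoFlushBufferSketch(3);
        for (int i = 0; i < 7; i++) p.add("doc-" + i);
        p.flush();                              // flush the remainder, like close()
        System.out.println(p.flushCount());     // 3 (two automatic + one final)
    }
}
```

The real BulkProcessor adds size- and time-based triggers on top of this count trigger, as shown below.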
Creating the BulkProcessor
First, create a BulkProcessor instance:

```java
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client, // the Elasticsearch client
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                    BulkRequest request) { ... } // called before each bulk execution; e.g. request.numberOfActions() tells you how many actions it contains

            @Override
            public void afterBulk(long executionId,
                    BulkRequest request,
                    BulkResponse response) { ... } // called after each bulk execution; e.g. response.hasFailures() tells you whether anything failed

            @Override
            public void afterBulk(long executionId,
                    BulkRequest request,
                    Throwable failure) { ... } // called when a bulk execution fails with a Throwable
        })
        .setBulkActions(10000) // flush every 10000 requests
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush every 5 MB of data
        .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds regardless of the number of requests
        .setConcurrentRequests(1) // number of concurrent flushes: 0 means flushes execute synchronously (one at a time), 1 allows one concurrent flush
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) // custom retry policy for bulk requests that fail with EsRejectedExecutionException (insufficient compute resources): wait 100 ms initially, growing exponentially, up to 3 retries; disable retries with BackoffPolicy.noBackoff()
        .build();
```
BulkProcessor defaults
- bulkActions: 1000
- bulkSize: 5 MB
- flushInterval: not set
- concurrentRequests: 1, i.e. asynchronous execution
- backoffPolicy: exponential, up to 8 retries, starting at 50 ms
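As a rough plain-Java sketch of how those default backoff delays grow (assuming each wait doubles; this is illustrative, not the library's exact schedule):

```java
public class BackoffDelaysSketch {
    // delays for the default policy: start at startMillis, up to `retries` retries
    static long[] delays(long startMillis, int retries) {
        long[] out = new long[retries];
        long d = startMillis;
        for (int i = 0; i < retries; i++) {
            out[i] = d;
            d *= 2; // assumed doubling between retries
        }
        return out;
    }

    public static void main(String[] args) {
        // defaults: 50 ms initial delay, 8 retries
        for (long d : delays(50, 8)) {
            System.out.println("wait " + d + " ms");
        }
    }
}
```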
Adding requests
Then add requests to the BulkProcessor:

```java
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
```
Closing the Bulk Processor
When all documents have been processed, close the BulkProcessor with the awaitClose or close method:

```java
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
```

or

```java
bulkProcessor.close();
```
Using Bulk Processor in tests
If you use the Bulk Processor in tests, you can make it execute synchronously:

```java
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
```
[All examples](https://gitee.com/quanke/elasticsearch-java-study) have been pushed to Git.
For more, see the [spring-boot-starter-es](https://github.com/quanke/spring-boot-starter-es) open-source project.
If you have any questions, follow my WeChat public account and leave me a message.