Day24: Elasticsearch添加Shield后TransportClient如何连接?
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.
关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.
1.HTTP方式
现在直接访问es的http接口就会报错
curl http://localhost:9200
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}
shield支持HttpBasic验证,所以正确的访问姿势是:
curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.
2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://maven.elasticsearch.or ... gt%3B
<releases> <enabled>true</enabled> </releases>
<snapshots> <enabled>false</enabled> </snapshots>
</repository>
</repositories>
2.2 添加依赖的配置
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency
2.3 构建TransportClient的地方增加访问用户的配置
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
String clusterName="elasticsearch"; String ip= "127.0.0.1";
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("shield.user", "tribe_user:tribe_user")
.build();
try { client = TransportClient.builder()
.addPlugin(ShieldPlugin.class)
.settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray())); client.prepareSearch()
.putHeader("Authorization", token).get(); }
catch (UnknownHostException e)
{ logger.error("es",e); }
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.
关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.
1.HTTP方式
现在直接访问es的http接口就会报错
curl http://localhost:9200
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}
shield支持HttpBasic验证,所以正确的访问姿势是:
curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.
2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://maven.elasticsearch.or ... gt%3B
<releases> <enabled>true</enabled> </releases>
<snapshots> <enabled>false</enabled> </snapshots>
</repository>
</repositories>
2.2 添加依赖的配置
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency
2.3 构建TransportClient的地方增加访问用户的配置
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
String clusterName="elasticsearch"; String ip= "127.0.0.1";
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("shield.user", "tribe_user:tribe_user")
.build();
try { client = TransportClient.builder()
.addPlugin(ShieldPlugin.class)
.settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray())); client.prepareSearch()
.putHeader("Authorization", token).get(); }
catch (UnknownHostException e)
{ logger.error("es",e); }
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252 收起阅读 »
Day 23 谈谈ES 的Recovery
Recovery是指将一个索引的未分配shard分配到一个结点的过程。 在快照恢复,更改索引复制片数量,结点故障或者结点启动时发生。由于master持有整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点。例如:
- 如果某个shard主片在,副片所在结点挂了,那么选择另外一个可用结点,将副片分配(allocate)上去,然后进行主从片的复制。
- 如果某个shard的主片所在结点挂了,副片还在,那么将副片升级为主片,然后做主副复制。
- 如果某个shard的主副片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从结点上恢复主分片,再选择某个结点分配复制片,并从主分片同步数据。
通过CAT health API,我们可以查看集群的状态,从而获知数据的完整性情况:
可能的状态及含义:
Green: 所有的shard主副片都完好的
Yellow: 所有shard的主片都完好,部分副片没有了,数据完整性依然完好。
Red: 某些shard的主副片都没有了,对应的索引数据不完整
Recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务能力降级,或者一部分功能暂时不可用。了解一些Recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。
减少集群Full Restart造成的数据来回拷贝
集群可能会有整体重启的需要,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群。 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡显然是不均衡的,master会触发rebalance过程,将数据在结点之间挪动。整个过程无谓消耗了大量的网络流量。 合理设置recovery相关参数则可以防范这种问题的发生。
gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes
以上三个参数是说集群里一旦有多少个结点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。
在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:
gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes
举例来说,对于一个有10个data node的集群,如果有以下的设置:
gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8
那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。
减少主副本之间的数据复制
如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:
然后在结点重启完成加入集群后,再重新打开:
这样在结点重启完成后,尽量多的从本地直接恢复数据。
但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于Recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同结点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启结点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
- 暂停数据写入程序
- 关闭集群shard allocation
- 手动执行POST /_flush/synced
- 重启结点
- 重新开启集群shard allocation
- 等待recovery完成,集群health status变成green
- 重新开启数据写入程序
(特别大的)热索引为何恢复慢
对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨结点拷贝segment file以外,translog recovery是导致慢的更重要的原因。
从主片恢复数据到副片需要经历3个阶段:
- 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
- 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
- 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。
万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery
其他Recovery相关的专家级设置
还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226) 。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。
Recovery是指将一个索引的未分配shard分配到一个结点的过程。 在快照恢复,更改索引复制片数量,结点故障或者结点启动时发生。由于master持有整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点。例如:
- 如果某个shard主片在,副片所在结点挂了,那么选择另外一个可用结点,将副片分配(allocate)上去,然后进行主从片的复制。
- 如果某个shard的主片所在结点挂了,副片还在,那么将副片升级为主片,然后做主副复制。
- 如果某个shard的主副片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从结点上恢复主分片,再选择某个结点分配复制片,并从主分片同步数据。
通过CAT health API,我们可以查看集群的状态,从而获知数据的完整性情况:
可能的状态及含义:
Green: 所有的shard主副片都完好的
Yellow: 所有shard的主片都完好,部分副片没有了,数据完整性依然完好。
Red: 某些shard的主副片都没有了,对应的索引数据不完整
Recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务能力降级,或者一部分功能暂时不可用。了解一些Recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。
减少集群Full Restart造成的数据来回拷贝
集群可能会有整体重启的需要,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群。 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡显然是不均衡的,master会触发rebalance过程,将数据在结点之间挪动。整个过程无谓消耗了大量的网络流量。 合理设置recovery相关参数则可以防范这种问题的发生。
gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes
以上三个参数是说集群里一旦有多少个结点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。
在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:
gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes
举例来说,对于一个有10个data node的集群,如果有以下的设置:
gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8
那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。
减少主副本之间的数据复制
如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:
然后在结点重启完成加入集群后,再重新打开:
这样在结点重启完成后,尽量多的从本地直接恢复数据。
但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于Recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同结点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启结点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
- 暂停数据写入程序
- 关闭集群shard allocation
- 手动执行POST /_flush/synced
- 重启结点
- 重新开启集群shard allocation
- 等待recovery完成,集群health status变成green
- 重新开启数据写入程序
(特别大的)热索引为何恢复慢
对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨结点拷贝segment file以外,translog recovery是导致慢的更重要的原因。
从主片恢复数据到副片需要经历3个阶段:
- 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
- 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
- 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。
万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery
其他Recovery相关的专家级设置
还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226) 。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。 收起阅读 »
elasticsearch-rtf更新至2.1.1
使用git快速签出最新版:
git clone git://github.com/medcl/elasticsearch-rtf.git -b master --depth 1
包含插件:
elasticsearch-analysis-ik-1.6.2 elasticsearch-analysis-pinyin-1.5.2
elasticsearch-analysis-mmseg-1.6.2 elasticsearch-analysis-stconvert-1.6.1
使用:
cd elasticsearch/bin
./elasticsearch
使用git快速签出最新版:
git clone git://github.com/medcl/elasticsearch-rtf.git -b master --depth 1
包含插件:
elasticsearch-analysis-ik-1.6.2 elasticsearch-analysis-pinyin-1.5.2
elasticsearch-analysis-mmseg-1.6.2 elasticsearch-analysis-stconvert-1.6.1
使用:
cd elasticsearch/bin
./elasticsearch
收起阅读 »
Day22:pipeline aggregation计算日留存率示例
目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"logs" : {
"properties" : {
"uid" : { "type" : "string", "index" : "not_analyzed" },
"register_time" : { "type" : "date", "index" : "not_analyzed" },
"login_time" : { "type" : "date", "index" : "not_analyzed" }
}
}
}
}'
那么实际记录的日志会类似这样:{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
"size" : 0,
"aggs" : {
"new_users" : {
"filters" : {
"filters" : [
{
"range" : {
"register_time" : {
"gte" : "2015-12-23",
"lt" : "2015-12-24"
}
}
}
]
},
"aggs" : {
"register_count" : {
"cardinality" : {
"field" : "uid"
}
},
"today" : {
"filter" : {
"range" : {
"login_time" : {
"gte" : "2015-12-24",
"lt" : "2015-12-25"
}
}
},
"aggs" : {
"login_count" : {
"cardinality" : {
"field" : "uid"
}
}
}
},
"retention" : {
"bucket_script" : {
"buckets_path" : {
"today_count" : "today>login_count",
"yesterday_count" : "register_count"
},
"script" : {
"lang" : "expression",
"inline" : "today_count / yesterday_count"
}
}
}
}
}
}
}'
这个 pipeline aggregation 在使用上有几个要点:- pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
- bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。
最终这次请求的响应如下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"new_users" : {
"buckets" : [ {
"doc_count" : 3,
"today" : {
"doc_count" : 1,
"login_count" : {
"value" : 1
}
},
"register_count" : {
"value" : 2
},
"retention" : {
"value" : 0.5
}
} ]
}
}
}
这个 retention 数据,就是我们要求解的 0.5 了。目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"logs" : {
"properties" : {
"uid" : { "type" : "string", "index" : "not_analyzed" },
"register_time" : { "type" : "date", "index" : "not_analyzed" },
"login_time" : { "type" : "date", "index" : "not_analyzed" }
}
}
}
}'
那么实际记录的日志会类似这样:{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
"size" : 0,
"aggs" : {
"new_users" : {
"filters" : {
"filters" : [
{
"range" : {
"register_time" : {
"gte" : "2015-12-23",
"lt" : "2015-12-24"
}
}
}
]
},
"aggs" : {
"register_count" : {
"cardinality" : {
"field" : "uid"
}
},
"today" : {
"filter" : {
"range" : {
"login_time" : {
"gte" : "2015-12-24",
"lt" : "2015-12-25"
}
}
},
"aggs" : {
"login_count" : {
"cardinality" : {
"field" : "uid"
}
}
}
},
"retention" : {
"bucket_script" : {
"buckets_path" : {
"today_count" : "today>login_count",
"yesterday_count" : "register_count"
},
"script" : {
"lang" : "expression",
"inline" : "today_count / yesterday_count"
}
}
}
}
}
}
}'
这个 pipeline aggregation 在使用上有几个要点:- pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
- bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。
最终这次请求的响应如下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"new_users" : {
"buckets" : [ {
"doc_count" : 3,
"today" : {
"doc_count" : 1,
"login_count" : {
"value" : 1
}
},
"register_count" : {
"value" : 2
},
"retention" : {
"value" : 0.5
}
} ]
}
}
}
这个 retention 数据,就是我们要求解的 0.5 了。收起阅读 »
Day21: 如何快速把Kibana4 Discover页的Document Table导出成CSV
但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。
可惜Kibana4没有针对search document table的导出!
国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成……
点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated
然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:
然后点击这个『CSV』按钮,会弹出一片提示:
可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。
我们当然选择本机……
然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。
实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。
注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集!
但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。
可惜Kibana4没有针对search document table的导出!
国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成……
点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated
然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:
然后点击这个『CSV』按钮,会弹出一片提示:
可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。
我们当然选择本机……
然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。
实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。
注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集! 收起阅读 »
简繁体转换插件更新:elasticsearch-analysis-stconvert 升级支持2.0
项目地址:https://github.com/medcl/elast ... nvert
mvn 编译打包,拷贝release下面的zip并解压到你的es plugins目录即可,需要重启es
这个插件帮你处理简繁体,简繁体全部统一成简体或繁体,不管输入的简体还是繁体,都能得到搜索结果
比如:
不管输入的是『北京国际电视台』的还是『北京國際電視臺』都能命中。
详细配置和使用请参照上面的地址。
项目地址:https://github.com/medcl/elast ... nvert
mvn 编译打包,拷贝release下面的zip并解压到你的es plugins目录即可,需要重启es
这个插件帮你处理简繁体,简繁体全部统一成简体或繁体,不管输入的简体还是繁体,都能得到搜索结果
比如:
不管输入的是『北京国际电视台』的还是『北京國際電視臺』都能命中。
详细配置和使用请参照上面的地址。
收起阅读 »
Day20 利用tcpdump和kafka协议定位不合法topic的来源
因为在topic名字里面不能含有%字符, 所以kafka server的日志里面大量报错. Logstash每发一次数据, kafka就会生成下面一大段错误
[2015-12-23 23:20:47,749] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 48; ClientId: ; Topics: test-%{type} (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name test-%{type} is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:744)
把可用的信息瞬间淹没.
更不幸的是, 错误日志里面并没有客户来源的信息, 根本不知道是哪些机器还有问题.
我想做的, 就是把有问题的logstash机器找出来.
我就先事后诸葛亮一把, 用下面这个命令就可以把配置错误的机器找出来(也可以没有任何结果, 原因后面说)
tcpdump -nn 'dst port 9092 and tcp[37]==3 and tcp[57]==37'
dst port 9092就不说了, 这是kafka的默认端口, 后面的tcp[37]==3 and tcp[57]==37是啥意思呢, 我们慢慢说.
先要说一下: client要生产数据到kafka, 在发送消息之前, 首先得向kafka"询问"这个topic的metadata信息, 包括有几个partiton, 每个parttion在哪个服务器上面等信息, 拿到这些信息之后, 才能把消息发到正确的kafka服务器上.
重点来了! 向kafka"询问"topic的metadata, 其实就是发送一个tcp包过去, 我们需要知道的是这个tcp包的格式. 我已经帮你找到了, 就在这里 https://cwiki.apache.org/confl ... quest
看完文档之后(半小时或者更长时间过去了), 你就会知道, tcp body(除去tcp head)里面的第6个字节是03, 代表这是一个TopicMetadataRequest请求. topicname里面的%字符出现在tcp body的第26个字节, %的ascii码是37
tcp头一般是20个字符, 所以加上这20个字节, 然后下标从0算起, 就是tcp[20+5]==3 and tcp[20+25]==37, 也就是tcp[25]==3 and tcp[45]==37.
咦, 为啥和开始写的那个过滤条件不一样呢, 因为tcp头"一般"是20字节, 但是如果其中还包含了tcp选项的话, 就可能比20多了. 反正我这里看到的的tcp头都是32个字节, 所以不能加20, 要加32, 也就是最开始写的 tcp[37]==3 and tcp[57]==37
最后呢, 再提2点结束.
1. 终极大杀器, 不过tcp头的长度是多少, 20也好, 32也好, 或者其他也好, 下面这样都能搞定
tcpdump -nn 'dst port 9092 and tcp[(tcp[12]>>2)+5]==3 and tcp[(tcp[12]>>2)+25]==37'
2. 不要一上来就这么高端, 其实我最开始是这样先确定问题的
tcpdump -vv -nn -X -s 0 dst port 9092 | grep -C 5 "test-"
你问我为啥不把test-t{type}写完整? 不是为了省事, 其实是因为很不幸, test-%{t 到这里的时候, 正好换行了.
因为在topic名字里面不能含有%字符, 所以kafka server的日志里面大量报错. Logstash每发一次数据, kafka就会生成下面一大段错误
[2015-12-23 23:20:47,749] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 48; ClientId: ; Topics: test-%{type} (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name test-%{type} is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:744)
把可用的信息瞬间淹没.
更不幸的是, 错误日志里面并没有客户来源的信息, 根本不知道是哪些机器还有问题.
我想做的, 就是把有问题的logstash机器找出来.
我就先事后诸葛亮一把, 用下面这个命令就可以把配置错误的机器找出来(也可以没有任何结果, 原因后面说)
tcpdump -nn 'dst port 9092 and tcp[37]==3 and tcp[57]==37'
dst port 9092就不说了, 这是kafka的默认端口, 后面的tcp[37]==3 and tcp[57]==37是啥意思呢, 我们慢慢说.
先要说一下: client要生产数据到kafka, 在发送消息之前, 首先得向kafka"询问"这个topic的metadata信息, 包括有几个partiton, 每个parttion在哪个服务器上面等信息, 拿到这些信息之后, 才能把消息发到正确的kafka服务器上.
重点来了! 向kafka"询问"topic的metadata, 其实就是发送一个tcp包过去, 我们需要知道的是这个tcp包的格式. 我已经帮你找到了, 就在这里 https://cwiki.apache.org/confl ... quest
看完文档之后(半小时或者更长时间过去了), 你就会知道, tcp body(除去tcp head)里面的第6个字节是03, 代表这是一个TopicMetadataRequest请求. topicname里面的%字符出现在tcp body的第26个字节, %的ascii码是37
tcp头一般是20个字符, 所以加上这20个字节, 然后下标从0算起, 就是tcp[20+5]==3 and tcp[20+25]==37, 也就是tcp[25]==3 and tcp[45]==37.
咦, 为啥和开始写的那个过滤条件不一样呢, 因为tcp头"一般"是20字节, 但是如果其中还包含了tcp选项的话, 就可能比20多了. 反正我这里看到的的tcp头都是32个字节, 所以不能加20, 要加32, 也就是最开始写的 tcp[37]==3 and tcp[57]==37
最后呢, 再提2点结束.
1. 终极大杀器, 不过tcp头的长度是多少, 20也好, 32也好, 或者其他也好, 下面这样都能搞定
tcpdump -nn 'dst port 9092 and tcp[(tcp[12]>>2)+5]==3 and tcp[(tcp[12]>>2)+25]==37'
2. 不要一上来就这么高端, 其实我最开始是这样先确定问题的
tcpdump -vv -nn -X -s 0 dst port 9092 | grep -C 5 "test-"
你问我为啥不把test-t{type}写完整? 不是为了省事, 其实是因为很不幸, test-%{t 到这里的时候, 正好换行了. 收起阅读 »
Day19 ES内存那点事
注: 本文主要针对ES 2.x。
“该给ES分配多少内存?”
“JVM参数如何优化?“
“为何我的Heap占用这么高?”
“为何经常有某个field的数据量超出内存限制的异常?“
“为何感觉上没多少数据,也会经常Out Of Memory?”
以上问题,显然没有一个统一的数学公式能够给出答案。 和数据库类似,ES对于内存的消耗,和很多因素相关,诸如数据总量、mapping设置、查询方式、查询频度等等。默认的设置虽开箱即用,但不能适用每一种使用场景。作为ES的开发、运维人员,如果不了解ES对内存使用的一些基本原理,就很难针对特有的应用场景,有效的测试、规划和管理集群,从而踩到各种坑,被各种问题挫败。
要理解ES如何使用内存,先要理解下面两个基本事实:
1. ES是JAVA应用
2. 底层存储引擎是基于Lucene的
看似很普通是吗?但其实没多少人真正理解这意味着什么。
首先,作为一个JAVA应用,就脱离不开JVM和GC。很多人上手ES的时候,对GC一点概念都没有就去网上抄各种JVM“优化”参数,却仍然被heap不够用,内存溢出这样的问题搞得焦头烂额。了解JVM GC的概念和基本工作机制是很有必要的,本文不在此做过多探讨,读者可以自行Google相关资料进行学习。如何知道ES heap是否真的有压力了? 推荐阅读这篇博客:Understanding Memory Pressure Indicator。 即使对于JVM GC机制不够熟悉,头脑里还是需要有这么一个基本概念: 应用层面生成大量长生命周期的对象,是给heap造成压力的主要原因,例如读取一大片数据在内存中进行排序,或者在heap内部建cache缓存大量数据。如果GC释放的空间有限,而应用层面持续大量申请新对象,GC频度就开始上升,同时会消耗掉很多CPU时间。严重时可能恶性循环,导致整个集群停工。因此在使用ES的过程中,要知道哪些设置和操作容易造成以上问题,有针对性的予以规避。
其次,Lucene的倒排索引(Inverted Index)是先在内存里生成,然后定期以段文件(segment file)的形式刷到磁盘的。每个段实际就是一个完整的倒排索引,并且一旦写到磁盘上就不会做修改。 API层面的文档更新和删除实际上是增量写入的一种特殊文档,会保存在新的段里。不变的段文件易于被操作系统cache,热数据几乎等效于内存访问。
基于以上2个基本事实,我们不难理解,为何官方建议的heap size不要超过系统可用内存的一半。heap以外的内存并不会被浪费,操作系统会很开心的利用他们来cache被用读取过的段文件。
Heap分配多少合适?遵从官方建议就没错。 不要超过系统可用内存的一半,并且不要超过32GB。JVM参数呢?对于初级用户来说,并不需要做特别调整,仍然遵从官方的建议,将xms和xmx设置成和heap一样大小,避免动态分配heap size就好了。虽然有针对性的调整JVM参数可以带来些许GC效率的提升,当有一些“坏”用例的时候,这些调整并不会有什么魔法效果帮你减轻heap压力,甚至可能让问题更糟糕。
那么,ES的heap是如何被瓜分掉的? 说几个我知道的内存消耗大户并分别做解读:
1. segment memory
2. filter cache
3. field data cache
4. bulk queue
5. indexing buffer
6. state buffer
7. 超大搜索聚合结果集的fetch
8. 对高cardinality字段做terms aggregation
Segment Memory
Segment不是file吗?segment memory又是什么?前面提到过,一个segment是一个完备的lucene倒排索引,而倒排索引是通过词典 (Term Dictionary)到文档列表(Postings List)的映射关系,快速做查询的。 由于词典的size会很大,全部装载到heap里不现实,因此Lucene为词典做了一层前缀索引(Term Index),这个索引在Lucene4.0以后采用的数据结构是FST (Finite State Transducer)。 这种数据结构占用空间很小,Lucene打开索引的时候将其全量装载到内存中,加快磁盘上词典查询速度的同时减少随机磁盘访问次数。
下面是词典索引和词典主存储之间的一个对应关系图:
Lucene file的完整数据结构参见Apache Lucene - Index File Formats
说了这么多,要传达的一个意思就是,ES的data node存储数据并非只是耗费磁盘空间的,为了加速数据的访问,每个segment都有会一些索引数据驻留在heap里。因此segment越多,瓜分掉的heap也越多,并且这部分heap是无法被GC掉的! 理解这点对于监控和管理集群容量很重要,当一个node的segment memory占用过多的时候,就需要考虑删除、归档数据,或者扩容了。
怎么知道segment memory占用情况呢? CAT API可以给出答案。
1. 查看一个索引所有segment的memory占用情况:
2. 查看一个node上所有segment占用的memory总和:
那么有哪些途径减少data node上的segment memory占用呢? 总结起来有三种方法:
1. 删除不用的索引
2. 关闭索引 (文件仍然存在于磁盘,只是释放掉内存)。需要的时候可以重新打开。
3. 定期对不再更新的索引做optimize (ES2.0以后更改为force merge api)。这Optimze的实质是对segment file强制做合并,可以节省大量的segment memory。
Filter Cache (5.x里叫做Request cache)
Filter cache是用来缓存使用过的filter的结果集的,需要注意的是这个缓存也是常驻heap,在被evict掉之前,是无法被GC的。我的经验是默认的10% heap设置工作得够好了,如果实际使用中heap没什么压力的情况下,才考虑加大这个设置。
Field Data cache
在有大量排序、数据聚合的应用场景,可以说field data cache是性能和稳定性的杀手。 对搜索结果做排序或者聚合操作,需要将倒排索引里的数据进行解析,按列构造成docid->value的形式才能够做后续快速计算。 对于数据量很大的索引,这个构造过程会非常耗费时间,因此ES 2.0以前的版本会将构造好的数据缓存起来,提升性能。但是由于heap空间有限,当遇到用户对海量数据做计算的时候,就很容易导致heap吃紧,集群频繁GC,根本无法完成计算过程。 ES2.0以后,正式默认启用Doc Values特性(1.x需要手动更改mapping开启),将field data在indexing time构建在磁盘上,经过一系列优化,可以达到比之前采用field data cache机制更好的性能。因此需要限制对field data cache的使用,最好是完全不用,可以极大释放heap压力。 需要注意的是,很多同学已经升级到ES2.0,或者1.0里已经设置mapping启用了doc values,在kibana里仍然会遇到问题。 这里一个陷阱就在于kibana的table panel可以对所有字段排序。 设想如果有一个字段是analyzed过的,而用户去点击对应字段的排序表头是什么后果? 一来排序的结果并不是用户想要的,排序的对象实际是词典; 二来analyzed过的字段无法利用doc values,需要装载到field data cache,数据量很大的情况下可能集群就在忙着GC或者根本出不来结果。
Bulk Queue
一般来说,Bulk queue不会消耗很多的heap,但是见过一些用户为了提高bulk的速度,客户端设置了很大的并发量,并且将bulk Queue设置到不可思议的大,比如好几千。 Bulk Queue是做什么用的?当所有的bulk thread都在忙,无法响应新的bulk request的时候,将request在内存里排列起来,然后慢慢清掉。 这在应对短暂的请求爆发的时候有用,但是如果集群本身索引速度一直跟不上,设置的好几千的queue都满了会是什么状况呢? 取决于一个bulk的数据量大小,乘上queue的大小,heap很有可能就不够用,内存溢出了。一般来说官方默认的thread pool设置已经能很好的工作了,建议不要随意去“调优”相关的设置,很多时候都是适得其反的效果。
Indexing Buffer
Indexing Buffer是用来缓存新数据,当其满了或者refresh/flush interval到了,就会以segment file的形式写入到磁盘。 这个参数的默认值是10% heap size。根据经验,这个默认值也能够很好的工作,应对很大的索引吞吐量。 但有些用户认为这个buffer越大吞吐量越高,因此见过有用户将其设置为40%的。到了极端的情况,写入速度很高的时候,40%都被占用,导致OOM。
Cluster State Buffer
ES被设计成每个node都可以响应用户的api请求,因此每个node的内存里都包含有一份集群状态的拷贝。这个cluster state包含诸如集群有多少个node,多少个index,每个index的mapping是什么?有少shard,每个shard的分配情况等等 (ES有各类stats api获取这类数据)。 在一个规模很大的集群,这个状态信息可能会非常大的,耗用的内存空间就不可忽视了。并且在ES2.0之前的版本,state的更新是由master node做完以后全量散播到其他结点的。 频繁的状态更新就可以给heap带来很大的压力。 在超大规模集群的情况下,可以考虑分集群并通过tribe node连接做到对用户api的透明,这样可以保证每个集群里的state信息不会膨胀得过大。
超大搜索聚合结果集的fetch
ES是分布式搜索引擎,搜索和聚合计算除了在各个data node并行计算以外,还需要将结果返回给汇总节点进行汇总和排序后再返回。无论是搜索,还是聚合,如果返回结果的size设置过大,都会给heap造成很大的压力,特别是数据汇聚节点。超大的size多数情况下都是用户用例不对,比如本来是想计算cardinality,却用了terms aggregation + size:0这样的方式; 对大结果集做深度分页;一次性拉取全量数据等等。
对高cardinality字段做terms aggregation
所谓高cardinality,就是该字段的唯一值比较多。 比如client ip,可能存在上千万甚至上亿的不同值。 对这种类型的字段做terms aggregation时,需要在内存里生成海量的分桶,内存需求会非常高。如果内部再嵌套有其他聚合,情况会更糟糕。 在做日志聚合分析时,一个典型的可以引起性能问题的场景,就是对带有参数的url字段做terms aggregation。 对于访问量大的网站,带有参数的url字段cardinality可能会到数亿,做一次terms aggregation内存开销巨大,然而对带有参数的url字段做聚合通常没有什么意义。 对于这类问题,可以额外索引一个url_stem字段,这个字段索引剥离掉参数部分的url。可以极大降低内存消耗,提高聚合速度。
小结:
- 倒排词典的索引需要常驻内存,无法GC,需要监控data node上segment memory增长趋势。
- 各类缓存,field cache, filter cache, indexing cache, bulk queue等等,要设置合理的大小,并且要应该根据最坏的情况来看heap是否够用,也就是各类缓存全部占满的时候,还有heap空间可以分配给其他任务吗?避免采用clear cache等“自欺欺人”的方式来释放内存。
- 避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用scan & scroll api来实现。
- cluster stats驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过tribe node连接。
- 想知道heap够不够,必须结合实际应用场景,并对集群的heap使用情况做持续的监控。
- 根据监控数据理解内存需求,合理配置各类circuit breaker,将内存溢出风险降低到最低。
注: 本文主要针对ES 2.x。
“该给ES分配多少内存?”
“JVM参数如何优化?“
“为何我的Heap占用这么高?”
“为何经常有某个field的数据量超出内存限制的异常?“
“为何感觉上没多少数据,也会经常Out Of Memory?”
以上问题,显然没有一个统一的数学公式能够给出答案。 和数据库类似,ES对于内存的消耗,和很多因素相关,诸如数据总量、mapping设置、查询方式、查询频度等等。默认的设置虽开箱即用,但不能适用每一种使用场景。作为ES的开发、运维人员,如果不了解ES对内存使用的一些基本原理,就很难针对特有的应用场景,有效的测试、规划和管理集群,从而踩到各种坑,被各种问题挫败。
要理解ES如何使用内存,先要理解下面两个基本事实:
1. ES是JAVA应用
2. 底层存储引擎是基于Lucene的
看似很普通是吗?但其实没多少人真正理解这意味着什么。
首先,作为一个JAVA应用,就脱离不开JVM和GC。很多人上手ES的时候,对GC一点概念都没有就去网上抄各种JVM“优化”参数,却仍然被heap不够用,内存溢出这样的问题搞得焦头烂额。了解JVM GC的概念和基本工作机制是很有必要的,本文不在此做过多探讨,读者可以自行Google相关资料进行学习。如何知道ES heap是否真的有压力了? 推荐阅读这篇博客:Understanding Memory Pressure Indicator。 即使对于JVM GC机制不够熟悉,头脑里还是需要有这么一个基本概念: 应用层面生成大量长生命周期的对象,是给heap造成压力的主要原因,例如读取一大片数据在内存中进行排序,或者在heap内部建cache缓存大量数据。如果GC释放的空间有限,而应用层面持续大量申请新对象,GC频度就开始上升,同时会消耗掉很多CPU时间。严重时可能恶性循环,导致整个集群停工。因此在使用ES的过程中,要知道哪些设置和操作容易造成以上问题,有针对性的予以规避。
其次,Lucene的倒排索引(Inverted Index)是先在内存里生成,然后定期以段文件(segment file)的形式刷到磁盘的。每个段实际就是一个完整的倒排索引,并且一旦写到磁盘上就不会做修改。 API层面的文档更新和删除实际上是增量写入的一种特殊文档,会保存在新的段里。不变的段文件易于被操作系统cache,热数据几乎等效于内存访问。
基于以上2个基本事实,我们不难理解,为何官方建议的heap size不要超过系统可用内存的一半。heap以外的内存并不会被浪费,操作系统会很开心的利用他们来cache被用读取过的段文件。
Heap分配多少合适?遵从官方建议就没错。 不要超过系统可用内存的一半,并且不要超过32GB。JVM参数呢?对于初级用户来说,并不需要做特别调整,仍然遵从官方的建议,将xms和xmx设置成和heap一样大小,避免动态分配heap size就好了。虽然有针对性的调整JVM参数可以带来些许GC效率的提升,当有一些“坏”用例的时候,这些调整并不会有什么魔法效果帮你减轻heap压力,甚至可能让问题更糟糕。
那么,ES的heap是如何被瓜分掉的? 说几个我知道的内存消耗大户并分别做解读:
1. segment memory
2. filter cache
3. field data cache
4. bulk queue
5. indexing buffer
6. state buffer
7. 超大搜索聚合结果集的fetch
8. 对高cardinality字段做terms aggregation
Segment Memory
Segment不是file吗?segment memory又是什么?前面提到过,一个segment是一个完备的lucene倒排索引,而倒排索引是通过词典 (Term Dictionary)到文档列表(Postings List)的映射关系,快速做查询的。 由于词典的size会很大,全部装载到heap里不现实,因此Lucene为词典做了一层前缀索引(Term Index),这个索引在Lucene4.0以后采用的数据结构是FST (Finite State Transducer)。 这种数据结构占用空间很小,Lucene打开索引的时候将其全量装载到内存中,加快磁盘上词典查询速度的同时减少随机磁盘访问次数。
下面是词典索引和词典主存储之间的一个对应关系图:
Lucene file的完整数据结构参见Apache Lucene - Index File Formats
说了这么多,要传达的一个意思就是,ES的data node存储数据并非只是耗费磁盘空间的,为了加速数据的访问,每个segment都有会一些索引数据驻留在heap里。因此segment越多,瓜分掉的heap也越多,并且这部分heap是无法被GC掉的! 理解这点对于监控和管理集群容量很重要,当一个node的segment memory占用过多的时候,就需要考虑删除、归档数据,或者扩容了。
怎么知道segment memory占用情况呢? CAT API可以给出答案。
1. 查看一个索引所有segment的memory占用情况:
2. 查看一个node上所有segment占用的memory总和:
那么有哪些途径减少data node上的segment memory占用呢? 总结起来有三种方法:
1. 删除不用的索引
2. 关闭索引 (文件仍然存在于磁盘,只是释放掉内存)。需要的时候可以重新打开。
3. 定期对不再更新的索引做optimize (ES2.0以后更改为force merge api)。这Optimze的实质是对segment file强制做合并,可以节省大量的segment memory。
Filter Cache (5.x里叫做Request cache)
Filter cache是用来缓存使用过的filter的结果集的,需要注意的是这个缓存也是常驻heap,在被evict掉之前,是无法被GC的。我的经验是默认的10% heap设置工作得够好了,如果实际使用中heap没什么压力的情况下,才考虑加大这个设置。
Field Data cache
在有大量排序、数据聚合的应用场景,可以说field data cache是性能和稳定性的杀手。 对搜索结果做排序或者聚合操作,需要将倒排索引里的数据进行解析,按列构造成docid->value的形式才能够做后续快速计算。 对于数据量很大的索引,这个构造过程会非常耗费时间,因此ES 2.0以前的版本会将构造好的数据缓存起来,提升性能。但是由于heap空间有限,当遇到用户对海量数据做计算的时候,就很容易导致heap吃紧,集群频繁GC,根本无法完成计算过程。 ES2.0以后,正式默认启用Doc Values特性(1.x需要手动更改mapping开启),将field data在indexing time构建在磁盘上,经过一系列优化,可以达到比之前采用field data cache机制更好的性能。因此需要限制对field data cache的使用,最好是完全不用,可以极大释放heap压力。 需要注意的是,很多同学已经升级到ES2.0,或者1.0里已经设置mapping启用了doc values,在kibana里仍然会遇到问题。 这里一个陷阱就在于kibana的table panel可以对所有字段排序。 设想如果有一个字段是analyzed过的,而用户去点击对应字段的排序表头是什么后果? 一来排序的结果并不是用户想要的,排序的对象实际是词典; 二来analyzed过的字段无法利用doc values,需要装载到field data cache,数据量很大的情况下可能集群就在忙着GC或者根本出不来结果。
Bulk Queue
一般来说,Bulk queue不会消耗很多的heap,但是见过一些用户为了提高bulk的速度,客户端设置了很大的并发量,并且将bulk Queue设置到不可思议的大,比如好几千。 Bulk Queue是做什么用的?当所有的bulk thread都在忙,无法响应新的bulk request的时候,将request在内存里排列起来,然后慢慢清掉。 这在应对短暂的请求爆发的时候有用,但是如果集群本身索引速度一直跟不上,设置的好几千的queue都满了会是什么状况呢? 取决于一个bulk的数据量大小,乘上queue的大小,heap很有可能就不够用,内存溢出了。一般来说官方默认的thread pool设置已经能很好的工作了,建议不要随意去“调优”相关的设置,很多时候都是适得其反的效果。
Indexing Buffer
Indexing Buffer是用来缓存新数据,当其满了或者refresh/flush interval到了,就会以segment file的形式写入到磁盘。 这个参数的默认值是10% heap size。根据经验,这个默认值也能够很好的工作,应对很大的索引吞吐量。 但有些用户认为这个buffer越大吞吐量越高,因此见过有用户将其设置为40%的。到了极端的情况,写入速度很高的时候,40%都被占用,导致OOM。
Cluster State Buffer
ES被设计成每个node都可以响应用户的api请求,因此每个node的内存里都包含有一份集群状态的拷贝。这个cluster state包含诸如集群有多少个node,多少个index,每个index的mapping是什么?有少shard,每个shard的分配情况等等 (ES有各类stats api获取这类数据)。 在一个规模很大的集群,这个状态信息可能会非常大的,耗用的内存空间就不可忽视了。并且在ES2.0之前的版本,state的更新是由master node做完以后全量散播到其他结点的。 频繁的状态更新就可以给heap带来很大的压力。 在超大规模集群的情况下,可以考虑分集群并通过tribe node连接做到对用户api的透明,这样可以保证每个集群里的state信息不会膨胀得过大。
超大搜索聚合结果集的fetch
ES是分布式搜索引擎,搜索和聚合计算除了在各个data node并行计算以外,还需要将结果返回给汇总节点进行汇总和排序后再返回。无论是搜索,还是聚合,如果返回结果的size设置过大,都会给heap造成很大的压力,特别是数据汇聚节点。超大的size多数情况下都是用户用例不对,比如本来是想计算cardinality,却用了terms aggregation + size:0这样的方式; 对大结果集做深度分页;一次性拉取全量数据等等。
对高cardinality字段做terms aggregation
所谓高cardinality,就是该字段的唯一值比较多。 比如client ip,可能存在上千万甚至上亿的不同值。 对这种类型的字段做terms aggregation时,需要在内存里生成海量的分桶,内存需求会非常高。如果内部再嵌套有其他聚合,情况会更糟糕。 在做日志聚合分析时,一个典型的可以引起性能问题的场景,就是对带有参数的url字段做terms aggregation。 对于访问量大的网站,带有参数的url字段cardinality可能会到数亿,做一次terms aggregation内存开销巨大,然而对带有参数的url字段做聚合通常没有什么意义。 对于这类问题,可以额外索引一个url_stem字段,这个字段索引剥离掉参数部分的url。可以极大降低内存消耗,提高聚合速度。
小结:
- 倒排词典的索引需要常驻内存,无法GC,需要监控data node上segment memory增长趋势。
- 各类缓存,field cache, filter cache, indexing cache, bulk queue等等,要设置合理的大小,并且要应该根据最坏的情况来看heap是否够用,也就是各类缓存全部占满的时候,还有heap空间可以分配给其他任务吗?避免采用clear cache等“自欺欺人”的方式来释放内存。
- 避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用scan & scroll api来实现。
- cluster stats驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过tribe node连接。
- 想知道heap够不够,必须结合实际应用场景,并对集群的heap使用情况做持续的监控。
- 根据监控数据理解内存需求,合理配置各类circuit breaker,将内存溢出风险降低到最低。
收起阅读 »
Day18: 程序内的消息流:ArrayBlockingQueue和zeromq对比
在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.
但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.
zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.
在我自己的电脑上测试,2.6 GHz Intel Core i5. 一个主线程生成10,000,000个随机数, 分发给四个线程消费.
用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.
不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率.
在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.
但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.
zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.
在我自己的电脑上测试,2.6 GHz Intel Core i5. 一个主线程生成10,000,000个随机数, 分发给四个线程消费.
用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.
不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率. 收起阅读 »
Day17: "奇怪"的搜索
除了应用在日志系统外, 越来越多的业务数据也接入ES, 利用它天生强大的搜索性能和分布式可扩展, 可以为业务的精确快速灵活的搜索提供极大便利, 我觉得这是未来一个很好的方向.
但是, 对它ES各种各样的搜索方式, 你了解了吗?
我们来看几个”奇怪”的搜索.
## 奇怪的打分
### 奇怪的打分1
我们有个数据结构是
{
“first_name”:”string”,
“last_name”:”string”
}
插入了几条数据, 有诸葛亮 诸葛明 诸葛暗 诸葛黑, 还有个人名字很奇怪, 叫司马诸葛.
然后我们要搜索诸葛瑾, 虽然索引里面没有一个人叫这个名字, 但搜索出来诸葛亮也不错, 他们名字这么像, 说不定是亲兄弟, 可以顺藤摸瓜, 找到我们需要的信息呢.
{
"query": {
"multi_match": {
"query": "诸葛瑜",
"type": "most_fields",
"fields": [ “*_name” ]
}
}
}
但实际上呢, 司马诸葛这个人居然稳居搜索榜首位, 他是搞竞价排名了吧? 你知道其中的打分原理吗?
### 奇怪的打分2
我们有两条数据:
PUT /my_index/my_type/1
{
"title": "Quick brown rabbits",
"body": "Brown rabbits are commonly seen."
}
PUT /my_index/my_type/2
{
"title": "Keeping pets healthy",
"body": "My quick brown fox eats rabbits on a regular basis."
}
要搜索{
"query": {
"bool": {
"should": [
{ "match": { "title": "Brown fox" }},
{ "match": { "body": "Brown fox" }}
]
}
}
}
第二条文档里面明确含有”brown fox”这个词组, 但是它的搜索得分比较低, 你知道为啥吗?## and用在哪
{
"query": {
"multi_match": {
"query": "peter smith",
"type": "most_fields",
"operator": "and",
"fields": [ "first_name", "last_name" ]
}
}
}
你知道这个and代表什么吗?是说
A: 姓和名里面都要含有"peter smith”,
还是说
B: 姓或者名里面要包含peter以及smith ?
还有, 怎么才能获得另外一个效果呢?
# 列表中的元素
我们有一条数据如下(按汉语分词)
{
“时代”:”三国”,
“姓名”: [“大司马”,“诸葛亮”]
}
我以词组的方式搜索:{
"query": {
"match_phrase": {
"姓名": "司马诸葛"
}
}
}
能搜索到吗?上面这些其实都是[elasticsearch Definitive Guide](https://www.elastic.co/guide)里面的几个小例子, 欢迎大家继续去那里寻找答案和其他各种小技巧.
除了应用在日志系统外, 越来越多的业务数据也接入ES, 利用它天生强大的搜索性能和分布式可扩展, 可以为业务的精确快速灵活的搜索提供极大便利, 我觉得这是未来一个很好的方向.
但是, 对它ES各种各样的搜索方式, 你了解了吗?
我们来看几个”奇怪”的搜索.
## 奇怪的打分
### 奇怪的打分1
我们有个数据结构是
{
“first_name”:”string”,
“last_name”:”string”
}
插入了几条数据, 有诸葛亮 诸葛明 诸葛暗 诸葛黑, 还有个人名字很奇怪, 叫司马诸葛.
然后我们要搜索诸葛瑾, 虽然索引里面没有一个人叫这个名字, 但搜索出来诸葛亮也不错, 他们名字这么像, 说不定是亲兄弟, 可以顺藤摸瓜, 找到我们需要的信息呢.
{
"query": {
"multi_match": {
"query": "诸葛瑜",
"type": "most_fields",
"fields": [ “*_name” ]
}
}
}
但实际上呢, 司马诸葛这个人居然稳居搜索榜首位, 他是搞竞价排名了吧? 你知道其中的打分原理吗?
### 奇怪的打分2
我们有两条数据:
PUT /my_index/my_type/1
{
"title": "Quick brown rabbits",
"body": "Brown rabbits are commonly seen."
}
PUT /my_index/my_type/2
{
"title": "Keeping pets healthy",
"body": "My quick brown fox eats rabbits on a regular basis."
}
要搜索{
"query": {
"bool": {
"should": [
{ "match": { "title": "Brown fox" }},
{ "match": { "body": "Brown fox" }}
]
}
}
}
第二条文档里面明确含有”brown fox”这个词组, 但是它的搜索得分比较低, 你知道为啥吗?## and用在哪
{
"query": {
"multi_match": {
"query": "peter smith",
"type": "most_fields",
"operator": "and",
"fields": [ "first_name", "last_name" ]
}
}
}
你知道这个and代表什么吗?是说
A: 姓和名里面都要含有"peter smith”,
还是说
B: 姓或者名里面要包含peter以及smith ?
还有, 怎么才能获得另外一个效果呢?
# 列表中的元素
我们有一条数据如下(按汉语分词)
{
“时代”:”三国”,
“姓名”: [“大司马”,“诸葛亮”]
}
我以词组的方式搜索:{
"query": {
"match_phrase": {
"姓名": "司马诸葛"
}
}
}
能搜索到吗?上面这些其实都是[elasticsearch Definitive Guide](https://www.elastic.co/guide)里面的几个小例子, 欢迎大家继续去那里寻找答案和其他各种小技巧.
收起阅读 »
day16 logstash-forwader To Kakfa!
在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.
logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.
目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~
贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
{
"files": [
{
"paths": [
"/var/log/nginx/*.log"
],
"Fields":{
"type":"nginx"
},
"DeadTime": "30m"
},
{
"paths": [
"/var/log/tomcat/*.log",
"/var/log/tomcat/*/*.log"
],
"Fields":{
"type":"tomcat"
},
"DeadTime": "30m"
}
],
"kafka": {
"broker_list": ["10.0.0.1:9092","10.0.0.2:9092"],
"topic_id": "topic_name_change_it_{{.type}}",
"compression_codec": "gzip"
}
}
再简单介绍一下参数吧,
- DeadTime:30m 是说超过30分钟没有更新, 就不会再继续跟踪这个文件了(退出goroutine)
- “Fields”:{ “type”:”tomcat” } , 会在每条日志中增加配置的字段
- path目前就是用的golang官方库, 好像是还不支持递归多层目录查找, 反正我翻了一下文档, 没有找到.
grok还不支持, 但简单的分割是可以的
"files": [
{
"paths": [
"d:\\target.txt"
],
"FieldNames": ["datetime", "datetime", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query", "s_port", "time_taken"],
"Delimiter": "\\s+",
"QuoteChar": "\""
}
]
以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.
其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.
现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下.
在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.
logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.
目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~
贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
{
"files": [
{
"paths": [
"/var/log/nginx/*.log"
],
"Fields":{
"type":"nginx"
},
"DeadTime": "30m"
},
{
"paths": [
"/var/log/tomcat/*.log",
"/var/log/tomcat/*/*.log"
],
"Fields":{
"type":"tomcat"
},
"DeadTime": "30m"
}
],
"kafka": {
"broker_list": ["10.0.0.1:9092","10.0.0.2:9092"],
"topic_id": "topic_name_change_it_{{.type}}",
"compression_codec": "gzip"
}
}
再简单介绍一下参数吧,
- DeadTime:30m 是说超过30分钟没有更新, 就不会再继续跟踪这个文件了(退出goroutine)
- “Fields”:{ “type”:”tomcat” } , 会在每条日志中增加配置的字段
- path目前就是用的golang官方库, 好像是还不支持递归多层目录查找, 反正我翻了一下文档, 没有找到.
grok还不支持, 但简单的分割是可以的
"files": [
{
"paths": [
"d:\\target.txt"
],
"FieldNames": ["datetime", "datetime", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query", "s_port", "time_taken"],
"Delimiter": "\\s+",
"QuoteChar": "\""
}
]
以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.
其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.
现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下. 收起阅读 »
Day15:Beats是什么东西?
https://pan.baidu.com/s/1eS157 ... -6-18
事实上Beats是一系列产品的统称,属于ElasticStack里面收集数据的这一层:Data Shipper Layer,包括以下若干Beats:
- PacketBeat,用来嗅探和分析网络流量,如HTTP、MySQL、Redis等
- TopBeat,用来收集系统的监控信息,功能如其名,类似*nix下的top命令,只不过所有的信息都会发送给后端的集中存储:Elasticsearch,这样你就可以很方便的监控所有的服务器的运行情况了
- FileBeat,用来收集数据源是文件的数据,比如常见的系统日志、应用日志、网站日志等等,FIleBeat思路来自Logstash-forwarder,Beats团队加入之后重构改写而成,解决的就是Logstash作为Agent采集时占用太多被收集系统资源的问题,Beats家族都是Golang编写,效率高,占用内存和CPU比较少,非常适合作为agent跑着服务器上
- 。。。
所以Beats其实是一套框架,另外的一个子项目Libbeat,就是所有beats都共用的模块,封装了所有的公共的组件,如配置管理、公共基础类、协议的解析处理、与Elasticsearch的操作等等,你可以很方便基于它实现你自己的beats,这也是Beats的目标,希望将来会出现更多的Beats,做各种各样的事情。
另外PacketBeat比较特殊,它又是网络协议抓包和处理的一个框架,目前支持了常见的一些协议,要扩展未知的协议其实非常简单,PacketBeat作为一个框架,数据抓包和后续的存储已经帮你处理好了,你只需要实现你的协议的解码操作就行了,当然这块也是最难和最业务相关的。
关于PacketBeat我回头再单独写一篇文章来介绍怎样编写一个PacketBeat的协议扩展吧,PacketBeat扩展的其它协议最终还是需要和PacketBeat集成在一起,也就是最终你的代码是要和PacketBeat的代码在一个工程里面的,而其它的Beats使用Libbeat完全是单独的Beat,如Filebeat和TopBeat,完全是独立打包和独立运行,这个也是两大Beats的主要区别。
随便提一下,现在所有的这些Beats已经合并到一个项目里面来方便管理了,golang,you know:https://github.com/elastic/beats
现在社区已经提交了的Beats:
https://www.elastic.co/guide/e ... .html
明后天在Beijing的ArchSummit2015,我将在Elastic展台,欢迎过来骚扰,领取Elastic的各种贴纸,还有限量的印有Elastic的T恤,数量有限哦
今天的Advent就这些吧。
Advent接力活动,规则:http://elasticsearch.cn/article/20
https://pan.baidu.com/s/1eS157 ... -6-18
事实上Beats是一系列产品的统称,属于ElasticStack里面收集数据的这一层:Data Shipper Layer,包括以下若干Beats:
- PacketBeat,用来嗅探和分析网络流量,如HTTP、MySQL、Redis等
- TopBeat,用来收集系统的监控信息,功能如其名,类似*nix下的top命令,只不过所有的信息都会发送给后端的集中存储:Elasticsearch,这样你就可以很方便的监控所有的服务器的运行情况了
- FileBeat,用来收集数据源是文件的数据,比如常见的系统日志、应用日志、网站日志等等,FIleBeat思路来自Logstash-forwarder,Beats团队加入之后重构改写而成,解决的就是Logstash作为Agent采集时占用太多被收集系统资源的问题,Beats家族都是Golang编写,效率高,占用内存和CPU比较少,非常适合作为agent跑着服务器上
- 。。。
所以Beats其实是一套框架,另外的一个子项目Libbeat,就是所有beats都共用的模块,封装了所有的公共的组件,如配置管理、公共基础类、协议的解析处理、与Elasticsearch的操作等等,你可以很方便基于它实现你自己的beats,这也是Beats的目标,希望将来会出现更多的Beats,做各种各样的事情。
另外PacketBeat比较特殊,它又是网络协议抓包和处理的一个框架,目前支持了常见的一些协议,要扩展未知的协议其实非常简单,PacketBeat作为一个框架,数据抓包和后续的存储已经帮你处理好了,你只需要实现你的协议的解码操作就行了,当然这块也是最难和最业务相关的。
关于PacketBeat我回头再单独写一篇文章来介绍怎样编写一个PacketBeat的协议扩展吧,PacketBeat扩展的其它协议最终还是需要和PacketBeat集成在一起,也就是最终你的代码是要和PacketBeat的代码在一个工程里面的,而其它的Beats使用Libbeat完全是单独的Beat,如Filebeat和TopBeat,完全是独立打包和独立运行,这个也是两大Beats的主要区别。
随便提一下,现在所有的这些Beats已经合并到一个项目里面来方便管理了,golang,you know:https://github.com/elastic/beats
现在社区已经提交了的Beats:
https://www.elastic.co/guide/e ... .html
明后天在Beijing的ArchSummit2015,我将在Elastic展台,欢迎过来骚扰,领取Elastic的各种贴纸,还有限量的印有Elastic的T恤,数量有限哦
今天的Advent就这些吧。
Advent接力活动,规则:http://elasticsearch.cn/article/20
收起阅读 »
Day14: percolator接口在logstash中的运用
而单纯从 Logstash 来做实时过滤报警,规则又不是很灵活。toplog.io 公司开发了一个 logstash-output-percolator插件,在有一定既定条件的情况下,成功运用上了 Percolator 方案。
这个插件的设计逻辑是:
- 通过 logstash-filter-checksum 自主生成 ES 文档的 _id;
- 使用上一步生成的 _id 同时发送 logstash-output-elasticsearch 和 logstash-output-percolator
- Percolator 接口一旦过滤成功,将 _id 发送给 Redis 服务器
- 其他系统从 Redis 服务器中获取 _id 即可从 ES 里拿到实际数据
Percolator 接口的用法简单说是这样:
创建接口:
curl -XPUT 'localhost:9200/patterns/.percolator/my-pattern-id' -d '{"query" : {"match" : {"message" : "ERROR"} } }'
过滤测试:curl -XGET 'localhost:9200/my-index/my-type/_percolate' -d '{"doc" : {"message" : "ERROR: Service Apache failed to connect to MySQL"} }'
要点就是把文档放在 doc 属性里发送到 _percolate 里。对应的 Logstash 配置如下:
filter {
checksum {
algorithm => "md5"
keys => ["message"]
}
}
output {
elasticsearch {
host => "localhost"
cluster => "my-cluster"
document_id => "%{logstash_checksum}"
index => "my-index"
}
percolator {
host => "es-balancer"
redis_host => ["localhost"]
document_id => "%{logstash_checksum}"
pattern_index => "patterns"
}
}
连接上对应的 Redis,就可以看到报警信息了:$ redis-cli
127.0.0.1:6379> lrange percolator 0 1
1) "{\"matches\":[\"2\"],\"document_id\":\"a5d5c5f69b26ac0597370c9b1e7a8111\"}"
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
而单纯从 Logstash 来做实时过滤报警,规则又不是很灵活。toplog.io 公司开发了一个 logstash-output-percolator插件,在有一定既定条件的情况下,成功运用上了 Percolator 方案。
这个插件的设计逻辑是:
- 通过 logstash-filter-checksum 自主生成 ES 文档的 _id;
- 使用上一步生成的 _id 同时发送 logstash-output-elasticsearch 和 logstash-output-percolator
- Percolator 接口一旦过滤成功,将 _id 发送给 Redis 服务器
- 其他系统从 Redis 服务器中获取 _id 即可从 ES 里拿到实际数据
Percolator 接口的用法简单说是这样:
创建接口:
curl -XPUT 'localhost:9200/patterns/.percolator/my-pattern-id' -d '{"query" : {"match" : {"message" : "ERROR"} } }'
过滤测试:curl -XGET 'localhost:9200/my-index/my-type/_percolate' -d '{"doc" : {"message" : "ERROR: Service Apache failed to connect to MySQL"} }'
要点就是把文档放在 doc 属性里发送到 _percolate 里。对应的 Logstash 配置如下:
filter {
checksum {
algorithm => "md5"
keys => ["message"]
}
}
output {
elasticsearch {
host => "localhost"
cluster => "my-cluster"
document_id => "%{logstash_checksum}"
index => "my-index"
}
percolator {
host => "es-balancer"
redis_host => ["localhost"]
document_id => "%{logstash_checksum}"
pattern_index => "patterns"
}
}
连接上对应的 Redis,就可以看到报警信息了:$ redis-cli
127.0.0.1:6379> lrange percolator 0 1
1) "{\"matches\":[\"2\"],\"document_id\":\"a5d5c5f69b26ac0597370c9b1e7a8111\"}"
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »
Day13: ipip.net介绍
用法很简单:
filter {
ipip {
source => "clientip"
target => "ipip"
}
}
生成的 JSON 数据结构类似下面这样:{
"clientip" : "",
"ipip" : {
"country" : "",
"city" : "",
"carrier" : "",
"province" : ""
}
}
不过这个插件只实现了收费版的数据库基础格式。免费版的支持,收费版高级的经纬度、基站位置等,都没有随着更新。事实上,我们可以通过 ipip 官方的 Java 库,实现一个更灵活的 logstash-filter-ipip_java 插件出来,下期见。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
用法很简单:
filter {
ipip {
source => "clientip"
target => "ipip"
}
}
生成的 JSON 数据结构类似下面这样:{
"clientip" : "",
"ipip" : {
"country" : "",
"city" : "",
"carrier" : "",
"province" : ""
}
}
不过这个插件只实现了收费版的数据库基础格式。免费版的支持,收费版高级的经纬度、基站位置等,都没有随着更新。事实上,我们可以通过 ipip 官方的 Java 库,实现一个更灵活的 logstash-filter-ipip_java 插件出来,下期见。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »