
Day18: 程序内的消息流:ArrayBlockingQueue和zeromq对比
在logstash内部, input到filter, 以及filter到output, 消息都是通过一个队列来中转.
在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.
但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.
zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.
在我自己的电脑上测试,2.6 GHz Intel Core i5. 一个主线程生成10,000,000个随机数, 分发给四个线程消费.
用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.
不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率.
继续阅读 »
在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.
但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.
zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.
在我自己的电脑上测试,2.6 GHz Intel Core i5. 一个主线程生成10,000,000个随机数, 分发给四个线程消费.
用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.
不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率.
在logstash内部, input到filter, 以及filter到output, 消息都是通过一个队列来中转.
在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.
但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.
zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.
在我自己的电脑上测试,2.6 GHz Intel Core i5. 一个主线程生成10,000,000个随机数, 分发给四个线程消费.
用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.
不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率. 收起阅读 »
在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.
但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.
zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.
在我自己的电脑上测试,2.6 GHz Intel Core i5. 一个主线程生成10,000,000个随机数, 分发给四个线程消费.
用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.
不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率. 收起阅读 »

Day17: "奇怪"的搜索
代@childe 发文。
除了应用在日志系统外, 越来越多的业务数据也接入ES, 利用它天生强大的搜索性能和分布式可扩展, 可以为业务的精确快速灵活的搜索提供极大便利, 我觉得这是未来一个很好的方向.
但是, 对它ES各种各样的搜索方式, 你了解了吗?
我们来看几个”奇怪”的搜索.
## 奇怪的打分
### 奇怪的打分1
我们有个数据结构是
插入了几条数据, 有诸葛亮 诸葛明 诸葛暗 诸葛黑, 还有个人名字很奇怪, 叫司马诸葛.
然后我们要搜索诸葛瑾, 虽然索引里面没有一个人叫这个名字, 但搜索出来诸葛亮也不错, 他们名字这么像, 说不定是亲兄弟, 可以顺藤摸瓜, 找到我们需要的信息呢.
但实际上呢, 司马诸葛这个人居然稳居搜索榜首位, 他是搞竞价排名了吧? 你知道其中的打分原理吗?
### 奇怪的打分2
我们有两条数据:
## and用在哪
是说
A: 姓和名里面都要含有"peter smith”,
还是说
B: 姓或者名里面要包含peter以及smith ?
还有, 怎么才能获得另外一个效果呢?
# 列表中的元素
我们有一条数据如下(按汉语分词)
上面这些其实都是[elasticsearch Definitive Guide](https://www.elastic.co/guide)里面的几个小例子, 欢迎大家继续去那里寻找答案和其他各种小技巧.
继续阅读 »
除了应用在日志系统外, 越来越多的业务数据也接入ES, 利用它天生强大的搜索性能和分布式可扩展, 可以为业务的精确快速灵活的搜索提供极大便利, 我觉得这是未来一个很好的方向.
但是, 对它ES各种各样的搜索方式, 你了解了吗?
我们来看几个”奇怪”的搜索.
## 奇怪的打分
### 奇怪的打分1
我们有个数据结构是
{
“first_name”:”string”,
“last_name”:”string”
}
插入了几条数据, 有诸葛亮 诸葛明 诸葛暗 诸葛黑, 还有个人名字很奇怪, 叫司马诸葛.
然后我们要搜索诸葛瑾, 虽然索引里面没有一个人叫这个名字, 但搜索出来诸葛亮也不错, 他们名字这么像, 说不定是亲兄弟, 可以顺藤摸瓜, 找到我们需要的信息呢.
{
"query": {
"multi_match": {
"query": "诸葛瑜",
"type": "most_fields",
"fields": [ “*_name” ]
}
}
}
但实际上呢, 司马诸葛这个人居然稳居搜索榜首位, 他是搞竞价排名了吧? 你知道其中的打分原理吗?
### 奇怪的打分2
我们有两条数据:
PUT /my_index/my_type/1
{
"title": "Quick brown rabbits",
"body": "Brown rabbits are commonly seen."
}
PUT /my_index/my_type/2
{
"title": "Keeping pets healthy",
"body": "My quick brown fox eats rabbits on a regular basis."
}
要搜索{
"query": {
"bool": {
"should": [
{ "match": { "title": "Brown fox" }},
{ "match": { "body": "Brown fox" }}
]
}
}
}
第二条文档里面明确含有”brown fox”这个词组, 但是它的搜索得分比较低, 你知道为啥吗?## and用在哪
{
"query": {
"multi_match": {
"query": "peter smith",
"type": "most_fields",
"operator": "and",
"fields": [ "first_name", "last_name" ]
}
}
}
你知道这个and代表什么吗?是说
A: 姓和名里面都要含有"peter smith”,
还是说
B: 姓或者名里面要包含peter以及smith ?
还有, 怎么才能获得另外一个效果呢?
# 列表中的元素
我们有一条数据如下(按汉语分词)
{
“时代”:”三国”,
“姓名”: [“大司马”,“诸葛亮”]
}
我以词组的方式搜索:{
"query": {
"match_phrase": {
"姓名": "司马诸葛"
}
}
}
能搜索到吗?上面这些其实都是[elasticsearch Definitive Guide](https://www.elastic.co/guide)里面的几个小例子, 欢迎大家继续去那里寻找答案和其他各种小技巧.
代@childe 发文。
除了应用在日志系统外, 越来越多的业务数据也接入ES, 利用它天生强大的搜索性能和分布式可扩展, 可以为业务的精确快速灵活的搜索提供极大便利, 我觉得这是未来一个很好的方向.
但是, 对它ES各种各样的搜索方式, 你了解了吗?
我们来看几个”奇怪”的搜索.
## 奇怪的打分
### 奇怪的打分1
我们有个数据结构是
插入了几条数据, 有诸葛亮 诸葛明 诸葛暗 诸葛黑, 还有个人名字很奇怪, 叫司马诸葛.
然后我们要搜索诸葛瑾, 虽然索引里面没有一个人叫这个名字, 但搜索出来诸葛亮也不错, 他们名字这么像, 说不定是亲兄弟, 可以顺藤摸瓜, 找到我们需要的信息呢.
但实际上呢, 司马诸葛这个人居然稳居搜索榜首位, 他是搞竞价排名了吧? 你知道其中的打分原理吗?
### 奇怪的打分2
我们有两条数据:
## and用在哪
是说
A: 姓和名里面都要含有"peter smith”,
还是说
B: 姓或者名里面要包含peter以及smith ?
还有, 怎么才能获得另外一个效果呢?
# 列表中的元素
我们有一条数据如下(按汉语分词)
上面这些其实都是[elasticsearch Definitive Guide](https://www.elastic.co/guide)里面的几个小例子, 欢迎大家继续去那里寻找答案和其他各种小技巧.
收起阅读 »
除了应用在日志系统外, 越来越多的业务数据也接入ES, 利用它天生强大的搜索性能和分布式可扩展, 可以为业务的精确快速灵活的搜索提供极大便利, 我觉得这是未来一个很好的方向.
但是, 对它ES各种各样的搜索方式, 你了解了吗?
我们来看几个”奇怪”的搜索.
## 奇怪的打分
### 奇怪的打分1
我们有个数据结构是
{
“first_name”:”string”,
“last_name”:”string”
}
插入了几条数据, 有诸葛亮 诸葛明 诸葛暗 诸葛黑, 还有个人名字很奇怪, 叫司马诸葛.
然后我们要搜索诸葛瑾, 虽然索引里面没有一个人叫这个名字, 但搜索出来诸葛亮也不错, 他们名字这么像, 说不定是亲兄弟, 可以顺藤摸瓜, 找到我们需要的信息呢.
{
"query": {
"multi_match": {
"query": "诸葛瑜",
"type": "most_fields",
"fields": [ “*_name” ]
}
}
}
但实际上呢, 司马诸葛这个人居然稳居搜索榜首位, 他是搞竞价排名了吧? 你知道其中的打分原理吗?
### 奇怪的打分2
我们有两条数据:
PUT /my_index/my_type/1
{
"title": "Quick brown rabbits",
"body": "Brown rabbits are commonly seen."
}
PUT /my_index/my_type/2
{
"title": "Keeping pets healthy",
"body": "My quick brown fox eats rabbits on a regular basis."
}
要搜索{
"query": {
"bool": {
"should": [
{ "match": { "title": "Brown fox" }},
{ "match": { "body": "Brown fox" }}
]
}
}
}
第二条文档里面明确含有”brown fox”这个词组, 但是它的搜索得分比较低, 你知道为啥吗?## and用在哪
{
"query": {
"multi_match": {
"query": "peter smith",
"type": "most_fields",
"operator": "and",
"fields": [ "first_name", "last_name" ]
}
}
}
你知道这个and代表什么吗?是说
A: 姓和名里面都要含有"peter smith”,
还是说
B: 姓或者名里面要包含peter以及smith ?
还有, 怎么才能获得另外一个效果呢?
# 列表中的元素
我们有一条数据如下(按汉语分词)
{
“时代”:”三国”,
“姓名”: [“大司马”,“诸葛亮”]
}
我以词组的方式搜索:{
"query": {
"match_phrase": {
"姓名": "司马诸葛"
}
}
}
能搜索到吗?上面这些其实都是[elasticsearch Definitive Guide](https://www.elastic.co/guide)里面的几个小例子, 欢迎大家继续去那里寻找答案和其他各种小技巧.
收起阅读 »

day16 logstash-forwader To Kakfa!
看到前一天, Medcl 介绍了Beat, 我想今天我就介绍一下算是同一个领域的, 我们的一个小产品吧, 同样也基于elastic旗下的logstash-forwarder. 我真的不是来打广告的, 就是第一次写, 没经验, 看着前一天的文章, 顺手就想到了.
在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.
logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.
目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~
贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
再简单介绍一下参数吧,
grok还不支持, 但简单的分割是可以的
以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.
其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.
现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下.
继续阅读 »
在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.
logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.
目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~
贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
{
"files": [
{
"paths": [
"/var/log/nginx/*.log"
],
"Fields":{
"type":"nginx"
},
"DeadTime": "30m"
},
{
"paths": [
"/var/log/tomcat/*.log",
"/var/log/tomcat/*/*.log"
],
"Fields":{
"type":"tomcat"
},
"DeadTime": "30m"
}
],
"kafka": {
"broker_list": ["10.0.0.1:9092","10.0.0.2:9092"],
"topic_id": "topic_name_change_it_{{.type}}",
"compression_codec": "gzip"
}
}
再简单介绍一下参数吧,
- DeadTime:30m 是说超过30分钟没有更新, 就不会再继续跟踪这个文件了(退出goroutine)
- “Fields”:{ “type”:”tomcat” } , 会在每条日志中增加配置的字段
- path目前就是用的golang官方库, 好像是还不支持递归多层目录查找, 反正我翻了一下文档, 没有找到.
grok还不支持, 但简单的分割是可以的
"files": [
{
"paths": [
"d:\\target.txt"
],
"FieldNames": ["datetime", "datetime", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query", "s_port", "time_taken"],
"Delimiter": "\\s+",
"QuoteChar": "\""
}
]
以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.
其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.
现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下.
看到前一天, Medcl 介绍了Beat, 我想今天我就介绍一下算是同一个领域的, 我们的一个小产品吧, 同样也基于elastic旗下的logstash-forwarder. 我真的不是来打广告的, 就是第一次写, 没经验, 看着前一天的文章, 顺手就想到了.
在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.
logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.
目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~
贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
再简单介绍一下参数吧,
grok还不支持, 但简单的分割是可以的
以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.
其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.
现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下. 收起阅读 »
在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.
logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.
目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~
贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
{
"files": [
{
"paths": [
"/var/log/nginx/*.log"
],
"Fields":{
"type":"nginx"
},
"DeadTime": "30m"
},
{
"paths": [
"/var/log/tomcat/*.log",
"/var/log/tomcat/*/*.log"
],
"Fields":{
"type":"tomcat"
},
"DeadTime": "30m"
}
],
"kafka": {
"broker_list": ["10.0.0.1:9092","10.0.0.2:9092"],
"topic_id": "topic_name_change_it_{{.type}}",
"compression_codec": "gzip"
}
}
再简单介绍一下参数吧,
- DeadTime:30m 是说超过30分钟没有更新, 就不会再继续跟踪这个文件了(退出goroutine)
- “Fields”:{ “type”:”tomcat” } , 会在每条日志中增加配置的字段
- path目前就是用的golang官方库, 好像是还不支持递归多层目录查找, 反正我翻了一下文档, 没有找到.
grok还不支持, 但简单的分割是可以的
"files": [
{
"paths": [
"d:\\target.txt"
],
"FieldNames": ["datetime", "datetime", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query", "s_port", "time_taken"],
"Delimiter": "\\s+",
"QuoteChar": "\""
}
]
以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.
其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.
现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下. 收起阅读 »

Day15:Beats是什么东西?
Advent接力传到我这里了,今天我给大家介绍一下Beats,刚好前几天也有好多人问我它是干嘛的,之前的上海我有分享过Beats的内容,PPT在这里:
https://pan.baidu.com/s/1eS157 ... -6-18
事实上Beats是一系列产品的统称,属于ElasticStack里面收集数据的这一层:Data Shipper Layer,包括以下若干Beats:
所以Beats其实是一套框架,另外的一个子项目Libbeat,就是所有beats都共用的模块,封装了所有的公共的组件,如配置管理、公共基础类、协议的解析处理、与Elasticsearch的操作等等,你可以很方便基于它实现你自己的beats,这也是Beats的目标,希望将来会出现更多的Beats,做各种各样的事情。
另外PacketBeat比较特殊,它又是网络协议抓包和处理的一个框架,目前支持了常见的一些协议,要扩展未知的协议其实非常简单,PacketBeat作为一个框架,数据抓包和后续的存储已经帮你处理好了,你只需要实现你的协议的解码操作就行了,当然这块也是最难和最业务相关的。
关于PacketBeat我回头再单独写一篇文章来介绍怎样编写一个PacketBeat的协议扩展吧,PacketBeat扩展的其它协议最终还是需要和PacketBeat集成在一起,也就是最终你的代码是要和PacketBeat的代码在一个工程里面的,而其它的Beats使用Libbeat完全是单独的Beat,如Filebeat和TopBeat,完全是独立打包和独立运行,这个也是两大Beats的主要区别。
随便提一下,现在所有的这些Beats已经合并到一个项目里面来方便管理了,golang,you know:https://github.com/elastic/beats
现在社区已经提交了的Beats:
https://www.elastic.co/guide/e ... .html
明后天在Beijing的ArchSummit2015,我将在Elastic展台,欢迎过来骚扰,领取Elastic的各种贴纸,还有限量的印有Elastic的T恤,数量有限哦
今天的Advent就这些吧。
Advent接力活动,规则:http://elasticsearch.cn/article/20
继续阅读 »
https://pan.baidu.com/s/1eS157 ... -6-18
事实上Beats是一系列产品的统称,属于ElasticStack里面收集数据的这一层:Data Shipper Layer,包括以下若干Beats:
- PacketBeat,用来嗅探和分析网络流量,如HTTP、MySQL、Redis等
- TopBeat,用来收集系统的监控信息,功能如其名,类似*nix下的top命令,只不过所有的信息都会发送给后端的集中存储:Elasticsearch,这样你就可以很方便的监控所有的服务器的运行情况了
- FileBeat,用来收集数据源是文件的数据,比如常见的系统日志、应用日志、网站日志等等,FIleBeat思路来自Logstash-forwarder,Beats团队加入之后重构改写而成,解决的就是Logstash作为Agent采集时占用太多被收集系统资源的问题,Beats家族都是Golang编写,效率高,占用内存和CPU比较少,非常适合作为agent跑着服务器上
- 。。。
所以Beats其实是一套框架,另外的一个子项目Libbeat,就是所有beats都共用的模块,封装了所有的公共的组件,如配置管理、公共基础类、协议的解析处理、与Elasticsearch的操作等等,你可以很方便基于它实现你自己的beats,这也是Beats的目标,希望将来会出现更多的Beats,做各种各样的事情。
另外PacketBeat比较特殊,它又是网络协议抓包和处理的一个框架,目前支持了常见的一些协议,要扩展未知的协议其实非常简单,PacketBeat作为一个框架,数据抓包和后续的存储已经帮你处理好了,你只需要实现你的协议的解码操作就行了,当然这块也是最难和最业务相关的。
关于PacketBeat我回头再单独写一篇文章来介绍怎样编写一个PacketBeat的协议扩展吧,PacketBeat扩展的其它协议最终还是需要和PacketBeat集成在一起,也就是最终你的代码是要和PacketBeat的代码在一个工程里面的,而其它的Beats使用Libbeat完全是单独的Beat,如Filebeat和TopBeat,完全是独立打包和独立运行,这个也是两大Beats的主要区别。
随便提一下,现在所有的这些Beats已经合并到一个项目里面来方便管理了,golang,you know:https://github.com/elastic/beats
现在社区已经提交了的Beats:
https://www.elastic.co/guide/e ... .html
明后天在Beijing的ArchSummit2015,我将在Elastic展台,欢迎过来骚扰,领取Elastic的各种贴纸,还有限量的印有Elastic的T恤,数量有限哦
今天的Advent就这些吧。
Advent接力活动,规则:http://elasticsearch.cn/article/20
Advent接力传到我这里了,今天我给大家介绍一下Beats,刚好前几天也有好多人问我它是干嘛的,之前的上海我有分享过Beats的内容,PPT在这里:
https://pan.baidu.com/s/1eS157 ... -6-18
事实上Beats是一系列产品的统称,属于ElasticStack里面收集数据的这一层:Data Shipper Layer,包括以下若干Beats:
所以Beats其实是一套框架,另外的一个子项目Libbeat,就是所有beats都共用的模块,封装了所有的公共的组件,如配置管理、公共基础类、协议的解析处理、与Elasticsearch的操作等等,你可以很方便基于它实现你自己的beats,这也是Beats的目标,希望将来会出现更多的Beats,做各种各样的事情。
另外PacketBeat比较特殊,它又是网络协议抓包和处理的一个框架,目前支持了常见的一些协议,要扩展未知的协议其实非常简单,PacketBeat作为一个框架,数据抓包和后续的存储已经帮你处理好了,你只需要实现你的协议的解码操作就行了,当然这块也是最难和最业务相关的。
关于PacketBeat我回头再单独写一篇文章来介绍怎样编写一个PacketBeat的协议扩展吧,PacketBeat扩展的其它协议最终还是需要和PacketBeat集成在一起,也就是最终你的代码是要和PacketBeat的代码在一个工程里面的,而其它的Beats使用Libbeat完全是单独的Beat,如Filebeat和TopBeat,完全是独立打包和独立运行,这个也是两大Beats的主要区别。
随便提一下,现在所有的这些Beats已经合并到一个项目里面来方便管理了,golang,you know:https://github.com/elastic/beats
现在社区已经提交了的Beats:
https://www.elastic.co/guide/e ... .html
明后天在Beijing的ArchSummit2015,我将在Elastic展台,欢迎过来骚扰,领取Elastic的各种贴纸,还有限量的印有Elastic的T恤,数量有限哦
今天的Advent就这些吧。
Advent接力活动,规则:http://elasticsearch.cn/article/20
收起阅读 »
https://pan.baidu.com/s/1eS157 ... -6-18
事实上Beats是一系列产品的统称,属于ElasticStack里面收集数据的这一层:Data Shipper Layer,包括以下若干Beats:
- PacketBeat,用来嗅探和分析网络流量,如HTTP、MySQL、Redis等
- TopBeat,用来收集系统的监控信息,功能如其名,类似*nix下的top命令,只不过所有的信息都会发送给后端的集中存储:Elasticsearch,这样你就可以很方便的监控所有的服务器的运行情况了
- FileBeat,用来收集数据源是文件的数据,比如常见的系统日志、应用日志、网站日志等等,FIleBeat思路来自Logstash-forwarder,Beats团队加入之后重构改写而成,解决的就是Logstash作为Agent采集时占用太多被收集系统资源的问题,Beats家族都是Golang编写,效率高,占用内存和CPU比较少,非常适合作为agent跑着服务器上
- 。。。
所以Beats其实是一套框架,另外的一个子项目Libbeat,就是所有beats都共用的模块,封装了所有的公共的组件,如配置管理、公共基础类、协议的解析处理、与Elasticsearch的操作等等,你可以很方便基于它实现你自己的beats,这也是Beats的目标,希望将来会出现更多的Beats,做各种各样的事情。
另外PacketBeat比较特殊,它又是网络协议抓包和处理的一个框架,目前支持了常见的一些协议,要扩展未知的协议其实非常简单,PacketBeat作为一个框架,数据抓包和后续的存储已经帮你处理好了,你只需要实现你的协议的解码操作就行了,当然这块也是最难和最业务相关的。
关于PacketBeat我回头再单独写一篇文章来介绍怎样编写一个PacketBeat的协议扩展吧,PacketBeat扩展的其它协议最终还是需要和PacketBeat集成在一起,也就是最终你的代码是要和PacketBeat的代码在一个工程里面的,而其它的Beats使用Libbeat完全是单独的Beat,如Filebeat和TopBeat,完全是独立打包和独立运行,这个也是两大Beats的主要区别。
随便提一下,现在所有的这些Beats已经合并到一个项目里面来方便管理了,golang,you know:https://github.com/elastic/beats
现在社区已经提交了的Beats:
https://www.elastic.co/guide/e ... .html
明后天在Beijing的ArchSummit2015,我将在Elastic展台,欢迎过来骚扰,领取Elastic的各种贴纸,还有限量的印有Elastic的T恤,数量有限哦
今天的Advent就这些吧。
Advent接力活动,规则:http://elasticsearch.cn/article/20
收起阅读 »

Day14: percolator接口在logstash中的运用
我们都知道 Elasticsearch 除了普通的 search 接口以外,还有另一个 Percolator 接口,天生用来做实时过滤告警的。但是由于接口比较复杂,在目前的 ELK 体系中不是很容易运用。
而单纯从 Logstash 来做实时过滤报警,规则又不是很灵活。toplog.io 公司开发了一个 logstash-output-percolator插件,在有一定既定条件的情况下,成功运用上了 Percolator 方案。
这个插件的设计逻辑是:
Percolator 接口的用法简单说是这样:
创建接口:
对应的 Logstash 配置如下:
继续阅读 »
而单纯从 Logstash 来做实时过滤报警,规则又不是很灵活。toplog.io 公司开发了一个 logstash-output-percolator插件,在有一定既定条件的情况下,成功运用上了 Percolator 方案。
这个插件的设计逻辑是:
- 通过 logstash-filter-checksum 自主生成 ES 文档的 _id;
- 使用上一步生成的 _id 同时发送 logstash-output-elasticsearch 和 logstash-output-percolator
- Percolator 接口一旦过滤成功,将 _id 发送给 Redis 服务器
- 其他系统从 Redis 服务器中获取 _id 即可从 ES 里拿到实际数据
Percolator 接口的用法简单说是这样:
创建接口:
curl -XPUT 'localhost:9200/patterns/.percolator/my-pattern-id' -d '{"query" : {"match" : {"message" : "ERROR"} } }'
过滤测试:curl -XGET 'localhost:9200/my-index/my-type/_percolate' -d '{"doc" : {"message" : "ERROR: Service Apache failed to connect to MySQL"} }'
要点就是把文档放在 doc 属性里发送到 _percolate 里。对应的 Logstash 配置如下:
filter {
checksum {
algorithm => "md5"
keys => ["message"]
}
}
output {
elasticsearch {
host => "localhost"
cluster => "my-cluster"
document_id => "%{logstash_checksum}"
index => "my-index"
}
percolator {
host => "es-balancer"
redis_host => ["localhost"]
document_id => "%{logstash_checksum}"
pattern_index => "patterns"
}
}
连接上对应的 Redis,就可以看到报警信息了:$ redis-cli
127.0.0.1:6379> lrange percolator 0 1
1) "{\"matches\":[\"2\"],\"document_id\":\"a5d5c5f69b26ac0597370c9b1e7a8111\"}"
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
我们都知道 Elasticsearch 除了普通的 search 接口以外,还有另一个 Percolator 接口,天生用来做实时过滤告警的。但是由于接口比较复杂,在目前的 ELK 体系中不是很容易运用。
而单纯从 Logstash 来做实时过滤报警,规则又不是很灵活。toplog.io 公司开发了一个 logstash-output-percolator插件,在有一定既定条件的情况下,成功运用上了 Percolator 方案。
这个插件的设计逻辑是:
Percolator 接口的用法简单说是这样:
创建接口:
对应的 Logstash 配置如下:
而单纯从 Logstash 来做实时过滤报警,规则又不是很灵活。toplog.io 公司开发了一个 logstash-output-percolator插件,在有一定既定条件的情况下,成功运用上了 Percolator 方案。
这个插件的设计逻辑是:
- 通过 logstash-filter-checksum 自主生成 ES 文档的 _id;
- 使用上一步生成的 _id 同时发送 logstash-output-elasticsearch 和 logstash-output-percolator
- Percolator 接口一旦过滤成功,将 _id 发送给 Redis 服务器
- 其他系统从 Redis 服务器中获取 _id 即可从 ES 里拿到实际数据
Percolator 接口的用法简单说是这样:
创建接口:
curl -XPUT 'localhost:9200/patterns/.percolator/my-pattern-id' -d '{"query" : {"match" : {"message" : "ERROR"} } }'
过滤测试:curl -XGET 'localhost:9200/my-index/my-type/_percolate' -d '{"doc" : {"message" : "ERROR: Service Apache failed to connect to MySQL"} }'
要点就是把文档放在 doc 属性里发送到 _percolate 里。对应的 Logstash 配置如下:
filter {
checksum {
algorithm => "md5"
keys => ["message"]
}
}
output {
elasticsearch {
host => "localhost"
cluster => "my-cluster"
document_id => "%{logstash_checksum}"
index => "my-index"
}
percolator {
host => "es-balancer"
redis_host => ["localhost"]
document_id => "%{logstash_checksum}"
pattern_index => "patterns"
}
}
连接上对应的 Redis,就可以看到报警信息了:$ redis-cli
127.0.0.1:6379> lrange percolator 0 1
1) "{\"matches\":[\"2\"],\"document_id\":\"a5d5c5f69b26ac0597370c9b1e7a8111\"}"
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

Day13: ipip.net介绍
Geo 定位在 ELK 应用中是非常重要和有用的一个环节。不幸的是:GeoIP 本身在国内的准确度实在堪忧。高春辉近年成立了一个项目,专注收集细化 IP 地址在国内的数据:http://www.ipip.net。数据分为免费版和收费版两种。项目提供了不少客户端,有趣的是,有社区贡献了一个 Logstash 插件:https://github.com/bittopaz/logstash-filter-ipip。
用法很简单:
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
继续阅读 »
用法很简单:
filter {
ipip {
source => "clientip"
target => "ipip"
}
}
生成的 JSON 数据结构类似下面这样:{
"clientip" : "",
"ipip" : {
"country" : "",
"city" : "",
"carrier" : "",
"province" : ""
}
}
不过这个插件只实现了收费版的数据库基础格式。免费版的支持,收费版高级的经纬度、基站位置等,都没有随着更新。事实上,我们可以通过 ipip 官方的 Java 库,实现一个更灵活的 logstash-filter-ipip_java 插件出来,下期见。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
Geo 定位在 ELK 应用中是非常重要和有用的一个环节。不幸的是:GeoIP 本身在国内的准确度实在堪忧。高春辉近年成立了一个项目,专注收集细化 IP 地址在国内的数据:http://www.ipip.net。数据分为免费版和收费版两种。项目提供了不少客户端,有趣的是,有社区贡献了一个 Logstash 插件:https://github.com/bittopaz/logstash-filter-ipip。
用法很简单:
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »
用法很简单:
filter {
ipip {
source => "clientip"
target => "ipip"
}
}
生成的 JSON 数据结构类似下面这样:{
"clientip" : "",
"ipip" : {
"country" : "",
"city" : "",
"carrier" : "",
"province" : ""
}
}
不过这个插件只实现了收费版的数据库基础格式。免费版的支持,收费版高级的经纬度、基站位置等,都没有随着更新。事实上,我们可以通过 ipip 官方的 Java 库,实现一个更灵活的 logstash-filter-ipip_java 插件出来,下期见。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

Day12: siren-join简介
很多从 MySQL 转过来的 Elasticsearch 用户总是很习惯的问一个问题:『怎么在 ES 里实现 join 操作?』过去,我们的回答一般都是:通过类似宽表的思路,将数据平铺在一个索引里。不过,最近另一家 Lucene 开发商给出了另一个方案,他们开发了一个 Elasticsearch 插件,实现了 filter 层面的 join,GitHub 项目地址见:https://github.com/sirensolutions/siren-join
不过需要提醒一下的是:filter 层面的意思,就是只相当于是 SQL 里的 exists 操作。所以目前对这个插件也不要抱有太大期望。今天我们来稍微演示一下。
安装和其他 ES 插件一样:
我们 bulk 上传这么一段数据:
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
继续阅读 »
不过需要提醒一下的是:filter 层面的意思,就是只相当于是 SQL 里的 exists 操作。所以目前对这个插件也不要抱有太大期望。今天我们来稍微演示一下。
安装和其他 ES 插件一样:
# bin/plugin -i solutions.siren/siren-join/1.0
注意 siren-join v1.0 只支持 ES 1.7 版本,2.0 版本支持据说正在开发中。我们 bulk 上传这么一段数据:
{"index":{"_index":"index1","_type":"type","_id":"1"}}
{"id":1, "foreign_key":"13"}
{"index":{"_index":"index1","_type":"type","_id":"2"}}
{"id":2}
{"index":{"_index":"index1","_type":"type","_id":"3"}}
{"id":3, "foreign_key": "2"}
{"index":{"_index":"index1","_type":"type","_id":"4"}}
{"id":4, "foreign_key": "14"}
{"index":{"_index":"index1","_type":"type","_id":"5"}}
{"id":5, "foreign_key": "2"}
{"index":{"_index":"index2","_type":"type","_id":"1"}}
{"id":"1", "tag": "aaa"}
{"index":{"_index":"index2","_type":"type","_id":"2"}}
{"id":"2", "tag": "aaa"}
{"index":{"_index":"index2","_type":"type","_id":"3"}}
{"id":"3", "tag": "bbb"}
{"index":{"_index":"index2","_type":"type","_id":"4"}}
{"id":"4", "tag": "ccc"}
注意,siren-join 要求用来 join 的字段必须数据类型一致。所以,当我们要用 index2 的 id 和 index1 的foreign_key 做 join 的时候,这两个字段就要保持一致,这里为了演示,特意都改成字符串。那么我们发起一个请求如下:# curl -s -XPOST 'http://localhost:9200/index1/_coordinate_search?pretty' -d '
{
"query":{
"filtered":{
"query":{
"match_all":{}
},
"filter":{
"filterjoin":{
"foreign_key":{
"index":"index2",
"type":"type",
"path":"id",
"query":{
"terms":{
"tag":["aaa"]
}
}
}
}
}
}
},
"aggs":{
"avg":{
"avg":{
"field":"id"
}
}
}
}'
意即:从 index2 中搜索 q=tag:aaa 的数据的 id,查找 index1 中对应 foreign_key 的文档的 id 数据平均值。响应结果如下:{
"coordinate_search" : {
"actions" : [ {
"relations" : {
"from" : {
"indices" : [ ],
"types" : [ ],
"field" : "id"
},
"to" : {
"indices" : null,
"types" : null,
"field" : "foreign_key"
}
},
"size" : 2,
"size_in_bytes" : 20,
"is_pruned" : false,
"cache_hit" : true,
"took" : 0
} ]
},
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [ {
"_index" : "index1",
"_type" : "type",
"_id" : "5",
"_score" : 1.0,
"_source":{"id":5, "foreign_key": "2"}
}, {
"_index" : "index1",
"_type" : "type",
"_id" : "3",
"_score" : 1.0,
"_source":{"id":3, "foreign_key": "2"}
} ]
},
"aggregations" : {
"avg" : {
"value" : 4.0
}
}
}
响应告诉我们:从 index2 中搜索到 2 条参与 join 的文档,在 index1 中命中 2 条数据,最后求平均值为 4.0。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
很多从 MySQL 转过来的 Elasticsearch 用户总是很习惯的问一个问题:『怎么在 ES 里实现 join 操作?』过去,我们的回答一般都是:通过类似宽表的思路,将数据平铺在一个索引里。不过,最近另一家 Lucene 开发商给出了另一个方案,他们开发了一个 Elasticsearch 插件,实现了 filter 层面的 join,GitHub 项目地址见:https://github.com/sirensolutions/siren-join
不过需要提醒一下的是:filter 层面的意思,就是只相当于是 SQL 里的 exists 操作。所以目前对这个插件也不要抱有太大期望。今天我们来稍微演示一下。
安装和其他 ES 插件一样:
我们 bulk 上传这么一段数据:
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »
不过需要提醒一下的是:filter 层面的意思,就是只相当于是 SQL 里的 exists 操作。所以目前对这个插件也不要抱有太大期望。今天我们来稍微演示一下。
安装和其他 ES 插件一样:
# bin/plugin -i solutions.siren/siren-join/1.0
注意 siren-join v1.0 只支持 ES 1.7 版本,2.0 版本支持据说正在开发中。我们 bulk 上传这么一段数据:
{"index":{"_index":"index1","_type":"type","_id":"1"}}
{"id":1, "foreign_key":"13"}
{"index":{"_index":"index1","_type":"type","_id":"2"}}
{"id":2}
{"index":{"_index":"index1","_type":"type","_id":"3"}}
{"id":3, "foreign_key": "2"}
{"index":{"_index":"index1","_type":"type","_id":"4"}}
{"id":4, "foreign_key": "14"}
{"index":{"_index":"index1","_type":"type","_id":"5"}}
{"id":5, "foreign_key": "2"}
{"index":{"_index":"index2","_type":"type","_id":"1"}}
{"id":"1", "tag": "aaa"}
{"index":{"_index":"index2","_type":"type","_id":"2"}}
{"id":"2", "tag": "aaa"}
{"index":{"_index":"index2","_type":"type","_id":"3"}}
{"id":"3", "tag": "bbb"}
{"index":{"_index":"index2","_type":"type","_id":"4"}}
{"id":"4", "tag": "ccc"}
注意,siren-join 要求用来 join 的字段必须数据类型一致。所以,当我们要用 index2 的 id 和 index1 的foreign_key 做 join 的时候,这两个字段就要保持一致,这里为了演示,特意都改成字符串。那么我们发起一个请求如下:# curl -s -XPOST 'http://localhost:9200/index1/_coordinate_search?pretty' -d '
{
"query":{
"filtered":{
"query":{
"match_all":{}
},
"filter":{
"filterjoin":{
"foreign_key":{
"index":"index2",
"type":"type",
"path":"id",
"query":{
"terms":{
"tag":["aaa"]
}
}
}
}
}
}
},
"aggs":{
"avg":{
"avg":{
"field":"id"
}
}
}
}'
意即:从 index2 中搜索 q=tag:aaa 的数据的 id,查找 index1 中对应 foreign_key 的文档的 id 数据平均值。响应结果如下:{
"coordinate_search" : {
"actions" : [ {
"relations" : {
"from" : {
"indices" : [ ],
"types" : [ ],
"field" : "id"
},
"to" : {
"indices" : null,
"types" : null,
"field" : "foreign_key"
}
},
"size" : 2,
"size_in_bytes" : 20,
"is_pruned" : false,
"cache_hit" : true,
"took" : 0
} ]
},
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [ {
"_index" : "index1",
"_type" : "type",
"_id" : "5",
"_score" : 1.0,
"_source":{"id":5, "foreign_key": "2"}
}, {
"_index" : "index1",
"_type" : "type",
"_id" : "3",
"_score" : 1.0,
"_source":{"id":3, "foreign_key": "2"}
} ]
},
"aggregations" : {
"avg" : {
"value" : 4.0
}
}
}
响应告诉我们:从 index2 中搜索到 2 条参与 join 的文档,在 index1 中命中 2 条数据,最后求平均值为 4.0。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

Day11: timelion请求语法
ES2.0 开始提供了一个崭新的 pipeline aggregation 特性,但是 Kibana 似乎并没有立刻跟进这方面的意思,相反,Elastic 公司推出了另一个实验室产品:Timelion。
timelion 的用法在官博里已经有介绍。尤其是最近两篇如何用 timelion 实现异常告警的文章,更是从 ES 的 pipeline aggregation 细节和场景一路讲到 timelion 具体操作,我这里几乎没有再重新讲一遍 timelion 操作入门的必要了。不过,官方却一直没有列出来 timelion 支持的请求语法的文档,而是在页面上通过点击图标的方式下拉帮助。

timelion 页面设计上,更接近 Kibana3 而不是 Kibana4。比如 panel 分布是通过设置几行几列的数目来固化的;query 框是唯一的,要修改哪个 panel 的 query,鼠标点选一下 panel,query 就自动切换成这个 panel 的了。
为了方便大家在上手之前了解 timelion 能做到什么,今天特意把 timelion 的请求语法所支持的函数分为几类,罗列如下:
可视化效果类:
数据运算类:
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
继续阅读 »
timelion 的用法在官博里已经有介绍。尤其是最近两篇如何用 timelion 实现异常告警的文章,更是从 ES 的 pipeline aggregation 细节和场景一路讲到 timelion 具体操作,我这里几乎没有再重新讲一遍 timelion 操作入门的必要了。不过,官方却一直没有列出来 timelion 支持的请求语法的文档,而是在页面上通过点击图标的方式下拉帮助。

timelion 页面设计上,更接近 Kibana3 而不是 Kibana4。比如 panel 分布是通过设置几行几列的数目来固化的;query 框是唯一的,要修改哪个 panel 的 query,鼠标点选一下 panel,query 就自动切换成这个 panel 的了。
为了方便大家在上手之前了解 timelion 能做到什么,今天特意把 timelion 的请求语法所支持的函数分为几类,罗列如下:
可视化效果类:
.bars($width): 用柱状图展示数组
.lines($width, $fill, $show, $steps): 用折线图展示数组
.points(): 用散点图展示数组
.color("#c6c6c6"): 改变颜色
.hide(): 隐藏该数组
.label("change from %s"): 标签
.legend($position, $column): 图例位置
.yaxis($yaxis_number, $min, $max, $position): 设置 Y 轴属性,.yaxis(2) 表示第二根 Y 轴
数据运算类:
.abs(): 对整个数组元素求绝对值
.precision($number): 浮点数精度
.testcast($count, $alpha, $beta, $gamma): holt-winters 预测
.cusum($base): 数组元素之和,再加上 $base
.derivative(): 对数组求导数
.divide($divisor): 数组元素除法
.multiply($multiplier): 数组元素乘法
.subtract($term): 数组元素减法
.sum($term): 数组元素加法
.add(): 同 .sum()
.plus(): 同 .sum()
.first(): 返回第一个元素
.movingaverage($window): 用指定的窗口大小计算移动平均值
.mvavg(): .movingaverage() 的简写
.movingstd($window): 用指定的窗口大小计算移动标准差
.mvstd(): .movingstd() 的简写
数据源设定类: .elasticsearch(): 从 ES 读取数据
.es(q="querystring", metric="cardinality:uid", index="logstash-*", offset="-1d"): .elasticsearch() 的简写
.graphite(metric="path.to.*.data", offset="-1d"): 从 graphite 读取数据
.quandl(): 从 quandl.com 读取 quandl 码
.worldbank_indicators(): 从 worldbank.org 读取国家数据
.wbi(): .worldbank_indicators() 的简写
.worldbank(): 从 worldbank.org 读取数据
.wb(): .worldbanck() 的简写
以上所有函数,都在 series_functions 目录下实现,每个 js 文件实现一个 TimelionFunction 功能。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
ES2.0 开始提供了一个崭新的 pipeline aggregation 特性,但是 Kibana 似乎并没有立刻跟进这方面的意思,相反,Elastic 公司推出了另一个实验室产品:Timelion。
timelion 的用法在官博里已经有介绍。尤其是最近两篇如何用 timelion 实现异常告警的文章,更是从 ES 的 pipeline aggregation 细节和场景一路讲到 timelion 具体操作,我这里几乎没有再重新讲一遍 timelion 操作入门的必要了。不过,官方却一直没有列出来 timelion 支持的请求语法的文档,而是在页面上通过点击图标的方式下拉帮助。

timelion 页面设计上,更接近 Kibana3 而不是 Kibana4。比如 panel 分布是通过设置几行几列的数目来固化的;query 框是唯一的,要修改哪个 panel 的 query,鼠标点选一下 panel,query 就自动切换成这个 panel 的了。
为了方便大家在上手之前了解 timelion 能做到什么,今天特意把 timelion 的请求语法所支持的函数分为几类,罗列如下:
可视化效果类:
数据运算类:
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »
timelion 的用法在官博里已经有介绍。尤其是最近两篇如何用 timelion 实现异常告警的文章,更是从 ES 的 pipeline aggregation 细节和场景一路讲到 timelion 具体操作,我这里几乎没有再重新讲一遍 timelion 操作入门的必要了。不过,官方却一直没有列出来 timelion 支持的请求语法的文档,而是在页面上通过点击图标的方式下拉帮助。

timelion 页面设计上,更接近 Kibana3 而不是 Kibana4。比如 panel 分布是通过设置几行几列的数目来固化的;query 框是唯一的,要修改哪个 panel 的 query,鼠标点选一下 panel,query 就自动切换成这个 panel 的了。
为了方便大家在上手之前了解 timelion 能做到什么,今天特意把 timelion 的请求语法所支持的函数分为几类,罗列如下:
可视化效果类:
.bars($width): 用柱状图展示数组
.lines($width, $fill, $show, $steps): 用折线图展示数组
.points(): 用散点图展示数组
.color("#c6c6c6"): 改变颜色
.hide(): 隐藏该数组
.label("change from %s"): 标签
.legend($position, $column): 图例位置
.yaxis($yaxis_number, $min, $max, $position): 设置 Y 轴属性,.yaxis(2) 表示第二根 Y 轴
数据运算类:
.abs(): 对整个数组元素求绝对值
.precision($number): 浮点数精度
.testcast($count, $alpha, $beta, $gamma): holt-winters 预测
.cusum($base): 数组元素之和,再加上 $base
.derivative(): 对数组求导数
.divide($divisor): 数组元素除法
.multiply($multiplier): 数组元素乘法
.subtract($term): 数组元素减法
.sum($term): 数组元素加法
.add(): 同 .sum()
.plus(): 同 .sum()
.first(): 返回第一个元素
.movingaverage($window): 用指定的窗口大小计算移动平均值
.mvavg(): .movingaverage() 的简写
.movingstd($window): 用指定的窗口大小计算移动标准差
.mvstd(): .movingstd() 的简写
数据源设定类: .elasticsearch(): 从 ES 读取数据
.es(q="querystring", metric="cardinality:uid", index="logstash-*", offset="-1d"): .elasticsearch() 的简写
.graphite(metric="path.to.*.data", offset="-1d"): 从 graphite 读取数据
.quandl(): 从 quandl.com 读取 quandl 码
.worldbank_indicators(): 从 worldbank.org 读取国家数据
.wbi(): .worldbank_indicators() 的简写
.worldbank(): 从 worldbank.org 读取数据
.wb(): .worldbanck() 的简写
以上所有函数,都在 series_functions 目录下实现,每个 js 文件实现一个 TimelionFunction 功能。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

Day10: 如何处理数组形式的JSON日志
ELK 收集业务日志的来源,除了应用服务器以外,还有很大一部分来自客户端。考虑到客户端网络流量的因素,一般实现上都不会要求实时上报数据,而是攒一批,等到手机连上 WIFI 网络了,再统一发送出来。所以,这类客户端日志一般都有几个特点:
在处理这类数据的时候,第一关是别让数据超长直接给丢弃了(说的就是你啊,Rsyslog);第二关就是拆分 JSON 数组,把几十 MB 数据扔 ES 字段里,显然是不利于搜索和统计需求的。今天我们就来说说怎么拆分 JSON 数组。
假设收到的是这么一段日志:
继续阅读 »
- 预先已经记录成 JSON 了;
- 日志主体内容是一个巨大无比的数组,数据元素才是实际的单次日志记录;
- 一次 POST 会有几 MB 到几十 MB 大小。
在处理这类数据的时候,第一关是别让数据超长直接给丢弃了(说的就是你啊,Rsyslog);第二关就是拆分 JSON 数组,把几十 MB 数据扔 ES 字段里,显然是不利于搜索和统计需求的。今天我们就来说说怎么拆分 JSON 数组。
假设收到的是这么一段日志:
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
首先我们知道可以在读取的时候把 JSON 数据解析成 LogStash::Event 对象:input {
tcp {
codec => json
}
}
但是怎么把解析出来的 logs 字段拆分成多个 event 呢?这里我们可以用一个已有插件:logstash-filter-split。filter {
split {
field => "logs"
}
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
remove_fields => ["logs", "timestamp"]
}
}
这样,就可以得到两个 event 了:{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","@timestamp":"2015-12-10T09:55:00Z","reason":"****"}
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10T09:56:12Z","tracert":"****"}
看起来可能跟这个插件的文档描述不太一样。文档上写的是通过 terminator 字符,切割 field 字符串成多个 event。但实际上,field 设置是会自动判断的,如果 field 内容是字符串,就切割字符串成为数组再循环;如果内容已经是数组了,直接循环: original_value = event[@field]
if original_value.is_a?(Array)
splits = original_value
elsif original_value.is_a?(String)
splits = original_value.split(@terminator, -1)
else
raise LogStash::ConfigurationError, "Only String and Array types are splittable. field:#{@field} is of type = #{original_value.class}"
end
return if splits.length == 1
splits.each do |value|
next if value.empty?
event_split = event.clone
@logger.debug("Split event", :value => value, :field => @field)
event_split[(@target || @field)] = value
filter_matched(event_split)
yield event_split
end
event.cancel
顺带提一句:这里 yield 在 Logstash 1.5.0 之前,实现有问题,生成的新事件,不会继续执行后续 filter,直接进入到 output 阶段。也就是说,如果你用 Logstash 1.4.2 来执行上面那段配置,生成的两个事件会是这样的:{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10 17:56:12","tracert":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
ELK 收集业务日志的来源,除了应用服务器以外,还有很大一部分来自客户端。考虑到客户端网络流量的因素,一般实现上都不会要求实时上报数据,而是攒一批,等到手机连上 WIFI 网络了,再统一发送出来。所以,这类客户端日志一般都有几个特点:
在处理这类数据的时候,第一关是别让数据超长直接给丢弃了(说的就是你啊,Rsyslog);第二关就是拆分 JSON 数组,把几十 MB 数据扔 ES 字段里,显然是不利于搜索和统计需求的。今天我们就来说说怎么拆分 JSON 数组。
假设收到的是这么一段日志:
- 预先已经记录成 JSON 了;
- 日志主体内容是一个巨大无比的数组,数据元素才是实际的单次日志记录;
- 一次 POST 会有几 MB 到几十 MB 大小。
在处理这类数据的时候,第一关是别让数据超长直接给丢弃了(说的就是你啊,Rsyslog);第二关就是拆分 JSON 数组,把几十 MB 数据扔 ES 字段里,显然是不利于搜索和统计需求的。今天我们就来说说怎么拆分 JSON 数组。
假设收到的是这么一段日志:
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
首先我们知道可以在读取的时候把 JSON 数据解析成 LogStash::Event 对象:input {
tcp {
codec => json
}
}
但是怎么把解析出来的 logs 字段拆分成多个 event 呢?这里我们可以用一个已有插件:logstash-filter-split。filter {
split {
field => "logs"
}
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
remove_fields => ["logs", "timestamp"]
}
}
这样,就可以得到两个 event 了:{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","@timestamp":"2015-12-10T09:55:00Z","reason":"****"}
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10T09:56:12Z","tracert":"****"}
看起来可能跟这个插件的文档描述不太一样。文档上写的是通过 terminator 字符,切割 field 字符串成多个 event。但实际上,field 设置是会自动判断的,如果 field 内容是字符串,就切割字符串成为数组再循环;如果内容已经是数组了,直接循环: original_value = event[@field]
if original_value.is_a?(Array)
splits = original_value
elsif original_value.is_a?(String)
splits = original_value.split(@terminator, -1)
else
raise LogStash::ConfigurationError, "Only String and Array types are splittable. field:#{@field} is of type = #{original_value.class}"
end
return if splits.length == 1
splits.each do |value|
next if value.empty?
event_split = event.clone
@logger.debug("Split event", :value => value, :field => @field)
event_split[(@target || @field)] = value
filter_matched(event_split)
yield event_split
end
event.cancel
顺带提一句:这里 yield 在 Logstash 1.5.0 之前,实现有问题,生成的新事件,不会继续执行后续 filter,直接进入到 output 阶段。也就是说,如果你用 Logstash 1.4.2 来执行上面那段配置,生成的两个事件会是这样的:{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10 17:56:12","tracert":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

elasticsearch源码调试环境小结
前端时间折腾了一下源码调试的问题,简单总结以下。
---------------------
调试环境是window(linux理论上通用)
用到的工具类:
1:mvn:https://maven.apache.org/
elasticsearch的源码是用mvn工具管理的,根据pom.xml来下载一些依赖包非常方便。
(当然也可以用gradle,由于不太熟悉,就没研究)
安装mvn,注意配置后环境变量即可。官方文档写的很明白。
最好自己修改一下mvn的setting.xml文件中的本地repo
<!-- localRepository
| The path to the local repository maven will use to store artifacts.
|
| Default: ${user.home}/.m2/repository
<localRepository>/path/to/local/repo</localRepository>
-->
我设置成了:
<localRepository>E:/m2/repository</localRepository>
mvn -v 测试以下
2:eclipse:编辑器,应用应该还比较广泛的。我用的最新版的mars。
(intellij idea据说这是一个很牛逼的编辑器,也是因为暂时不熟悉,还没研究)
----------------------
步骤:
1: 去github上选择一个tag版本,我用的是2.1.0.
https://github.com/elastic/ela ... 2.1.0
直接DownloadZip文件即可
(也可以用git clone下来)
解压缩。
假设目录为E:/elasticsearch-2.1.0
2: 编译源代码
cmd 打开命令行
进入源文件目录 E:/elasticsearch-2.1.0
执行 mvn package命令
这个时间段耗时比较长,当然也得根据网速情况。
会出现失败,大多是因为拉取不到依赖包。可以根据提示信息,手动去下载失败的jar,然后拷贝到本地repo对应的文件夹下边即可。
等出现build success信息的时候代表成功了。
可以到core/target目录下看到elasticsearch-2.1.0-SNAPSHOT.jar。
3:转为eclipse工程
可能习惯了eclipse工程,所以这里就直接用mvn转成了eclipse的工程,生成.classpath和.project文件。
进入core目录执行以下指令
mvn eclipse:eclipse
这一步也会消耗一些时间,通常的错误也是jar包下载不成功,根据终端打印的错误信息,把对应jar包直接下载下来,放到本地的repo对应目录下边即可,然后重新运行命令。直到成功。
之后,就会发现出现了.classpath和.project文件了。
然后打开eclipse 直接带入core中的工程即可。
4: 设置运行参数
打开刚刚导入成功的工程:
Run As----Run Configution---Args
设置ProgramArgument 为 start
设置VMArgument为 -Des.path.home=E:\elasticsearch-2.1.0\core\
完毕
-------
现在就就可以运行+调试了。
继续阅读 »
---------------------
调试环境是window(linux理论上通用)
用到的工具类:
1:mvn:https://maven.apache.org/
elasticsearch的源码是用mvn工具管理的,根据pom.xml来下载一些依赖包非常方便。
(当然也可以用gradle,由于不太熟悉,就没研究)
安装mvn,注意配置后环境变量即可。官方文档写的很明白。
最好自己修改一下mvn的setting.xml文件中的本地repo
<!-- localRepository
| The path to the local repository maven will use to store artifacts.
|
| Default: ${user.home}/.m2/repository
<localRepository>/path/to/local/repo</localRepository>
-->
我设置成了:
<localRepository>E:/m2/repository</localRepository>
mvn -v 测试以下
2:eclipse:编辑器,应用应该还比较广泛的。我用的最新版的mars。
(intellij idea据说这是一个很牛逼的编辑器,也是因为暂时不熟悉,还没研究)
----------------------
步骤:
1: 去github上选择一个tag版本,我用的是2.1.0.
https://github.com/elastic/ela ... 2.1.0
直接DownloadZip文件即可
(也可以用git clone下来)
解压缩。
假设目录为E:/elasticsearch-2.1.0
2: 编译源代码
cmd 打开命令行
进入源文件目录 E:/elasticsearch-2.1.0
执行 mvn package命令
这个时间段耗时比较长,当然也得根据网速情况。
会出现失败,大多是因为拉取不到依赖包。可以根据提示信息,手动去下载失败的jar,然后拷贝到本地repo对应的文件夹下边即可。
等出现build success信息的时候代表成功了。
可以到core/target目录下看到elasticsearch-2.1.0-SNAPSHOT.jar。
3:转为eclipse工程
可能习惯了eclipse工程,所以这里就直接用mvn转成了eclipse的工程,生成.classpath和.project文件。
进入core目录执行以下指令
mvn eclipse:eclipse
这一步也会消耗一些时间,通常的错误也是jar包下载不成功,根据终端打印的错误信息,把对应jar包直接下载下来,放到本地的repo对应目录下边即可,然后重新运行命令。直到成功。
之后,就会发现出现了.classpath和.project文件了。
然后打开eclipse 直接带入core中的工程即可。
4: 设置运行参数
打开刚刚导入成功的工程:
Run As----Run Configution---Args
设置ProgramArgument 为 start
设置VMArgument为 -Des.path.home=E:\elasticsearch-2.1.0\core\
完毕
-------
现在就就可以运行+调试了。
前端时间折腾了一下源码调试的问题,简单总结以下。
---------------------
调试环境是window(linux理论上通用)
用到的工具类:
1:mvn:https://maven.apache.org/
elasticsearch的源码是用mvn工具管理的,根据pom.xml来下载一些依赖包非常方便。
(当然也可以用gradle,由于不太熟悉,就没研究)
安装mvn,注意配置后环境变量即可。官方文档写的很明白。
最好自己修改一下mvn的setting.xml文件中的本地repo
<!-- localRepository
| The path to the local repository maven will use to store artifacts.
|
| Default: ${user.home}/.m2/repository
<localRepository>/path/to/local/repo</localRepository>
-->
我设置成了:
<localRepository>E:/m2/repository</localRepository>
mvn -v 测试以下
2:eclipse:编辑器,应用应该还比较广泛的。我用的最新版的mars。
(intellij idea据说这是一个很牛逼的编辑器,也是因为暂时不熟悉,还没研究)
----------------------
步骤:
1: 去github上选择一个tag版本,我用的是2.1.0.
https://github.com/elastic/ela ... 2.1.0
直接DownloadZip文件即可
(也可以用git clone下来)
解压缩。
假设目录为E:/elasticsearch-2.1.0
2: 编译源代码
cmd 打开命令行
进入源文件目录 E:/elasticsearch-2.1.0
执行 mvn package命令
这个时间段耗时比较长,当然也得根据网速情况。
会出现失败,大多是因为拉取不到依赖包。可以根据提示信息,手动去下载失败的jar,然后拷贝到本地repo对应的文件夹下边即可。
等出现build success信息的时候代表成功了。
可以到core/target目录下看到elasticsearch-2.1.0-SNAPSHOT.jar。
3:转为eclipse工程
可能习惯了eclipse工程,所以这里就直接用mvn转成了eclipse的工程,生成.classpath和.project文件。
进入core目录执行以下指令
mvn eclipse:eclipse
这一步也会消耗一些时间,通常的错误也是jar包下载不成功,根据终端打印的错误信息,把对应jar包直接下载下来,放到本地的repo对应目录下边即可,然后重新运行命令。直到成功。
之后,就会发现出现了.classpath和.project文件了。
然后打开eclipse 直接带入core中的工程即可。
4: 设置运行参数
打开刚刚导入成功的工程:
Run As----Run Configution---Args
设置ProgramArgument 为 start
设置VMArgument为 -Des.path.home=E:\elasticsearch-2.1.0\core\
完毕
-------
现在就就可以运行+调试了。
收起阅读 »
---------------------
调试环境是window(linux理论上通用)
用到的工具类:
1:mvn:https://maven.apache.org/
elasticsearch的源码是用mvn工具管理的,根据pom.xml来下载一些依赖包非常方便。
(当然也可以用gradle,由于不太熟悉,就没研究)
安装mvn,注意配置后环境变量即可。官方文档写的很明白。
最好自己修改一下mvn的setting.xml文件中的本地repo
<!-- localRepository
| The path to the local repository maven will use to store artifacts.
|
| Default: ${user.home}/.m2/repository
<localRepository>/path/to/local/repo</localRepository>
-->
我设置成了:
<localRepository>E:/m2/repository</localRepository>
mvn -v 测试以下
2:eclipse:编辑器,应用应该还比较广泛的。我用的最新版的mars。
(intellij idea据说这是一个很牛逼的编辑器,也是因为暂时不熟悉,还没研究)
----------------------
步骤:
1: 去github上选择一个tag版本,我用的是2.1.0.
https://github.com/elastic/ela ... 2.1.0
直接DownloadZip文件即可
(也可以用git clone下来)
解压缩。
假设目录为E:/elasticsearch-2.1.0
2: 编译源代码
cmd 打开命令行
进入源文件目录 E:/elasticsearch-2.1.0
执行 mvn package命令
这个时间段耗时比较长,当然也得根据网速情况。
会出现失败,大多是因为拉取不到依赖包。可以根据提示信息,手动去下载失败的jar,然后拷贝到本地repo对应的文件夹下边即可。
等出现build success信息的时候代表成功了。
可以到core/target目录下看到elasticsearch-2.1.0-SNAPSHOT.jar。
3:转为eclipse工程
可能习惯了eclipse工程,所以这里就直接用mvn转成了eclipse的工程,生成.classpath和.project文件。
进入core目录执行以下指令
mvn eclipse:eclipse
这一步也会消耗一些时间,通常的错误也是jar包下载不成功,根据终端打印的错误信息,把对应jar包直接下载下来,放到本地的repo对应目录下边即可,然后重新运行命令。直到成功。
之后,就会发现出现了.classpath和.project文件了。
然后打开eclipse 直接带入core中的工程即可。
4: 设置运行参数
打开刚刚导入成功的工程:
Run As----Run Configution---Args
设置ProgramArgument 为 start
设置VMArgument为 -Des.path.home=E:\elasticsearch-2.1.0\core\
完毕
-------
现在就就可以运行+调试了。
收起阅读 »
jingkyks 发表于 : 2015-12-16 13:04
评论 (4)

Day9: Elasticsearch template的order
ELK Stack 在入门学习过程中,必然会碰到自己修改定制索引映射(mapping)乃至模板(template)的问题。
这时候,不少比较认真看 Logstash 文档的新用户会通过下面这段配置来制定自己的模板策略:
然而随后就发现,自己辛辛苦苦修改出来的模板,通过 curl -XGET 'http://127.0.0.1:9200/_template/myname' 看也确实上传成功了,但实际新数据索引创建出来,就是没生效!
这个原因是:Logstash 默认会上传一个名叫 logstash 的模板到 ES 里。如果你在使用上面这个配置之前,曾经运行过 Logstash(一般来说都会),那么 ES 里就已经存在这么一个模板了。你可以curl -XGET 'http://127.0.0.1:9200/_template/logstash' 验证。
这个时候,ES 里就变成有两个模板,logstash 和 myname,都匹配 logstash-* 索引名,要求设置一定的映射规则了。
ES 会按照一定的规则来尝试自动 merge 多个都匹配上了的模板规则,最终运用到索引上:https://www.elastic.co/guide/e ... lates
其中要点就是:template 是可以设置 order 参数的!而不写这个参数,默认的 order 值就是 0。order 值越大,在 merge 规则的时候优先级越高。
所以,解决这个问题的办法很简单:在你自定义的 template 里,加一行,变成这样:
继续阅读 »
这时候,不少比较认真看 Logstash 文档的新用户会通过下面这段配置来制定自己的模板策略:
output {
elasticsearch {
host => "127.0.0.1"
manage_template => true
template => "/path/to/mytemplate"
template_name => "myname"
}
}
然而随后就发现,自己辛辛苦苦修改出来的模板,通过 curl -XGET 'http://127.0.0.1:9200/_template/myname' 看也确实上传成功了,但实际新数据索引创建出来,就是没生效!
这个原因是:Logstash 默认会上传一个名叫 logstash 的模板到 ES 里。如果你在使用上面这个配置之前,曾经运行过 Logstash(一般来说都会),那么 ES 里就已经存在这么一个模板了。你可以curl -XGET 'http://127.0.0.1:9200/_template/logstash' 验证。
这个时候,ES 里就变成有两个模板,logstash 和 myname,都匹配 logstash-* 索引名,要求设置一定的映射规则了。
ES 会按照一定的规则来尝试自动 merge 多个都匹配上了的模板规则,最终运用到索引上:https://www.elastic.co/guide/e ... lates
其中要点就是:template 是可以设置 order 参数的!而不写这个参数,默认的 order 值就是 0。order 值越大,在 merge 规则的时候优先级越高。
所以,解决这个问题的办法很简单:在你自定义的 template 里,加一行,变成这样:
{
"template" : "logstash-*",
"order" : 1,
"settings" : { ... },
"mappings" : { ... }
}
当然,其实如果只从 Logstash 配置角度出发,其实更简单的办法是:直接修改原来默认的 logstash 模板,然后模板名称也不要改,就好了:output {
elasticsearch {
host => "127.0.0.1"
manage_template => true
template_overwrite => true
}
}
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
ELK Stack 在入门学习过程中,必然会碰到自己修改定制索引映射(mapping)乃至模板(template)的问题。
这时候,不少比较认真看 Logstash 文档的新用户会通过下面这段配置来制定自己的模板策略:
然而随后就发现,自己辛辛苦苦修改出来的模板,通过 curl -XGET 'http://127.0.0.1:9200/_template/myname' 看也确实上传成功了,但实际新数据索引创建出来,就是没生效!
这个原因是:Logstash 默认会上传一个名叫 logstash 的模板到 ES 里。如果你在使用上面这个配置之前,曾经运行过 Logstash(一般来说都会),那么 ES 里就已经存在这么一个模板了。你可以curl -XGET 'http://127.0.0.1:9200/_template/logstash' 验证。
这个时候,ES 里就变成有两个模板,logstash 和 myname,都匹配 logstash-* 索引名,要求设置一定的映射规则了。
ES 会按照一定的规则来尝试自动 merge 多个都匹配上了的模板规则,最终运用到索引上:https://www.elastic.co/guide/e ... lates
其中要点就是:template 是可以设置 order 参数的!而不写这个参数,默认的 order 值就是 0。order 值越大,在 merge 规则的时候优先级越高。
所以,解决这个问题的办法很简单:在你自定义的 template 里,加一行,变成这样:
这时候,不少比较认真看 Logstash 文档的新用户会通过下面这段配置来制定自己的模板策略:
output {
elasticsearch {
host => "127.0.0.1"
manage_template => true
template => "/path/to/mytemplate"
template_name => "myname"
}
}
然而随后就发现,自己辛辛苦苦修改出来的模板,通过 curl -XGET 'http://127.0.0.1:9200/_template/myname' 看也确实上传成功了,但实际新数据索引创建出来,就是没生效!
这个原因是:Logstash 默认会上传一个名叫 logstash 的模板到 ES 里。如果你在使用上面这个配置之前,曾经运行过 Logstash(一般来说都会),那么 ES 里就已经存在这么一个模板了。你可以curl -XGET 'http://127.0.0.1:9200/_template/logstash' 验证。
这个时候,ES 里就变成有两个模板,logstash 和 myname,都匹配 logstash-* 索引名,要求设置一定的映射规则了。
ES 会按照一定的规则来尝试自动 merge 多个都匹配上了的模板规则,最终运用到索引上:https://www.elastic.co/guide/e ... lates
其中要点就是:template 是可以设置 order 参数的!而不写这个参数,默认的 order 值就是 0。order 值越大,在 merge 规则的时候优先级越高。
所以,解决这个问题的办法很简单:在你自定义的 template 里,加一行,变成这样:
{
"template" : "logstash-*",
"order" : 1,
"settings" : { ... },
"mappings" : { ... }
}
当然,其实如果只从 Logstash 配置角度出发,其实更简单的办法是:直接修改原来默认的 logstash 模板,然后模板名称也不要改,就好了:output {
elasticsearch {
host => "127.0.0.1"
manage_template => true
template_overwrite => true
}
}
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

大家一起来写Advent吧
三斗已经写了好多篇了,分享了很多小经验和知识,非常不错,不如我们一起把Advent玩起来吧,这样,我们玩一个接力游戏,每个同学写完之后可以@论坛里面的其它同学,被@的同学需要完成一篇小的Advent,然后写完再继续接着传给下一位,如果不在论坛的同学,就邀请他加入下 :)
@的操作就是在贴子里@对方的名字,然后你想办法通知到对方就行了,QQ、论坛消息等等反正告诉对方被@和继续就行咯。
没有找到下一个接班的人就继续写下去。哈哈
写Advent,选择发表类型『文章』,分类选择『advent』,话题添加『advent』+其它的自选
下面是一篇ELK的Advent,顺便分享一下。Day 5 - ELK Operations and Administration
欢迎补充完善规则。
继续阅读 »
@的操作就是在贴子里@对方的名字,然后你想办法通知到对方就行了,QQ、论坛消息等等反正告诉对方被@和继续就行咯。
没有找到下一个接班的人就继续写下去。哈哈
写Advent,选择发表类型『文章』,分类选择『advent』,话题添加『advent』+其它的自选
下面是一篇ELK的Advent,顺便分享一下。Day 5 - ELK Operations and Administration
欢迎补充完善规则。
三斗已经写了好多篇了,分享了很多小经验和知识,非常不错,不如我们一起把Advent玩起来吧,这样,我们玩一个接力游戏,每个同学写完之后可以@论坛里面的其它同学,被@的同学需要完成一篇小的Advent,然后写完再继续接着传给下一位,如果不在论坛的同学,就邀请他加入下 :)
@的操作就是在贴子里@对方的名字,然后你想办法通知到对方就行了,QQ、论坛消息等等反正告诉对方被@和继续就行咯。
没有找到下一个接班的人就继续写下去。哈哈
写Advent,选择发表类型『文章』,分类选择『advent』,话题添加『advent』+其它的自选
下面是一篇ELK的Advent,顺便分享一下。Day 5 - ELK Operations and Administration
欢迎补充完善规则。
收起阅读 »
@的操作就是在贴子里@对方的名字,然后你想办法通知到对方就行了,QQ、论坛消息等等反正告诉对方被@和继续就行咯。
没有找到下一个接班的人就继续写下去。哈哈
写Advent,选择发表类型『文章』,分类选择『advent』,话题添加『advent』+其它的自选
下面是一篇ELK的Advent,顺便分享一下。Day 5 - ELK Operations and Administration
欢迎补充完善规则。
收起阅读 »

自建的索引没数据?
ELK的架构:
logstash==>redis==>logstash==>elasticsearch==>kibana开始我自己在ES上建索引,
建索引语句如下:
curl -XPUT "http://localhost:9200/qn-service" -d '{"mappings":{"_default_":{"properties":{"speaker":{"type":"string","index":"not_analyzed"},"play_name":{"type":"string","index":"not_analyzed"},"line_id":{"type":"integer"},"speech_number":{"type":"integer"}}}}}'
然后通过logstash导数据到ES后,却发现查询不到数据,然后用
curl http://localhost:9200/_cat/indices?v 命令发现索引的数据为空;
发现es自动建的索引有数据,而我自己的索引数据为空。
找了半天原因没找到,然后就将es中得数据删除,
curl -XDELETE *[/url]
用上述方法重建索引;
然后按照书上《ELK权威指南》上得方法,直接导入数据到es,
curl -XPUT http://localhost:9200/_bulk --data-binary @shakespeare.json
却发现自己建的索引还是没有数据,es却多了一个叫shakespeare得索引,这个索引中有数据,那么我有两点疑问1:为什么我用书上建索引的方法建立索引(shakespeare名字被我改成qn-service)却没有数据?
2:shakespeare这个索引是哪里来得?
logstash shipper.conf
input {
file {
path => ["/data/logs/superErpLog/trace/shakespeare.json"]
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter{
json{
source=>"message"
remove_field => ["message"]
}
}
output {
stdout{}
redis {
host => "localhost"
port => 6379
data_type => "list"
key => "performance"
}
}
logstash center.conf
input {
redis {
host => "localhost"
port => 6379
type => "redis-input"
data_type => "list"
key => "performance"
}
}
output {
stdout {}
elasticsearch {
cluster => "elasticsearch"
host => "localhost"
port => 9200
codec => "json"
protocol => "http"
}
}
继续阅读 »
logstash==>redis==>logstash==>elasticsearch==>kibana开始我自己在ES上建索引,
建索引语句如下:
curl -XPUT "http://localhost:9200/qn-service" -d '{"mappings":{"_default_":{"properties":{"speaker":{"type":"string","index":"not_analyzed"},"play_name":{"type":"string","index":"not_analyzed"},"line_id":{"type":"integer"},"speech_number":{"type":"integer"}}}}}'
然后通过logstash导数据到ES后,却发现查询不到数据,然后用
curl http://localhost:9200/_cat/indices?v 命令发现索引的数据为空;
发现es自动建的索引有数据,而我自己的索引数据为空。
找了半天原因没找到,然后就将es中得数据删除,
curl -XDELETE *[/url]
用上述方法重建索引;
然后按照书上《ELK权威指南》上得方法,直接导入数据到es,
curl -XPUT http://localhost:9200/_bulk --data-binary @shakespeare.json
却发现自己建的索引还是没有数据,es却多了一个叫shakespeare得索引,这个索引中有数据,那么我有两点疑问1:为什么我用书上建索引的方法建立索引(shakespeare名字被我改成qn-service)却没有数据?
2:shakespeare这个索引是哪里来得?
logstash shipper.conf
input {
file {
path => ["/data/logs/superErpLog/trace/shakespeare.json"]
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter{
json{
source=>"message"
remove_field => ["message"]
}
}
output {
stdout{}
redis {
host => "localhost"
port => 6379
data_type => "list"
key => "performance"
}
}
logstash center.conf
input {
redis {
host => "localhost"
port => 6379
type => "redis-input"
data_type => "list"
key => "performance"
}
}
output {
stdout {}
elasticsearch {
cluster => "elasticsearch"
host => "localhost"
port => 9200
codec => "json"
protocol => "http"
}
}
ELK的架构:
logstash==>redis==>logstash==>elasticsearch==>kibana开始我自己在ES上建索引,
建索引语句如下:
curl -XPUT "http://localhost:9200/qn-service" -d '{"mappings":{"_default_":{"properties":{"speaker":{"type":"string","index":"not_analyzed"},"play_name":{"type":"string","index":"not_analyzed"},"line_id":{"type":"integer"},"speech_number":{"type":"integer"}}}}}'
然后通过logstash导数据到ES后,却发现查询不到数据,然后用
curl http://localhost:9200/_cat/indices?v 命令发现索引的数据为空;
发现es自动建的索引有数据,而我自己的索引数据为空。
找了半天原因没找到,然后就将es中得数据删除,
curl -XDELETE *[/url]
用上述方法重建索引;
然后按照书上《ELK权威指南》上得方法,直接导入数据到es,
curl -XPUT http://localhost:9200/_bulk --data-binary @shakespeare.json
却发现自己建的索引还是没有数据,es却多了一个叫shakespeare得索引,这个索引中有数据,那么我有两点疑问1:为什么我用书上建索引的方法建立索引(shakespeare名字被我改成qn-service)却没有数据?
2:shakespeare这个索引是哪里来得?
logstash shipper.conf
input {
file {
path => ["/data/logs/superErpLog/trace/shakespeare.json"]
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter{
json{
source=>"message"
remove_field => ["message"]
}
}
output {
stdout{}
redis {
host => "localhost"
port => 6379
data_type => "list"
key => "performance"
}
}
logstash center.conf
input {
redis {
host => "localhost"
port => 6379
type => "redis-input"
data_type => "list"
key => "performance"
}
}
output {
stdout {}
elasticsearch {
cluster => "elasticsearch"
host => "localhost"
port => 9200
codec => "json"
protocol => "http"
}
} 收起阅读 »
logstash==>redis==>logstash==>elasticsearch==>kibana开始我自己在ES上建索引,
建索引语句如下:
curl -XPUT "http://localhost:9200/qn-service" -d '{"mappings":{"_default_":{"properties":{"speaker":{"type":"string","index":"not_analyzed"},"play_name":{"type":"string","index":"not_analyzed"},"line_id":{"type":"integer"},"speech_number":{"type":"integer"}}}}}'
然后通过logstash导数据到ES后,却发现查询不到数据,然后用
curl http://localhost:9200/_cat/indices?v 命令发现索引的数据为空;
发现es自动建的索引有数据,而我自己的索引数据为空。
找了半天原因没找到,然后就将es中得数据删除,
curl -XDELETE *[/url]
用上述方法重建索引;
然后按照书上《ELK权威指南》上得方法,直接导入数据到es,
curl -XPUT http://localhost:9200/_bulk --data-binary @shakespeare.json
却发现自己建的索引还是没有数据,es却多了一个叫shakespeare得索引,这个索引中有数据,那么我有两点疑问1:为什么我用书上建索引的方法建立索引(shakespeare名字被我改成qn-service)却没有数据?
2:shakespeare这个索引是哪里来得?
logstash shipper.conf
input {
file {
path => ["/data/logs/superErpLog/trace/shakespeare.json"]
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter{
json{
source=>"message"
remove_field => ["message"]
}
}
output {
stdout{}
redis {
host => "localhost"
port => 6379
data_type => "list"
key => "performance"
}
}
logstash center.conf
input {
redis {
host => "localhost"
port => 6379
type => "redis-input"
data_type => "list"
key => "performance"
}
}
output {
stdout {}
elasticsearch {
cluster => "elasticsearch"
host => "localhost"
port => 9200
codec => "json"
protocol => "http"
}
} 收起阅读 »

Day8:隐藏仪表盘的菜单栏
Kibana4 上线后,又有同事找过来。还好这次是小问题:『新版的这个仪表盘顶部菜单栏太宽了啊。头顶上监控屏幕空间有限,能不能省省?』
跟 Kibana3 相比,确实宽了点。这时候好几个方案瞬间进入我脑子里:
不过等打开 chrome.html 看了一下,发现 navbar 本身是有相关的隐藏判断的:
好了,我们试一下,把 http://localhost:5601/app/kiba ... ydash 改成http://localhost:5601/app/kiba ... embed,回车,果然,整个菜单栏都消失了!同步消失的还有每个 panel 的编辑按钮。
其实呢,embed 在页面上是有说明的,在 dashboard 的 share 连接里,提供了一个 iframe 分享方式,iframe 里使用的,就是 embed 链接!
注意:Kibana4 部分版本的 share 说明中的 embed 位置生成的有问题,请小心。
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
继续阅读 »
跟 Kibana3 相比,确实宽了点。这时候好几个方案瞬间进入我脑子里:
- 浏览器往下拖动一点,不过要确保定期刷新的时候还能回到拖动位置;
- 进 ui/public/chrome/chrome.html 里把 navbar 干掉;
- 添加一个 bootstrap 效果,navbar 默认隐藏,鼠标挪上去自动浮现。
不过等打开 chrome.html 看了一下,发现 navbar 本身是有相关的隐藏判断的:
<nav
ng-style="::{ background: chrome.getNavBackground() }"
ng-class="{ show: chrome.getVisible() }"
class="hide navbar navbar-inverse navbar-static-top">
这个设置在 ui/public/chrome/api/angular.js 里的 internals.setVisibleDefault(!$location.search().embed);。我们知道 $locatio.search() 是 AngularJS 的标准用法,这里也就是代表 URL 请求参数里是否有 ?embed 选项。好了,我们试一下,把 http://localhost:5601/app/kiba ... ydash 改成http://localhost:5601/app/kiba ... embed,回车,果然,整个菜单栏都消失了!同步消失的还有每个 panel 的编辑按钮。
其实呢,embed 在页面上是有说明的,在 dashboard 的 share 连接里,提供了一个 iframe 分享方式,iframe 里使用的,就是 embed 链接!
注意:Kibana4 部分版本的 share 说明中的 embed 位置生成的有问题,请小心。
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
Kibana4 上线后,又有同事找过来。还好这次是小问题:『新版的这个仪表盘顶部菜单栏太宽了啊。头顶上监控屏幕空间有限,能不能省省?』
跟 Kibana3 相比,确实宽了点。这时候好几个方案瞬间进入我脑子里:
不过等打开 chrome.html 看了一下,发现 navbar 本身是有相关的隐藏判断的:
好了,我们试一下,把 http://localhost:5601/app/kiba ... ydash 改成http://localhost:5601/app/kiba ... embed,回车,果然,整个菜单栏都消失了!同步消失的还有每个 panel 的编辑按钮。
其实呢,embed 在页面上是有说明的,在 dashboard 的 share 连接里,提供了一个 iframe 分享方式,iframe 里使用的,就是 embed 链接!
注意:Kibana4 部分版本的 share 说明中的 embed 位置生成的有问题,请小心。
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »
跟 Kibana3 相比,确实宽了点。这时候好几个方案瞬间进入我脑子里:
- 浏览器往下拖动一点,不过要确保定期刷新的时候还能回到拖动位置;
- 进 ui/public/chrome/chrome.html 里把 navbar 干掉;
- 添加一个 bootstrap 效果,navbar 默认隐藏,鼠标挪上去自动浮现。
不过等打开 chrome.html 看了一下,发现 navbar 本身是有相关的隐藏判断的:
<nav
ng-style="::{ background: chrome.getNavBackground() }"
ng-class="{ show: chrome.getVisible() }"
class="hide navbar navbar-inverse navbar-static-top">
这个设置在 ui/public/chrome/api/angular.js 里的 internals.setVisibleDefault(!$location.search().embed);。我们知道 $locatio.search() 是 AngularJS 的标准用法,这里也就是代表 URL 请求参数里是否有 ?embed 选项。好了,我们试一下,把 http://localhost:5601/app/kiba ... ydash 改成http://localhost:5601/app/kiba ... embed,回车,果然,整个菜单栏都消失了!同步消失的还有每个 panel 的编辑按钮。
其实呢,embed 在页面上是有说明的,在 dashboard 的 share 连接里,提供了一个 iframe 分享方式,iframe 里使用的,就是 embed 链接!
注意:Kibana4 部分版本的 share 说明中的 embed 位置生成的有问题,请小心。
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

Day7: hangout 替代 logstash-input-kafka
用 Logstash 接收 Kafka 里的业务日志再写入 Elasticsearch 已经成为一个常见的选择。但是大多数人随后就会碰到一个问题:logstash-input-kafka 的性能上不去!
这个问题,主要是由于 Logstash 用 JRuby 实现,所以数据从 Kafka 下来到最后流转进 Logstash 里,要经过四五次 Ruby 和 Java 之间的数据结构转换,大大浪费和消耗了 CPU 资源。作为优化,我们可以通过修改默认的 logstash-input-kafka 的 codec 配置为 line,把 Jrjackson 处理流程挪到 logstash-filter-json 里多线程处理,但是也只能提高一倍性能而已。
Logstash 开发组目前也在实现纯 Java 版的 logstash-core-event,但是最终能提高多少,也是未知数。
那么在 Logstash 性能提上去之前,围绕 Kafka 还有什么办法能高效又不失灵活的做到数据处理并写入 Elasticsearch 呢?今天给大家推荐一下携程网开源的 hangout。
hangout 采用 YAML 格式配置语法,跟 Elasticsearch 一样,省去了 Logstash 解析 DSL 的复杂度。下面一段配置是 repo 中自带的 example 示例:
实际运行下来,hangout 比 Logstash 确实在处理能力,尤其是 CPU 资源消耗方面,性价比要高出很多。
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
继续阅读 »
这个问题,主要是由于 Logstash 用 JRuby 实现,所以数据从 Kafka 下来到最后流转进 Logstash 里,要经过四五次 Ruby 和 Java 之间的数据结构转换,大大浪费和消耗了 CPU 资源。作为优化,我们可以通过修改默认的 logstash-input-kafka 的 codec 配置为 line,把 Jrjackson 处理流程挪到 logstash-filter-json 里多线程处理,但是也只能提高一倍性能而已。
Logstash 开发组目前也在实现纯 Java 版的 logstash-core-event,但是最终能提高多少,也是未知数。
那么在 Logstash 性能提上去之前,围绕 Kafka 还有什么办法能高效又不失灵活的做到数据处理并写入 Elasticsearch 呢?今天给大家推荐一下携程网开源的 hangout。
hangout 采用 YAML 格式配置语法,跟 Elasticsearch 一样,省去了 Logstash 解析 DSL 的复杂度。下面一段配置是 repo 中自带的 example 示例:
inputs:
- Kafka:
codec: plain
encoding: UTF8 # defaut UTF8
topic:
app: 2
consumer_settings:
group.id: hangout
zookeeper.connect: 192.168.1.200:2181
auto.commit.interval.ms: "1000"
socket.receive.buffer.bytes: "1048576"
fetch.message.max.bytes: "1048576"
num.consumer.fetchers: "4"
- Kafka:
codec: json
topic:
web: 1
consumer_settings:
group.id: hangout
zookeeper.connect: 192.168.1.201:2181
auto.commit.interval.ms: "5000"
filters:
- Grok:
match:
- '^(?<logtime>\S+) (?<user>.+) (-|(?<level>\w+)) %{DATA:msg}$'
remove_fields: ['message']
- Add:
fields:
test: 'abcd'
if:
- '<#if message??>true</#if>'
- '<#if message?contains("liu")>true<#elseif message?contains("warn")>true</#if>'
- Date:
src: logtime
formats:
- 'ISO8601'
remove_fields: ['logtime']
- Lowercase:
fields: ['user']
- Add:
fields:
me: 'I am ${user}'
- Remove:
fields:
- logtime
- Trim:
fields:
- user
- Rename:
fields:
me: he
user: she
- Gsub:
fields:
she: ['c','CCC']
he: ['(^\w+)|(\w+$)','XXX']
- Translate:
source: user
target: nick
dictionary_path: /tmp/app.dic
- KV:
source: msg
target: kv
field_split: ' '
value_split: '='
trim: '\t\"'
trimkey: '\"'
include_keys: ["a","b","xyz","12"]
exclude_keys: ["b","c"] # b in excluded
tag_on_failure: "KVfail"
remove_fields: ['msg']
- Convert:
fields:
cs_bytes: integer
time_taken: float
- URLDecode:
fields: ["query1","query2"]
outputs:
- Stdout:
if:
- '<#if user=="childe">true</#if>'
- Elasticsearch:
cluster: hangoutcluster
hosts:
- 192.168.1.200
index: 'hangout-%{user}-%{+YYYY.MM.dd}'
index_type: logs # default logs
bulk_actions: 20000 #default 20000
bulk_size: 15 # default 15 MB
flush_interval: 10 # default 10 seconds
concurrent_requests: 0 # default 0, concurrent_requests设置成大于0的数, 意思着多线程处理, 以我应用的经验,还有是一定OOM风险的,强烈建议设置为0
- Kafka:
broker_list: 192.168.1.200:9092
topic: test2
其 pipeline 设计和 Logstash 不同的是:整个 filter 和 output 流程,都在 Kafka 的 consumer 线程中完成。所以,并发线程数完全是有 Kafka 的 partitions 设置来控制的。实际运行下来,hangout 比 Logstash 确实在处理能力,尤其是 CPU 资源消耗方面,性价比要高出很多。
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
用 Logstash 接收 Kafka 里的业务日志再写入 Elasticsearch 已经成为一个常见的选择。但是大多数人随后就会碰到一个问题:logstash-input-kafka 的性能上不去!
这个问题,主要是由于 Logstash 用 JRuby 实现,所以数据从 Kafka 下来到最后流转进 Logstash 里,要经过四五次 Ruby 和 Java 之间的数据结构转换,大大浪费和消耗了 CPU 资源。作为优化,我们可以通过修改默认的 logstash-input-kafka 的 codec 配置为 line,把 Jrjackson 处理流程挪到 logstash-filter-json 里多线程处理,但是也只能提高一倍性能而已。
Logstash 开发组目前也在实现纯 Java 版的 logstash-core-event,但是最终能提高多少,也是未知数。
那么在 Logstash 性能提上去之前,围绕 Kafka 还有什么办法能高效又不失灵活的做到数据处理并写入 Elasticsearch 呢?今天给大家推荐一下携程网开源的 hangout。
hangout 采用 YAML 格式配置语法,跟 Elasticsearch 一样,省去了 Logstash 解析 DSL 的复杂度。下面一段配置是 repo 中自带的 example 示例:
实际运行下来,hangout 比 Logstash 确实在处理能力,尤其是 CPU 资源消耗方面,性价比要高出很多。
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »
这个问题,主要是由于 Logstash 用 JRuby 实现,所以数据从 Kafka 下来到最后流转进 Logstash 里,要经过四五次 Ruby 和 Java 之间的数据结构转换,大大浪费和消耗了 CPU 资源。作为优化,我们可以通过修改默认的 logstash-input-kafka 的 codec 配置为 line,把 Jrjackson 处理流程挪到 logstash-filter-json 里多线程处理,但是也只能提高一倍性能而已。
Logstash 开发组目前也在实现纯 Java 版的 logstash-core-event,但是最终能提高多少,也是未知数。
那么在 Logstash 性能提上去之前,围绕 Kafka 还有什么办法能高效又不失灵活的做到数据处理并写入 Elasticsearch 呢?今天给大家推荐一下携程网开源的 hangout。
hangout 采用 YAML 格式配置语法,跟 Elasticsearch 一样,省去了 Logstash 解析 DSL 的复杂度。下面一段配置是 repo 中自带的 example 示例:
inputs:
- Kafka:
codec: plain
encoding: UTF8 # defaut UTF8
topic:
app: 2
consumer_settings:
group.id: hangout
zookeeper.connect: 192.168.1.200:2181
auto.commit.interval.ms: "1000"
socket.receive.buffer.bytes: "1048576"
fetch.message.max.bytes: "1048576"
num.consumer.fetchers: "4"
- Kafka:
codec: json
topic:
web: 1
consumer_settings:
group.id: hangout
zookeeper.connect: 192.168.1.201:2181
auto.commit.interval.ms: "5000"
filters:
- Grok:
match:
- '^(?<logtime>\S+) (?<user>.+) (-|(?<level>\w+)) %{DATA:msg}$'
remove_fields: ['message']
- Add:
fields:
test: 'abcd'
if:
- '<#if message??>true</#if>'
- '<#if message?contains("liu")>true<#elseif message?contains("warn")>true</#if>'
- Date:
src: logtime
formats:
- 'ISO8601'
remove_fields: ['logtime']
- Lowercase:
fields: ['user']
- Add:
fields:
me: 'I am ${user}'
- Remove:
fields:
- logtime
- Trim:
fields:
- user
- Rename:
fields:
me: he
user: she
- Gsub:
fields:
she: ['c','CCC']
he: ['(^\w+)|(\w+$)','XXX']
- Translate:
source: user
target: nick
dictionary_path: /tmp/app.dic
- KV:
source: msg
target: kv
field_split: ' '
value_split: '='
trim: '\t\"'
trimkey: '\"'
include_keys: ["a","b","xyz","12"]
exclude_keys: ["b","c"] # b in excluded
tag_on_failure: "KVfail"
remove_fields: ['msg']
- Convert:
fields:
cs_bytes: integer
time_taken: float
- URLDecode:
fields: ["query1","query2"]
outputs:
- Stdout:
if:
- '<#if user=="childe">true</#if>'
- Elasticsearch:
cluster: hangoutcluster
hosts:
- 192.168.1.200
index: 'hangout-%{user}-%{+YYYY.MM.dd}'
index_type: logs # default logs
bulk_actions: 20000 #default 20000
bulk_size: 15 # default 15 MB
flush_interval: 10 # default 10 seconds
concurrent_requests: 0 # default 0, concurrent_requests设置成大于0的数, 意思着多线程处理, 以我应用的经验,还有是一定OOM风险的,强烈建议设置为0
- Kafka:
broker_list: 192.168.1.200:9092
topic: test2
其 pipeline 设计和 Logstash 不同的是:整个 filter 和 output 流程,都在 Kafka 的 consumer 线程中完成。所以,并发线程数完全是有 Kafka 的 partitions 设置来控制的。实际运行下来,hangout 比 Logstash 确实在处理能力,尤其是 CPU 资源消耗方面,性价比要高出很多。
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »