elasticsearch nested 查询巨慢
Elasticsearch • 三斗室 回复了问题 • 3 人关注 • 1 个回复 • 11396 次浏览 • 2015-12-19 21:04
ELK中elasticsearch如何关闭throttling indexing
Elasticsearch • hivefans 回复了问题 • 5 人关注 • 5 个回复 • 11149 次浏览 • 2016-04-11 14:29
day16 logstash-forwader To Kakfa!
Advent • childe 发表了文章 • 1 个评论 • 5137 次浏览 • 2015-12-18 16:37
在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.
logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.
目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~
贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
{
"files": [
{
"paths": [
"/var/log/nginx/*.log"
],
"Fields":{
"type":"nginx"
},
"DeadTime": "30m"
},
{
"paths": [
"/var/log/tomcat/*.log",
"/var/log/tomcat/*/*.log"
],
"Fields":{
"type":"tomcat"
},
"DeadTime": "30m"
}
],
"kafka": {
"broker_list": ["10.0.0.1:9092","10.0.0.2:9092"],
"topic_id": "topic_name_change_it_{{.type}}",
"compression_codec": "gzip"
}
}
再简单介绍一下参数吧,
- DeadTime:30m 是说超过30分钟没有更新, 就不会再继续跟踪这个文件了(退出goroutine)
- “Fields”:{ “type”:”tomcat” } , 会在每条日志中增加配置的字段
- path目前就是用的golang官方库, 好像是还不支持递归多层目录查找, 反正我翻了一下文档, 没有找到.
grok还不支持, 但简单的分割是可以的
"files": [
{
"paths": [
"d:\\target.txt"
],
"FieldNames": ["datetime", "datetime", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query", "s_port", "time_taken"],
"Delimiter": "\\s+",
"QuoteChar": "\""
}
]
以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.
其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.
现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下.
查询cdn日志报错,单term日志大于32K,看报错是成功写入了es,但是,不能查询出来
Elasticsearch • samir 回复了问题 • 2 人关注 • 1 个回复 • 9630 次浏览 • 2016-01-20 14:35
Day15:Beats是什么东西?
Advent • medcl 发表了文章 • 5 个评论 • 11391 次浏览 • 2015-12-17 22:34
https://pan.baidu.com/s/1eS157 ... -6-18
事实上Beats是一系列产品的统称,属于ElasticStack里面收集数据的这一层:Data Shipper Layer,包括以下若干Beats:
- PacketBeat,用来嗅探和分析网络流量,如HTTP、MySQL、Redis等
- TopBeat,用来收集系统的监控信息,功能如其名,类似*nix下的top命令,只不过所有的信息都会发送给后端的集中存储:Elasticsearch,这样你就可以很方便的监控所有的服务器的运行情况了
- FileBeat,用来收集数据源是文件的数据,比如常见的系统日志、应用日志、网站日志等等,FIleBeat思路来自Logstash-forwarder,Beats团队加入之后重构改写而成,解决的就是Logstash作为Agent采集时占用太多被收集系统资源的问题,Beats家族都是Golang编写,效率高,占用内存和CPU比较少,非常适合作为agent跑着服务器上
- 。。。
所以Beats其实是一套框架,另外的一个子项目Libbeat,就是所有beats都共用的模块,封装了所有的公共的组件,如配置管理、公共基础类、协议的解析处理、与Elasticsearch的操作等等,你可以很方便基于它实现你自己的beats,这也是Beats的目标,希望将来会出现更多的Beats,做各种各样的事情。
另外PacketBeat比较特殊,它又是网络协议抓包和处理的一个框架,目前支持了常见的一些协议,要扩展未知的协议其实非常简单,PacketBeat作为一个框架,数据抓包和后续的存储已经帮你处理好了,你只需要实现你的协议的解码操作就行了,当然这块也是最难和最业务相关的。
关于PacketBeat我回头再单独写一篇文章来介绍怎样编写一个PacketBeat的协议扩展吧,PacketBeat扩展的其它协议最终还是需要和PacketBeat集成在一起,也就是最终你的代码是要和PacketBeat的代码在一个工程里面的,而其它的Beats使用Libbeat完全是单独的Beat,如Filebeat和TopBeat,完全是独立打包和独立运行,这个也是两大Beats的主要区别。
随便提一下,现在所有的这些Beats已经合并到一个项目里面来方便管理了,golang,you know:https://github.com/elastic/beats
现在社区已经提交了的Beats:
https://www.elastic.co/guide/e ... .html
明后天在Beijing的ArchSummit2015,我将在Elastic展台,欢迎过来骚扰,领取Elastic的各种贴纸,还有限量的印有Elastic的T恤,数量有限哦
今天的Advent就这些吧。
Advent接力活动,规则:http://elasticsearch.cn/article/20
elasticsearch recovery问题
Elasticsearch • jingkyks 回复了问题 • 2 人关注 • 2 个回复 • 11720 次浏览 • 2015-12-21 10:18
Day14: percolator接口在logstash中的运用
Advent • 三斗室 发表了文章 • 0 个评论 • 6063 次浏览 • 2015-12-16 23:10
而单纯从 Logstash 来做实时过滤报警,规则又不是很灵活。toplog.io 公司开发了一个 logstash-output-percolator插件,在有一定既定条件的情况下,成功运用上了 Percolator 方案。
这个插件的设计逻辑是:
- 通过 logstash-filter-checksum 自主生成 ES 文档的 _id;
- 使用上一步生成的 _id 同时发送 logstash-output-elasticsearch 和 logstash-output-percolator
- Percolator 接口一旦过滤成功,将 _id 发送给 Redis 服务器
- 其他系统从 Redis 服务器中获取 _id 即可从 ES 里拿到实际数据
Percolator 接口的用法简单说是这样:
创建接口:
curl -XPUT 'localhost:9200/patterns/.percolator/my-pattern-id' -d '{"query" : {"match" : {"message" : "ERROR"} } }'
过滤测试:curl -XGET 'localhost:9200/my-index/my-type/_percolate' -d '{"doc" : {"message" : "ERROR: Service Apache failed to connect to MySQL"} }'
要点就是把文档放在 doc 属性里发送到 _percolate 里。对应的 Logstash 配置如下:
filter {
checksum {
algorithm => "md5"
keys => ["message"]
}
}
output {
elasticsearch {
host => "localhost"
cluster => "my-cluster"
document_id => "%{logstash_checksum}"
index => "my-index"
}
percolator {
host => "es-balancer"
redis_host => ["localhost"]
document_id => "%{logstash_checksum}"
pattern_index => "patterns"
}
}
连接上对应的 Redis,就可以看到报警信息了:$ redis-cli
127.0.0.1:6379> lrange percolator 0 1
1) "{\"matches\":[\"2\"],\"document_id\":\"a5d5c5f69b26ac0597370c9b1e7a8111\"}"
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 Day13: ipip.net介绍
Advent • 三斗室 发表了文章 • 1 个评论 • 7507 次浏览 • 2015-12-16 23:05
用法很简单:
filter {
ipip {
source => "clientip"
target => "ipip"
}
}
生成的 JSON 数据结构类似下面这样:{
"clientip" : "",
"ipip" : {
"country" : "",
"city" : "",
"carrier" : "",
"province" : ""
}
}
不过这个插件只实现了收费版的数据库基础格式。免费版的支持,收费版高级的经纬度、基站位置等,都没有随着更新。事实上,我们可以通过 ipip 官方的 Java 库,实现一个更灵活的 logstash-filter-ipip_java 插件出来,下期见。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
Day12: siren-join简介
Advent • 三斗室 发表了文章 • 0 个评论 • 8142 次浏览 • 2015-12-16 23:03
不过需要提醒一下的是:filter 层面的意思,就是只相当于是 SQL 里的 exists 操作。所以目前对这个插件也不要抱有太大期望。今天我们来稍微演示一下。
安装和其他 ES 插件一样:
# bin/plugin -i solutions.siren/siren-join/1.0
注意 siren-join v1.0 只支持 ES 1.7 版本,2.0 版本支持据说正在开发中。我们 bulk 上传这么一段数据:
{"index":{"_index":"index1","_type":"type","_id":"1"}}
{"id":1, "foreign_key":"13"}
{"index":{"_index":"index1","_type":"type","_id":"2"}}
{"id":2}
{"index":{"_index":"index1","_type":"type","_id":"3"}}
{"id":3, "foreign_key": "2"}
{"index":{"_index":"index1","_type":"type","_id":"4"}}
{"id":4, "foreign_key": "14"}
{"index":{"_index":"index1","_type":"type","_id":"5"}}
{"id":5, "foreign_key": "2"}
{"index":{"_index":"index2","_type":"type","_id":"1"}}
{"id":"1", "tag": "aaa"}
{"index":{"_index":"index2","_type":"type","_id":"2"}}
{"id":"2", "tag": "aaa"}
{"index":{"_index":"index2","_type":"type","_id":"3"}}
{"id":"3", "tag": "bbb"}
{"index":{"_index":"index2","_type":"type","_id":"4"}}
{"id":"4", "tag": "ccc"}
注意,siren-join 要求用来 join 的字段必须数据类型一致。所以,当我们要用 index2 的 id 和 index1 的foreign_key 做 join 的时候,这两个字段就要保持一致,这里为了演示,特意都改成字符串。那么我们发起一个请求如下:# curl -s -XPOST 'http://localhost:9200/index1/_coordinate_search?pretty' -d '
{
"query":{
"filtered":{
"query":{
"match_all":{}
},
"filter":{
"filterjoin":{
"foreign_key":{
"index":"index2",
"type":"type",
"path":"id",
"query":{
"terms":{
"tag":["aaa"]
}
}
}
}
}
}
},
"aggs":{
"avg":{
"avg":{
"field":"id"
}
}
}
}'
意即:从 index2 中搜索 q=tag:aaa 的数据的 id,查找 index1 中对应 foreign_key 的文档的 id 数据平均值。响应结果如下:{
"coordinate_search" : {
"actions" : [ {
"relations" : {
"from" : {
"indices" : [ ],
"types" : [ ],
"field" : "id"
},
"to" : {
"indices" : null,
"types" : null,
"field" : "foreign_key"
}
},
"size" : 2,
"size_in_bytes" : 20,
"is_pruned" : false,
"cache_hit" : true,
"took" : 0
} ]
},
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [ {
"_index" : "index1",
"_type" : "type",
"_id" : "5",
"_score" : 1.0,
"_source":{"id":5, "foreign_key": "2"}
}, {
"_index" : "index1",
"_type" : "type",
"_id" : "3",
"_score" : 1.0,
"_source":{"id":3, "foreign_key": "2"}
} ]
},
"aggregations" : {
"avg" : {
"value" : 4.0
}
}
}
响应告诉我们:从 index2 中搜索到 2 条参与 join 的文档,在 index1 中命中 2 条数据,最后求平均值为 4.0。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
Day11: timelion请求语法
Advent • 三斗室 发表了文章 • 0 个评论 • 13183 次浏览 • 2015-12-16 23:01
timelion 的用法在官博里已经有介绍。尤其是最近两篇如何用 timelion 实现异常告警的文章,更是从 ES 的 pipeline aggregation 细节和场景一路讲到 timelion 具体操作,我这里几乎没有再重新讲一遍 timelion 操作入门的必要了。不过,官方却一直没有列出来 timelion 支持的请求语法的文档,而是在页面上通过点击图标的方式下拉帮助。
timelion 页面设计上,更接近 Kibana3 而不是 Kibana4。比如 panel 分布是通过设置几行几列的数目来固化的;query 框是唯一的,要修改哪个 panel 的 query,鼠标点选一下 panel,query 就自动切换成这个 panel 的了。
为了方便大家在上手之前了解 timelion 能做到什么,今天特意把 timelion 的请求语法所支持的函数分为几类,罗列如下:
可视化效果类:
.bars($width): 用柱状图展示数组
.lines($width, $fill, $show, $steps): 用折线图展示数组
.points(): 用散点图展示数组
.color("#c6c6c6"): 改变颜色
.hide(): 隐藏该数组
.label("change from %s"): 标签
.legend($position, $column): 图例位置
.yaxis($yaxis_number, $min, $max, $position): 设置 Y 轴属性,.yaxis(2) 表示第二根 Y 轴
数据运算类:
.abs(): 对整个数组元素求绝对值
.precision($number): 浮点数精度
.testcast($count, $alpha, $beta, $gamma): holt-winters 预测
.cusum($base): 数组元素之和,再加上 $base
.derivative(): 对数组求导数
.divide($divisor): 数组元素除法
.multiply($multiplier): 数组元素乘法
.subtract($term): 数组元素减法
.sum($term): 数组元素加法
.add(): 同 .sum()
.plus(): 同 .sum()
.first(): 返回第一个元素
.movingaverage($window): 用指定的窗口大小计算移动平均值
.mvavg(): .movingaverage() 的简写
.movingstd($window): 用指定的窗口大小计算移动标准差
.mvstd(): .movingstd() 的简写
数据源设定类: .elasticsearch(): 从 ES 读取数据
.es(q="querystring", metric="cardinality:uid", index="logstash-*", offset="-1d"): .elasticsearch() 的简写
.graphite(metric="path.to.*.data", offset="-1d"): 从 graphite 读取数据
.quandl(): 从 quandl.com 读取 quandl 码
.worldbank_indicators(): 从 worldbank.org 读取国家数据
.wbi(): .worldbank_indicators() 的简写
.worldbank(): 从 worldbank.org 读取数据
.wb(): .worldbanck() 的简写
以上所有函数,都在 series_functions 目录下实现,每个 js 文件实现一个 TimelionFunction 功能。想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
Day10: 如何处理数组形式的JSON日志
Advent • 三斗室 发表了文章 • 1 个评论 • 10002 次浏览 • 2015-12-16 22:57
- 预先已经记录成 JSON 了;
- 日志主体内容是一个巨大无比的数组,数据元素才是实际的单次日志记录;
- 一次 POST 会有几 MB 到几十 MB 大小。
在处理这类数据的时候,第一关是别让数据超长直接给丢弃了(说的就是你啊,Rsyslog);第二关就是拆分 JSON 数组,把几十 MB 数据扔 ES 字段里,显然是不利于搜索和统计需求的。今天我们就来说说怎么拆分 JSON 数组。
假设收到的是这么一段日志:
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
首先我们知道可以在读取的时候把 JSON 数据解析成 LogStash::Event 对象:input {
tcp {
codec => json
}
}
但是怎么把解析出来的 logs 字段拆分成多个 event 呢?这里我们可以用一个已有插件:logstash-filter-split。filter {
split {
field => "logs"
}
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
remove_fields => ["logs", "timestamp"]
}
}
这样,就可以得到两个 event 了:{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","@timestamp":"2015-12-10T09:55:00Z","reason":"****"}
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10T09:56:12Z","tracert":"****"}
看起来可能跟这个插件的文档描述不太一样。文档上写的是通过 terminator 字符,切割 field 字符串成多个 event。但实际上,field 设置是会自动判断的,如果 field 内容是字符串,就切割字符串成为数组再循环;如果内容已经是数组了,直接循环: original_value = event[@field]
if original_value.is_a?(Array)
splits = original_value
elsif original_value.is_a?(String)
splits = original_value.split(@terminator, -1)
else
raise LogStash::ConfigurationError, "Only String and Array types are splittable. field:#{@field} is of type = #{original_value.class}"
end
return if splits.length == 1
splits.each do |value|
next if value.empty?
event_split = event.clone
@logger.debug("Split event", :value => value, :field => @field)
event_split[(@target || @field)] = value
filter_matched(event_split)
yield event_split
end
event.cancel
顺带提一句:这里 yield 在 Logstash 1.5.0 之前,实现有问题,生成的新事件,不会继续执行后续 filter,直接进入到 output 阶段。也就是说,如果你用 Logstash 1.4.2 来执行上面那段配置,生成的两个事件会是这样的:{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10 17:56:12","tracert":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 delete_by_query插件的java调用方式
Elasticsearch • wdy 回复了问题 • 3 人关注 • 2 个回复 • 5544 次浏览 • 2017-09-24 17:59
elasticsearch源码调试环境小结
Elasticsearch • jingkyks 发表了文章 • 4 个评论 • 16604 次浏览 • 2015-12-16 13:04
---------------------
调试环境是window(linux理论上通用)
用到的工具类:
1:mvn:https://maven.apache.org/
elasticsearch的源码是用mvn工具管理的,根据pom.xml来下载一些依赖包非常方便。
(当然也可以用gradle,由于不太熟悉,就没研究)
安装mvn,注意配置后环境变量即可。官方文档写的很明白。
最好自己修改一下mvn的setting.xml文件中的本地repo
<!-- localRepository
| The path to the local repository maven will use to store artifacts.
|
| Default: ${user.home}/.m2/repository
<localRepository>/path/to/local/repo</localRepository>
-->
我设置成了:
<localRepository>E:/m2/repository</localRepository>
mvn -v 测试以下
2:eclipse:编辑器,应用应该还比较广泛的。我用的最新版的mars。
(intellij idea据说这是一个很牛逼的编辑器,也是因为暂时不熟悉,还没研究)
----------------------
步骤:
1: 去github上选择一个tag版本,我用的是2.1.0.
https://github.com/elastic/ela ... 2.1.0
直接DownloadZip文件即可
(也可以用git clone下来)
解压缩。
假设目录为E:/elasticsearch-2.1.0
2: 编译源代码
cmd 打开命令行
进入源文件目录 E:/elasticsearch-2.1.0
执行 mvn package命令
这个时间段耗时比较长,当然也得根据网速情况。
会出现失败,大多是因为拉取不到依赖包。可以根据提示信息,手动去下载失败的jar,然后拷贝到本地repo对应的文件夹下边即可。
等出现build success信息的时候代表成功了。
可以到core/target目录下看到elasticsearch-2.1.0-SNAPSHOT.jar。
3:转为eclipse工程
可能习惯了eclipse工程,所以这里就直接用mvn转成了eclipse的工程,生成.classpath和.project文件。
进入core目录执行以下指令
mvn eclipse:eclipse
这一步也会消耗一些时间,通常的错误也是jar包下载不成功,根据终端打印的错误信息,把对应jar包直接下载下来,放到本地的repo对应目录下边即可,然后重新运行命令。直到成功。
之后,就会发现出现了.classpath和.project文件了。
然后打开eclipse 直接带入core中的工程即可。
4: 设置运行参数
打开刚刚导入成功的工程:
Run As----Run Configution---Args
设置ProgramArgument 为 start
设置VMArgument为 -Des.path.home=E:\elasticsearch-2.1.0\core\
完毕
-------
现在就就可以运行+调试了。
beats 中的 shipper 配置
Beats • medcl 回复了问题 • 3 人关注 • 1 个回复 • 5763 次浏览 • 2015-12-16 13:36