
Packetbeat协议扩展开发教程(2)
我们打开Packetbeat项目,看看里面长什么样:


现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;
现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;
知道项目的大概架构就知道从哪下手了,下节分解。
我们打开Packetbeat项目,看看里面长什么样:


现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;
现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;
知道项目的大概架构就知道从哪下手了,下节分解。 收起阅读 »

一个把数据从MySQL同步到Elasticsearch的工具
这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。
最近刚刚更新了中文文档。
这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。
最近刚刚更新了中文文档。 收起阅读 »

社区福利:Elastic-playground
Kibana:
https://6e0ccaba29cd55a7f07f83 ... ibana
用户名/密码:elasticsearch-cn
集群名:"e064eb",使用Java客户端的时候需要,如何连接,参考:http://elasticsearch.cn/article/46
HTTP http://e064eb4b0aa993db28ad513 ... :9200
HTTPS https://e064eb4b0aa993db28ad51 ... :9243
curl -u elasticsearch-cn:elasticsearch-cn http://e064eb4b0aa993db28ad513 ... 9200/
{
"name" : "instance-0000000009",
"cluster_name" : "e064eb4b0aa993db28ad513e4d2df5e3",
"version" : {
"number" : "2.1.1",
"build_hash" : "40e2c53a6b6c2972b3d13846e450e66f4375bd71",
"build_timestamp" : "2015-12-15T13:05:55Z",
"build_snapshot" : false,
"lucene_version" : "5.3.1"
},
"tagline" : "You Know, for Search"
}
内置常用插件,有其他插件要安装的请留言。
Kibana:
https://6e0ccaba29cd55a7f07f83 ... ibana
用户名/密码:elasticsearch-cn
集群名:"e064eb",使用Java客户端的时候需要,如何连接,参考:http://elasticsearch.cn/article/46
HTTP http://e064eb4b0aa993db28ad513 ... :9200
HTTPS https://e064eb4b0aa993db28ad51 ... :9243
curl -u elasticsearch-cn:elasticsearch-cn http://e064eb4b0aa993db28ad513 ... 9200/
{
"name" : "instance-0000000009",
"cluster_name" : "e064eb4b0aa993db28ad513e4d2df5e3",
"version" : {
"number" : "2.1.1",
"build_hash" : "40e2c53a6b6c2972b3d13846e450e66f4375bd71",
"build_timestamp" : "2015-12-15T13:05:55Z",
"build_snapshot" : false,
"lucene_version" : "5.3.1"
},
"tagline" : "You Know, for Search"
}
内置常用插件,有其他插件要安装的请留言。 收起阅读 »

elasticsearch-analysis-ik和elasticsearch-analysis-mmseg更新至1.7.0
https://github.com/medcl/elasticsearch-analysis-ik
elasticsearch-analysis-mmseg:
https://github.com/medcl/elast ... -mseg
主要更新配置文件存放路径,之前版本的配置文件存放在elasticsearch的config目录,现在都修改为插件的相对目录了,主要是简化部署,现在可在Found(https://found.elastic.co)部署了。
https://github.com/medcl/elasticsearch-analysis-ik
elasticsearch-analysis-mmseg:
https://github.com/medcl/elast ... -mseg
主要更新配置文件存放路径,之前版本的配置文件存放在elasticsearch的config目录,现在都修改为插件的相对目录了,主要是简化部署,现在可在Found(https://found.elastic.co)部署了。 收起阅读 »

通过elasticsearch-mapper attachment插件实现文件建立索引
bin/plugin install elasticsearch/elasticsearch-mapper-attachments/3.1.1
2.按照插件官方文档来测试
3.插件需要手动把文档内容转化为base64编码然后建立索引,代码如下
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tika.Tika;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xml.sax.ContentHandler;
import com.spatial4j.core.io.ParseUtils;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class sysfiles {
public static void main(String[] args) throws Exception{
sys();
}
private static void sys() throws IOException {
// TODO Auto-generated method stub
String idxName = "test";
String idxType = "attachments";
Settings settings =ImmutableSettings.settingsBuilder().put("cluster.name","az_bsms_elasticsearch").build();
Client client=new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
String data64=org.elasticsearch.common.Base64.encodeFromFile(filepath);
XContentBuilder source = jsonBuilder().startObject()
.field("file", data64)
.field("text", data64)
.endObject();
String id = "file"+11;
IndexResponse idxResp = client.prepareIndex().setIndex(idxName).setType(idxType).setId(id)
.setSource(source).setRefresh(true).execute().actionGet();
System.out.println(idxResp);
client.close();
}
4.按官方文档正常的搜索就可以了
bin/plugin install elasticsearch/elasticsearch-mapper-attachments/3.1.1
2.按照插件官方文档来测试
3.插件需要手动把文档内容转化为base64编码然后建立索引,代码如下
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tika.Tika;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xml.sax.ContentHandler;
import com.spatial4j.core.io.ParseUtils;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class sysfiles {
public static void main(String[] args) throws Exception{
sys();
}
private static void sys() throws IOException {
// TODO Auto-generated method stub
String idxName = "test";
String idxType = "attachments";
Settings settings =ImmutableSettings.settingsBuilder().put("cluster.name","az_bsms_elasticsearch").build();
Client client=new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
String data64=org.elasticsearch.common.Base64.encodeFromFile(filepath);
XContentBuilder source = jsonBuilder().startObject()
.field("file", data64)
.field("text", data64)
.endObject();
String id = "file"+11;
IndexResponse idxResp = client.prepareIndex().setIndex(idxName).setType(idxType).setId(id)
.setSource(source).setRefresh(true).execute().actionGet();
System.out.println(idxResp);
client.close();
}
4.按官方文档正常的搜索就可以了 收起阅读 »

Packetbeat协议扩展开发教程(1)
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。
开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/。
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。
这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。
A.源码签出
登陆Github打开https://github.com/elastic/beats

fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/
cd $GOPATH/src/github.com/elastic
#签出源码
git clone https://github.com/elastic/beats.git
cd beats
#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git
#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master
#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat
#切换到packetbeat模块
cd packetbeat
#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep
#编译
make
编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html
output:
elasticsearch:
enabled: true
hosts: ["localhost:9200"]
username: "tribe_user"
password: "tribe_user"
现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml -d "publish"
介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志
好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG Publish: {
"@timestamp": "2015-12-29T14:24:39.709Z",
"beat": {
"hostname": "medcls-MacBook.local",
"name": "medcls-MacBook.local"
},
"bytes_in": 31,
"bytes_out": 115,
"client_ip": "192.168.3.10",
"client_port": 53669,
"client_proc": "",
"client_server": "",
"count": 1,
"direction": "out",
"dns": {
"additionals_count": 0,
"answers": [
{
"class": "IN",
"data": "www.a.shifen.com",
"name": "sp2.baidu.com",
"ttl": 333,
"type": "CNAME"
}
],
"answers_count": 1,
"authorities": [
{
"class": "IN",
"data": "ns1.a.shifen.com",
"expire": 86400,
"minimum": 3600,
"name": "a.shifen.com",
"refresh": 5,
"retry": 5,
"rname": "baidu_dns_master.baidu.com",
"serial": 1512240003,
"ttl": 12,
"type": "SOA"
}
],
"authorities_count": 1,
"flags": {
"authoritative": false,
"recursion_allowed": true,
"recursion_desired": true,
"truncated_response": false
},
"id": 7435,
"op_code": "QUERY",
"question": {
"class": "IN",
"name": "sp2.baidu.com",
"type": "AAAA"
},
"response_code": "NOERROR"
},
"ip": "192.168.3.1",
"method": "QUERY",
"port": 53,
"proc": "",
"query": "class IN, type AAAA, sp2.baidu.com",
"resource": "sp2.baidu.com",
"responsetime": 18,
"server": "",
"status": "OK",
"transport": "udp",
"type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG output worker: publish 2 events
然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29 5 1 135 0 561.2kb 561.2kb
至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。
开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/。
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。
这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。
A.源码签出
登陆Github打开https://github.com/elastic/beats

fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/
cd $GOPATH/src/github.com/elastic
#签出源码
git clone https://github.com/elastic/beats.git
cd beats
#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git
#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master
#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat
#切换到packetbeat模块
cd packetbeat
#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep
#编译
make
编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html
output:
elasticsearch:
enabled: true
hosts: ["localhost:9200"]
username: "tribe_user"
password: "tribe_user"
现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml -d "publish"
介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志
好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG Publish: {
"@timestamp": "2015-12-29T14:24:39.709Z",
"beat": {
"hostname": "medcls-MacBook.local",
"name": "medcls-MacBook.local"
},
"bytes_in": 31,
"bytes_out": 115,
"client_ip": "192.168.3.10",
"client_port": 53669,
"client_proc": "",
"client_server": "",
"count": 1,
"direction": "out",
"dns": {
"additionals_count": 0,
"answers": [
{
"class": "IN",
"data": "www.a.shifen.com",
"name": "sp2.baidu.com",
"ttl": 333,
"type": "CNAME"
}
],
"answers_count": 1,
"authorities": [
{
"class": "IN",
"data": "ns1.a.shifen.com",
"expire": 86400,
"minimum": 3600,
"name": "a.shifen.com",
"refresh": 5,
"retry": 5,
"rname": "baidu_dns_master.baidu.com",
"serial": 1512240003,
"ttl": 12,
"type": "SOA"
}
],
"authorities_count": 1,
"flags": {
"authoritative": false,
"recursion_allowed": true,
"recursion_desired": true,
"truncated_response": false
},
"id": 7435,
"op_code": "QUERY",
"question": {
"class": "IN",
"name": "sp2.baidu.com",
"type": "AAAA"
},
"response_code": "NOERROR"
},
"ip": "192.168.3.1",
"method": "QUERY",
"port": 53,
"proc": "",
"query": "class IN, type AAAA, sp2.baidu.com",
"resource": "sp2.baidu.com",
"responsetime": 18,
"server": "",
"status": "OK",
"transport": "udp",
"type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG output worker: publish 2 events
然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29 5 1 135 0 561.2kb 561.2kb
至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。 收起阅读 »

关于提示TooManyClauses[maxClauseCount is set to 1024]的问题。
通过数据库获取到了1126个条件数据,然后叠加进bool进行查询,直接抛出个异常:
TooManyClauses[maxClauseCount is set to 1024]
问了Medcl大神,得知是超过默认搜索条件大小的问题,可以通过参数修改
index.query.bool.max_clause_count: 4096
M大也说,太BT了。。。 这么多条件查询。。。我也觉得挺BT的,自己想想都有点小激动,太佩服自己了。。。
通过数据库获取到了1126个条件数据,然后叠加进bool进行查询,直接抛出个异常:
TooManyClauses[maxClauseCount is set to 1024]
问了Medcl大神,得知是超过默认搜索条件大小的问题,可以通过参数修改
index.query.bool.max_clause_count: 4096
M大也说,太BT了。。。 这么多条件查询。。。我也觉得挺BT的,自己想想都有点小激动,太佩服自己了。。。 收起阅读 »

Day24: Elasticsearch添加Shield后TransportClient如何连接?
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.
关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.
1.HTTP方式
现在直接访问es的http接口就会报错
curl http://localhost:9200
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}
shield支持HttpBasic验证,所以正确的访问姿势是:
curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.
2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://maven.elasticsearch.or ... gt%3B
<releases> <enabled>true</enabled> </releases>
<snapshots> <enabled>false</enabled> </snapshots>
</repository>
</repositories>
2.2 添加依赖的配置
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency
2.3 构建TransportClient的地方增加访问用户的配置
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
String clusterName="elasticsearch"; String ip= "127.0.0.1";
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("shield.user", "tribe_user:tribe_user")
.build();
try { client = TransportClient.builder()
.addPlugin(ShieldPlugin.class)
.settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray())); client.prepareSearch()
.putHeader("Authorization", token).get(); }
catch (UnknownHostException e)
{ logger.error("es",e); }
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.
关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.
1.HTTP方式
现在直接访问es的http接口就会报错
curl http://localhost:9200
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}
shield支持HttpBasic验证,所以正确的访问姿势是:
curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.
2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://maven.elasticsearch.or ... gt%3B
<releases> <enabled>true</enabled> </releases>
<snapshots> <enabled>false</enabled> </snapshots>
</repository>
</repositories>
2.2 添加依赖的配置
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency
2.3 构建TransportClient的地方增加访问用户的配置
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
String clusterName="elasticsearch"; String ip= "127.0.0.1";
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("shield.user", "tribe_user:tribe_user")
.build();
try { client = TransportClient.builder()
.addPlugin(ShieldPlugin.class)
.settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray())); client.prepareSearch()
.putHeader("Authorization", token).get(); }
catch (UnknownHostException e)
{ logger.error("es",e); }
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252 收起阅读 »

Day 23 谈谈ES 的Recovery
Recovery是指将一个索引的未分配shard分配到一个结点的过程。 在快照恢复,更改索引复制片数量,结点故障或者结点启动时发生。由于master持有整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点。例如:
- 如果某个shard主片在,副片所在结点挂了,那么选择另外一个可用结点,将副片分配(allocate)上去,然后进行主从片的复制。
- 如果某个shard的主片所在结点挂了,副片还在,那么将副片升级为主片,然后做主副复制。
- 如果某个shard的主副片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从结点上恢复主分片,再选择某个结点分配复制片,并从主分片同步数据。
通过CAT health API,我们可以查看集群的状态,从而获知数据的完整性情况:
可能的状态及含义:
Green: 所有的shard主副片都完好的
Yellow: 所有shard的主片都完好,部分副片没有了,数据完整性依然完好。
Red: 某些shard的主副片都没有了,对应的索引数据不完整
Recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务能力降级,或者一部分功能暂时不可用。了解一些Recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。
减少集群Full Restart造成的数据来回拷贝
集群可能会有整体重启的需要,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群。 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡显然是不均衡的,master会触发rebalance过程,将数据在结点之间挪动。整个过程无谓消耗了大量的网络流量。 合理设置recovery相关参数则可以防范这种问题的发生。
gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes
以上三个参数是说集群里一旦有多少个结点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。
在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:
gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes
举例来说,对于一个有10个data node的集群,如果有以下的设置:
gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8
那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。
减少主副本之间的数据复制
如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:
然后在结点重启完成加入集群后,再重新打开:
这样在结点重启完成后,尽量多的从本地直接恢复数据。
但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于Recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同结点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启结点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
- 暂停数据写入程序
- 关闭集群shard allocation
- 手动执行POST /_flush/synced
- 重启结点
- 重新开启集群shard allocation
- 等待recovery完成,集群health status变成green
- 重新开启数据写入程序
(特别大的)热索引为何恢复慢
对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨结点拷贝segment file以外,translog recovery是导致慢的更重要的原因。
从主片恢复数据到副片需要经历3个阶段:
- 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
- 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
- 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。
万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery
其他Recovery相关的专家级设置
还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226) 。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。
Recovery是指将一个索引的未分配shard分配到一个结点的过程。 在快照恢复,更改索引复制片数量,结点故障或者结点启动时发生。由于master持有整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点。例如:
- 如果某个shard主片在,副片所在结点挂了,那么选择另外一个可用结点,将副片分配(allocate)上去,然后进行主从片的复制。
- 如果某个shard的主片所在结点挂了,副片还在,那么将副片升级为主片,然后做主副复制。
- 如果某个shard的主副片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从结点上恢复主分片,再选择某个结点分配复制片,并从主分片同步数据。
通过CAT health API,我们可以查看集群的状态,从而获知数据的完整性情况:
可能的状态及含义:
Green: 所有的shard主副片都完好的
Yellow: 所有shard的主片都完好,部分副片没有了,数据完整性依然完好。
Red: 某些shard的主副片都没有了,对应的索引数据不完整
Recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务能力降级,或者一部分功能暂时不可用。了解一些Recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。
减少集群Full Restart造成的数据来回拷贝
集群可能会有整体重启的需要,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群。 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡显然是不均衡的,master会触发rebalance过程,将数据在结点之间挪动。整个过程无谓消耗了大量的网络流量。 合理设置recovery相关参数则可以防范这种问题的发生。
gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes
以上三个参数是说集群里一旦有多少个结点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。
在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:
gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes
举例来说,对于一个有10个data node的集群,如果有以下的设置:
gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8
那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。
减少主副本之间的数据复制
如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:
然后在结点重启完成加入集群后,再重新打开:
这样在结点重启完成后,尽量多的从本地直接恢复数据。
但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于Recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同结点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启结点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
- 暂停数据写入程序
- 关闭集群shard allocation
- 手动执行POST /_flush/synced
- 重启结点
- 重新开启集群shard allocation
- 等待recovery完成,集群health status变成green
- 重新开启数据写入程序
(特别大的)热索引为何恢复慢
对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨结点拷贝segment file以外,translog recovery是导致慢的更重要的原因。
从主片恢复数据到副片需要经历3个阶段:
- 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
- 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
- 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。
万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery
其他Recovery相关的专家级设置
还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226) 。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。 收起阅读 »

elasticsearch-rtf更新至2.1.1
使用git快速签出最新版:
git clone git://github.com/medcl/elasticsearch-rtf.git -b master --depth 1
包含插件:
elasticsearch-analysis-ik-1.6.2 elasticsearch-analysis-pinyin-1.5.2
elasticsearch-analysis-mmseg-1.6.2 elasticsearch-analysis-stconvert-1.6.1
使用:
cd elasticsearch/bin
./elasticsearch
使用git快速签出最新版:
git clone git://github.com/medcl/elasticsearch-rtf.git -b master --depth 1
包含插件:
elasticsearch-analysis-ik-1.6.2 elasticsearch-analysis-pinyin-1.5.2
elasticsearch-analysis-mmseg-1.6.2 elasticsearch-analysis-stconvert-1.6.1
使用:
cd elasticsearch/bin
./elasticsearch
收起阅读 »

Day22:pipeline aggregation计算日留存率示例
目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"logs" : {
"properties" : {
"uid" : { "type" : "string", "index" : "not_analyzed" },
"register_time" : { "type" : "date", "index" : "not_analyzed" },
"login_time" : { "type" : "date", "index" : "not_analyzed" }
}
}
}
}'
那么实际记录的日志会类似这样:{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
"size" : 0,
"aggs" : {
"new_users" : {
"filters" : {
"filters" : [
{
"range" : {
"register_time" : {
"gte" : "2015-12-23",
"lt" : "2015-12-24"
}
}
}
]
},
"aggs" : {
"register_count" : {
"cardinality" : {
"field" : "uid"
}
},
"today" : {
"filter" : {
"range" : {
"login_time" : {
"gte" : "2015-12-24",
"lt" : "2015-12-25"
}
}
},
"aggs" : {
"login_count" : {
"cardinality" : {
"field" : "uid"
}
}
}
},
"retention" : {
"bucket_script" : {
"buckets_path" : {
"today_count" : "today>login_count",
"yesterday_count" : "register_count"
},
"script" : {
"lang" : "expression",
"inline" : "today_count / yesterday_count"
}
}
}
}
}
}
}'
这个 pipeline aggregation 在使用上有几个要点:- pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
- bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。
最终这次请求的响应如下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"new_users" : {
"buckets" : [ {
"doc_count" : 3,
"today" : {
"doc_count" : 1,
"login_count" : {
"value" : 1
}
},
"register_count" : {
"value" : 2
},
"retention" : {
"value" : 0.5
}
} ]
}
}
}
这个 retention 数据,就是我们要求解的 0.5 了。目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"logs" : {
"properties" : {
"uid" : { "type" : "string", "index" : "not_analyzed" },
"register_time" : { "type" : "date", "index" : "not_analyzed" },
"login_time" : { "type" : "date", "index" : "not_analyzed" }
}
}
}
}'
那么实际记录的日志会类似这样:{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
"size" : 0,
"aggs" : {
"new_users" : {
"filters" : {
"filters" : [
{
"range" : {
"register_time" : {
"gte" : "2015-12-23",
"lt" : "2015-12-24"
}
}
}
]
},
"aggs" : {
"register_count" : {
"cardinality" : {
"field" : "uid"
}
},
"today" : {
"filter" : {
"range" : {
"login_time" : {
"gte" : "2015-12-24",
"lt" : "2015-12-25"
}
}
},
"aggs" : {
"login_count" : {
"cardinality" : {
"field" : "uid"
}
}
}
},
"retention" : {
"bucket_script" : {
"buckets_path" : {
"today_count" : "today>login_count",
"yesterday_count" : "register_count"
},
"script" : {
"lang" : "expression",
"inline" : "today_count / yesterday_count"
}
}
}
}
}
}
}'
这个 pipeline aggregation 在使用上有几个要点:- pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
- bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。
最终这次请求的响应如下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"new_users" : {
"buckets" : [ {
"doc_count" : 3,
"today" : {
"doc_count" : 1,
"login_count" : {
"value" : 1
}
},
"register_count" : {
"value" : 2
},
"retention" : {
"value" : 0.5
}
} ]
}
}
}
这个 retention 数据,就是我们要求解的 0.5 了。收起阅读 »

Day21: 如何快速把Kibana4 Discover页的Document Table导出成CSV
但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。
可惜Kibana4没有针对search document table的导出!
国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成……
点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated
然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:

然后点击这个『CSV』按钮,会弹出一片提示:

可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。
我们当然选择本机……
然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。
实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。
注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集!
但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。
可惜Kibana4没有针对search document table的导出!
国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成……
点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated
然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:

然后点击这个『CSV』按钮,会弹出一片提示:

可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。
我们当然选择本机……
然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。
实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。
注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集! 收起阅读 »

简繁体转换插件更新:elasticsearch-analysis-stconvert 升级支持2.0
项目地址:https://github.com/medcl/elast ... nvert
mvn 编译打包,拷贝release下面的zip并解压到你的es plugins目录即可,需要重启es
这个插件帮你处理简繁体,简繁体全部统一成简体或繁体,不管输入的简体还是繁体,都能得到搜索结果
比如:
不管输入的是『北京国际电视台』的还是『北京國際電視臺』都能命中。
详细配置和使用请参照上面的地址。
项目地址:https://github.com/medcl/elast ... nvert
mvn 编译打包,拷贝release下面的zip并解压到你的es plugins目录即可,需要重启es
这个插件帮你处理简繁体,简繁体全部统一成简体或繁体,不管输入的简体还是繁体,都能得到搜索结果
比如:
不管输入的是『北京国际电视台』的还是『北京國際電視臺』都能命中。
详细配置和使用请参照上面的地址。
收起阅读 »

Day20 利用tcpdump和kafka协议定位不合法topic的来源
因为在topic名字里面不能含有%字符, 所以kafka server的日志里面大量报错. Logstash每发一次数据, kafka就会生成下面一大段错误
[2015-12-23 23:20:47,749] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 48; ClientId: ; Topics: test-%{type} (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name test-%{type} is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:744)
把可用的信息瞬间淹没.
更不幸的是, 错误日志里面并没有客户来源的信息, 根本不知道是哪些机器还有问题.
我想做的, 就是把有问题的logstash机器找出来.
我就先事后诸葛亮一把, 用下面这个命令就可以把配置错误的机器找出来(也可以没有任何结果, 原因后面说)
tcpdump -nn 'dst port 9092 and tcp[37]==3 and tcp[57]==37'
dst port 9092就不说了, 这是kafka的默认端口, 后面的tcp[37]==3 and tcp[57]==37是啥意思呢, 我们慢慢说.
先要说一下: client要生产数据到kafka, 在发送消息之前, 首先得向kafka"询问"这个topic的metadata信息, 包括有几个partiton, 每个parttion在哪个服务器上面等信息, 拿到这些信息之后, 才能把消息发到正确的kafka服务器上.
重点来了! 向kafka"询问"topic的metadata, 其实就是发送一个tcp包过去, 我们需要知道的是这个tcp包的格式. 我已经帮你找到了, 就在这里 https://cwiki.apache.org/confl ... quest
看完文档之后(半小时或者更长时间过去了), 你就会知道, tcp body(除去tcp head)里面的第6个字节是03, 代表这是一个TopicMetadataRequest请求. topicname里面的%字符出现在tcp body的第26个字节, %的ascii码是37
tcp头一般是20个字符, 所以加上这20个字节, 然后下标从0算起, 就是tcp[20+5]==3 and tcp[20+25]==37, 也就是tcp[25]==3 and tcp[45]==37.
咦, 为啥和开始写的那个过滤条件不一样呢, 因为tcp头"一般"是20字节, 但是如果其中还包含了tcp选项的话, 就可能比20多了. 反正我这里看到的的tcp头都是32个字节, 所以不能加20, 要加32, 也就是最开始写的 tcp[37]==3 and tcp[57]==37
最后呢, 再提2点结束.
1. 终极大杀器, 不过tcp头的长度是多少, 20也好, 32也好, 或者其他也好, 下面这样都能搞定
tcpdump -nn 'dst port 9092 and tcp[(tcp[12]>>2)+5]==3 and tcp[(tcp[12]>>2)+25]==37'
2. 不要一上来就这么高端, 其实我最开始是这样先确定问题的
tcpdump -vv -nn -X -s 0 dst port 9092 | grep -C 5 "test-"
你问我为啥不把test-t{type}写完整? 不是为了省事, 其实是因为很不幸, test-%{t 到这里的时候, 正好换行了.
因为在topic名字里面不能含有%字符, 所以kafka server的日志里面大量报错. Logstash每发一次数据, kafka就会生成下面一大段错误
[2015-12-23 23:20:47,749] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 48; ClientId: ; Topics: test-%{type} (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name test-%{type} is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:744)
把可用的信息瞬间淹没.
更不幸的是, 错误日志里面并没有客户来源的信息, 根本不知道是哪些机器还有问题.
我想做的, 就是把有问题的logstash机器找出来.
我就先事后诸葛亮一把, 用下面这个命令就可以把配置错误的机器找出来(也可以没有任何结果, 原因后面说)
tcpdump -nn 'dst port 9092 and tcp[37]==3 and tcp[57]==37'
dst port 9092就不说了, 这是kafka的默认端口, 后面的tcp[37]==3 and tcp[57]==37是啥意思呢, 我们慢慢说.
先要说一下: client要生产数据到kafka, 在发送消息之前, 首先得向kafka"询问"这个topic的metadata信息, 包括有几个partiton, 每个parttion在哪个服务器上面等信息, 拿到这些信息之后, 才能把消息发到正确的kafka服务器上.
重点来了! 向kafka"询问"topic的metadata, 其实就是发送一个tcp包过去, 我们需要知道的是这个tcp包的格式. 我已经帮你找到了, 就在这里 https://cwiki.apache.org/confl ... quest
看完文档之后(半小时或者更长时间过去了), 你就会知道, tcp body(除去tcp head)里面的第6个字节是03, 代表这是一个TopicMetadataRequest请求. topicname里面的%字符出现在tcp body的第26个字节, %的ascii码是37
tcp头一般是20个字符, 所以加上这20个字节, 然后下标从0算起, 就是tcp[20+5]==3 and tcp[20+25]==37, 也就是tcp[25]==3 and tcp[45]==37.
咦, 为啥和开始写的那个过滤条件不一样呢, 因为tcp头"一般"是20字节, 但是如果其中还包含了tcp选项的话, 就可能比20多了. 反正我这里看到的的tcp头都是32个字节, 所以不能加20, 要加32, 也就是最开始写的 tcp[37]==3 and tcp[57]==37
最后呢, 再提2点结束.
1. 终极大杀器, 不过tcp头的长度是多少, 20也好, 32也好, 或者其他也好, 下面这样都能搞定
tcpdump -nn 'dst port 9092 and tcp[(tcp[12]>>2)+5]==3 and tcp[(tcp[12]>>2)+25]==37'
2. 不要一上来就这么高端, 其实我最开始是这样先确定问题的
tcpdump -vv -nn -X -s 0 dst port 9092 | grep -C 5 "test-"
你问我为啥不把test-t{type}写完整? 不是为了省事, 其实是因为很不幸, test-%{t 到这里的时候, 正好换行了. 收起阅读 »