Packetbeat协议扩展开发教程(3)
前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
打开一个现有的UDP和HTTP协议接口定义:
/~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
// Called to initialize the Plugin
Init(test_mode bool, results publisher.Client) error
// Called to return the configured ports
GetPorts() int
}
type TcpProtocolPlugin interface {
ProtocolPlugin
// Called when TCP payload data is available for parsing.
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData
// Called when the FIN flag is seen in the TCP stream.
ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private ProtocolData) ProtocolData
// Called when a packets are missing from the tcp
// stream.
GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
private ProtocolData) (priv ProtocolData, drop bool)
// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
ConnectionTimeout() time.Duration
}
type UdpProtocolPlugin interface {
ProtocolPlugin
// ParseUdp is invoked when UDP payload data is available for parsing.
ParseUdp(pkt *Packet)
}
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间;UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。
请问:
Packetbeat怎么工作的?
回答:
每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。
貌似很简单嘛!
进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
type Packet struct {
Ts time.Time
Tuple common.IpPortTuple
Payload byte
}
Packet结构简单,Ts是收到数据包的时间戳;
Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。
首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
protocols:
smtp:
# Configure the ports where to listen for Smtp traffic. You can disable
# the Smtp protocol by commenting out the list of ports.
ports: [25]
还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:git diff config/config.go
@@ -42,6 +42,7 @@ type Protocols struct {
Pgsql Pgsql
Redis Redis
Thrift Thrift
+ Smtp Smtp
}
type Dns struct {
@@ -118,5 +119,9 @@ type Redis struct {
Send_response *bool
}
+type Smtp struct {
+ ProtocolCommon `yaml:",inline"`
+}
+
// Config Singleton
var ConfigSingleton Config
在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go,这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
...TODO...
修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。git diff protos/protos.go
@@ -103,6 +103,7 @@ const (
MongodbProtocol
DnsProtocol
MemcacheProtocol
+ SmtpProtocol
)
// Protocol names
@@ -116,6 +117,7 @@ var ProtocolNames = string{
"mongodb",
"dns",
"memcache",
+ "smtp",
}
继续修改packetbeat.go主文件,允许SMTP协议并加载。git diff packetbeat.go
@@ -27,6 +27,7 @@ import (
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/thrift"
"github.com/elastic/packetbeat/protos/udp"
+ "github.com/elastic/packetbeat/protos/smtp"
"github.com/elastic/packetbeat/sniffer"
)
@@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
protos.ThriftProtocol: new(thrift.Thrift),
protos.MongodbProtocol: new(mongodb.Mongodb),
protos.DnsProtocol: new(dns.Dns),
+ protos.SmtpProtocol: new(smtp.Smtp),
}
做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。
现在我们仔细看看Parse()接口的各个参数定义是做什么用的
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData
pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。下面看段MySQL模块里面的例子
priv := mysqlPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(mysqlPrivateData)
if !ok {
priv = mysqlPrivateData{}
}
}
[ ... ]
return priv
上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。我们再继续看后续怎么处理这些包的一个逻辑例子
ok, complete := mysqlMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
return priv
}
if complete {
mysql.messageComplete(tcptuple, dir, stream)
} else {
// wait for more data
break
}
mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321:
由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
S: 220 www.example.com ESMTP Postfix
C: HELO mydomain.com
S: 250 Hello mydomain.com
C: MAIL FROM:
S: 250 Ok
C: RCPT TO:
S: 250 Ok
C: DATA
S: 354 End data with .
C: Subject: test message
C: From:""< sender@mydomain.com>
C: To:""< friend@example.com>
C:
C: Hello,
C: This is a test.
C: Goodbye.
C: .
S: 250 Ok: queued as 12345
C: quit
S: 221 Bye
上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议(RFC5321),我们只需要关注以下几个命令:MAIL:开始一份邮件 mail from: xxx@xx.com
RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com
QUIT:结束SMTP会话,不一定发送了邮件,注意
RESET:重置会话,当前传输被取消
最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
{
"timestamp":"2016-1-15 12:00:00",
"from":"medcl@example.co",
"to":["lcdem@example.co"]
}
我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供:https://wiki.wireshark.org/SampleCaptures/Ctrl+F找到SMTP的下载地址:smtp.pcap
用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:
上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。
打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go
定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
type smtpPrivateData struct{
Data [2]*SmtpStream
}
type SmtpStream struct {
tcptuple *common.TcpTuple
data byte
parseOffset int
isClient bool
message *SmtpMessage
}
type SmtpMessage struct {
Ts time.Time
From string
To string
}
然后参照MySQL协议,定义相应的方法,最终如下:package smtp
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"bytes"
"time"
"strings"
)
type smtpPrivateData struct{
Data [2]*SmtpStream
}
type SmtpStream struct {
tcptuple *common.TcpTuple
data byte
parseOffset int
isClient bool
message *SmtpMessage
}
type SmtpMessage struct {
start int
end int
Ts time.Time
From string
To string
IgnoreMessage bool
}
type Smtp struct {
SendRequest bool
SendResponse bool
transactionTimeout time.Duration
Ports int
results publisher.Client
}
func (smtp *Smtp) initDefaults() {
smtp.SendRequest = false
smtp.SendResponse = false
smtp.transactionTimeout = protos.DefaultTransactionExpiration
}
func (smtp *Smtp) setFromConfig(config config.Smtp) error {
smtp.Ports = config.Ports
if config.SendRequest != nil {
smtp.SendRequest = *config.SendRequest
}
if config.SendResponse != nil {
smtp.SendResponse = *config.SendResponse
}
if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
}
return nil
}
func (smtp *Smtp) GetPorts() int {
return smtp.Ports
}
func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
smtp.initDefaults()
if !test_mode {
err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
if err != nil {
return err
}
}
smtp.results = results
return nil
}
func readLine(data byte, offset int) (bool, string, int) {
q := bytes.Index(data[offset:], byte("\r\n"))
if q == -1 {
return false, "", 0
}
return true, string(data[offset : offset+q]), offset + q + 2
}
func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {
defer logp.Recover("ParseSmtp exception")
priv := smtpPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(smtpPrivateData)
if !ok {
priv = smtpPrivateData{}
}
}
if priv.Data[dir] == nil {
priv.Data[dir] = &SmtpStream{
tcptuple: tcptuple,
data: pkt.Payload,
message: &SmtpMessage{Ts: pkt.Ts},
}
} else {
// concatenate bytes
priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("smtp", "Stream data too large, dropping TCP stream")
priv.Data[dir] = nil
return priv
}
}
stream := priv.Data[dir]
for len(stream.data) > 0 {
if stream.message == nil {
stream.message = &SmtpMessage{Ts: pkt.Ts}
}
ok, complete := stmpMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
return priv
}
if complete {
smtp.messageComplete(tcptuple, dir, stream)
} else {
logp.Debug("smtp","still wait message...")
// wait for more data
break
}
}
return priv
}
func (smtp *Smtp) ConnectionTimeout() time.Duration {
return smtp.transactionTimeout
}
func stmpMessageParser(s *SmtpStream) (bool, bool) {
var value string=""
for s.parseOffset < len(s.data) {
logp.Debug("smtp", "Parse message: %s", string(s.data[s.parseOffset]))
if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {
logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))
found, line, off := readLine(s.data, s.parseOffset)
if !found {
return true, false
}
value = line[1:]
logp.Debug("smtp", "value %s", value)
s.parseOffset = off
} else {
logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
return false, false
}
}
return true, false
}
func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg byte) {
logp.Info("smtp","handle smtp message...")
//TODO
}
// Called when the parser has identified a full message.
func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {
logp.Info("smtp","message completed...")
// all ok, ship it
msg := stream.data[stream.message.start:stream.message.end]
if !stream.message.IgnoreMessage {
handleSmtp(smtp, stream.message, tcptuple, dir, msg)
}
// and reset message
stream.PrepareForNewMessage()
}
func (stream *SmtpStream) PrepareForNewMessage() {
logp.Info("smtp","prepare for new message...")
stream.data = stream.data[stream.parseOffset:]
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}
func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
defer logp.Recover("GapInStream(smtp) exception")
if private == nil {
return private, false
}
return private, true
}
func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {
logp.Info("smtp","stream closed...")
// TODO: check if we have data pending and either drop it to free
// memory or send it up the stack.
return private
}
现在切换到命令行,编译一下cd ~/go/src/github.com/elastic/beats/packetbeat
make
编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap -e -N
运行查看控制台输出结果:➜ packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap -e -N
2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
2016/01/15 10:12:20.155335 smtp.go:163: DBG Parse message: 2
2016/01/15 10:12:20.155416 smtp.go:180: DBG Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
250-SIZE 52428800
250-PIPELINING
250-AUTH PLAIN LOGIN
250-STARTTLS
250 HELP
2016/01/15 10:12:22.310974 smtp.go:163: DBG Parse message: F
2016/01/15 10:12:22.311025 smtp.go:180: DBG Unexpected message starting with From: "Gurpartap Singh"
To:
Subject: SMTP
Date: Mon, 5 Oct 2009 11:36:07 +0530
Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
MIME-Version: 1.0
...
成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。留待下回分解。
前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
打开一个现有的UDP和HTTP协议接口定义:
/~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
// Called to initialize the Plugin
Init(test_mode bool, results publisher.Client) error
// Called to return the configured ports
GetPorts() int
}
type TcpProtocolPlugin interface {
ProtocolPlugin
// Called when TCP payload data is available for parsing.
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData
// Called when the FIN flag is seen in the TCP stream.
ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private ProtocolData) ProtocolData
// Called when a packets are missing from the tcp
// stream.
GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
private ProtocolData) (priv ProtocolData, drop bool)
// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
ConnectionTimeout() time.Duration
}
type UdpProtocolPlugin interface {
ProtocolPlugin
// ParseUdp is invoked when UDP payload data is available for parsing.
ParseUdp(pkt *Packet)
}
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间;UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。
请问:
Packetbeat怎么工作的?
回答:
每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。
貌似很简单嘛!
进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
type Packet struct {
Ts time.Time
Tuple common.IpPortTuple
Payload byte
}
Packet结构简单,Ts是收到数据包的时间戳;
Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。
首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
protocols:
smtp:
# Configure the ports where to listen for Smtp traffic. You can disable
# the Smtp protocol by commenting out the list of ports.
ports: [25]
还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:git diff config/config.go
@@ -42,6 +42,7 @@ type Protocols struct {
Pgsql Pgsql
Redis Redis
Thrift Thrift
+ Smtp Smtp
}
type Dns struct {
@@ -118,5 +119,9 @@ type Redis struct {
Send_response *bool
}
+type Smtp struct {
+ ProtocolCommon `yaml:",inline"`
+}
+
// Config Singleton
var ConfigSingleton Config
在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go,这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
...TODO...
修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。git diff protos/protos.go
@@ -103,6 +103,7 @@ const (
MongodbProtocol
DnsProtocol
MemcacheProtocol
+ SmtpProtocol
)
// Protocol names
@@ -116,6 +117,7 @@ var ProtocolNames = string{
"mongodb",
"dns",
"memcache",
+ "smtp",
}
继续修改packetbeat.go主文件,允许SMTP协议并加载。git diff packetbeat.go
@@ -27,6 +27,7 @@ import (
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/thrift"
"github.com/elastic/packetbeat/protos/udp"
+ "github.com/elastic/packetbeat/protos/smtp"
"github.com/elastic/packetbeat/sniffer"
)
@@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
protos.ThriftProtocol: new(thrift.Thrift),
protos.MongodbProtocol: new(mongodb.Mongodb),
protos.DnsProtocol: new(dns.Dns),
+ protos.SmtpProtocol: new(smtp.Smtp),
}
做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。
现在我们仔细看看Parse()接口的各个参数定义是做什么用的
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData
pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。下面看段MySQL模块里面的例子
priv := mysqlPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(mysqlPrivateData)
if !ok {
priv = mysqlPrivateData{}
}
}
[ ... ]
return priv
上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。我们再继续看后续怎么处理这些包的一个逻辑例子
ok, complete := mysqlMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
return priv
}
if complete {
mysql.messageComplete(tcptuple, dir, stream)
} else {
// wait for more data
break
}
mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321:
由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
S: 220 www.example.com ESMTP Postfix
C: HELO mydomain.com
S: 250 Hello mydomain.com
C: MAIL FROM:
S: 250 Ok
C: RCPT TO:
S: 250 Ok
C: DATA
S: 354 End data with .
C: Subject: test message
C: From:""< sender@mydomain.com>
C: To:""< friend@example.com>
C:
C: Hello,
C: This is a test.
C: Goodbye.
C: .
S: 250 Ok: queued as 12345
C: quit
S: 221 Bye
上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议(RFC5321),我们只需要关注以下几个命令:MAIL:开始一份邮件 mail from: xxx@xx.com
RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com
QUIT:结束SMTP会话,不一定发送了邮件,注意
RESET:重置会话,当前传输被取消
最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
{
"timestamp":"2016-1-15 12:00:00",
"from":"medcl@example.co",
"to":["lcdem@example.co"]
}
我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供:https://wiki.wireshark.org/SampleCaptures/Ctrl+F找到SMTP的下载地址:smtp.pcap
用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:
上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。
打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go
定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
type smtpPrivateData struct{
Data [2]*SmtpStream
}
type SmtpStream struct {
tcptuple *common.TcpTuple
data byte
parseOffset int
isClient bool
message *SmtpMessage
}
type SmtpMessage struct {
Ts time.Time
From string
To string
}
然后参照MySQL协议,定义相应的方法,最终如下:package smtp
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"bytes"
"time"
"strings"
)
type smtpPrivateData struct{
Data [2]*SmtpStream
}
type SmtpStream struct {
tcptuple *common.TcpTuple
data byte
parseOffset int
isClient bool
message *SmtpMessage
}
type SmtpMessage struct {
start int
end int
Ts time.Time
From string
To string
IgnoreMessage bool
}
type Smtp struct {
SendRequest bool
SendResponse bool
transactionTimeout time.Duration
Ports int
results publisher.Client
}
func (smtp *Smtp) initDefaults() {
smtp.SendRequest = false
smtp.SendResponse = false
smtp.transactionTimeout = protos.DefaultTransactionExpiration
}
func (smtp *Smtp) setFromConfig(config config.Smtp) error {
smtp.Ports = config.Ports
if config.SendRequest != nil {
smtp.SendRequest = *config.SendRequest
}
if config.SendResponse != nil {
smtp.SendResponse = *config.SendResponse
}
if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
}
return nil
}
func (smtp *Smtp) GetPorts() int {
return smtp.Ports
}
func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
smtp.initDefaults()
if !test_mode {
err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
if err != nil {
return err
}
}
smtp.results = results
return nil
}
func readLine(data byte, offset int) (bool, string, int) {
q := bytes.Index(data[offset:], byte("\r\n"))
if q == -1 {
return false, "", 0
}
return true, string(data[offset : offset+q]), offset + q + 2
}
func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {
defer logp.Recover("ParseSmtp exception")
priv := smtpPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(smtpPrivateData)
if !ok {
priv = smtpPrivateData{}
}
}
if priv.Data[dir] == nil {
priv.Data[dir] = &SmtpStream{
tcptuple: tcptuple,
data: pkt.Payload,
message: &SmtpMessage{Ts: pkt.Ts},
}
} else {
// concatenate bytes
priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("smtp", "Stream data too large, dropping TCP stream")
priv.Data[dir] = nil
return priv
}
}
stream := priv.Data[dir]
for len(stream.data) > 0 {
if stream.message == nil {
stream.message = &SmtpMessage{Ts: pkt.Ts}
}
ok, complete := stmpMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
return priv
}
if complete {
smtp.messageComplete(tcptuple, dir, stream)
} else {
logp.Debug("smtp","still wait message...")
// wait for more data
break
}
}
return priv
}
func (smtp *Smtp) ConnectionTimeout() time.Duration {
return smtp.transactionTimeout
}
func stmpMessageParser(s *SmtpStream) (bool, bool) {
var value string=""
for s.parseOffset < len(s.data) {
logp.Debug("smtp", "Parse message: %s", string(s.data[s.parseOffset]))
if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {
logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))
found, line, off := readLine(s.data, s.parseOffset)
if !found {
return true, false
}
value = line[1:]
logp.Debug("smtp", "value %s", value)
s.parseOffset = off
} else {
logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
return false, false
}
}
return true, false
}
func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg byte) {
logp.Info("smtp","handle smtp message...")
//TODO
}
// Called when the parser has identified a full message.
func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {
logp.Info("smtp","message completed...")
// all ok, ship it
msg := stream.data[stream.message.start:stream.message.end]
if !stream.message.IgnoreMessage {
handleSmtp(smtp, stream.message, tcptuple, dir, msg)
}
// and reset message
stream.PrepareForNewMessage()
}
func (stream *SmtpStream) PrepareForNewMessage() {
logp.Info("smtp","prepare for new message...")
stream.data = stream.data[stream.parseOffset:]
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}
func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
defer logp.Recover("GapInStream(smtp) exception")
if private == nil {
return private, false
}
return private, true
}
func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {
logp.Info("smtp","stream closed...")
// TODO: check if we have data pending and either drop it to free
// memory or send it up the stack.
return private
}
现在切换到命令行,编译一下cd ~/go/src/github.com/elastic/beats/packetbeat
make
编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap -e -N
运行查看控制台输出结果:➜ packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap -e -N
2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
2016/01/15 10:12:20.155335 smtp.go:163: DBG Parse message: 2
2016/01/15 10:12:20.155416 smtp.go:180: DBG Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
250-SIZE 52428800
250-PIPELINING
250-AUTH PLAIN LOGIN
250-STARTTLS
250 HELP
2016/01/15 10:12:22.310974 smtp.go:163: DBG Parse message: F
2016/01/15 10:12:22.311025 smtp.go:180: DBG Unexpected message starting with From: "Gurpartap Singh"
To:
Subject: SMTP
Date: Mon, 5 Oct 2009 11:36:07 +0530
Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
MIME-Version: 1.0
...
成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。留待下回分解。 收起阅读 »
Packetbeat协议扩展开发教程(2)
我们打开Packetbeat项目,看看里面长什么样:
现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;
现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;
知道项目的大概架构就知道从哪下手了,下节分解。
我们打开Packetbeat项目,看看里面长什么样:
现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;
现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;
知道项目的大概架构就知道从哪下手了,下节分解。 收起阅读 »
一个把数据从MySQL同步到Elasticsearch的工具
这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。
最近刚刚更新了中文文档。
这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。
最近刚刚更新了中文文档。 收起阅读 »
社区福利:Elastic-playground
Kibana:
https://6e0ccaba29cd55a7f07f83 ... ibana
用户名/密码:elasticsearch-cn
集群名:"e064eb",使用Java客户端的时候需要,如何连接,参考:http://elasticsearch.cn/article/46
HTTP http://e064eb4b0aa993db28ad513 ... :9200
HTTPS https://e064eb4b0aa993db28ad51 ... :9243
curl -u elasticsearch-cn:elasticsearch-cn http://e064eb4b0aa993db28ad513 ... 9200/
{
"name" : "instance-0000000009",
"cluster_name" : "e064eb4b0aa993db28ad513e4d2df5e3",
"version" : {
"number" : "2.1.1",
"build_hash" : "40e2c53a6b6c2972b3d13846e450e66f4375bd71",
"build_timestamp" : "2015-12-15T13:05:55Z",
"build_snapshot" : false,
"lucene_version" : "5.3.1"
},
"tagline" : "You Know, for Search"
}
内置常用插件,有其他插件要安装的请留言。
Kibana:
https://6e0ccaba29cd55a7f07f83 ... ibana
用户名/密码:elasticsearch-cn
集群名:"e064eb",使用Java客户端的时候需要,如何连接,参考:http://elasticsearch.cn/article/46
HTTP http://e064eb4b0aa993db28ad513 ... :9200
HTTPS https://e064eb4b0aa993db28ad51 ... :9243
curl -u elasticsearch-cn:elasticsearch-cn http://e064eb4b0aa993db28ad513 ... 9200/
{
"name" : "instance-0000000009",
"cluster_name" : "e064eb4b0aa993db28ad513e4d2df5e3",
"version" : {
"number" : "2.1.1",
"build_hash" : "40e2c53a6b6c2972b3d13846e450e66f4375bd71",
"build_timestamp" : "2015-12-15T13:05:55Z",
"build_snapshot" : false,
"lucene_version" : "5.3.1"
},
"tagline" : "You Know, for Search"
}
内置常用插件,有其他插件要安装的请留言。 收起阅读 »
elasticsearch-analysis-ik和elasticsearch-analysis-mmseg更新至1.7.0
https://github.com/medcl/elasticsearch-analysis-ik
elasticsearch-analysis-mmseg:
https://github.com/medcl/elast ... -mseg
主要更新配置文件存放路径,之前版本的配置文件存放在elasticsearch的config目录,现在都修改为插件的相对目录了,主要是简化部署,现在可在Found(https://found.elastic.co)部署了。
https://github.com/medcl/elasticsearch-analysis-ik
elasticsearch-analysis-mmseg:
https://github.com/medcl/elast ... -mseg
主要更新配置文件存放路径,之前版本的配置文件存放在elasticsearch的config目录,现在都修改为插件的相对目录了,主要是简化部署,现在可在Found(https://found.elastic.co)部署了。 收起阅读 »
通过elasticsearch-mapper attachment插件实现文件建立索引
bin/plugin install elasticsearch/elasticsearch-mapper-attachments/3.1.1
2.按照插件官方文档来测试
3.插件需要手动把文档内容转化为base64编码然后建立索引,代码如下
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tika.Tika;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xml.sax.ContentHandler;
import com.spatial4j.core.io.ParseUtils;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class sysfiles {
public static void main(String[] args) throws Exception{
sys();
}
private static void sys() throws IOException {
// TODO Auto-generated method stub
String idxName = "test";
String idxType = "attachments";
Settings settings =ImmutableSettings.settingsBuilder().put("cluster.name","az_bsms_elasticsearch").build();
Client client=new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
String data64=org.elasticsearch.common.Base64.encodeFromFile(filepath);
XContentBuilder source = jsonBuilder().startObject()
.field("file", data64)
.field("text", data64)
.endObject();
String id = "file"+11;
IndexResponse idxResp = client.prepareIndex().setIndex(idxName).setType(idxType).setId(id)
.setSource(source).setRefresh(true).execute().actionGet();
System.out.println(idxResp);
client.close();
}
4.按官方文档正常的搜索就可以了
bin/plugin install elasticsearch/elasticsearch-mapper-attachments/3.1.1
2.按照插件官方文档来测试
3.插件需要手动把文档内容转化为base64编码然后建立索引,代码如下
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tika.Tika;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xml.sax.ContentHandler;
import com.spatial4j.core.io.ParseUtils;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class sysfiles {
public static void main(String[] args) throws Exception{
sys();
}
private static void sys() throws IOException {
// TODO Auto-generated method stub
String idxName = "test";
String idxType = "attachments";
Settings settings =ImmutableSettings.settingsBuilder().put("cluster.name","az_bsms_elasticsearch").build();
Client client=new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
String data64=org.elasticsearch.common.Base64.encodeFromFile(filepath);
XContentBuilder source = jsonBuilder().startObject()
.field("file", data64)
.field("text", data64)
.endObject();
String id = "file"+11;
IndexResponse idxResp = client.prepareIndex().setIndex(idxName).setType(idxType).setId(id)
.setSource(source).setRefresh(true).execute().actionGet();
System.out.println(idxResp);
client.close();
}
4.按官方文档正常的搜索就可以了 收起阅读 »
Packetbeat协议扩展开发教程(1)
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。
开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/。
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。
这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。
A.源码签出
登陆Github打开https://github.com/elastic/beats
fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/
cd $GOPATH/src/github.com/elastic
#签出源码
git clone https://github.com/elastic/beats.git
cd beats
#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git
#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master
#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat
#切换到packetbeat模块
cd packetbeat
#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep
#编译
make
编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html
output:
elasticsearch:
enabled: true
hosts: ["localhost:9200"]
username: "tribe_user"
password: "tribe_user"
现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml -d "publish"
介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志
好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG Publish: {
"@timestamp": "2015-12-29T14:24:39.709Z",
"beat": {
"hostname": "medcls-MacBook.local",
"name": "medcls-MacBook.local"
},
"bytes_in": 31,
"bytes_out": 115,
"client_ip": "192.168.3.10",
"client_port": 53669,
"client_proc": "",
"client_server": "",
"count": 1,
"direction": "out",
"dns": {
"additionals_count": 0,
"answers": [
{
"class": "IN",
"data": "www.a.shifen.com",
"name": "sp2.baidu.com",
"ttl": 333,
"type": "CNAME"
}
],
"answers_count": 1,
"authorities": [
{
"class": "IN",
"data": "ns1.a.shifen.com",
"expire": 86400,
"minimum": 3600,
"name": "a.shifen.com",
"refresh": 5,
"retry": 5,
"rname": "baidu_dns_master.baidu.com",
"serial": 1512240003,
"ttl": 12,
"type": "SOA"
}
],
"authorities_count": 1,
"flags": {
"authoritative": false,
"recursion_allowed": true,
"recursion_desired": true,
"truncated_response": false
},
"id": 7435,
"op_code": "QUERY",
"question": {
"class": "IN",
"name": "sp2.baidu.com",
"type": "AAAA"
},
"response_code": "NOERROR"
},
"ip": "192.168.3.1",
"method": "QUERY",
"port": 53,
"proc": "",
"query": "class IN, type AAAA, sp2.baidu.com",
"resource": "sp2.baidu.com",
"responsetime": 18,
"server": "",
"status": "OK",
"transport": "udp",
"type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG output worker: publish 2 events
然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29 5 1 135 0 561.2kb 561.2kb
至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。
开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/。
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。
这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。
A.源码签出
登陆Github打开https://github.com/elastic/beats
fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/
cd $GOPATH/src/github.com/elastic
#签出源码
git clone https://github.com/elastic/beats.git
cd beats
#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git
#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master
#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat
#切换到packetbeat模块
cd packetbeat
#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep
#编译
make
编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html
output:
elasticsearch:
enabled: true
hosts: ["localhost:9200"]
username: "tribe_user"
password: "tribe_user"
现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml -d "publish"
介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志
好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG Publish: {
"@timestamp": "2015-12-29T14:24:39.709Z",
"beat": {
"hostname": "medcls-MacBook.local",
"name": "medcls-MacBook.local"
},
"bytes_in": 31,
"bytes_out": 115,
"client_ip": "192.168.3.10",
"client_port": 53669,
"client_proc": "",
"client_server": "",
"count": 1,
"direction": "out",
"dns": {
"additionals_count": 0,
"answers": [
{
"class": "IN",
"data": "www.a.shifen.com",
"name": "sp2.baidu.com",
"ttl": 333,
"type": "CNAME"
}
],
"answers_count": 1,
"authorities": [
{
"class": "IN",
"data": "ns1.a.shifen.com",
"expire": 86400,
"minimum": 3600,
"name": "a.shifen.com",
"refresh": 5,
"retry": 5,
"rname": "baidu_dns_master.baidu.com",
"serial": 1512240003,
"ttl": 12,
"type": "SOA"
}
],
"authorities_count": 1,
"flags": {
"authoritative": false,
"recursion_allowed": true,
"recursion_desired": true,
"truncated_response": false
},
"id": 7435,
"op_code": "QUERY",
"question": {
"class": "IN",
"name": "sp2.baidu.com",
"type": "AAAA"
},
"response_code": "NOERROR"
},
"ip": "192.168.3.1",
"method": "QUERY",
"port": 53,
"proc": "",
"query": "class IN, type AAAA, sp2.baidu.com",
"resource": "sp2.baidu.com",
"responsetime": 18,
"server": "",
"status": "OK",
"transport": "udp",
"type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG output worker: publish 2 events
然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29 5 1 135 0 561.2kb 561.2kb
至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。 收起阅读 »
关于提示TooManyClauses[maxClauseCount is set to 1024]的问题。
通过数据库获取到了1126个条件数据,然后叠加进bool进行查询,直接抛出个异常:
TooManyClauses[maxClauseCount is set to 1024]
问了Medcl大神,得知是超过默认搜索条件大小的问题,可以通过参数修改
index.query.bool.max_clause_count: 4096
M大也说,太BT了。。。 这么多条件查询。。。我也觉得挺BT的,自己想想都有点小激动,太佩服自己了。。。
通过数据库获取到了1126个条件数据,然后叠加进bool进行查询,直接抛出个异常:
TooManyClauses[maxClauseCount is set to 1024]
问了Medcl大神,得知是超过默认搜索条件大小的问题,可以通过参数修改
index.query.bool.max_clause_count: 4096
M大也说,太BT了。。。 这么多条件查询。。。我也觉得挺BT的,自己想想都有点小激动,太佩服自己了。。。 收起阅读 »
Day24: Elasticsearch添加Shield后TransportClient如何连接?
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.
关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.
1.HTTP方式
现在直接访问es的http接口就会报错
curl http://localhost:9200
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}
shield支持HttpBasic验证,所以正确的访问姿势是:
curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.
2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://maven.elasticsearch.or ... gt%3B
<releases> <enabled>true</enabled> </releases>
<snapshots> <enabled>false</enabled> </snapshots>
</repository>
</repositories>
2.2 添加依赖的配置
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency
2.3 构建TransportClient的地方增加访问用户的配置
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
String clusterName="elasticsearch"; String ip= "127.0.0.1";
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("shield.user", "tribe_user:tribe_user")
.build();
try { client = TransportClient.builder()
.addPlugin(ShieldPlugin.class)
.settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray())); client.prepareSearch()
.putHeader("Authorization", token).get(); }
catch (UnknownHostException e)
{ logger.error("es",e); }
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.
关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.
1.HTTP方式
现在直接访问es的http接口就会报错
curl http://localhost:9200
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}
shield支持HttpBasic验证,所以正确的访问姿势是:
curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.
2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://maven.elasticsearch.or ... gt%3B
<releases> <enabled>true</enabled> </releases>
<snapshots> <enabled>false</enabled> </snapshots>
</repository>
</repositories>
2.2 添加依赖的配置
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency
2.3 构建TransportClient的地方增加访问用户的配置
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
String clusterName="elasticsearch"; String ip= "127.0.0.1";
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("shield.user", "tribe_user:tribe_user")
.build();
try { client = TransportClient.builder()
.addPlugin(ShieldPlugin.class)
.settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray())); client.prepareSearch()
.putHeader("Authorization", token).get(); }
catch (UnknownHostException e)
{ logger.error("es",e); }
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252 收起阅读 »
Day 23 谈谈ES 的Recovery
Recovery是指将一个索引的未分配shard分配到一个结点的过程。 在快照恢复,更改索引复制片数量,结点故障或者结点启动时发生。由于master持有整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点。例如:
- 如果某个shard主片在,副片所在结点挂了,那么选择另外一个可用结点,将副片分配(allocate)上去,然后进行主从片的复制。
- 如果某个shard的主片所在结点挂了,副片还在,那么将副片升级为主片,然后做主副复制。
- 如果某个shard的主副片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从结点上恢复主分片,再选择某个结点分配复制片,并从主分片同步数据。
通过CAT health API,我们可以查看集群的状态,从而获知数据的完整性情况:
可能的状态及含义:
Green: 所有的shard主副片都完好的
Yellow: 所有shard的主片都完好,部分副片没有了,数据完整性依然完好。
Red: 某些shard的主副片都没有了,对应的索引数据不完整
Recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务能力降级,或者一部分功能暂时不可用。了解一些Recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。
减少集群Full Restart造成的数据来回拷贝
集群可能会有整体重启的需要,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群。 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡显然是不均衡的,master会触发rebalance过程,将数据在结点之间挪动。整个过程无谓消耗了大量的网络流量。 合理设置recovery相关参数则可以防范这种问题的发生。
gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes
以上三个参数是说集群里一旦有多少个结点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。
在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:
gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes
举例来说,对于一个有10个data node的集群,如果有以下的设置:
gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8
那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。
减少主副本之间的数据复制
如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:
然后在结点重启完成加入集群后,再重新打开:
这样在结点重启完成后,尽量多的从本地直接恢复数据。
但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于Recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同结点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启结点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
- 暂停数据写入程序
- 关闭集群shard allocation
- 手动执行POST /_flush/synced
- 重启结点
- 重新开启集群shard allocation
- 等待recovery完成,集群health status变成green
- 重新开启数据写入程序
(特别大的)热索引为何恢复慢
对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨结点拷贝segment file以外,translog recovery是导致慢的更重要的原因。
从主片恢复数据到副片需要经历3个阶段:
- 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
- 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
- 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。
万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery
其他Recovery相关的专家级设置
还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226) 。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。
Recovery是指将一个索引的未分配shard分配到一个结点的过程。 在快照恢复,更改索引复制片数量,结点故障或者结点启动时发生。由于master持有整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点。例如:
- 如果某个shard主片在,副片所在结点挂了,那么选择另外一个可用结点,将副片分配(allocate)上去,然后进行主从片的复制。
- 如果某个shard的主片所在结点挂了,副片还在,那么将副片升级为主片,然后做主副复制。
- 如果某个shard的主副片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从结点上恢复主分片,再选择某个结点分配复制片,并从主分片同步数据。
通过CAT health API,我们可以查看集群的状态,从而获知数据的完整性情况:
可能的状态及含义:
Green: 所有的shard主副片都完好的
Yellow: 所有shard的主片都完好,部分副片没有了,数据完整性依然完好。
Red: 某些shard的主副片都没有了,对应的索引数据不完整
Recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务能力降级,或者一部分功能暂时不可用。了解一些Recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。
减少集群Full Restart造成的数据来回拷贝
集群可能会有整体重启的需要,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群。 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡显然是不均衡的,master会触发rebalance过程,将数据在结点之间挪动。整个过程无谓消耗了大量的网络流量。 合理设置recovery相关参数则可以防范这种问题的发生。
gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes
以上三个参数是说集群里一旦有多少个结点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。
在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:
gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes
举例来说,对于一个有10个data node的集群,如果有以下的设置:
gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8
那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。
减少主副本之间的数据复制
如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:
然后在结点重启完成加入集群后,再重新打开:
这样在结点重启完成后,尽量多的从本地直接恢复数据。
但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于Recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同结点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启结点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
- 暂停数据写入程序
- 关闭集群shard allocation
- 手动执行POST /_flush/synced
- 重启结点
- 重新开启集群shard allocation
- 等待recovery完成,集群health status变成green
- 重新开启数据写入程序
(特别大的)热索引为何恢复慢
对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨结点拷贝segment file以外,translog recovery是导致慢的更重要的原因。
从主片恢复数据到副片需要经历3个阶段:
- 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
- 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
- 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。
万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery
其他Recovery相关的专家级设置
还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226) 。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。 收起阅读 »
elasticsearch-rtf更新至2.1.1
使用git快速签出最新版:
git clone git://github.com/medcl/elasticsearch-rtf.git -b master --depth 1
包含插件:
elasticsearch-analysis-ik-1.6.2 elasticsearch-analysis-pinyin-1.5.2
elasticsearch-analysis-mmseg-1.6.2 elasticsearch-analysis-stconvert-1.6.1
使用:
cd elasticsearch/bin
./elasticsearch
使用git快速签出最新版:
git clone git://github.com/medcl/elasticsearch-rtf.git -b master --depth 1
包含插件:
elasticsearch-analysis-ik-1.6.2 elasticsearch-analysis-pinyin-1.5.2
elasticsearch-analysis-mmseg-1.6.2 elasticsearch-analysis-stconvert-1.6.1
使用:
cd elasticsearch/bin
./elasticsearch
收起阅读 »
Day22:pipeline aggregation计算日留存率示例
目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"logs" : {
"properties" : {
"uid" : { "type" : "string", "index" : "not_analyzed" },
"register_time" : { "type" : "date", "index" : "not_analyzed" },
"login_time" : { "type" : "date", "index" : "not_analyzed" }
}
}
}
}'
那么实际记录的日志会类似这样:{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
"size" : 0,
"aggs" : {
"new_users" : {
"filters" : {
"filters" : [
{
"range" : {
"register_time" : {
"gte" : "2015-12-23",
"lt" : "2015-12-24"
}
}
}
]
},
"aggs" : {
"register_count" : {
"cardinality" : {
"field" : "uid"
}
},
"today" : {
"filter" : {
"range" : {
"login_time" : {
"gte" : "2015-12-24",
"lt" : "2015-12-25"
}
}
},
"aggs" : {
"login_count" : {
"cardinality" : {
"field" : "uid"
}
}
}
},
"retention" : {
"bucket_script" : {
"buckets_path" : {
"today_count" : "today>login_count",
"yesterday_count" : "register_count"
},
"script" : {
"lang" : "expression",
"inline" : "today_count / yesterday_count"
}
}
}
}
}
}
}'
这个 pipeline aggregation 在使用上有几个要点:- pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
- bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。
最终这次请求的响应如下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"new_users" : {
"buckets" : [ {
"doc_count" : 3,
"today" : {
"doc_count" : 1,
"login_count" : {
"value" : 1
}
},
"register_count" : {
"value" : 2
},
"retention" : {
"value" : 0.5
}
} ]
}
}
}
这个 retention 数据,就是我们要求解的 0.5 了。目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"logs" : {
"properties" : {
"uid" : { "type" : "string", "index" : "not_analyzed" },
"register_time" : { "type" : "date", "index" : "not_analyzed" },
"login_time" : { "type" : "date", "index" : "not_analyzed" }
}
}
}
}'
那么实际记录的日志会类似这样:{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
"size" : 0,
"aggs" : {
"new_users" : {
"filters" : {
"filters" : [
{
"range" : {
"register_time" : {
"gte" : "2015-12-23",
"lt" : "2015-12-24"
}
}
}
]
},
"aggs" : {
"register_count" : {
"cardinality" : {
"field" : "uid"
}
},
"today" : {
"filter" : {
"range" : {
"login_time" : {
"gte" : "2015-12-24",
"lt" : "2015-12-25"
}
}
},
"aggs" : {
"login_count" : {
"cardinality" : {
"field" : "uid"
}
}
}
},
"retention" : {
"bucket_script" : {
"buckets_path" : {
"today_count" : "today>login_count",
"yesterday_count" : "register_count"
},
"script" : {
"lang" : "expression",
"inline" : "today_count / yesterday_count"
}
}
}
}
}
}
}'
这个 pipeline aggregation 在使用上有几个要点:- pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
- bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。
最终这次请求的响应如下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"new_users" : {
"buckets" : [ {
"doc_count" : 3,
"today" : {
"doc_count" : 1,
"login_count" : {
"value" : 1
}
},
"register_count" : {
"value" : 2
},
"retention" : {
"value" : 0.5
}
} ]
}
}
}
这个 retention 数据,就是我们要求解的 0.5 了。收起阅读 »
Day21: 如何快速把Kibana4 Discover页的Document Table导出成CSV
但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。
可惜Kibana4没有针对search document table的导出!
国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成……
点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated
然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:
然后点击这个『CSV』按钮,会弹出一片提示:
可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。
我们当然选择本机……
然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。
实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。
注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集!
但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。
可惜Kibana4没有针对search document table的导出!
国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成……
点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated
然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:
然后点击这个『CSV』按钮,会弹出一片提示:
可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。
我们当然选择本机……
然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。
实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。
注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集! 收起阅读 »
简繁体转换插件更新:elasticsearch-analysis-stconvert 升级支持2.0
项目地址:https://github.com/medcl/elast ... nvert
mvn 编译打包,拷贝release下面的zip并解压到你的es plugins目录即可,需要重启es
这个插件帮你处理简繁体,简繁体全部统一成简体或繁体,不管输入的简体还是繁体,都能得到搜索结果
比如:
不管输入的是『北京国际电视台』的还是『北京國際電視臺』都能命中。
详细配置和使用请参照上面的地址。
项目地址:https://github.com/medcl/elast ... nvert
mvn 编译打包,拷贝release下面的zip并解压到你的es plugins目录即可,需要重启es
这个插件帮你处理简繁体,简繁体全部统一成简体或繁体,不管输入的简体还是繁体,都能得到搜索结果
比如:
不管输入的是『北京国际电视台』的还是『北京國際電視臺』都能命中。
详细配置和使用请参照上面的地址。
收起阅读 »