References
https://elasticsearch.cn/article/113
https://www.elastic.co/blog/build-your-own-beat
Introduction
Our company has a unified log collection system, with a customized filebeat doing the collection. To support real-time alerting and monitoring, the custom beat does not send its output directly to the Elasticsearch backend; instead the data first passes through Storm or Druid for real-time analysis and then lands in ES or HDFS. Also, because collection is unified, log content is not yet split per application, so every log entry lands in the backend store as a single raw line. This is why different beats need to be customized for different business teams. This article is a first attempt at a beat that parses the HDFS audit log on the beat side; to keep the article short, only basic file parsing is implemented. The walkthrough covers three parts: environment setup, code implementation, and testing.
Environment setup
go version go1.9.4 linux/amd64
python version: 2.7.9
A word of complaint about installing Python: it is full of pitfalls, because the system's default Python is 2.7.5 while cookiecutter recommends 2.7.9.
The following packages provide the native libraries that Python itself depends on:
yum install openssl -y
yum install openssl-devel -y
yum install zlib-devel -y
yum install zlib -y
Install Python
wget https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz
tar -zxvf Python-2.7.9.tgz
cd ~/python/Python-2.7.9
./configure --prefix=/usr/local/python-2.7.9
make
make install
rm -f /bin/python
ln -s /usr/local/python-2.7.9/bin/python /bin/python
Install the tool packages distribute, setuptools, and pip
cd ~/python/setuptools-19.6 && python setup.py install
cd ~/python/pip-1.5.4 && python setup.py install
cd ~/python/distribute-0.7.3 && python setup.py install
Install cookiecutter
pip install --user cookiecutter
Install the tools cookiecutter depends on
pip install backports.functools-lru-cache
pip install six
pip install virtualenv
*** Once virtualenv is installed, it lives inside the Python directory (/usr/local/python-2.7.9/bin/virtualenv). Make sure this directory is on PATH, because the beat's Makefile will use the tool later.
Code implementation
The functionality needed here is fairly simple: open the hdfs-audit.log file, read it line by line, parse out the required fields, assemble them into events, and publish them. When the output is Elasticsearch, the beat should also be able to create the correct mapping on the ES side automatically.
Create your own beat from the official beat template
- Make sure the $GOPATH environment variable is set; in this example GOPATH=/root/go
$ go get github.com/elastic/beats
$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1
[root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat
project_name [Examplebeat]: hdfsauditbeat
github_name [your-github-name]: suxingfate
beat [hdfsauditbeat]:
beat_path [github.com/suxingfate]:
full_name [Firstname Lastname]: xinglong
make setup
At this point the skeleton has been generated; what remains is customizing the pieces we need:
- 1 _meta/beat.yml # the configuration template, defining which settings our beat accepts
- 2 config/config.go # a Go struct describing the whole config object, with all of its settings
- 3 beater/hdfsauditbeat.go # the core logic
- 4 _meta/fields.yml # the mapping definition handed to ES when the beat connects to it
1 _meta/beat.yml
A path setting is added here, as a placeholder for pointing at the hdfs-audit.log file later.
[root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
################### Hdfsauditbeat Configuration Example #########################
############################# Hdfsauditbeat ######################################
hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "."
2 config/config.go
Path is defined in the struct here, so the core code can later read it from the config object.
[root@minikube-2830379 hdfsauditbeat]# cat config/config.go
// Config is put into a different package to prevent cyclic imports in case
// it is needed in several locations
package config

import "time"

type Config struct {
    Period time.Duration `config:"period"`
    Path   string        `config:"path"`
}

var DefaultConfig = Config{
    Period: 1 * time.Second,
    Path:   ".",
}
3 beater/hdfsauditbeat.go
Two changes are needed here: 3.1 define a catAudit function that parses each line of the target file, builds a custom event, and publishes it; 3.2 have the Run function call catAudit, which is where our functionality is plugged in.
[root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go
package beater

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "strings"
    "time"

    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
    "github.com/elastic/beats/libbeat/logp"
    "github.com/elastic/beats/libbeat/publisher"

    "github.com/suxingfate/hdfsauditbeat/config"
)

type Hdfsauditbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
}

// Creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    bt := &Hdfsauditbeat{
        done:   make(chan struct{}),
        config: config,
    }
    return bt, nil
}
func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {
    logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")

    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    counter := 1
    for {
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }

        bt.catAudit(bt.config.Path)

        logp.Info("Event sent")
        counter++
    }
}

func (bt *Hdfsauditbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}
func (bt *Hdfsauditbeat) catAudit(auditFile string) {
    file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)
    if err != nil {
        //fmt.Println("Open file error!", err)
        return
    }
    defer file.Close()

    buf := bufio.NewReader(file)
    for {
        line, err := buf.ReadString('\n')
        line = strings.TrimSpace(line)
        if line == "" {
            return
        }

        // The line starts with a timestamp such as "2006-01-02 15:04:05,SSS";
        // log4j writes 24-hour times, so the Go reference layout must be 15:04:05
        // (the layout 03:04:05 would misparse afternoon hours).
        timeEnd := strings.Index(line, ",")
        timeString := line[:timeEnd]
        tm, _ := time.Parse("2006-01-02 15:04:05", timeString)

        // ugi sits between "ugi=" and " (auth".
        ugiStart := strings.Index(line, "ugi=") + 4
        ugiEnd := strings.Index(line, " (auth")
        ugi := line[ugiStart:ugiEnd]

        // cmd, src and dst are each terminated by the next space.
        cmdStart := strings.Index(line, "cmd=") + 4
        line = line[cmdStart:]
        cmdEnd := strings.Index(line, " ")
        cmd := line[:cmdEnd]

        srcStart := strings.Index(line, "src=") + 4
        line = line[srcStart:]
        srcEnd := strings.Index(line, " ")
        src := line[:srcEnd]

        dstStart := strings.Index(line, "dst=") + 4
        line = line[dstStart:]
        dstEnd := strings.Index(line, " ")
        dst := line[:dstEnd]

        event := common.MapStr{
            "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),
            "ugi":        ugi,
            "cmd":        cmd,
            "src":        src,
            "dst":        dst,
        }
        bt.client.PublishEvent(event)

        if err != nil {
            if err == io.EOF {
                //fmt.Println("File read ok!")
                break
            }
            //fmt.Println("Read file error!", err)
            return
        }
    }
}
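To sanity-check the parsing logic outside of the beat, a small standalone sketch like the one below can be run against a representative audit line. Both the sample line and the parseLine helper are illustrative only (real audit lines may use different separators on your cluster); the field extraction mirrors what catAudit does.
package main

import (
    "fmt"
    "strings"
    "time"
)

// parseLine extracts the same fields that catAudit publishes, using the same
// markers: the timestamp before the first comma, then ugi=, cmd=, src= and dst=.
func parseLine(line string) (tm time.Time, ugi, cmd, src, dst string) {
    timeString := line[:strings.Index(line, ",")]
    tm, _ = time.Parse("2006-01-02 15:04:05", timeString)

    ugi = line[strings.Index(line, "ugi=")+4 : strings.Index(line, " (auth")]

    rest := line[strings.Index(line, "cmd=")+4:]
    cmd = rest[:strings.Index(rest, " ")]

    rest = rest[strings.Index(rest, "src=")+4:]
    src = rest[:strings.Index(rest, " ")]

    rest = rest[strings.Index(rest, "dst=")+4:]
    dst = rest[:strings.Index(rest, " ")]
    return
}

func main() {
    // A made-up but representative line; adjust it to match your own audit log format.
    line := "2018-12-09 03:00:00,123 INFO FSNamesystem.audit: allowed=true " +
        "ugi=appmon@APD.ABC.COM (auth:KERBEROS) ip=/10.0.0.1 cmd=create " +
        "src=/app-logs/app/test.tmp dst=null perm=null proto=rpc"
    tm, ugi, cmd, src, dst := parseLine(line)
    fmt.Println(tm, ugi, cmd, src, dst)
}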
4 _meta/fields.yml
[root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml
- key: hdfsauditbeat
  title: hdfsauditbeat
  description:
  fields:
    - name: counter
      type: long
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    # new fields added for hdfsaudit
    - name: entrytime
      type: date
    - name: ugi
      type: keyword
    - name: cmd
      type: keyword
    - name: src
      type: keyword
    - name: dst
      type: keyword
Testing
First, build the project. make update regenerates the files derived from _meta (including the beat's config file and the ES template), and make produces the binary:
make update
make
This produces a file named hdfsauditbeat, which is the compiled executable. For the test I took a shortcut: instead of sending events to ES, the beat prints them to the console for inspection. The configuration file is adjusted to point at the correct audit log file to consume, and the output is switched to console.
[root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
################### Hdfsauditbeat Configuration Example #########################
############################# Hdfsauditbeat ######################################
hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "/root/go/hdfs-audit.log"
#================================ General =====================================
# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:
# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]
# Optional fields that you can specify to add additional information to the
# output.
#fields:
# env: staging
#================================ Outputs =====================================
# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
# Array of hosts to connect to.
# hosts: ["localhost:9200"]
# Optional protocol and basic auth credentials.
#protocol: "https"
#username: "elastic"
#password: "changeme"
#----------------------------- Logstash output --------------------------------
#output.logstash:
# The Logstash hosts
#hosts: ["localhost:5044"]
# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"
output.console:
  pretty: true
#================================ Logging =====================================
# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug
# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
Run it:
[root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}
Conclusion
A custom beat gives us a lot of flexibility. An ingest pipeline or Logstash can cover similar ground, but the use cases differ quite a bit: scenarios such as invoking a special command and collecting its output, or the one described in this article, are a better fit for a customized beat.
Original article: http://searchkit.cn/article/6182