在 Mapping 里面,将 dynamic 参数设置成 strict 可以拒绝索引包含未知字段的文档。 此条 Tips 由 medcl 贡献。
beat

beat

heartbeat能否监控其余beat?

Beatspapaw 回复了问题 • 2 人关注 • 2 个回复 • 3467 次浏览 • 2022-03-24 16:51 • 来自相关话题

filebeat 可以把日志的时间替换为默认的timestamp时间吗?

Beatszgq25302111 回复了问题 • 3 人关注 • 2 个回复 • 5271 次浏览 • 2019-09-28 18:31 • 来自相关话题

通过packetbeat获取镜像流量,只想保留内网请求的(单向)流量

Beatstinker 回复了问题 • 2 人关注 • 1 个回复 • 3693 次浏览 • 2019-07-17 15:19 • 来自相关话题

windows下运行beat,只能前台运行,后台运行,就收集不到

回复

BeatsLeon_ 发起了问题 • 1 人关注 • 0 个回复 • 2694 次浏览 • 2019-05-27 10:19 • 来自相关话题

Day 9 - 动手实现一个自定义beat

AdventXinglong 发表了文章 • 0 个评论 • 3929 次浏览 • 2018-12-09 21:09 • 来自相关话题

参考

https://elasticsearch.cn/article/113

https://www.elastic.co/blog/build-your-own-beat

介绍

公司内部有统一的log收集系统,并且实现了定制的filebeat进行log收集。为了实现实时报警和监控,自定义的beat并没有直接把输出发给elasticsearch后端,而是中间会经过storm或者durid进行实时分析,然后落入es或者hdfs。同时由于是统一log收集,所以目前还没有针对具体的不同应用进行log的内容的切分,导致所有的log都是以一行为单位落入后端存储。于是需要针对不同的业务部门定制不同的beat。 本文初步尝试定制一个可以在beat端解析hdfs audit log的beat,限于篇幅,只实现了基本的文件解析功能。下面会从环境配置,代码实现,运行测试三个方面进行讲解。

环境配置

go version go1.9.4 linux/amd64

python version: 2.7.9

不得不吐槽下python的安装,各种坑。因为系统默认的python版本是2.7.5,而cookiecutter建议使用2.7.9

下面的工具会提供python本身需要依赖的一些native包

yum install openssl -y
yum install openssl-devel -y
yum install zlib-devel -y
yum install zlib -y

安装python

wget https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz
tar -zxvf Python-2.7.9.tgz
cd ~/python/Python-2.7.9
./configure --prefix=/usr/local/python-2.7.9
make
make install

rm -f /bin/python
ln -s /usr/local/python-2.7.9/bin/python /bin/python

安装工具包 distribute, setuptools, pip

cd ~/python/setuptools-19.6 && python setup.py install
cd ~/python/pip-1.5.4  && python setup.py install
cd ~/python/distribute-0.7.3  && python setup.py install

安装cookiecutter

pip install --user cookiecutter

安装cookiecutter所依赖的工具

pip install backports.functools-lru-cache
pip install six
pip install virtualenv

*** virtualenv 安装好了之后,所在目录是在python的目录里面 (/usr/local/python-2.7.9/bin/virtualenv),需要配置好PATH,这个工具稍后会被beat的Makefile用到

代码实现

需要实现的功能比较简单,目标是打开hdfs-audit.log文件,然后逐行读取,同时解析出必要的信息,组装成event,然后发送出去,如果对接的es的话,需要同时支持自动在es端创建正确的mapping

使用官方提供的beat模板创建自己的beat

  • 需要设置好环境变量$GOPATH,本例子中GOPATH=/root/go
$ go get github.com/elastic/beats
$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

[root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat
project_name [Examplebeat]: hdfsauditbeat
github_name [your-github-name]: suxingfate
beat [hdfsauditbeat]:
beat_path [github.com/suxingfate]:
full_name [Firstname Lastname]: xinglong

make setup

到这里,模板就生成了,然后就是定制需要的东西。

  • 1 _meta/beat.yml # 配置模板文件,定义我们的beat会接受哪些配置项
  • 2 config/config.go #使用go的struct定义整个config对象,包含所有的配置项
  • 3 beater/hdfsauditbeat.go # 核心逻辑代码
  • 4 _meta/fields.yml #这里是跟es对接的时候给es定义的mapping

1 _meta/beat.yml

这里增加了path,为后面配置hdfs-audit.log文件的位置留好坑

[root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "."

2 config/config.go

这里把path定义到struct里面,后面核心代码就可以从config对象获得path了

[root@minikube-2830379 hdfsauditbeat]# cat config/config.go
// Config is put into a different package to prevent cyclic imports in case
// it is needed in several locations

package config

import "time"

type Config struct {
    Period time.Duration `config:"period"`
        Path   string        `config:"path"`
}

var DefaultConfig = Config{
    Period: 1 * time.Second,
        Path:   ".",
}

3 beater/hdfsauditbeat.go

这里需要改动的地方是: 3.1 定义了一个catAudit函数来解析目标文件的每一行,同时生成自定义的event,然后发送出去 3.2 Run函数调用自定义的catAudit函数,从而把我们的功能嵌入

[root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go
package beater

import (
    "fmt"
    "time"
        "os"
        "io"
        "bufio"
        "strings"
    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
    "github.com/elastic/beats/libbeat/logp"
    "github.com/elastic/beats/libbeat/publisher"

    "github.com/suxingfate/hdfsauditbeat/config"
)

type Hdfsauditbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
}

// Creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    bt := &Hdfsauditbeat{
        done: make(chan struct{}),
        config: config,
    }
    return bt, nil
}

func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {
    logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")

    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    counter := 1
    for {
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }

        bt.catAudit(bt.config.Path)

        logp.Info("Event sent")
        counter++
    }
}

func (bt *Hdfsauditbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

func (bt *Hdfsauditbeat) catAudit(auditFile string) {
    file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)
    if err != nil {
        //fmt.Println("Open file error!", err)
        return
    }
    defer file.Close()

    buf := bufio.NewReader(file)
    for {
        line, err := buf.ReadString('\n')
        line = strings.TrimSpace(line)
        if line == "" {
            return
        }

    timeEnd := strings.Index(line, ",")
        timeString := line[0 :timeEnd]
        tm, _ := time.Parse("2006-01-02 03:04:05", timeString)

        ugiStart := strings.Index(line, "ugi=") + 4
        ugiEnd := strings.Index(line, " (auth")
        ugi := line[ugiStart :ugiEnd]

        cmdStart := strings.Index(line, "cmd=") + 4
        line = line[cmdStart:len(line)]
        cmdEnd := strings.Index(line, " ")
        cmd := line[0 : cmdEnd]

        srcStart := strings.Index(line, "src=") + 4
        line = line[srcStart:len(line)]
        srcEnd := strings.Index(line, " ")
        src := line[0:srcEnd]

        dstStart := strings.Index(line, "dst=") + 4
        line = line[dstStart:len(line)]
        dstEnd := strings.Index(line, " ")
        dst := line[0:dstEnd]

        event := common.MapStr{
                "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),
                "ugi":       ugi,
                "cmd":       cmd,
                "src":    src,
                "dst":   dst,
            }
            bt.client.PublishEvent(event)

        if err != nil {
            if err == io.EOF {
                //fmt.Println("File read ok!")
                break
            } else {
                //fmt.Println("Read file error!", err)
                return
            }
        }
    }
}

4 _meat/fields.yml

[root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml
- key: hdfsauditbeat
  title: hdfsauditbeat
  description:
  fields:
    - name: counter
      type: long
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added hdfsaudit
    - name: entrytime
      type: date
    - name: ugi
      type: keyword
    - name: cmd
      type: keyword
    - name: src
      type: keyword
    - name: dst
      type: keyword

测试

首先编译好项目

make update
make

然后会发现生成了一个hdfsauditbeat文件,这个就是二进制的可执行文件。下面进行测试,这里偷了个懒,没有发给es,而是吐到console进行观察。 修改了一下配置文件,需要指定正确的需要消费的audit log文件的路径,另外就是修改了output为console

[root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "/root/go/hdfs-audit.log"

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
#  hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

output.console:
  pretty: true
#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

开始执行

[root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}

结束

使用自定义beat给我们提供了很大的灵活性,虽然pipline或者logstash也可以做到,但是使用场景还是有很大差别的。如果是调用特殊的命令获得输出,或者是本文的场景都更适合定制化beat。

请问这样做,es中的数据会重复吗

Elasticsearchlaoyang360 回复了问题 • 2 人关注 • 1 个回复 • 5250 次浏览 • 2018-05-01 10:24 • 来自相关话题

filebeat Template 报错

Beatsrockybean 回复了问题 • 2 人关注 • 1 个回复 • 13101 次浏览 • 2018-01-18 07:29 • 来自相关话题

在Elasticsearch-head看到乱码

回复

BeatsWithLin 发起了问题 • 1 人关注 • 0 个回复 • 3114 次浏览 • 2017-12-06 21:34 • 来自相关话题

beat收集数据meta头里面ip问题

Beatsmedcl 回复了问题 • 3 人关注 • 1 个回复 • 4976 次浏览 • 2017-07-27 09:02 • 来自相关话题

filebeat如何根据document type 映射到redis的多个key中

Beats谭雁宏 回复了问题 • 2 人关注 • 2 个回复 • 8808 次浏览 • 2017-06-26 16:38 • 来自相关话题

怎样在Kibana中显示Byte数据?

回复

Kibanasitanxin 发起了问题 • 1 人关注 • 0 个回复 • 4769 次浏览 • 2016-12-12 22:48 • 来自相关话题

Day6:用logstash-input-http_poller模拟nginxbeat

Advent三斗室 发表了文章 • 0 个评论 • 5999 次浏览 • 2015-12-07 00:18 • 来自相关话题

Elastic 公司最近推出了 beats 系列,在官方的 packet/top/file{beat} 之外,社区也自发制作了一些比如 docker/nginx/ 不过很可惜的是:nginxbeat 只支持两个数据来源:标准的 ngx_http_stub_status_module 和商业版 Nginx Plus 的ngx_http_status_module 我们都知道,ngx_http_stub_status_module 输出的信息太少,除了进程级别的连接数,啥都没有。那么,在使用开源版本 Nginx 的我们,还有别的办法么? 在官网的第三方模块列表里,发现了一个韩国人写的 nginx-module-vts。这个扩展可以做到 vhost 级别的状态信息输出。(我知道国人还有很多类似的统计扩展,但是没上官网,不便普及,就忽略吧) 但是,不懂 Golang 的话,没法自己动手实现一个 nginx-vts-beat 啊。怎么办? 其实我们可以用 logstash-input-http_poller 实现类似的功能。 首先,我们要给自己的 Nginx 加上 vts 扩展。编译方式这里就不讲了,和所有其他第三方模块一样。配置方式详见README。我们这里假设是按照核心和非核心接口来统计 URL 的状态:
http {
    vhost_traffic_status_zone;

    map $uri $filter_uri {
        default 'non-core';
        /2/api/timeline core;
        ~^/2/api/unread core;
    }

    server {
        vhost_traffic_status_filter_by_set_key $filter_uri;
        location /status {
            auth_basic "Restricted"; 
            auth_basic_user_file pass_file;
            vhost_traffic_status_display;
            vhost_traffic_status_display_format json;
        }
    }
}
然后我们需要下面一段 Logstash 配置来定期获取这个数据:
input {
  http_poller {
    urls => {
      0 => {
        method => get
        url => "http://localhost:80/status/format/json&quot;
        headers => {
          Accept => "application/json"
        }
        auth => {
          user => "YouKnowIKnow"
          password => "IKnowYouDonotKnow"
        }
      }
      1 => {
        method => get
        url => "http://localhost:80/status/con ... up%3D*"
        headers => {
          Accept => "application/json"
        }
        auth => {
          user => "YouKnowIKnow"
          password => "IKnowYouDonotKnow"
        }
      }
    }
    request_timeout => 60
    interval => 60
    codec => "json"
  }
}
这样,就可以每 60 秒,获得一次 vts 数据,并重置计数了。 注意,urls 是一个 Hash,所以他的执行顺序是根据 Hash.map 来的,为了确保我们是先获取数据再重置,这里干脆用 0, 1 来作为 Hash 的 key,这样顺序就没问题了。 想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。

heartbeat能否监控其余beat?

回复

Beatspapaw 回复了问题 • 2 人关注 • 2 个回复 • 3467 次浏览 • 2022-03-24 16:51 • 来自相关话题

filebeat 可以把日志的时间替换为默认的timestamp时间吗?

回复

Beatszgq25302111 回复了问题 • 3 人关注 • 2 个回复 • 5271 次浏览 • 2019-09-28 18:31 • 来自相关话题

通过packetbeat获取镜像流量,只想保留内网请求的(单向)流量

回复

Beatstinker 回复了问题 • 2 人关注 • 1 个回复 • 3693 次浏览 • 2019-07-17 15:19 • 来自相关话题

windows下运行beat,只能前台运行,后台运行,就收集不到

回复

BeatsLeon_ 发起了问题 • 1 人关注 • 0 个回复 • 2694 次浏览 • 2019-05-27 10:19 • 来自相关话题

请问这样做,es中的数据会重复吗

回复

Elasticsearchlaoyang360 回复了问题 • 2 人关注 • 1 个回复 • 5250 次浏览 • 2018-05-01 10:24 • 来自相关话题

filebeat Template 报错

回复

Beatsrockybean 回复了问题 • 2 人关注 • 1 个回复 • 13101 次浏览 • 2018-01-18 07:29 • 来自相关话题

在Elasticsearch-head看到乱码

回复

BeatsWithLin 发起了问题 • 1 人关注 • 0 个回复 • 3114 次浏览 • 2017-12-06 21:34 • 来自相关话题

beat收集数据meta头里面ip问题

回复

Beatsmedcl 回复了问题 • 3 人关注 • 1 个回复 • 4976 次浏览 • 2017-07-27 09:02 • 来自相关话题

filebeat如何根据document type 映射到redis的多个key中

回复

Beats谭雁宏 回复了问题 • 2 人关注 • 2 个回复 • 8808 次浏览 • 2017-06-26 16:38 • 来自相关话题

怎样在Kibana中显示Byte数据?

回复

Kibanasitanxin 发起了问题 • 1 人关注 • 0 个回复 • 4769 次浏览 • 2016-12-12 22:48 • 来自相关话题

Day 9 - 动手实现一个自定义beat

AdventXinglong 发表了文章 • 0 个评论 • 3929 次浏览 • 2018-12-09 21:09 • 来自相关话题

参考

https://elasticsearch.cn/article/113

https://www.elastic.co/blog/build-your-own-beat

介绍

公司内部有统一的log收集系统,并且实现了定制的filebeat进行log收集。为了实现实时报警和监控,自定义的beat并没有直接把输出发给elasticsearch后端,而是中间会经过storm或者durid进行实时分析,然后落入es或者hdfs。同时由于是统一log收集,所以目前还没有针对具体的不同应用进行log的内容的切分,导致所有的log都是以一行为单位落入后端存储。于是需要针对不同的业务部门定制不同的beat。 本文初步尝试定制一个可以在beat端解析hdfs audit log的beat,限于篇幅,只实现了基本的文件解析功能。下面会从环境配置,代码实现,运行测试三个方面进行讲解。

环境配置

go version go1.9.4 linux/amd64

python version: 2.7.9

不得不吐槽下python的安装,各种坑。因为系统默认的python版本是2.7.5,而cookiecutter建议使用2.7.9

下面的工具会提供python本身需要依赖的一些native包

yum install openssl -y
yum install openssl-devel -y
yum install zlib-devel -y
yum install zlib -y

安装python

wget https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz
tar -zxvf Python-2.7.9.tgz
cd ~/python/Python-2.7.9
./configure --prefix=/usr/local/python-2.7.9
make
make install

rm -f /bin/python
ln -s /usr/local/python-2.7.9/bin/python /bin/python

安装工具包 distribute, setuptools, pip

cd ~/python/setuptools-19.6 && python setup.py install
cd ~/python/pip-1.5.4  && python setup.py install
cd ~/python/distribute-0.7.3  && python setup.py install

安装cookiecutter

pip install --user cookiecutter

安装cookiecutter所依赖的工具

pip install backports.functools-lru-cache
pip install six
pip install virtualenv

*** virtualenv 安装好了之后,所在目录是在python的目录里面 (/usr/local/python-2.7.9/bin/virtualenv),需要配置好PATH,这个工具稍后会被beat的Makefile用到

代码实现

需要实现的功能比较简单,目标是打开hdfs-audit.log文件,然后逐行读取,同时解析出必要的信息,组装成event,然后发送出去,如果对接的es的话,需要同时支持自动在es端创建正确的mapping

使用官方提供的beat模板创建自己的beat

  • 需要设置好环境变量$GOPATH,本例子中GOPATH=/root/go
$ go get github.com/elastic/beats
$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

[root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat
project_name [Examplebeat]: hdfsauditbeat
github_name [your-github-name]: suxingfate
beat [hdfsauditbeat]:
beat_path [github.com/suxingfate]:
full_name [Firstname Lastname]: xinglong

make setup

到这里,模板就生成了,然后就是定制需要的东西。

  • 1 _meta/beat.yml # 配置模板文件,定义我们的beat会接受哪些配置项
  • 2 config/config.go #使用go的struct定义整个config对象,包含所有的配置项
  • 3 beater/hdfsauditbeat.go # 核心逻辑代码
  • 4 _meta/fields.yml #这里是跟es对接的时候给es定义的mapping

1 _meta/beat.yml

这里增加了path,为后面配置hdfs-audit.log文件的位置留好坑

[root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "."

2 config/config.go

这里把path定义到struct里面,后面核心代码就可以从config对象获得path了

[root@minikube-2830379 hdfsauditbeat]# cat config/config.go
// Config is put into a different package to prevent cyclic imports in case
// it is needed in several locations

package config

import "time"

type Config struct {
    Period time.Duration `config:"period"`
        Path   string        `config:"path"`
}

var DefaultConfig = Config{
    Period: 1 * time.Second,
        Path:   ".",
}

3 beater/hdfsauditbeat.go

这里需要改动的地方是: 3.1 定义了一个catAudit函数来解析目标文件的每一行,同时生成自定义的event,然后发送出去 3.2 Run函数调用自定义的catAudit函数,从而把我们的功能嵌入

[root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go
package beater

import (
    "fmt"
    "time"
        "os"
        "io"
        "bufio"
        "strings"
    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
    "github.com/elastic/beats/libbeat/logp"
    "github.com/elastic/beats/libbeat/publisher"

    "github.com/suxingfate/hdfsauditbeat/config"
)

type Hdfsauditbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
}

// Creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    bt := &Hdfsauditbeat{
        done: make(chan struct{}),
        config: config,
    }
    return bt, nil
}

func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {
    logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")

    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    counter := 1
    for {
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }

        bt.catAudit(bt.config.Path)

        logp.Info("Event sent")
        counter++
    }
}

func (bt *Hdfsauditbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

func (bt *Hdfsauditbeat) catAudit(auditFile string) {
    file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)
    if err != nil {
        //fmt.Println("Open file error!", err)
        return
    }
    defer file.Close()

    buf := bufio.NewReader(file)
    for {
        line, err := buf.ReadString('\n')
        line = strings.TrimSpace(line)
        if line == "" {
            return
        }

    timeEnd := strings.Index(line, ",")
        timeString := line[0 :timeEnd]
        tm, _ := time.Parse("2006-01-02 03:04:05", timeString)

        ugiStart := strings.Index(line, "ugi=") + 4
        ugiEnd := strings.Index(line, " (auth")
        ugi := line[ugiStart :ugiEnd]

        cmdStart := strings.Index(line, "cmd=") + 4
        line = line[cmdStart:len(line)]
        cmdEnd := strings.Index(line, " ")
        cmd := line[0 : cmdEnd]

        srcStart := strings.Index(line, "src=") + 4
        line = line[srcStart:len(line)]
        srcEnd := strings.Index(line, " ")
        src := line[0:srcEnd]

        dstStart := strings.Index(line, "dst=") + 4
        line = line[dstStart:len(line)]
        dstEnd := strings.Index(line, " ")
        dst := line[0:dstEnd]

        event := common.MapStr{
                "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),
                "ugi":       ugi,
                "cmd":       cmd,
                "src":    src,
                "dst":   dst,
            }
            bt.client.PublishEvent(event)

        if err != nil {
            if err == io.EOF {
                //fmt.Println("File read ok!")
                break
            } else {
                //fmt.Println("Read file error!", err)
                return
            }
        }
    }
}

4 _meat/fields.yml

[root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml
- key: hdfsauditbeat
  title: hdfsauditbeat
  description:
  fields:
    - name: counter
      type: long
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added hdfsaudit
    - name: entrytime
      type: date
    - name: ugi
      type: keyword
    - name: cmd
      type: keyword
    - name: src
      type: keyword
    - name: dst
      type: keyword

测试

首先编译好项目

make update
make

然后会发现生成了一个hdfsauditbeat文件,这个就是二进制的可执行文件。下面进行测试,这里偷了个懒,没有发给es,而是吐到console进行观察。 修改了一下配置文件,需要指定正确的需要消费的audit log文件的路径,另外就是修改了output为console

[root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "/root/go/hdfs-audit.log"

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
#  hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

output.console:
  pretty: true
#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

开始执行

[root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}
{
  "@timestamp": "2018-12-09T03:00:00.000Z",
  "beat": {
    "hostname": "minikube-2830379.lvs02.dev.abc.com",
    "name": "minikube-2830379.lvs02.dev.abc.com",
    "version": "5.1.3"
  },
  "cmd": "create",
  "dst": "null",
  "src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
  "ugi": "appmon@APD.ABC.COM"
}

结束

使用自定义beat给我们提供了很大的灵活性,虽然pipline或者logstash也可以做到,但是使用场景还是有很大差别的。如果是调用特殊的命令获得输出,或者是本文的场景都更适合定制化beat。

Day6:用logstash-input-http_poller模拟nginxbeat

Advent三斗室 发表了文章 • 0 个评论 • 5999 次浏览 • 2015-12-07 00:18 • 来自相关话题

Elastic 公司最近推出了 beats 系列,在官方的 packet/top/file{beat} 之外,社区也自发制作了一些比如 docker/nginx/ 不过很可惜的是:nginxbeat 只支持两个数据来源:标准的 ngx_http_stub_status_module 和商业版 Nginx Plus 的ngx_http_status_module 我们都知道,ngx_http_stub_status_module 输出的信息太少,除了进程级别的连接数,啥都没有。那么,在使用开源版本 Nginx 的我们,还有别的办法么? 在官网的第三方模块列表里,发现了一个韩国人写的 nginx-module-vts。这个扩展可以做到 vhost 级别的状态信息输出。(我知道国人还有很多类似的统计扩展,但是没上官网,不便普及,就忽略吧) 但是,不懂 Golang 的话,没法自己动手实现一个 nginx-vts-beat 啊。怎么办? 其实我们可以用 logstash-input-http_poller 实现类似的功能。 首先,我们要给自己的 Nginx 加上 vts 扩展。编译方式这里就不讲了,和所有其他第三方模块一样。配置方式详见README。我们这里假设是按照核心和非核心接口来统计 URL 的状态:
http {
    vhost_traffic_status_zone;

    map $uri $filter_uri {
        default 'non-core';
        /2/api/timeline core;
        ~^/2/api/unread core;
    }

    server {
        vhost_traffic_status_filter_by_set_key $filter_uri;
        location /status {
            auth_basic "Restricted"; 
            auth_basic_user_file pass_file;
            vhost_traffic_status_display;
            vhost_traffic_status_display_format json;
        }
    }
}
然后我们需要下面一段 Logstash 配置来定期获取这个数据:
input {
  http_poller {
    urls => {
      0 => {
        method => get
        url => "http://localhost:80/status/format/json&quot;
        headers => {
          Accept => "application/json"
        }
        auth => {
          user => "YouKnowIKnow"
          password => "IKnowYouDonotKnow"
        }
      }
      1 => {
        method => get
        url => "http://localhost:80/status/con ... up%3D*"
        headers => {
          Accept => "application/json"
        }
        auth => {
          user => "YouKnowIKnow"
          password => "IKnowYouDonotKnow"
        }
      }
    }
    request_timeout => 60
    interval => 60
    codec => "json"
  }
}
这样,就可以每 60 秒,获得一次 vts 数据,并重置计数了。 注意,urls 是一个 Hash,所以他的执行顺序是根据 Hash.map 来的,为了确保我们是先获取数据再重置,这里干脆用 0, 1 来作为 Hash 的 key,这样顺序就没问题了。 想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。