怎么又是你

Elasticsearch:使用 Filebeat 从 Node.js Web 应用程序提取日志

本指南演示了如何从 Node.js Web 应用程序中提取日志并将它们安全地传送到 Elasticsearch Service 部署中。 你将设置 Filebeat 来监控具有标准 Elastic Common Schema (ECS) 格式字段的 JSON 结构化日志文件,然后你将在向 Node.js 服务器发出请求时查看 Kibana 中日志事件的实时可视化。 虽然此示例使用了 Node.js,但这种监视日志输出的方法适用于许多客户端类型。 检查可用 ECS 日志记录插件的列表。

https://elasticstack.blog.csdn ... 42695
继续阅读 »
本指南演示了如何从 Node.js Web 应用程序中提取日志并将它们安全地传送到 Elasticsearch Service 部署中。 你将设置 Filebeat 来监控具有标准 Elastic Common Schema (ECS) 格式字段的 JSON 结构化日志文件,然后你将在向 Node.js 服务器发出请求时查看 Kibana 中日志事件的实时可视化。 虽然此示例使用了 Node.js,但这种监视日志输出的方法适用于许多客户端类型。 检查可用 ECS 日志记录插件的列表。

https://elasticstack.blog.csdn ... 42695 收起阅读 »

Beats:运用 Filebeat 来对微服务 API 进行分析

在之前的文章 “Logstash:使用 ELK 堆栈进行 API 分析”,我展示了如何使用 Logstash 的 http_poller input plugin 来对微服务的 API 数据进行分析。如果我们只是为了对微服务的数据采集使用 Logstash,感觉有些 Logstash 有些大材小用了,比较 Logstash 的部署还是蛮耗资源的。相比较而言,我们可以使用 Filebeat 的 HTTP JSON input 来对微服务的数据进行采集。我们知道 Beats 的架构是属于一种轻量级的部署。可以直接运用于客户端,或者需要采集的服务器中。


原文链接:https://blog.csdn.net/UbuntuTo ... 45104
继续阅读 »
在之前的文章 “Logstash:使用 ELK 堆栈进行 API 分析”,我展示了如何使用 Logstash 的 http_poller input plugin 来对微服务的 API 数据进行分析。如果我们只是为了对微服务的数据采集使用 Logstash,感觉有些 Logstash 有些大材小用了,比较 Logstash 的部署还是蛮耗资源的。相比较而言,我们可以使用 Filebeat 的 HTTP JSON input 来对微服务的数据进行采集。我们知道 Beats 的架构是属于一种轻量级的部署。可以直接运用于客户端,或者需要采集的服务器中。


原文链接:https://blog.csdn.net/UbuntuTo ... 45104 收起阅读 »

Beats:运用 Filebeat 把 MQTT 数据导入到 Elasticsearch

物联网(IoT)描述了嵌入传感器,软件和其他技术的物理对象(“物”)网络,目的是通过 Internet 与其他设备和系统进行数据连接和交换。物联网是一张很大的网,它比互联网更大,更为广泛。随着物联网的发展,大量的数据可以上传到 Elasticsearch 中,并作为理想的存储空间。Elasticsearch 可以存储 PB 级的数据并实时分析数据。Elasticsearch 可以用甚至运用机器学习对数据进行异常分析,并作出异常报警。 MQTT 协议被广泛使用于物联网中。使用 MQTT 输入读取用于小型和移动设备的,使用轻量级消息传递协议传输的数据,该协议针对高延迟或不可靠的网络进行了优化。那么我们该如果把 MQTT 的数据传入到 Elasticsearch 中呢?

在 Elastic Stack 中的 filebeat 已经对这种协议进行了支持。我们可以使用 MQTT input 来作为 filebeat 的输入。此输入连接到 MQTT 代理,订阅选定的主题并将数据解析为公共消息行。 一切都发生在行过滤,多行和 JSON 解码之前,因此此输入可以与这些设置结合使用。
 
详细阅读,请参阅 https://elasticstack.blog.csdn ... 77596
继续阅读 »
物联网(IoT)描述了嵌入传感器,软件和其他技术的物理对象(“物”)网络,目的是通过 Internet 与其他设备和系统进行数据连接和交换。物联网是一张很大的网,它比互联网更大,更为广泛。随着物联网的发展,大量的数据可以上传到 Elasticsearch 中,并作为理想的存储空间。Elasticsearch 可以存储 PB 级的数据并实时分析数据。Elasticsearch 可以用甚至运用机器学习对数据进行异常分析,并作出异常报警。 MQTT 协议被广泛使用于物联网中。使用 MQTT 输入读取用于小型和移动设备的,使用轻量级消息传递协议传输的数据,该协议针对高延迟或不可靠的网络进行了优化。那么我们该如果把 MQTT 的数据传入到 Elasticsearch 中呢?

在 Elastic Stack 中的 filebeat 已经对这种协议进行了支持。我们可以使用 MQTT input 来作为 filebeat 的输入。此输入连接到 MQTT 代理,订阅选定的主题并将数据解析为公共消息行。 一切都发生在行过滤,多行和 JSON 解码之前,因此此输入可以与这些设置结合使用。
 
详细阅读,请参阅 https://elasticstack.blog.csdn ... 77596 收起阅读 »

一次filebeat i/o timeout 问题记录-ES内存引起


问题
kibana展示数据表明数据采集中断了,没有新的日志数据进来了。
排查
查看logstash日志:
[2019-01-07T14:59:27,435][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer
[2019-01-07T14:59:29,870][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer
[2019-01-07T14:59:29,870][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer
[2019-01-07T14:59:41,719][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer
[2019-01-07T14:59:42,777][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer
[2019-01-07T14:59:48,227][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer

查看filebeat日志:
2019-01-07T15:00:13+08:00 INFO No non-zero metrics in the last 30s
2019-01-07T15:00:43+08:00 INFO Non-zero metrics in the last 30s: libbeat.logstash.call_count.PublishEvents=1 libbeat.logstash.publish.write_bytes=241120
2019-01-07T15:00:48+08:00 ERR Failed to publish events (host: 10.68.24.138:5044:10200), caused by: read tcp 10.68.24.46:59310->10.68.24.138:5044: i/o timeout
2019-01-07T15:00:48+08:00 INFO Error publishing events (retrying): read tcp 10.68.24.46:59310->10.68.24.138:5044: i/o timeout
2019-01-07T15:01:13+08:00 INFO Non-zero metrics in the last 30s: libbeat.logstash.publish.read_errors=1 libbeat.logstash.published_but_not_acked_events=2034

查看的初步结果是,filebeat连不上logstash,logstash一直重置filebeat的连接,但是这两个机器是一点问题没有。
 
日志看过了,没有明显的问题,那就按部就班一步一步查吧

1、先来最基本的,查看elasticsearch、logstash、filebeat是否启动。
2、网络,网络环境是之前配置好的,一直没有变的,网络的可能性小一些,但是也是使用telnet测试一下各个端口是不是通的。
3、logstash故障,查看是不是因为logstash的未知故障,记录logstash的日志,然后重启logstash,看看重启logstash后是否解决问题了。
4、日志,查看日志是否是在更新,在5分钟以内是否在更新,因为是在运行的环境,日志一般不会断,所以我把这个检查放在了第四步。
5、查看ES的硬盘和内存。
GET /_cat/allocation?v
GET _cat/nodes?v

问题排查到第五步已经发现原因了:ES其中一台机器的内存满了。

原因始末
在部署这套ELK环境的时候,由于服务器提供方当时提供的两台ES机器的内存不一样,一台是8G的,一台是4G的,所以在使用的的时候,我配置的ES的堆内存一台是4G,一台是2G;ES集群就两台机器,也没配置数据节点和客户端节点,其实三台、四台我也都不配置的,集群太小再分开配置,就没有服务器了。
开始使用的时候是没有问题的,但是当日志达到一定量的时候,2G的那台机器堆内存耗光了,然后就出现了日志不能采集的i/o timeout问题。

经验
在使用ELK的过程中,以上的五种原因导致的filebeat日志采集异常,我都遇见过,其中容易忽略的就是ES的内存和硬盘是否已经满了,当ES集群中其中一台机器的堆内存和硬盘满了的话,都会引起日志采集异常。所以在配置ES集群的时候最好所有的data节点的内存和硬盘配置一致。
 
继续阅读 »

问题
kibana展示数据表明数据采集中断了,没有新的日志数据进来了。
排查
查看logstash日志:
[2019-01-07T14:59:27,435][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer
[2019-01-07T14:59:29,870][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer
[2019-01-07T14:59:29,870][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer
[2019-01-07T14:59:41,719][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer
[2019-01-07T14:59:42,777][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer
[2019-01-07T14:59:48,227][INFO ][org.logstash.beats.BeatsHandler] Exception: Connection reset by peer

查看filebeat日志:
2019-01-07T15:00:13+08:00 INFO No non-zero metrics in the last 30s
2019-01-07T15:00:43+08:00 INFO Non-zero metrics in the last 30s: libbeat.logstash.call_count.PublishEvents=1 libbeat.logstash.publish.write_bytes=241120
2019-01-07T15:00:48+08:00 ERR Failed to publish events (host: 10.68.24.138:5044:10200), caused by: read tcp 10.68.24.46:59310->10.68.24.138:5044: i/o timeout
2019-01-07T15:00:48+08:00 INFO Error publishing events (retrying): read tcp 10.68.24.46:59310->10.68.24.138:5044: i/o timeout
2019-01-07T15:01:13+08:00 INFO Non-zero metrics in the last 30s: libbeat.logstash.publish.read_errors=1 libbeat.logstash.published_but_not_acked_events=2034

查看的初步结果是,filebeat连不上logstash,logstash一直重置filebeat的连接,但是这两个机器是一点问题没有。
 
日志看过了,没有明显的问题,那就按部就班一步一步查吧

1、先来最基本的,查看elasticsearch、logstash、filebeat是否启动。
2、网络,网络环境是之前配置好的,一直没有变的,网络的可能性小一些,但是也是使用telnet测试一下各个端口是不是通的。
3、logstash故障,查看是不是因为logstash的未知故障,记录logstash的日志,然后重启logstash,看看重启logstash后是否解决问题了。
4、日志,查看日志是否是在更新,在5分钟以内是否在更新,因为是在运行的环境,日志一般不会断,所以我把这个检查放在了第四步。
5、查看ES的硬盘和内存。
GET /_cat/allocation?v
GET _cat/nodes?v

问题排查到第五步已经发现原因了:ES其中一台机器的内存满了。

原因始末
在部署这套ELK环境的时候,由于服务器提供方当时提供的两台ES机器的内存不一样,一台是8G的,一台是4G的,所以在使用的的时候,我配置的ES的堆内存一台是4G,一台是2G;ES集群就两台机器,也没配置数据节点和客户端节点,其实三台、四台我也都不配置的,集群太小再分开配置,就没有服务器了。
开始使用的时候是没有问题的,但是当日志达到一定量的时候,2G的那台机器堆内存耗光了,然后就出现了日志不能采集的i/o timeout问题。

经验
在使用ELK的过程中,以上的五种原因导致的filebeat日志采集异常,我都遇见过,其中容易忽略的就是ES的内存和硬盘是否已经满了,当ES集群中其中一台机器的堆内存和硬盘满了的话,都会引起日志采集异常。所以在配置ES集群的时候最好所有的data节点的内存和硬盘配置一致。
  收起阅读 »

Day 18: 记filebeat内存泄漏问题分析及调优

ELK 从发布5.0之后加入了beats套件之后,就改名叫做elastic stack了。beats是一组轻量级的软件,给我们提供了简便,快捷的方式来实时收集、丰富更多的数据用以支撑我们的分析。但由于beats都需要安装在ELK集群之外,在宿主机之上,其对宿主机的性能的影响往往成为了考量其是否能被使用的关键,而不是它到底提供了什么样的功能。因为业务的稳定运行才是核心KPI,而其他因运维而生的数据永远是更低的优先级。影响宿主机性能的方面可能有很多,比如CPU占用率,网络吞吐占用率,磁盘IO,内存等,这里我们详细讨论一下内存泄漏的问题

@[toc]

filebeat是beats套件的核心组件之一(另一个核心是metricbeat),用于采集文件内容并发送到收集端(ES),它一般安装在宿主机上,即生成文件的机器。根据文档的描述,filebeat是不建议用来采集NFS(网络共享磁盘)上的数据的,因此,我们这里只讨论filebeat对本地文件进行采集时的性能情况。

当filebeat部署和运行之后,必定会对cpu,内存,网络等资源产生一定的消耗,当这种消耗能够限定在一个可接受的范围时,在企业内部的生产服务器上大规模部署filebeat是可行的。但如果出现一些非预期的情况,比如占用了大量的内存,那么运维团队肯定是优先保障核心业务的资源,把filebeat进程给杀了。很可惜的是,内存泄漏的问题,从filebeat的诞生到现在就一直没有完全解决过。(可以区社区讨论贴看看,直到现在V6.5.1都还有人在报告内存泄漏的问题)。在特定的场景和配置下,内存占用过多已经成为了抑止filebeat大规模部署的主要问题了。在这里,我主要描述一下我碰到的在filebeat 6.0上遇到的问题。

问题场景和配置

一开始我们在很多机器上部署了filebeat,并且使用了一套统一无差别的的简单配置。对于想要在企业内部大规模推广filebeat的同学来说,这是大忌!!! 合理的方式是具体问题具体分析,需对每台机器上产生文件的方式和rotate的方式进行充分的调研,针对不同的场景是做定制化的配置。以下是我们之前使用的配置:

  • multiline,多行的配置,当日志文件不符合规范,大量的匹配pattern的时候,会造成内存泄漏
  • max_procs,限制filebeat的进程数量,其实是内核数,建议手动设为1
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /qhapp/*/*.log
  tail_files: true
  multiline.pattern: '^[[:space:]]+|^Caused by:|^.+Exception:|^\d+\serror'
  multiline.negate: false
  multiline.match: after
  fields:
    app_id: bi_lass
    service: "{{ hostvars[inventory_hostname]['service'] }}"
    ip_address: "{{ hostvars[inventory_hostname]['ansible_host'] }}"
    topic: qh_app_raw_log

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false
output.kafka:
  enabled: true
  hosts: [{{kafka_url}}]

  topic: '%{[fields][topic]}'

max_procs: 1

注意,以上的配置中,仅仅对cpu的内核数进行了限制,而没有对内存的使用率进行特殊的限制。从配置层面来说,影响filebeat内存使用情况的指标主要有两个:

  • queue.mem.events消息队列的大小,默认值是4096,这个参数在6.0以前的版本是spool-size,通过命令行,在启动时进行配置
  • max_message_bytes 单条消息的大小, 默认值是10M

filebeat最大的可能占用的内存是max_message_bytes * queue.mem.events = 40G,考虑到这个queue是用于存储encode过的数据,raw数据也是要存储的,所以,在没有对内存进行限制的情况下,最大的内存占用情况是可以达到超过80G

因此,建议是同时对filebeat的CPU和内存进行限制。

下面,我们看看,使用以上的配置在什么情况下会观测到内存泄漏

监控文件过多

对于实时大量产生内容的文件,比如日志,常用的做法往往是将日志文件进行rotate,根据策略的不同,每隔一段时间或者达到固定大小之后,将日志rotate。 这样,在文件目录下可能会产生大量的日志文件。 如果我们使用通配符的方式,去监控该目录,则filebeat会启动大量的harvester实例去采集文件。但是,请记住,我这里不是说这样一定会产生内存泄漏,只是在这里观测到了内存泄漏而已,不是说这是造成内存泄漏的原因。

20181126205516139.png

当filebeat运行了几个月之后,占用了超过10个G的内存

20181126205316830.png

非常频繁的rotate日志

另一个可能是,filebeat只配置监控了一个文件,比如test2.log,但由于test2.log不停的rotate出新的文件,虽然没有使用通配符采集该目录下的所有文件,但因为linux系统是使用inode number来唯一标示文件的,rotate出来的新文件并没有改变其inode number,因此,时间上filebeat还是同时开启了对多个文件的监控。

20181126220013971.png

另外,因为对文件进行rotate的时候,一般会限制rotate的个数,即到达一定数量时,新rotate一个文件,必然会删除一个旧的文件,文件删除之后,inode number是可以复用的,如果不巧,新rotate出来的文件被分配了一个之前已删掉文件的inode number,而此时filebeat还没有监测之前持有该inode number的文件已删除,则会抛出以下异常:

2018-11-21T18:06:55+08:00 ERR  Harvester could not be started on truncated file: /qhapp/logs/bd-etl/logs/test2.log, Err: Error setting up harvester: Harvester setup failed. Unexpected file opening error: file info is not identical with opened file. Aborting harvesting and retrying file later again

而类似Harvester setup failed.的异常会导致内存泄漏

https://github.com/elastic/beats/issues/6797

因为multiline导致内存占用过多

multiline.pattern: '^[[:space:]]+|^Caused by:|^.+Exception:|^\d+\serror,比如这个配置,认为空格或者制表符开头的line是上一行的附加内容,需要作为多行模式,存储到同一个event当中。当你监控的文件刚巧在文件的每一行带有一个空格时,会错误的匹配多行,造成filebeat解析过后,单条event的行数达到了上千行,大小达到了10M,并且在这过程中使用的是正则表达式,每一条event的处理都会极大的消耗内存。因为大多数的filebeat output是需应答的,buffer这些event必然会大量的消耗内存。

模拟场景

这里不多说,简单来一段python的代码:

[loggers]
keys=root

[handlers]
keys=NormalHandler

[formatters]
keys=formatter

[logger_root]
level=DEBUG
handlers=NormalHandler

[handler_NormalHandler]
class=logging.handlers.TimedRotatingFileHandler
formatter=formatter
args=('./test2.log', 'S', 10, 200)

[formatter_formatter]
format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s

以上,每隔10秒('S', 'M' = 分钟,'D'= 天)rotate一个文件,一共可以rotate 200个文件。 然后,随便找一段日志,不停的打,以下是330条/秒

import logging
from logging.config import fileConfig
import os
import time
CURRENT_FOLDER = os.path.dirname(os.path.realpath(__file__))

fileConfig(CURRENT_FOLDER + '/logging.ini')
logger = logging.getLogger()

while True:
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!!@#!@#!@#!@#!@#!@#!@#!@#!@#!@#!@#!#@!!!@##########################################################################################################################################################")
    time.sleep(0.03)

如何观察filebeat的内存

在6.3版本之前,我们是无法通过xpack的monitoring功能来观察beats套件的性能的。因此,这里讨论的是没有monitoring时,我们如何去检测filebeat的性能。当然,简单的方法是通过top,ps等操作系统的命令进行查看,但这些都是实时的,无法做趋势的观察,并且都是进程级别的,无法看到filebeat内部的真是情况。因此,这里介绍如何通过filebeat的日志和pprof这个工具来观察内存的使用情况

通过filebeat的日志

filebeat文件解读

其实filebeat的日志,已经包含了很多参数用于实时观测filebeat的资源使用情况,以下是filebeat的一个日志片段(这里的日志片段是6.0版本的,6.3版本之后,整个日志格式变了,从kv格式变成了json对象格式,xpack可以直接通过日志进行filebeat的monitoring):

2018-11-02T17:40:01+08:00 INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=623475680 beat.memstats.memory_alloc=391032232 beat.memstats.memory_total=155885103371024 filebeat.events.active=-402 filebeat.events.added=13279 filebeat.events.done=13681 filebeat.harvester.closed=1 filebeat.harvester.open_files=7 filebeat.harvester.running=7 filebeat.harvester.started=2 libbeat.config.module.running=0 libbeat.output.events.acked=13677 libbeat.output.events.batches=28 libbeat.output.events.total=13677 libbeat.outputs.kafka.bytes_read=12112 libbeat.outputs.kafka.bytes_write=1043381 libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 libbeat.pipeline.events.filtered=4 libbeat.pipeline.events.published=13275 libbeat.pipeline.events.total=13279 libbeat.pipeline.queue.acked=13677 registrar.states.cleanup=1 registrar.states.current=8 registrar.states.update=13681 registrar.writes=28

里面的参数主要分成三个部分:

  • beat.*,包含memstats.gc_next,memstats.memory_alloc,memstats.memory_total,这个是所有beat组件都有的指标,是filebeat继承来的,主要是内存相关的,我们这里特别关注memstats.memory_alloc,alloc的越多,占用内存越大
  • filebeat.*,这部分是filebeat特有的指标,通过event相关的指标,我们知道吞吐,通过harvester,我们知道正在监控多少个文件,未消费event堆积的越多,havester创建的越多,消耗内存越大
  • libbeat.*,也是beats组件通用的指标,包含outputs和pipeline等信息。这里要主要当outputs发生阻塞的时候,会直接影响queue里面event的消费,造成内存堆积
  • registrar,filebeat将监控文件的状态放在registry文件里面,当监控文件非常多的时候,比如10万个,而且没有合理的设置close_inactive参数,这个文件能达到100M,载入内存后,直接占用内存

filebeat日志解析

当然,我们不可能直接去读这个日志,既然我们使用ELK,肯定是用ELK进行解读。因为是kv格式,很方便,用logstash的kv plugin:

filter {
  kv {}
}

kv无法指定properties的type,所以,我们需要稍微指定了一下索引的模版:

PUT _template/template_1
{
  "index_patterns": ["filebeat*"],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "doc": {
      "_source": {
        "enabled": false
      },
      "dynamic_templates": [
        {
          "longs_as_strings": {
            "match_mapping_type": "string",
            "path_match":   "*beat.*",
            "path_unmatch": "*.*name",
            "mapping": {
              "type": "long"
            }
          }
        }
      ]
    }
  }
}

上面的模版,将kv解析出的properties都mapping到long类型,但注意"path_match": "*beat.*"无法match到registrar的指标,读者可以自己写一个更完善的mapping。 这样,我们就可以通过kibana可视化组件,清楚的看到内存泄漏的过程

20181127114253940.png

以及资源的使用情况:

20181127114342557.png

将信息可视化之后,我们可以明显的发现,内存的突变和ERR是同时发生的

20181127114536608.png

即以下error: 2018-11-27T09:05:44+08:00 ERR Harvester could not be started on new file: /qhapp/logs/bd-etl/logs/test2.log, Err: Error setting up harvester: Harvester setup failed. Unexpected file opening error: file info is not identical with opened file. Aborting harvesting and retrying file later again

会导致filebeat突然申请了额外的内存。具体请查看issue

通过pprof

众所周知,filebeat是用go语言实现的,而go语言本身的基础库里面就包含pprof这个功能极其强大的性能分析工具,只是这个工具是用于debug的,在正常模式下,filebeat是不会启动这个选贤的,并且很遗憾,在官方文档里面根本没有提及我们可以使用pprof来观测filebeat。我们接下来可以通过6.3上修复的一个内存泄漏的issue,来学习怎么使用pprof进行分析

启动pprof监测

首先,需要让filebeat在启动的时候运行pprof,具体的做法是在启动是加上参数-httpprof localhost:6060,即/usr/share/filebeat/bin/filebeat -c /etc/filebeat/filebeat.yml -path.home /usr/share/filebeat -path.config /etc/filebeat -path.data /var/lib/filebeat -path.logs /var/log/filebeat -httpprof localhost:6060。这里只绑定了localhost,无法通过远程访问,如果想远程访问,应该使用0.0.0.0。 这时,你就可以通过curl http://localhost:6060/debug/pprof/heap > profile.txt等命令,获取filebeat的实时堆栈信息了。

远程连接

当然,你也可以通过在你的本地电脑上安装go,然后通过go tool远程连接pprof。 因为我们是需要研究内存的问题,所以以下连接访问的是/heap子路径 go tool pprof http://10.60.x.x:6060/debug/pprof/heap

top 命令

连接之后,你可以通过top命令,查看消耗内存最多的几个实例:

33159.58kB of 33159.58kB total (  100%)
Dropped 308 nodes (cum <= 165.80kB)
Showing top 10 nodes out of 51 (cum >= 512.04kB)
      flat  flat%   sum%        cum   cum%
19975.92kB 60.24% 60.24% 19975.92kB 60.24%  runtime.malg
 7680.66kB 23.16% 83.40%  7680.66kB 23.16%  github.com/elastic/beats/filebeat/channel.SubOutlet
 2048.19kB  6.18% 89.58%  2048.19kB  6.18%  github.com/elastic/beats/filebeat/prospector/log.NewHarvester
 1357.91kB  4.10% 93.68%  1357.91kB  4.10%  runtime.allgadd
 1024.08kB  3.09% 96.76%  1024.08kB  3.09%  runtime.acquireSudog
  544.67kB  1.64% 98.41%   544.67kB  1.64%  github.com/elastic/beats/libbeat/publisher/queue/memqueue.NewBroker
  528.17kB  1.59%   100%   528.17kB  1.59%  regexp.(*bitState).reset
         0     0%   100%   528.17kB  1.59%  github.com/elastic/beats/filebeat/beater.(*Filebeat).Run
         0     0%   100%   512.04kB  1.54%  github.com/elastic/beats/filebeat/channel.CloseOnSignal.func1
         0     0%   100%   512.04kB  1.54%  github.com/elastic/beats/filebeat/channel.SubOutlet.func1

查看堆栈调用图

输入web命令,会生产堆栈调用关系的svg图,在这个svg图中,你可以结合top命令一起查看,在top中,我们已经知道github.com/elastic/beats/filebeat/channel.SubOutlet占用了很多的内存,在图中,展现的是调用关系栈,你可以看到这个类是怎么被实例化的,并且在整个堆中,内存是怎么分布的。最直观的是,实例所处的长方形面积越大,代表占用的内存越多。:

20181126222514954.png

查看源码

通过list命令,可以迅速查看可以实例的问题源码,比如在之前的top10命令中,我们已经看到github.com/elastic/beats/filebeat/channel.SubOutlet这个类的实例占用了大量的内存,我们可以通过list做进一步的分析,看看这个类内部在哪个语句开始出现内存的占用:

(pprof) list SubOutlet
Total: 32.38MB
ROUTINE ======================== github.com/elastic/beats/filebeat/channel.SubOutlet in /home/jeremy/src/go/src/github.com/elastic/beats/filebeat/channel/util.go
    7.50MB     7.50MB (flat, cum) 23.16% of Total
         .          .     15:// SubOutlet create a sub-outlet, which can be closed individually, without closing the
         .          .     16:// underlying outlet.
         .          .     17:func SubOutlet(out Outleter) Outleter {
         .          .     18:   s := &subOutlet{
         .          .     19:       isOpen: atomic.MakeBool(true),
       1MB        1MB     20:       done:   make(chan struct{}),
       2MB        2MB     21:       ch:     make(chan *util.Data),
    4.50MB     4.50MB     22:       res:    make(chan bool, 1),
         .          .     23:   }
         .          .     24:
         .          .     25:   go func() {
         .          .     26:       for event := range s.ch {
         .          .     27:           s.res <- out.OnEvent(event) 

如何调优

其实调优的过程就是调整参数的过程,之前说过了,和内存相关的参数, max_message_bytes,queue.mem.events,queue.mem.flush.min_events,以及队列占用内存的公式:max_message_bytes * queue.mem.events

output.kafka:
  enabled: true
#  max_message_bytes: 1000000
  hosts: ["10.60.x.x:9092"]
  topic: '%{[fields][topic]}'
max_procs: 1 
#queue.mem.events: 256
#queue.mem.flush.min_events: 128

但其实,不同的环境下,不同的原因都可能会造成filebeat占用的内存过大,此时,需要仔细的确认你的上下文环境:

  • 是否因为通配符的原因,造成同时监控数量巨大的文件,这种情况应该避免用通配符监控无用的文件。
  • 是否文件的单行内容巨大,确定是否需要改造文件内容,或者将其过滤
  • 是否过多的匹配了multiline的pattern,并且多行的event是否单条体积过大。这时,就需要暂时关闭multiline,修改文件内容或者multiline的pattern。
  • 是否output经常阻塞,event queue里面总是一直缓存event。这时要检查你的网络环境或者消息队列等中间件是否正常
继续阅读 »

ELK 从发布5.0之后加入了beats套件之后,就改名叫做elastic stack了。beats是一组轻量级的软件,给我们提供了简便,快捷的方式来实时收集、丰富更多的数据用以支撑我们的分析。但由于beats都需要安装在ELK集群之外,在宿主机之上,其对宿主机的性能的影响往往成为了考量其是否能被使用的关键,而不是它到底提供了什么样的功能。因为业务的稳定运行才是核心KPI,而其他因运维而生的数据永远是更低的优先级。影响宿主机性能的方面可能有很多,比如CPU占用率,网络吞吐占用率,磁盘IO,内存等,这里我们详细讨论一下内存泄漏的问题

@[toc]

filebeat是beats套件的核心组件之一(另一个核心是metricbeat),用于采集文件内容并发送到收集端(ES),它一般安装在宿主机上,即生成文件的机器。根据文档的描述,filebeat是不建议用来采集NFS(网络共享磁盘)上的数据的,因此,我们这里只讨论filebeat对本地文件进行采集时的性能情况。

当filebeat部署和运行之后,必定会对cpu,内存,网络等资源产生一定的消耗,当这种消耗能够限定在一个可接受的范围时,在企业内部的生产服务器上大规模部署filebeat是可行的。但如果出现一些非预期的情况,比如占用了大量的内存,那么运维团队肯定是优先保障核心业务的资源,把filebeat进程给杀了。很可惜的是,内存泄漏的问题,从filebeat的诞生到现在就一直没有完全解决过。(可以区社区讨论贴看看,直到现在V6.5.1都还有人在报告内存泄漏的问题)。在特定的场景和配置下,内存占用过多已经成为了抑止filebeat大规模部署的主要问题了。在这里,我主要描述一下我碰到的在filebeat 6.0上遇到的问题。

问题场景和配置

一开始我们在很多机器上部署了filebeat,并且使用了一套统一无差别的的简单配置。对于想要在企业内部大规模推广filebeat的同学来说,这是大忌!!! 合理的方式是具体问题具体分析,需对每台机器上产生文件的方式和rotate的方式进行充分的调研,针对不同的场景是做定制化的配置。以下是我们之前使用的配置:

  • multiline,多行的配置,当日志文件不符合规范,大量的匹配pattern的时候,会造成内存泄漏
  • max_procs,限制filebeat的进程数量,其实是内核数,建议手动设为1
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /qhapp/*/*.log
  tail_files: true
  multiline.pattern: '^[[:space:]]+|^Caused by:|^.+Exception:|^\d+\serror'
  multiline.negate: false
  multiline.match: after
  fields:
    app_id: bi_lass
    service: "{{ hostvars[inventory_hostname]['service'] }}"
    ip_address: "{{ hostvars[inventory_hostname]['ansible_host'] }}"
    topic: qh_app_raw_log

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false
output.kafka:
  enabled: true
  hosts: [{{kafka_url}}]

  topic: '%{[fields][topic]}'

max_procs: 1

注意,以上的配置中,仅仅对cpu的内核数进行了限制,而没有对内存的使用率进行特殊的限制。从配置层面来说,影响filebeat内存使用情况的指标主要有两个:

  • queue.mem.events消息队列的大小,默认值是4096,这个参数在6.0以前的版本是spool-size,通过命令行,在启动时进行配置
  • max_message_bytes 单条消息的大小, 默认值是10M

filebeat最大的可能占用的内存是max_message_bytes * queue.mem.events = 40G,考虑到这个queue是用于存储encode过的数据,raw数据也是要存储的,所以,在没有对内存进行限制的情况下,最大的内存占用情况是可以达到超过80G

因此,建议是同时对filebeat的CPU和内存进行限制。

下面,我们看看,使用以上的配置在什么情况下会观测到内存泄漏

监控文件过多

对于实时大量产生内容的文件,比如日志,常用的做法往往是将日志文件进行rotate,根据策略的不同,每隔一段时间或者达到固定大小之后,将日志rotate。 这样,在文件目录下可能会产生大量的日志文件。 如果我们使用通配符的方式,去监控该目录,则filebeat会启动大量的harvester实例去采集文件。但是,请记住,我这里不是说这样一定会产生内存泄漏,只是在这里观测到了内存泄漏而已,不是说这是造成内存泄漏的原因。

20181126205516139.png

当filebeat运行了几个月之后,占用了超过10个G的内存

20181126205316830.png

非常频繁的rotate日志

另一个可能是,filebeat只配置监控了一个文件,比如test2.log,但由于test2.log不停的rotate出新的文件,虽然没有使用通配符采集该目录下的所有文件,但因为linux系统是使用inode number来唯一标示文件的,rotate出来的新文件并没有改变其inode number,因此,时间上filebeat还是同时开启了对多个文件的监控。

20181126220013971.png

另外,因为对文件进行rotate的时候,一般会限制rotate的个数,即到达一定数量时,新rotate一个文件,必然会删除一个旧的文件,文件删除之后,inode number是可以复用的,如果不巧,新rotate出来的文件被分配了一个之前已删掉文件的inode number,而此时filebeat还没有监测之前持有该inode number的文件已删除,则会抛出以下异常:

2018-11-21T18:06:55+08:00 ERR  Harvester could not be started on truncated file: /qhapp/logs/bd-etl/logs/test2.log, Err: Error setting up harvester: Harvester setup failed. Unexpected file opening error: file info is not identical with opened file. Aborting harvesting and retrying file later again

而类似Harvester setup failed.的异常会导致内存泄漏

https://github.com/elastic/beats/issues/6797

因为multiline导致内存占用过多

multiline.pattern: '^[[:space:]]+|^Caused by:|^.+Exception:|^\d+\serror,比如这个配置,认为空格或者制表符开头的line是上一行的附加内容,需要作为多行模式,存储到同一个event当中。当你监控的文件刚巧在文件的每一行带有一个空格时,会错误的匹配多行,造成filebeat解析过后,单条event的行数达到了上千行,大小达到了10M,并且在这过程中使用的是正则表达式,每一条event的处理都会极大的消耗内存。因为大多数的filebeat output是需应答的,buffer这些event必然会大量的消耗内存。

模拟场景

这里不多说,简单来一段python的代码:

[loggers]
keys=root

[handlers]
keys=NormalHandler

[formatters]
keys=formatter

[logger_root]
level=DEBUG
handlers=NormalHandler

[handler_NormalHandler]
class=logging.handlers.TimedRotatingFileHandler
formatter=formatter
args=('./test2.log', 'S', 10, 200)

[formatter_formatter]
format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s

以上,每隔10秒('S', 'M' = 分钟,'D'= 天)rotate一个文件,一共可以rotate 200个文件。 然后,随便找一段日志,不停的打,以下是330条/秒

import logging
from logging.config import fileConfig
import os
import time
CURRENT_FOLDER = os.path.dirname(os.path.realpath(__file__))

fileConfig(CURRENT_FOLDER + '/logging.ini')
logger = logging.getLogger()

while True:
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")
    logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!!@#!@#!@#!@#!@#!@#!@#!@#!@#!@#!@#!#@!!!@##########################################################################################################################################################")
    time.sleep(0.03)

如何观察filebeat的内存

在6.3版本之前,我们是无法通过xpack的monitoring功能来观察beats套件的性能的。因此,这里讨论的是没有monitoring时,我们如何去检测filebeat的性能。当然,简单的方法是通过top,ps等操作系统的命令进行查看,但这些都是实时的,无法做趋势的观察,并且都是进程级别的,无法看到filebeat内部的真是情况。因此,这里介绍如何通过filebeat的日志和pprof这个工具来观察内存的使用情况

通过filebeat的日志

filebeat文件解读

其实filebeat的日志,已经包含了很多参数用于实时观测filebeat的资源使用情况,以下是filebeat的一个日志片段(这里的日志片段是6.0版本的,6.3版本之后,整个日志格式变了,从kv格式变成了json对象格式,xpack可以直接通过日志进行filebeat的monitoring):

2018-11-02T17:40:01+08:00 INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=623475680 beat.memstats.memory_alloc=391032232 beat.memstats.memory_total=155885103371024 filebeat.events.active=-402 filebeat.events.added=13279 filebeat.events.done=13681 filebeat.harvester.closed=1 filebeat.harvester.open_files=7 filebeat.harvester.running=7 filebeat.harvester.started=2 libbeat.config.module.running=0 libbeat.output.events.acked=13677 libbeat.output.events.batches=28 libbeat.output.events.total=13677 libbeat.outputs.kafka.bytes_read=12112 libbeat.outputs.kafka.bytes_write=1043381 libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 libbeat.pipeline.events.filtered=4 libbeat.pipeline.events.published=13275 libbeat.pipeline.events.total=13279 libbeat.pipeline.queue.acked=13677 registrar.states.cleanup=1 registrar.states.current=8 registrar.states.update=13681 registrar.writes=28

里面的参数主要分成三个部分:

  • beat.*,包含memstats.gc_next,memstats.memory_alloc,memstats.memory_total,这个是所有beat组件都有的指标,是filebeat继承来的,主要是内存相关的,我们这里特别关注memstats.memory_alloc,alloc的越多,占用内存越大
  • filebeat.*,这部分是filebeat特有的指标,通过event相关的指标,我们知道吞吐,通过harvester,我们知道正在监控多少个文件,未消费event堆积的越多,havester创建的越多,消耗内存越大
  • libbeat.*,也是beats组件通用的指标,包含outputs和pipeline等信息。这里要主要当outputs发生阻塞的时候,会直接影响queue里面event的消费,造成内存堆积
  • registrar,filebeat将监控文件的状态放在registry文件里面,当监控文件非常多的时候,比如10万个,而且没有合理的设置close_inactive参数,这个文件能达到100M,载入内存后,直接占用内存

filebeat日志解析

当然,我们不可能直接去读这个日志,既然我们使用ELK,肯定是用ELK进行解读。因为是kv格式,很方便,用logstash的kv plugin:

filter {
  kv {}
}

kv无法指定properties的type,所以,我们需要稍微指定了一下索引的模版:

PUT _template/template_1
{
  "index_patterns": ["filebeat*"],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "doc": {
      "_source": {
        "enabled": false
      },
      "dynamic_templates": [
        {
          "longs_as_strings": {
            "match_mapping_type": "string",
            "path_match":   "*beat.*",
            "path_unmatch": "*.*name",
            "mapping": {
              "type": "long"
            }
          }
        }
      ]
    }
  }
}

上面的模版,将kv解析出的properties都mapping到long类型,但注意"path_match": "*beat.*"无法match到registrar的指标,读者可以自己写一个更完善的mapping。 这样,我们就可以通过kibana可视化组件,清楚的看到内存泄漏的过程

20181127114253940.png

以及资源的使用情况:

20181127114342557.png

将信息可视化之后,我们可以明显的发现,内存的突变和ERR是同时发生的

20181127114536608.png

即以下error: 2018-11-27T09:05:44+08:00 ERR Harvester could not be started on new file: /qhapp/logs/bd-etl/logs/test2.log, Err: Error setting up harvester: Harvester setup failed. Unexpected file opening error: file info is not identical with opened file. Aborting harvesting and retrying file later again

会导致filebeat突然申请了额外的内存。具体请查看issue

通过pprof

众所周知,filebeat是用go语言实现的,而go语言本身的基础库里面就包含pprof这个功能极其强大的性能分析工具,只是这个工具是用于debug的,在正常模式下,filebeat是不会启动这个选贤的,并且很遗憾,在官方文档里面根本没有提及我们可以使用pprof来观测filebeat。我们接下来可以通过6.3上修复的一个内存泄漏的issue,来学习怎么使用pprof进行分析

启动pprof监测

首先,需要让filebeat在启动的时候运行pprof,具体的做法是在启动是加上参数-httpprof localhost:6060,即/usr/share/filebeat/bin/filebeat -c /etc/filebeat/filebeat.yml -path.home /usr/share/filebeat -path.config /etc/filebeat -path.data /var/lib/filebeat -path.logs /var/log/filebeat -httpprof localhost:6060。这里只绑定了localhost,无法通过远程访问,如果想远程访问,应该使用0.0.0.0。 这时,你就可以通过curl http://localhost:6060/debug/pprof/heap > profile.txt等命令,获取filebeat的实时堆栈信息了。

远程连接

当然,你也可以通过在你的本地电脑上安装go,然后通过go tool远程连接pprof。 因为我们是需要研究内存的问题,所以以下连接访问的是/heap子路径 go tool pprof http://10.60.x.x:6060/debug/pprof/heap

top 命令

连接之后,你可以通过top命令,查看消耗内存最多的几个实例:

33159.58kB of 33159.58kB total (  100%)
Dropped 308 nodes (cum <= 165.80kB)
Showing top 10 nodes out of 51 (cum >= 512.04kB)
      flat  flat%   sum%        cum   cum%
19975.92kB 60.24% 60.24% 19975.92kB 60.24%  runtime.malg
 7680.66kB 23.16% 83.40%  7680.66kB 23.16%  github.com/elastic/beats/filebeat/channel.SubOutlet
 2048.19kB  6.18% 89.58%  2048.19kB  6.18%  github.com/elastic/beats/filebeat/prospector/log.NewHarvester
 1357.91kB  4.10% 93.68%  1357.91kB  4.10%  runtime.allgadd
 1024.08kB  3.09% 96.76%  1024.08kB  3.09%  runtime.acquireSudog
  544.67kB  1.64% 98.41%   544.67kB  1.64%  github.com/elastic/beats/libbeat/publisher/queue/memqueue.NewBroker
  528.17kB  1.59%   100%   528.17kB  1.59%  regexp.(*bitState).reset
         0     0%   100%   528.17kB  1.59%  github.com/elastic/beats/filebeat/beater.(*Filebeat).Run
         0     0%   100%   512.04kB  1.54%  github.com/elastic/beats/filebeat/channel.CloseOnSignal.func1
         0     0%   100%   512.04kB  1.54%  github.com/elastic/beats/filebeat/channel.SubOutlet.func1

查看堆栈调用图

输入web命令,会生产堆栈调用关系的svg图,在这个svg图中,你可以结合top命令一起查看,在top中,我们已经知道github.com/elastic/beats/filebeat/channel.SubOutlet占用了很多的内存,在图中,展现的是调用关系栈,你可以看到这个类是怎么被实例化的,并且在整个堆中,内存是怎么分布的。最直观的是,实例所处的长方形面积越大,代表占用的内存越多。:

20181126222514954.png

查看源码

通过list命令,可以迅速查看可以实例的问题源码,比如在之前的top10命令中,我们已经看到github.com/elastic/beats/filebeat/channel.SubOutlet这个类的实例占用了大量的内存,我们可以通过list做进一步的分析,看看这个类内部在哪个语句开始出现内存的占用:

(pprof) list SubOutlet
Total: 32.38MB
ROUTINE ======================== github.com/elastic/beats/filebeat/channel.SubOutlet in /home/jeremy/src/go/src/github.com/elastic/beats/filebeat/channel/util.go
    7.50MB     7.50MB (flat, cum) 23.16% of Total
         .          .     15:// SubOutlet create a sub-outlet, which can be closed individually, without closing the
         .          .     16:// underlying outlet.
         .          .     17:func SubOutlet(out Outleter) Outleter {
         .          .     18:   s := &subOutlet{
         .          .     19:       isOpen: atomic.MakeBool(true),
       1MB        1MB     20:       done:   make(chan struct{}),
       2MB        2MB     21:       ch:     make(chan *util.Data),
    4.50MB     4.50MB     22:       res:    make(chan bool, 1),
         .          .     23:   }
         .          .     24:
         .          .     25:   go func() {
         .          .     26:       for event := range s.ch {
         .          .     27:           s.res <- out.OnEvent(event) 

如何调优

其实调优的过程就是调整参数的过程,之前说过了,和内存相关的参数, max_message_bytes,queue.mem.events,queue.mem.flush.min_events,以及队列占用内存的公式:max_message_bytes * queue.mem.events

output.kafka:
  enabled: true
#  max_message_bytes: 1000000
  hosts: ["10.60.x.x:9092"]
  topic: '%{[fields][topic]}'
max_procs: 1 
#queue.mem.events: 256
#queue.mem.flush.min_events: 128

但其实,不同的环境下,不同的原因都可能会造成filebeat占用的内存过大,此时,需要仔细的确认你的上下文环境:

  • 是否因为通配符的原因,造成同时监控数量巨大的文件,这种情况应该避免用通配符监控无用的文件。
  • 是否文件的单行内容巨大,确定是否需要改造文件内容,或者将其过滤
  • 是否过多的匹配了multiline的pattern,并且多行的event是否单条体积过大。这时,就需要暂时关闭multiline,修改文件内容或者multiline的pattern。
  • 是否output经常阻塞,event queue里面总是一直缓存event。这时要检查你的网络环境或者消息队列等中间件是否正常
收起阅读 »

mysql协议解析扩展

elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。
继续阅读 »
elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。 收起阅读 »

Filebeat使用的若干问题

1、日志文件是json格式(对应ES的索引的字段),如果输出到ES,那么对应ES的格式数据(mapping)应该怎么定义,放到哪个目录,怎么引用。
 
2、为了方便我每条日志生成一个文件,这样文件多了会不会影响Filebeat的性能。
 
3、接第2问怎么配置可以“读取并输出成功”就删除该日志文件。
继续阅读 »
1、日志文件是json格式(对应ES的索引的字段),如果输出到ES,那么对应ES的格式数据(mapping)应该怎么定义,放到哪个目录,怎么引用。
 
2、为了方便我每条日志生成一个文件,这样文件多了会不会影响Filebeat的性能。
 
3、接第2问怎么配置可以“读取并输出成功”就删除该日志文件。 收起阅读 »

Filebeat 配置「SSL」證書加密 出現錯誤 ... ERR Failed to publish events

各位朋友大家好:
 
進行配置「Filebeat」證書加密 出現錯誤如下,有哪位朋友遇過此問題可以幫幫忙!
 
ERR Failed to publish events caused by: read tcp 192.168.1.57:56182->192.168.1.249:5043: wsarecv: An existing connection was forcibly closed by the remote host.
 
使用過「telnet IP Port」測試「ELK」服務器,確認通訊協議 OK !
 
Filebeat 配置如檔如下:  (Windows 環境)
filebeat.prospectors:
- input_type: log #輸入 type「log」
paths:
- D:\Wireshark_Log\* #指定推送日誌「Log」文件

output.logstash:
hosts: ["192.168.1.249:5043"] #指定接收Logstash
tls:
certificate_authorities:
- C:\filebeat-5.5.0-windows-x86_64\ssl\logstash\192.168.1.249.crt
ssl.certificate:
- C:\filebeat-5.5.0-windows-x86_64\ssl\filebeat\192.168.1.57.crt
ssl.certificate:
- C:\filebeat-5.5.0-windows-x86_64\ssl\filebeat\192.168.1.57.key

以下是「FileBeat」錯誤 日誌
2017-09-08T14:14:57+08:00 ERR Failed to publish events caused by: read tcp 192.168.1.57:56202->192.168.1.249:5043: wsarecv: An existing connection was forcibly closed by the remote host.
2017-09-08T14:14:57+08:00 INFO Error publishing events (retrying): read tcp 192.168.1.57:56202->192.168.1.249:5043: wsarecv: An existing connection was forcibly closed by the remote host.
2017-09-08T14:15:19+08:00 INFO Non-zero metrics in the last 30s: filebeat.harvester.closed=1 filebeat.harvester.open_files=-1 filebeat.harvester.running=-1 libbeat.logstash.call_count.PublishEvents=1 libbeat.logstash.publish.read_errors=1 libbeat.logstash.publish.write_bytes=323 libbeat.logstash.published_but_not_acked_events=5
2017-09-08T14:15:49+08:00 INFO No non-zero metrics in the last 30s

 2017.09.08 感謝 medcl 兄弟幫忙,再次修改如下:
filebeat.prospectors:
- input_type: log #輸入 type「log」
paths:
- D:\Wireshark_Log\* #指定推送日誌「Log」文件

output.logstash:
hosts: ["192.168.1.249:5043"] #指定接收Logstash
ssl: # <=== 新版本貌似要改成「SSL」
certificate_authorities:
- C:\filebeat-5.5.0-windows-x86_64\ssl\logstash\192.168.1.249.crt
ssl.certificate:
- C:\filebeat-5.5.0-windows-x86_64\ssl\filebeat\192.168.1.57.crt
ssl.key: # <=== 修正為「ssl.key」
- C:\filebeat-5.5.0-windows-x86_64\ssl\filebeat\192.168.1.57.key

以下是「FileBeat」錯誤 日誌
2017-09-08T15:40:23+08:00 INFO Non-zero metrics in the last 30s: filebeat.harvester.open_files=1 filebeat.harvester.running=1 filebeat.harvester.started=1 libbeat.logstash.publish.read_bytes=5120 libbeat.logstash.publish.write_bytes=660 libbeat.publisher.published_events=20
2017-09-08T15:40:29+08:00 ERR Connecting error publishing events (retrying): x509: certificate is valid for 192.168.1.57, not 192.168.1.249
2017-09-08T15:40:53+08:00 INFO Non-zero metrics in the last 30s: libbeat.logstash.publish.read_bytes=1024 libbeat.logstash.publish.write_bytes=132
2017-09-08T15:41:01+08:00 ERR Connecting error publishing events (retrying): x509: certificate is valid for 192.168.1.57, not 192.168.1.249
2017-09-08T15:41:23+08:00 INFO Non-zero metrics in the last 30s: libbeat.logstash.publish.read_bytes=1024 libbeat.logstash.publish.write_bytes=132
2017-09-08T15:41:53+08:00 INFO No non-zero metrics in the last 30s
意思是說 
证书对 192.168.1.57 有效,而不是192.168.1.249 。 這裡有些不明白...
继续阅读 »
各位朋友大家好:
 
進行配置「Filebeat」證書加密 出現錯誤如下,有哪位朋友遇過此問題可以幫幫忙!
 
ERR Failed to publish events caused by: read tcp 192.168.1.57:56182->192.168.1.249:5043: wsarecv: An existing connection was forcibly closed by the remote host.
 
使用過「telnet IP Port」測試「ELK」服務器,確認通訊協議 OK !
 
Filebeat 配置如檔如下:  (Windows 環境)
filebeat.prospectors:
- input_type: log #輸入 type「log」
paths:
- D:\Wireshark_Log\* #指定推送日誌「Log」文件

output.logstash:
hosts: ["192.168.1.249:5043"] #指定接收Logstash
tls:
certificate_authorities:
- C:\filebeat-5.5.0-windows-x86_64\ssl\logstash\192.168.1.249.crt
ssl.certificate:
- C:\filebeat-5.5.0-windows-x86_64\ssl\filebeat\192.168.1.57.crt
ssl.certificate:
- C:\filebeat-5.5.0-windows-x86_64\ssl\filebeat\192.168.1.57.key

以下是「FileBeat」錯誤 日誌
2017-09-08T14:14:57+08:00 ERR Failed to publish events caused by: read tcp 192.168.1.57:56202->192.168.1.249:5043: wsarecv: An existing connection was forcibly closed by the remote host.
2017-09-08T14:14:57+08:00 INFO Error publishing events (retrying): read tcp 192.168.1.57:56202->192.168.1.249:5043: wsarecv: An existing connection was forcibly closed by the remote host.
2017-09-08T14:15:19+08:00 INFO Non-zero metrics in the last 30s: filebeat.harvester.closed=1 filebeat.harvester.open_files=-1 filebeat.harvester.running=-1 libbeat.logstash.call_count.PublishEvents=1 libbeat.logstash.publish.read_errors=1 libbeat.logstash.publish.write_bytes=323 libbeat.logstash.published_but_not_acked_events=5
2017-09-08T14:15:49+08:00 INFO No non-zero metrics in the last 30s

 2017.09.08 感謝 medcl 兄弟幫忙,再次修改如下:
filebeat.prospectors:
- input_type: log #輸入 type「log」
paths:
- D:\Wireshark_Log\* #指定推送日誌「Log」文件

output.logstash:
hosts: ["192.168.1.249:5043"] #指定接收Logstash
ssl: # <=== 新版本貌似要改成「SSL」
certificate_authorities:
- C:\filebeat-5.5.0-windows-x86_64\ssl\logstash\192.168.1.249.crt
ssl.certificate:
- C:\filebeat-5.5.0-windows-x86_64\ssl\filebeat\192.168.1.57.crt
ssl.key: # <=== 修正為「ssl.key」
- C:\filebeat-5.5.0-windows-x86_64\ssl\filebeat\192.168.1.57.key

以下是「FileBeat」錯誤 日誌
2017-09-08T15:40:23+08:00 INFO Non-zero metrics in the last 30s: filebeat.harvester.open_files=1 filebeat.harvester.running=1 filebeat.harvester.started=1 libbeat.logstash.publish.read_bytes=5120 libbeat.logstash.publish.write_bytes=660 libbeat.publisher.published_events=20
2017-09-08T15:40:29+08:00 ERR Connecting error publishing events (retrying): x509: certificate is valid for 192.168.1.57, not 192.168.1.249
2017-09-08T15:40:53+08:00 INFO Non-zero metrics in the last 30s: libbeat.logstash.publish.read_bytes=1024 libbeat.logstash.publish.write_bytes=132
2017-09-08T15:41:01+08:00 ERR Connecting error publishing events (retrying): x509: certificate is valid for 192.168.1.57, not 192.168.1.249
2017-09-08T15:41:23+08:00 INFO Non-zero metrics in the last 30s: libbeat.logstash.publish.read_bytes=1024 libbeat.logstash.publish.write_bytes=132
2017-09-08T15:41:53+08:00 INFO No non-zero metrics in the last 30s
意思是說 
证书对 192.168.1.57 有效,而不是192.168.1.249 。 這裡有些不明白... 收起阅读 »

packetbeat的oracle协议扩展

oracle由于是商用软件,协议并不公开,而且相比mysql等开源数据库软件,协议复杂度加了不止一个量级。
出于版权考虑,packetbeat并没有加入oracle协议的支持,只能自己动手。
好在beats充分考虑了扩展性,把公共的基础工作抽象成框架,新协议的扩展只需要专注于协议的分析和解码。
tns协议是oracle客户端和服务端通信协议,应用可以通过OCI、JDBC等接口去访问数据库。
tns协议有多个版本,不同版本之间差异也比较大,11g是主流tns版本为314。
目前完成308、310、313、314、315版本的解析
 
packetbeat支持pcap、pf_ring等抓包方式,通过kafka+es+kibana展示,效果如图
 
继续阅读 »
oracle由于是商用软件,协议并不公开,而且相比mysql等开源数据库软件,协议复杂度加了不止一个量级。
出于版权考虑,packetbeat并没有加入oracle协议的支持,只能自己动手。
好在beats充分考虑了扩展性,把公共的基础工作抽象成框架,新协议的扩展只需要专注于协议的分析和解码。
tns协议是oracle客户端和服务端通信协议,应用可以通过OCI、JDBC等接口去访问数据库。
tns协议有多个版本,不同版本之间差异也比较大,11g是主流tns版本为314。
目前完成308、310、313、314、315版本的解析
 
packetbeat支持pcap、pf_ring等抓包方式,通过kafka+es+kibana展示,效果如图
  收起阅读 »

使用 Elastic Stack 来监控和调优 Golang 应用程序

Golang 因为其语法简单,上手快且方便部署正被越来越多的开发者所青睐,一个 Golang 程序开发好了之后,势必要关心其运行情况,今天在这里就给大家介绍一下如果使用 Elastic Stack 来分析 Golang 程序的内存使用情况,方便对 Golang 程序做长期监控进而调优和诊断,甚至发现一些潜在的内存泄露等问题。
 
Elastic Stack 其实是一个集合,包含 Elasticsearch、Logstash 和 Beats 这几个开源软件,而 Beats 又包含 Filebeat、Packetbeat、Winlogbeat、Metricbeat 和新出的 Heartbeat,呵呵,有点多吧,恩,每个 beat 做的事情不一样,没关系,今天主要用到 Elasticsearch、Metricbeat 和 Kibana 就行了。
 
Metricbeat 是一个专门用来获取服务器或应用服务内部运行指标数据的收集程序,也是 Golang 写的,部署包比较小才10M 左右,对目标服务器的部署环境也没有依赖,内存资源占用和 CPU 开销也较小,目前除了可以监控服务器本身的资源使用情况外,还支持常见的应用服务器和服务,目前支持列表如下:
  • Apache Module
  • Couchbase Module
  • Docker Module
  • HAProxy Module
  • kafka Module
  • MongoDB Module
  • MySQL Module
  • Nginx Module
  • PostgreSQL Module
  • Prometheus Module
  • Redis Module
  • System Module
  • ZooKeeper Module

当然,也有可能你的应用不在上述列表,没关系,Metricbeat 是可以扩展的,你可以很方便的实现一个模块,而本文接下来所用的 Golang Module 也就是我刚刚为 Metricbeat 添加的扩展模块,目前已经 merge 进入 Metricbeat 的 master 分支,预计会在 6.0 版本发布,想了解是如何扩展这个模块的可以查看 代码路径 和 PR地址
 
上面的这些可能还不够吸引人,我们来看一下 Kibana 对 Metricbeat 使用 Golang 模块收集的数据进行的可视化分析吧:

df9c563e-f831-11e6-835c-183f3f9e5b94.png

 
上面的图简单解读一下:
最上面一栏是 Golang Heap 的摘要信息,可以大致了解 Golang 的内存使用和 GC 情况,System 表示 Golang 程序从操作系统申请的内存,可以理解为进程所占的内存(注意不是进程对应的虚拟内存),Bytes allocated 表示 Heap 目前分配的内存,也就是 Golang 里面直接可使用的内存,GC limit 表示当 Golang 的 Heap 内存分配达到这个 limit 值之后就会开始执行 GC,这个值会随着每次 GC 而变化, GC cycles 则代表监控周期内的 GC 次数;
 
中间的三列分别是堆内存、进程内存和对象的统计情况;Heap Allocated 表示正在用和没有用但还未被回收的对象的大小;Heap Inuse 显然就是活跃的对象大小了;Heap Idle 表示已分配但空闲的内存;

底下两列是 GC 时间和 GC 次数的监控统计,CPUFraction 这个代表该进程 CPU 占用时间花在 GC 上面的百分比,值越大说明 GC 越频繁,浪费更多的时间在 GC 上面,上图虽然趋势陡峭,但是看范围在0.41%~0.52%之间,看起来还算可以,如果GC 比率占到个位数甚至更多比例,那肯定需要进一步优化程序了。
 
有了这些信息我们就能够知道该 Golang 的内存使用和分配情况和 GC 的执行情况,假如要分析是否有内存泄露,看内存使用和堆内存分配的趋势是否平稳就可以了,另外 GC_Limit 和 Byte Allocation 一直上升,那肯定就是有内存泄露了,结合历史信息还能对不同版本/提交对 Golang 的内存使用和 GC 影响进行分析。

接下来就要给大家介绍如何具体使用了,首先需要启用 Golang 的 expvar 服务,expvar(https://golang.org/pkg/expvar/) 是 Golang 提供的一个暴露内部变量或统计信息的标准包。
使用的方法很简单,只需要在 Golang 的程序引入该包即可,它会自动注册现有的 http 服务上,如下:
import _ "expvar"
如果 Golang 没有启动 http 服务,使用下面的方式启动一个即可,这里端口是 6060,如下:
func metricsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

first := true
report := func(key string, value interface{}) {
if !first {
fmt.Fprintf(w, ",\n")
}
first = false
if str, ok := value.(string); ok {
fmt.Fprintf(w, "%q: %q", key, str)
} else {
fmt.Fprintf(w, "%q: %v", key, value)
}
}

fmt.Fprintf(w, "{\n")
expvar.Do(func(kv expvar.KeyValue) {
report(kv.Key, kv.Value)
})
fmt.Fprintf(w, "\n}\n")
}

func main() {
mux := http.NewServeMux()
mux.HandleFunc("/debug/vars", metricsHandler)
endpoint := http.ListenAndServe("localhost:6060", mux)
}
默认注册的访问路径是/debug/vars, 编译启动之后,就可以通过 http://localhost:6060/debug/vars  来访问 expvar 以 JSON 格式暴露出来的这些内部变量,默认提供了 Golang 的 runtime.Memstats 信息,也就是上面分析的数据源,当然你还可以注册自己的变量,这里暂时不提。
 
OK,现在我们的 Golang 程序已经启动了,并且通过 expvar 暴露出了运行时的内存使用情况,现在我们需要使用 Metricbeat 来获取这些信息并存进 Elasticsearch。
 
关于 Metricbeat 的安装其实很简单,下载对应平台的包解压(下载地址:https://www.elastic.co/downloads/beats/metricbeat ),启动 Metricbeat 前,修改配置文件:metricbeat.yml
metricbeat.modules:
- module: golang
metricsets: ["heap"]
enabled: true
period: 10s
hosts: ["localhost:6060"]
heap.path: "/debug/vars"
上面的参数启用了 Golang 监控模块,并且会10秒获取一次配置路径的返回内存数据,我们同样配置该配置文件,设置数据输出到本机的 Elasticsearch:
output.elasticsearch:
hosts: ["localhost:9200"]

现在启动 Metricbeat:
./metricbeat -e -v
现在在 Elasticsearch 应该就有数据了,当然记得确保 Elasticsearch 和 Kibana 是可用状态,你可以在 Kibana 根据数据灵活自定义可视化,推荐使用 Timelion 来进行分析,当然为了方便也可以直接导入提供的样例仪表板,就是上面第一个图的效果。
关于如何导入样例仪表板请参照这个文档:https://www.elastic.co/guide/e ... .html 
 
除了监控已经有的内存信息之外,如果你还有一些内部的业务指标想要暴露出来,也是可以的,通过 expvar 来做同样可以。一个简单的例子如下:
var inerInt int64 = 1024
pubInt := expvar.NewInt("your_metric_key")
pubInt.Set(inerInt)
pubInt.Add(2)
在 Metricbeat 内部也同样暴露了很多内部运行的信息,所以 Metricbeat 可以自己监控自己了。。。
首先,启动的时候带上参数设置pprof监控的地址,如下:
./metricbeat -httpprof="127.0.0.1:6060" -e -v
这样我们就能够通过 [url=http://127.0.0.1:6060/debug/vars]http://127.0.0.1:6060/debug/vars[/url] 访问到内部运行情况了,如下:
{
"output.events.acked": 1088,
"output.write.bytes": 1027455,
"output.write.errors": 0,
"output.messages.dropped": 0,
"output.elasticsearch.publishEvents.call.count": 24,
"output.elasticsearch.read.bytes": 12215,
"output.elasticsearch.read.errors": 0,
"output.elasticsearch.write.bytes": 1027455,
"output.elasticsearch.write.errors": 0,
"output.elasticsearch.events.acked": 1088,
"output.elasticsearch.events.not_acked": 0,
"output.kafka.events.acked": 0,
"output.kafka.events.not_acked": 0,
"output.kafka.publishEvents.call.count": 0,
"output.logstash.write.errors": 0,
"output.logstash.write.bytes": 0,
"output.logstash.events.acked": 0,
"output.logstash.events.not_acked": 0,
"output.logstash.publishEvents.call.count": 0,
"output.logstash.read.bytes": 0,
"output.logstash.read.errors": 0,
"output.redis.events.acked": 0,
"output.redis.events.not_acked": 0,
"output.redis.read.bytes": 0,
"output.redis.read.errors": 0,
"output.redis.write.bytes": 0,
"output.redis.write.errors": 0,
"beat.memstats.memory_total": 155721720,
"beat.memstats.memory_alloc": 3632728,
"beat.memstats.gc_next": 6052800,
"cmdline": ["./metricbeat","-httpprof=127.0.0.1:6060","-e","-v"],
"fetches": {"system-cpu": {"events": 4, "failures": 0, "success": 4}, "system-filesystem": {"events": 20, "failures": 0, "success": 4}, "system-fsstat": {"events": 4, "failures": 0, "success": 4}, "system-load": {"events": 4, "failures": 0, "success": 4}, "system-memory": {"events": 4, "failures": 0, "success": 4}, "system-network": {"events": 44, "failures": 0, "success": 4}, "system-process": {"events": 1008, "failures": 0, "success": 4}},
"libbeat.config.module.running": 0,
"libbeat.config.module.starts": 0,
"libbeat.config.module.stops": 0,
"libbeat.config.reloads": 0,
"memstats": {"Alloc":3637704,"TotalAlloc":155
... ...
比如,上面就能看到output模块Elasticsearch的处理情况,如 output.elasticsearch.events.acked 参数表示发送到 Elasticsearch Ack返回之后的消息。
 
现在我们要修改 Metricbeat 的配置文件,Golang 模块有两个 metricset,可以理解为两个监控的指标类型,我们现在需要加入一个新的 expvar 类型,这个即自定义的其他指标,相应配置文件修改如下:
- module: golang
metricsets: ["heap","expvar"]
enabled: true
period: 1s
hosts: ["localhost:6060"]
heap.path: "/debug/vars"
expvar:
namespace: "metricbeat"
path: "/debug/vars"
上面的一个参数 namespace 表示自定义指标的一个命令空间,主要是为了方便管理,这里是 Metricbeat 自身的信息,所以 namespace 就是 metricbeat。
 
重启 Metricbeat 应该就能收到新的数据了,我们前往 Kibana。
 
这里假设关注 output.elasticsearch.events.acked和
output.elasticsearch.events.not_acked这两个指标,我们在Kibana里面简单定义一个曲线图就能看到 Metricbeat 发往 Elasticsearch 消息的成功和失败趋势。
Timelion 表达式:
.es("metricbeat*",metric="max:golang.metricbeat.output.elasticsearch.events.acked").derivative().label("Elasticsearch Success"),.es("metricbeat*",metric="max:golang.metricbeat.output.elasticsearch.events.not_acked").derivative().label("Elasticsearch Failed")
效果如下:

Snip20170304_9.png

从上图可以看到,发往 Elasticsearch 的消息很稳定,没有出现丢消息的情况,同时关于 Metricbeat 的内存情况,我们打开导入的 Dashboard 查看:

Snip20170304_10.png


关于如何使用 Metricbeat 来监控 Golang 应用程序的内容基本上就差不多到这里了,上面介绍了如何基于 expvar 来监控 Golang 的内存情况和自定义业务监控指标,在结合 Elastic Stack 可以快速的进行分析,希望对大家有用。

最后,这个 Golang 模块目前还没 release,估计在 beats 6.0 发布,有兴趣尝鲜的可以自己下载源码打包。
继续阅读 »
Golang 因为其语法简单,上手快且方便部署正被越来越多的开发者所青睐,一个 Golang 程序开发好了之后,势必要关心其运行情况,今天在这里就给大家介绍一下如果使用 Elastic Stack 来分析 Golang 程序的内存使用情况,方便对 Golang 程序做长期监控进而调优和诊断,甚至发现一些潜在的内存泄露等问题。
 
Elastic Stack 其实是一个集合,包含 Elasticsearch、Logstash 和 Beats 这几个开源软件,而 Beats 又包含 Filebeat、Packetbeat、Winlogbeat、Metricbeat 和新出的 Heartbeat,呵呵,有点多吧,恩,每个 beat 做的事情不一样,没关系,今天主要用到 Elasticsearch、Metricbeat 和 Kibana 就行了。
 
Metricbeat 是一个专门用来获取服务器或应用服务内部运行指标数据的收集程序,也是 Golang 写的,部署包比较小才10M 左右,对目标服务器的部署环境也没有依赖,内存资源占用和 CPU 开销也较小,目前除了可以监控服务器本身的资源使用情况外,还支持常见的应用服务器和服务,目前支持列表如下:
  • Apache Module
  • Couchbase Module
  • Docker Module
  • HAProxy Module
  • kafka Module
  • MongoDB Module
  • MySQL Module
  • Nginx Module
  • PostgreSQL Module
  • Prometheus Module
  • Redis Module
  • System Module
  • ZooKeeper Module

当然,也有可能你的应用不在上述列表,没关系,Metricbeat 是可以扩展的,你可以很方便的实现一个模块,而本文接下来所用的 Golang Module 也就是我刚刚为 Metricbeat 添加的扩展模块,目前已经 merge 进入 Metricbeat 的 master 分支,预计会在 6.0 版本发布,想了解是如何扩展这个模块的可以查看 代码路径 和 PR地址
 
上面的这些可能还不够吸引人,我们来看一下 Kibana 对 Metricbeat 使用 Golang 模块收集的数据进行的可视化分析吧:

df9c563e-f831-11e6-835c-183f3f9e5b94.png

 
上面的图简单解读一下:
最上面一栏是 Golang Heap 的摘要信息,可以大致了解 Golang 的内存使用和 GC 情况,System 表示 Golang 程序从操作系统申请的内存,可以理解为进程所占的内存(注意不是进程对应的虚拟内存),Bytes allocated 表示 Heap 目前分配的内存,也就是 Golang 里面直接可使用的内存,GC limit 表示当 Golang 的 Heap 内存分配达到这个 limit 值之后就会开始执行 GC,这个值会随着每次 GC 而变化, GC cycles 则代表监控周期内的 GC 次数;
 
中间的三列分别是堆内存、进程内存和对象的统计情况;Heap Allocated 表示正在用和没有用但还未被回收的对象的大小;Heap Inuse 显然就是活跃的对象大小了;Heap Idle 表示已分配但空闲的内存;

底下两列是 GC 时间和 GC 次数的监控统计,CPUFraction 这个代表该进程 CPU 占用时间花在 GC 上面的百分比,值越大说明 GC 越频繁,浪费更多的时间在 GC 上面,上图虽然趋势陡峭,但是看范围在0.41%~0.52%之间,看起来还算可以,如果GC 比率占到个位数甚至更多比例,那肯定需要进一步优化程序了。
 
有了这些信息我们就能够知道该 Golang 的内存使用和分配情况和 GC 的执行情况,假如要分析是否有内存泄露,看内存使用和堆内存分配的趋势是否平稳就可以了,另外 GC_Limit 和 Byte Allocation 一直上升,那肯定就是有内存泄露了,结合历史信息还能对不同版本/提交对 Golang 的内存使用和 GC 影响进行分析。

接下来就要给大家介绍如何具体使用了,首先需要启用 Golang 的 expvar 服务,expvar(https://golang.org/pkg/expvar/) 是 Golang 提供的一个暴露内部变量或统计信息的标准包。
使用的方法很简单,只需要在 Golang 的程序引入该包即可,它会自动注册现有的 http 服务上,如下:
import _ "expvar"
如果 Golang 没有启动 http 服务,使用下面的方式启动一个即可,这里端口是 6060,如下:
func metricsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

first := true
report := func(key string, value interface{}) {
if !first {
fmt.Fprintf(w, ",\n")
}
first = false
if str, ok := value.(string); ok {
fmt.Fprintf(w, "%q: %q", key, str)
} else {
fmt.Fprintf(w, "%q: %v", key, value)
}
}

fmt.Fprintf(w, "{\n")
expvar.Do(func(kv expvar.KeyValue) {
report(kv.Key, kv.Value)
})
fmt.Fprintf(w, "\n}\n")
}

func main() {
mux := http.NewServeMux()
mux.HandleFunc("/debug/vars", metricsHandler)
endpoint := http.ListenAndServe("localhost:6060", mux)
}
默认注册的访问路径是/debug/vars, 编译启动之后,就可以通过 http://localhost:6060/debug/vars  来访问 expvar 以 JSON 格式暴露出来的这些内部变量,默认提供了 Golang 的 runtime.Memstats 信息,也就是上面分析的数据源,当然你还可以注册自己的变量,这里暂时不提。
 
OK,现在我们的 Golang 程序已经启动了,并且通过 expvar 暴露出了运行时的内存使用情况,现在我们需要使用 Metricbeat 来获取这些信息并存进 Elasticsearch。
 
关于 Metricbeat 的安装其实很简单,下载对应平台的包解压(下载地址:https://www.elastic.co/downloads/beats/metricbeat ),启动 Metricbeat 前,修改配置文件:metricbeat.yml
metricbeat.modules:
- module: golang
metricsets: ["heap"]
enabled: true
period: 10s
hosts: ["localhost:6060"]
heap.path: "/debug/vars"
上面的参数启用了 Golang 监控模块,并且会10秒获取一次配置路径的返回内存数据,我们同样配置该配置文件,设置数据输出到本机的 Elasticsearch:
output.elasticsearch:
hosts: ["localhost:9200"]

现在启动 Metricbeat:
./metricbeat -e -v
现在在 Elasticsearch 应该就有数据了,当然记得确保 Elasticsearch 和 Kibana 是可用状态,你可以在 Kibana 根据数据灵活自定义可视化,推荐使用 Timelion 来进行分析,当然为了方便也可以直接导入提供的样例仪表板,就是上面第一个图的效果。
关于如何导入样例仪表板请参照这个文档:https://www.elastic.co/guide/e ... .html 
 
除了监控已经有的内存信息之外,如果你还有一些内部的业务指标想要暴露出来,也是可以的,通过 expvar 来做同样可以。一个简单的例子如下:
var inerInt int64 = 1024
pubInt := expvar.NewInt("your_metric_key")
pubInt.Set(inerInt)
pubInt.Add(2)
在 Metricbeat 内部也同样暴露了很多内部运行的信息,所以 Metricbeat 可以自己监控自己了。。。
首先,启动的时候带上参数设置pprof监控的地址,如下:
./metricbeat -httpprof="127.0.0.1:6060" -e -v
这样我们就能够通过 [url=http://127.0.0.1:6060/debug/vars]http://127.0.0.1:6060/debug/vars[/url] 访问到内部运行情况了,如下:
{
"output.events.acked": 1088,
"output.write.bytes": 1027455,
"output.write.errors": 0,
"output.messages.dropped": 0,
"output.elasticsearch.publishEvents.call.count": 24,
"output.elasticsearch.read.bytes": 12215,
"output.elasticsearch.read.errors": 0,
"output.elasticsearch.write.bytes": 1027455,
"output.elasticsearch.write.errors": 0,
"output.elasticsearch.events.acked": 1088,
"output.elasticsearch.events.not_acked": 0,
"output.kafka.events.acked": 0,
"output.kafka.events.not_acked": 0,
"output.kafka.publishEvents.call.count": 0,
"output.logstash.write.errors": 0,
"output.logstash.write.bytes": 0,
"output.logstash.events.acked": 0,
"output.logstash.events.not_acked": 0,
"output.logstash.publishEvents.call.count": 0,
"output.logstash.read.bytes": 0,
"output.logstash.read.errors": 0,
"output.redis.events.acked": 0,
"output.redis.events.not_acked": 0,
"output.redis.read.bytes": 0,
"output.redis.read.errors": 0,
"output.redis.write.bytes": 0,
"output.redis.write.errors": 0,
"beat.memstats.memory_total": 155721720,
"beat.memstats.memory_alloc": 3632728,
"beat.memstats.gc_next": 6052800,
"cmdline": ["./metricbeat","-httpprof=127.0.0.1:6060","-e","-v"],
"fetches": {"system-cpu": {"events": 4, "failures": 0, "success": 4}, "system-filesystem": {"events": 20, "failures": 0, "success": 4}, "system-fsstat": {"events": 4, "failures": 0, "success": 4}, "system-load": {"events": 4, "failures": 0, "success": 4}, "system-memory": {"events": 4, "failures": 0, "success": 4}, "system-network": {"events": 44, "failures": 0, "success": 4}, "system-process": {"events": 1008, "failures": 0, "success": 4}},
"libbeat.config.module.running": 0,
"libbeat.config.module.starts": 0,
"libbeat.config.module.stops": 0,
"libbeat.config.reloads": 0,
"memstats": {"Alloc":3637704,"TotalAlloc":155
... ...
比如,上面就能看到output模块Elasticsearch的处理情况,如 output.elasticsearch.events.acked 参数表示发送到 Elasticsearch Ack返回之后的消息。
 
现在我们要修改 Metricbeat 的配置文件,Golang 模块有两个 metricset,可以理解为两个监控的指标类型,我们现在需要加入一个新的 expvar 类型,这个即自定义的其他指标,相应配置文件修改如下:
- module: golang
metricsets: ["heap","expvar"]
enabled: true
period: 1s
hosts: ["localhost:6060"]
heap.path: "/debug/vars"
expvar:
namespace: "metricbeat"
path: "/debug/vars"
上面的一个参数 namespace 表示自定义指标的一个命令空间,主要是为了方便管理,这里是 Metricbeat 自身的信息,所以 namespace 就是 metricbeat。
 
重启 Metricbeat 应该就能收到新的数据了,我们前往 Kibana。
 
这里假设关注 output.elasticsearch.events.acked和
output.elasticsearch.events.not_acked这两个指标,我们在Kibana里面简单定义一个曲线图就能看到 Metricbeat 发往 Elasticsearch 消息的成功和失败趋势。
Timelion 表达式:
.es("metricbeat*",metric="max:golang.metricbeat.output.elasticsearch.events.acked").derivative().label("Elasticsearch Success"),.es("metricbeat*",metric="max:golang.metricbeat.output.elasticsearch.events.not_acked").derivative().label("Elasticsearch Failed")
效果如下:

Snip20170304_9.png

从上图可以看到,发往 Elasticsearch 的消息很稳定,没有出现丢消息的情况,同时关于 Metricbeat 的内存情况,我们打开导入的 Dashboard 查看:

Snip20170304_10.png


关于如何使用 Metricbeat 来监控 Golang 应用程序的内容基本上就差不多到这里了,上面介绍了如何基于 expvar 来监控 Golang 的内存情况和自定义业务监控指标,在结合 Elastic Stack 可以快速的进行分析,希望对大家有用。

最后,这个 Golang 模块目前还没 release,估计在 beats 6.0 发布,有兴趣尝鲜的可以自己下载源码打包。 收起阅读 »

Packetbeat的Cassandra协议扩展

论坛有多少人在用Cassandra的啊?弄了一个Cassandra的协议,有在用的Cassandra么?帮忙测试一下,看看有没有bug,
欢迎反馈。
 
https://github.com/elastic/beats/pull/1959
 
继续阅读 »
论坛有多少人在用Cassandra的啊?弄了一个Cassandra的协议,有在用的Cassandra么?帮忙测试一下,看看有没有bug,
欢迎反馈。
 
https://github.com/elastic/beats/pull/1959
  收起阅读 »

Packetbeat协议扩展开发教程(3)

 书接上回:http://elasticsearch.cn/article/53
 
前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
打开一个现有的UDP和HTTP协议接口定义:
/~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
// Called to initialize the Plugin
Init(test_mode bool, results publisher.Client) error

// Called to return the configured ports
GetPorts() int
}

type TcpProtocolPlugin interface {
ProtocolPlugin

// Called when TCP payload data is available for parsing.
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData

// Called when the FIN flag is seen in the TCP stream.
ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private ProtocolData) ProtocolData

// Called when a packets are missing from the tcp
// stream.
GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
private ProtocolData) (priv ProtocolData, drop bool)

// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
ConnectionTimeout() time.Duration
}

type UdpProtocolPlugin interface {
ProtocolPlugin

// ParseUdp is invoked when UDP payload data is available for parsing.
ParseUdp(pkt *Packet)
}
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间;
UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。

请问:
Packetbeat怎么工作的?

回答:
每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。

貌似很简单嘛!

进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
type Packet struct {
Ts time.Time
Tuple common.IpPortTuple
Payload byte
}
Packet结构简单,
Ts是收到数据包的时间戳;
Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。

首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
protocols:
smtp:
# Configure the ports where to listen for Smtp traffic. You can disable
# the Smtp protocol by commenting out the list of ports.
ports: [25]
还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:
git diff config/config.go
@@ -42,6 +42,7 @@ type Protocols struct {
Pgsql Pgsql
Redis Redis
Thrift Thrift
+ Smtp Smtp
}

type Dns struct {
@@ -118,5 +119,9 @@ type Redis struct {
Send_response *bool
}

+type Smtp struct {
+ ProtocolCommon `yaml:",inline"`
+}
+
// Config Singleton
var ConfigSingleton Config
在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go,
这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
...TODO...
修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。
git diff protos/protos.go
@@ -103,6 +103,7 @@ const (
MongodbProtocol
DnsProtocol
MemcacheProtocol
+ SmtpProtocol
)

// Protocol names
@@ -116,6 +117,7 @@ var ProtocolNames = string{
"mongodb",
"dns",
"memcache",
+ "smtp",
}

继续修改packetbeat.go主文件,允许SMTP协议并加载。
git diff packetbeat.go
@@ -27,6 +27,7 @@ import (
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/thrift"
"github.com/elastic/packetbeat/protos/udp"
+ "github.com/elastic/packetbeat/protos/smtp"
"github.com/elastic/packetbeat/sniffer"
)

@@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
protos.ThriftProtocol: new(thrift.Thrift),
protos.MongodbProtocol: new(mongodb.Mongodb),
protos.DnsProtocol: new(dns.Dns),
+ protos.SmtpProtocol: new(smtp.Smtp),
}

做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。

说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。

现在我们仔细看看Parse()接口的各个参数定义是做什么用的
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData

pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。

下面看段MySQL模块里面的例子
 priv := mysqlPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(mysqlPrivateData)
if !ok {
priv = mysqlPrivateData{}
}
}

[ ... ]

return priv
上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。
我们再继续看后续怎么处理这些包的一个逻辑例子
ok, complete := mysqlMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
return priv
}

if complete {
mysql.messageComplete(tcptuple, dir, stream)
} else {
// wait for more data
break
}
mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。

好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321

由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
S: 220 www.example.com ESMTP Postfix
C: HELO mydomain.com
S: 250 Hello mydomain.com
C: MAIL FROM:
S: 250 Ok
C: RCPT TO:
S: 250 Ok
C: DATA
S: 354 End data with .
C: Subject: test message
C: From:""< sender@mydomain.com>
C: To:""< friend@example.com>
C:
C: Hello,
C: This is a test.
C: Goodbye.
C: .
S: 250 Ok: queued as 12345
C: quit
S: 221 Bye
上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议(RFC5321),我们只需要关注以下几个命令:
MAIL:开始一份邮件 mail from: xxx@xx.com
RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com
QUIT:结束SMTP会话,不一定发送了邮件,注意
RESET:重置会话,当前传输被取消 

最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
{
"timestamp":"2016-1-15 12:00:00",
"from":"medcl@example.co",
"to":["lcdem@example.co"]
}
我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供:https://wiki.wireshark.org/SampleCaptures/
Ctrl+F找到SMTP的下载地址:smtp.pcap
用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:

上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。

打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go
定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
type smtpPrivateData struct{
Data [2]*SmtpStream
}

type SmtpStream struct {
tcptuple *common.TcpTuple

data byte

parseOffset int
isClient bool
message *SmtpMessage
}

type SmtpMessage struct {
Ts time.Time
From string
To string
}
然后参照MySQL协议,定义相应的方法,最终如下:
package smtp

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"bytes"
"time"
"strings"
)

type smtpPrivateData struct{
Data [2]*SmtpStream
}

type SmtpStream struct {
tcptuple *common.TcpTuple

data byte

parseOffset int
isClient bool

message *SmtpMessage
}

type SmtpMessage struct {
start int
end int

Ts time.Time
From string
To string
IgnoreMessage bool
}

type Smtp struct {
SendRequest bool
SendResponse bool
transactionTimeout time.Duration
Ports int
results publisher.Client
}

func (smtp *Smtp) initDefaults() {
smtp.SendRequest = false
smtp.SendResponse = false
smtp.transactionTimeout = protos.DefaultTransactionExpiration
}

func (smtp *Smtp) setFromConfig(config config.Smtp) error {
smtp.Ports = config.Ports
if config.SendRequest != nil {
smtp.SendRequest = *config.SendRequest
}
if config.SendResponse != nil {
smtp.SendResponse = *config.SendResponse
}

if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
}

return nil
}

func (smtp *Smtp) GetPorts() int {
return smtp.Ports
}

func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
smtp.initDefaults()

if !test_mode {
err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
if err != nil {
return err
}
}
smtp.results = results

return nil
}

func readLine(data byte, offset int) (bool, string, int) {
q := bytes.Index(data[offset:], byte("\r\n"))
if q == -1 {
return false, "", 0
}
return true, string(data[offset : offset+q]), offset + q + 2
}

func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {

defer logp.Recover("ParseSmtp exception")

priv := smtpPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(smtpPrivateData)
if !ok {
priv = smtpPrivateData{}
}
}

if priv.Data[dir] == nil {
priv.Data[dir] = &SmtpStream{
tcptuple: tcptuple,
data: pkt.Payload,
message: &SmtpMessage{Ts: pkt.Ts},
}
} else {
// concatenate bytes
priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("smtp", "Stream data too large, dropping TCP stream")
priv.Data[dir] = nil
return priv
}
}

stream := priv.Data[dir]
for len(stream.data) > 0 {
if stream.message == nil {
stream.message = &SmtpMessage{Ts: pkt.Ts}
}

ok, complete := stmpMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
return priv
}

if complete {
smtp.messageComplete(tcptuple, dir, stream)
} else {
logp.Debug("smtp","still wait message...")
// wait for more data
break
}
}

return priv
}

func (smtp *Smtp) ConnectionTimeout() time.Duration {
return smtp.transactionTimeout
}

func stmpMessageParser(s *SmtpStream) (bool, bool) {

var value string=""

for s.parseOffset < len(s.data) {


logp.Debug("smtp", "Parse message: %s", string(s.data[s.parseOffset]))


if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {

logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))

found, line, off := readLine(s.data, s.parseOffset)
if !found {
return true, false
}

value = line[1:]
logp.Debug("smtp", "value %s", value)

s.parseOffset = off
} else {
logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
return false, false
}
}

return true, false
}

func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg byte) {
logp.Info("smtp","handle smtp message...")

//TODO

}

// Called when the parser has identified a full message.
func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {

logp.Info("smtp","message completed...")

// all ok, ship it
msg := stream.data[stream.message.start:stream.message.end]

if !stream.message.IgnoreMessage {
handleSmtp(smtp, stream.message, tcptuple, dir, msg)
}

// and reset message
stream.PrepareForNewMessage()
}

func (stream *SmtpStream) PrepareForNewMessage() {
logp.Info("smtp","prepare for new message...")

stream.data = stream.data[stream.parseOffset:]
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}



func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {

defer logp.Recover("GapInStream(smtp) exception")

if private == nil {
return private, false
}

return private, true
}

func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {

logp.Info("smtp","stream closed...")

// TODO: check if we have data pending and either drop it to free
// memory or send it up the stack.
return private
}

现在切换到命令行,编译一下
cd ~/go/src/github.com/elastic/beats/packetbeat
make

编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)
./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N

运行查看控制台输出结果:
➜  packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N 
2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
2016/01/15 10:12:20.155335 smtp.go:163: DBG Parse message: 2
2016/01/15 10:12:20.155416 smtp.go:180: DBG Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
250-SIZE 52428800
250-PIPELINING
250-AUTH PLAIN LOGIN
250-STARTTLS
250 HELP
2016/01/15 10:12:22.310974 smtp.go:163: DBG Parse message: F
2016/01/15 10:12:22.311025 smtp.go:180: DBG Unexpected message starting with From: "Gurpartap Singh"
To:
Subject: SMTP
Date: Mon, 5 Oct 2009 11:36:07 +0530
Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
MIME-Version: 1.0
...

成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。
留待下回分解。
继续阅读 »
 书接上回:http://elasticsearch.cn/article/53
 
前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
打开一个现有的UDP和HTTP协议接口定义:
/~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
// Called to initialize the Plugin
Init(test_mode bool, results publisher.Client) error

// Called to return the configured ports
GetPorts() int
}

type TcpProtocolPlugin interface {
ProtocolPlugin

// Called when TCP payload data is available for parsing.
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData

// Called when the FIN flag is seen in the TCP stream.
ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private ProtocolData) ProtocolData

// Called when a packets are missing from the tcp
// stream.
GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
private ProtocolData) (priv ProtocolData, drop bool)

// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
ConnectionTimeout() time.Duration
}

type UdpProtocolPlugin interface {
ProtocolPlugin

// ParseUdp is invoked when UDP payload data is available for parsing.
ParseUdp(pkt *Packet)
}
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间;
UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。

请问:
Packetbeat怎么工作的?

回答:
每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。

貌似很简单嘛!

进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
type Packet struct {
Ts time.Time
Tuple common.IpPortTuple
Payload byte
}
Packet结构简单,
Ts是收到数据包的时间戳;
Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。

首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
protocols:
smtp:
# Configure the ports where to listen for Smtp traffic. You can disable
# the Smtp protocol by commenting out the list of ports.
ports: [25]
还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:
git diff config/config.go
@@ -42,6 +42,7 @@ type Protocols struct {
Pgsql Pgsql
Redis Redis
Thrift Thrift
+ Smtp Smtp
}

type Dns struct {
@@ -118,5 +119,9 @@ type Redis struct {
Send_response *bool
}

+type Smtp struct {
+ ProtocolCommon `yaml:",inline"`
+}
+
// Config Singleton
var ConfigSingleton Config
在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go,
这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
...TODO...
修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。
git diff protos/protos.go
@@ -103,6 +103,7 @@ const (
MongodbProtocol
DnsProtocol
MemcacheProtocol
+ SmtpProtocol
)

// Protocol names
@@ -116,6 +117,7 @@ var ProtocolNames = string{
"mongodb",
"dns",
"memcache",
+ "smtp",
}

继续修改packetbeat.go主文件,允许SMTP协议并加载。
git diff packetbeat.go
@@ -27,6 +27,7 @@ import (
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/thrift"
"github.com/elastic/packetbeat/protos/udp"
+ "github.com/elastic/packetbeat/protos/smtp"
"github.com/elastic/packetbeat/sniffer"
)

@@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
protos.ThriftProtocol: new(thrift.Thrift),
protos.MongodbProtocol: new(mongodb.Mongodb),
protos.DnsProtocol: new(dns.Dns),
+ protos.SmtpProtocol: new(smtp.Smtp),
}

做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。

说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。

现在我们仔细看看Parse()接口的各个参数定义是做什么用的
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData

pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。

下面看段MySQL模块里面的例子
 priv := mysqlPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(mysqlPrivateData)
if !ok {
priv = mysqlPrivateData{}
}
}

[ ... ]

return priv
上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。
我们再继续看后续怎么处理这些包的一个逻辑例子
ok, complete := mysqlMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
return priv
}

if complete {
mysql.messageComplete(tcptuple, dir, stream)
} else {
// wait for more data
break
}
mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。

好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321

由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
S: 220 www.example.com ESMTP Postfix
C: HELO mydomain.com
S: 250 Hello mydomain.com
C: MAIL FROM:
S: 250 Ok
C: RCPT TO:
S: 250 Ok
C: DATA
S: 354 End data with .
C: Subject: test message
C: From:""< sender@mydomain.com>
C: To:""< friend@example.com>
C:
C: Hello,
C: This is a test.
C: Goodbye.
C: .
S: 250 Ok: queued as 12345
C: quit
S: 221 Bye
上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议(RFC5321),我们只需要关注以下几个命令:
MAIL:开始一份邮件 mail from: xxx@xx.com
RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com
QUIT:结束SMTP会话,不一定发送了邮件,注意
RESET:重置会话,当前传输被取消 

最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
{
"timestamp":"2016-1-15 12:00:00",
"from":"medcl@example.co",
"to":["lcdem@example.co"]
}
我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供:https://wiki.wireshark.org/SampleCaptures/
Ctrl+F找到SMTP的下载地址:smtp.pcap
用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:

上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。

打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go
定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
type smtpPrivateData struct{
Data [2]*SmtpStream
}

type SmtpStream struct {
tcptuple *common.TcpTuple

data byte

parseOffset int
isClient bool
message *SmtpMessage
}

type SmtpMessage struct {
Ts time.Time
From string
To string
}
然后参照MySQL协议,定义相应的方法,最终如下:
package smtp

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"bytes"
"time"
"strings"
)

type smtpPrivateData struct{
Data [2]*SmtpStream
}

type SmtpStream struct {
tcptuple *common.TcpTuple

data byte

parseOffset int
isClient bool

message *SmtpMessage
}

type SmtpMessage struct {
start int
end int

Ts time.Time
From string
To string
IgnoreMessage bool
}

type Smtp struct {
SendRequest bool
SendResponse bool
transactionTimeout time.Duration
Ports int
results publisher.Client
}

func (smtp *Smtp) initDefaults() {
smtp.SendRequest = false
smtp.SendResponse = false
smtp.transactionTimeout = protos.DefaultTransactionExpiration
}

func (smtp *Smtp) setFromConfig(config config.Smtp) error {
smtp.Ports = config.Ports
if config.SendRequest != nil {
smtp.SendRequest = *config.SendRequest
}
if config.SendResponse != nil {
smtp.SendResponse = *config.SendResponse
}

if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
}

return nil
}

func (smtp *Smtp) GetPorts() int {
return smtp.Ports
}

func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
smtp.initDefaults()

if !test_mode {
err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
if err != nil {
return err
}
}
smtp.results = results

return nil
}

func readLine(data byte, offset int) (bool, string, int) {
q := bytes.Index(data[offset:], byte("\r\n"))
if q == -1 {
return false, "", 0
}
return true, string(data[offset : offset+q]), offset + q + 2
}

func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {

defer logp.Recover("ParseSmtp exception")

priv := smtpPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(smtpPrivateData)
if !ok {
priv = smtpPrivateData{}
}
}

if priv.Data[dir] == nil {
priv.Data[dir] = &SmtpStream{
tcptuple: tcptuple,
data: pkt.Payload,
message: &SmtpMessage{Ts: pkt.Ts},
}
} else {
// concatenate bytes
priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("smtp", "Stream data too large, dropping TCP stream")
priv.Data[dir] = nil
return priv
}
}

stream := priv.Data[dir]
for len(stream.data) > 0 {
if stream.message == nil {
stream.message = &SmtpMessage{Ts: pkt.Ts}
}

ok, complete := stmpMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
return priv
}

if complete {
smtp.messageComplete(tcptuple, dir, stream)
} else {
logp.Debug("smtp","still wait message...")
// wait for more data
break
}
}

return priv
}

func (smtp *Smtp) ConnectionTimeout() time.Duration {
return smtp.transactionTimeout
}

func stmpMessageParser(s *SmtpStream) (bool, bool) {

var value string=""

for s.parseOffset < len(s.data) {


logp.Debug("smtp", "Parse message: %s", string(s.data[s.parseOffset]))


if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {

logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))

found, line, off := readLine(s.data, s.parseOffset)
if !found {
return true, false
}

value = line[1:]
logp.Debug("smtp", "value %s", value)

s.parseOffset = off
} else {
logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
return false, false
}
}

return true, false
}

func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg byte) {
logp.Info("smtp","handle smtp message...")

//TODO

}

// Called when the parser has identified a full message.
func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {

logp.Info("smtp","message completed...")

// all ok, ship it
msg := stream.data[stream.message.start:stream.message.end]

if !stream.message.IgnoreMessage {
handleSmtp(smtp, stream.message, tcptuple, dir, msg)
}

// and reset message
stream.PrepareForNewMessage()
}

func (stream *SmtpStream) PrepareForNewMessage() {
logp.Info("smtp","prepare for new message...")

stream.data = stream.data[stream.parseOffset:]
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}



func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {

defer logp.Recover("GapInStream(smtp) exception")

if private == nil {
return private, false
}

return private, true
}

func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {

logp.Info("smtp","stream closed...")

// TODO: check if we have data pending and either drop it to free
// memory or send it up the stack.
return private
}

现在切换到命令行,编译一下
cd ~/go/src/github.com/elastic/beats/packetbeat
make

编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)
./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N

运行查看控制台输出结果:
➜  packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N 
2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
2016/01/15 10:12:20.155335 smtp.go:163: DBG Parse message: 2
2016/01/15 10:12:20.155416 smtp.go:180: DBG Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
250-SIZE 52428800
250-PIPELINING
250-AUTH PLAIN LOGIN
250-STARTTLS
250 HELP
2016/01/15 10:12:22.310974 smtp.go:163: DBG Parse message: F
2016/01/15 10:12:22.311025 smtp.go:180: DBG Unexpected message starting with From: "Gurpartap Singh"
To:
Subject: SMTP
Date: Mon, 5 Oct 2009 11:36:07 +0530
Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
MIME-Version: 1.0
...

成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。
留待下回分解。 收起阅读 »

Packetbeat协议扩展开发教程(2)

书接上回:http://elasticsearch.cn/article/48

我们打开Packetbeat项目,看看里面长什么样:



现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;

现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;

知道项目的大概架构就知道从哪下手了,下节分解。
继续阅读 »
书接上回:http://elasticsearch.cn/article/48

我们打开Packetbeat项目,看看里面长什么样:



现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;

现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;

知道项目的大概架构就知道从哪下手了,下节分解。 收起阅读 »

Packetbeat协议扩展开发教程(1)

Packetbeat(https://www.elastic.co/products/beats/packetbeat
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。

开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。

这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。

A.源码签出
登陆Github打开https://github.com/elastic/beats 

fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat 
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/ 
cd $GOPATH/src/github.com/elastic

#签出源码
git clone https://github.com/elastic/beats.git
cd beats

#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git

#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master

#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat

#切换到packetbeat模块
cd packetbeat

#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep

#编译
make

编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html 
output:
elasticsearch:
enabled: true
hosts: ["localhost:9200"]
username: "tribe_user"
password: "tribe_user"

现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml  -d "publish"

介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志

好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG  Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG Publish: {
"@timestamp": "2015-12-29T14:24:39.709Z",
"beat": {
"hostname": "medcls-MacBook.local",
"name": "medcls-MacBook.local"
},
"bytes_in": 31,
"bytes_out": 115,
"client_ip": "192.168.3.10",
"client_port": 53669,
"client_proc": "",
"client_server": "",
"count": 1,
"direction": "out",
"dns": {
"additionals_count": 0,
"answers": [
{
"class": "IN",
"data": "www.a.shifen.com",
"name": "sp2.baidu.com",
"ttl": 333,
"type": "CNAME"
}
],
"answers_count": 1,
"authorities": [
{
"class": "IN",
"data": "ns1.a.shifen.com",
"expire": 86400,
"minimum": 3600,
"name": "a.shifen.com",
"refresh": 5,
"retry": 5,
"rname": "baidu_dns_master.baidu.com",
"serial": 1512240003,
"ttl": 12,
"type": "SOA"
}
],
"authorities_count": 1,
"flags": {
"authoritative": false,
"recursion_allowed": true,
"recursion_desired": true,
"truncated_response": false
},
"id": 7435,
"op_code": "QUERY",
"question": {
"class": "IN",
"name": "sp2.baidu.com",
"type": "AAAA"
},
"response_code": "NOERROR"
},
"ip": "192.168.3.1",
"method": "QUERY",
"port": 53,
"proc": "",
"query": "class IN, type AAAA, sp2.baidu.com",
"resource": "sp2.baidu.com",
"responsetime": 18,
"server": "",
"status": "OK",
"transport": "udp",
"type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG output worker: publish 2 events

然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29 5 1 135 0 561.2kb 561.2kb

至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。
继续阅读 »
Packetbeat(https://www.elastic.co/products/beats/packetbeat
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。

开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。

这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。

A.源码签出
登陆Github打开https://github.com/elastic/beats 

fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat 
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/ 
cd $GOPATH/src/github.com/elastic

#签出源码
git clone https://github.com/elastic/beats.git
cd beats

#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git

#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master

#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat

#切换到packetbeat模块
cd packetbeat

#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep

#编译
make

编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html 
output:
elasticsearch:
enabled: true
hosts: ["localhost:9200"]
username: "tribe_user"
password: "tribe_user"

现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml  -d "publish"

介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志

好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG  Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG Publish: {
"@timestamp": "2015-12-29T14:24:39.709Z",
"beat": {
"hostname": "medcls-MacBook.local",
"name": "medcls-MacBook.local"
},
"bytes_in": 31,
"bytes_out": 115,
"client_ip": "192.168.3.10",
"client_port": 53669,
"client_proc": "",
"client_server": "",
"count": 1,
"direction": "out",
"dns": {
"additionals_count": 0,
"answers": [
{
"class": "IN",
"data": "www.a.shifen.com",
"name": "sp2.baidu.com",
"ttl": 333,
"type": "CNAME"
}
],
"answers_count": 1,
"authorities": [
{
"class": "IN",
"data": "ns1.a.shifen.com",
"expire": 86400,
"minimum": 3600,
"name": "a.shifen.com",
"refresh": 5,
"retry": 5,
"rname": "baidu_dns_master.baidu.com",
"serial": 1512240003,
"ttl": 12,
"type": "SOA"
}
],
"authorities_count": 1,
"flags": {
"authoritative": false,
"recursion_allowed": true,
"recursion_desired": true,
"truncated_response": false
},
"id": 7435,
"op_code": "QUERY",
"question": {
"class": "IN",
"name": "sp2.baidu.com",
"type": "AAAA"
},
"response_code": "NOERROR"
},
"ip": "192.168.3.1",
"method": "QUERY",
"port": 53,
"proc": "",
"query": "class IN, type AAAA, sp2.baidu.com",
"resource": "sp2.baidu.com",
"responsetime": 18,
"server": "",
"status": "OK",
"transport": "udp",
"type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG output worker: publish 2 events

然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29 5 1 135 0 561.2kb 561.2kb

至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。 收起阅读 »