es索引模版配置不当导致的aggs聚合查询字段显示错误的问题
今天在es中对http日志的状态码status进行aggs搜索出现字段内容显示不正常的问题,记录过程:
http日志的情况:
1、http日志从logstash写入es时,状态码配置为status,其内容为 200 ,302 ,400 ,404等。
2、使用kibana对该日志的索引进行查询,在discover页面中显示的status内容跟logstash的内容一致,是正常的。
出现问题的场景:
(我这里使用的是kibana的sense插件进行的查询,如果直接使用curl python-ES也是一样的)
查询该索引:
POST http-2016.03.18/_search
{
"fields": ["status"],
"query":{
"bool":{
"must": [
{
"range" : {
"@timestamp" : {"gte" : "now-5m"}
}
}
]
}
},
"_source": "false",
"size": 0,
"aggs": {
"status_type": {
"terms":{"field":"status"}
}
}
}
查询返回的结果中aggregations部分的内容:
"aggregations" : {
"status_type" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [ {
"key" : -56,
"doc_count" : 376341
}, {
"key" : 46,
"doc_count" : 51439
}, {
"key" : 45,
"doc_count" : 5543
}, {
"key" : 48,
"doc_count" : 1669
}, {
"key" : -108,
"doc_count" : 1068
}, {
"key" : -50,
"doc_count" : 11
}, {
"key" : -109,
"doc_count" : 8
}, {
"key" : -112,
"doc_count" : 4
}
寻找原因:
起先先去掉了查询的aggs部分,单独查询query的内容:
POST http-2016.03.18/_search
{
"fields": ["status"],
"query":{
"bool":{
"must": [
{
"range" : {
"@timestamp" : {"gte" : "now-5m"}
}
}
]
}
}
}
返回的结果中,hits显示的status字段内容是正常的:
"hits": {
"total": 1242104,
"max_score": 1,
"hits": [
{
"_index": "http-2016.03.18",
"_type": "log",
"_id": "AVOI3EiwidwPAhB1e7gQ",
"_score": 1,
"fields": {
"status": [
"200"
]
}
}
......
然后查询了http索引的索引信息和模版配置:
GET /http-2016.03.18/
GET /_template/http
发现其中http的status的属性type类型的内容是byte :
"properties": {
"@timestamp": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
......
......
"status": {
"type": "byte"
},
......
......
原因:
在aggs查询中发现了status字段显示错误的情况,status的type类型在es模版中定义成了byte类型,当status的值超过127后将出现溢出的情况,因此修改为short后,恢复了正常。
(对于http的状态码status,其type类型使用short已经足够了,如果使用integer,long或默认的string类型也是可以的,这里影响的是存储空间占用的大小。)
继续阅读 »
http日志的情况:
1、http日志从logstash写入es时,状态码配置为status,其内容为 200 ,302 ,400 ,404等。
2、使用kibana对该日志的索引进行查询,在discover页面中显示的status内容跟logstash的内容一致,是正常的。
出现问题的场景:
(我这里使用的是kibana的sense插件进行的查询,如果直接使用curl python-ES也是一样的)
查询该索引:
POST http-2016.03.18/_search
{
"fields": ["status"],
"query":{
"bool":{
"must": [
{
"range" : {
"@timestamp" : {"gte" : "now-5m"}
}
}
]
}
},
"_source": "false",
"size": 0,
"aggs": {
"status_type": {
"terms":{"field":"status"}
}
}
}
查询返回的结果中aggregations部分的内容:
"aggregations" : {
"status_type" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [ {
"key" : -56,
"doc_count" : 376341
}, {
"key" : 46,
"doc_count" : 51439
}, {
"key" : 45,
"doc_count" : 5543
}, {
"key" : 48,
"doc_count" : 1669
}, {
"key" : -108,
"doc_count" : 1068
}, {
"key" : -50,
"doc_count" : 11
}, {
"key" : -109,
"doc_count" : 8
}, {
"key" : -112,
"doc_count" : 4
}
寻找原因:
起先先去掉了查询的aggs部分,单独查询query的内容:
POST http-2016.03.18/_search
{
"fields": ["status"],
"query":{
"bool":{
"must": [
{
"range" : {
"@timestamp" : {"gte" : "now-5m"}
}
}
]
}
}
}
返回的结果中,hits显示的status字段内容是正常的:
"hits": {
"total": 1242104,
"max_score": 1,
"hits": [
{
"_index": "http-2016.03.18",
"_type": "log",
"_id": "AVOI3EiwidwPAhB1e7gQ",
"_score": 1,
"fields": {
"status": [
"200"
]
}
}
......
然后查询了http索引的索引信息和模版配置:
GET /http-2016.03.18/
GET /_template/http
发现其中http的status的属性type类型的内容是byte :
"properties": {
"@timestamp": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
......
......
"status": {
"type": "byte"
},
......
......
原因:
在aggs查询中发现了status字段显示错误的情况,status的type类型在es模版中定义成了byte类型,当status的值超过127后将出现溢出的情况,因此修改为short后,恢复了正常。
(对于http的状态码status,其type类型使用short已经足够了,如果使用integer,long或默认的string类型也是可以的,这里影响的是存储空间占用的大小。)
今天在es中对http日志的状态码status进行aggs搜索出现字段内容显示不正常的问题,记录过程:
http日志的情况:
1、http日志从logstash写入es时,状态码配置为status,其内容为 200 ,302 ,400 ,404等。
2、使用kibana对该日志的索引进行查询,在discover页面中显示的status内容跟logstash的内容一致,是正常的。
出现问题的场景:
(我这里使用的是kibana的sense插件进行的查询,如果直接使用curl python-ES也是一样的)
查询该索引:
POST http-2016.03.18/_search
{
"fields": ["status"],
"query":{
"bool":{
"must": [
{
"range" : {
"@timestamp" : {"gte" : "now-5m"}
}
}
]
}
},
"_source": "false",
"size": 0,
"aggs": {
"status_type": {
"terms":{"field":"status"}
}
}
}
查询返回的结果中aggregations部分的内容:
"aggregations" : {
"status_type" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [ {
"key" : -56,
"doc_count" : 376341
}, {
"key" : 46,
"doc_count" : 51439
}, {
"key" : 45,
"doc_count" : 5543
}, {
"key" : 48,
"doc_count" : 1669
}, {
"key" : -108,
"doc_count" : 1068
}, {
"key" : -50,
"doc_count" : 11
}, {
"key" : -109,
"doc_count" : 8
}, {
"key" : -112,
"doc_count" : 4
}
寻找原因:
起先先去掉了查询的aggs部分,单独查询query的内容:
POST http-2016.03.18/_search
{
"fields": ["status"],
"query":{
"bool":{
"must": [
{
"range" : {
"@timestamp" : {"gte" : "now-5m"}
}
}
]
}
}
}
返回的结果中,hits显示的status字段内容是正常的:
"hits": {
"total": 1242104,
"max_score": 1,
"hits": [
{
"_index": "http-2016.03.18",
"_type": "log",
"_id": "AVOI3EiwidwPAhB1e7gQ",
"_score": 1,
"fields": {
"status": [
"200"
]
}
}
......
然后查询了http索引的索引信息和模版配置:
GET /http-2016.03.18/
GET /_template/http
发现其中http的status的属性type类型的内容是byte :
"properties": {
"@timestamp": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
......
......
"status": {
"type": "byte"
},
......
......
原因:
在aggs查询中发现了status字段显示错误的情况,status的type类型在es模版中定义成了byte类型,当status的值超过127后将出现溢出的情况,因此修改为short后,恢复了正常。
(对于http的状态码status,其type类型使用short已经足够了,如果使用integer,long或默认的string类型也是可以的,这里影响的是存储空间占用的大小。)
收起阅读 »
http日志的情况:
1、http日志从logstash写入es时,状态码配置为status,其内容为 200 ,302 ,400 ,404等。
2、使用kibana对该日志的索引进行查询,在discover页面中显示的status内容跟logstash的内容一致,是正常的。
出现问题的场景:
(我这里使用的是kibana的sense插件进行的查询,如果直接使用curl python-ES也是一样的)
查询该索引:
POST http-2016.03.18/_search
{
"fields": ["status"],
"query":{
"bool":{
"must": [
{
"range" : {
"@timestamp" : {"gte" : "now-5m"}
}
}
]
}
},
"_source": "false",
"size": 0,
"aggs": {
"status_type": {
"terms":{"field":"status"}
}
}
}
查询返回的结果中aggregations部分的内容:
"aggregations" : {
"status_type" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [ {
"key" : -56,
"doc_count" : 376341
}, {
"key" : 46,
"doc_count" : 51439
}, {
"key" : 45,
"doc_count" : 5543
}, {
"key" : 48,
"doc_count" : 1669
}, {
"key" : -108,
"doc_count" : 1068
}, {
"key" : -50,
"doc_count" : 11
}, {
"key" : -109,
"doc_count" : 8
}, {
"key" : -112,
"doc_count" : 4
}
寻找原因:
起先先去掉了查询的aggs部分,单独查询query的内容:
POST http-2016.03.18/_search
{
"fields": ["status"],
"query":{
"bool":{
"must": [
{
"range" : {
"@timestamp" : {"gte" : "now-5m"}
}
}
]
}
}
}
返回的结果中,hits显示的status字段内容是正常的:
"hits": {
"total": 1242104,
"max_score": 1,
"hits": [
{
"_index": "http-2016.03.18",
"_type": "log",
"_id": "AVOI3EiwidwPAhB1e7gQ",
"_score": 1,
"fields": {
"status": [
"200"
]
}
}
......
然后查询了http索引的索引信息和模版配置:
GET /http-2016.03.18/
GET /_template/http
发现其中http的status的属性type类型的内容是byte :
"properties": {
"@timestamp": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
......
......
"status": {
"type": "byte"
},
......
......
原因:
在aggs查询中发现了status字段显示错误的情况,status的type类型在es模版中定义成了byte类型,当status的值超过127后将出现溢出的情况,因此修改为short后,恢复了正常。
(对于http的状态码status,其type类型使用short已经足够了,如果使用integer,long或默认的string类型也是可以的,这里影响的是存储空间占用的大小。)
收起阅读 »
Max 发表于 : 2016-03-18 16:51
评论 (0)
medcl 发表于 : 2016-03-01 11:59
评论 (1)
使用 SQL 查询 Elasticsearch
我新写了一个用 SQL 查询 Elasticsearch 的工具 https://github.com/taowen/es-monitor,欢迎大家使用。详细的文档参见:https://segmentfault.com/a/1190000003502849
在此之前,有这么三个SQL查询Elasticsearch的工具:
Crate.io 的问题是它不是Elasticsearch,它的聚合是自己实现的版本,和Elasticsearch的Aggregation是两套东西。
http://sqltoelasticsearch.fr/ 语法支持很不晚上,同时 WHERE 和 GROUP BY 就翻译错了。
https://github.com/NLPchina/elasticsearch-sql 的问题在于其用Java来翻译SQL太笨拙了,如果要达到同样的SQL语法支持程度还要增加大量的Java代码。
如果只是支持SQL,很多Elasticsearch的功能是无法被充分释放的。比如Elasticsearch支持sub aggregation,每个sub aggregation就是OLAP里的下钻一次的概念。而且每下钻一次都可以有自己的指标计算。简单的SQL是无法表达这样的特性的。所以我扩充了一下SQL的语义,使得其更贴近Elasticsearch聚合的工作方式:
继续阅读 »
在此之前,有这么三个SQL查询Elasticsearch的工具:
Crate.io 的问题是它不是Elasticsearch,它的聚合是自己实现的版本,和Elasticsearch的Aggregation是两套东西。
http://sqltoelasticsearch.fr/ 语法支持很不晚上,同时 WHERE 和 GROUP BY 就翻译错了。
https://github.com/NLPchina/elasticsearch-sql 的问题在于其用Java来翻译SQL太笨拙了,如果要达到同样的SQL语法支持程度还要增加大量的Java代码。
如果只是支持SQL,很多Elasticsearch的功能是无法被充分释放的。比如Elasticsearch支持sub aggregation,每个sub aggregation就是OLAP里的下钻一次的概念。而且每下钻一次都可以有自己的指标计算。简单的SQL是无法表达这样的特性的。所以我扩充了一下SQL的语义,使得其更贴近Elasticsearch聚合的工作方式:
$ cat << EOF | ./es_query.py http://127.0.0.1:9200
WITH SELECT MAX(market_cap) AS max_all_times FROM symbol AS all_symbols;
WITH SELECT MAX(market_cap) AS max_at_2000 FROM all_symbols WHERE ipo_year=2000 AS year_2000;
WITH SELECT MAX(market_cap) AS max_at_2001 FROM all_symbols WHERE ipo_year=2001 AS year_2001;
EOF
希望我的小工具可以帮到你
我新写了一个用 SQL 查询 Elasticsearch 的工具 https://github.com/taowen/es-monitor,欢迎大家使用。详细的文档参见:https://segmentfault.com/a/1190000003502849
在此之前,有这么三个SQL查询Elasticsearch的工具:
Crate.io 的问题是它不是Elasticsearch,它的聚合是自己实现的版本,和Elasticsearch的Aggregation是两套东西。
http://sqltoelasticsearch.fr/ 语法支持很不晚上,同时 WHERE 和 GROUP BY 就翻译错了。
https://github.com/NLPchina/elasticsearch-sql 的问题在于其用Java来翻译SQL太笨拙了,如果要达到同样的SQL语法支持程度还要增加大量的Java代码。
如果只是支持SQL,很多Elasticsearch的功能是无法被充分释放的。比如Elasticsearch支持sub aggregation,每个sub aggregation就是OLAP里的下钻一次的概念。而且每下钻一次都可以有自己的指标计算。简单的SQL是无法表达这样的特性的。所以我扩充了一下SQL的语义,使得其更贴近Elasticsearch聚合的工作方式:
收起阅读 »
在此之前,有这么三个SQL查询Elasticsearch的工具:
Crate.io 的问题是它不是Elasticsearch,它的聚合是自己实现的版本,和Elasticsearch的Aggregation是两套东西。
http://sqltoelasticsearch.fr/ 语法支持很不晚上,同时 WHERE 和 GROUP BY 就翻译错了。
https://github.com/NLPchina/elasticsearch-sql 的问题在于其用Java来翻译SQL太笨拙了,如果要达到同样的SQL语法支持程度还要增加大量的Java代码。
如果只是支持SQL,很多Elasticsearch的功能是无法被充分释放的。比如Elasticsearch支持sub aggregation,每个sub aggregation就是OLAP里的下钻一次的概念。而且每下钻一次都可以有自己的指标计算。简单的SQL是无法表达这样的特性的。所以我扩充了一下SQL的语义,使得其更贴近Elasticsearch聚合的工作方式:
$ cat << EOF | ./es_query.py http://127.0.0.1:9200
WITH SELECT MAX(market_cap) AS max_all_times FROM symbol AS all_symbols;
WITH SELECT MAX(market_cap) AS max_at_2000 FROM all_symbols WHERE ipo_year=2000 AS year_2000;
WITH SELECT MAX(market_cap) AS max_at_2001 FROM all_symbols WHERE ipo_year=2001 AS year_2001;
EOF
希望我的小工具可以帮到你收起阅读 »
ElasticSearch2.1.1安装及简单配置说明
目前最新版ES超级详细的安装、配置流程。
根据自己真实的安装过程以及多篇博客文章的重要提示编写。
按照文档中的说明一步一步操作,分分钟就能开始ES2.1.1的非凡体验!
目前最新版ES超级详细的安装、配置流程。
根据自己真实的安装过程以及多篇博客文章的重要提示编写。
按照文档中的说明一步一步操作,分分钟就能开始ES2.1.1的非凡体验!
verra1448 发表于 : 2016-02-02 09:29
评论 (2)
nest驱动IndexName问题
nest驱动访问ES,按照官网文档,使用如下程序可以正常索引:
static void Main()
{
var node = new Uri("http://localhost:9200");
var settings = new ConnectionSettings(
node,
defaultIndex: "my-application"
);
var client = new ElasticClient(settings);
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var index = client.Index(person);
}
调整为手动设置indexName时出错,示例代码如下:
static void Main()
{
var node = new Uri("http://localhost:9200");
var settings = new ConnectionSettings(
node,
defaultIndex: "my-application"
);
settings.MapDefaultTypeIndices(d => d.Add(typeof(Person), "constIndex"));
var client = new ElasticClient(settings);
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var index = client.Index(person);
}
出错提示为:
{StatusCode: 400,
Method: PUT,
Url: http://localhost:9200/constIndex/automobile/1,
Request: {
"firstname": "Martijn",
"lastname": "Laarman",
"id": "1"
},
Response: <Response stream not captured or already read to completion by serializer, set ExposeRawResponse() on connectionsettings to force it to be set on>}
继续阅读 »
static void Main()
{
var node = new Uri("http://localhost:9200");
var settings = new ConnectionSettings(
node,
defaultIndex: "my-application"
);
var client = new ElasticClient(settings);
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var index = client.Index(person);
}
调整为手动设置indexName时出错,示例代码如下:
static void Main()
{
var node = new Uri("http://localhost:9200");
var settings = new ConnectionSettings(
node,
defaultIndex: "my-application"
);
settings.MapDefaultTypeIndices(d => d.Add(typeof(Person), "constIndex"));
var client = new ElasticClient(settings);
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var index = client.Index(person);
}
出错提示为:
{StatusCode: 400,
Method: PUT,
Url: http://localhost:9200/constIndex/automobile/1,
Request: {
"firstname": "Martijn",
"lastname": "Laarman",
"id": "1"
},
Response: <Response stream not captured or already read to completion by serializer, set ExposeRawResponse() on connectionsettings to force it to be set on>}
using System;
using Nest;
namespace ConsoleApplication1
{
class Program
{
private static IIndexResponse Index<T>(T person, string indexName) where T : class
{
var node = new Uri("http://localhost:9200");
var settings = new ConnectionSettings(node,defaultIndex: indexName);
var client = new ElasticClient(settings);
return client.Index(person);
}
static void Main()
{
string indexNameError = typeof(Person).FullName
.Substring(typeof(Person).FullName.LastIndexOf(".", StringComparison.Ordinal) + 1) + "Indexs";
const string indexNameOk = "test";
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var ok = Index(person, indexNameOk);
Console.WriteLine("Result:" + ok.IsValid);
var error = Index(person, indexNameError);
Console.WriteLine("Result:" + error.IsValid);
Console.ReadKey();
}
}
[Serializable]
public class Person
{
public string Firstname { get; set; }
public string Lastname { get; set; }
public string Id { get; set; }
}
}
nest驱动访问ES,按照官网文档,使用如下程序可以正常索引:
static void Main()
{
var node = new Uri("http://localhost:9200");
var settings = new ConnectionSettings(
node,
defaultIndex: "my-application"
);
var client = new ElasticClient(settings);
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var index = client.Index(person);
}
调整为手动设置indexName时出错,示例代码如下:
static void Main()
{
var node = new Uri("http://localhost:9200");
var settings = new ConnectionSettings(
node,
defaultIndex: "my-application"
);
settings.MapDefaultTypeIndices(d => d.Add(typeof(Person), "constIndex"));
var client = new ElasticClient(settings);
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var index = client.Index(person);
}
出错提示为:
{StatusCode: 400,
Method: PUT,
Url: http://localhost:9200/constIndex/automobile/1,
Request: {
"firstname": "Martijn",
"lastname": "Laarman",
"id": "1"
},
Response: <Response stream not captured or already read to completion by serializer, set ExposeRawResponse() on connectionsettings to force it to be set on>}
收起阅读 »
static void Main()
{
var node = new Uri("http://localhost:9200");
var settings = new ConnectionSettings(
node,
defaultIndex: "my-application"
);
var client = new ElasticClient(settings);
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var index = client.Index(person);
}
调整为手动设置indexName时出错,示例代码如下:
static void Main()
{
var node = new Uri("http://localhost:9200");
var settings = new ConnectionSettings(
node,
defaultIndex: "my-application"
);
settings.MapDefaultTypeIndices(d => d.Add(typeof(Person), "constIndex"));
var client = new ElasticClient(settings);
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var index = client.Index(person);
}
出错提示为:
{StatusCode: 400,
Method: PUT,
Url: http://localhost:9200/constIndex/automobile/1,
Request: {
"firstname": "Martijn",
"lastname": "Laarman",
"id": "1"
},
Response: <Response stream not captured or already read to completion by serializer, set ExposeRawResponse() on connectionsettings to force it to be set on>}
using System;
using Nest;
namespace ConsoleApplication1
{
class Program
{
private static IIndexResponse Index<T>(T person, string indexName) where T : class
{
var node = new Uri("http://localhost:9200");
var settings = new ConnectionSettings(node,defaultIndex: indexName);
var client = new ElasticClient(settings);
return client.Index(person);
}
static void Main()
{
string indexNameError = typeof(Person).FullName
.Substring(typeof(Person).FullName.LastIndexOf(".", StringComparison.Ordinal) + 1) + "Indexs";
const string indexNameOk = "test";
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var ok = Index(person, indexNameOk);
Console.WriteLine("Result:" + ok.IsValid);
var error = Index(person, indexNameError);
Console.WriteLine("Result:" + error.IsValid);
Console.ReadKey();
}
}
[Serializable]
public class Person
{
public string Firstname { get; set; }
public string Lastname { get; set; }
public string Id { get; set; }
}
}
收起阅读 »
cdzhoubin 发表于 : 2016-01-17 21:40
评论 (1)
Packetbeat协议扩展开发教程(3)
书接上回:http://elasticsearch.cn/article/53
前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
打开一个现有的UDP和HTTP协议接口定义:
/~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。
请问:
Packetbeat怎么工作的?
回答:
每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。
貌似很简单嘛!
进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
Ts是收到数据包的时间戳;
Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。
首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。
现在我们仔细看看Parse()接口的各个参数定义是做什么用的
下面看段MySQL模块里面的例子
我们再继续看后续怎么处理这些包的一个逻辑例子
好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321:
由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
MAIL:开始一份邮件 mail from: xxx@xx.com
RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com
QUIT:结束SMTP会话,不一定发送了邮件,注意
RESET:重置会话,当前传输被取消
最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
Ctrl+F找到SMTP的下载地址:smtp.pcap
用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:
上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。
打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go
定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
留待下回分解。
继续阅读 »
前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
打开一个现有的UDP和HTTP协议接口定义:
/~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
// Called to initialize the Plugin
Init(test_mode bool, results publisher.Client) error
// Called to return the configured ports
GetPorts() int
}
type TcpProtocolPlugin interface {
ProtocolPlugin
// Called when TCP payload data is available for parsing.
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData
// Called when the FIN flag is seen in the TCP stream.
ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private ProtocolData) ProtocolData
// Called when a packets are missing from the tcp
// stream.
GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
private ProtocolData) (priv ProtocolData, drop bool)
// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
ConnectionTimeout() time.Duration
}
type UdpProtocolPlugin interface {
ProtocolPlugin
// ParseUdp is invoked when UDP payload data is available for parsing.
ParseUdp(pkt *Packet)
}
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间;UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。
请问:
Packetbeat怎么工作的?
回答:
每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。
貌似很简单嘛!
进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
type Packet struct {
Ts time.Time
Tuple common.IpPortTuple
Payload byte
}
Packet结构简单,Ts是收到数据包的时间戳;
Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。
首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
protocols:
smtp:
# Configure the ports where to listen for Smtp traffic. You can disable
# the Smtp protocol by commenting out the list of ports.
ports: [25]
还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:git diff config/config.go
@@ -42,6 +42,7 @@ type Protocols struct {
Pgsql Pgsql
Redis Redis
Thrift Thrift
+ Smtp Smtp
}
type Dns struct {
@@ -118,5 +119,9 @@ type Redis struct {
Send_response *bool
}
+type Smtp struct {
+ ProtocolCommon `yaml:",inline"`
+}
+
// Config Singleton
var ConfigSingleton Config
在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go,这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
...TODO...
修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。git diff protos/protos.go
@@ -103,6 +103,7 @@ const (
MongodbProtocol
DnsProtocol
MemcacheProtocol
+ SmtpProtocol
)
// Protocol names
@@ -116,6 +117,7 @@ var ProtocolNames = string{
"mongodb",
"dns",
"memcache",
+ "smtp",
}
继续修改packetbeat.go主文件,允许SMTP协议并加载。git diff packetbeat.go
@@ -27,6 +27,7 @@ import (
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/thrift"
"github.com/elastic/packetbeat/protos/udp"
+ "github.com/elastic/packetbeat/protos/smtp"
"github.com/elastic/packetbeat/sniffer"
)
@@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
protos.ThriftProtocol: new(thrift.Thrift),
protos.MongodbProtocol: new(mongodb.Mongodb),
protos.DnsProtocol: new(dns.Dns),
+ protos.SmtpProtocol: new(smtp.Smtp),
}
做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。
现在我们仔细看看Parse()接口的各个参数定义是做什么用的
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData
pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。下面看段MySQL模块里面的例子
priv := mysqlPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(mysqlPrivateData)
if !ok {
priv = mysqlPrivateData{}
}
}
[ ... ]
return priv
上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。我们再继续看后续怎么处理这些包的一个逻辑例子
ok, complete := mysqlMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
return priv
}
if complete {
mysql.messageComplete(tcptuple, dir, stream)
} else {
// wait for more data
break
}
mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321:
由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
S: 220 www.example.com ESMTP Postfix
C: HELO mydomain.com
S: 250 Hello mydomain.com
C: MAIL FROM:
S: 250 Ok
C: RCPT TO:
S: 250 Ok
C: DATA
S: 354 End data with .
C: Subject: test message
C: From:""< sender@mydomain.com>
C: To:""< friend@example.com>
C:
C: Hello,
C: This is a test.
C: Goodbye.
C: .
S: 250 Ok: queued as 12345
C: quit
S: 221 Bye
上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议(RFC5321),我们只需要关注以下几个命令:MAIL:开始一份邮件 mail from: xxx@xx.com
RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com
QUIT:结束SMTP会话,不一定发送了邮件,注意
RESET:重置会话,当前传输被取消
最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
{
"timestamp":"2016-1-15 12:00:00",
"from":"medcl@example.co",
"to":["lcdem@example.co"]
}
我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供:https://wiki.wireshark.org/SampleCaptures/Ctrl+F找到SMTP的下载地址:smtp.pcap
用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:
上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。
打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go
定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
type smtpPrivateData struct{
Data [2]*SmtpStream
}
type SmtpStream struct {
tcptuple *common.TcpTuple
data byte
parseOffset int
isClient bool
message *SmtpMessage
}
type SmtpMessage struct {
Ts time.Time
From string
To string
}
然后参照MySQL协议,定义相应的方法,最终如下:package smtp
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"bytes"
"time"
"strings"
)
type smtpPrivateData struct{
Data [2]*SmtpStream
}
type SmtpStream struct {
tcptuple *common.TcpTuple
data byte
parseOffset int
isClient bool
message *SmtpMessage
}
type SmtpMessage struct {
start int
end int
Ts time.Time
From string
To string
IgnoreMessage bool
}
type Smtp struct {
SendRequest bool
SendResponse bool
transactionTimeout time.Duration
Ports int
results publisher.Client
}
func (smtp *Smtp) initDefaults() {
smtp.SendRequest = false
smtp.SendResponse = false
smtp.transactionTimeout = protos.DefaultTransactionExpiration
}
func (smtp *Smtp) setFromConfig(config config.Smtp) error {
smtp.Ports = config.Ports
if config.SendRequest != nil {
smtp.SendRequest = *config.SendRequest
}
if config.SendResponse != nil {
smtp.SendResponse = *config.SendResponse
}
if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
}
return nil
}
func (smtp *Smtp) GetPorts() int {
return smtp.Ports
}
func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
smtp.initDefaults()
if !test_mode {
err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
if err != nil {
return err
}
}
smtp.results = results
return nil
}
func readLine(data byte, offset int) (bool, string, int) {
q := bytes.Index(data[offset:], byte("\r\n"))
if q == -1 {
return false, "", 0
}
return true, string(data[offset : offset+q]), offset + q + 2
}
func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {
defer logp.Recover("ParseSmtp exception")
priv := smtpPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(smtpPrivateData)
if !ok {
priv = smtpPrivateData{}
}
}
if priv.Data[dir] == nil {
priv.Data[dir] = &SmtpStream{
tcptuple: tcptuple,
data: pkt.Payload,
message: &SmtpMessage{Ts: pkt.Ts},
}
} else {
// concatenate bytes
priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("smtp", "Stream data too large, dropping TCP stream")
priv.Data[dir] = nil
return priv
}
}
stream := priv.Data[dir]
for len(stream.data) > 0 {
if stream.message == nil {
stream.message = &SmtpMessage{Ts: pkt.Ts}
}
ok, complete := stmpMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
return priv
}
if complete {
smtp.messageComplete(tcptuple, dir, stream)
} else {
logp.Debug("smtp","still wait message...")
// wait for more data
break
}
}
return priv
}
func (smtp *Smtp) ConnectionTimeout() time.Duration {
return smtp.transactionTimeout
}
func stmpMessageParser(s *SmtpStream) (bool, bool) {
var value string=""
for s.parseOffset < len(s.data) {
logp.Debug("smtp", "Parse message: %s", string(s.data[s.parseOffset]))
if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {
logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))
found, line, off := readLine(s.data, s.parseOffset)
if !found {
return true, false
}
value = line[1:]
logp.Debug("smtp", "value %s", value)
s.parseOffset = off
} else {
logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
return false, false
}
}
return true, false
}
func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg byte) {
logp.Info("smtp","handle smtp message...")
//TODO
}
// Called when the parser has identified a full message.
func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {
logp.Info("smtp","message completed...")
// all ok, ship it
msg := stream.data[stream.message.start:stream.message.end]
if !stream.message.IgnoreMessage {
handleSmtp(smtp, stream.message, tcptuple, dir, msg)
}
// and reset message
stream.PrepareForNewMessage()
}
func (stream *SmtpStream) PrepareForNewMessage() {
logp.Info("smtp","prepare for new message...")
stream.data = stream.data[stream.parseOffset:]
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}
func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
defer logp.Recover("GapInStream(smtp) exception")
if private == nil {
return private, false
}
return private, true
}
func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {
logp.Info("smtp","stream closed...")
// TODO: check if we have data pending and either drop it to free
// memory or send it up the stack.
return private
}
现在切换到命令行,编译一下cd ~/go/src/github.com/elastic/beats/packetbeat
make
编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap -e -N
运行查看控制台输出结果:➜ packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap -e -N
2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
2016/01/15 10:12:20.155335 smtp.go:163: DBG Parse message: 2
2016/01/15 10:12:20.155416 smtp.go:180: DBG Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
250-SIZE 52428800
250-PIPELINING
250-AUTH PLAIN LOGIN
250-STARTTLS
250 HELP
2016/01/15 10:12:22.310974 smtp.go:163: DBG Parse message: F
2016/01/15 10:12:22.311025 smtp.go:180: DBG Unexpected message starting with From: "Gurpartap Singh"
To:
Subject: SMTP
Date: Mon, 5 Oct 2009 11:36:07 +0530
Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
MIME-Version: 1.0
...
成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。留待下回分解。
书接上回:http://elasticsearch.cn/article/53
前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
打开一个现有的UDP和HTTP协议接口定义:
/~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。
请问:
Packetbeat怎么工作的?
回答:
每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。
貌似很简单嘛!
进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
Ts是收到数据包的时间戳;
Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。
首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。
现在我们仔细看看Parse()接口的各个参数定义是做什么用的
下面看段MySQL模块里面的例子
我们再继续看后续怎么处理这些包的一个逻辑例子
好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321:
由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
MAIL:开始一份邮件 mail from: xxx@xx.com
RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com
QUIT:结束SMTP会话,不一定发送了邮件,注意
RESET:重置会话,当前传输被取消
最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
Ctrl+F找到SMTP的下载地址:smtp.pcap
用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:
上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。
打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go
定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
留待下回分解。 收起阅读 »
前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
打开一个现有的UDP和HTTP协议接口定义:
/~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
// Called to initialize the Plugin
Init(test_mode bool, results publisher.Client) error
// Called to return the configured ports
GetPorts() int
}
type TcpProtocolPlugin interface {
ProtocolPlugin
// Called when TCP payload data is available for parsing.
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData
// Called when the FIN flag is seen in the TCP stream.
ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private ProtocolData) ProtocolData
// Called when a packets are missing from the tcp
// stream.
GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
private ProtocolData) (priv ProtocolData, drop bool)
// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
ConnectionTimeout() time.Duration
}
type UdpProtocolPlugin interface {
ProtocolPlugin
// ParseUdp is invoked when UDP payload data is available for parsing.
ParseUdp(pkt *Packet)
}
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间;UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。
请问:
Packetbeat怎么工作的?
回答:
每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。
貌似很简单嘛!
进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
type Packet struct {
Ts time.Time
Tuple common.IpPortTuple
Payload byte
}
Packet结构简单,Ts是收到数据包的时间戳;
Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。
首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
protocols:
smtp:
# Configure the ports where to listen for Smtp traffic. You can disable
# the Smtp protocol by commenting out the list of ports.
ports: [25]
还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:git diff config/config.go
@@ -42,6 +42,7 @@ type Protocols struct {
Pgsql Pgsql
Redis Redis
Thrift Thrift
+ Smtp Smtp
}
type Dns struct {
@@ -118,5 +119,9 @@ type Redis struct {
Send_response *bool
}
+type Smtp struct {
+ ProtocolCommon `yaml:",inline"`
+}
+
// Config Singleton
var ConfigSingleton Config
在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go,这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
...TODO...
修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。git diff protos/protos.go
@@ -103,6 +103,7 @@ const (
MongodbProtocol
DnsProtocol
MemcacheProtocol
+ SmtpProtocol
)
// Protocol names
@@ -116,6 +117,7 @@ var ProtocolNames = string{
"mongodb",
"dns",
"memcache",
+ "smtp",
}
继续修改packetbeat.go主文件,允许SMTP协议并加载。git diff packetbeat.go
@@ -27,6 +27,7 @@ import (
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/thrift"
"github.com/elastic/packetbeat/protos/udp"
+ "github.com/elastic/packetbeat/protos/smtp"
"github.com/elastic/packetbeat/sniffer"
)
@@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
protos.ThriftProtocol: new(thrift.Thrift),
protos.MongodbProtocol: new(mongodb.Mongodb),
protos.DnsProtocol: new(dns.Dns),
+ protos.SmtpProtocol: new(smtp.Smtp),
}
做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。
现在我们仔细看看Parse()接口的各个参数定义是做什么用的
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData
pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。下面看段MySQL模块里面的例子
priv := mysqlPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(mysqlPrivateData)
if !ok {
priv = mysqlPrivateData{}
}
}
[ ... ]
return priv
上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。我们再继续看后续怎么处理这些包的一个逻辑例子
ok, complete := mysqlMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
return priv
}
if complete {
mysql.messageComplete(tcptuple, dir, stream)
} else {
// wait for more data
break
}
mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321:
由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
S: 220 www.example.com ESMTP Postfix
C: HELO mydomain.com
S: 250 Hello mydomain.com
C: MAIL FROM:
S: 250 Ok
C: RCPT TO:
S: 250 Ok
C: DATA
S: 354 End data with .
C: Subject: test message
C: From:""< sender@mydomain.com>
C: To:""< friend@example.com>
C:
C: Hello,
C: This is a test.
C: Goodbye.
C: .
S: 250 Ok: queued as 12345
C: quit
S: 221 Bye
上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议(RFC5321),我们只需要关注以下几个命令:MAIL:开始一份邮件 mail from: xxx@xx.com
RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com
QUIT:结束SMTP会话,不一定发送了邮件,注意
RESET:重置会话,当前传输被取消
最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
{
"timestamp":"2016-1-15 12:00:00",
"from":"medcl@example.co",
"to":["lcdem@example.co"]
}
我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供:https://wiki.wireshark.org/SampleCaptures/Ctrl+F找到SMTP的下载地址:smtp.pcap
用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:
上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。
打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go
定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
type smtpPrivateData struct{
Data [2]*SmtpStream
}
type SmtpStream struct {
tcptuple *common.TcpTuple
data byte
parseOffset int
isClient bool
message *SmtpMessage
}
type SmtpMessage struct {
Ts time.Time
From string
To string
}
然后参照MySQL协议,定义相应的方法,最终如下:package smtp
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"bytes"
"time"
"strings"
)
type smtpPrivateData struct{
Data [2]*SmtpStream
}
type SmtpStream struct {
tcptuple *common.TcpTuple
data byte
parseOffset int
isClient bool
message *SmtpMessage
}
type SmtpMessage struct {
start int
end int
Ts time.Time
From string
To string
IgnoreMessage bool
}
type Smtp struct {
SendRequest bool
SendResponse bool
transactionTimeout time.Duration
Ports int
results publisher.Client
}
func (smtp *Smtp) initDefaults() {
smtp.SendRequest = false
smtp.SendResponse = false
smtp.transactionTimeout = protos.DefaultTransactionExpiration
}
func (smtp *Smtp) setFromConfig(config config.Smtp) error {
smtp.Ports = config.Ports
if config.SendRequest != nil {
smtp.SendRequest = *config.SendRequest
}
if config.SendResponse != nil {
smtp.SendResponse = *config.SendResponse
}
if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
}
return nil
}
func (smtp *Smtp) GetPorts() int {
return smtp.Ports
}
func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
smtp.initDefaults()
if !test_mode {
err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
if err != nil {
return err
}
}
smtp.results = results
return nil
}
func readLine(data byte, offset int) (bool, string, int) {
q := bytes.Index(data[offset:], byte("\r\n"))
if q == -1 {
return false, "", 0
}
return true, string(data[offset : offset+q]), offset + q + 2
}
func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {
defer logp.Recover("ParseSmtp exception")
priv := smtpPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(smtpPrivateData)
if !ok {
priv = smtpPrivateData{}
}
}
if priv.Data[dir] == nil {
priv.Data[dir] = &SmtpStream{
tcptuple: tcptuple,
data: pkt.Payload,
message: &SmtpMessage{Ts: pkt.Ts},
}
} else {
// concatenate bytes
priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("smtp", "Stream data too large, dropping TCP stream")
priv.Data[dir] = nil
return priv
}
}
stream := priv.Data[dir]
for len(stream.data) > 0 {
if stream.message == nil {
stream.message = &SmtpMessage{Ts: pkt.Ts}
}
ok, complete := stmpMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
return priv
}
if complete {
smtp.messageComplete(tcptuple, dir, stream)
} else {
logp.Debug("smtp","still wait message...")
// wait for more data
break
}
}
return priv
}
func (smtp *Smtp) ConnectionTimeout() time.Duration {
return smtp.transactionTimeout
}
func stmpMessageParser(s *SmtpStream) (bool, bool) {
var value string=""
for s.parseOffset < len(s.data) {
logp.Debug("smtp", "Parse message: %s", string(s.data[s.parseOffset]))
if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {
logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))
found, line, off := readLine(s.data, s.parseOffset)
if !found {
return true, false
}
value = line[1:]
logp.Debug("smtp", "value %s", value)
s.parseOffset = off
} else {
logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
return false, false
}
}
return true, false
}
func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg byte) {
logp.Info("smtp","handle smtp message...")
//TODO
}
// Called when the parser has identified a full message.
func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {
logp.Info("smtp","message completed...")
// all ok, ship it
msg := stream.data[stream.message.start:stream.message.end]
if !stream.message.IgnoreMessage {
handleSmtp(smtp, stream.message, tcptuple, dir, msg)
}
// and reset message
stream.PrepareForNewMessage()
}
func (stream *SmtpStream) PrepareForNewMessage() {
logp.Info("smtp","prepare for new message...")
stream.data = stream.data[stream.parseOffset:]
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}
func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
defer logp.Recover("GapInStream(smtp) exception")
if private == nil {
return private, false
}
return private, true
}
func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {
logp.Info("smtp","stream closed...")
// TODO: check if we have data pending and either drop it to free
// memory or send it up the stack.
return private
}
现在切换到命令行,编译一下cd ~/go/src/github.com/elastic/beats/packetbeat
make
编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap -e -N
运行查看控制台输出结果:➜ packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap -e -N
2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
2016/01/15 10:12:20.155335 smtp.go:163: DBG Parse message: 2
2016/01/15 10:12:20.155416 smtp.go:180: DBG Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
250-SIZE 52428800
250-PIPELINING
250-AUTH PLAIN LOGIN
250-STARTTLS
250 HELP
2016/01/15 10:12:22.310974 smtp.go:163: DBG Parse message: F
2016/01/15 10:12:22.311025 smtp.go:180: DBG Unexpected message starting with From: "Gurpartap Singh"
To:
Subject: SMTP
Date: Mon, 5 Oct 2009 11:36:07 +0530
Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
MIME-Version: 1.0
...
成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。留待下回分解。 收起阅读 »
medcl 发表于 : 2016-01-15 18:32
评论 (12)
Packetbeat协议扩展开发教程(2)
书接上回:http://elasticsearch.cn/article/48
我们打开Packetbeat项目,看看里面长什么样:
现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;
现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;
知道项目的大概架构就知道从哪下手了,下节分解。
继续阅读 »
我们打开Packetbeat项目,看看里面长什么样:
现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;
现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;
知道项目的大概架构就知道从哪下手了,下节分解。
书接上回:http://elasticsearch.cn/article/48
我们打开Packetbeat项目,看看里面长什么样:
现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;
现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;
知道项目的大概架构就知道从哪下手了,下节分解。 收起阅读 »
我们打开Packetbeat项目,看看里面长什么样:
现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;
现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;
知道项目的大概架构就知道从哪下手了,下节分解。 收起阅读 »
medcl 发表于 : 2016-01-15 18:23
评论 (1)
一个把数据从MySQL同步到Elasticsearch的工具
https://github.com/zhongbiaode ... -sync
这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。
最近刚刚更新了中文文档。
继续阅读 »
这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。
最近刚刚更新了中文文档。
https://github.com/zhongbiaode ... -sync
这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。
最近刚刚更新了中文文档。 收起阅读 »
这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。
最近刚刚更新了中文文档。 收起阅读 »
社区福利:Elastic-playground
为大家准备了一个测试Elasticsearch/Kibana功能的地方 (Found实例):
Kibana:
https://6e0ccaba29cd55a7f07f83 ... ibana
用户名/密码:elasticsearch-cn
集群名:"e064eb",使用Java客户端的时候需要,如何连接,参考:http://elasticsearch.cn/article/46
内置常用插件,有其他插件要安装的请留言。
继续阅读 »
Kibana:
https://6e0ccaba29cd55a7f07f83 ... ibana
用户名/密码:elasticsearch-cn
集群名:"e064eb",使用Java客户端的时候需要,如何连接,参考:http://elasticsearch.cn/article/46
HTTP http://e064eb4b0aa993db28ad513 ... :9200
HTTPS https://e064eb4b0aa993db28ad51 ... :9243
curl -u elasticsearch-cn:elasticsearch-cn http://e064eb4b0aa993db28ad513 ... 9200/
{
"name" : "instance-0000000009",
"cluster_name" : "e064eb4b0aa993db28ad513e4d2df5e3",
"version" : {
"number" : "2.1.1",
"build_hash" : "40e2c53a6b6c2972b3d13846e450e66f4375bd71",
"build_timestamp" : "2015-12-15T13:05:55Z",
"build_snapshot" : false,
"lucene_version" : "5.3.1"
},
"tagline" : "You Know, for Search"
}
内置常用插件,有其他插件要安装的请留言。
为大家准备了一个测试Elasticsearch/Kibana功能的地方 (Found实例):
Kibana:
https://6e0ccaba29cd55a7f07f83 ... ibana
用户名/密码:elasticsearch-cn
集群名:"e064eb",使用Java客户端的时候需要,如何连接,参考:http://elasticsearch.cn/article/46
内置常用插件,有其他插件要安装的请留言。 收起阅读 »
Kibana:
https://6e0ccaba29cd55a7f07f83 ... ibana
用户名/密码:elasticsearch-cn
集群名:"e064eb",使用Java客户端的时候需要,如何连接,参考:http://elasticsearch.cn/article/46
HTTP http://e064eb4b0aa993db28ad513 ... :9200
HTTPS https://e064eb4b0aa993db28ad51 ... :9243
curl -u elasticsearch-cn:elasticsearch-cn http://e064eb4b0aa993db28ad513 ... 9200/
{
"name" : "instance-0000000009",
"cluster_name" : "e064eb4b0aa993db28ad513e4d2df5e3",
"version" : {
"number" : "2.1.1",
"build_hash" : "40e2c53a6b6c2972b3d13846e450e66f4375bd71",
"build_timestamp" : "2015-12-15T13:05:55Z",
"build_snapshot" : false,
"lucene_version" : "5.3.1"
},
"tagline" : "You Know, for Search"
}
内置常用插件,有其他插件要安装的请留言。 收起阅读 »
elasticsearch-analysis-ik和elasticsearch-analysis-mmseg更新至1.7.0
elasticsearch-analysis-ik:
https://github.com/medcl/elasticsearch-analysis-ik
elasticsearch-analysis-mmseg:
https://github.com/medcl/elast ... -mseg
主要更新配置文件存放路径,之前版本的配置文件存放在elasticsearch的config目录,现在都修改为插件的相对目录了,主要是简化部署,现在可在Found(https://found.elastic.co)部署了。
继续阅读 »
https://github.com/medcl/elasticsearch-analysis-ik
elasticsearch-analysis-mmseg:
https://github.com/medcl/elast ... -mseg
主要更新配置文件存放路径,之前版本的配置文件存放在elasticsearch的config目录,现在都修改为插件的相对目录了,主要是简化部署,现在可在Found(https://found.elastic.co)部署了。
elasticsearch-analysis-ik:
https://github.com/medcl/elasticsearch-analysis-ik
elasticsearch-analysis-mmseg:
https://github.com/medcl/elast ... -mseg
主要更新配置文件存放路径,之前版本的配置文件存放在elasticsearch的config目录,现在都修改为插件的相对目录了,主要是简化部署,现在可在Found(https://found.elastic.co)部署了。 收起阅读 »
https://github.com/medcl/elasticsearch-analysis-ik
elasticsearch-analysis-mmseg:
https://github.com/medcl/elast ... -mseg
主要更新配置文件存放路径,之前版本的配置文件存放在elasticsearch的config目录,现在都修改为插件的相对目录了,主要是简化部署,现在可在Found(https://found.elastic.co)部署了。 收起阅读 »
通过elasticsearch-mapper attachment插件实现文件建立索引
1.安装elasticsearch-mapper attachment
bin/plugin install elasticsearch/elasticsearch-mapper-attachments/3.1.1
2.按照插件官方文档来测试
3.插件需要手动把文档内容转化为base64编码然后建立索引,代码如下
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tika.Tika;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xml.sax.ContentHandler;
import com.spatial4j.core.io.ParseUtils;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class sysfiles {
public static void main(String[] args) throws Exception{
sys();
}
private static void sys() throws IOException {
// TODO Auto-generated method stub
String idxName = "test";
String idxType = "attachments";
Settings settings =ImmutableSettings.settingsBuilder().put("cluster.name","az_bsms_elasticsearch").build();
Client client=new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
String data64=org.elasticsearch.common.Base64.encodeFromFile(filepath);
XContentBuilder source = jsonBuilder().startObject()
.field("file", data64)
.field("text", data64)
.endObject();
String id = "file"+11;
IndexResponse idxResp = client.prepareIndex().setIndex(idxName).setType(idxType).setId(id)
.setSource(source).setRefresh(true).execute().actionGet();
System.out.println(idxResp);
client.close();
}
4.按官方文档正常的搜索就可以了
继续阅读 »
bin/plugin install elasticsearch/elasticsearch-mapper-attachments/3.1.1
2.按照插件官方文档来测试
3.插件需要手动把文档内容转化为base64编码然后建立索引,代码如下
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tika.Tika;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xml.sax.ContentHandler;
import com.spatial4j.core.io.ParseUtils;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class sysfiles {
public static void main(String[] args) throws Exception{
sys();
}
private static void sys() throws IOException {
// TODO Auto-generated method stub
String idxName = "test";
String idxType = "attachments";
Settings settings =ImmutableSettings.settingsBuilder().put("cluster.name","az_bsms_elasticsearch").build();
Client client=new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
String data64=org.elasticsearch.common.Base64.encodeFromFile(filepath);
XContentBuilder source = jsonBuilder().startObject()
.field("file", data64)
.field("text", data64)
.endObject();
String id = "file"+11;
IndexResponse idxResp = client.prepareIndex().setIndex(idxName).setType(idxType).setId(id)
.setSource(source).setRefresh(true).execute().actionGet();
System.out.println(idxResp);
client.close();
}
4.按官方文档正常的搜索就可以了
1.安装elasticsearch-mapper attachment
bin/plugin install elasticsearch/elasticsearch-mapper-attachments/3.1.1
2.按照插件官方文档来测试
3.插件需要手动把文档内容转化为base64编码然后建立索引,代码如下
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tika.Tika;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xml.sax.ContentHandler;
import com.spatial4j.core.io.ParseUtils;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class sysfiles {
public static void main(String[] args) throws Exception{
sys();
}
private static void sys() throws IOException {
// TODO Auto-generated method stub
String idxName = "test";
String idxType = "attachments";
Settings settings =ImmutableSettings.settingsBuilder().put("cluster.name","az_bsms_elasticsearch").build();
Client client=new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
String data64=org.elasticsearch.common.Base64.encodeFromFile(filepath);
XContentBuilder source = jsonBuilder().startObject()
.field("file", data64)
.field("text", data64)
.endObject();
String id = "file"+11;
IndexResponse idxResp = client.prepareIndex().setIndex(idxName).setType(idxType).setId(id)
.setSource(source).setRefresh(true).execute().actionGet();
System.out.println(idxResp);
client.close();
}
4.按官方文档正常的搜索就可以了 收起阅读 »
bin/plugin install elasticsearch/elasticsearch-mapper-attachments/3.1.1
2.按照插件官方文档来测试
3.插件需要手动把文档内容转化为base64编码然后建立索引,代码如下
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tika.Tika;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xml.sax.ContentHandler;
import com.spatial4j.core.io.ParseUtils;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class sysfiles {
public static void main(String[] args) throws Exception{
sys();
}
private static void sys() throws IOException {
// TODO Auto-generated method stub
String idxName = "test";
String idxType = "attachments";
Settings settings =ImmutableSettings.settingsBuilder().put("cluster.name","az_bsms_elasticsearch").build();
Client client=new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
String data64=org.elasticsearch.common.Base64.encodeFromFile(filepath);
XContentBuilder source = jsonBuilder().startObject()
.field("file", data64)
.field("text", data64)
.endObject();
String id = "file"+11;
IndexResponse idxResp = client.prepareIndex().setIndex(idxName).setType(idxType).setId(id)
.setSource(source).setRefresh(true).execute().actionGet();
System.out.println(idxResp);
client.close();
}
4.按官方文档正常的搜索就可以了 收起阅读 »
paopao 发表于 : 2016-01-06 10:28
评论 (5)
Packetbeat协议扩展开发教程(1)
Packetbeat(https://www.elastic.co/products/beats/packetbeat)
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。
开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/。
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。
这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。
A.源码签出
登陆Github打开https://github.com/elastic/beats
fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat
编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html
现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志
好的,我们打开几个网页,控制台会有相应的输出,如下:
然后Elasticsearch应该就会有数据进去了,我们看看:
至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。
继续阅读 »
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。
开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/。
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。
这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。
A.源码签出
登陆Github打开https://github.com/elastic/beats
fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/
cd $GOPATH/src/github.com/elastic
#签出源码
git clone https://github.com/elastic/beats.git
cd beats
#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git
#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master
#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat
#切换到packetbeat模块
cd packetbeat
#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep
#编译
make
编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html
output:
elasticsearch:
enabled: true
hosts: ["localhost:9200"]
username: "tribe_user"
password: "tribe_user"
现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml -d "publish"
介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志
好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG Publish: {
"@timestamp": "2015-12-29T14:24:39.709Z",
"beat": {
"hostname": "medcls-MacBook.local",
"name": "medcls-MacBook.local"
},
"bytes_in": 31,
"bytes_out": 115,
"client_ip": "192.168.3.10",
"client_port": 53669,
"client_proc": "",
"client_server": "",
"count": 1,
"direction": "out",
"dns": {
"additionals_count": 0,
"answers": [
{
"class": "IN",
"data": "www.a.shifen.com",
"name": "sp2.baidu.com",
"ttl": 333,
"type": "CNAME"
}
],
"answers_count": 1,
"authorities": [
{
"class": "IN",
"data": "ns1.a.shifen.com",
"expire": 86400,
"minimum": 3600,
"name": "a.shifen.com",
"refresh": 5,
"retry": 5,
"rname": "baidu_dns_master.baidu.com",
"serial": 1512240003,
"ttl": 12,
"type": "SOA"
}
],
"authorities_count": 1,
"flags": {
"authoritative": false,
"recursion_allowed": true,
"recursion_desired": true,
"truncated_response": false
},
"id": 7435,
"op_code": "QUERY",
"question": {
"class": "IN",
"name": "sp2.baidu.com",
"type": "AAAA"
},
"response_code": "NOERROR"
},
"ip": "192.168.3.1",
"method": "QUERY",
"port": 53,
"proc": "",
"query": "class IN, type AAAA, sp2.baidu.com",
"resource": "sp2.baidu.com",
"responsetime": 18,
"server": "",
"status": "OK",
"transport": "udp",
"type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG output worker: publish 2 events
然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29 5 1 135 0 561.2kb 561.2kb
至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。
Packetbeat(https://www.elastic.co/products/beats/packetbeat)
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。
开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/。
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。
这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。
A.源码签出
登陆Github打开https://github.com/elastic/beats
fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat
编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html
现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志
好的,我们打开几个网页,控制台会有相应的输出,如下:
然后Elasticsearch应该就会有数据进去了,我们看看:
至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。 收起阅读 »
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。
开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/。
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。
这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。
A.源码签出
登陆Github打开https://github.com/elastic/beats
fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/
cd $GOPATH/src/github.com/elastic
#签出源码
git clone https://github.com/elastic/beats.git
cd beats
#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git
#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master
#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat
#切换到packetbeat模块
cd packetbeat
#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep
#编译
make
编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html
output:
elasticsearch:
enabled: true
hosts: ["localhost:9200"]
username: "tribe_user"
password: "tribe_user"
现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml -d "publish"
介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志
好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG Publish: {
"@timestamp": "2015-12-29T14:24:39.709Z",
"beat": {
"hostname": "medcls-MacBook.local",
"name": "medcls-MacBook.local"
},
"bytes_in": 31,
"bytes_out": 115,
"client_ip": "192.168.3.10",
"client_port": 53669,
"client_proc": "",
"client_server": "",
"count": 1,
"direction": "out",
"dns": {
"additionals_count": 0,
"answers": [
{
"class": "IN",
"data": "www.a.shifen.com",
"name": "sp2.baidu.com",
"ttl": 333,
"type": "CNAME"
}
],
"answers_count": 1,
"authorities": [
{
"class": "IN",
"data": "ns1.a.shifen.com",
"expire": 86400,
"minimum": 3600,
"name": "a.shifen.com",
"refresh": 5,
"retry": 5,
"rname": "baidu_dns_master.baidu.com",
"serial": 1512240003,
"ttl": 12,
"type": "SOA"
}
],
"authorities_count": 1,
"flags": {
"authoritative": false,
"recursion_allowed": true,
"recursion_desired": true,
"truncated_response": false
},
"id": 7435,
"op_code": "QUERY",
"question": {
"class": "IN",
"name": "sp2.baidu.com",
"type": "AAAA"
},
"response_code": "NOERROR"
},
"ip": "192.168.3.1",
"method": "QUERY",
"port": 53,
"proc": "",
"query": "class IN, type AAAA, sp2.baidu.com",
"resource": "sp2.baidu.com",
"responsetime": 18,
"server": "",
"status": "OK",
"transport": "udp",
"type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG output worker: publish 2 events
然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29 5 1 135 0 561.2kb 561.2kb
至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。 收起阅读 »
medcl 发表于 : 2015-12-30 21:02
评论 (1)
关于提示TooManyClauses[maxClauseCount is set to 1024]的问题。
今天头一次出现的,其实也不算什么问题。
通过数据库获取到了1126个条件数据,然后叠加进bool进行查询,直接抛出个异常:
TooManyClauses[maxClauseCount is set to 1024]
问了Medcl大神,得知是超过默认搜索条件大小的问题,可以通过参数修改
我也觉得挺BT的,自己想想都有点小激动,太佩服自己了。。。
继续阅读 »
通过数据库获取到了1126个条件数据,然后叠加进bool进行查询,直接抛出个异常:
TooManyClauses[maxClauseCount is set to 1024]
问了Medcl大神,得知是超过默认搜索条件大小的问题,可以通过参数修改
index.query.bool.max_clause_count: 4096
M大也说,太BT了。。。 这么多条件查询。。。我也觉得挺BT的,自己想想都有点小激动,太佩服自己了。。。
今天头一次出现的,其实也不算什么问题。
通过数据库获取到了1126个条件数据,然后叠加进bool进行查询,直接抛出个异常:
TooManyClauses[maxClauseCount is set to 1024]
问了Medcl大神,得知是超过默认搜索条件大小的问题,可以通过参数修改
我也觉得挺BT的,自己想想都有点小激动,太佩服自己了。。。 收起阅读 »
通过数据库获取到了1126个条件数据,然后叠加进bool进行查询,直接抛出个异常:
TooManyClauses[maxClauseCount is set to 1024]
问了Medcl大神,得知是超过默认搜索条件大小的问题,可以通过参数修改
index.query.bool.max_clause_count: 4096
M大也说,太BT了。。。 这么多条件查询。。。我也觉得挺BT的,自己想想都有点小激动,太佩服自己了。。。 收起阅读 »
atomyliu 发表于 : 2015-12-29 16:26
评论 (0)
Day24: Elasticsearch添加Shield后TransportClient如何连接?
Shield是Elasticsearch一个安全防护插件,提供了权限访问控制和日志审计功能,企业可以很方便的和LDAP或是ActiveDirectory进行集成,重用现有的安全认证体系.
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.
关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.
1.HTTP方式
现在直接访问es的http接口就会报错
curl http://localhost:9200
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}
shield支持HttpBasic验证,所以正确的访问姿势是:
curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.
2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://maven.elasticsearch.or ... gt%3B
<releases> <enabled>true</enabled> </releases>
<snapshots> <enabled>false</enabled> </snapshots>
</repository>
</repositories>
2.2 添加依赖的配置
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency
2.3 构建TransportClient的地方增加访问用户的配置
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
String clusterName="elasticsearch"; String ip= "127.0.0.1";
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("shield.user", "tribe_user:tribe_user")
.build();
try { client = TransportClient.builder()
.addPlugin(ShieldPlugin.class)
.settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray())); client.prepareSearch()
.putHeader("Authorization", token).get(); }
catch (UnknownHostException e)
{ logger.error("es",e); }
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252
继续阅读 »
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.
关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.
1.HTTP方式
现在直接访问es的http接口就会报错
curl http://localhost:9200
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}
shield支持HttpBasic验证,所以正确的访问姿势是:
curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.
2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://maven.elasticsearch.or ... gt%3B
<releases> <enabled>true</enabled> </releases>
<snapshots> <enabled>false</enabled> </snapshots>
</repository>
</repositories>
2.2 添加依赖的配置
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency
2.3 构建TransportClient的地方增加访问用户的配置
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
String clusterName="elasticsearch"; String ip= "127.0.0.1";
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("shield.user", "tribe_user:tribe_user")
.build();
try { client = TransportClient.builder()
.addPlugin(ShieldPlugin.class)
.settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray())); client.prepareSearch()
.putHeader("Authorization", token).get(); }
catch (UnknownHostException e)
{ logger.error("es",e); }
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252
Shield是Elasticsearch一个安全防护插件,提供了权限访问控制和日志审计功能,企业可以很方便的和LDAP或是ActiveDirectory进行集成,重用现有的安全认证体系.
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.
关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.
1.HTTP方式
现在直接访问es的http接口就会报错
curl http://localhost:9200
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}
shield支持HttpBasic验证,所以正确的访问姿势是:
curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.
2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://maven.elasticsearch.or ... gt%3B
<releases> <enabled>true</enabled> </releases>
<snapshots> <enabled>false</enabled> </snapshots>
</repository>
</repositories>
2.2 添加依赖的配置
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency
2.3 构建TransportClient的地方增加访问用户的配置
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
String clusterName="elasticsearch"; String ip= "127.0.0.1";
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("shield.user", "tribe_user:tribe_user")
.build();
try { client = TransportClient.builder()
.addPlugin(ShieldPlugin.class)
.settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray())); client.prepareSearch()
.putHeader("Authorization", token).get(); }
catch (UnknownHostException e)
{ logger.error("es",e); }
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252 收起阅读 »
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.
关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.
1.HTTP方式
现在直接访问es的http接口就会报错
curl http://localhost:9200
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}
shield支持HttpBasic验证,所以正确的访问姿势是:
curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.
2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://maven.elasticsearch.or ... gt%3B
<releases> <enabled>true</enabled> </releases>
<snapshots> <enabled>false</enabled> </snapshots>
</repository>
</repositories>
2.2 添加依赖的配置
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency
2.3 构建TransportClient的地方增加访问用户的配置
import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
String clusterName="elasticsearch"; String ip= "127.0.0.1";
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("shield.user", "tribe_user:tribe_user")
.build();
try { client = TransportClient.builder()
.addPlugin(ShieldPlugin.class)
.settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray())); client.prepareSearch()
.putHeader("Authorization", token).get(); }
catch (UnknownHostException e)
{ logger.error("es",e); }
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252 收起阅读 »
medcl 发表于 : 2015-12-28 12:13
评论 (6)