I'm using Logstash (version 2.4.0) to read log files into Elasticsearch, processing them with the following filter:
filter {
  grok {
    patterns_dir => [".\patterns"]
    match => { "message" => "%{CREDIT_CORE_TIMESTAMP:credittime}" }
  }
  date {
    match => [ "credittime", "YYYY-MM-dd HH:mm:ss,SSS" ]
  }
}
After processing, the resulting times are:

  "@timestamp": "2016-10-21T15:41:10.457Z"
  "credittime": "2016-10-21 11:41:10.457"
The complete configuration file is below:
input {
  kafka {
    zk_connect => "10.214.128.20:2181"
    group_id => "creditgroup"
    topic_id => "credit"
    consumer_id => "credit1"
    #auto_offset_reset => "smallest"
    #reset_beginning => true
    consumer_threads => 5
    decorate_events => false
    type => "string"
    codec => "plain"
  }
}

filter {
  grok {
    patterns_dir => [".\patterns"]
    match => { "message" => "%{CREDIT_CORE_TIMESTAMP:credittime}" }
  }
  date {
    match => [ "credittime", "YYYY-MM-dd HH:mm:ss,SSS" ]
    # add_field => {link => "%{@timestamp}"}
  }
}

output {
  elasticsearch {
    hosts => ["10.214.128.20:9200", "10.214.128.21:9200", "10.214.128.22:9200"]
    index => "credit_log"
    document_type => "credit_access"
    user => "adcredit"
    password => "adcredit123"
  }
  stdout {
    codec => plain
  }
}
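One thing worth noting about the date filter above: when no timezone option is set, Logstash interprets the parsed credittime in the local timezone of the machine running Logstash and then stores @timestamp in UTC, which is why @timestamp in the output above is offset from credittime rather than equal to it. A minimal sketch pinning the zone explicitly (Asia/Shanghai is an assumption here; use whatever zone the logs were actually written in):

  filter {
    date {
      match    => [ "credittime", "YYYY-MM-dd HH:mm:ss,SSS" ]
      # Assumed source-log timezone; without this line the date filter
      # falls back to the Logstash host's local zone when parsing.
      timezone => "Asia/Shanghai"
    }
  }

With the zone pinned, the UTC value written to @timestamp no longer depends on where Logstash happens to run; Kibana will still render it back in the browser's local time when displaying documents.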