Q:有两个人掉到陷阱里了,死的人叫死人,活人叫什么?

如何实现Logstash使用avro序列化实现批发送消息到Kafka

Logstash | 作者 elkan1788 | 发布于2017年06月09日 | 阅读数:9914

环境: `CentOS7.2`,  `Logstash5.x`, `Kafka-0.10.1`, `logstash-codec-avro-3.0.0`, `logstash-output-kafka-6.1.3`
 
在日志采集方面使用了Logstash进行收集,由于考虑到性能的问题,采用了apache avro进行序列化,发现网上也有人提供此插件,于是把搭建了个简单的环境进行测试,验证是可以实现的。不过在压测的时候发现个问题,后面数据处理的速度并不快,跟踪下来发现Logstash是line的形式发送消息的,不知道能否实现用批量的形式发送呢?查看过`logstash-codec-avro`的源码,但不知道如何修改这块,有人遇过到类似的问题吗?求指教。
 
`logstash-codec-avro`的源码: 
```
public
def encode(event)
dw = Avro::IO::DatumWriter.new(@schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(event.to_hash, encoder)
@on_event.call(event, Base64.strict_encode64(buffer.string))
end
```
 
 
Logstash配置文件:
 
```
input {

    file {

        path => "/data/tmp/*.log"

        start_position => beginning

        codec => "json"

    }

}


filter {

    json {

        remove_field => [ "path","@timestamp","@version","host" ]

        source => message

    }

}




output {

    stdout {

        codec => plain {

             format => "%{message}"

       }

    }

    kafka {
        client_id => "logstash-test"

        bootstrap_servers => "xxx:9092,xxx:9093"

        topic_id => "logstash-test"

        compression_type => "snappy"

        retries => 5

        codec => avro {

            schema_uri => "/root/User.avsc"

        }
    }

}
 
```
 
 
已邀请:

要回复问题请先登录注册