在logstash内部, input到filter, 以及filter到output, 消息都是通过一个队列来中转.
在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.
但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.
zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.
在我自己的电脑上测试,2.6 GHz Intel Core i5. 一个主线程生成10,000,000个随机数, 分发给四个线程消费.
用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.
不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率.
在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.
但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.
zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.
在我自己的电脑上测试,2.6 GHz Intel Core i5. 一个主线程生成10,000,000个随机数, 分发给四个线程消费.
用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.
不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率.
[尊重社区原创,转载请保留或注明出处]
本文地址:http://searchkit.cn/article/31
本文地址:http://searchkit.cn/article/31
1 个评论
Logstash内部多个queue还有一个作用是处理output后端故障造成堆积的问题的,起到缓冲的作用,2.2在管道处理这块有优化,会在内存中重用消息,并且管道内消息也将batch处理