现在架构是 filebeat - > kafka -> logstash -> elasticsearch
kafka保留消息时间为24小时
问题:
如果一段时间内数据采集有遗漏,再想把数据补充回去,并保证数据几乎不重复,有什么方式?
想法:
elastic查询出指定时间段的offset,删除指定时间段的,补全数据后logstash根据offset值重新消费
但是上面的想法实现起来太困难,不管是从kafka还是elastic获取offset都不靠谱,而且重新消费也消耗性能,目前ye也没找到logstash重新消费的方式
kafka保留消息时间为24小时
问题:
如果一段时间内数据采集有遗漏,再想把数据补充回去,并保证数据几乎不重复,有什么方式?
想法:
elastic查询出指定时间段的offset,删除指定时间段的,补全数据后logstash根据offset值重新消费
但是上面的想法实现起来太困难,不管是从kafka还是elastic获取offset都不靠谱,而且重新消费也消耗性能,目前ye也没找到logstash重新消费的方式
2 个回复
BirdZhang
赞同来自: leighton_buaa 、medcl
总结一下吧,logstash 的filter跟out配置如下:
应该用fingerprint也可以
leighton_buaa
赞同来自:
每条日志都指定一个字段(例如timestamp)作为doc ID索引到es中,当你的补全数据入库时,如果es中没有该日志,那就是添加,如果有就会覆盖原有日志记录。