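My environment is as follows:
rsyslog/filebeat ---> provider-logstash (7.4.0) ---> kafka (3 nodes) ---> consumer logstash (7.2.0, 3 nodes) ---> es (3 nodes)
The consumer Logstash instances and ES share the same 3 servers: each server runs one consumer Logstash and one ES node.
My problem: when one Kafka node fails or is stopped, the producer side still writes data to Kafka normally, but the consumer Logstash can no longer fetch any messages from Kafka.
I set metadata_max_age_ms to 3000, which forces a metadata refresh at least every 3 seconds so the consumer picks up the current topic/partition layout, but the behavior is the same.
I also tried consuming with a Python client, and with one Kafka node missing it consumed normally, so the problem seems to be on the Logstash side.
Here are my Logstash configurations.
Logstash config that writes into Kafka (producer side):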
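The post does not show the Python test itself. A minimal sketch of such a check, assuming the kafka-python client; the function names are hypothetical. One detail matters for comparing it with Logstash: the broker that acts as group coordinator is derived from the `group_id`, so a test run with `group_id=None` (no coordinator involved) or with a different group does not go through the same coordinator as Logstash's `logstash-input` group.

```python
import json
from typing import Any, Iterable, List, Optional


def drain(records: Iterable[Any], limit: int) -> List[dict]:
    """Decode up to `limit` JSON messages from an iterable of Kafka records.

    Each record must expose a `.value` attribute holding the raw bytes,
    as kafka-python's ConsumerRecord does.
    """
    out: List[dict] = []
    if limit <= 0:
        return out
    for record in records:
        out.append(json.loads(record.value))
        if len(out) >= limit:
            break
    return out


def check_consumption(bootstrap: List[str], topic: str,
                      group_id: Optional[str], limit: int = 10) -> List[dict]:
    """Consume a few messages from `topic` to see whether reads still work."""
    from kafka import KafkaConsumer  # kafka-python, imported lazily

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap,
        group_id=group_id,            # None disables group coordination and offset commits
        auto_offset_reset="earliest",
        consumer_timeout_ms=10000,    # stop iterating after 10 s without new messages
    )
    try:
        return drain(consumer, limit)
    finally:
        consumer.close()
```

Running `check_consumption([...], "some-topic", "logstash-input")` while Logstash is stopped would exercise the same coordinator as Logstash; with Logstash running it would instead join its group and trigger a rebalance.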
output {
if [topic]{
kafka {
bootstrap_servers => "ip1:9092,ip2:9092,ip3:9092" # producer
topic_id => "%{topic}"
codec => json
metadata_max_age_ms => 3000
}
}
}
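The routing this output block performs can be sketched in Python for illustration (this is not how Logstash implements it, and `route_by_topic` is a hypothetical name): `if [topic]` skips events whose `topic` field is missing, null or false, and `topic_id => "%{topic}"` sends each remaining event to the topic named in that field.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def route_by_topic(events: Iterable[dict]) -> Dict[str, List[dict]]:
    """Group events by the Kafka topic the output block above would send them to."""
    routed: Dict[str, List[dict]] = defaultdict(list)
    for event in events:
        topic = event.get("topic")
        # `if [topic]` is false when the field is absent, null or false
        if topic is None or topic is False:
            continue
        # `topic_id => "%{topic}"` interpolates the field value into the topic name
        routed[str(topic)].append(event)
    return dict(routed)
```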
Logstash config that consumes from Kafka:
input {
kafka {
bootstrap_servers => "ip1:9092,ip2:9092,ip3:9092"
group_id => "logstash-input"
metadata_max_age_ms => 3000
auto_offset_reset => "latest"
consumer_threads => 3
decorate_events => true
topics => [topic list..]
codec => json
}
}
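The Logstash Kafka input options used here follow the Kafka consumer configuration keys (`bootstrap.servers`, `group.id`, `metadata.max.age.ms`, `auto.offset.reset`), and kafka-python's `KafkaConsumer` accepts the same settings as snake_case keyword arguments. To rerun the Python test with the same consumer settings Logstash uses, the mapping could be sketched as follows; the helper name is hypothetical and only options with a direct kafka-python counterpart are translated.

```python
import json
from typing import Any, Dict


def logstash_input_to_kafka_python(settings: Dict[str, Any]) -> Dict[str, Any]:
    """Translate Logstash kafka input options into KafkaConsumer keyword args.

    `consumer_threads` and `decorate_events` are Logstash-specific: the first
    means running several consumers in the same group, the second adds Kafka
    metadata (topic, partition, offset, ...) to each event.
    """
    kwargs: Dict[str, Any] = {
        "bootstrap_servers": [s.strip() for s in settings["bootstrap_servers"].split(",")],
        "group_id": settings["group_id"],
        # 300000 ms (5 minutes) is the default in both Logstash and kafka-python
        "metadata_max_age_ms": int(settings.get("metadata_max_age_ms", 300000)),
        "auto_offset_reset": settings.get("auto_offset_reset", "latest"),
    }
    if settings.get("codec") == "json":
        # codec => json: decode each record value as a JSON document
        kwargs["value_deserializer"] = lambda raw: json.loads(raw)
    return kwargs
```

`KafkaConsumer(*topics, **kwargs)` would then consume with the same group and metadata refresh interval as Logstash. `consumer_threads => 3` corresponds to three consumers in one group, which matches the three client ids `logstash-0` to `logstash-2` in the log.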
In the Logstash log, this is what the consumer reported after the Kafka node disconnected (all messages are logged at INFO level):
[2020-03-14T23:48:25,268][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash-input] Group coordinator <failed-kafka-node-ip>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:25,308][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash-input] Group coordinator <failed-kafka-node-ip>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:25,308][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash-input] Group coordinator <failed-kafka-node-ip>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:25,309][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash-input] Discovered group coordinator <failed-kafka-node-ip>:9092 (id: 2147483647 rack: null)
[2020-03-14T23:48:25,310][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash-input] Discovered group coordinator <failed-kafka-node-ip>:9092 (id: 2147483647 rack: null)
[2020-03-14T23:48:25,310][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash-input] Group coordinator <failed-kafka-node-ip>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:25,310][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash-input] Group coordinator <failed-kafka-node-ip>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:25,411][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash-input] Discovered group coordinator <failed-kafka-node-ip>:9092 (id: 2147483647 rack: null)
[2020-03-14T23:48:35,449][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash-input] Group coordinator <failed-kafka-node-ip>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:49,630][INFO ][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash-input] Error sending fetch request (sessionId=2036742827, epoch=226) to node 0: org.apache.kafka.common.errors.DisconnectException.
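When reading these lines it helps to know that the Kafka Java client registers the coordinator connection under node id `Integer.MAX_VALUE - brokerId`, so `id: 2147483647` refers to broker 0, the same node the fetch error mentions. A small sketch that summarizes the coordinator messages per client id and converts the id back to a broker id; the regex and function name are hypothetical and only target the message format shown above.

```python
import re
from collections import Counter
from typing import Dict, Iterable, Tuple

# Matches the AbstractCoordinator "Group coordinator ..." / "Discovered group coordinator ..." lines
COORD_RE = re.compile(
    r"\[Consumer clientId=(?P<client>[^,]+), groupId=(?P<group>[^\]]+)\] "
    r"(?P<event>Discovered group coordinator|Group coordinator) "
    r"(?P<addr>\S+) \(id: (?P<id>\d+)"
)

INT_MAX = 2**31 - 1  # Java Integer.MAX_VALUE


def summarize(lines: Iterable[str]) -> Dict[Tuple[str, int], Counter]:
    """Count 'lost' and 'discovered' coordinator events per (clientId, broker id)."""
    summary: Dict[Tuple[str, int], Counter] = {}
    for line in lines:
        m = COORD_RE.search(line)
        if not m:
            continue  # e.g. FetchSessionHandler lines
        # The coordinator node id is Integer.MAX_VALUE - brokerId; invert it
        broker_id = INT_MAX - int(m.group("id"))
        kind = "discovered" if m.group("event").startswith("Discovered") else "lost"
        summary.setdefault((m.group("client"), broker_id), Counter())[kind] += 1
    return summary
```

Applied to the log above, every coordinator entry maps to broker 0: all three consumer threads keep rediscovering the node that is down.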
Reply from yang4210:
from elasticsearch import Elasticsearch
from kafka import KafkaConsumer
Then write the messages to ES in bulk.
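The reply only lists the two imports and the idea of bulk-writing to ES. A minimal sketch of that workaround, assuming the kafka-python and elasticsearch-py clients; the group id `python-to-es`, the index prefix `kafka-` and the batch size are hypothetical. Offsets are committed only after a batch has been indexed, so if a bulk request fails (`helpers.bulk` raises by default), the uncommitted records are consumed again after a restart instead of being lost.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def to_actions(records: Iterable[Any], prefix: str) -> Iterator[Dict[str, Any]]:
    """Turn Kafka records (with .topic and JSON bytes in .value) into bulk index actions."""
    for record in records:
        yield {
            "_index": f"{prefix}{record.topic}",   # hypothetical index naming: prefix + topic
            "_source": json.loads(record.value),   # the producer side writes with codec => json
        }


def run(bootstrap: List[str], topics: List[str], es_url: str,
        batch_size: int = 500) -> None:
    # Third-party clients are imported here so to_actions() works without them
    from elasticsearch import Elasticsearch, helpers
    from kafka import KafkaConsumer

    es = Elasticsearch(es_url)
    consumer = KafkaConsumer(
        *topics,
        bootstrap_servers=bootstrap,
        group_id="python-to-es",     # hypothetical consumer group
        enable_auto_commit=False,    # commit manually, after ES has accepted the batch
    )
    while True:
        # poll() returns {TopicPartition: [ConsumerRecord, ...]}
        polled = consumer.poll(timeout_ms=1000, max_records=batch_size)
        records = [r for batch in polled.values() for r in batch]
        if not records:
            continue
        helpers.bulk(es, to_actions(records, prefix="kafka-"))  # raises on indexing errors
        consumer.commit()
```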