Running Logstash from the console, I can see it connect to Kafka normally, but no data is ever consumed. After roughly 5 minutes the connection drops, and an obvious warning is printed:
[WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstsh-0, groupId=gc] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
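For reference, the warning is about the time between successive poll() calls: if processing between two polls takes longer than max.poll.interval.ms, the coordinator evicts the member. A minimal sketch of those same two knobs using kafka-python (the client whose parameters appear later in this post), with purely illustrative values rather than the actual Logstash settings:

from kafka import KafkaConsumer

# Hypothetical values for illustration only: raise the allowed gap between polls
# and shrink the batch size so each iteration has less work to do.
consumer = KafkaConsumer(
    'gc-mysql-slow',
    bootstrap_servers=['kafka-node1:26200'],
    group_id='gc',
    max_poll_interval_ms=600000,   # default is 300000 (5 min), which matches the ~5 minute disconnect above
    max_poll_records=100,
)
while True:
    records = consumer.poll(timeout_ms=1000)
    # If handling `records` here blocks for longer than max_poll_interval_ms,
    # the member is removed from the group, exactly as the warning describes.
    for tp, msgs in records.items():
        pass  # process msgs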
After Logstash connects to Kafka, these are the offsets of the gc consumer group; they never change:
GROUP  TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID  HOST         CLIENT-ID
gc     gc-mysql-slow  0          666730          708688          41958  logstash-0   /10.255.0.2  logstash-0
gc     gc-mysql-slow  1          669991          710558          40567  logstash-0   /10.255.0.2  logstash-0
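Incidentally, the same lag figures can be read programmatically; here is a minimal kafka-python sketch, assuming the same brokers and group as above, that compares the group's committed offsets with the log-end offsets:

from kafka import KafkaConsumer, TopicPartition

# Read the committed offsets of group 'gc' without joining the group
# (no subscribe()/poll loop, so no rebalance is triggered).
consumer = KafkaConsumer(bootstrap_servers=['kafka-node1:26200'], group_id='gc')
partitions = [TopicPartition('gc-mysql-slow', p) for p in (0, 1)]
end = consumer.end_offsets(partitions)          # log-end offset per partition
for tp in partitions:
    committed = consumer.committed(tp)          # last committed offset, or None
    lag = end[tp] - committed if committed is not None else None
    print(tp.partition, committed, end[tp], lag)
consumer.close()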
This is what Logstash prints to the console:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [kafka-node1:26200, kafka-node2:26201, kafka-node3:26202]
check.crcs = true
client.dns.lookup = default
client.id = logstsh-1-0
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = gc
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes =
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters =
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 52428800
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[2019-10-18T04:08:57,480][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka version: 2.3.0
[2019-10-18T04:08:57,480][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka commitId: fc1aaa116b661c8a
[2019-10-18T04:08:57,480][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka startTimeMs: 1571386137478
[2019-10-18T04:08:57,516][INFO ][org.apache.kafka.clients.consumer.KafkaConsumer][main] [Consumer clientId=logstash-0, groupId=gc] Subscribed to topic(s): gc-mysql-slow
[2019-10-18T04:08:57,754][INFO ][org.apache.kafka.clients.Metadata][main] [Consumer clientId=logstash-0, groupId=gc] Cluster ID: 7LJ29pB8TW2zpdihaoyDbg
[2019-10-18T04:08:57,755][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Discovered group coordinator kafka-node3:26202 (id: 2147483644 rack: null)
[2019-10-18T04:08:57,764][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Revoking previously assigned partitions
[2019-10-18T04:08:57,764][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] (Re-)joining group
[2019-10-18T04:08:57,788][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] (Re-)joining group
[2019-10-18T04:08:57,814][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Successfully joined group with generation 297
[2019-10-18T04:08:57,817][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Setting newly assigned partitions: gc-mysql-slow-0, gc-mysql-slow-1
[2019-10-18T04:08:57,831][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Setting offset for partition gc-mysql-slow-0 to the committed offset FetchPosition{offset=666730, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka-node3:26202 (id: 3 rack: null), epoch=6}}
[2019-10-18T04:08:57,841][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Setting offset for partition gc-mysql-slow-1 to the committed offset FetchPosition{offset=669991, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka-node1:26200 (id: 1 rack: null), epoch=11}}
[2019-10-18T04:08:57,866][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
I also wrote a small Python script, and it fetches data from Kafka without any problem.
Below are the Python consumer's subscription parameters:
{
'bootstrap_servers': ['kafka-node1:26200'],
'client_id': 'kafka-python-1.4.7',
'group_id': 'gc',
'key_deserializer': None,
'value_deserializer': None,
'fetch_max_wait_ms': 500,
'fetch_min_bytes': 1,
'fetch_max_bytes': 52428800,
'max_partition_fetch_bytes': 1048576,
'request_timeout_ms': 305000,
'retry_backoff_ms': 100,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
'auto_offset_reset': 'latest',
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': <function KafkaConsumer.<lambda> at 0x000001355B4FCE58>,
'check_crcs': True,
'metadata_max_age_ms': 300000,
'partition_assignment_strategy': (<class 'kafka.coordinator.assignors.range.RangePartitionAssignor'>, <class 'kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor'>),
'max_poll_records': 500,
'max_poll_interval_ms': 300000,
'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(6, 1, 1)],
'sock_chunk_bytes': 4096,
'sock_chunk_buffer_count': 1000,
'consumer_timeout_ms': inf,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'ssl_crlfile': None,
'ssl_password': None,
'ssl_ciphers': None,
'api_version': (1, 0, 0),
'api_version_auto_timeout_ms': 2000,
'connections_max_idle_ms': 540000,
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'metric_group_prefix': 'consumer',
'selector': <class 'selectors.SelectSelector'>,
'exclude_internal_topics': True,
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'legacy_iterator': False}
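The "small script" amounts to something like the following; this is a minimal reconstruction from the parameters above, not the exact script:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'gc-mysql-slow',
    bootstrap_servers=['kafka-node1:26200'],
    group_id='gc',
    auto_offset_reset='latest',
    enable_auto_commit=True,        # offsets are committed in the background
    auto_commit_interval_ms=5000,
)
for msg in consumer:
    # Iterating calls poll() internally; with the settings above the consumer
    # keeps its group membership as long as the loop body stays fast.
    print(msg.topic, msg.partition, msg.offset, msg.value[:80])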
I really can't tell where the problem is. Could someone point me in the right direction? It was working fine last night, then stopped working today, and tens of thousands of messages have now piled up unconsumed.
The pipeline itself is simply a kafka input feeding an elasticsearch output.
3 replies
xiaoxiao_1110
Every time it starts, it manages to write 1345 records into ES, and then after 5 minutes it is removed from the gc consumer group:
[2019-10-18T05:20:00,563][WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[2019-10-18T05:20:00,564][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Member logstash-0-adabac6d-22f0-4da3-b89e-2016684a9770 sending LeaveGroup request to coordinator kafka-node3:26202 (id: 2147483644 rack: null)
But it never commits the latest offsets, so each time it re-consumes the data from the same previous position; the commit never succeeds, and I have no idea why.
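One way to check whether commits reach the broker at all is a quick kafka-python sketch (not part of the original setup, and best run while Logstash is stopped so it does not add a rebalance): poll a few records, commit synchronously, and read the committed offsets back.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'gc-mysql-slow',
    bootstrap_servers=['kafka-node1:26200'],
    group_id='gc',
    enable_auto_commit=False,       # commit explicitly so a failure is visible
)
records = consumer.poll(timeout_ms=10000, max_records=10)
if records:
    consumer.commit()               # synchronous; raises an error instead of failing silently
    for tp in consumer.assignment():
        print(tp, 'committed:', consumer.committed(tp))
consumer.close()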
chachabusi - newbie ops girl, please be kind
medcl - Fighting tigers tonight.