logstash-5.0.1 kafka input throws an exception on startup

Logstash | Author: forsaken627 | Posted: 2016-11-24 | Views: 12945

kafka-kafka_2.10-0.8.2
logstash-5.0.1
[2016-11-24T16:51:39,783][WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_111]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_111]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_111]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_111]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_111]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_111]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) [kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) [kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) [kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) [kafka-clients-0.10.0.1.jar:?]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_111]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) [?:1.8.0_111]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [?:1.8.0_111]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) [?:1.8.0_111]
at org.jruby.javasupport.JavaConstructor.newInstanceDirect(JavaConstructor.java:277) [?:?]
at org.jruby.java.invokers.ConstructorInvoker.call(ConstructorInvoker.java:83) [?:?]
at org.jruby.java.invokers.ConstructorInvoker.call(ConstructorInvoker.java:174) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:336) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:179) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:183) [?:?]
at org.jruby.java.proxies.ConcreteJavaProxy$InitializeMethod.call(ConcreteJavaProxy.java:56) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:177) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:183) [?:?]
at org.jruby.RubyClass.newInstance(RubyClass.java:857) [?:?]
at org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen) [?:?]
at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodZeroOrOneOrNBlock.call(JavaMethod.java:297) [?:?]
at org.jruby.java.proxies.ConcreteJavaProxy$NewMethod.call(ConcreteJavaProxy.java:155) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168) [?:?]
at org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57) [?:?]
at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105) [?:?]
at org.jruby.ast.BlockNode.interpret(BlockNode.java:71) [?:?]
at org.jruby.ast.RescueNode.executeBody(RescueNode.java:221) [?:?]
at org.jruby.ast.RescueNode.interpret(RescueNode.java:116) [?:?]
at org.jruby.ast.BeginNode.interpret(BeginNode.java:83) [?:?]
at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105) [?:?]
at org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74) [?:?]
at org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:139) [?:?]
at org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:187) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:134) [?:?]
at org.jruby.ast.VCallNode.interpret(VCallNode.java:88) [?:?]
at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105) [?:?]
at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112) [?:?]
at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206) [?:?]
at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:157) [?:?]
at org.jruby.runtime.Block.yield(Block.java:142) [?:?]
at org.jruby.RubyEnumerable$23.call(RubyEnumerable.java:773) [?:?]
at org.jruby.runtime.CallBlock19.yieldSpecific(CallBlock19.java:71) [?:?]
at org.jruby.runtime.Block.yieldSpecific(Block.java:111) [?:?]
at org.jruby.RubyFixnum.times(RubyFixnum.java:275) [?:?]
at org.jruby.RubyFixnum$INVOKER$i$0$0$times.call(RubyFixnum$INVOKER$i$0$0$times.gen) [?:?]
at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodZeroBlock.call(JavaMethod.java:458) [?:?]
at org.jruby.RubyClass.finvoke(RubyClass.java:549) [?:?]
at org.jruby.runtime.Helpers.invoke(Helpers.java:498) [?:?]
at org.jruby.RubyBasicObject.callMethod(RubyBasicObject.java:388) [?:?]
at org.jruby.RubyEnumerator.each(RubyEnumerator.java:274) [?:?]
at org.jruby.RubyEnumerator$INVOKER$i$each.call(RubyEnumerator$INVOKER$i$each.gen) [?:?]
at org.jruby.RubyClass.finvoke(RubyClass.java:528) [?:?]
at org.jruby.runtime.Helpers.invoke(Helpers.java:486) [?:?]
at org.jruby.RubyEnumerable.callEach19(RubyEnumerable.java:104) [?:?]
at org.jruby.RubyEnumerable.collectCommon19(RubyEnumerable.java:765) [?:?]
at org.jruby.RubyEnumerable.map19(RubyEnumerable.java:757) [?:?]
at org.jruby.RubyEnumerable$INVOKER$s$0$0$map19.call(RubyEnumerable$INVOKER$s$0$0$map19.gen) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:316) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:145) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154) [?:?]
at org.jruby.ast.CallNoArgBlockNode.interpret(CallNoArgBlockNode.java:64) [?:?]
at org.jruby.ast.InstAsgnNode.interpret(InstAsgnNode.java:95) [?:?]
at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105) [?:?]
at org.jruby.ast.BlockNode.interpret(BlockNode.java:71) [?:?]
at org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74) [?:?]
at org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182) [?:?]
at org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:326) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170) [?:?]
at org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57) [?:?]
at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105) [?:?]
at org.jruby.ast.RescueNode.executeBody(RescueNode.java:221) [?:?]
at org.jruby.ast.RescueNode.interpret(RescueNode.java:116) [?:?]
at org.jruby.ast.EnsureNode.interpret(EnsureNode.java:96) [?:?]
at org.jruby.ast.BeginNode.interpret(BeginNode.java:83) [?:?]
at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105) [?:?]
at org.jruby.ast.BlockNode.interpret(BlockNode.java:71) [?:?]
at org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74) [?:?]
at org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182) [?:?]
at org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:326) [?:?]
at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170) [?:?]
at org.jruby.ast.FCallOneArgNode.interpret(FCallOneArgNode.java:36) [?:?]
at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105) [?:?]
at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112) [?:?]
at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206) [?:?]
at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194) [?:?]
at org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125) [?:?]
at org.jruby.runtime.Block.call(Block.java:101) [?:?]
at org.jruby.RubyProc.call(RubyProc.java:300) [?:?]
at org.jruby.RubyProc.call(RubyProc.java:230) [?:?]
at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:99) [?:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
Exception in thread "Ruby-0-Thread-7: /data/dmp/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.6/lib/logstash/inputs/kafka.rb:191" Exception in thread "Ruby-0-Thread-9: /data/dmp/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.6/lib/logstash/inputs/kafka.rb:191" Exception in thread "Ruby-0-Thread-8: /data/dmp/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.6/lib/logstash/inputs/kafka.rb:191" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'brokers': Error reading field 'host': Error reading string of length 27489, only 114 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(org/apache/kafka/common/protocol/types/Schema.java:73)
at org.apache.kafka.clients.NetworkClient.parseResponse(org/apache/kafka/clients/NetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(org/apache/kafka/clients/NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(org/apache/kafka/clients/NetworkClient.java:269)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:179)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(org/apache/kafka/clients/consumer/KafkaConsumer.java:974)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:938)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
at RUBY.thread_runner(/data/dmp/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.6/lib/logstash/inputs/kafka.rb:201)
at java.lang.Thread.run(java/lang/Thread.java:745)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'brokers': Error reading field 'host': Error reading string of length 27489, only 114 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(org/apache/kafka/common/protocol/types/Schema.java:73)
at org.apache.kafka.clients.NetworkClient.parseResponse(org/apache/kafka/clients/NetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(org/apache/kafka/clients/NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(org/apache/kafka/clients/NetworkClient.java:269)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:179)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(org/apache/kafka/clients/consumer/KafkaConsumer.java:974)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:938)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
at RUBY.thread_runner(/data/dmp/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.6/lib/logstash/inputs/kafka.rb:201)
at java.lang.Thread.run(java/lang/Thread.java:745)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'brokers': Error reading field 'host': Error reading string of length 27489, only 114 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(org/apache/kafka/common/protocol/types/Schema.java:73)
at org.apache.kafka.clients.NetworkClient.parseResponse(org/apache/kafka/clients/NetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(org/apache/kafka/clients/NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(org/apache/kafka/clients/NetworkClient.java:269)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:179)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(org/apache/kafka/clients/consumer/KafkaConsumer.java:974)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:938)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
at RUBY.thread_runner(/data/dmp/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.6/lib/logstash/inputs/kafka.rb:201)
at java.lang.Thread.run(java/lang/Thread.java:745)
[2016-11-24T16:51:39,992][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>["http://host1.elk.freed.so:9200", "http://host2.elk.freed.so:9200", "http://host3.elk.freed.so:9200"]}}
[2016-11-24T16:51:39,995][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2016-11-24T16:51:40,175][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "omit_norms"=>true}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}, "fields"=>{"raw"=>{"type"=>"string", "index"=>"not_analyzed", "doc_values"=>true, "ignore_above"=>256}}}}}, {"float_fields"=>{"match"=>"*", "match_mapping_type"=>"float", "mapping"=>{"type"=>"float", "doc_values"=>true}}}, {"double_fields"=>{"match"=>"*", "match_mapping_type"=>"double", "mapping"=>{"type"=>"double", "doc_values"=>true}}}, {"byte_fields"=>{"match"=>"*", "match_mapping_type"=>"byte", "mapping"=>{"type"=>"byte", "doc_values"=>true}}}, {"short_fields"=>{"match"=>"*", "match_mapping_type"=>"short", "mapping"=>{"type"=>"short", "doc_values"=>true}}}, {"integer_fields"=>{"match"=>"*", "match_mapping_type"=>"integer", "mapping"=>{"type"=>"integer", "doc_values"=>true}}}, {"long_fields"=>{"match"=>"*", "match_mapping_type"=>"long", "mapping"=>{"type"=>"long", "doc_values"=>true}}}, {"date_fields"=>{"match"=>"*", "match_mapping_type"=>"date", "mapping"=>{"type"=>"date", "doc_values"=>true}}}, {"geo_point_fields"=>{"match"=>"*", "match_mapping_type"=>"geo_point", "mapping"=>{"type"=>"geo_point", "doc_values"=>true}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "doc_values"=>true}, "@version"=>{"type"=>"string", "index"=>"not_analyzed", "doc_values"=>true}, "geoip"=>{"type"=>"object", "dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip", "doc_values"=>true}, "location"=>{"type"=>"geo_point", "doc_values"=>true}, 
"latitude"=>{"type"=>"float", "doc_values"=>true}, "longitude"=>{"type"=>"float", "doc_values"=>true}}}}}}}}
[2016-11-24T16:51:40,189][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2016-11-24T16:51:40,274][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["host1.elk.freed.so:9200", "host2.elk.freed.so:9200", "host3.elk.freed.so:9200"]}
[2016-11-24T16:51:40,369][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/data/dmp/logstash/conf/GeoLite2-City.mmdb"}
[2016-11-24T16:51:40,392][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>125}
[2016-11-24T16:51:40,393][INFO ][logstash.pipeline ] Pipeline main started
[2016-11-24T16:51:40,475][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2016-11-24T16:51:43,455][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
My configuration file:
input {
  kafka {
    bootstrap_servers => "kafka1.dmp.com:9092"
    topics => ["80-client-lz.rili.cn"]
    group_id => "logstash"
    client_id => "logstash"
    codec => plain
    consumer_threads => 3
    decorate_events => true
    type => "nginx-access"
  }
}

filter {
  if [type] == "nginx-access" {
    grok {
      patterns_dir => "/data/dmp/logstash/patterns/nginx-access"
      match => {
        "message" => "%{NGINXACCESS}"
      }
    }
    date {
      match => [ "time_locale" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    ruby { code => "event['http_x_forwarded_for'] = event['http_x_forwarded_for'].split(',')" }
    geoip {
      source => "http_x_forwarded_for"
      target => "geoip"
      database => "/data/dmp/logstash/conf/GeoLite2-City.mmdb"
    }
  }
}

output {
  elasticsearch {
    hosts => ["host1.elk.freed.so:9200","host2.elk.freed.so:9200","host3.elk.freed.so:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
    document_type => "%{type}"
    workers => 1
    flush_size => 10
    idle_flush_time => 10
    template_overwrite => true
  }
}
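
A side note unrelated to the startup exception: the ruby filter in this config uses the hash-style accessor event['field'], which Logstash 5.0 no longer supports. The 5.x Event API reads with event.get and writes with event.set. A minimal sketch of the same split follows; the nil guard is an addition for illustration and is not part of the original config.

```
filter {
  # Logstash 5.x Event API: event.get to read, event.set to write.
  # The "unless xff.nil?" guard is an assumption added here so that
  # events without the header are left untouched.
  ruby {
    code => "xff = event.get('http_x_forwarded_for'); event.set('http_x_forwarded_for', xff.split(',')) unless xff.nil?"
  }
}
```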

leighton_buaa

That is only a warning; it probably does not affect usage.

forsaken627 - post-90s typist

Note this part as well:
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
Exception in thread "Ruby-0-Thread-7: /data/dmp/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.6/lib/logstash/inputs/kafka.rb:191" Exception in thread "Ruby-0-Thread-9: /data/dmp/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.6/lib/logstash/inputs/kafka.rb:191" Exception in thread "Ruby-0-Thread-8: /data/dmp/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-5.0.6/lib/logstash/inputs/kafka.rb:191" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'brokers': Error reading field 'host': Error reading string of length 27489, only 114 bytes available

leighton_buaa

I suggest you first remove the filter and the elasticsearch output and write directly to stdout, to see whether it starts.
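
This isolation test can be sketched as a pipeline that keeps only the kafka input from the question and prints events to the console. The rubydebug codec is a choice made here for readability, not something stated in the thread.

```
input {
  kafka {
    bootstrap_servers => "kafka1.dmp.com:9092"
    topics => ["80-client-lz.rili.cn"]
    group_id => "logstash"
    client_id => "logstash"
    consumer_threads => 3
  }
}

output {
  # Print every event to the console instead of indexing it.
  stdout { codec => rubydebug }
}
```

If the SchemaException still appears with this pipeline, the filter and the elasticsearch output can be ruled out as the cause.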

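forsaken627 - post-90s typist

I don't think this error has anything to do with the filter or the output; it is related to the input-kafka plugin.
Even so, I removed the filter and the elasticsearch output and started Logstash again, and the error is still the same.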

leighton_buaa

Check whether your kafka1.dmp.com:9092 actually started successfully.

pmgcat

Try setting consumer_threads to 1.
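
A sketch of that change against the input block from the question; only consumer_threads differs. With three threads sharing client_id => "logstash", each consumer likely tries to register the same JMX name (kafka.consumer:type=app-info,id=logstash), which would explain the InstanceAlreadyExistsException warning. A single thread should avoid that warning, but it is not expected to change the SchemaException.

```
input {
  kafka {
    bootstrap_servers => "kafka1.dmp.com:9092"
    topics => ["80-client-lz.rili.cn"]
    group_id => "logstash"
    client_id => "logstash"
    # One consumer, so only one MBean is registered under id=logstash.
    consumer_threads => 1
    type => "nginx-access"
  }
}
```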

lsyoung

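I ran into the same problem and have not solved it yet, but from what I have read it appears to be caused by a mismatch between the Kafka cluster version and the version of Logstash's kafka plugin.
[Attachment: kafka.rb_.png]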

redhat

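The problem is caused by a mismatch between the Kafka cluster version and the version of Logstash's kafka plugin. Either upgrade Kafka or use an older version of Logstash; an older Logstash can still send data to a newer ES.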
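
The logs in the question are consistent with this diagnosis: the stack frames show kafka-clients-0.10.0.1.jar while the broker is kafka_2.10-0.8.2, and the impossible string length 27489 equals 0x6B61, the ASCII bytes "ka", which suggests the client was reading hostname bytes as a length field. For the "older Logstash" route, here is a minimal sketch assuming Logstash 2.x with a pre-3.0 logstash-input-kafka, which connects through ZooKeeper rather than bootstrap_servers. The ZooKeeper address is hypothetical and does not come from the thread.

```
input {
  kafka {
    # Hypothetical ZooKeeper address; replace with the real ensemble.
    zk_connect => "zk1.dmp.com:2181"
    # The 0.8-era plugin takes a single topic_id instead of a topics list.
    topic_id => "80-client-lz.rili.cn"
    group_id => "logstash"
    consumer_threads => 3
    type => "nginx-access"
  }
}
```

Check the plugin's compatibility matrix for the exact Logstash and plugin versions that match the broker before downgrading.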

fxyfdf - 18

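It may be caused by the auto.offset.reset parameter. If auto_offset_reset => "none" is set in the Logstash config, or the equivalent is set on the Kafka side, comment it out.

What to do when there is no initial offset in Kafka, or when the current offset no longer exists on the server (for example, because that data has been deleted):
earliest: automatically reset the offset to the earliest offset
latest: automatically reset the offset to the latest offset
none: throw an exception to the consumer if no previous offset is found for the consumer's group
anything else: throw an exception to the consumer.
The value of "auto.offset.reset" must be one of [latest, earliest, none]; the default is "latest".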
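
As a sketch, the option can be set explicitly on the kafka input from the question. The value "earliest" is chosen here only for illustration.

```
input {
  kafka {
    bootstrap_servers => "kafka1.dmp.com:9092"
    topics => ["80-client-lz.rili.cn"]
    group_id => "logstash"
    # One of "earliest", "latest", "none". Only consulted when the
    # group has no committed offset or the committed offset is gone.
    auto_offset_reset => "earliest"
  }
}
```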


 
