使用 shuf 来打乱一个文件中的行或是选择文件中一个随机的行。

极其怪异的GBK乱码问题

Logstash | 作者 supolu | 发布于2019年09月29日 | 阅读数:4630

ELK版本7.3.2 kafka版本的2.12
业务需要把kafka的数据通过logstash传入ES,kafka的数据是GBK格式的,无论怎么设置入到es都是乱码。
  • 确认kafka数据源:打开CRT,设置默认GB2312,输入kafka消费命令,可正常显示中文无乱码
  • 测试logstash处理GBK数据:新建GBK格式的测试文件,通过如下配置可正常输出UTF-8格式的结果文件。

input {
  file {
        path => "/home/wangyu1/app/logstash/gbk2.txt"
        start_position => "beginning"
                codec => plain{
                charset=>"GBK"
        }
  }
}
output {
    file {
      path => "/home/wangyu1/app/logstash/result_gbk1715.txt"
    }
        stdout{
        }
}
  • kafka input输出就乱码了,配置如下,试过GBK UTF-8 GB2312,输入输出都试过了,每次都乱码而且都乱码格式都不一样

input {
  kafka {
        bootstrap_servers=>"localhost:9092"
        topics=>["jiake-alarm"]
        type => "jiake-alarm"
        codec => plain{
        charset=>"GBK"
    }
  }
}


output {
  kafka {
        bootstrap_servers=>"localhost:9092"
        topic_id=>["jiake_test1"]
  }
    file {
      path => "/home/wangyu1/app/logstash/result_1837.txt"
    }
}
已邀请:

doom

赞同来自: supolu yj7778826

这样的话,就别用设置  key_deserializer_class,value_deserializer_class 的输出类;系统默认是
org.apache.kafka.common.serialization.StringDeserializer,里面默认用UTF-8转码,这个插件还不能外部配置;
所以,修改成 字节流的形式;
        key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
然后在添加输出转码;
codec => plain { charset => "GBK" }
具体配置如下:
input {
kafka {
bootstrap_servers =>"n0:9092"
topics => ["0930logstash"]
type => "test"
key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
auto_offset_reset =>"latest"
codec => plain { charset => "GBK" }
}
}
output {
file {
path => "/root/test2_rel.txt"
}
stdout { codec => rubydebug }
}

doom

赞同来自: supolu

遇到的问题相同,都是GBK编码的文件,可以解码正常,但是经过卡夫卡就乱码;我也测试过,数据源端采用flume收集文件,传入kafka字符是GBK的,kafka自带的显示没有问题;我用java客户端,连接怎么解码都是乱码;可能还需要继续研究;kafka默认字符uft-8;中间件一般很麻烦修改配置的;目前,建议在数据源修改; ​解决最好还是用UTF-8;
我的是在数据源端就设置了uft-8;你是要是其他数据源最后在数据源修改为UTF-8,在传入kafka;
下面的flume的配置:
kafka.sources = r1
kafka.channels = c1
kafka.sinks = k1


kafka.sources.r1.type=spooldir
kafka.sources.r1.spoolDir=/root/flume
#设置监视目录文件编码为GBK
kafka.sources.r1.inputCharset=GBK
kafka.sources.r1.consumeOrder=random
kafka.sources.r1.recursiveDirectorySearch=true
kafka.sources.r1.deletePolicy=immediate
kafka.sources.r1.pollDelay=500
kafka.sources.r1.fileHeader=true
kafka.sources.r1.fileHeaderKey=fileAbsolutePath
kafka.sources.r1.deserializer.maxLineLength=10000
kafka.sources.r1.deserializer.outputCharset=GBK

kafka.channels.c1.type=memory
kafka.channels.c1.capacity=10000
kafka.channels.c1.transactionCapacity=100

#设置Kafka接收器
kafka.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
kafka.sinks.k1.brokerList=n0:9092
#设置Kafka的Topic
kafka.sinks.k1.topic=0930logstash

#设置序列化方式
kafka.sinks.k1.serializer.class=kafka.serializer.StringEncoder

kafka.sources.r1.channels =c1
kafka.sinks.k1.channel=c1

logstash正常配置,不需要处理转码:
input {
kafka {
bootstrap_servers =>"n0:9092"
topics => ["0930logstash"]
type => "test"
auto_offset_reset =>"latest"
# codec => plain { charset => "GBK" }
}
}
output {
file {
path => "/root/test2_rel.txt"
}
stdout { codec => rubydebug }
}

 

要回复问题请先登录注册