logstash在配置文件中指定了type并且用type做判断,但是不向kafka中发数据,去掉output中的If判断做成单文件,就发数据
Logstash | 作者 clean | 发布于2018年04月17日 | 阅读数:4801
脚本如下:
去掉output中的 if 判断, 做成单文件, 启动Logstash就会发数据,是什么问题?
input {
stdin {
}
jdbc {
# mysql 数据库链接,test为数据库名
jdbc_connection_string => "jdbc:mysql://ip:3306/vw?characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
# 用户名和密码
jdbc_user => "xxx"
jdbc_password => "xxx!M"
# 驱动
jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "10000"
#使用其它字段追踪,而不是用时间
use_column_value => true
##追踪的字段
tracking_column => createdtime
record_last_run => true
##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/opt/logstash6/inputtxt/vw_drive.txt"
# 执行的sql 文件路径+名称
#statement_filepath => "/opt/logstash6/inputtxt/test.sql"
statement => "SELECT
drive.id,
drive.t_car_id,
drive.vin,
drive.t_org_rboid,
drive.t_org_smallregionid,
drive.t_sys_dic_pid,
drive.t_sys_dic_id,
drive.t_dealer_id,
drive.t_dealer_name,
drive.obdSN,
drive.plateNumber,
drive.t_carline_id,
drive.t_carline_name,
drive.drivingDuration,
drive.beginTime,
drive.endTime,
drive.fuelConsumption,
drive.avgFuelConsumption,
drive.driveDistance,
drive.unplugOBDTimes,
drive.outRangeTimes,
drive.driveStatus,
drive.ReasonType,
drive.Reason,
drive.avgSpeed,
drive.maxSpeed,
drive.createdtime
FROM
t_car_drive_2018 drive
LEFT JOIN t_car car ON car.id = drive.t_car_id
LEFT JOIN t_dealer dealer ON dealer.id = drive.t_dealer_id
WHERE
drive.isdel = 0
AND car.isDel = 0
AND dealer.isdel = 0
and drive.createdtime > :sql_last_value
ORDER BY
drive.createdtime asc "
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# schedule => "* * * * *"
# 索引类型
type => "jdbc_car_drive_table"
codec=>json
}
jdbc {
# mysql 数据库链接,test为数据库名
jdbc_connection_string => "jdbc:mysql://ip:3306/vw?characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
# 用户名和密码
jdbc_user => "xxx"
jdbc_password => "xxxx!M"
# 驱动
jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "150000"
#使用其它字段追踪,而不是用时间
use_column_value => true
##追踪的字段
tracking_column => createdtime
record_last_run => true
##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/opt/logstash6/inputtxt/vw_z_history_table.txt"
# 执行的sql 文件路径+名称
#statement_filepath => "/opt/logstash6/inputtxt/test.sql"
statement => "select * from ${z_history_table} a where a.createdtime > :sql_last_value order by a.createdtime asc"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "0 * * * *"
# schedule => "0 * * * *"
# 索引类型
type => "jdbc_z_history"
codec=>json
}
}
output{
if [type] == 'jdbc_car_drive_table' {
kafka{
bootstrap_servers => "ip.com:9092"
topic_id => "jdbc_vw_car_drive_topic"
codec => json { charset => "UTF-8" }
}
stdout {
codec => rubydebug
}
}
if [type] == 'jdbc_z_history' {
kafka{
bootstrap_servers => "ip:9092"
topic_id => "jdbc_vw_z_history_topic"
codec => json { charset => "UTF-8" }
}
stdout {
codec => rubydebug
}
}
}
是配置有什么问题吗?
去掉output中的 if 判断, 做成单文件, 启动Logstash就会发数据,是什么问题?
input {
stdin {
}
jdbc {
# mysql 数据库链接,test为数据库名
jdbc_connection_string => "jdbc:mysql://ip:3306/vw?characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
# 用户名和密码
jdbc_user => "xxx"
jdbc_password => "xxx!M"
# 驱动
jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "10000"
#使用其它字段追踪,而不是用时间
use_column_value => true
##追踪的字段
tracking_column => createdtime
record_last_run => true
##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/opt/logstash6/inputtxt/vw_drive.txt"
# 执行的sql 文件路径+名称
#statement_filepath => "/opt/logstash6/inputtxt/test.sql"
statement => "SELECT
drive.id,
drive.t_car_id,
drive.vin,
drive.t_org_rboid,
drive.t_org_smallregionid,
drive.t_sys_dic_pid,
drive.t_sys_dic_id,
drive.t_dealer_id,
drive.t_dealer_name,
drive.obdSN,
drive.plateNumber,
drive.t_carline_id,
drive.t_carline_name,
drive.drivingDuration,
drive.beginTime,
drive.endTime,
drive.fuelConsumption,
drive.avgFuelConsumption,
drive.driveDistance,
drive.unplugOBDTimes,
drive.outRangeTimes,
drive.driveStatus,
drive.ReasonType,
drive.Reason,
drive.avgSpeed,
drive.maxSpeed,
drive.createdtime
FROM
t_car_drive_2018 drive
LEFT JOIN t_car car ON car.id = drive.t_car_id
LEFT JOIN t_dealer dealer ON dealer.id = drive.t_dealer_id
WHERE
drive.isdel = 0
AND car.isDel = 0
AND dealer.isdel = 0
and drive.createdtime > :sql_last_value
ORDER BY
drive.createdtime asc "
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# schedule => "* * * * *"
# 索引类型
type => "jdbc_car_drive_table"
codec=>json
}
jdbc {
# mysql 数据库链接,test为数据库名
jdbc_connection_string => "jdbc:mysql://ip:3306/vw?characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
# 用户名和密码
jdbc_user => "xxx"
jdbc_password => "xxxx!M"
# 驱动
jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "150000"
#使用其它字段追踪,而不是用时间
use_column_value => true
##追踪的字段
tracking_column => createdtime
record_last_run => true
##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/opt/logstash6/inputtxt/vw_z_history_table.txt"
# 执行的sql 文件路径+名称
#statement_filepath => "/opt/logstash6/inputtxt/test.sql"
statement => "select * from ${z_history_table} a where a.createdtime > :sql_last_value order by a.createdtime asc"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "0 * * * *"
# schedule => "0 * * * *"
# 索引类型
type => "jdbc_z_history"
codec=>json
}
}
output{
if [type] == 'jdbc_car_drive_table' {
kafka{
bootstrap_servers => "ip.com:9092"
topic_id => "jdbc_vw_car_drive_topic"
codec => json { charset => "UTF-8" }
}
stdout {
codec => rubydebug
}
}
if [type] == 'jdbc_z_history' {
kafka{
bootstrap_servers => "ip:9092"
topic_id => "jdbc_vw_z_history_topic"
codec => json { charset => "UTF-8" }
}
stdout {
codec => rubydebug
}
}
}
是配置有什么问题吗?
1 个回复
clean
赞同来自: jiang333
在sql语句中有个 xxx.type 与配置中的 type 冲突了