居然是你

logstash在配置文件中指定了type并且用type做判断,但是不向kafka中发数据,去掉output中的If判断做成单文件,就发数据

Logstash | 作者 clean | 发布于2018年04月17日 | 阅读数:4801

脚本如下: 
 
去掉output中的 if 判断, 做成单文件, 启动Logstash就会发数据,是什么问题?
 
 
                    input {
    stdin {
    }
    
    jdbc {
      # mysql 数据库链接,test为数据库名
      jdbc_connection_string => "jdbc:mysql://ip:3306/vw?characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
      # 用户名和密码
      jdbc_user => "xxx"
      jdbc_password => "xxx!M"
      # 驱动
      jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "10000"

      #使用其它字段追踪,而不是用时间
      use_column_value => true
      ##追踪的字段
      tracking_column => createdtime 
      record_last_run => true
      ##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
      last_run_metadata_path => "/opt/logstash6/inputtxt/vw_drive.txt" 

      # 执行的sql 文件路径+名称
      #statement_filepath => "/opt/logstash6/inputtxt/test.sql"
      statement => "SELECT
                        drive.id,
                        drive.t_car_id,
                        drive.vin,
                        drive.t_org_rboid,
                        drive.t_org_smallregionid,
                        drive.t_sys_dic_pid,
                        drive.t_sys_dic_id,
                        drive.t_dealer_id,
                        drive.t_dealer_name,
                        drive.obdSN,
                        drive.plateNumber,
                        drive.t_carline_id,
                        drive.t_carline_name,
                        drive.drivingDuration,
                        drive.beginTime,
                        drive.endTime,
                        drive.fuelConsumption,
                        drive.avgFuelConsumption,
                        drive.driveDistance,
                        drive.unplugOBDTimes,
                        drive.outRangeTimes,
                        drive.driveStatus,
                        drive.ReasonType,
                        drive.Reason,
                        drive.avgSpeed,
                        drive.maxSpeed,
                        drive.createdtime
                    
                    FROM
                        t_car_drive_2018 drive
                    LEFT JOIN t_car car ON car.id = drive.t_car_id
                    LEFT JOIN t_dealer dealer ON dealer.id = drive.t_dealer_id
                    WHERE
                        drive.isdel = 0
                    AND car.isDel = 0
                    AND dealer.isdel = 0
                    and drive.createdtime > :sql_last_value
                    ORDER BY
                        drive.createdtime asc "
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "* * * * *"
     # schedule => "* * * * *"
      # 索引类型
      type => "jdbc_car_drive_table"
        codec=>json
    }
    
    
    
    
    jdbc {
      # mysql 数据库链接,test为数据库名
      jdbc_connection_string => "jdbc:mysql://ip:3306/vw?characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
      # 用户名和密码
      jdbc_user => "xxx"
      jdbc_password => "xxxx!M"
      # 驱动
      jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "150000"

      #使用其它字段追踪,而不是用时间
      use_column_value => true
      ##追踪的字段
      tracking_column => createdtime 
      record_last_run => true
      ##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
      last_run_metadata_path => "/opt/logstash6/inputtxt/vw_z_history_table.txt" 

      # 执行的sql 文件路径+名称
      #statement_filepath => "/opt/logstash6/inputtxt/test.sql"
      statement => "select * from ${z_history_table} a where a.createdtime > :sql_last_value order by a.createdtime asc"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "0 * * * *"
     # schedule => "0 * * * *"
      # 索引类型
      type => "jdbc_z_history"
        codec=>json
    }
    

}



output{
    if [type] == 'jdbc_car_drive_table' {

        kafka{
            bootstrap_servers => "ip.com:9092"
            topic_id => "jdbc_vw_car_drive_topic"
            codec => json { charset => "UTF-8" }
            
    }

         stdout {
                codec => rubydebug
            }
   }

       if [type] == 'jdbc_z_history' {
                kafka{
                 bootstrap_servers => "ip:9092"
                                 topic_id => "jdbc_vw_z_history_topic"
                                 codec => json { charset => "UTF-8" }
        }

     stdout {
                codec => rubydebug
            }
   }
}
 
 
 
 
是配置有什么问题吗?
 
已邀请:

clean

赞同来自: jiang333

问题解决: 
    在sql语句中有个  xxx.type 与配置中的 type 冲突了

要回复问题请先登录注册