嘿~ 今天天气不错嘛

有关logstash jdbc 插件增量拉取数据,跟踪列为数值或日期的问题

Logstash | 作者 huhangfei | 发布于2017年05月05日 | 阅读数:14578

我想使用jdbc从数据库中根据日期字段增量拉取数据,但是发现跟踪到的日期时区有差别,通过mssql语句转换后可以查询,但是发现sql_last_value记录的不是每批数据的最大日期。不知道是什么原因,大神们帮忙看一下。
input {
jdbc {
jdbc_driver_library => "D:\elk\新建文件夹\sqljdbc_6.0\chs\sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.x.x;user=x;password=x;"
jdbc_user => "xxx"
jdbc_password => "xxx"
statement_filepath => "../config-20170503/sqlscript/ep_epdb8buddy_eppartnerstatistic_session_index1.sql"
schedule => "* * * * *"
record_last_run => true
use_column_value => true
tracking_column => endtime
last_run_metadata_path => "../config-20170503/log/ep_epdb8buddy_eppartnerstatistic_session_index1"
lowercase_column_names => true
clean_run => false

jdbc_fetch_size => 10
jdbc_page_size => 10
jdbc_paging_enabled => true
}
}
sql 
SELECT TOP 30
'Session1_'+CONVERT(VARCHAR,son.Id) AS SessionGuid,
son.[Id],
son.[BeginTime],
son.[CFirstTime],
son.[BFirstTime],
son.[EndTime],
--cname
ctuser.NikeName AS CName,
--bname
epuser.DasAccountName AS BName,
(CASE WHEN son.CFirstTime=son.BeginTime THEN 1 ELSE 0 END) AS IsCActive,
CONVERT(VARCHAR(7),son.[BeginTime],102) AS ESIndexNameByMonth,
IsStatReplyDuration
FROM [Statistic].[dbo].[Session1] AS son WITH(nolock)
LEFT JOIN ChatInfo.dbo.EPUserInfo epuser WITH(nolock) ON son.SaleConsultantId=epuser.DasAccountID
LEFT JOIN ChatInfo.dbo.IMUserInfo imuserid WITH(nolock) ON son.CId=imuserid.IMUserID
LEFT JOIN ChatInfo.dbo.ClientUserInfo ctuser WITH(nolock) ON ctuser.CID = imuserid.UserID
where son.EndTime >=DATEADD(hour, -DATEDIFF(hour,GETDATE(),GETUTCDATE()), CONVERT(DATETIME2,REPLACE(REPLACE('$'+CONVERT(VARCHAR,:sql_last_value)+'$','$0$','1999-01-01'),'$','')))
已邀请:

gzliudan

赞同来自: leo2018

jdbc_paging_enabled 改为 false 看看

leo2018

赞同来自: linenlin01

今天也遇到了这个问题,根据我的分析确实是因为分页查询导致的,举个例子
 
假设你每页获取2个记录;
logstash配置的sql类似  select * from table1 where update_time >= :sql_last_value
 
假设logstash的一个获取周期内有一个批量的更新操作(update_time一样)切数量超过了你的每页获取数量2,比如是6个,然后这6个中有1个又被重新更新了,update_time是最新的
假设此时这6条记录类似下面分布
id     update_time
1        10
2        10
3        12
4        10
5        10
6        10
此时logstash的同步周期来到,先获取 update_time >= :sql_last_value 的记录数,是6
 
然后根据你每页大小是2,分3页来获取
limit 2, offset 0  获取到[1,2]   ,同时更新:sql_last_value 的值记录成了 10
limit 2, offset 2 获取到[3,4],同时更新:sql_last_value 的值记录成了 12
limit 2, offset 4 获取到 [5,6],同时更新:sql_last_value 的值记录成了 10
完成后的
:sql_last_value 的值记录为 10
 
下一个同步周期,只要你的数据没有发生变化,或者发生变化的数据恰好没有落在最后一页上,那么再下一个周期还是会从update_time >=10分页取
 
解决方案:
1、不分页,但是一个同步周期变更数据量大的时候不可行
2、查询语句加入 order by update_time ASC,保正最新更新的数据出现在最后一页
3、修改logstash mysql同步插件的逻辑,使每次记录的syncpoint文件的值不会被更小的值覆盖(我没干,但是理论可行)
 
供你参考

要回复问题请先登录注册