不同数据库的支持
mysql
数据类型
显示样例
是否支持timestamp
statement
tracking_column
tracking_column_type
SQL示例
date
2020-10-20
N
select *, datediff(date, '1970-01-01') as days from tbl_time where datediff(date, '1970-01-01') > :sql_last_value
days
numeric
select *, datediff(date, '1970-01-01') as days from tbl_time where datediff(date, '1970-01-01') > 1603244266
datetime timestamp
2020-10-20 06:12:01
Y
select * from tbl_time where time > :sql_last_value
time
timestamp
select * from tbl_time where time > '2020-10-20 06:12:01'
时间戳
1603244266
Y
select *, FROM_UNIXTIME(shjnch, '%Y-%m-%d %h:%i:%s') as timestamp from tbl_time where FROM_UNIXTIME(shjnch, '%Y-%m-%d %h:%i:%s') > :sql_last_value
timestamp
timestamp
select *, FROM_UNIXTIME(shjnch, '%Y-%m-%d %h:%i:%s') as timestamp from tbl_time where FROM_UNIXTIME(shjnch, '%Y-%m-%d %h:%i:%s') > '2020-10-21 14:00:00'
sqlserver
数据类型
显示样例
是否支持timestamp
date
2020-10-21
N
time
14:00:00.0000000
N
datetime
2020-10-21 13:59:40.000
Y
datetime2
2020-10-21 14:00:00
Y
smalldatetime
2020-10-21 14:00:00
Y
datetimeoffset
2020-10-21 14:00:00.0000000 +08:00
Y
db2
数据类型
显示样例
是否支持timestamp
date
2020-10-21
N
time
14:00:00.0000000
N
timestamp
2020-10-21 13:59:40.000
Y
Oracle
数据类型
显示样例
是否支持timestamp
statement
date
2010-2-12
N
timestamp
12-FEB-10 01.24.52.234123211 PM
Y
select *,to_char(sysdate,'YYYY-MM-DD HH24:MI:SS') as time d1 from dual where to_char(sysdate,'YYYY-MM-DD HH24:MI:SS') > :sql_last_value
Postgre
数据类型
显示样例
是否支持timestamp
date
1997-01-01
N
timestamp
2020-06-17 10:01:08.03282
Y
time
12:00:00
N
sybase
数据类型
显示样例
是否支持timestamp
date
Jul 24 2014
N
timestamp
0x00000000000a8b75
N
datetime
待验证
smalldatetime
待验证
总结
通过测试,基本上可以断定,只要是时间格式为2020-10-21 14:00:00[.000]这种格式,都可以通过timestamp来实现时间增量更新。
对于不能通过这种方式的,看看有没有对应的函数进行转换成这种格式,如果没有,则只能采用datediff转换成天数(或秒数)之后通过numeric实现增量同步了。
如需要根据上述的date字段做增量同步,则可配置如下:
statement => "select *, datediff(s, '1970-01-01', date) from tbl_time where datediff(s, '1970-01-01', date) > :sql_last_value"
tracking_column => "datediff(s, '1970-01-01', date)"
tracking_column_type => "numeric"
配置示例
下面给出两个具体的配置示例:
MySQL数据库表结构如下:
mysql> desc tbl_time;
+-----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| id | int(11) | NO | PRI | NULL | |
| bigid | bigint(20) | YES | | NULL | |
| name | varchar(255) | YES | | NULL | |
| date | date | YES | | NULL | |
| time | datetime | YES | | NULL | |
| timestamp | timestamp | YES | | NULL | |
| shjnch | bigint(255) | YES | | NULL | |
+-----------+--------------+------+-----+---------+-------+
7 rows in set (0.00 sec)
表中有一条数据:
mysql> select * from tbl_time;
+----+-------+-------+------------+---------------------+---------------------+------------+
| id | bigid | name | date | time | timestamp | shjnch |
+----+-------+-------+------------+---------------------+---------------------+------------+
| 1 | 1 | time1 | 2020-10-21 | 2020-10-21 09:37:31 | 2020-10-21 09:37:34 | 1603244266 |
+----+-------+-------+------------+---------------------+---------------------+------------+
1 row in set (0.00 sec)
为了能够查询出数据,我将增量查询的sql中的>号改成
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mytest?useSSL=false" jdbc_user => "root" jdbc_password => "123456" statement => "select *, FROM_UNIXTIME(shjnch, '%Y-%m-%d %h:%i:%s') as timestamp from tbl_time where FROM_UNIXTIME(shjnch, '%Y-%m-%d %h:%i:%s') < :sql_last_value"" schedule => "*/1 * * * *" connection_retry_attempts => 5 connection_retry_attempts_wait_time => 1 tracking_column => "timestamp" tracking_column_type => "timestamp" columns_charset => { "message" => "utf-8" } use_column_value => true lowercase_column_names => false record_last_run => true add_field => { "@topic" => "fc491237449424896" "@tags" => [] "@ip" => "127.0.0.1" } }
}
通过kafka可消费到数据如下:
{"@topic":"fc491237449424896","timestamp":"2020-10-21 09:37:46","@ip":"127.0.0.1","date":"2020-10-21T00:00:00.000+08:00","bigid":1,"time":"2020-10-21T09:37:31.000+08:00","@timestamp":"2020-10-21T15:36:02.526+08:00","id":1,"shjnch":1603244266,"name":"time1"}
文章来源: segmentfault.com,作者:禹鼎侯,版权归原作者所有,如需转载,请联系作者。
原文链接:segmentfault.com/a/1190000037555838