問題描述
我試圖在 sqlite 中建立一個(gè)包含兩個(gè)表的數(shù)據(jù)庫.我的一個(gè)表有一個(gè)時(shí)間戳列.我正在嘗試實(shí)施時(shí)間戳模式來捕獲數(shù)據(jù)庫中的增量更改.Kafka 連接失敗并出現(xiàn)以下錯(cuò)誤:
I tried to set up a database with two tables in sqlite. Once of my table is having a timestamp column . I am trying to implement timestamp mode to capture incremental changes in the DB. Kafka connect is failing with the below error:
ERROR Failed to get current time from DB using Sqlite and query 'SELECT
CURRENT_TIMESTAMP'
(io.confluent.connect.jdbc.dialect.SqliteDatabaseDialect:471)
java.sql.SQLException: Error parsing time stamp
Caused by: java.text.ParseException: Unparseable date: "2019-02-05 02:05:29"
does not match (\p{Nd}++)\Q-\E(\p{Nd}++)\Q-\E(\p{Nd}++)\Q
\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q.\E(\p{Nd}++)
非常感謝您的幫助
配置:
name=test-query-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlite:employee.db
query=SELECT users.id, users.name, transactions.timestamp, transactions.payment_type FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=test-joined
DDL:
CREATE TABLE transactions(id integer primary key not null,
payment_type text not null,
timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
user_id int not null,
constraint fk foreign key(user_id) references users(id)
);
CREATE TABLE users (id integer primary key not null,name text not null);
推薦答案
kafka connect jdbc 連接器很容易檢測(cè)到時(shí)間戳的變化,如果 'timestamp' 列的值是 'UNIX timestamp' 的格式.
The kafka connect jdbc connector easily detects the changes in the timestamp, if the values of the 'timestamp' column are in the format of the 'UNIX timestamp'.
sqlite> CREATE TABLE transact(timestamp TIMESTAMP DEFAULT (STRFTIME('%s', 'now')) not null,
...> id integer primary key not null,
...> payment_type text not null);
sqlite>
值可以插入為:
sqlite> INSERT INTO transact(timestamp,payment_type,id) VALUES (STRFTIME('%s', 'now'),'cash',1);
然后由 kafka jdbc 源連接器檢測(cè)與時(shí)間戳相關(guān)的更改,并且可以按如下方式使用:
The timestamp related changes are then detected by the kafka jdbc source connector and the same can be consumed as follows:
kafka-console-consumer --bootstrap-server localhost:9092 --topic jdbc-transact --from-beginning
{"timestamp":1562321516,"id":2,"payment_type":"card"}
{"timestamp":1562321790,"id":1,"payment_type":"online"}
這篇關(guān)于Kafka JDBC 源連接器時(shí)間戳模式對(duì) sqlite3 失敗的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,也希望大家多多支持html5模板網(wǎng)!