Problem description
I am using a Java producer to insert data into my Kafka topic. Then I use the Kafka Connect JDBC sink to insert the data into my Oracle table. Below is my producer code.
package producer.serialized.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Sender4 {
    public static void main(String[] args) {
        // Avro schema for the record value: three string fields.
        String flightSchema = "{\"type\":\"record\"," + "\"name\":\"Flight\","
                + "\"fields\":[{\"name\":\"flight_id\",\"type\":\"string\"},{\"name\":\"flight_to\",\"type\":\"string\"},{\"name\":\"flight_from\",\"type\":\"string\"}]}";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://192.168.0.1:8081");

        Schema schema = new Schema.Parser().parse(flightSchema);
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("flight_id", "myflight");
        avroRecord.put("flight_to", "QWE");
        avroRecord.put("flight_from", "RTY");

        // No key is supplied, so the record is sent with a null key.
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("topic9", avroRecord);

        // try-with-resources closes the producer, which flushes the pending send before exit.
        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(record);
        }
    }
}
Below are my Kafka Connect properties:
name=test-sink-6
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=topic9
connection.url=jdbc:oracle:thin:@192.168.0.1:1521:usera
connection.user=usera
connection.password=usera
auto.create=true
table.name.format=FLIGHTS4
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://192.168.0.1:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.0.1:8081
Based on my schema, I expect the values inserted into my Oracle table to be VARCHAR2. I created a table with three VARCHAR2 columns, but when I started my connector, nothing was inserted. I then dropped the table and ran the connector with auto.create enabled. This time the table was created and the values were inserted, but the columns have data type CLOB. I want them to be VARCHAR2, since that takes up less space.
Why is this happening, and how can I fix it? Thank you.
Recommended answer
It looks like Kafka's String is mapped to Oracle's NCLOB:
<table border="1">
<tr>
<th>Schema Type</th><th>MySQL</th><th>Oracle</th><th>PostgreSQL</th><th>SQLite</th>
</tr>
<tr>
<td>INT8</td><td>TINYINT</td><td>NUMBER(3,0)</td><td>SMALLINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT16</td><td>SMALLINT</td><td>NUMBER(5,0)</td><td>SMALLINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT32</td><td>INT</td><td>NUMBER(10,0)</td><td>INT</td><td>NUMERIC</td>
</tr>
<tr>
<td>INT64</td><td>BIGINT</td><td>NUMBER(19,0)</td><td>BIGINT</td><td>NUMERIC</td>
</tr>
<tr>
<td>FLOAT32</td><td>FLOAT</td><td>BINARY_FLOAT</td><td>REAL</td><td>REAL</td>
</tr>
<tr>
<td>FLOAT64</td><td>DOUBLE</td><td>BINARY_DOUBLE</td><td>DOUBLE PRECISION</td><td>REAL</td>
</tr>
<tr>
<td>BOOLEAN</td><td>TINYINT</td><td>NUMBER(1,0)</td><td>BOOLEAN</td><td>NUMERIC</td>
</tr>
<tr>
<td>STRING</td><td>VARCHAR(256)</td><td>NCLOB</td><td>TEXT</td><td>TEXT</td>
</tr>
<tr>
<td>BYTES</td><td>VARBINARY(1024)</td><td>BLOB</td><td>BYTEA</td><td>BLOB</td>
</tr>
<tr>
<td>'Decimal'</td><td>DECIMAL(65,s)</td><td>NUMBER(*,s)</td><td>DECIMAL</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Date'</td><td>DATE</td><td>DATE</td><td>DATE</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Time'</td><td>TIME(3)</td><td>DATE</td><td>TIME</td><td>NUMERIC</td>
</tr>
<tr>
<td>'Timestamp'</td><td>TIMESTAMP(3)</td><td>TIMESTAMP</td><td>TIMESTAMP</td><td>NUMERIC</td>
</tr>
</table>
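To make the Oracle column of the table above easier to check against your own schema, here is a minimal lookup sketch. It only copies the primitive-type rows of the table into a map; `OracleTypeLookup` and `oracleTypeFor` are hypothetical names for illustration and are not part of kafka-connect-jdbc.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mirrors the Oracle column of the mapping table above.
// OracleTypeLookup is a hypothetical helper, not a class from kafka-connect-jdbc.
final class OracleTypeLookup {
    private static final Map<String, String> ORACLE_TYPES = new LinkedHashMap<>();

    static {
        ORACLE_TYPES.put("INT8", "NUMBER(3,0)");
        ORACLE_TYPES.put("INT16", "NUMBER(5,0)");
        ORACLE_TYPES.put("INT32", "NUMBER(10,0)");
        ORACLE_TYPES.put("INT64", "NUMBER(19,0)");
        ORACLE_TYPES.put("FLOAT32", "BINARY_FLOAT");
        ORACLE_TYPES.put("FLOAT64", "BINARY_DOUBLE");
        ORACLE_TYPES.put("BOOLEAN", "NUMBER(1,0)");
        ORACLE_TYPES.put("STRING", "NCLOB");
        ORACLE_TYPES.put("BYTES", "BLOB");
    }

    // Returns the Oracle column type listed in the table for a Connect schema type.
    static String oracleTypeFor(String connectSchemaType) {
        String oracleType = ORACLE_TYPES.get(connectSchemaType);
        if (oracleType == null) {
            throw new IllegalArgumentException("No mapping listed for " + connectSchemaType);
        }
        return oracleType;
    }

    public static void main(String[] args) {
        // The three Avro "string" fields in the question arrive as Connect STRING.
        System.out.println("STRING -> " + oracleTypeFor("STRING"));
    }
}
```

All three fields in the question's schema are Avro strings, so each one falls into the STRING row, which is why the auto-created columns are LOBs rather than VARCHAR2.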
Source: https://www.ibm.com/support/knowledgecenter/en/SSPT3X_4.2.5/com.ibm.swg.im.infosphere.biginsights.admin.doc/doc/admin_kafka_jdbc_sink.html
https://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html
Update
The OracleDialect class (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/dialect/OracleDialect.java) has the CLOB type hardcoded. Simply extending it with your own class and changing that mapping will not help, because the dialect is chosen by a static method called from JdbcSinkTask (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java):
final DbDialect dbDialect = DbDialect.fromConnectionString(config.connectionUrl);
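To illustrate why a subclass is never picked up, here is a rough sketch of a static factory keyed on the JDBC URL. It assumes the selection depends only on the URL's subprotocol (the part after `jdbc:`); that is an assumption for illustration, not the connector's actual code. `DialectSelectionSketch`, `subprotocol`, `dialectNameFor`, and the `GenericDialect` fallback name are all hypothetical.

```java
// Illustrative only: a static factory that picks a dialect from the JDBC URL.
// All names here are hypothetical; this is not the kafka-connect-jdbc source.
final class DialectSelectionSketch {

    // Extracts the subprotocol, e.g. "oracle" from "jdbc:oracle:thin:@host:1521:sid".
    static String subprotocol(String jdbcUrl) {
        String prefix = "jdbc:";
        if (!jdbcUrl.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a JDBC URL: " + jdbcUrl);
        }
        int end = jdbcUrl.indexOf(':', prefix.length());
        if (end < 0) {
            throw new IllegalArgumentException("Missing subprotocol: " + jdbcUrl);
        }
        return jdbcUrl.substring(prefix.length(), end);
    }

    // The caller cannot inject a custom class: the choice is fixed by the URL.
    static String dialectNameFor(String jdbcUrl) {
        switch (subprotocol(jdbcUrl)) {
            case "oracle":
                return "OracleDialect";
            case "mysql":
                return "MySqlDialect";
            default:
                return "GenericDialect"; // hypothetical fallback name
        }
    }

    public static void main(String[] args) {
        // Connection URL taken from the question's connector properties.
        System.out.println(dialectNameFor("jdbc:oracle:thin:@192.168.0.1:1521:usera"));
    }
}
```

With a factory shaped like this, the concrete dialect is decided inside the static method, so a user-defined subclass of OracleDialect is never instantiated unless the factory itself is changed.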