Problem Description
I have a test table in MySQL with id and name like below:
+----+-------+
| id | name |
+----+-------+
| 1 | Name1 |
+----+-------+
| 2 | Name2 |
+----+-------+
| 3 | Name3 |
+----+-------+
I am using Spark DataFrame to read this data (using JDBC) and modifying the data like this
Dataset<Row> modified = sparkSession.sql("select id, concat(name,' - new') as name from test");
modified.write().mode("overwrite").jdbc(AppProperties.MYSQL_CONNECTION_URL,
"test", connectionProperties);
But my problem is, if I give overwrite mode, it drops the previous table and creates a new table, but does not insert any data.
I tried the same program by reading from a csv file (same data as test table) and overwriting. That worked for me.
Am I missing something here?
Thank You!
The problem is in your code. Because you overwrite a table from which you're trying to read, you effectively obliterate all data before Spark can actually access it.
Remember that Spark is lazy. When you create a Dataset, Spark fetches the required metadata but doesn't load the data, so there is no magic cache that will preserve the original content. The data is loaded only when it is actually required: here, that happens when you execute the write action, and by the time writing starts there is no more data left to fetch.
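To make the timing concrete, here is a minimal sketch of the same flow; sparkSession, connectionProperties and AppProperties.MYSQL_CONNECTION_URL are assumed to be set up as in the question, and selectExpr is used instead of a temp view purely for illustration:

// No rows are fetched here; Spark only records the JDBC source and its schema.
Dataset<Row> df = sparkSession.read()
        .jdbc(AppProperties.MYSQL_CONNECTION_URL, "test", connectionProperties);
Dataset<Row> modified = df.selectExpr("id", "concat(name, ' - new') as name");

// The JDBC scan is triggered only by this action. With overwrite mode on the same
// table, the target is dropped and recreated first, so the scan finds nothing to copy.
modified.write().mode("overwrite")
        .jdbc(AppProperties.MYSQL_CONNECTION_URL, "test", connectionProperties);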
What you need is something like this:
- Create a Dataset.
- Apply required transformations and write data to an intermediate MySQL table.
- TRUNCATE the original input and INSERT INTO ... SELECT from the intermediate table, or DROP the original table and RENAME the intermediate table (a sketch of this flow follows below).
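A rough sketch of that flow, assuming an intermediate table named test_staging; the table name and the plain java.sql (Connection, DriverManager, Statement) cleanup are illustrative, not code from the original answer:

Dataset<Row> modified = sparkSession.sql(
        "select id, concat(name,' - new') as name from test");

// 1. Write the transformed rows to an intermediate MySQL table.
modified.write().mode("overwrite")
        .jdbc(AppProperties.MYSQL_CONNECTION_URL, "test_staging", connectionProperties);

// 2. Swap the data back into the original table with plain JDBC, outside Spark.
try (Connection conn = DriverManager.getConnection(
             AppProperties.MYSQL_CONNECTION_URL, connectionProperties);
     Statement stmt = conn.createStatement()) {
    stmt.executeUpdate("TRUNCATE TABLE test");
    stmt.executeUpdate("INSERT INTO test SELECT * FROM test_staging");
    stmt.executeUpdate("DROP TABLE test_staging");
}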
An alternative, but less favorable, approach would be:
- Create a Dataset.
- Apply required transformations and write data to a persistent Spark table (df.write.saveAsTable(...) or equivalent).
- TRUNCATE the original input.
- Read the data back and save it (spark.table(...).write.jdbc(...)).
- Drop the Spark table (sketched below).
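A sketch of this variant, assuming a Spark-managed table named test_backup; again, the names and the plain-JDBC TRUNCATE are illustrative assumptions:

Dataset<Row> modified = sparkSession.sql(
        "select id, concat(name,' - new') as name from test");

// 1. Persist the transformed rows as a Spark-managed table.
modified.write().mode("overwrite").saveAsTable("test_backup");

// 2. TRUNCATE the original MySQL table (plain JDBC, outside Spark).
try (Connection conn = DriverManager.getConnection(
             AppProperties.MYSQL_CONNECTION_URL, connectionProperties);
     Statement stmt = conn.createStatement()) {
    stmt.executeUpdate("TRUNCATE TABLE test");
}

// 3. Read the data back from the Spark table and append it into MySQL.
sparkSession.table("test_backup").write().mode("append")
        .jdbc(AppProperties.MYSQL_CONNECTION_URL, "test", connectionProperties);

// 4. Drop the Spark table.
sparkSession.sql("DROP TABLE test_backup");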
We cannot stress enough that using Spark cache / persist is not the way to go. Even with a conservative StorageLevel (MEMORY_AND_DISK_2 / MEMORY_AND_DISK_SER_2), cached data can be lost (node failures), leading to silent correctness errors.
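For completeness, this is roughly what that discouraged cache-based shortcut would look like; it is shown only to illustrate the failure mode, not as a recommendation:

Dataset<Row> modified = sparkSession.sql(
        "select id, concat(name,' - new') as name from test");

// org.apache.spark.storage.StorageLevel
modified.persist(StorageLevel.MEMORY_AND_DISK_2());
modified.count();  // forces the rows to be read and cached before the overwrite

modified.write().mode("overwrite")
        .jdbc(AppProperties.MYSQL_CONNECTION_URL, "test", connectionProperties);

// If any cached partition is lost (for example, an executor dies), Spark recomputes it
// from the source table, which has already been overwritten, so the result is silently wrong.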