Problem description
I want to run a custom function on all tables in a SQLite database. The function is more or less the same, but depends on the schema of the individual table. Also, the tables and their schemata are only known at runtime (the program is called with an argument that specifies the path of the database).
This is what I have so far:
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// somehow bind sqlContext to DB
val allTables = sqlContext.tableNames
for (t <- allTables) {
  val df = sqlContext.table(t)
  val schema = df.columns
  sqlContext.sql("SELECT * FROM " + t + "...").map(x => myFunc(x, schema))
}
The only hint I found so far needs to know the table in advance, which is not the case in my scenario:
val tableData =
sqlContext.read.format("jdbc")
.options(Map("url" -> "jdbc:sqlite:/path/to/file.db", "dbtable" -> t))
.load()
I am using the xerial sqlite-jdbc driver. So how can I connect to the database as a whole, rather than to a single table?
Using Beryllium's answer as a starting point, I updated my code to this:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val metaData = sqlContext.read.format("jdbc")
  .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
    "dbtable" -> "(SELECT * FROM sqlite_master) AS t")).load()
val myTableNames = metaData.select("tbl_name").distinct()
for (t <- myTableNames) {
  println(t.toString)
  val tableData = sqlContext.table(t.toString)
  for (record <- tableData.select("*")) {
    println(record)
  }
}
At least I can read the table names at runtime, which is a huge step forward for me. But I can't read the tables. I tried both
val tableData = sqlContext.table(t.toString)
and
val tableData = sqlContext.read.format("jdbc")
  .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
    "dbtable" -> t.toString)).load()
in the loop, but in both cases I get a NullPointerException. Although I can print the table names, it seems I cannot connect to them.
Last but not least, I always get an SQLITE_ERROR: Connection is closed error. It looks like the same issue described in this question: SQLITE_ERROR: Connection is closed when connecting from Spark via JDBC to SQLite database
Recommended answer
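There are two options you can try.
Option 1: use JDBC directly
- Open a separate, plain JDBC connection in your Spark job
- Get the table names from the JDBC metadata
- Feed these into your for comprehension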
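A minimal sketch of the first option, assuming the xerial sqlite-jdbc driver is on the classpath at runtime. It relies only on the standard java.sql API (`DatabaseMetaData.getTables` and its `TABLE_NAME` column); the object and method names are hypothetical. The connection is opened and closed on the driver, so Spark never sees it.

```scala
import java.sql.{Connection, DriverManager}
import scala.collection.mutable.ListBuffer

// Hypothetical helper: lists table names through plain JDBC metadata,
// independently of Spark. Assumes the xerial sqlite-jdbc driver is on
// the classpath when listTables(dbPath) is called.
object SqliteTables {
  def jdbcUrl(dbPath: String): String = s"jdbc:sqlite:$dbPath"

  // Reads the TABLE_NAME column of DatabaseMetaData.getTables,
  // restricted to objects of type "TABLE" (no views, no indexes).
  def listTables(conn: Connection): List[String] = {
    val rs = conn.getMetaData.getTables(null, null, "%", Array("TABLE"))
    try {
      val names = ListBuffer.empty[String]
      while (rs.next()) names += rs.getString("TABLE_NAME")
      names.toList
    } finally rs.close()
  }

  // Opens a short-lived connection and always closes it afterwards.
  def listTables(dbPath: String): List[String] = {
    val conn = DriverManager.getConnection(jdbcUrl(dbPath))
    try listTables(conn) finally conn.close()
  }
}
```

Because `listTables` returns a plain local `List[String]`, a loop such as `for (t <- SqliteTables.listTables("/path/to/file.db")) { ... }` runs on the driver, where `sqlContext.read.format("jdbc")` can be called for each table.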
Option 2: use an SQL query as the dbtable value
You can specify a query as the value for the dbtable argument. Syntactically, this query must "look" like a table, so it must be wrapped in a subquery.
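The wrapping can be captured in a small helper. This is a hypothetical sketch: Spark inserts the `dbtable` string into the FROM clause of the queries it generates, so the value has to be a parenthesised subquery with an alias. The default alias `t` simply mirrors the examples here, and a trailing semicolon is stripped because it would be invalid inside the parentheses.

```scala
// Hypothetical helper: turns an arbitrary SELECT into a value that can be
// passed as the "dbtable" option, i.e. a parenthesised subquery with an alias.
object DbTable {
  def fromQuery(query: String, alias: String = "t"): String = {
    // A trailing ';' is not allowed inside a subquery, so drop it.
    val body = query.trim.stripSuffix(";").trim
    s"($body) AS $alias"
  }
}
```

For example, `DbTable.fromQuery("select * from pg_tables")` yields `(select * from pg_tables) AS t`, which can be used directly as the `dbtable` value.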
In that query, get the metadata from the database:
val df = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:xxx",
    "user" -> "x",
    "password" -> "x",
    "dbtable" -> "(select * from pg_tables) as t")).load()
This example works with PostgreSQL; you have to adapt it for SQLite.
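One possible SQLite adaptation, assuming the `jdbc:sqlite:<path>` URL format used in the question. SQLite keeps its catalog in `sqlite_master`, which also contains rows for indexes, views and triggers, so filtering on `type = 'table'` returns only tables; names starting with `sqlite_` are reserved for SQLite's internal tables and are skipped too. The object and member names are hypothetical; the map is meant to be passed to `sqlContext.read.format("jdbc").options(...)`.

```scala
// Hypothetical helper for the SQLite variant of the metadata query.
object SqliteMetaData {
  // Only real tables; internal tables such as sqlite_sequence are excluded.
  // substr is used instead of LIKE because '_' is a LIKE wildcard.
  val tableNamesQuery: String =
    "(SELECT name FROM sqlite_master " +
      "WHERE type = 'table' AND substr(name, 1, 7) <> 'sqlite_') AS t"

  // Options for sqlContext.read.format("jdbc").options(...).load()
  def options(dbPath: String): Map[String, String] = Map(
    "url" -> s"jdbc:sqlite:$dbPath",
    "dbtable" -> tableNamesQuery
  )
}
```

With this query, the resulting DataFrame has a single `name` column with one row per table, so no `distinct()` is needed, unlike with `tbl_name` over all of `sqlite_master`.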
Update
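It seems that the JDBC driver only supports iterating over one result set. A likely additional cause of the NullPointerException is that a for loop (without yield) over a DataFrame is a distributed foreach that runs on the executors, where sqlContext is not available. Either way, when you materialize the list of table names on the driver using collect(), the following snippet should work: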
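// distinct(): sqlite_master has one row per table, index, trigger, ...
// collect() turns the names into a local Array[String] on the driver
val myTableNames = metaData.select("tbl_name").distinct().map(_.getString(0)).collect()
for (t <- myTableNames) {
  println(t)
  val tableData = sqlContext.read.format("jdbc")
    .options(
      Map(
        "url" -> "jdbc:sqlite:/x.db",
        "dbtable" -> t)).load()
  tableData.show()
}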
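If table names may contain spaces or other special characters, passing them unquoted as `dbtable` can produce invalid SQL, because Spark pastes the value into the FROM clause of its generated queries. A hypothetical helper that quotes each name as an SQL identifier (double quotes, embedded double quotes doubled) and builds the per-table options could look like this; the names are illustrative only.

```scala
// Hypothetical helper: per-table JDBC options with a safely quoted identifier.
object SqliteTableOptions {
  // Standard SQL identifier quoting, which SQLite accepts:
  // wrap in double quotes and double any embedded double quote.
  def quoteIdent(name: String): String =
    "\"" + name.replace("\"", "\"\"") + "\""

  def forTable(dbPath: String, table: String): Map[String, String] = Map(
    "url" -> s"jdbc:sqlite:$dbPath",
    "dbtable" -> quoteIdent(table)
  )
}
```

Inside the loop, `.options(SqliteTableOptions.forTable("/x.db", t))` would then replace the inline `Map`.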