Problem Description
So I'm doing something that should be simple, but apparently it's not in Spark SQL.
If I run the following query in MySQL, the query finishes in a fraction of a second:
SELECT ua.address_id
FROM user u
INNER JOIN user_address ua ON ua.address_id = u.user_address_id
WHERE u.user_id = 123;
However, running the same query in HiveContext under Spark (1.5.1) takes more than 13 seconds. Adding more joins makes the query run for a very very long time (over 10 minutes). I'm not sure what I'm doing wrong here and how I can speed things up.
The tables are MySQL tables that are loaded into the HiveContext as temporary tables. This is running on a single instance, with the database on a remote machine.
- The user table has about 4.8 million rows.
- The user_address table has 350,000 rows.
The tables have foreign key fields, but no explicit FK relationships are defined in the database. I'm using InnoDB.
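For reference, a minimal sketch of how such a setup is typically wired up in Spark 1.5: the two MySQL tables are read over JDBC and registered as temporary tables in a HiveContext. The JDBC URL, credentials, and driver class below are placeholders, not details taken from the question.

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc: SparkContext = ???          // provided by the Spark shell / application
val sqlContext = new HiveContext(sc)

// Placeholder connection details; replace with the real host, schema, and credentials.
val jdbcOptions = Map(
  "url" -> "jdbc:mysql://db-host:3306/mydb",
  "user" -> "???",
  "password" -> "???",
  "driver" -> "com.mysql.jdbc.Driver"
)

val user = sqlContext.read.format("jdbc")
  .options(jdbcOptions + ("dbtable" -> "user")).load()
val user_address = sqlContext.read.format("jdbc")
  .options(jdbcOptions + ("dbtable" -> "user_address")).load()

// Register as temporary tables so they can be queried with SQL.
user.registerTempTable("user")
user_address.registerTempTable("user_address")

sqlContext.sql(
  """SELECT ua.address_id
    |FROM user u
    |INNER JOIN user_address ua ON ua.address_id = u.user_address_id
    |WHERE u.user_id = 123""".stripMargin).show()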
The execution plan in Spark:
== Physical Plan ==
TungstenProject [address_id#0L]
 SortMergeJoin [user_address_id#27L], [address_id#52L]
  TungstenSort [user_address_id#27L ASC], false, 0
   TungstenExchange hashpartitioning(user_address_id#27L)
    ConvertToUnsafe
     Filter (user_id#0L = 123)
      Scan JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc,{user=, password=, url=jdbc:mysql://, dbtable=user})[address_id#0L,user_address_id#27L]
  TungstenSort [address_id#52L ASC], false, 0
   TungstenExchange hashpartitioning(address_id#52L)
    ConvertToUnsafe
     Scan JDBCRelation(jdbc:mysql://.user_address,[Lorg.apache.spark.Partition;@2ce558f3,{user=, password=, url=jdbc:mysql://, dbtable=user_address})[address_id#52L]
Recommended Answer
First of all, the type of query you perform is extremely inefficient. As for now (Spark 1.5.0*), to perform a join like this, both tables have to be shuffled / hash-partitioned each time the query is executed. It shouldn't be a problem in the case of the user table, where the user_id = 123 predicate is most likely pushed down, but it still requires a full shuffle of user_address.
Moreover, if the tables are only registered and not cached, then every execution of this query will fetch the whole user_address table from MySQL into Spark.
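A minimal sketch of caching the registered tables in Spark 1.5, assuming the table names from the question: the first query that touches a cached table materializes it, and later joins read the in-memory copy instead of going back to MySQL.

// Cache the registered temporary tables (Spark 1.5 API).
sqlContext.cacheTable("user")
sqlContext.cacheTable("user_address")

// Equivalent DataFrame form, assuming the DataFrames from the setup above:
// user.cache()
// user_address.cache()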
I'm not sure what I'm doing wrong here and how I can speed things up.
It is not exactly clear why you want to use Spark for this application, but the single-machine setup, small data, and type of queries suggest that Spark is not a good fit here.
Generally speaking, if the application logic requires single-record access, then Spark SQL won't perform well. It is designed for analytical queries, not as an OLTP database replacement.
If a single table / data frame is much smaller, you could try broadcasting it.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._  // needed for the $"column" syntax; assumes a SQLContext named sqlContext

val user: DataFrame = ???
val user_address: DataFrame = ???

// Filter the large user table down to the row(s) of interest first,
// then broadcast the small result so the join avoids shuffling user_address.
val userFiltered = user.where(???)

user_address.join(
  broadcast(userFiltered), $"address_id" === $"user_address_id")
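To check that the hint is picked up, printing the plan should now show a BroadcastHashJoin instead of the SortMergeJoin from the plan above, for example:

user_address.join(
  broadcast(userFiltered), $"address_id" === $"user_address_id").explain()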
* This should change in Spark 1.6.0, where SPARK-11410 should enable persistent table partitioning.