問(wèn)題描述
我經(jīng)常需要將一個(gè)函數(shù)應(yīng)用到一個(gè)非常大的DataFrame
(混合數(shù)據(jù)類(lèi)型)的組中,并希望利用多個(gè)內(nèi)核.
I often need to apply a function to the groups of a very large DataFrame
(of mixed data types) and would like to take advantage of multiple cores.
我可以從組中創(chuàng)建一個(gè)迭代器并使用多處理模塊,但效率不高,因?yàn)槊總€(gè)組和函數(shù)的結(jié)果都必須為進(jìn)程之間的消息傳遞進(jìn)行腌制.
I can create an iterator from the groups and use the multiprocessing module, but it is not efficient because every group and the results of the function must be pickled for messaging between processes.
有什么方法可以避免酸洗甚至完全避免 DataFrame
的復(fù)制?看起來(lái)多處理模塊的共享內(nèi)存功能僅限于 numpy
數(shù)組.還有其他選擇嗎?
Is there any way to avoid the pickling or even avoid the copying of the DataFrame
completely? It looks like the shared memory functions of the multiprocessing modules are limited to numpy
arrays. Are there any other options?
推薦答案
從上面的評(píng)論看來(lái),這似乎是為 pandas
計(jì)劃的(還有一個(gè)看起來(lái)很有趣的 rosetta
項(xiàng)目 我剛剛注意到).
From the comments above, it seems that this is planned for pandas
some time (there's also an interesting-looking rosetta
project which I just noticed).
然而,在所有并行功能都被整合到 pandas
之前,我注意到編寫(xiě)高效的 & 是非常容易的.直接使用 cython
+ pandas 進(jìn)行非內(nèi)存復(fù)制并行擴(kuò)充"http://www.google.co.il/url?sa=t&rct=j&q=&esrc=s&source=web&cd=1&ved=0CB0QFjAA&url=http%3A%2F%2Fwww.openmp.org%2F&ei=HKpdVfyVJcj8ULXHgcAF&usg=AFQjCNGlD5aZM8ZP3Qx7WXT74Y7C54jLNQ&bvm=bv.93756505,d.d24">OpenMP 和 C++.
However, until every parallel functionality is incorporated into pandas
, I noticed that it's very easy to write efficient & non-memory-copying parallel augmentations to pandas
directly using cython
+ OpenMP and C++.
這是一個(gè)編寫(xiě)并行 groupby-sum 的簡(jiǎn)短示例,其用法如下:
Here's a short example of writing a parallel groupby-sum, whose use is something like this:
import pandas as pd
import para_group_demo
df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)
輸出是:
sum
key
0 6
1 11
2 4
<小時(shí)>
注意 毫無(wú)疑問(wèn),這個(gè)簡(jiǎn)單示例的功能最終將成為 pandas
的一部分.然而,有些事情在 C++ 中并行化一段時(shí)間會(huì)更自然,重要的是要知道將其組合到 pandas
中是多么容易.
Note Doubtlessly, this simple example's functionality will eventually be part of pandas
. Some things, however, will be more natural to parallelize in C++ for some time, and it's important to be aware of how easy it is to combine this into pandas
.
為此,我編寫(xiě)了一個(gè)簡(jiǎn)單的單源文件擴(kuò)展名,其代碼如下.
To do this, I wrote a simple single-source-file extension whose code follows.
從一些導(dǎo)入和類(lèi)型定義開(kāi)始
It starts with some imports and type definitions
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange
import pandas as pd
ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t
C++的unordered_map
類(lèi)型是單線(xiàn)程求和,vector
是所有線(xiàn)程求和.
The C++ unordered_map
type is for summing by a single thread, and the vector
is for summing by all threads.
現(xiàn)在到函數(shù) sum
.它從 鍵入的內(nèi)存視圖 開(kāi)始,以便快速訪(fǎng)問(wèn):
Now to the function sum
. It starts off with typed memory views for fast access:
def sum(crit, vals):
cdef int64_t[:] crit_view = crit.values
cdef int64_t[:] vals_view = vals.values
該函數(shù)繼續(xù)將半等分到線(xiàn)程(這里硬編碼為 4),并讓每個(gè)線(xiàn)程將其范圍內(nèi)的條目相加:
The function continues by dividing the semi-equally to the threads (here hardcoded to 4), and having each thread sum the entries in its range:
cdef uint64_t num_threads = 4
cdef uint64_t l = len(crit)
cdef uint64_t s = l / num_threads + 1
cdef uint64_t i, j, e
cdef counts_vec_t counts
counts = counts_vec_t(num_threads)
counts.resize(num_threads)
with cython.boundscheck(False):
for i in prange(num_threads, nogil=True):
j = i * s
e = j + s
if e > l:
e = l
while j < e:
counts[i][crit_view[j]] += vals_view[j]
inc(j)
當(dāng)線(xiàn)程完成時(shí),該函數(shù)將所有結(jié)果(來(lái)自不同范圍)合并到單個(gè) unordered_map
:
When the threads have completed, the function merges all the results (from the different ranges) into a single unordered_map
:
cdef counts_t total
cdef counts_it_t it, e_it
for i in range(num_threads):
it = counts[i].begin()
e_it = counts[i].end()
while it != e_it:
total[deref(it).first] += deref(it).second
inc(it)
剩下的就是創(chuàng)建一個(gè)DataFrame
并返回結(jié)果:
All that's left is to create a DataFrame
and return the results:
key, sum_ = [], []
it = total.begin()
e_it = total.end()
while it != e_it:
key.append(deref(it).first)
sum_.append(deref(it).second)
inc(it)
df = pd.DataFrame({'key': key, 'sum': sum_})
df.set_index('key', inplace=True)
return df
這篇關(guān)于有效地將函數(shù)并行應(yīng)用于分組的 pandas DataFrame的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,也希望大家多多支持html5模板網(wǎng)!