使用Rust优化Python性能

电子说

1.3w人已加入

描述

在数据分析领域Python无疑是最流行的编程语言,但是Python有一个硬伤就是作为一个编译语言在性能上有些微的欠缺。而同样最流行的语言Rust则在性能方面表现优秀。本文我们一起学习一个优化项目的实践,对一个数据分析程序,改为Rust后将性能提高了18万倍经历。

概述

要分析的问题如下,以下数据是一个在线问答的数据,一个用户(user)对应一个问题(question)以及结果(score)。

 

[
{
"user": "5ea2c2e3-4dc8-4a5a-93ec-18d3d9197374",
"question": "7d42b17d-77ff-4e0a-9a4d-354ddd7bbc57",
"score": 1
},
{
"user": "b7746016-fdbf-4f8a-9f84-05fde7b9c07a",
"question": "7d42b17d-77ff-4e0a-9a4d-354ddd7bbc57",
"score": 0
},
/* ... 跟多数据 ... */
]

 

有的用户可能仅仅回答了问题的一部分,问题的结果是0或者1。

需要求解问题是:给定一个大小k, k个问题的结合,求解那一组用户与整体表现的相关性最高?

该问题叫做k-CorrSet问题。可以用简单的简单遍历来解决k-CorrSet问题,算法如下所示(伪代码):

 

func k_corrset($data, $k):
$all_qs = all questions in $data
for all $k-sized subsets $qs within $all_qs:
$us = all users that answered every question in $qs
$qs_totals = the total score on $qs of each user in $us
$grand_totals = the grand score on $all_qs of each user in $us
$r = correlation($qs_totals, $grand_totals)
return $qs with maximum $r

 

Python算法为基准

先用Python来解决这个问题,如果在性能不能满足需求的话,可以用Rust提高性能。一个简单的Pandas程序来解决k-CorrSet问题的算法:

 

from itertools import combinations
import pandas as pd
from pandas import IndexSlice as islice
def k_corrset(data, K):
all_qs = data.question.unique()
q_to_score = data.set_index(['question', 'user'])
all_grand_totals = data.groupby('user').score.sum().rename('grand_total')
corrs = []
for qs in combinations(all_qs, K):
qs_data = q_to_score.loc[islice[qs,:],:].swaplevel()
answered_all = qs_data.groupby(level=[0]).size() == K
answered_all = answered_all[answered_all].index
qs_totals = qs_data.loc[islice[answered_all,:]] 
.groupby(level=[0]).sum().rename(columns={'score': 'qs'})
r = qs_totals.join(all_grand_totals).corr().qs.grand_total
corrs.append({'qs': qs, 'r': r})
corrs = pd.DataFrame(corrs)
return corrs.sort_values('r', ascending=False).iloc[0].qs
data = pd.read_json('scores.json')
print(k_corrset(data, K=5))

 

数据分析

该算法使用了一些MultiIndex魔法,细节上不在深入解释。马上进行一次开始基准测试。

首先,我们需要数据。为了使基准测试切合实际,生成了合成数据:

60000个用户

200个问题

20%稀疏性(即每个问题有12,000个用户回答)

每个结果同样可能为1或0。

目标是计算该数据集上的k-CorrSet,其中k = 5使用2021 M1 Macbook Pro的时间还算合理。

使用Python的time.time()函数计时,使用 CPython 3.9.17,计算1000 次迭代的内循环速度。平均执行时间为36毫秒。还不错,但按照这个速度,完全完成计算将在2.9年内。

注意:对Python代码页有很多优化技巧,可以提高其性能,如果有需要后续可以学习。

Rust实现

可以通过将Python代码用Rust实现,期待一些免费的加速Rust的编译器优化。为了可读性,下面的所有代码都是实际基准的简化。

首先,转换一下数据类型:

 

pub struct User(pub String);
pub struct Question(pub String);
pub struct Row {
pub user: User,
pub question: Question,
pub score: u32,
}

 

在Rust中建立User和Question的新类型,既是为了清晰起见,也是为了在其上使用traits。然后,基本的k-CorrSet算法实现如下:

 

fn k_corrset(data: &[Row], k: usize) -> Vec<&Question> {
// utils::group_by(impl Iterator)
// -> HashMap>;
let q_to_score: HashMap<&Question, HashMap<&User, u32>> =
utils::group_by(data.iter().map(|r| (&r.question, &r.user, r.score)));
let u_to_score: HashMap<&User, HashMap<&Question, u32>> =
utils::group_by(data.iter().map(|r| (&r.user, &r.question, r.score)));
let all_grand_totals: HashMap<&User, u32> =
u_to_score.iter().map(|(user, scores)| {
let total = scores.values().sum::();
(*user, total)
})
.collect();
let all_qs = q_to_score.keys().copied();
all_qs.combinations(k)
.filter_map(|qs: Vec<&Question>| {
let (qs_totals, grand_totals): (Vec<_>, Vec<_>) = all_grand_totals.iter()
.filter_map(|(u, grand_total)| {
let q_total = qs.iter()
.map(|q| q_to_score[*q].get(u).copied())
.sum::>()?;
Some((q_total as f64, *grand_total as f64))
})
.unzip();
// utils::correlation(&[f64], &[f64]) -> f64;
let r = utils::correlation(&qs_totals, &grand_totals);
(!r.is_nan()).then_some((qs, r))
})
.max_by_key(|(_, r)| FloatOrd(*r))
.unwrap().0
}

 

数据分析

数据分析

算法关键点:

与Python一样,将平面数据转换为分层数据带有HashMap和utils::group_by帮手。

然后使用Itertools::combinations方法方法迭代所有问题组合。

在内循环中,通过all_grand_totals.iter()方式迭代所有用户。

表达方式q_to_score[*q].get(u).copied()有类型 Option,即 Some(n)如果用户的结果为q,否则为None。

如果用户回答了qs中的所有问题,迭代器方法 .sum::>()返回Some(total),否则返回None。

调用辅助方法utils::correlatio实现了Pearson的r标准算法。

用max_by_key获得最高的问题相关性。用FloatOrd可以比较浮动。

那么表现如何呢?使用Criterion(默认设置)对内循环的性能进行基准测试(filter_map),使用相同的数据集。新的内循环运行4.2中毫秒,比Python快约8倍基线!

但我们完整的计算仍然是124天,这有点太长了。

逐步优化

让我们用一些技巧对该程序进行优化一下。

索引数据

运行一个探查器,看看程序瓶颈在哪里。在Mac上,可使用Instruments.app和Samply,后者好像对Rust优化得更好。

数据分析

下面是用Samply对Rust算法程序跟踪相关部分的屏幕截图:

数据分析

可以看到,有75%的时间都花在HashMap::get上,这是需要优化的关键,其对应代码:

 

q_to_score[*q].get(u).copied()

 

问题是正在散列并比较36字节UUID字符串,这是一个昂贵耗时的操作。对此,需要一种更小的类型来代替问题/用户字符串。

解决方案:将所有的问题和用户收集一个Vec,并通过索引来表示每个问题/用户。可以使用usize指数与Vec类型,但更好的做法是使用newtypes代表各类指标。事实上,这个问题经常出现。这样定义这些索引类型:

 

pub struct QuestionRef<'a>(pub &'a Question);
pub struct UserRef<'a>(pub &'a User);
define_index_type! {
pub struct QuestionIdx for QuestionRef<'a> = u16;
}
define_index_type! {
pub struct UserIdx for UserRef<'a> = u32;
}

 

数据分析

QuestionRef和UserRef类型有新类型能够实现traits &Question和&User。define_index_type宏创建新的索引类型QuestionIdx和UserIdx,以及对应的QuestionRef和 UserRef。分别对应为u16和一个u32类型。

最后更新了k_corrset对于问题和用户生成一个IndexedDomain,然后使用 QuestionIdx和 UserIdx其余代码中的类型:

 

fn k_corrset(data: &[Row], k: usize) -> Vec<&Question> {
let (questions_set, users_set): (HashSet<_>, HashSet<_>) = data.iter()
.map(|row| (QuestionRef(&row.question), UserRef(&row.user)))
.unzip();
let questions = IndexedDomain::from_iter(questions_set);
let users = IndexedDomain::from_iter(users_set);
let q_to_score: HashMap> =
utils::group_by(data.iter().map(|r| (
questions.index(&(QuestionRef(&r.question))),
users.index(&(UserRef(&r.user))),
r.score,
)));
let u_to_score: HashMap> =
utils::group_by(data.iter().map(|r| (
users.index(&(UserRef(&r.user))),
questions.index(&(QuestionRef(&r.question))),
r.score,
)));
let all_grand_totals = // same code
let all_qs = questions.indices();
all_qs.combinations(k)
.filter_map(|qs: Vec| {
})
.max_by_key(|(_, r)| FloatOrd(*r))
.unwrap().0
.into_iter().map(|idx| questions.value(idx).0).collect()
}

 

数据分析

数据分析

我们再次计算的运行基准测试。新的内循环运行时间为1.0毫秒 ,比上次算法快4,比原始Python版本快35 倍。

总计算时间减少到30天,还需要继续优化。

索引集合

继续追踪执行:

数据分析

仍然,大部分时间还是消耗在HashMap::get。为了解决这个问题,考虑完全更换掉HashMap。

HashMap<&User, u32>在概念上和Vec>是相同的,都对&User有唯一索引。例如,在一个Vec中用户["a", "b", "c"],然后是HashMap {"b" => 1}相当于vector [None, Some(1), None]。vector消耗更多内存,但它改善了键/值查找的性能。

考虑到数据集规模进行计算/内存权衡。可以使用Indexical,它提供了 DenseIndexMap 内部实现为的类型Vec类型,索引为K::Index。

替换后主要变化是k_corrset函数,所有辅助数据结构转换为DenseIndexMap:

 

pub type QuestionMap<'a, T> = DenseIndexMap<'a, QuestionRef<'a>, T>;
pub type UserMap<'a, T> = DenseIndexMap<'a, UserRef<'a>, T>;
fn k_corrset(data: &[Row], k: usize) -> Vec<&Question> {
let mut q_to_score: QuestionMap<'_, UserMap<'_, Option>> =
QuestionMap::new(&questions, |_| UserMap::new(&users, |_| None));
for r in data {
q_to_score
.get_mut(&QuestionRef(&r.question))
.unwrap()
.insert(UserRef(&r.user), Some(r.score));
}
let grand_totals = UserMap::new(&users, |u| {
q_to_score.values().filter_map(|v| v[u]).sum::()
});
let all_qs = questions.indices();
all_qs.combinations(k)
}

 

数据分析

内部循环的唯一变化是:

 

q_to_score[*q].get(u).copied()

 

变成了:

 

q_to_score[*q][u]

 

再次运行基准测试,新的内循环运行在181微秒 ,比上次迭代快6倍,比原始的Python快了199 倍。

总计算将缩短至5.3天。

边界检查

每次使用括号时都会出现另一个小的性能影响[]索引到DenseIndexMap。向量 为了安全起见,都要运行边界检查,实际上,该代码可以保证的不会超出所写的向量边界。实际上找不到边界检查样本配置文件,但它确实造成了明显的影响了性能,需要对其进行优化。

内循环之前是这样的:

 

let q_total = qs.iter()
.map(|q| q_to_score[*q][u])
.sum::>()?;
let grand_total = all_grand_totals[u];

 

删除边界检查get_unchecked后,新内循环:

 

let q_total = qs.iter()
.map(|q| unsafe {
let u_scores = q_to_score.get_unchecked(q);
*u_scores.get_unchecked(u)
})
.sum::>()?;
let grand_total = unsafe { *all_grand_totals.get_unchecked(u) };

 

没有边界检查是不安全的,所以必须用unsafe块对其进行标记。

再次运行基准测试,新的内循环运行在156微秒,比上一个迭代快1.16倍,比原始的Python快了229倍。

总计算将缩短至4.6天。

bit-set

考虑一下内循环的计算结构。现在,循环实际上看起来像:

 

for each subset of questions $qs:
for each user $u:
for each question $q in $qs:
if $u answered $q: add $u's score on $q to a running total
else: skip to the next user
$r = correlation($u's totals on $qs, $u's grand total)

 

数据的一个重要方面是它实际上形成了一个稀疏矩阵。对于给定的问题,只有20%的用户回答了这个问题问题。对于一组5个问题,只有一小部分回答了全部5个问题。因此,如果能够有效地首先确定哪个用户回答了所有5个问题,然后后续循环将运行减少迭代次数(并且没有分支):

 

for each subset of questions $qs:
$qs_u = all users who have answered every question in $qs
for each user $u in $qs_u:
for each question $q in $qs:
add $u's score on $q to a running total
$r = correlation($u's scores on $qs, $u's grand total)

 

那么我们如何表示已回答给定问题的用户集问题?

可以使用一个HashSet, 但考虑到到散列的计算成本很高。因此对于已索引的数据,可以使用更有效的数据结构:bit-set,它使用各个位表示对象是否存在的内存的或集合中不存在。Indexical提供了另一种抽象将位集与新型索引集成: IndexSet。

此前, q_to_score映射的数据结构对用户索引的可选分数向量提出问题(即 UserMap<'_, Option>)。现在要改变Option到u32并添加一个位集描述回答给定问题的一组用户。首先更新后的代码的一半如下所示:

 

type UserSet<'a> = IndexSet<'a, UserRef<'a>>;
let mut q_to_score: QuestionMap<'_, (UserSet<'_>, UserMap<'_, u32>)> =
QuestionMap::new(&questions, |_| (
UserMap::<'_, u32>::new(&users, |_| 0),
UserSet::new(&users),
));
for r in data {
let (scores, set) = &mut q_to_score.get_mut(&QuestionRef(&r.question)).unwrap();
scores.insert(UserRef(&r.user), r.score);
set.insert(UserRef(&r.user));
}

 

注意q_to_score现在实际上具有无效值,因为为没有回答的用户提供默认值0 问题。

然后更新内部循环以匹配新的伪代码:

 

let all_qs = questions.indices();
all_qs.combinations(k)
.filter_map(|qs: Vec| {
// Compute the intersection of the user-sets for each question
let mut users = q_to_score[qs[0]].1.clone();
for q in &qs[1..] {
users.intersect(&q_to_score[*q].1);
}
let (qs_totals, grand_totals): (Vec<_>, Vec<_>) = users.indices()
// only .map, not .filter_map as before
.map(|u| {
let q_total = qs.iter()
.map(|q| unsafe {
let (u_scores, _) = q_to_score.get_unchecked(q);
*u_scores.get_unchecked(u)
})
// only u32, not Option as before
.sum::();
let grand_total = unsafe { *all_grand_totals.get_unchecked(u) };
(q_total as f64, grand_total as f64)
})
.unzip();
let r = utils::correlation(&qs_totals, &grand_totals);
(!r.is_nan()).then_some((qs, r))
})

 

数据分析

再次运行基准测试,新的内循环运行在47微秒 ,比上次迭代快了3.4倍,比原始Python 程序,快了769倍。

总计算时间为1.4天。

单指令多数据流

新计算结构肯定有帮助,但它仍然不够快。再次检查一下示例:

数据分析

现在我们把所有的时间都花在了bit-set intersection上。因为默认Indexical使用的位集库是bitvec 。其bit-set intersection的原码是:

 

fn intersect(dst: &mut BitSet, src: &BitSet) {
for (n1, n2): (&mut u64, &u64) in dst.iter_mut().zip(&src) {
*n1 &= *n2;
}
}

 

bitvec是AND运算u64一次。现代大多数处理器都有专门用于一次执行多个u64位操作指令,称为SIMD (ingle instruction, multiple data,多数据,单指令)。

值得庆幸的是,Rust 提供了实验性 SIMD API std::simd可以供我们使用。粗略地说,SIMD版本的bit-set intersection看起来像这样:

 

fn intersect(dst: &mut SimdBitSet, src: &SimdBitSet) {
for (n1, n2): (&mut u64x4, &u64x4) in dst.iter_mut().zip(&src) {
*n1 &= *n2;
}
}

 

唯一的区别是已经替换了原始的u64类型为SIMD类型u64x4, 在底层,Rust发出一条SIMD指令来一次执行四条u64 &=运算。

在crates.io ,搜到一个名为Bitsvec的。可以适用SIMD的快速交集,但我发现它的迭代器可以找到索引1位的速度实际上相当慢。进行少量修改实现并编写了一个更高效的迭代器。

得益于Indexical的抽象,仅交换SIMD位集需要更改类型别名并且不需要修改k_corrset函数。优化为SIMD位集可以u64x16在最大程度提高性能。

再次运行基准测试,新的内部循环运行在1.35微秒 ,比上次迭代算法快34倍,比原始Python算法26,459 倍。

总计算时间缩短至57分钟。

内存分配

此时,非常接近峰值性能了。继续回到profile倒置视图(显示了叶子节点上最常调用的函数调用树):

数据分析

最大的瓶颈是的位集迭代器。有几个相关的函数:memmove, realloc,allocate,是在函数的内循环中分配内存的。

为了避免过多分配,可以预先创建这些数据结构所需的最大可能大小,然后重复写入他们:

 

let mut qs_totals = vec![0.; users.len()]
let mut grand_totals = vec![0.; users.len()];
let mut user_set = IndexSet::new(&users);
let all_qs = questions.indices();
all_qs.combinations(k)
.filter_map(|qs| {
// Use `clone_from` rather than `clone` to copy without allocation
user_set.clone_from(&q_to_score[qs[0]].1);
for q in &qs[1..] {
user_set.intersect(&q_to_score[*q].1);
}
let mut n = 0;
for (i, u) in user_set.indices().enumerate() {
let q_total = qs.iter()
.map(|q| unsafe {
let (u_scores, _) = q_to_score.get_unchecked(q);
*u_scores..get_unchecked(u)
})
.sum::();
let grand_total = unsafe { *all_grand_totals.get_unchecked(u) };
unsafe {
*qs_totals.get_unchecked_mut(i) = q_total as f64;
*grand_totals.get_unchecked_mut(i) = grand_total as f64;
}
n += 1;
}
let r = utils::correlation(&qs_totals[..n], &grand_totals[..n]);
(!r.is_nan()).then_some((qs, r))
})

 

数据分析

数据分析

再次运行基准测试,新的内循环运行1.09微秒 ,比上次迭代快1.24倍,比原始的Python基线32,940倍。

总计算时间缩短至46分钟。

数据分析

并行性

至此,似乎已经用尽了所有的优化途径。实际上想不出任何其他方法来制作内循环速度大大加快。但是实际上,还可考虑一个通用技巧并行执行!

可以简单地并行化内部循环多个核心运行:

 

let all_qs = questions.indices();
all_qs.combinations(k)
.par_bridge()
.map_init(
|| (vec![0.; users.len()], vec![0.; users.len()], IndexSet::new(&users)),
|(qs_totals, grand_totals, user_set), qs| {
// same code as before
})
// same code as before

 

数据分析

par_bridge方法采用串行迭代器并且将其转换为并行迭代器。

map_init功能是一个具有线程特定状态的并行映射,所保留免分配状态。

需要一个不同的基准来评估外循环。用5000000个问题组合上运行外循环的标准 使用给定策略的单次运行。

使用串行策略运行此基准测试超过最快内循环需要6.8秒。对比并行策略进行基准测试后,大概需要4.2 秒完成5000000种组合。

只是1.6倍加速

追踪下性能执行:

数据分析

线程大部分时间都花在锁定和解锁互斥,可能存在某种同步瓶颈。

之间的交接Itertools::combinations迭代器和Rayon并行桥太慢了。鉴于有大量的组合,避免这个瓶颈的简单方法是增加粒度任务分配。也就是说,可以将许多问题批处理在一起组合并将它们一次性传递给一个线程。

对于这个任务,定义了一个快速而粗劣的批处理迭代器使用一个ArrayVec以避免分配。

 

pub struct Batched {
iter: I,
}
impl Iterator for Batched {
type Item = ArrayVec;
#[inline]
fn next(&mut self) -> Option {
let batch = ArrayVec::from_iter((&mut self.iter).take(N));
(!batch.is_empty()).then_some(batch)
}
}

 

然后通过批处理组合迭代器来修改外循环, 并修改内部循环以展平每个批次:

 

let all_qs = questions.indices();
all_qs.combinations(k)
.batched::<1024>()
.par_bridge()
.map_init(
|| (vec![0.; users.len()], vec![0.; users.len()], IndexSet::new(&users)),
|(qs_totals, grand_totals, user_set), qs_batch| {
qs_batch
.into_iter()
.filter_map(|qs| {
// same code as before
})
.collect_vec()
})
.flatten()

 

再次运行外循环基准测试,现在是分块迭代器内完成5000000种组合在982毫秒。与串行方法相比,速度提高了6.9倍。

总结

结论

最初的Python程序需要k=5时需要2.9年完成。使用各种方法优化过的Rust程序只需要8 分钟就可以实现对几十亿数据的处理。总体上,优化了180,000 倍加速。

在这个案例中,使用的优化关键点为:

使用Rust的编译器优化。

使用散列数字而非字符串。

使用(索引)向量而非HashMap。

使用bit-set进行有效的成员资格测试。

使用SIMD实现高效的位集。

使用多线程将工作分配给多个核心计算

使用批处理来避免工作分配中的瓶颈。

  审核编辑:汤梓红

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分