快速排序是一种应用了“分治法”的算法,所谓分治法(divid & conquer),就是把大的问题分割为结构相同的小问题,然后进行解决的方法。因为都是相同的小问题,所以我们可以利用并行计算来并行处理这些问题,使用Erlang可以非常方便非常清晰地利用这种方法解决问题。
由于Erlang是一种声明式、函数式编程语言,所以要表述基本的快速排序算法非常方便:
qsort(_, []) -> [];
qsort(F, [H|T]) ->
qsort(F, [X || X <- T, F(X, H)])
++ [H] ++
qsort(F, [X || X <- T, not F(X, H)]).
qsort(L) -> qsort(fun(X, Y) -> X < Y end, L).
其中F是一个用于比较的函数。这种描述方式非常精炼,不过效率不高,原因在于1. 不是尾递归, 2. 该代码中应用了两个列表领悟( list comprehension ),其结果是对同一个列表进行了两次遍历。不过这些问题我们稍后考虑。
在erlang中,创建和维护进程是非常方便的,可以很容易地将以上的顺序算法变成创建单独的进程解决对每个子问题:
psort(Pid, _Fun, []) -> Pid ! {self(), []};
psort(Pid, Fun, [H|T]) ->
Lid = spawn_link(?MODULE, psort, [self(), Fun, [X||X <- T, Fun(X, H)]]),
Rid = spawn_link(?MODULE, psort, [self(), Fun, [X||X <- T, not Fun(X, H)]]),
receive
{Lid, L} -> Left = L
end,
receive
{Rid, R} -> Right = R
end,
Pid ! {self(), Left ++ [H] ++ Right}.
%% @spec psort(fun(X, Y) -> bool(), list(any())) -> list(any()).
psort(Fun, L) ->
Pid = spawn_link(?MODULE, psort, [self(), Fun, L]),
receive
{Pid, R} -> R
end.
psort(L) -> psort(fun(X, Y) -> X < Y end, L).
解决父任务的进程通过接受子进程发来的消息来获得结果。然而,这个代码除了前面的问题之外,也有两个问题:1. 粒度太小,如果小到连一个空列表都要生成一个新的进程来解决问题,那么创建进程的开销就会大过计算本身。2. Programming Erlang中讲到,并发编程要注意“小消息,大计算”,这里在消息中频繁传递列表是开销较大的操作。
我通过生成1000个0-1000之间的随机数的列表来测试这两个算法的性能,在我的迅驰1.8G的本上,结果表示:
- lists:sort 1.45ms
- qsort 9.44ms
- psort 30.75ms
看来我的这两个实现还是比较低效的。首先关于qsort的两个问题,1. 遍历了两次列表,可以通过单独使用一个分区函数来遍历一次完成对列表的划分;2. 不是尾递归,可以使用一个累加器来收集结果,此法可以将其中一个递归变成尾递归(我还无法做到两个都消除)。
qsort3(_F, []) -> [];
qsort3(F, L) -> qsort3_acc(F, L, []).
qsort3(L) -> qsort3(fun(X, Y) -> if X < Y -> -1; X =:= Y -> 0; true -> 1 end end, L).
qsort3_acc(_F, [], Acc) -> Acc;
qsort3_acc(F, [H | T], Acc) ->
part_acc(F, H, T, {[], [H], []}, Acc).
part_acc(F,_, [], {L,E,G}, Acc) ->
qsort3_acc(F, L, (E ++ qsort3_acc(F, G, Acc)));
part_acc(F, X, [H | T], {L, E, G}, Acc) ->
C = F(H, X),
if
C < 0 ->
part_acc(F, X, T, {[H | L], E, G}, Acc);
C =:= 0 ->
part_acc(F, X, T, {L, [H | E], G}, Acc);
true ->
part_acc(F, X, T, {L, E, [H | G]}, Acc)
end.
此为改进版的快速排序。在此基础上,再解决并行快速排序的两个问题:1. 粒度太小:可以设置一个阈值,当列表长度小于多少时,调用快速排序解决问题;2. 消息太大——我还没想到解决方案。
psort3(Pid, _F, _Granularity, []) ->
Pid ! {self(), []};
psort3(Pid, F, Granularity, [H|T]=Ls) ->
if length(Ls) < Granularity ->
Pid ! {self(), qsort3(F, Ls)};
true ->
{Lesser, Eq, Greater} = part(F, H, T, {[], [H], []}),
Lid = spawn_link(?MODULE, psort3, [self(), F, Granularity, Lesser]),
Rid = spawn_link(?MODULE, psort3, [self(), F, Granularity, Greater]),
receive
{Lid, L} -> Left = L
end,
receive
{Rid, R} -> Right = R
end,
Pid ! { self(), Left ++ Eq ++ Right }
end.
psort3(F, Granularity, L) ->
Pid = spawn_link(sort, psort3, [self(), F, Granularity, L]),
receive
{Pid, R} -> R
end.
psort3(L) ->
psort3(fun(X, Y) -> if X < Y -> -1; X =:= Y -> 0; true -> 1 end end, 100, L).
测试结果为:
- qsort3: 5.31ms
- psort3: 7.76ms
另外,我发现,如果从命令行中传递比较函数,则性能下降非常大,不知何故。


1 Comments
好主意!赞!
eshell 命令行的解释器是不优化的,当然慢。就好比 Haskell 的 hugs98。