素数列生成
素数列の実装方法を考えてみたのでメモ。
基本的にはエラトステネスの篩と同じようなことを行っているはずだが、一度に保持する篩の範囲が(おそらく)必要最小限となっている。
(一つの既知の素数につき、ハッシュテーブル内の一つの枠しか消費しない)
// file: prime.rs // // $ rustc --version // rustc 1.0.0 (a59de37e9 2015-05-13) (built 2015-05-14) use std::collections::HashMap; pub struct Prime { curr: u64, // 現在の数値 (素数とは限らない) sieve: HashMap<u64, u64>, // 合成数 => 素数: サイズは`curr未満の素数の数`に常に等しい } impl Prime { pub fn is_prime(&mut self, n: u64) -> bool { match self.sieve.remove(&n) { // 一度探索した数はsieveから除去する(容量節約) None => {self.mark_composite(2, n); true}, // 素数 Some(prime) => {self.mark_composite(n/prime + 1, prime); false}, // 合成数 } } fn mark_composite(&mut self, times: u64, prime: u64) { (times..).find(|i| !self.sieve.contains_key(&(i*prime)) ) // 未登録の合成数を探す .and_then(|i| self.sieve.insert(i*prime, prime) ); } } impl Iterator for Prime { type Item = u64; fn next(&mut self) -> Option<u64> { let prime = (self.curr..).find(|n| self.is_prime(*n) ).unwrap(); // curr以降の最小の素数を検索 self.curr = prime + 1; Some(prime) } } pub fn primes() -> Prime { Prime{curr: 2, sieve: HashMap::new()} } fn main() { use std::env; let n: usize = env::args().nth(1).unwrap().parse().unwrap(); println!("{}-th prime: {}", n, primes().nth(n-1).unwrap()); }
実行例:
# CPU: Intel(R) Core(TM) i7-5600U CPU @ 2.60GHz # コンパイル $ rustc -O hoge.rs # 100万番の素数の検索 $ time ./hoge 1000000 1000000-th prime: 15485863 real 0m10.086s # 約10秒掛かった user 0m10.060s sys 0m0.028s
優先度付きキュー系モジュールのベンチマーク
『Purely Functional Data Structures』(略:PFDS)で紹介されている優先度付きキュー(+ α)をいくつか実装してみたので、その性能測定メモ。
なおPFDSは(データ構造がヒープ特性を満たしているかどうかに関係なく)優先度付きキュー群をまとめてヒープと呼称しているようなので、以降ではそれに倣うことにする。
測定対象
測定対象モジュールは以下の通り:
- leftist_heaps:
- PFDSの3.1の実装がベース
- ヒープ特性とLeftist特性を満たす二分木
- ヒープ特性: 親ノードの値は、常に子ノードの値以下 (値が小さいほど高優先)
- Leftist特性: 左の子のランクは、常に右の子のランク以上
- ランクは「右の子を辿り続けて空ノードに至るまでのパスの長さ」
- 要素の挿入とヒープ同士のマージは
O(log n)
、最小値取り出しはO(1)
、の最悪コスト
- binomial_heaps:
- splay_heaps:
- PFDSの5.4の実装がベース
- 二分探索木
- 各操作実行時に辿った経路のみをバランス(スプレー)し、よくアクセスされる要素ほど木のルート近くに配置されるようにする
- 挿入/取り出しは
O(log n)
の償却コスト - マージは
O(n)
の最悪コスト
- pairing_heaps:
- gb_set_heaps:
- list_heaps:
- ソート済みリストによるヒープ実装
- 挿入とマージは
O(n)
の最悪コスト - 取り出しは
O(1)
の最悪コスト (単にリストの先頭要素を取り出すだけ)
環境
- CPU: Intel(R) Core(TM) i7-5600U CPU @ 2.60GHz
- OS: Ubuntu-15.04
- Erlang/OTP: 17.5; パッケージ(.deb)
測定内容
測定用コード: heaps_bench.erl
以下の操作に掛かる時間を計測する:
- 要素の挿入:
- ひたすら要素を挿入して、一回辺りの挿入に掛かった平均時間を計測する
- 入力数は
4^n (n = 2..7)
- 入力列のパターンは
昇順、降順、ランダム
の3つ
- 最小要素の取り出し:
- 全入力要素の挿入後に、ひたすら最小要素の取り出しを行い、一回辺りの取り出しに掛かった平均時間を計測する
- 入力数は
4^n (n = 2..7)
- 入力列のパターンは
昇順、降順、ランダム
の3つ
- 挿入と取り出しの混合:
- 二つのヒープのマージ:
- 要素数が
4^n (n = 2..5)
の二つのヒープのマージを千回繰り返し、一回の平均所要時間を計測する - 各ヒープの要素にはランダムな値が使用される
- 要素数が
測定結果
要素の挿入
計測内容(再掲):
- ひたすら要素を挿入して、一回辺りの挿入に掛かった平均時間を計測する
- 入力数は
4^n (n = 2..7)
- 入力列のパターンは
昇順、降順、ランダム
の3つ
入力が昇順の場合
%% [計測関数] %% N = 2..7 > heaps_bench:bench_in(lists:seq(1, trunc(math:pow(4, N)))).
結果:
入力数 | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
16 | 0.395 μs | 0.229 μs | 0.125 μs | 0.083 μs | 0.854 μs | 0.270 μs |
64 | 0.422 μs | 0.120 μs | 0.089 μs | 0.073 μs | 1.115 μs | 0.656 μs |
256 | 0.533 μs | 0.121 μs | 0.063 μs | 0.056 μs | 1.467 μs | 2.460 μs |
1024 | 0.672 μs | 0.115 μs | 0.061 μs | 0.054 μs | 1.995 μs | 10.372 μs |
4096 | 0.838 μs | 0.147 μs | 0.118 μs | 0.107 μs | 2.823 μs | 41.247 μs |
16384 | 1.050 μs | 0.130 μs | 0.078 μs | 0.067 μs | 2.846 μs | 166.472 μs |
- pairing_heapsが一番高速
- paring_heaps、splay_heaps、binomial_heapsは入力数によらず
O(1)
で挿入可能 - leftist_heapsとgb_set_heapsは
O(log n)
- list_heapsは
O(n)
(常にリストの末尾への追加となる)
入力が降順の場合
%% [計測関数] %% N = 2..7 > heaps_bench:bench_in(lists:seq(trunc(math:pow(4, N)), 1, -1)).
結果:
入力数 | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
16 | 0.146 μs | 0.146 μs | 0.063 μs | 0.063 μs | 0.688 μs | 0.063 μs |
64 | 0.161 μs | 0.135 μs | 0.089 μs | 0.063 μs | 0.932 μs | 0.031 μs |
256 | 0.111 μs | 0.126 μs | 0.068 μs | 0.057 μs | 1.297 μs | 0.026 μs |
1024 | 0.131 μs | 0.119 μs | 0.068 μs | 0.056 μs | 1.737 μs | 0.029 μs |
4096 | 0.127 μs | 0.119 μs | 0.083 μs | 0.063 μs | 2.493 μs | 0.049 μs |
16384 | 0.147 μs | 0.166 μs | 0.079 μs | 0.072 μs | 2.574 μs | 0.032 μs |
- list_heapsが一番高速 (常にリストの先頭への追加となる)
- ただしpairing_heapsとsplay_heapsの所要時間も(単なる先頭要素の追加に対して)二倍程度に収まっているので悪くはない
- list_heaps、paring_heaps、splay_heaps、binomial_heaps、leftist_heapsは入力数によらず
O(1)
で挿入可能 - gb_set_heapsだけが
O(log n)
入力がランダムの場合
%% [計測関数] %% N = 2..7 > heaps_bench:bench_in([random:uniform() || _ <- lists:seq(1, trunc(math:pow(4, N)))]).
結果:
入力数 | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
16 | 0.313 μs | 0.125 μs | 0.208 μs | 0.063 μs | 0.604 μs | 0.146 μs |
64 | 0.344 μs | 0.135 μs | 0.344 μs | 0.073 μs | 0.750 μs | 0.401 μs |
256 | 0.367 μs | 0.132 μs | 0.480 μs | 0.053 μs | 0.930 μs | 1.323 μs |
1024 | 0.493 μs | 0.131 μs | 0.727 μs | 0.058 μs | 1.171 μs | 5.988 μs |
4096 | 0.528 μs | 0.150 μs | 1.014 μs | 0.060 μs | 1.811 μs | 24.181 μs |
16384 | 0.724 μs | 0.163 μs | 1.209 μs | 0.078 μs | 2.027 μs | 93.670 μs |
- pairing_heapsが一番高速
- paring_heaps、binomial_heapsは入力数によらず
O(1)
で挿入可能 - leftist_heaps、splay_heaps、gb_set_heapsは
O(log n)
- list_heapsは
O(n)
実測結果のまとめ (挿入)
注意: 以降に出てくる実測値のオーダーは観測値を眺めての印象に基づくものであり、厳密なものではない
オーダー:
入力パターン | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
昇順 | 1 | 1 | 1 | 1 | log n | n |
降順 | log n | 1 | 1 | 1 | log n | 1 |
ランダム | log n | 1 | log n | 1 | log n | n |
入力数=1024(4^5)
の際の平均所要時間(μs):
入力パターン | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
昇順 | 0.672 | 0.115 | 二番:0.061 | 最速:0.054 | 1.995 | 10.372 |
降順 | 0.131 | 0.119 | 0.068 | 二番:0.056 | 1.737 | 最速:0.029 |
ランダム | 0.493 | 二番:0.131 | 0.727 | 最速:0.058 | 1.171 | 5.988 |
pairing_heapsは優秀。
最小要素の取り出し
計測内容(再掲):
- 全入力要素の挿入後に、ひたすら最小要素の取り出しを行い、一回辺りの取り出しに掛かった平均時間を計測する
- 入力数は
4^n (n = 2..7)
- 入力列のパターンは
昇順、降順、ランダム
の3つ
入力が昇順の場合
%% [計測関数] %% N = 2..7 > heaps_bench:bench_out(lists:seq(1, trunc(math:pow(4, N)))).
結果:
入力数 | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
16 | 0.375 μs | 0.167 μs | 0.125 μs | 0.208 μs | 0.250 μs | 0.042 μs |
64 | 0.453 μs | 0.214 μs | 0.156 μs | 0.167 μs | 0.203 μs | 0.063 μs |
256 | 0.599 μs | 0.230 μs | 0.143 μs | 0.164 μs | 0.241 μs | 0.047 μs |
1024 | 0.842 μs | 0.285 μs | 0.156 μs | 0.179 μs | 0.230 μs | 0.047 μs |
4096 | 1.243 μs | 0.286 μs | 0.170 μs | 0.316 μs | 0.245 μs | 0.043 μs |
16384 | 1.428 μs | 0.322 μs | 0.156 μs | 0.183 μs | 0.268 μs | 0.049 μs |
- list_heapsは先頭要素を取り出すだけなので速い
- binomial_heapsとleftist_heapsは
O(log n)
。後者の方が劣化が激しい。 - gb_set_treesが
O(1)
に見えるのが意外 (微増はしているけど)
入力が降順の場合
%% [計測関数] %% N = 2..7 > heaps_bench:bench_out(lists:seq(trunc(math:pow(4, N)), 1, -1)).
結果:
入力数 | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
16 | 0.083 μs | 0.188 μs | 0.083 μs | 0.063 μs | 0.292 μs | 0.042 μs |
64 | 0.078 μs | 0.229 μs | 0.083 μs | 0.052 μs | 0.234 μs | 0.063 μs |
256 | 0.063 μs | 0.230 μs | 0.066 μs | 0.057 μs | 0.270 μs | 0.046 μs |
1024 | 0.069 μs | 0.276 μs | 0.073 μs | 0.060 μs | 0.249 μs | 0.044 μs |
4096 | 0.060 μs | 0.297 μs | 0.065 μs | 0.061 μs | 0.275 μs | 0.045 μs |
16384 | 0.068 μs | 0.404 μs | 0.069 μs | 0.059 μs | 0.287 μs | 0.059 μs |
- 一番速いのはやっぱりlist_heaps
- ただし降順の場合は、lefist_heaps/splay_heaps/pairing_heaps、の所要時間がだいぶlist_heapsに近づいている
- binomial_heapsとgb_set_heapsの傾向は、昇順の場合とほぼ同様
入力がランダムの場合
%% [計測関数] %% N = 2..7 > heaps_bench:bench_out([random:uniform() || _ <- lists:seq(1, trunc(math:pow(4, N)))]).
結果:
入力数 | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
16 | 0.271 μs | 0.417 μs | 0.208 μs | 0.188 μs | 0.271 μs | 0.042 μs |
64 | 0.427 μs | 0.526 μs | 0.130 μs | 0.313 μs | 0.224 μs | 0.068 μs |
256 | 0.590 μs | 0.779 μs | 0.104 μs | 0.406 μs | 0.236 μs | 0.042 μs |
1024 | 0.807 μs | 1.027 μs | 0.116 μs | 0.578 μs | 0.310 μs | 0.049 μs |
4096 | 1.238 μs | 1.643 μs | 0.110 μs | 0.684 μs | 0.304 μs | 0.056 μs |
16384 | 1.543 μs | 1.637 μs | 0.107 μs | 0.864 μs | 0.326 μs | 0.051 μs |
- lists_heapsが最速なのは変わらず
- lefist_heaps, pairing_heaps, gb_set_heapsが
O(log n)
になり、他の入力順よりも性能が落ちている感がある
実測結果のまとめ (取り出し)
オーダー:
入力パターン | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
昇順 | log n | log n | 1 | 1 | 1 | 1 |
降順 | 1 | log n | 1 | 1 | 1 | 1 |
ランダム | log n | log n | 1 | log n | log n | 1 |
入力数=1024(4^5)
の際の平均所要時間(μs):
入力パターン | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
昇順 | 0.842 | 0.285 | 二番:0.156 | 0.179 | 0.230 | 最速:0.047 |
降順 | 0.069 | 0.276 | 0.073 | 二番:0.060 | 0.249 | 最速:0.044 |
ランダム | 0.807 | 1.027 | 二番:0.116 | 0.578 | 0.310 | 最速:0.049 |
- 当然list_heapsが一番速い (全てのケースでリストの先頭要素を取り出すだけ)
- 二番目はsplay_heaps、三番目はpairing_heapsかgb_set_heapsのどちらか、といったところ
挿入と取り出しの混合
計測内容(再掲):
- 常時一定数の要素を保持しているヒープに対して、挿入と取り出しを繰り返す
- 今回は、具体的には以下のような条件で計測を行う
- ヒープの要素数は常に
4^n (n = 2..7)
- このヒープに対して「要素取り出し + 優先度が最低の要素の挿入」を一万回繰り返し、一回の処理の平均所要時間を計測する
- ヒープの要素数は常に
%% [計測関数] %% X = 2..7 > N = trunc(math:pow(4, X)), > Input = [{in, I} || I <- lists:seq(1, N)] ++ [case I rem 2 of 0 -> out; 1 -> {in, I} end || I <- lists:seq(N+1, 10000+N+1)], > heaps_bench:bench_in_out(Input).
結果:
入力数 | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
16 | 0.316 μs | 0.258 μs | 0.145 μs | 0.130 μs | 0.514 μs | 0.209 μs |
64 | 0.492 μs | 0.292 μs | 0.153 μs | 0.142 μs | 0.746 μs | 0.646 μs |
256 | 0.706 μs | 0.314 μs | 0.163 μs | 0.150 μs | 1.037 μs | 2.436 μs |
1024 | 0.861 μs | 0.355 μs | 0.165 μs | 0.151 μs | 1.339 μs | 9.645 μs |
4096 | 1.093 μs | 0.369 μs | 0.165 μs | 0.167 μs | 1.816 μs | 39.733 μs |
16384 | 1.093 μs | 0.275 μs | 0.155 μs | 0.151 μs | 2.460 μs | 170.720 μs |
O(log n) | O(1) | O(1) | O(1) | O(log n) | O(n) |
- paring_heaps、splay_heaps、binomial_heapsの3つが、ヒープ内の要素数に影響を受けずに
O(1)
で各操作が行えていた - 特にpairing_heapsとsplay_heapsの性能は良好
二つのヒープのマージ
計測内容(再掲):
- 要素数が
4^n (n = 2..5)
の二つのヒープのマージを千回繰り返し、一回の平均所要時間を計測する - 各ヒープの要素にはランダムな値が使用される
%% [計測関数] %% X = 2..5 > HeapSize = lists:seq(1, trunc(math:pow(4, X)), > heaps_bench:bench_merge( [begin {[random:uniform() || _ <- HeapSize], [random:uniform() || _ <- HeapSize]} end || _ <- lists:seq(1, 1000)]).
結果:
入力数 | leftist | binomial | splay | pairing | gb_set | list |
---|---|---|---|---|---|---|
16 | 0.478 μs | 0.141 μs | 3.765 μs | 0.102 μs | 4.575 μs | 1.174 μs |
64 | 0.825 μs | 0.197 μs | 15.291 μs | 0.114 μs | 16.559 μs | 5.072 μs |
256 | 0.960 μs | 0.196 μs | 65.527 μs | 0.114 μs | 69.478 μs | 28.971 μs |
1024 | 1.279 μs | 0.159 μs | 261.275 μs | 0.110 μs | 264.889 μs | 78.302 μs |
O(log n) | O(1) | O(n) | O(1) | O(n) | O(n) |
- pairing_heapsとbinomial_heapsは
O(1)
でマージできている - leftist_heapsは
O(log n)
だが、これも十分高速 - 残りは
O(n)
であり、マージが重要な用途での使用は現実的ではなさそうな感じ
感想
今回計測したほぼ全てのケースで良好な結果が出ているので、基本的にはpairing_heapsを使っておけば問題はなさそう。
ただしsplay_heapsとの差は(マージの場合を除けば)結構軽微なので、実装や利用途によってはsplay_heapsの方が良い可能性もあるかもしれない。
ソート済みのリストに対する破壊的マージソートの改良
以前に載せたマージソート(をベースとしたもの)をSBCL(1.0.58)にコミットしてくれたPaul Khuongさんが、こんな記事を書いていて、なるほどなー、と思ったので、表題に関係する部分を参考にさせて貰って変更前後での比較を行ったメモ。
オリジナルのマージソート
まず、SBCL(1.0.58)のリストに対する破壊的マージソートの実装*1:
;; 二つのソート済みリストのマージ関数 (declaim (inline merge-lists*)) (defun merge-lists* (head list1 list2 test key &aux (tail head)) (declare (type cons head list1 list2) (type function test key) (optimize speed)) (macrolet ((merge-one (l1 l2) `(progn (setf (cdr tail) ,l1 tail ,l1) (let ((rest (cdr ,l1))) (cond (rest (setf ,l1 rest)) (t (setf (cdr ,l1) ,l2) (return (cdr head)))))))) (loop (if (funcall test (funcall key (car list2)) ; this way, equivalent (funcall key (car list1))) ; values are first popped (merge-one list2 list1) ; from list1 (merge-one list1 list2))))) ;; 実行 (merge-lists* '(:head) '(1 3 5) '(2 4 6) #'< #'identity)) => (1 2 3 4 5 6)
;; リストのマージソート関数 (declaim (inline stable-sort-list)) (defun stable-sort-list (list test key &aux (head (cons :head list))) (declare (type list list) (type function test key) (dynamic-extent head)) (labels ((recur (list size) (declare (optimize speed) (type cons list) (type (and fixnum unsigned-byte) size)) (if (= 1 size) (values list (shiftf (cdr list) nil)) (let ((half (ash size -1))) (multiple-value-bind (list1 rest) (recur list half) (multiple-value-bind (list2 rest) (recur rest (- size half)) (values (merge-lists* head list1 list2 test key) rest))))))) (when list (values (recur list (length list)))))) ;; 実行 (stable-sort-list '(8 73 2 40 0 3) #'< #'identity) => (0 2 3 8 40 73)
何種類かデータを用意して実行時間を計測:
;;; 計測用データ ;; 1] 400万要素のソート済みリスト (defparameter *sorted-list* (loop FOR i FROM 0 BELOW 4000000 COLLECT i)) ;; 2] 400万要素の逆順ソート済みリスト (defparameter *reverse-sorted-list* (reverse *sorted-list*)) ;; 3] 400万要素のほぼソート済みリスト1 ※ 千要素に一つがランダムな値 (defparameter *nearly-sorted-list1* (loop FOR i FROM 0 BELOW 4000000 COLLECT (if (zerop (random 1000)) (random 4000000) i))) ;; 4] 400万要素のほぼソート済みリスト2 ※ 複数のソート済みリストが連結 (defparameter *nearly-sorted-list2* (loop REPEAT 4 APPEND (loop FOR i FROM 0 BELOW 1000000 COLLECT i))) ;; 5] 400万要素のランダムなリスト (defparameter *random-list* (loop REPEAT 4000000 COLLECT (random most-positive-fixnum))) ;;; 計測用マクロ (defmacro sort-time (sort-fn-name list) `(let ((list~ (copy-list ,list))) (declare (optimize (speed 3) (safety 0))) (time (progn (,sort-fn-name list~ #'< #'identity) t)))) ;;; 計測 ;; 1] ソート済みリスト (sort-time stable-sort-list *sorted-list*) Evaluation took: 0.254 seconds of real time ; 0.254秒 0.252017 seconds of total run time (0.248016 user, 0.004001 system) 99.21% CPU 508,247,464 processor cycles 0 bytes consed => T ;; 2] 逆順ソート済みリスト (sort-time stable-sort-list *reverse-sorted-list*) Evaluation took: 0.235 seconds of real time ; 0.235秒 0.232015 seconds of total run time (0.232015 user, 0.000000 system) 98.72% CPU 468,869,834 processor cycles 0 bytes consed => T ;; 3] ほぼソート済みリスト1 ※ 千要素に一つがランダムな値 (sort-time stable-sort-list *nearly-sorted-list1*) Evaluation took: 0.348 seconds of real time ; 0.348秒 0.348023 seconds of total run time (0.344022 user, 0.004001 system) 100.00% CPU 694,968,622 processor cycles 0 bytes consed => T ;; 4] ほぼソート済みリスト2 ※ 複数のソート済みリストが連結 (sort-time stable-sort-list *nearly-sorted-list2*) Evaluation took: 0.271 seconds of real time ; 0.271秒 0.272017 seconds of total run time (0.272017 user, 0.000000 system) 100.37% CPU 538,952,732 processor cycles 0 bytes consed => T ;; 5] ランダムリスト (sort-time stable-sort-list *random-list*) Evaluation took: 2.171 seconds of real time ; 2.171秒 2.168135 seconds of total run time (2.160135 user, 0.008000 system) 99.86% CPU 4,332,215,938 processor cycles 0 bytes consed => T
ソート済みのリストに対する改良を加えたマージソート
変更後のマージソート関数: ※ 変更内容はコメントを参照
;; 改良版マージソート関数 ;; - fast-merge-lists*関数が追加されたこと以外は、もともとの関数とほとんど同様 ;; - fast-merge-lists*関数は要素の範囲が重複しない二つのリストをO(1)でマージ可能 (declaim (inline stable-sort-list2)) (defun stable-sort-list2 (list test key &aux (head (cons :head list))) (declare (type list list) (type function test key) (dynamic-extent head)) ;; マージ対象の二つのリスト内の片方が、もう片方に完全に先行している場合は、 ;; 各要素の比較などは省略して、末尾のcdrの更新のみを行う。 (labels ((fast-merge-lists* (try-fast-merge? list1 tail1 list2 tail2 rest) (when try-fast-merge? ;; list1がlist2に完全に先行: (list1 .. tail1) <= (list2 .. tail2) (cond ((not (funcall test (funcall key (car list2)) (funcall key (car tail1)))) (setf (cdr tail1) list2) (return-from fast-merge-lists* (values list1 tail2 rest))) ;; list2がlist1に完全に先行: (list2 .. tail2) < (list1 .. tail1) ((funcall test (funcall key (car tail2)) (funcall key (car list1))) (setf (cdr tail2) list1) (return-from fast-merge-lists* (values list2 tail1 rest))))) ;; その他: 通常のマージ (values (merge-lists* head list1 list2 test key) (if (null (cdr tail1)) tail1 tail2) rest)) ;; トップダウンマージリスト関数: リストの末尾を管理するようになったのとfast-merge-lists*関数を使うようになったこと以外は変更なし (recur (list size) (declare (optimize speed) (type cons list) (type (and fixnum unsigned-byte) size)) (if (= 1 size) (values list list (shiftf (cdr list) nil)) (let ((half (ash size -1))) (multiple-value-bind (list1 tail1 rest) (recur list half) (multiple-value-bind (list2 tail2 rest) (recur rest (- size half)) (fast-merge-lists* (>= size 8) ; オーバヘッドを少なくするために、一定サイズ以上のリストに対してのみ適用を試みる list1 tail1 list2 tail2 rest))))))) (when list (values (recur list (length list)))))) ;; 実行 (stable-sort-list2 '(8 73 2 40 0 3) #'< #'identity) => (0 2 3 8 40 73)
処理時間計測:
;; 1] ソート済みリスト (sort-time stable-sort-list2 *sorted-list*) Evaluation took: 0.086 seconds of real time ; 0.086秒 (変更前: 0.254秒) 0.088005 seconds of total run time (0.088005 user, 0.000000 system) 102.33% CPU 171,845,432 processor cycles 0 bytes consed => T ;; 2] 逆順ソート済みリスト (sort-time stable-sort-list2 *reverse-sorted-list*) Evaluation took: 0.087 seconds of real time ; 0.0.87秒 (変更前: 0.235秒) 0.088006 seconds of total run time (0.088006 user, 0.000000 system) 101.15% CPU 173,196,084 processor cycles 0 bytes consed => T ;; 3] ほぼソート済みリスト1 ※ 千要素に一つがランダムな値 (sort-time stable-sort-list2 *nearly-sorted-list1*) Evaluation took: 0.293 seconds of real time ; 0.293秒 (変更前: 0.348秒) 0.292019 seconds of total run time (0.292019 user, 0.000000 system) 99.66% CPU 585,393,530 processor cycles 0 bytes consed => T ;; 4] ほぼソート済みリスト2 ※ 複数のソート済みリストが連結 (sort-time stable-sort-list2 *nearly-sorted-list2*) Evaluation took: 0.122 seconds of real time ; 0.122秒 (変更前: 0.271秒) 0.120007 seconds of total run time (0.116007 user, 0.004000 system) 98.36% CPU 242,403,024 processor cycles 0 bytes consed => T ;; 5] ランダムリスト (sort-time stable-sort-list2 *random-list*) Evaluation took: 2.193 seconds of real time ; 2.193秒 (変更前: 2.171秒) 2.192138 seconds of total run time (2.164136 user, 0.028002 system) 99.95% CPU 4,376,336,316 processor cycles 0 bytes consed => T
完全にランダムなリストに対するソートは心なしか改良版の方が(ごく若干)遅くなっているように思うが、入力リストにソート済みの部分が多ければ多いほど、確実に改良版の方が速くなっている。
確かに、二つのリストをマージする場合、それぞれの領域が独立しているなら、片方の先頭要素ともう片方の末尾要素を比較するだけで、リスト全体を完全に順序づけ可能なんだけど、自分が実装方法を考えている時には、そのことに思い至らなかった。
なるほどなー。
Lock-Free Queue
compare-and-swap操作を用いたロックフリーなキューの実装。
SBCLでのみ動作*1。
(defpackage lock-free-queue (:use :common-lisp) (:export queue make enq deq empty-p element-count to-list)) (in-package :lock-free-queue) ;; compare-and-swap: 成功した場合はTを、失敗した場合はNILを返す (defmacro compare-and-swap (place old new) `(eq (sb-ext:compare-and-swap ,place ,old ,new) ,old)) ;; キュー構造体 (defstruct queue (head nil :type list) (tail nil :type list)) ;; リストへ変換/空判定/要素数取得 (defun to-list (que) (copy-seq (cdr (queue-head que)))) (defun empty-p (que) (endp (cdr (queue-head que)))) (defun element-count (que) (length (cdr (queue-head que)))) (defmethod print-object ((o queue) stream) (print-unreadable-object (o stream :type t) (format stream "~s ~s" :count (element-count o)))) ;; キューを生成 (defun make (&optional initial-contents) (let ((contents (cons :initial-head initial-contents))) (make-queue :head contents :tail (last contents)))) ;; キューの末尾に要素を追加する ;; => queue (defun enq (x que) (loop WITH new-elem = (list x) FOR tail = (queue-tail que) DO (cond ((cdr tail) (compare-and-swap (queue-tail que) tail (cdr tail))) ; tailの位置を調整 ((compare-and-swap (cdr tail) nil new-elem) (return que))))) ; 追加成功 ;; キューの先頭から要素を取り出す ;; => (or (values 先頭要素 T) ; キューに要素がある場合 ;; (values NIL NIL)) ; キューが空の場合 (defun deq (que) (let* ((head (queue-head que)) (next (cdr head))) (cond ((null next) (values nil nil)) ; 空 ((compare-and-swap (queue-head que) head next) (values (car next) t)) ; 取得成功 (t (deq que))))) ; 他スレッドと競合(リトライ)
実行例:
;; シングルスレッドでの例 (defparameter *que* (lock-free-queue:make)) => *QUE* (lock-free-queue:enq 1 *que*) => #<LOCK-FREE-QUEUE:QUEUE :COUNT 1> (lock-free-queue:enq 2 *que*) => #<LOCK-FREE-QUEUE:QUEUE :COUNT 2> (lock-free-queue:to-list *que*) => (1 2) (lock-free-queue:deq *que*) => 1 T (lock-free-queue:deq *que*) => 2 T (lock-free-queue:deq *que*) => NIL NIL ;; マルチスレッドでの例 (let ((data (loop FOR i FROM 0 BELOW 10000 COLLECT i)) (que (lock-free-queue:make)) (thread-num 500)) ;; enqueuers (loop REPEAT thread-num DO (sb-thread:make-thread (lambda () (dolist (e data) (lock-free-queue:enq e que))))) ;; dequeuer (list (length (loop REPEAT (* thread-num (length data)) COLLECT (loop (multiple-value-bind (val ok?) (lock-free-queue:deq que) (when ok? (return val)))))) que)) => 5000000 #<LOCK-FREE-QUEUE:QUEUE :COUNT 0>
*1:sb-ext:compare-and-swapを置き換えれば他の処理系でも動作可能
逆FizzBuzz
逆FizzBuzz問題 (Inverse FizzBuzz)というものがあるのを知ったので解いてみた。
結構力技。
あと、本当に合っているかは不明。
;; 逆FizzBuzzを解く関数 ;; listは fizz,buzz,fizzbuzz のいずれかを要素に持つリスト ;; ;; 処理内容は、 ;; - 1: 開始数値を(1から15の範囲で)設定して、とりあえず解いてみる ;; - 2: その15個の解の中から、一番短いものを選択する ;; という簡単なもの。 ;; ;; 解がない場合はnilを返す。 (defun ifizzbuzz (list) (select-min (delete '() (loop FOR start FROM 1 TO 15 ; 開始数値を1〜15の範囲を試せば、全パターン網羅できる(はず) COLLECT (ifizzbuzz-impl start list '()))))) ;; リスト内で、一番短いリストを返す (defun select-min (list-of-list) (first (sort list-of-list #'< :key #'length))) ;; 数値をfizzbuzzを表すシンボルに変換する (defun get-fizzbuzz-type (n) (cond ((= (mod n 15) 0) 'fizzbuzz) ((= (mod n 5) 0) 'buzz) ((= (mod n 3) 0) 'fizz) (t 'none))) ;; nを開始とする連続する数値列が、listが示すfizzbuzz列に一致するかを判定する関数 ;; 一致する場合は、その一致した数値列を返す。 (defun ifizzbuzz-impl (n list acc) (if (null list) (reverse acc) ; 一致した (let ((type (get-fizzbuzz-type n))) (if (eq type 'none) ; fizzbuzzと関係ない数値の場合は無条件で許可 (ifizzbuzz-impl (1+ n) list (cons n acc)) (when (eq (car list) type) ; fizzbuzz系の数値の場合は、listの先頭要素と一致するもののみ許可 (ifizzbuzz-impl (1+ n) (cdr list) (cons n acc)))))))
動作例。
> (ifizzbuzz '(fizz)) => (3) > (ifizzbuzz '(fizz buzz)) => (9 10) > (ifizzbuzz '(fizz buzz fizz)) => (3 4 5 6) > (ifizzbuzz '(fizz fizz buzz fizz fizzbuzz fizz)) => (6 7 8 9 10 11 12 13 14 15 16 17 18) > (ifizzbuzz '(buzz buzz)) =>NIL ; 解無し
合ってそうには見える。
N-Queen: 高速化
こちらの記事に刺激を受けて、以前に実装したN-Queenを高速化してみた(Common Lisp版のみ)。
(defvar *fastest* '(optimize (speed 3) (safety 0) (debug 0) (compilation-speed 0))) (deftype max-board-size () '(mod #x100)) (declaim (inline check)) ; inline宣言を追加 (defun check (row queens &optional (r 1) &aux (q (car queens))) (declare #.*fastest* (max-board-size r row q)) (or (null queens) (and (/= q (+ row r) (- row r)) (check row (cdr queens) (1+ r))))) ;; dolistの亜種 ;; - リストの走査時に各要素を変数に束縛するのと同時に、走査中の要素を除いたリストも変数に束縛する ;; ※ 先頭要素は走査対象外 (defmacro dolist2 ((x but-x list) &body body) (multiple-value-bind (recur prev cur next) (values #1=(gensym) #1# #1# #1#) `(let ((,but-x ,list)) (labels ((,recur (,prev &aux (,cur (cdr ,prev))) (when ,cur (destructuring-bind (,x . ,next) ,cur (setf (cdr ,prev) ,next) (locally ,@body) (setf (cdr ,prev) ,cur) (,recur ,cur))))) (,recur ,but-x))))) #| ex: > (dolist2 (x but-x '(:head 1 2 3 a b c)) (print `(:x ,x :but-x ,but-x))) (:X 1 :BUT-X (:HEAD 2 3 A B C)) (:X 2 :BUT-X (:HEAD 1 3 A B C)) (:X 3 :BUT-X (:HEAD 1 2 A B C)) (:X A :BUT-X (:HEAD 1 2 3 B C)) (:X B :BUT-X (:HEAD 1 2 3 A C)) (:X C :BUT-X (:HEAD 1 2 3 A B)) --> NIL |# (defun n-queen (n) (declare #.*fastest* (max-board-size n)) (nlet-acc self (queens (rows (cons :head (loop FOR i FROM 0 BELOW n COLLECT i)))) (if (null (cdr rows)) ; rows == '(:head) (accumulate queens) (dolist2 (row rest-rows rows) (when (check row queens) (self (cons row queens) rest-rows))))))
N-Queen (Haskell + Common Lisp)
Etsukata blog: Haskellでlist monadを使ってN-Queens問題を解いてみました を見たのをきっかけに久しぶりにN-Queen問題を解くプログラムをHaskellで書いてみた。
---- ファイル名: nqueen.hs ---- コンパイル: ghc -O2 -o nqueen nqueen.hs # Glasgow Haskell Compiler, Version 7.0.3 import System -- クイーンの配置: リスト内のオフセットが列を、値が行を表す type Queens = [Int] -- N-Queenを解く: ボードのサイズを受け取り、全ての解答(可能な配置のリスト)を返す nQueens :: Int -> [Queens] nQueens n = solve n [] where solve 0 queens = [queens] -- 最後の列: 全てのクイーンを配置できた solve col queens = -- 途中の列: 全ての行に対して配置可能かを調べ、可能なら次の列に進む concat $ map (solve (col-1) . (:queens)) $ filter (check queens 1) [0..(n-1)] -- クイーンが配置可能かどうか調べる check :: Queens -> Int -> Int -> Bool check [] _ _ = True check (q:qs) r row -- rは対角線上の(チェックすべき)クイーンの位置を知るための変数 | q /= row && q /= row+r && q /= row-r = check qs (r+1) row | otherwise = False -- メイン関数 main = do args <- getArgs let size = (read $ head args)::Int let rlt = nQueens size putStrLn $ show . length $ rlt
実行結果:
$ ./nqueen 12 14200
処理時間
冒頭で挙げた記事のもの(Etsutaka)、および、Common Lisp版(後述)との処理速度の比較。
処理時間(サイズ=11) | 処理時間(サイズ=12) | 処理時間(サイズ=13) | |
---|---|---|---|
nqueen(Haskell:本記事) | 0.080秒 | 0.424秒 | 2.592秒 |
nqueen(Haskell:Etsutaka) | 0.132秒 | 0.736秒 | 4.424秒 |
nqueen(Common Lisp) | 0.071秒 | 0.375秒 | 2.289秒 |
この中ではCommon Lisp版が一番速くなっているけど、Haskellで効率の良いプログラムの書き方とかが全く分かっていないので、その辺を把握してちゃんと書けばHaskell版はもっと速くなるかもしれない。
Common Lisp版
Common Lisp版のソースコード。
内容的には N-Queen(1) - sileの日記 に最適化宣言を加えただけ。
(defvar *fastest* '(optimize (speed 3) (safety 0) (debug 0) (compilation-speed 0))) (deftype max-board-size () '(mod #x100)) (defun check (row queens &optional (r 1)) (declare #.*fastest* (max-board-size r row)) (or (null queens) (and (/= (the max-board-size (car queens)) row (+ row r) (- row r)) (check row (cdr queens) (1+ r))))) (defun n-queen (n) (declare #.*fastest* (max-board-size n)) (nlet-acc self (queens (col n)) (if (zerop col) (accumulate queens) (dotimes (row n) (when (check row queens) (self (cons row queens) (1- col)))))))
;; SBCL-1.0.51 > (n-queen 4) --> ((2 0 3 1) (1 3 0 2)) > (time (length (n-queen 12))) Evaluation took: 0.401 seconds of real time 0.400025 seconds of total run time (0.400025 user, 0.000000 system) [ Run times consist of 0.012 seconds GC time, and 0.389 seconds non-GC time. ] 99.75% CPU 800,068,094 processor cycles 13,926,400 bytes consed --> 14200