文字列/バイト列用のハッシュ関数ライブラリ

A Hash Function for Hash Table Lookupに載っているハッシュ関数(Jenkins Hash)Common Lispで実装した。
github: jenkins-hash(0.0.2)


作成の主な動機は以下の二つ:

おそらくSBCL以外の処理系でも動くと思うけど、動作は未確認。

使用例

以下、使用例。

;;;; SBCL-1.0.51-64bit

;;; 【文字列】
;; 文字列からハッシュ値を算出
(jenkins-hash:hash-string "ハッシュ関数")
--> 3188986421   ; 一つのキーに対して二つのハッシュ値(32bit)を返す
    2167986557

;; パラメータ(seed1,seed2)を替えて別のハッシュ値を算出
(jenkins-hash:hash-string "ハッシュ関数" :seed1 13 :seed2 44444)
--> 2402597428
    3323692532

;; 範囲指定
(jenkins-hash:hash-string "ハッシュ関数" :start 2 :end 4)
--> 58741211
    888923469

;;; 【バイト列】
;; バイト列からハッシュ値を算出
(jenkins-hash:hash-octets (sb-ext:string-to-octets "ハッシュ関数"))
--> 1523938354
    936250363

;; sxhash関数だと配列を与えた場合、全て同じハッシュ値になってしまう
(sxhash (sb-ext:string-to-octets "ハッシュ関数"))
--> 518591303

(sxhash (sb-ext:string-to-octets "別の値"))
--> 518591303

;;; 【複数のハッシュ値】
;; nth-hash関数を使って、任意個のハッシュ値を安価に生成可能
;; ※ 内部的にはDoubleHashingを用いて生成している => 可算一つと乗算一つで算出可能

;; 一つのキーに対する10個の異なるハッシュ値を取得する
(defun hash10 (key)
  (multiple-value-bind (h1 h2) (jenkins-hash:hash-string key)
    ;; 最初の二つはそのまま使って、残りはnth-hash関数で生成する
    `(,h1 ,h2 . ,(loop FOR i FROM 2 BELOW 10 COLLECT (jenkins-hash:nth-hash i h1 h2)))))
       
(hash10 "ハッシュ関数")
--> (3188986421 2167986557 3229992239 1103011500 3270998057 1144017318 3312003875
     1185023136 3353009693 1226028954)

*1:sxhash関数を配列に適用すると常に同じ値が返ってきてしまう。sbcl-1.0.51-64bit

N-Queen (Haskell + Common Lisp)

Etsukata blog: Haskellでlist monadを使ってN-Queens問題を解いてみました を見たのをきっかけに久しぶりにN-Queen問題を解くプログラムをHaskellで書いてみた。

---- ファイル名: nqueen.hs
---- コンパイル: ghc -O2 -o nqueen nqueen.hs  # Glasgow Haskell Compiler, Version 7.0.3

import System

-- クイーンの配置: リスト内のオフセットが列を、値が行を表す
type Queens = [Int]

-- N-Queenを解く: ボードのサイズを受け取り、全ての解答(可能な配置のリスト)を返す
nQueens :: Int -> [Queens]
nQueens n = solve n []
  where solve 0   queens = [queens]   -- 最後の列: 全てのクイーンを配置できた
        solve col queens =            -- 途中の列: 全ての行に対して配置可能かを調べ、可能なら次の列に進む
          concat $ map (solve (col-1) . (:queens)) $ filter (check queens 1) [0..(n-1)]

-- クイーンが配置可能かどうか調べる
check :: Queens -> Int -> Int -> Bool  
check [] _ _  = True
check (q:qs) r row    -- rは対角線上の(チェックすべき)クイーンの位置を知るための変数
  | q /= row && q /= row+r && q /= row-r = check qs (r+1) row
  | otherwise = False
  
-- メイン関数
main = do
  args <- getArgs
  let size = (read $ head args)::Int
  let rlt = nQueens size
  putStrLn $ show . length $ rlt

実行結果:

$ ./nqueen 12
14200

処理時間

冒頭で挙げた記事のもの(Etsutaka)、および、Common Lisp(後述)との処理速度の比較。

  処理時間(サイズ=11) 処理時間(サイズ=12) 処理時間(サイズ=13)
nqueen(Haskell:本記事) 0.080秒 0.424秒 2.592秒
nqueen(Haskell:Etsutaka) 0.132秒 0.736秒 4.424秒
nqueen(Common Lisp) 0.071秒 0.375秒 2.289秒

この中ではCommon Lisp版が一番速くなっているけど、Haskellで効率の良いプログラムの書き方とかが全く分かっていないので、その辺を把握してちゃんと書けばHaskell版はもっと速くなるかもしれない。

Common Lisp

Common Lisp版のソースコード
内容的には N-Queen(1) - sileの日記 に最適化宣言を加えただけ。

(defvar *fastest* '(optimize (speed 3) (safety 0) (debug 0) (compilation-speed 0)))
(deftype max-board-size () '(mod #x100))

(defun check (row queens &optional (r 1))
  (declare #.*fastest*
           (max-board-size r row))
  (or (null queens) 
      (and (/= (the max-board-size (car queens)) row (+ row r) (- row r)) 
	   (check row (cdr queens) (1+ r)))))

(defun n-queen (n)
  (declare #.*fastest*
           (max-board-size n))
  (nlet-acc self (queens (col n))
    (if (zerop col)
        (accumulate queens) 
      (dotimes (row n)
        (when (check row queens)
          (self (cons row queens) (1- col)))))))
;; SBCL-1.0.51
> (n-queen 4)
--> ((2 0 3 1) (1 3 0 2))

> (time (length (n-queen 12)))
Evaluation took:
  0.401 seconds of real time
  0.400025 seconds of total run time (0.400025 user, 0.000000 system)
  [ Run times consist of 0.012 seconds GC time, and 0.389 seconds non-GC time. ]
  99.75% CPU
  800,068,094 processor cycles
  13,926,400 bytes consed
  
--> 14200

マージソート(3): 高階関数呼び出し最適化

マージソート(1)の改良版。
ソートのような高階関数では、引数で渡した比較関数の間接呼び出しのコストも実行速度にそれなりの影響を与えるので、それを(マクロをほとんど使わずに)できるだけ低く抑えるための試み。

比較関数最適化

まず、比較関数自体の実行コストを下げるために、汎用的な数値比較関数ではなく、より特殊化されたものを使用するようにする。

;; fixnum用の比較関数
(define-symbol-macro fixnum< (lambda (x y) (declare (fixnum x y)) (< x y)))

;;; fixnum< を使用した場合の処理時間
;;;
;;; 大量データのソート速度
;;; 100万要素のリストが対象
(sb-sys:without-gcing
 (let* ((data (loop REPEAT 1000000 COLLECT (random 10000000)))
        (d1 (copy-seq data))
        (d2 (copy-seq data))
        (r1 (time (stable-sort d1 fixnum<)))
        (r2 (time (merge-sort:sort d2 fixnum<))))
   (equal r1 r2)))

Evaluation took:
  1.366 seconds of real time  ; stable-sort# 1.366秒 (前回 2.484秒)
  1.360085 seconds of total run time (1.360085 user, 0.000000 system)
  99.56% CPU
  2,723,515,890 processor cycles
  0 bytes consed
  
Evaluation took:
  0.541 seconds of real time  ; merge-sort:sort# 0.541秒 (前回 1.662秒)
  0.540034 seconds of total run time (0.540034 user, 0.000000 system)
  99.82% CPU
  1,079,254,874 processor cycles
  0 bytes consed
  
--> T

後は、ここからどれだけ短縮できるか。

実装

今回のマージソート実装。
八割方は前と同じ。
まず、ほとんど変更がない前半部分から載せる。(変更箇所はコメントで記載)

(defpackage merge-sort
  (:use common-lisp)
  (:shadow :common-lisp sort)
  (:export sort))
(in-package :merge-sort)

;; inline-sort関数とsort-impl関数がinline宣言に加わっている。前者は今回新たに追加される関数
(declaim (inline halve merge-lists inline-sort sort-impl)  
         (optimize (speed 3) (debug 0) (safety 0)))

(defun halve (n)
  (declare (fixnum n))
  (multiple-value-bind (n1 x) (floor n 2)
    (values (+ n1 x) n1)))

(defmacro cdr! (list new-cdr)
  `(setf (cdr ,list) ,new-cdr))

(defmacro multiple-value-let* (bind-specs &body body)
  (if (null bind-specs)
      `(locally ,@body)
    (destructuring-bind ((vars exp) . rest) bind-specs
      `(multiple-value-bind ,vars ,exp
         (multiple-value-let* ,rest ,@body)))))
   
(defun merge-lists (list1 list2 test key)
  (declare (function test key))
  (labels ((less-equal-than (list1 list2)  ; 安定ソートになるように比較方法が若干修正されている
             (not (funcall test (funcall key (car list2)) (funcall key (car list1)))))
           (recur (head tail l1 l2)
             (cond ((null l1)               (cdr! tail l2) head)
                   ((null l2)               (cdr! tail l1) head)
                   ((less-equal-than l1 l2) (recur head (cdr! tail l1) (cdr l1) l2))
                   (t                       (recur head (cdr! tail l2) l1 (cdr l2))))))
    (declare (inline less-equal-than))
    (if (less-equal-than list1 list2)
        (recur list1 list1 (cdr list1) list2)
      (recur list2 list2 list1 (cdr list2)))))

次はsort-impl関数。
量は多くないけど、ここが一番重要な変更箇所。

;; 前回は、sort-impl関数自体で再帰処理を行っていたのを、
;; 再帰部分をrecur関数に括り出すように修正。
;;
;; これによって、sort-impl関数に対してinline宣言を行うことが可能になる。
;;
;; sort-impl関数がinline展開可能となると、
;; inline-sort関数(後述) => sort-impl関数 => merge-lists関数、の
;; 全てがinline展開されるようになるため、
;; コンパイラが(inline-sort関数の引数で渡され)merge-lists関数内でfuncallされている
;; testとkeyの情報を知ることができるようになり、間接呼び出しを除去する等といった
;; 最適化が可能となる(と思っている)。
(defun sort-impl (list size test key)
  (labels ((recur (list size)
             (declare (fixnum size))
             (if (= 1 size)
                 (values list (prog1 (cdr list) (cdr! list nil)))
               (multiple-value-let* (((size1 size2) (halve size))
                                     ((list1 rest) (recur list size1))
                                     ((list2 rest) (recur rest size2)))
                 (values (merge-lists list1 list2 test key) rest)))))
    (recur list size)))

最後はsort関数。
inline展開の有無を選択するための引数を追加してみた。
※ 単にsort関数をinline宣言するだけでも良いのだが、常に展開されるようになってしまうのも避けたかったの若干(無駄に)凝ってみた

;; inline引数を追加。これが真の場合は、inline展開される。
(defun sort (list test &key (key #'identity) inline)
  (declare (list list)
           (function test key)
           (ignore inline)
           (optimize (speed 3) (safety 2) (debug 2)))
  (when list
    (values (sort-impl list (length list) test key))))

;; sort関数のinline展開版。上でinline宣言されている以外は、sort関数と基本的に同様。
(defun inline-sort (list test &key (key #'identity))
  (declare (list list)
           (optimize (speed 3) (safety 0) (debug 0)))
  (when list
    (values (sort-impl list (length list) test key))))

;; sort関数のinline引数が真の場合に、(sort関数の代わりに)inline-sort関数を呼び出すためのコンパイラマクロ
(define-compiler-macro sort (&whole form list test &key (key '#'identity) inline)
  (if inline
      `(inline-sort ,list ,test :key ,key)
    form))

前回*1と比べて、本質的に異なるのは、sort-impl関数がinline展開可能になった、という点だけ。

計時

今回追加した関数(オプション)を加えて、再度計測。

;;; 大量データのソート速度
;;; 100万要素のリストが対象
(sb-sys:without-gcing
 (let* ((data (loop REPEAT 1000000 COLLECT (random 10000000)))
        (d1 (copy-seq data))
        (d2 (copy-seq data))
        (d3 (copy-seq data))
        (r1 (time (stable-sort d1 fixnum<)))
        (r2 (time (merge-sort:sort d2 fixnum<)))
        (r3 (time (merge-sort:sort d3 fixnum< :inline t)))) 
   (list (equal r1 r2)
         (equal r1 r3))))

Evaluation took:
  1.336 seconds of real time  ; stable-sort# 1.336秒
  1.332083 seconds of total run time (1.332083 user, 0.000000 system)
  99.70% CPU
  2,664,840,158 processor cycles
  0 bytes consed
  
Evaluation took:
  0.555 seconds of real time  ; merge-sort:sort# 0.555秒
  0.552034 seconds of total run time (0.552034 user, 0.000000 system)
  99.46% CPU
  1,107,829,062 processor cycles
  0 bytes consed
  
Evaluation took:
  0.382 seconds of real time  ; merge-sort:sort(inline)# 0.382秒
  0.376024 seconds of total run time (0.376024 user, 0.000000 system)
  98.43% CPU
  761,537,180 processor cycles
  0 bytes consed
  
--> (T T)


;;; 少量データのソート速度
;;; 平均50要素のリスト x 1万 が対象
(sb-sys:without-gcing
 (let* ((data (loop REPEAT 10000 
                    COLLECT (loop REPEAT (random 100) 
                                  COLLECT (random 10000))))
        (d1 (copy-tree data))
        (d2 (copy-tree data))
        (d3 (copy-tree data))
        (r1 (time (loop FOR d IN d1 COLLECT (stable-sort d fixnum<))))
        (r2 (time (loop FOR d IN d2 COLLECT (merge-sort:sort d fixnum<))))
        (r3 (time (loop FOR d IN d3 COLLECT (merge-sort:sort d fixnum< :inline t)))))
   (list (equal r1 r2)
         (equal r1 r3))))

Evaluation took:
  0.072 seconds of real time ; stable-sort# 0.072秒
  0.072004 seconds of total run time (0.072004 user, 0.000000 system)
  100.00% CPU
  144,958,896 processor cycles
  327,680 bytes consed
  
Evaluation took:
  0.058 seconds of real time  ; merge-sort:sort# 0.058秒
  0.056003 seconds of total run time (0.056003 user, 0.000000 system)
  96.55% CPU
  116,927,902 processor cycles
  163,840 bytes consed
  
Evaluation took:
  0.036 seconds of real time   ; merge-sort:sort(inline)# 0.036秒
  0.032002 seconds of total run time (0.032002 user, 0.000000 system)
  88.89% CPU
  72,255,454 processor cycles
  163,840 bytes consed
  
--> (T T)

今回のように比較関数自体の実行コストが低い場合だと、関数呼び出し(funcall)部分を含めてinline化するだけで、処理時間が2/3程度に削減できていることが分かる。

マージソート(2): 要素数が少ない部分リストの特別扱い

昨日に作成したマージソートに手を加えたもの。
素数が少ない部分リスト*1には、(再帰的な)マージソートではなく、ソーティングネットワーク的なソートを適用することで高速化を図った。
けど、結果的にはほとんど効果がなかった。

計時

まず計測結果から載せる。

;;; 大量データのソート速度
;;; 100万要素のリストが対象
(sb-sys:without-gcing
 (let* ((data (loop REPEAT 1000000 COLLECT (random 10000000)))
        (d1 (copy-seq data))
        (d2 (copy-seq data))
        (r1 (time (stable-sort d1 #'<)))
        (r2 (time (merge-sort:sort d2 #'<))))  ; 更新版: 実装は末尾に掲載
   (equal r1 r2)))

Evaluation took:
  2.542 seconds of real time  ; stable-sort# 2.542秒 (前回 2.484秒)
  2.536158 seconds of total run time (2.536158 user, 0.000000 system)
  99.76% CPU
  5,071,126,128 processor cycles
  0 bytes consed
  
Evaluation took:
  1.691 seconds of real time   ; merge-sort:sort# 1.691秒 (前回 1.662秒)
  1.688106 seconds of total run time (1.688106 user, 0.000000 system)
  99.82% CPU
  3,373,722,509 processor cycles
  0 bytes consed
  
--> T


;;; 少量データのソート速度
;;; 平均50要素のリスト x 1万 が対象
(sb-sys:without-gcing
 (let* ((data (loop REPEAT 10000 
                    COLLECT (loop REPEAT (random 100) 
                                  COLLECT (random 10000))))
        (d1 (copy-tree data))
        (d2 (copy-tree data))
        (r1 (time (loop FOR d IN d1 COLLECT (stable-sort d #'<))))
        (r2 (time (loop FOR d IN d2 COLLECT (merge-sort:sort d #'<)))))
   (equal r1 r2)))

Evaluation took:
  0.207 seconds of real time  ; stable-sort# 0.207秒 (前回 0.204秒)
  0.204013 seconds of total run time (0.204013 user, 0.000000 system)
  98.55% CPU
  414,010,874 processor cycles
  327,680 bytes consed
  
Evaluation took:
  0.174 seconds of real time   ; merge-sort:sort# 0.174秒 (前回 0.176秒)
  0.172011 seconds of total run time (0.172011 user, 0.000000 system)
  98.85% CPU
  346,667,396 processor cycles
  163,840 bytes consed
  
--> T

見ての通り、全くと云って良いほど(前回と)結果に差異がない。
少しくらいは速くなるかと期待していたのだけれど・・・。

ソースコード

今回の実装のソースコード
特に何かが改善されたということでもないので、コメントはいつも以上に手抜き。

(defpackage merge-sort
  (:use common-lisp)
  (:shadow :common-lisp sort)
  (:export sort))
(in-package :merge-sort)

(declaim (inline halve last! merge-lists less-equal-than
                 sort2 sort3 sort4 sort5)
         (optimize (speed 3) (debug 0) (safety 0)))

(defun halve (n)
  (declare (fixnum n))
  (multiple-value-bind (n1 x) (floor n 2)
    (values (+ n1 x) n1)))

(defmacro cdr! (list new-cdr)
  `(setf (cdr ,list) ,new-cdr))

(defun last! (list)
  (prog1 (cdr list) (cdr! list nil)))

(defmacro multiple-value-let* (bind-specs &body body)
  (if (null bind-specs)
      `(locally ,@body)
    (destructuring-bind ((vars exp) . rest) bind-specs
      `(multiple-value-bind ,vars ,exp
         (multiple-value-let* ,rest ,@body)))))
   
(defun less-equal-than (list1 list2 test key)
  (declare (function test key))
  (not (funcall test (funcall key (car list2)) (funcall key (car list1)))))

(defun merge-lists (list1 list2 test key)
  (declare (function test key))
  (labels ((recur (head tail l1 l2)
             (cond ((null l1) (cdr! tail l2) head)
                   ((null l2) (cdr! tail l1) head)
                   ((less-equal-than l1 l2 test key) 
                    (recur head (cdr! tail l1) (cdr l1) l2))
                   (t                 
                    (recur head (cdr! tail l2) l1 (cdr l2))))))
    (if (less-equal-than list1 list2 test key)
        (recur list1 list1 (cdr list1) list2)
      (recur list2 list2 list1 (cdr list2)))))

(eval-when (:compile-toplevel :load-toplevel :execute)
  (defun symb (&rest args)
    (intern (format nil "~{~a~}" args))))

(defun sort2 (list test key &aux (l1 list) (l2 (cdr list)))
  (unless (less-equal-than l1 l2 test key)
    (rotatef (car l1) (car l2)))
  (values l1 (last! l2)))


          ;; (vars (a b c) (list key)
          ;;   body)
          ;;
          ;; =>
          ;; (let* ((a list)
          ;;        (b (cdr a))
          ;;        (c (cdr b)))
          ;;   (let ((_a (funcall key (car a)))
          ;;         (_b (funcall key (car b)))
          ;;         (_c (funcall key (car c))))
          ;;     body))
(macrolet ((vars (vars (list key) &body body)
             `(let* ,(loop FOR prev = nil THEN var
                           FOR var IN vars
                           FOR i fixnum FROM 0
                           COLLECT (if prev 
                                       `(,var (cdr ,prev))
                                     `(,var ,list)))
                (declare (function ,key))
                (let ,(loop FOR var IN vars
                            COLLECT `(,(symb '_ var) (funcall ,key (car ,var))))
                  ,@body)))
           (swap-if-greater-than (x y test)
             `(unless (less-equal-than ,x ,y ,test #'identity)
                (rotatef (car ,x) (car ,y))
                (rotatef ,(symb '_ x) ,(symb '_ y)))))

  (defun sort3 (list test key)
    (vars (a b c) (list key)
      (swap-if-greater-than a c test)
      (swap-if-greater-than a b test)
      (swap-if-greater-than b c test)
      (values a (last! c))))
  
  (defun sort4 (list test key)
    (vars (a b c d) (list key)
      (swap-if-greater-than a c test)
      (swap-if-greater-than b d test)
      (swap-if-greater-than a b test)
      (swap-if-greater-than c d test)
      (swap-if-greater-than b c test)
      (values a (last! d))))

  (defun sort5 (list test key)
    (vars (a b c d e) (list key)
      (swap-if-greater-than a b test)
      (swap-if-greater-than d e test)
      (swap-if-greater-than a c test)
      (swap-if-greater-than b c test)
      (swap-if-greater-than a d test)
      (swap-if-greater-than c d test)
      (swap-if-greater-than b e test)
      (swap-if-greater-than b c test)
      (swap-if-greater-than d e test)
      (values a (last! e)))))

(defun sort-impl (list size test key)
  (declare (fixnum size))
  (case size
    (5 (sort5 list test key))
    (4 (sort4 list test key))
    (3 (sort3 list test key))
    (otherwise
     (multiple-value-let* (((size1 size2) (halve size))
                           ((list1 rest) (sort-impl list size1 test key))
                           ((list2 rest) (sort-impl rest size2 test key)))
       (values (merge-lists list1 list2 test key) rest)))))

(defun sort (list test &key (key #'identity) &aux (size (length list)))
  (declare (list list)
           (function test key)
           (optimize (speed 3) (safety 2) (debug 2)))
  (case size
    ((0 1) list)
    (2 
     (values (sort2 list test key)))
    (otherwise
     (values (sort-impl list size test key)))))

*1:具体的には要素数が5以下の部分リスト

マージソート(1)

久々にマージソートを実装してみたら、結構良いものができたので載せておく。
まずはパッケージ定義とグルーバルな宣言。

;;;; SBCL-1.0.51 (x86-64)
(defpackage merge-sort
  (:use common-lisp)
  (:shadow :common-lisp sort)
  (:export sort))
(in-package :merge-sort)

(declaim (inline halve merge-lists)
         (optimize (speed 3) (debug 0) (safety 0)))

ユーティリティ関数とマクロ。

;; 整数nを二分割する関数
;; => (values n1 n2)
;;
;; nが奇数の場合は (= (1+ n1) n2) となる
(defun halve (n)
  (declare (fixnum n))
  (multiple-value-bind (n1 x) (floor n 2)
    (values (+ n1 x) n1)))

;; listのcdr部を更新するマクロ
(defmacro cdr! (list new-cdr)
  `(setf (cdr ,list) ,new-cdr))

;; 複数のmultiple-value-bindの使用を簡略化するためのマクロ
;;
;; (multiple-value-let* (((a b) (values 1 2))
;;                       ((c d) (values 3 4)))
;;   (list a b c d))
;; =>
;; (multiple-value-bind (a b) (values 1 2)
;;    (multiple-value-bind (c d) (values 3 4)
;;        (list a b c d)))
(defmacro multiple-value-let* (bind-specs &body body)
  (if (null bind-specs)
      `(locally ,@body)
    (destructuring-bind ((vars exp) . rest) bind-specs
      `(multiple-value-bind ,vars ,exp
         (multiple-value-let* ,rest ,@body)))))

マージソート実装。

;; list1とlist2をマージしたリストを返す
(defun merge-lists (list1 list2 test key)
  (declare (function test key))
  (labels ((not-less-than (l1 l2)
             (not (funcall test (funcall key (car l1)) (funcall key (car l2))))) ; XXX: これでは安定ソートにならない!
           (recur (head tail l1 l2)
             (cond ((null l1)             (cdr! tail l2) head)
                   ((null l2)             (cdr! tail l1) head)
                   ((not-less-than l1 l2) (recur head (cdr! tail l2) l1 (cdr l2)))
                   (t                     (recur head (cdr! tail l1) (cdr l1) l2)))))
    (declare (inline not-less-than))
    (if (not-less-than list1 list2)
        (recur list2 list2 list1 (cdr list2))
      (recur list1 list1 (cdr list1) list2))))

;; マージソート
;; => (values ソート済みリスト=(subseq list 0 size) 
;;            未ソートリスト=(subseq list size))
(defun sort-impl (list size test key)
  (declare (fixnum size))
  (if (= 1 size)
      (values list (prog1 (cdr list) (cdr! list nil)))
    (multiple-value-let* (((size1 size2) (halve size))
                          ((list1 rest) (sort-impl list size1 test key))
                          ((list2 rest) (sort-impl rest size2 test key)))
      (values (merge-lists list1 list2 test key) rest))))

;; 公開用の関数: listが空でなければ、sort-implに処理を任せる
(defun sort (list test &key (key #'identity))
  (declare (list list)
           (function test key)
           (optimize (speed 3) (safety 2) (debug 2)))
  (when list
    (values (sort-impl list (length list) test key))))

以上。

計時

SBCLの組み込みのマージソート(stable-sort関数)との比較。

;;; 大量データのソート速度
;;; 100万要素のリストが対象
(sb-sys:without-gcing
 (let* ((data (loop REPEAT 1000000 COLLECT (random 10000000)))
        (d1 (copy-seq data))
        (d2 (copy-seq data))
        (r1 (time (stable-sort d1 #'<)))
        (r2 (time (merge-sort:sort d2 #'<))))
   (equal r1 r2))) ; ソート結果が正しいかの確認

Evaluation took:
  2.484 seconds of real time   ; stable-sort# 2.484秒
  2.476154 seconds of total run time (2.368148 user, 0.108006 system)
  99.68% CPU
  4,955,234,522 processor cycles
  0 bytes consed
  
Evaluation took:
  1.662 seconds of real time   ; merge-sort:sort# 1.662秒
  1.652103 seconds of total run time (1.592099 user, 0.060004 system)
  99.40% CPU
  3,315,393,750 processor cycles
  0 bytes consed
 
--> T  ; ソート結果は正しい


;;; 少量データのソート速度
;;; 平均50要素のリスト x 1万 が対象
(sb-sys:without-gcing
 (let* ((data (loop REPEAT 10000 
                    COLLECT (loop REPEAT (random 100) 
                                  COLLECT (random 10000))))
        (d1 (copy-tree data))
        (d2 (copy-tree data))
        (r1 (time (loop FOR d IN d1 COLLECT (stable-sort d #'<))))
        (r2 (time (loop FOR d IN d2 COLLECT (merge-sort:sort d #'<)))))
   (equal r1 r2)))

Evaluation took:
  0.204 seconds of real time  ; stable-sort# 0.204秒
  0.204012 seconds of total run time (0.204012 user, 0.000000 system)
  100.00% CPU
  407,272,146 processor cycles
  294,912 bytes consed
  
Evaluation took:
  0.176 seconds of real time  ; merge-sort:sort# 0.176秒
  0.172010 seconds of total run time (0.172010 user, 0.000000 system)
  97.73% CPU
  351,409,803 processor cycles
  163,840 bytes consed
  
--> T

自分の環境で試した限りでは、なかなか良好な結果となった。

Forthでハノイの塔

Forthを触ってみたくなったので、試しにハノイの塔を実装してみた。
ついでにcommon lispC++でも実装し、Forthとの処理速度を比較してみた。
common lispの処理系にはSBCLを、Forthの処理系にはgforth及びVFX-Forthを使用した

Forthでの実装(gforth)

gforthはバイトコードタイプのForth実装。

\ gforth-0.7.0 (linux/64bit)
\ Forth版のハノイの塔

variable cnt             \ 再帰呼び出しの数をカウント用の変数

: hanoi-print ( to from -- to from )
    cr 2dup . ." -> " . ;

: hanoi-impl ( to tmp from level -- to tmp from )
    dup >r 0> if
        rot swap r@ 1- recurse                   ( tmp to from )
        cnt @ 1+ cnt ! 
        ( hanoi-print 計時用にコメントアウト )   ( tmp to from )
        rot r@ 1- recurse                        ( to from tmp )
        swap                                     ( to tmp from )
    then
    r> drop ;

: hanoi ( to tmp from level -- )
    0 cnt !                      \ カウント初期化
    hanoi-impl drop drop drop
    cr ." count: " cnt @ . cr ;  \ 再帰数表示

hanoi-printワードをコメントアウトしなかった場合の実行結果。

$ gforth      # shellからgforthコマンドを起動
1 2 3 3 hanoi 
3 -> 1 
3 -> 2 
1 -> 2 
3 -> 1 
2 -> 3 
2 -> 1 
3 -> 1 
count: 7 
 ok

gforthでの計時。

$ gforth-fast   # 高速版のコマンドを使用する

\ 計時用のワードを定義
: time ( word_pointer -- )
    utime drop >r
    execute
    utime drop r> - 1000 / ." elapsed " . ." ms " cr ;

\ 計時
1 2 3 25 ' hanoi time
count: 33554431 
elapsed 3359 ms  \ 結果: 3.34秒
 ok

Forthでの実装(VFX-Forth)

VFX-ForthはネイティブコードタイプのForth実装(? 不確か)。
ワードの定義はgforthのそれと同様なので計時部分だけ掲載。

# VFX-Forth-4.43 (linux/32bit ?)
$ vfxlin

\ 計時用のワードを定義
: time ( word_pointer -- )
    ticks >r
    execute
    ticks r> - ." elapsed " . ." ms " cr ;

\ 計時
1 2 3 25 ' hanoi time
count: 33554431 
elapsed 190 ms   \ 結果: 0.19秒
 ok

common lispでの実装(SBCL)

実装及び計時結果。

;;;; sbcl-1.0.49 (linux/64bit)

;; 関数定義
(defvar *count*)

(defun hanoi-impl (from tmp to level)
  (declare (fixnum level)
           (optimize (speed 3) (safety 0) (debug 0))
           (sb-ext:unmuffle-conditions sb-ext:compiler-note))
  (when (plusp level)
    (hanoi-impl from to tmp (1- level))
    (incf (the fixnum *count*))     ; (format t "~&~a => ~a~%" from to)
    (hanoi-impl tmp from to (1- level))))

(defun hanoi (from tmp to level)
  (let ((*count* 0))
    (hanoi-impl from tmp to level)
    `(:count ,*count*)))

;; 計時
(time (hanoi 'a 'b 'c 25))

Evaluation took:
  0.403 seconds of real time   ; 結果: 0.40秒
  0.390000 seconds of total run time (0.390000 user, 0.000000 system)
  96.77% CPU
  804,338,623 processor cycles
  0 bytes consed
  
(:COUNT 33554431)

C++での実装(GCC)

ソースコード

/*  ファイル名: hanoi.cc
 *  コンパイル: g++ -O3 -o hanoi hanoi.cc
 *  バージョン: gcc-4.4.3 (linux/64bit)
 */
#include <iostream>

int count;

void hanoi(int from, int tmp, int to, int level) {
  if(level > 0) {
    hanoi(from, to, tmp, level-1);
    count++;
    hanoi(tmp, from, to, level-1);
  }
}

int main() {
  count=0;
  hanoi(1, 2, 3, 25);
  std::cout << "count: " << count << std::endl;
  return 0;
}

コンパイル & 実行。

$ g++ -O3 -o hanoi hanoi.cc
$ time ./hanoi 
count: 33554431

real	0m0.168s  # 結果: 0.17秒
user	0m0.170s
sys	0m0.000s

ハノイの塔での処理速度比較した結果は、C++(0.17秒)、Forth-VFX(0.20秒)、lisp(0.40秒)、Forth-gforth(3.34秒)、の順となった。
Forthも処理系によっては、C++と同程度の速度がでるみたい。※ 雑な比較なので正確なところは分からないけど・・・

おまけ: 末尾再帰 => ループ変換 版

末尾再帰部分を手動でループに変換したソースコードも書いてみたので、載せておく。

variable cnt 

: 3dup ( a b c -- a b c a b c )
    dup 2over rot ;

: hanoi-impl ( to tmp from level -- )
    begin dup >r 0> while
            rot swap 3dup r@ 1- recurse ( tmp to from )
            cnt @ 1+ cnt !
            rot r> 1-                   ( to from tmp )
    repeat
    r> drop drop drop drop ;

: hanoi ( to tmp from level -- )
    0 cnt !                      
    hanoi-impl
    cr ." count: " cnt @ . cr ; 

この変換によって、gforthの場合は処理時間が短く(2.32秒)なったけど、VFX-Forthでは逆に長く(0.35秒)なってしまった。

ソート済みファイルからO(1)のメモリ使用量でDoubleArrayを構築する方法 #APPENDIX

前半および後半で実装を省略していたパッケージのソースコードをここに載せておく。
buffered-outputパッケージとnode-allocatorパッケージの二つ。

buffered-output

ランダムアクセス可能なバイナリファイル出力用のパッケージ。
DoubleArray構築時に使われる。
※ DoubleArrayは一旦オンメモリに構築されることなく、このパッケージを使って直接(随時)ファイルに書き出してしまう
DoubleArray構築時のファイルアクセスパターンに特化して効率化(シークの軽減等)を図っている。
アクセスパターン:

  • 大局的に見れば、ほぼシーケンシャルな書込アクセス
    • ノードは添字の若い方から順に割当られていく傾向があるため ※ もちろんノード割当ロジックに依存するが
    • 狭い範囲内(ブロック)でのランダムアクセスは多々ある
      • バッファ(キャッシュ)を用意して、ブロックでの書込みは、まずそこに行われるようにする
      • バッファの内容は次のブロックに移る前に、ファイルに書き出される
  • まれに大きく離れた場所への書込アクセスが発生する可能性があるが、性能に与える影響は小さいので無視

具体的な数値は手元には無いが、このbuffered-outputパッケージを使った場合と、通常(?)の「まずオンメモリでDoubleArrayを作成し、再度にまとめてファイルに書き出す」方法を以前に比べた時、ほとんど気になるような処理速度の差はなかったように思う。
所要メモリはO(1)。 ※ ブロック・バッファサイズに依存: +BUFFER_SIZE+ * 配列の要素のサイズ

(defpackage buffered-output
  (:use :common-lisp)
  (:export buffered-output
           with-output
           write-uint))
(in-package :buffered-output)

;;;;;;;;
;;; type
(deftype array-index () `(mod ,array-dimension-limit))
(deftype positive-fixnum () `(integer 0 ,most-positive-fixnum))

;;;;;;;;;;;;
;;; constant
(defconstant +BUFFER_SIZE+ 819200)
(defconstant +CODE_LIMIT+ #x100)

;;;;;;;;;;;;;;;;;;;
;;; buffered-output
(defstruct buffered-output
  (binary-output nil :type file-stream)
  (buffer        #() :type simple-array)
  (offset          0 :type array-index))

;;;;;;;;;;;;;;;;;;;;;
;;; external function
(defmacro with-output ((out path &key (byte-width 1)) &body body)
  (declare ((mod 9) byte-width))
  `(with-open-file (,out ,path :element-type #1='(unsigned-byte ,(* 8 byte-width))
                               :direction :output
                               :if-exists :supersede)
     (let ((,out (make-buffered-output 
                  :binary-output ,out
                  :buffer (make-array ,+BUFFER_SIZE+ :element-type #1#
                                                     :initial-element 0))))
       (unwind-protect
           (locally ,@body)
         (flush ,out :final t)))))

;; unsigned-int型の数値をpositionの位置に書き込む
(defun write-uint (uint out &key (position 0))
  (declare (buffered-output out)
           (positive-fixnum position))
  (with-slots (binary-output buffer offset) out
    (cond ((< position offset)  ; a] 現在のブロックよりも前方に書き込む場合: 通常のランダムアクセス書込み(シーク発生)
           (file-position binary-output position)
           (write-byte uint binary-output))
          ((< position (+ offset +BUFFER_SIZE+)) ; b] ブロック内の書き込む場合: バッファに書込み
           (setf (aref buffer (- position offset)) uint))
          (t            ; c] 現在のブロックよりも後方に書き込む場合: バッファの内容を全て出力した後に、ブロックを移動
           (flush out)
           (incf offset +BUFFER_SIZE+)
           (fill buffer 0)
           (write-uint uint out :position position)))))

;;;;;;;;;;;;;;;;;;;;;
;;; internal function
(defun flush (out &key final)
  (declare (buffered-output out))
  (with-slots (binary-output buffer offset) out
    (file-position binary-output offset)
    (if (null final)
        (write-sequence buffer binary-output)
      (let ((end (or (position-if-not #'zerop buffer :from-end t)
                     (1- +BUFFER_SIZE+))))
        (write-sequence buffer binary-output :end (1+ end))
        ;; da:member?関数内で 範囲外アクセスチェック を行わないで済むように余裕を設けておく
        (loop REPEAT +CODE_LIMIT+ DO (write-byte 0 binary-output))))))  

node-allocator

ノードの割当を管理するパッケージ。
子ノード(のラベルの)リストを受け取って、使用可能なノード(BASE)のインデックスを返す。
まだソースコードが未整理なので、ややこしい・・・。
基本的な割当ロジックは『An Efficient Implementation of Trie Structures』にも出てくるリンクリストによる実装とほとんど変わらない*1
ただ、このパッケージでも(シーケンシャルな?)ブロック的な考え方を採用していて「まずはブロック1(0 〜 +BUFFER_SIZE+)の中でノードを割当」て、その範囲で足りなくなったら「前のブロックは捨てて(もう使わない)、次のブロック(+BUFFER_SIZE+ 〜 +BUFFER_SIZE+*2)内で割当を行う」といったことを繰り返している。
こちらも所要メモリはO(1)。 ※ ブロックサイズに依存: +BUFFER_SIZE+ * 定数値

(defpackage node-allocator
  (:use :common-lisp)
  (:export node-allocator
           make
           allocate))
(in-package :node-allocator)

;;;;;;;;;;;;;;;
;;; declaration
(declaim (inline get-next can-allocate?))

;;;;;;;;
;;; type
(deftype array-index () `(mod ,array-dimension-limit))
(deftype octet () '(unsigned-byte 8))

;;;;;;;;;;;;
;;; constant
(eval-when (:compile-toplevel :load-toplevel :execute)
  (defconstant +BUFFER_SIZE+ 89120))

;;;;;;;;;;;;;;;;;;
;;; node-allocator
(defstruct node-allocator 
  (head #x100 :type array-index)
  (bits   #*  :type (simple-bit-vector #.+BUFFER_SIZE+))
  (nexts #()  :type (simple-array fixnum (#.+BUFFER_SIZE+)))
  (prevs #()  :type (simple-array fixnum (#.+BUFFER_SIZE+)))
  (offset  0  :type array-index))

(defmethod print-object ((o node-allocator) stream)
  (print-unreadable-object (o stream :type t :identity t)))

;;;;;;;;;;;;;;;
;;; constructor
(defun make ()
  (let ((bits  (make-array +BUFFER_SIZE+ :element-type 'bit :initial-element 0))
        (nexts (make-array +BUFFER_SIZE+ :element-type 'fixnum))
        (prevs (make-array +BUFFER_SIZE+ :element-type 'fixnum)))
    (loop FOR i FROM 0 BELOW +BUFFER_SIZE+ 
      DO
      (setf (aref nexts i) (1+ i)
            (aref prevs i) (1- i)))
    (make-node-allocator :nexts nexts :prevs prevs :bits bits)))

;;;;;;;;;;;;;;;;;;;;;;
;;; auxiliary function
(defun shift (alloca)
  (with-slots (bits nexts prevs offset head) (the node-allocator alloca)
    (let ((new-offset head))
      (loop WHILE (< new-offset (+ offset (- +BUFFER_SIZE+ (* #x100 2))))
        DO
        (setf new-offset (aref nexts (- new-offset offset))))
      (let* ((delta (- new-offset offset))
             (use-len (- +BUFFER_SIZE+ delta)))
        (shiftf (subseq bits 0 use-len) (subseq bits delta))
        (fill bits 0 :start use-len)

        (setf offset new-offset)
        
        (shiftf (subseq nexts 0 use-len) (subseq nexts delta))
        (shiftf (subseq prevs 0 use-len) (subseq prevs delta))
        (loop FOR i FROM (+ offset use-len) BELOW (+ offset +BUFFER_SIZE+)
          DO
          (setf (aref nexts (- i offset)) (1+ i)
                (aref prevs (- i offset)) (1- i)))

        (setf head offset)
        (loop WHILE (< head (+ offset #x100))
          DO
          (setf head (aref nexts (- head offset)))))))
  alloca)

(defun ref (alloca index)
  (declare (array-index index))
  (with-slots (offset nexts) (the node-allocator alloca)
    (if (<= (+ offset +BUFFER_SIZE+) index)
        (ref (shift alloca) index) 
      (aref nexts (- index offset)))))

(defun bref (alloca index)
  (declare (array-index index))
  (with-slots (bits offset) (the node-allocator alloca)
    (if (> offset index)
        1
      (if (<= (+ offset +BUFFER_SIZE+) index)
          (bref (shift alloca) index)
        (bit bits (- index offset))))))

(defun get-next (alloca index)
  (ref alloca index))

(defun can-allocate? (alloca index arcs)
  (declare (list arcs)
           (array-index index))
  (and (zerop (bref alloca index))
       (every (lambda (arc)
                (declare (octet arc))
                (/= -1 (ref alloca (+ index arc))))
              arcs)))

(defun allocate-impl (alloca index arcs)
  (declare (array-index index))
  (with-slots (bits head prevs nexts offset) (the node-allocator alloca)
    (when (<= offset index)
      (setf (bit bits (- index offset)) 1))
    (loop WITH base = index
          FOR arc OF-TYPE (mod #x100) IN arcs
          FOR index OF-TYPE fixnum = (+ base arc)
      DO
      (when (<= offset index)
        (ref alloca index)

        (let ((prev (aref prevs (- index offset)))
              (next (aref nexts (- index offset))))
          (setf (aref prevs (- index offset)) -1
                (aref nexts (- index offset)) -1)
          
          (when (= head index)
            (setf head next))

          (when (<= offset prev)
            (setf (aref nexts (- prev offset)) next))

          (when (<= offset next)
            (ref alloca next)
            (setf (aref prevs (- next offset)) prev)))))))

;;;;;;;;;;;;;;;;;;;;;
;;; external function
(defun allocate (alloca arcs)
  (with-slots (head) (the node-allocator alloca)
    (loop WITH front OF-TYPE (mod #x100) = (car arcs)
          FOR cur = (get-next alloca head) THEN (get-next alloca cur)
          FOR base OF-TYPE fixnum = (- cur front)
          UNTIL (and (plusp base) (can-allocate? alloca base (cdr arcs)))
      FINALLY
      (allocate-impl alloca base arcs)
      (return base))))

*1:異なるのはCHECK配列に格納されているのが遷移元ノードのインデックスではなく、遷移文字(ラベル)なため、BASE値が重複して使われているのを禁止している点くらいだと思う