ループ処理を関数型っぽく書いてみる(2)

前回の続き。
githubにあるloopの簡易版を載せておく。

基本的な考え方

基本的なJava等のIteratorと似た*1インタフェースを通してループ処理を実現している。
異なるのは全ての関数をinline展開可能にすることで、同等のループを非関数型的に書いた場合と同じくらいに、コンパイラが最適化を行ってくれることを期待していることくらい。
後は、SBCLの最適化の制限上、構造体等は使用せず、極力lambdaで全てを表現するようにしている。

実装

まず、loopパッケージ用のシーケンス生成関数。

;; 数値の範囲を表現するシーケンス
(declaim (inline from))
(defun from (start &key to (by 1))  ; toがnilなら無限シーケンス
  ;; 全体をlambdaで囲む。このlambdaの呼び出しがシーケンスの初期化処理に相当する。
  (lambda () 
    (let ((cur start))
      ;; 以下の三つの関数を呼び出し元に返す
      (values (lambda () (incf cur by))          ; 1] 値更新関数
              (lambda () (and to(> cur to)))     ; 2] 終端判定関数
              (lambda (fn) (funcall fn cur)))))) ; 3] ループの本体実行関数

;; リスト用
(declaim (inline for-list))
(defun for-list (list)
  (lambda ()
    (let ((head list)) ; 初期値
      (values (lambda () (setf head (cdr head)))         ; 1] 値更新関数
              (lambda () (endp head))                    ; 2] 終端判定関数
              (lambda (fn) (funcall fn (car head)))))))  ; 3] ループの本体実行関数

;; 実行
> (from 1 :to 10)
#<CLOSURE (LAMBDA () :IN FROM) {1007855D3B}>

> (funcall (from 1 :to 10))
#<CLOSURE (LAMBDA () :IN FROM) {10078C432B}>   ; 1] 値更新関数
#<CLOSURE (LAMBDA () :IN FROM) {10078C434B}>   ; 2] 終端判定関数
#<CLOSURE (LAMBDA (FN) :IN FROM) {10078C436B}> ; 3] ループの本体実行関数

> (setf (values next end? call-body) (funcall (from 1 :to 10)))
> (funcall call-body (lambda (x) (list :val x)))
=> (:val 1)

> (funcall next)
> (funcall call-body (lambda (x) (list :val x)))
=> (:val 2)

上の関数で生成されたシーケンスを走査する関数。

;; 一番基本となる走査関数
(declaim (inline each))
(defun each (fn seq)
  (multiple-value-bind (next-fn end-fn call-fn) (funcall seq)  ; シーケンス初期化
    (loop UNTIL (funcall end-fn)   ; 終端判定
          DO (funcall call-fn fn)  ; 本体実行
             (funcall next-fn))))  ; 値更新

;; 畳み込み関数  ※ reduceはclパッケージとそれと名前が衝突するので、ここではfoldにしている
(declaim (inline fold))
(defun fold (fn init seq)
  (let ((acc init))
    (each (lambda (x)
            (setf acc (funcall fn acc x)))
          seq)
    acc))

;; シーケンスを集めたリストを返す
(declaim (inline collect))
(defun collect (seq)
  (nreverse (fold (lambda (acc x) (cons x acc))
                    '()
                    seq)))

;; 実行
> (collect (from 1 :to 20 :by 3))
=> (1 4 7 10 13 16 19)

; 合計値計算
> (fold (lambda (acc x) (+ acc x))
        0
        (from 1 :to 20 :by 3))
=> 70

mapとかfilterとかシーケンスを加工/制御する関数。

;; map関数
(declaim (inline map-seq))
(defun map-seq (map-fn seq)
  ;; ソースとなるシーケンスの情報を取得し、それをラップして返す
  (multiple-value-bind (next-fn end-fn call-fn) (funcall seq)
    (lambda ()
      (values next-fn  ; 値更新関数と終端判定関数はそのまま
              end-fn
              (lambda (body-fn)
                ;; 本体呼び出し前に、マップ処理用関数を差し込む
                (funcall call-fn (lambda (val) (funcall body-fn (funcall map-fn val)))))))))

;; filter関数: (funcall pred-fn val)がnilとなる要素をスキップする
(declaim (inline filter))
(defun filter (pred-fn seq)
  ;; ソースとなるシーケンスの情報を取得し、それをラップして返す
  (multiple-value-bind (next-fn end-fn call-fn) (funcall seq)
    (lambda ()
      (values next-fn ; 値更新関数と終端判定関数はそのまま
              end-fn
              (lambda (body-fn)
                ;; 本体呼び出し前に、フィルター処理用関数を差し込む
                (funcall call-fn (lambda (val)
                                   (unless (funcall pred-fn val)
                                     (funcall body-fn val)))))))))

;; 実行
; 二乗する
> (collect (map-seq (lambda (x) (* x x)) (from 1 :to 20)))
=> (1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361 400)

; 奇数の値だけフィルタして二乗する
> (collect (map-seq (lambda (x) (* x x)) (filter #'oddp (from 1 :to 20))))
=> (1 9 25 49 81 121 169 225 289 361)

これらの関数群を組み合わせてループ処理を表現すると、そこそこ良い感じのコードを生成してくれる。

;; 上で定義した関数群を用いたsum関数
;; - startからendの範囲の奇数値を-10した合計値を返す
(defun sum1 (start end)
 (declare (fixnum start end)
          (optimize (speed 3) (safety 0))
          (sb-ext:unmuffle-conditions sb-ext:compiler-note))
 (fold (lambda (acc n)
         (the fixnum (+ acc n)))
       0
       (map-seq (lambda (x) (- x 10)) 
                (filter #'oddp (from start :to end)))))

;; loopマクロを使用したsum関数
(defun sum2 (start end)
 (declare (fixnum start end)
          (optimize (speed 3) (safety 0))
          (sb-ext:unmuffle-conditions sb-ext:compiler-note))
 (loop WITH total fixnum = 0
       FOR i FROM start TO end
       WHEN (oddp i)
   DO (let ((n (- i 10)))
        (declare (fixnum n))
        (incf total n))
   FINALLY (return total)))

;; 一億要素に対するループ
> (time (sum1 1 100000000))
Evaluation took:
  0.134 seconds of real time  ; 0.134秒
  0.136009 seconds of total run time (0.136009 user, 0.000000 system)
  101.49% CPU
  267,335,373 processor cycles
  0 bytes consed
=> 2499999500000000

> (time (sum2 1 100000000))
Evaluation took:
  0.131 seconds of real time  ; 0.131秒
  0.132008 seconds of total run time (0.132008 user, 0.000000 system)
  100.76% CPU
  261,630,697 processor cycles
  0 bytes consed
=> 2499999500000000

;; disassemble結果
> (disassemble #'sum1)
; disassembly for SUM1
; 07AA1C28:       31D2             XOR EDX, EDX               ; no-arg-parsing entry point
;       2A:       EB1B             JMP L2
;       2C:       90               NOP
;       2D:       90               NOP
;       2E:       90               NOP
;       2F:       90               NOP
;       30: L0:   488BC1           MOV RAX, RCX
;       33:       488D1C4500000000 LEA RBX, [RAX*2]
;       3B:       4883E302         AND RBX, 2
;       3F:       4885DB           TEST RBX, RBX
;       42:       750E             JNE L3
;       44: L1:   48FFC1           INC RCX
;       47: L2:   4839F9           CMP RCX, RDI
;       4A:       7EE4             JLE L0
;       4C:       488BE5           MOV RSP, RBP
;       4F:       F8               CLC
;       50:       5D               POP RBP
;       51:       C3               RET
;       52: L3:   4883E80A         SUB RAX, 10
;       56:       48D1FA           SAR RDX, 1
;       59:       4801C2           ADD RDX, RAX
;       5C:       48D1E2           SHL RDX, 1
;       5F:       EBE3             JMP L1

> (disassemble #'sum2)
; disassembly for SUM2
; 07EF0DB8:       31D2             XOR EDX, EDX               ; no-arg-parsing entry point
;       BA:       EB2A             JMP L2
;       BC:       90               NOP
;       BD:       90               NOP
;       BE:       90               NOP
;       BF:       90               NOP
;       C0: L0:   488D044D00000000 LEA RAX, [RCX*2]
;       C8:       4883E002         AND RAX, 2
;       CC:       4885C0           TEST RAX, RAX
;       CF:       7412             JEQ L1
;       D1:       488D044D00000000 LEA RAX, [RCX*2]
;       D9:       488BD8           MOV RBX, RAX
;       DC:       4883EB14         SUB RBX, 20
;       E0:       4801DA           ADD RDX, RBX
;       E3: L1:   48FFC1           INC RCX
;       E6: L2:   4839F9           CMP RCX, RDI
;       E9:       7ED5             JLE L0
;       EB:       488BE5           MOV RSP, RBP
;       EE:       F8               CLC
;       EF:       5D               POP R

最後は複数シーケンスをまとめるzip関数。
これを使うと表現力はだいぶ上がるけど、性能は若干劣化する。

;; 二つのシーケンスをまとめる
(declaim (inline zip))
(defun zip (loop1 loop2 &aux (undef (gensym)))
  (multiple-value-bind (next-fn1 end-fn1 call-fn1) (funcall loop1)
    (multiple-value-bind (next-fn2 end-fn2 call-fn2) (funcall loop2)
      (let ((memo1 undef)
            (memo2 undef))
        (lambda ()
          (values (lambda () ; 値更新
                    (when (eq memo1 undef) (funcall next-fn1))
                    (when (eq memo2 undef) (funcall next-fn2)))

                  (lambda () ; 終端判定
                    (or (funcall end-fn1) (funcall end-fn2)))

                  (lambda (body-fn)  ; 本体呼び出し
                    ;; それぞれのシーケンスの次の値を取得する
                    ;; (次の値がfilterでスキップされた場合は memoX はundefのままになる)
                    (when (eq memo1 undef)
                      (funcall call-fn1 (lambda (val) (setf memo1 val))))

                    (when (eq memo2 undef)
                      (funcall call-fn2 (lambda (val) (setf memo2 val))))
    
                    ;; 両方のシーケンスの値が取得できたら、本体を呼び出す
                    (when (not (or (eq memo1 undef)
                                   (eq memo2 undef)))
                      (funcall fn (list memo1 memo2))  ; XXX: listで二つの値をまとめるのはconsingが発生するので効率が悪い (そのためloopパッケージでは、多引数を受け取るmapやfilterを用意している)
                      (setf memo1 undef
                            memo2 undef)))))))))

;; 実行
> (collect
    (zip (filter (lambda (n) (and (oddp n)  (zerop (mod n 3)))) (from 1))           ; 奇数かつ三の倍数
         (filter (lambda (n) (and (evenp n) (zerop (mod n 5)))) (from 1 :to 100)))) ; 偶数かつ五の倍数
=> ((3 10) (9 20) (15 30) (21 40) (27 50) (33 60) (39 70) (45 80) (51 90) (57 100))

zipはもう少し上手く実装したいところだけど、それでも関数型っぽく書いても実用上十分な性能がでるループ処理が実現できそうなことが分かったので、結構満足している。

*1:似てないかも

マージソート(3): 高階関数呼び出し最適化

マージソート(1)の改良版。
ソートのような高階関数では、引数で渡した比較関数の間接呼び出しのコストも実行速度にそれなりの影響を与えるので、それを(マクロをほとんど使わずに)できるだけ低く抑えるための試み。

比較関数最適化

まず、比較関数自体の実行コストを下げるために、汎用的な数値比較関数ではなく、より特殊化されたものを使用するようにする。

;; fixnum用の比較関数
(define-symbol-macro fixnum< (lambda (x y) (declare (fixnum x y)) (< x y)))

;;; fixnum< を使用した場合の処理時間
;;;
;;; 大量データのソート速度
;;; 100万要素のリストが対象
(sb-sys:without-gcing
 (let* ((data (loop REPEAT 1000000 COLLECT (random 10000000)))
        (d1 (copy-seq data))
        (d2 (copy-seq data))
        (r1 (time (stable-sort d1 fixnum<)))
        (r2 (time (merge-sort:sort d2 fixnum<))))
   (equal r1 r2)))

Evaluation took:
  1.366 seconds of real time  ; stable-sort# 1.366秒 (前回 2.484秒)
  1.360085 seconds of total run time (1.360085 user, 0.000000 system)
  99.56% CPU
  2,723,515,890 processor cycles
  0 bytes consed
  
Evaluation took:
  0.541 seconds of real time  ; merge-sort:sort# 0.541秒 (前回 1.662秒)
  0.540034 seconds of total run time (0.540034 user, 0.000000 system)
  99.82% CPU
  1,079,254,874 processor cycles
  0 bytes consed
  
--> T

後は、ここからどれだけ短縮できるか。

実装

今回のマージソート実装。
八割方は前と同じ。
まず、ほとんど変更がない前半部分から載せる。(変更箇所はコメントで記載)

(defpackage merge-sort
  (:use common-lisp)
  (:shadow :common-lisp sort)
  (:export sort))
(in-package :merge-sort)

;; inline-sort関数とsort-impl関数がinline宣言に加わっている。前者は今回新たに追加される関数
(declaim (inline halve merge-lists inline-sort sort-impl)  
         (optimize (speed 3) (debug 0) (safety 0)))

(defun halve (n)
  (declare (fixnum n))
  (multiple-value-bind (n1 x) (floor n 2)
    (values (+ n1 x) n1)))

(defmacro cdr! (list new-cdr)
  `(setf (cdr ,list) ,new-cdr))

(defmacro multiple-value-let* (bind-specs &body body)
  (if (null bind-specs)
      `(locally ,@body)
    (destructuring-bind ((vars exp) . rest) bind-specs
      `(multiple-value-bind ,vars ,exp
         (multiple-value-let* ,rest ,@body)))))
   
(defun merge-lists (list1 list2 test key)
  (declare (function test key))
  (labels ((less-equal-than (list1 list2)  ; 安定ソートになるように比較方法が若干修正されている
             (not (funcall test (funcall key (car list2)) (funcall key (car list1)))))
           (recur (head tail l1 l2)
             (cond ((null l1)               (cdr! tail l2) head)
                   ((null l2)               (cdr! tail l1) head)
                   ((less-equal-than l1 l2) (recur head (cdr! tail l1) (cdr l1) l2))
                   (t                       (recur head (cdr! tail l2) l1 (cdr l2))))))
    (declare (inline less-equal-than))
    (if (less-equal-than list1 list2)
        (recur list1 list1 (cdr list1) list2)
      (recur list2 list2 list1 (cdr list2)))))

次はsort-impl関数。
量は多くないけど、ここが一番重要な変更箇所。

;; 前回は、sort-impl関数自体で再帰処理を行っていたのを、
;; 再帰部分をrecur関数に括り出すように修正。
;;
;; これによって、sort-impl関数に対してinline宣言を行うことが可能になる。
;;
;; sort-impl関数がinline展開可能となると、
;; inline-sort関数(後述) => sort-impl関数 => merge-lists関数、の
;; 全てがinline展開されるようになるため、
;; コンパイラが(inline-sort関数の引数で渡され)merge-lists関数内でfuncallされている
;; testとkeyの情報を知ることができるようになり、間接呼び出しを除去する等といった
;; 最適化が可能となる(と思っている)。
(defun sort-impl (list size test key)
  (labels ((recur (list size)
             (declare (fixnum size))
             (if (= 1 size)
                 (values list (prog1 (cdr list) (cdr! list nil)))
               (multiple-value-let* (((size1 size2) (halve size))
                                     ((list1 rest) (recur list size1))
                                     ((list2 rest) (recur rest size2)))
                 (values (merge-lists list1 list2 test key) rest)))))
    (recur list size)))

最後はsort関数。
inline展開の有無を選択するための引数を追加してみた。
※ 単にsort関数をinline宣言するだけでも良いのだが、常に展開されるようになってしまうのも避けたかったの若干(無駄に)凝ってみた

;; inline引数を追加。これが真の場合は、inline展開される。
(defun sort (list test &key (key #'identity) inline)
  (declare (list list)
           (function test key)
           (ignore inline)
           (optimize (speed 3) (safety 2) (debug 2)))
  (when list
    (values (sort-impl list (length list) test key))))

;; sort関数のinline展開版。上でinline宣言されている以外は、sort関数と基本的に同様。
(defun inline-sort (list test &key (key #'identity))
  (declare (list list)
           (optimize (speed 3) (safety 0) (debug 0)))
  (when list
    (values (sort-impl list (length list) test key))))

;; sort関数のinline引数が真の場合に、(sort関数の代わりに)inline-sort関数を呼び出すためのコンパイラマクロ
(define-compiler-macro sort (&whole form list test &key (key '#'identity) inline)
  (if inline
      `(inline-sort ,list ,test :key ,key)
    form))

前回*1と比べて、本質的に異なるのは、sort-impl関数がinline展開可能になった、という点だけ。

計時

今回追加した関数(オプション)を加えて、再度計測。

;;; 大量データのソート速度
;;; 100万要素のリストが対象
(sb-sys:without-gcing
 (let* ((data (loop REPEAT 1000000 COLLECT (random 10000000)))
        (d1 (copy-seq data))
        (d2 (copy-seq data))
        (d3 (copy-seq data))
        (r1 (time (stable-sort d1 fixnum<)))
        (r2 (time (merge-sort:sort d2 fixnum<)))
        (r3 (time (merge-sort:sort d3 fixnum< :inline t)))) 
   (list (equal r1 r2)
         (equal r1 r3))))

Evaluation took:
  1.336 seconds of real time  ; stable-sort# 1.336秒
  1.332083 seconds of total run time (1.332083 user, 0.000000 system)
  99.70% CPU
  2,664,840,158 processor cycles
  0 bytes consed
  
Evaluation took:
  0.555 seconds of real time  ; merge-sort:sort# 0.555秒
  0.552034 seconds of total run time (0.552034 user, 0.000000 system)
  99.46% CPU
  1,107,829,062 processor cycles
  0 bytes consed
  
Evaluation took:
  0.382 seconds of real time  ; merge-sort:sort(inline)# 0.382秒
  0.376024 seconds of total run time (0.376024 user, 0.000000 system)
  98.43% CPU
  761,537,180 processor cycles
  0 bytes consed
  
--> (T T)


;;; 少量データのソート速度
;;; 平均50要素のリスト x 1万 が対象
(sb-sys:without-gcing
 (let* ((data (loop REPEAT 10000 
                    COLLECT (loop REPEAT (random 100) 
                                  COLLECT (random 10000))))
        (d1 (copy-tree data))
        (d2 (copy-tree data))
        (d3 (copy-tree data))
        (r1 (time (loop FOR d IN d1 COLLECT (stable-sort d fixnum<))))
        (r2 (time (loop FOR d IN d2 COLLECT (merge-sort:sort d fixnum<))))
        (r3 (time (loop FOR d IN d3 COLLECT (merge-sort:sort d fixnum< :inline t)))))
   (list (equal r1 r2)
         (equal r1 r3))))

Evaluation took:
  0.072 seconds of real time ; stable-sort# 0.072秒
  0.072004 seconds of total run time (0.072004 user, 0.000000 system)
  100.00% CPU
  144,958,896 processor cycles
  327,680 bytes consed
  
Evaluation took:
  0.058 seconds of real time  ; merge-sort:sort# 0.058秒
  0.056003 seconds of total run time (0.056003 user, 0.000000 system)
  96.55% CPU
  116,927,902 processor cycles
  163,840 bytes consed
  
Evaluation took:
  0.036 seconds of real time   ; merge-sort:sort(inline)# 0.036秒
  0.032002 seconds of total run time (0.032002 user, 0.000000 system)
  88.89% CPU
  72,255,454 processor cycles
  163,840 bytes consed
  
--> (T T)

今回のように比較関数自体の実行コストが低い場合だと、関数呼び出し(funcall)部分を含めてinline化するだけで、処理時間が2/3程度に削減できていることが分かる。

=関数よりもeql関数の方が速かった(間接呼び出し時)

以下、sbcl-1.0.51-x86-64-linuxでの実行結果。

;; 計時用関数
(defun compare-time (fn nums1 nums2)
  (declare (optimize (speed 3) (safety 0) (debug 0))
           (function fn))
  (time
    (loop FOR n1 fixnum IN nums1
          FOR n2 fixnum IN nums2
          WHEN (funcall fn n1 n2)
          SUM 1)))

;; fixnum用の=関数
(declaim (inline fixnum=))
(defun fixnum= (a b)
  (declare (fixnum a b)
           (optimize (speed 3) (safety 0) (debug 0)))
  (= a b))

;; データ
(defparameter *nums1* (loop REPEAT 10000000 COLLECT (random 1000000)))
(defparameter *nums2* (loop REPEAT 10000000 COLLECT (random 1000000)))
;;;; 比較
;; =関数
(compare-time #'= *nums1* *nums2*)
Evaluation took:
  1.312 seconds of real time
  1.300000 seconds of total run time (1.300000 user, 0.000000 system)
  99.09% CPU
  2,616,703,170 processor cycles
  0 bytes consed
==> 12

;; fixnum=関数
(compare-time #'fixnum= *nums1* *nums2*)
Evaluation took:
  0.367 seconds of real time
  0.350000 seconds of total run time (0.320000 user, 0.030000 system)
  95.37% CPU
  732,105,438 processor cycles
  0 bytes consed
==> 12

;; eql関数
(compare-time #'eql *nums1* *nums2*)
Evaluation took:
  0.202 seconds of real time
  0.190000 seconds of total run time (0.190000 user, 0.000000 system)
  94.06% CPU
  403,706,880 processor cycles
  0 bytes consed
==>

eql関数の方がだいぶ効率的。少し意外。


直接使用するなら=関数とeqlの間に差異はない。(fixnum=関数はなぜか遅い)

;; =
(time
 (locally
  (declare (optimize (speed 3) (safety 0) (debug 0)))
  (loop FOR n1 fixnum IN *nums1*
        FOR n2 fixnum IN *nums2*
        WHEN (= n1 n2)
        SUM 1)))
Evaluation took:
  0.074 seconds of real time
  0.080000 seconds of total run time (0.080000 user, 0.000000 system)
  108.11% CPU
  147,692,292 processor cycles
  0 bytes consed
==> 12

;; fixnum=
(time
 (locally
  (declare (optimize (speed 3) (safety 0) (debug 0)))
  (loop FOR n1 fixnum IN *nums1*
        FOR n2 fixnum IN *nums2*
        WHEN (fixnum= n1 n2)
        SUM 1)))
Evaluation took:
  0.299 seconds of real time
  0.300000 seconds of total run time (0.300000 user, 0.000000 system)
  100.33% CPU
  595,071,324 processor cycles
  0 bytes consed
==> 12

;; eql
(time
 (locally
  (declare (optimize (speed 3) (safety 0) (debug 0)))
  (loop FOR n1 fixnum IN *nums1*
        FOR n2 fixnum IN *nums2*
        WHEN (eql n1 n2)
        SUM 1)))
Evaluation took:
  0.076 seconds of real time
  0.070000 seconds of total run time (0.070000 user, 0.000000 system)
  92.11% CPU
  150,384,648 processor cycles
  0 bytes consed
==> 12

再帰関数(ハノイの塔)にinline宣言をつけたら・・・

Forthでハノイの塔 - sileの日記ハノイの塔を解くcommon lispプログラムを載せたら「inline宣言を付けるともっと早くなりますよ」というコメントを頂いたので試してみた。

inline宣言付与結果

再帰関数をinline展開する際の深さはsb-ext:*inline-expansion-limit*変数で制御できるらしい。
今回はデフォルトの設定値をそのまま利用した。

;; sbcl-1.0.49(linux/64bit)
> sb-ext:*inline-expansion-limit*
--> 200 

;; inline宣言
(declaim (inline hanoi-impl))

;; 前回定義したhanoi-impl関数とhanoi関数を再コンパイル 
;; (*inline-expansion-limit*を越えました、という長いメッセージが出力される)
;; ... 略 ...

;; 実行
> (time (hanoi 'a 'b 'c 25))
Evaluation took:
  0.271 seconds of real time   ; 約 2.7 秒
  0.270000 seconds of total run time (0.270000 user, 0.000000 system)
  99.63% CPU
  541,031,826 processor cycles
  0 bytes consed
  
--> (:COUNT 33554431)

確かに速くなっている。


ついでにhanoi関数にもinline宣言をつけてみた。

;; inline宣言
(declaim (inline hanoi))

;; hanoi関数を再コンパイル 
;; ... 略 ...

;; 実行
> (time (hanoi 'a 'b 'c 25))   ; 実行時にinline展開される
;; (*inline-expansion-limit*を越えました、という長いメッセージが出力される)
Evaluation took:
  0.169 seconds of real time  ; 約 0.17 秒
  0.170000 seconds of total run time (0.170000 user, 0.000000 system)
  100.59% CPU
  337,858,167 processor cycles
  0 bytes consed
  
--> (:COUNT 33554431)

hanoi関数を実行時に展開するようにしたら、C++と同じくらいに高速になった。


ただsbclのtime関数はinline関数の展開時間は所要時間に含めていないようで、少し不公平な感があるので、別の方法でも図ってみた。

;; 実行時のinline展開時間も含めて図ってみる
> (get-internal-real-time) (hanoi 'a 'b 'c 25) (get-internal-real-time) (- * ***)
--> 304455 ; 開始時間
--> (:COUNT 33554431)
--> 304730 ; 終了時間
--> 275    ; 終了 - 開始 = 0.275 秒

展開時間も含めると、特別速いということはなさそう。
それでもinline宣言を全く使用しない場合に比べるとだいぶ良い。

各言語でのハノイの塔の処理時間をまとめておく。

C++
gcc
Forth
gforth
Forth
VFX-Forth
common lisp
sbcl(宣言無)
common lisp
sbcl(hanoi-implをinline宣言)
common lisp
sbcl(hanoiとhanoi-implをinline宣言)
0.168s3.359s0.190s0.403s0.271s0.169s (0.275s*1 )

教訓

再帰関数でもinline展開すると速くなることがある。
覚えておこう。

*1:実行時のinline展開処理も含めて計測した場合の数値

構造体のスタックへの割り当て

SBCLマニュアル(ver 1.0.37)にも書いてあることだけど ... 。


通常は構造体のインスタンスを作成するとヒープ上にそのための領域が割り当てられる。
これには(おそらく)ヒープ割り当て+GC処理のコストが伴うので、構造体としてまとめるよりも、その各フィールドを個別に扱った方が、パフォーマンス的には有利だったりする。

;; sbcl-1.0.40
;;
;; 構造体のインスタンス作成のコストを測るためプログラム

;; 範囲を表す構造体
(defstruct range
  start 
  end)

;; 範囲をランダムに複数生成し、それらの距離の合計値を求める
;; 1) 構造体を使う場合
(time 
 (let ((total 0))
   (loop REPEAT 1000000
     DO
     (let ((r (make-range :start (random 100) :end (random 100))))
       (incf total (- (range-end r) (range-start r)))))
   total))
Evaluation took:
  0.076 seconds of real time
  0.068005 seconds of total run time (0.064004 user, 0.004001 system)
  [ Run times consist of 0.016 seconds GC time, and 0.053 seconds non-GC time. ]
  89.47% CPU
  152,191,506 processor cycles
  15,998,952 bytes consed        ; 約15MBのコンスが発生
--> -27532

;; 2) 構造体を使わない場合
(time 
 (let ((total 0))
   (loop REPEAT 1000000
     DO
     (let ((start (random 100))
           (end   (random 100)))
       (incf total (- end start))))
   total))

Evaluation took:
  0.037 seconds of real time
  0.036002 seconds of total run time (0.036002 user, 0.000000 system)
  97.30% CPU
  73,238,322 processor cycles
  0 bytes consed                ; コンスなし
--> -6527

ただし、以下の二つの条件を満たせば、インスタンスがヒープではなくスタックに割り当てられるようになる。

  1. 構造体定義の前にコンストラクタ関数をインライン宣言する
  2. 作成される構造体のインスタンスに対してdynamic-extent宣言を行う
;; 3) 構造体を使う場合(スタック割り当て版)
(declaim (inline make-range))
(defstruct range
  start 
  end)

(time 
 (let ((total 0))
   (loop REPEAT 100000
     DO
     (let ((r (make-range :start (random 100) :end (random 100))))
       (declare (dynamic-extent r))
       (incf total (- (range-end r) (range-start r)))))
   total))
Evaluation took:
  0.005 seconds of real time
  0.004000 seconds of total run time (0.004000 user, 0.000000 system)
  80.00% CPU
  10,287,186 processor cycles
  0 bytes consed               ; スタックへの割り当てなのでコンスは発生しない
--> -15697

これでインスタンス生成のコストをほとんど気にせず構造体を使うことができる*1

*1:もちろんextentがdynamicで良い場合だけだけど ...

引数の型チェックの有無を使用者に選択させる(sbcl)

関数を書いていると、引数の型チェックを有効にするかどうかで悩むことがたまにある。

;;;; sbcl-1.0.37

;; 足し算を行う関数
;; 引数が適切な前提としている
(defun plus-impl (x y)
  (declare ((integer 0 1000) x y)
           (optimize (speed 3) (safety 0)))
  (+ x y))

(plus-impl 1 2)  ; 正しい引数
--> 3

(plus-impl 1 'a) ; 不正な引数
--> (0)          ; どんな処理が行われるかは不定。この場合は不正な結果が返されている。

上の例の場合は、型チェック(安全性)よりも実行速度を優先している。
この関数に渡される引数が常に適切であることが確信できるならばこれでも特に問題はないが、二番目の実行例のように想定外の引数が渡される可能性があるなら、何らかのチェックを追加する必要が出てくる。

;; ユーザ(?)が実際に使用する関数
;;  - インライン
;;  - 引数の型宣言のみで、最適化宣言はない
(declaim (inline plus))
(defun plus (x y)
  (declare ((integer 0 1000) x y))
  (plus-impl x y))  ; 実際の処理はplus-impl関数に任せる

(plus 1 2)  ; 正しい引数
--> 3

(plus 1 'a) ; 不正な引数
;; ↓ちゃんとチェックして、適切なエラーを出してくれる
debugger invoked on a TYPE-ERROR in thread #<THREAD "initial thread" RUNNING
                                             {A9F2831}>:
  The value A is not of type (MOD 1001).  ; 型が違う
Type HELP for debugger help, or (SB-EXT:QUIT) to exit from SBCL.
restarts (invokable by number or by possibly-abbreviated name):
  0: [ABORT] Exit debugger, returning to top level.
(PLUS 1 A)[:EXTERNAL]

plus関数のように引数の型が宣言されており、かつsafetyが0ではない場合は、実行時(関数呼び出し時)に引数の型がチェックされるので、plus-impl関数の時のような問題はおこらなくなる*1
ただし当たり前のことだが、この場合、型のチェックという余分の処理が加わるので、実行速度自体は遅くなってしまう(はず)


理想的には、関数の使用者が(その使用状況によって)実行速度を優先するか、それとも安全性を優先するかを選択できるのが望ましい*2
で、そのために上でplus関数につけているようなインライン宣言が使えないか、というのが今回の趣旨。
関数にインライン宣言がついている場合、その関数の本体のコードは、展開先の最適化宣言の影響を受けることになるので、関数の呼出側がsafetyのレベルを操作することで、型チェックの有無も制御できる。

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; 引数の型チェックを行いたい場合
(defun 2* (x)
  (declare (optimize (safety 1)))  ; 最適化宣言そのものを省略してしまっても良い
  (plus x x))

(2* 10)  ; 適切な引数
--> 20

(2* 'a)  ; 不正な引数
debugger invoked on a TYPE-ERROR in thread #<THREAD "initial thread" RUNNING {A9F2831}>:
  The value A is not of type (MOD 1001).  ; 不正な引数

(disassemble #'2*)  
;; いろいろチェックしている
; disassembly for 2*
; 0B5EE969:       F7C603000000     TEST ESI, 3                ; no-arg-parsing entry point
;       6F:       7525             JNE L0
;       71:       8BC6             MOV EAX, ESI
;       73:       83F800           CMP EAX, 0
;       76:       7C1E             JL L0
;       78:       8BC6             MOV EAX, ESI
;       7A:       3DA00F0000       CMP EAX, 4000
;       7F:       7F15             JNLE L0
;       81:       8BD6             MOV EDX, ESI
;       83:       8BFE             MOV EDI, ESI
;       85:       8B0538E95E0B     MOV EAX, [#xB5EE938]       ; #<FDEFINITION object for PLUS-IMPL>
;       8B:       B908000000       MOV ECX, 8
;       90:       FF7504           PUSH DWORD PTR [EBP+4]
;       93:       FF6005           JMP DWORD PTR [EAX+5]
;       96: L0:   8B053CE95E0B     MOV EAX, [#xB5EE93C]       ; '(MOD 1001)
;       9C:       CC0A             BREAK 10                   ; error trap
;       9E:       05               BYTE #X05
;       9F:       1F               BYTE #X1F                  ; OBJECT-NOT-TYPE-ERROR
;       A0:       FE9001           BYTE #XFE, #X90, #X01      ; ESI
;       A3:       10               BYTE #X10                  ; EAX
;       A4:       CC0A             BREAK 10                   ; error trap
;       A6:       02               BYTE #X02
;       A7:       18               BYTE #X18                  ; INVALID-ARG-COUNT-ERROR
;       A8:       4F               BYTE #X4F                  ; ECX


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; 引数の型チェックがいらない(外したい)場合
(defun 2* (x)
  (declare (optimize (safety 0)))  ; safetyを0に
  (plus x x))

(2* 10)  ; 適切な引数
--> 20

(2* 'a)  ; 不正な引数
--> #<unknown immediate object, lowtag=#b110, widetag=#xE {1635750E}>  ; よく分からないけど、不正な値が返ってきた

(disassemble #'2*)
;; 引数を設定した後、plus-impl関数にジャンプしているだけ(?)
; disassembly for 2*
; 0B5BC24C:       8BD6             MOV EDX, ESI               ; no-arg-parsing entry point
;       4E:       8BFE             MOV EDI, ESI
;       50:       8B0520C25B0B     MOV EAX, [#xB5BC220]       ; #<FDEFINITION object for PLUS-IMPL>
;       56:       B908000000       MOV ECX, 8
;       5B:       FF7504           PUSH DWORD PTR [EBP+4]
;       5E:       FF6005           JMP DWORD PTR [EAX+5]

使用者側は、呼び出す関数を変える必要もなく、最適化宣言を状況に応じて変更するだけで良いので、少し便利。
まあ、型チェックのコストくらい気にせず、(パッケージ外にエクスポートする関数)全部につけてしまっても良いような気もするけど。

*1:ちなみに、plus関数の定義から引数の型宣言を外すと、受け取った引数をそのままplus-impl関数に渡すことしかしないので、plus-impl関数をそのまま使うのと同じ問題が起こる

*2:その点、HaskellOCamlなどでは、コンパイル時の型チェックによって、安全性と実行速度を両立させることができる(型チェックのために実行速度を犠牲にせずに済む)ので、羨ましい。

sbclで文字列を効率的に扱う場合の型

あまり話題に関連性がないが、一応前々回の続き。


sbcl(1.0.37)での文字列関連の型の階層は以下のようになっている*1


僕は最近までsimple-string型*2が文字列関連型の最下層だと思い込んでいたので、文字列を扱う部分のプログラムを高速化したい場合、(declare (simple-string ...))などを宣言していた。
しかし、上の図を見るとsimple-string型は、simple-base-string型とsimple-character-string型*3の基底型となっている。
実際、変数の型が事前に明らかな場合は、simple-string型の二つの副型を指定した方がsimple-string型を指定するより高速である。

;;;;;;;;;;
;;;; 準備
;;; 型準備
; simple-character-stringは、クラス名として存在しても型名ではないので、等価なものを定義しておく
(deftype simple-character-string () '(simple-array character *))  

;;; データ準備
;; 一万要素のランダムな文字列を作成する
;; ※base-string用のサンプルデータと合わせるために、個々の文字の値の上限が#xFFに収まるようにする
(defvar *sample-string* 
  (coerce (loop REPEAT 10000 COLLECT (code-char (random #x80))) 'string))
;; *sample-string*をbase-stringに変換する
(defvar *sample-base-string*
  (coerce *sample-string* 'simple-base-string))

(type-of *sample-string*)
--> (SIMPLE-ARRAY CHARACTER (10000))
(type-of *sample-base-string*)
--> (SIMPLE-BASE-STRING 10000)
(subtypep * **)
--> NIL  ; *sample-string*の型と*sample-base-string*の型は独立している
    T

;;; 関数準備
;; 文字列の各文字の値の合計値を求める関数を定義するマクロ
;; string-type引数で、処理対象の文字列の型を指定できる
(defmacro def-charcode-sum (fn-name string-type)
  `(defun ,fn-name (string &aux (sum 0))
     (declare (optimize (speed 3) (safety 0))
              (,string-type string)
              (fixnum sum))
     (loop FOR char ACROSS string 
           FOR code = (char-code char)
           DO (incf sum code))
     sum))

;; 関数定義
(def-charcode-sum charcode-sum1 string)
(def-charcode-sum charcode-sum2 simple-string)
(def-charcode-sum charcode-sum3 simple-character-string)
(def-charcode-sum charcode-sum4 simple-base-string)


;;;;;;;;;
;;;; 実行
(charcode-sum1 *sample-string*)
--> 637886

(charcode-sum1 *sample-base-string*)
--> 637886

(charcode-sum3 *sample-base-string*)
--> -5604664  ; simple-character-string型用の関数をsimple-base-string型の引数に適用すると結果が不正になる

(= (charcode-sum1 *sample-string*) (charcode-sum1 *sample-base-string*)
   (charcode-sum2 *sample-string*) (charcode-sum2 *sample-base-string*)
   (charcode-sum3 *sample-string*) (charcode-sum4 *sample-base-string*))
--> T

;;; 型宣言ごとの処理時間
(time  ; string型と宣言されている関数をsimple-character-stringに適用した場合
  (dotimes (i 10000 'done) 
    (charcode-sum1 *sample-string*)))
Evaluation took:
  0.594 seconds of real time  ; 0.594秒
  0.592037 seconds of total run time (0.592037 user, 0.000000 system)
  99.66% CPU
  1,883,232,728 processor cycles
  0 bytes consed
--> DONE

(time  ; simple-string型と宣言されている関数をsimple-character-stringに適用した場合
  (dotimes (i 10000 'done) 
    (charcode-sum2 *sample-string*)))
Evaluation took:
  0.284 seconds of real time  ; 0.284秒
  0.284018 seconds of total run time (0.284018 user, 0.000000 system)
  100.00% CPU
  901,336,060 processor cycles
  0 bytes consed
--> DONE

(time  ; simple-character-string型と宣言されている関数をsimple-character-stringに適用した場合
  (dotimes (i 10000 'done) 
    (charcode-sum3 *sample-string*)))
Evaluation took:
  0.096 seconds of real time  ; 0.096秒
  0.096006 seconds of total run time (0.096006 user, 0.000000 system)
  100.00% CPU
  305,298,289 processor cycles
  0 bytes consed
--> DONE

(time  ; string型と宣言されている関数をsimple-base-stringに適用した場合
  (dotimes (i 10000 'done) 
    (charcode-sum1 *sample-base-string*)))
Evaluation took:
  0.597 seconds of real time  ; 0.597秒
  0.596038 seconds of total run time (0.596038 user, 0.000000 system)
  99.83% CPU
  1,894,275,594 processor cycles
  0 bytes consed
--> DONE

(time  ; simple-string型と宣言されている関数をsimple-base-stringに適用した場合
  (dotimes (i 10000 'done) 
    (charcode-sum2 *sample-base-string*)))
Evaluation took:
  0.208 seconds of real time  ; 0.208秒
  0.204013 seconds of total run time (0.204013 user, 0.000000 system)
  98.08% CPU
  657,645,081 processor cycles
  0 bytes consed
--> DONE

(time  ; simple-base-string型と宣言されている関数をsimple-base-stringに適用した場合
  (dotimes (i 10000 'done) 
    (charcode-sum4 *sample-base-string*)))
Evaluation took:
  0.125 seconds of real time  ; 0.125秒
  0.124008 seconds of total run time (0.124008 user, 0.000000 system)
  99.20% CPU
  393,795,273 processor cycles
  0 bytes consed
--> DONE

;;; おまけ
;;; simpleではない文字列に対する処理時間
;; displaced
(let ((str (make-array (length *sample-string*) 
                       :element-type 'character 
                       :displaced-to *sample-string*
                       :displaced-index-offset 0)))
  (time  
   (dotimes (i 10000 'done) 
     (charcode-sum1 str))))  ; simpleではないので、適用可能なのはcharcode-sum1のみ
(AND (VECTOR CHARACTER 10000) (NOT SIMPLE-ARRAY))  ; 型
Evaluation took:
  1.644 seconds of real time  ; 1.644秒
  1.644102 seconds of total run time (1.640102 user, 0.004000 system)
  100.00% CPU
  5,215,281,747 processor cycles
  0 bytes consed
--> DONE

;; fill-pointer
(let ((str (make-array (length *sample-string*) 
                       :element-type 'character
                       :fill-pointer (length *sample-string*)
                       :initial-contents *sample-string*)))
  (print (type-of str))
  (time  
   (dotimes (i 10000 'done) 
     (charcode-sum1 str))))
(AND (VECTOR CHARACTER 10000) (NOT SIMPLE-ARRAY)) ; 型
Evaluation took:
  0.871 seconds of real time  ; 0.871秒
  0.872054 seconds of total run time (0.872054 user, 0.000000 system)
  100.11% CPU
  2,762,532,161 processor cycles
  0 bytes consed
--> DONE

このため、最近は処理速度を重視する場合には、(simple-array character *)と宣言するようにしている。
ただその場合は、型が異なると処理結果が不正となってしまう*4ので、対象となる値の型が確実に(simple-array character *)となるように気をつけなければならない*5
igoやcreoleではそのために「文字列引数の型をチェックして(simple-array character *)ならそのまま、それ以外なら(simple-array character *)型になるように変換処理を施す」というようなことを行っている。


処理速度を考慮して文字列を扱う場合は、こういった型周りのことも考える必要がある。
しかし前々回も書いたように、こういったことを文字列処理を行う関数・ライブラリを各たびに意識しなければならないのは結構面倒。
なので、前々回に言及したJavaのStringライクなcommon lispの型(実際には構造体とその操作関数をまとめたパッケージ)で、文字列型に関する処理もまとめて行ってしまおう、というところで次に続く。
前置きが随分長くなってしまったが、次回が実装編。

*1:クラス間の階層関係はsb-mop:class-direct-subclasses及びsb-mop:class-direct-superclassesを用いて取得

*2:上の図に当てはめるなら本当はクラスと呼ぶのが正しいと思うが、今回の場合あまり厳密に区別する意味もないので"型"で統一することにする

*3: (simple-array character *)

*4:最悪のケースでは、プログラムが異常終了する

*5:全て均一に(simple-array character *)と宣言するのではなく「typecaseなどで分岐して各型ごとに処理を分ける」という方法もある。ただし、それはそれでいろいろと面倒なので、今のところ採用したことはない。もしかしたらそっちの方が良かったりして...