読者です 読者をやめる 読者になる 読者になる

DoubleArray(2): 挿入速度改善

common lisp algorithm

DoubleArrayシリーズ(?)の続き。
今回は、insert関数の処理速度を改善する。

改善点

前回の実装では、insert関数の処理速度がもの凄く遅かったが、その原因はinsert関数の中(の関数の中)で呼び出されているx-check関数にある。


この関数は、簡単に云えば(要素の挿入のために)「CHECK配列*1の中から空いている(=利用可能な)場所(index)を探す」というようなことを行っているのだが、前回の実装では、CHECK配列の0番目から順番にループして空き場所を探しているだけであった。


挿入処理を重ねると、CHECK配列の前半部分は、ほとんど全て使用済みの状態となるので、0番目から逐次空き場所を探した場合、その大半を無駄なループに費やすことになってしまう。


そのため今回は、このx-check関数を、CHECK配列内での空き場所(index)を管理するリンクリストを利用する実装に変更する。 ※利用可能な場所(index)だけをリストに保持しておくことで、使用済の場所(index)を走査せずに済むようにする。


ちなみに、このリンクリストを使う方法は(前回のDoubleArray実装の元となった)『An Efficient Implementation of Trie Structures』*2にも書かれている(というよりは、この論文のアイデアを参考に今回の実装が作られた)。
ただし、上の論文内で示唆されている実装方法と、今回採用(?)した実装方法は(詳述は避けるが)結構異なっている。

実装

【utility】

まずは、今回の実装が依存するutility関数を定義。定番のアナフォリックマクロはそのうちどこかでまとめて定義しておきたい。
nletも参照。

(defmacro a.when (expr &body body)
  `(let (it ,expr)
      (when it
         ,@body)))

(defmacro a.mapl (expr list)        ; 簡易版
  `(mapl (lambda ($) ,expr) ,list))
【x-check関数+CHECK配列空き場所管理用構造体・関数 実装】

CHECK配列の空き場所(index)を管理するリンクリスト用の関数等は、packageにまとめる。

(defpackage :double-array-allocator
  (:use :cl)
  (:export :*memory*  ; 空き場所(index)管理用の構造体
           :init      ; 空き場所(index)管理用の構造体初期化
           :alloc     ; 場所(index)割り当て
           :free      ; 場所(index)解放
           :x-check)  ; x-check関数
  (:nicknames :allocator))
(in-package :double-array-allocator)


構造体、スペシャル変数

(defstruct (memory (:conc-name "MEM-"))
  availables             ; CHECK配列内の空き場所(index)保持したlist
  (size 0 :type fixnum)) ; 利用可能なindexの最大値+1。(/= (length availables) size)。名前は不適切?

(defvar *memory*)


internal関数

;;; fromからbelowまでの要素を持ったlistを作成する
(defun gen-list (from below)
  (loop FOR i FROM from BELOW below COLLECT i))

;;; x-check関数内で使われる関数
;;; availsの中に、relate-poses内の要素が全て含まれている(利用可能)かどうかを判定
;;;   判定には相対位置(base1,base2)を用いる。
;;;   cf #等価?#. (every (lambda (x) (find (+ x (- (car avails) (car relate-poses))) avails)) relate-poses)
;;;
;;; 【変数】
;;; relate-poses: 使用したいindex(相対位置)のリスト(昇順ソート)
;;; avails:       利用可能なindexのリスト(昇順ソート)
(defun allocable? (relate-poses avails)
  (let ((base1 (car relate-poses))
        (base2 (car avails)))
    '#1=(- (car as) base1)
    '#2=(- (car bs) base2)
    (nlet self ((as (cdr relate-poses)) (bs (cdr avails)))
      (cond ((null as)   (- base2 base1))          ; 返り値
            ((null bs)   nil)
            ((> #1# #2#) (self as       (cdr bs)))
            ((= #1# #2#) (self (cdr as) (cdr bs)))
            (t           nil)))))


external関数

;;; memory構造体初期化
(defun init (size)
  ;; 下で定義するallocとfreeの実装を簡単にするために、availablesにはダミー要素を先頭に加える
  (make-memory :availables (cons :head (gen-list 0 size))
               :size size))


;;; addressで指定されたindexを割り当てる(= mem-availablesから除外する)
;;; XXX: 重複して同じaddressをallocした場合のチェックが未実装
(defun alloc (address &optional (mem *memory*))
  (if (>= address (mem-size mem))
      ;; mem-sizeよりもaddressが大きい場合は、availablesを拡張して、再度allocを呼ぶ
      (let ((last (last (mem-availables mem)))
            (next-size (* 2 (mem-size mem))))
        (setf (cdr last) (gen-list (mem-size mem) next-size)
              (mem-size mem) next-size)
        (alloc address mem))
    ;; availables内から、addressを探して取り除く
    ;; (setf #1=(cdr (mem-availables mem)) (delete address #1# :count 1))でも良いが、何故か遅かったので不採用
    (let ((head (mem-availables mem)))
      (nlet self ((cur (cdr head)) (prev head))
        (if (= (car cur) address)
            (setf (cdr prev) (cdr cur)) ; allocate = address除去
          (self (cdr cur) cur))))))

;;; addressで指定されたindexを利用可能にする(= mem-availablesに追加する)
;;;   挿入ソートと類似の処理を行っている。
;;; XXX: 重複して同じaddressをfreeした場合のチェックが未実装
(defun free (address &optional (mem *memory*))
  (let ((head (mem-availables mem)))
    (nlet self ((cur (cdr head)) (prev head))
      (if (or (null cur) 
              (< address (car cur)))
          (setf (cdr prev) (cons address cur))
        (self (cdr cur) cur)))))


;;; x-check関数
(defun x-check (set &optional (mem *memory*))
  ;; setをソートする
  (when (cdr set)
    (setf set (sort (copy-list set) #'<)))
  
  ;; 利用可能な場所(index)を取得する
  (a.mapl
   (a.when (and (> (car $) (car set))
                (allocable? set $))
     (return-from x-check it))      ; 利用可能なindexが見つかったのでreturn
   (cdr (mem-availables mem)))
  
  ;; availables内に利用可能なindexがない場合は、availablesを拡張して再度x-checkを呼ぶ
  (let ((last (last (mem-availables mem))))
    (setf (cdr last) (gen-list #1=(mem-size mem) (* #1# 2))
          #1# (* #1# 2))
    (x-check set mem)))

これで、新しいx-check用の関数群(+その他)の定義は終了。

次は、DoubleArrayのソースコードを修正。

DoubleArray: 既存コード修正

まずは、x-check関数を修正。

(defun x-check (set)
  (let ((x (allocator:x-check set)))
    ;; 範囲外アクセスエラーを防ぐために、ここでアクセスしておく
    ;; ※ 最終的にはここでアクセスしなくてもいいように、他の箇所を修正する
    @*chck*?(+ x (apply #'max set))   
    x))


後は、CHECK配列を操作(値の代入・削除)する時に、上で定義したリンクリスト(memory構造体)も操作して、整合性を取るようにする。

(defun set-node (code prev x &aux (next (+ x code)))
  (allocator:alloc next)   ; 追加
  (setf @*base*#prev x
        @*chck*#next prev)
  next)

(defun set-check-and-insert-tail (prev node i)
  (allocator:alloc node)   ; 追加
  (setf @*chck*?node prev)
  (insert-tail node i))

(defun modify-nodes (current node codes &optional c &aux (old-base @*base*#node))
  (let ((new-base (x-check (if c (cons c codes) codes))))
    (setf @*base*#node new-base)

    (dolist (code codes)
      (let ((old (+ old-base code))
            (new (+ new-base code)))
        (shiftf @*base*?new @*base*#old NULL)
        (shiftf @*chck*?new @*chck*#old NULL)
        (allocator:free  old) ; 追加
        (allocator:alloc new) ; 追加
        
        (when (plusp @*base*#new)
          (setf *chck*
                (nsubstitute new old *chck*
                             :start #|(1+|# @*base*#new ;)
                             :end (min (length *chck*) (+ @*base*#new MAX-CODE)))))

        (when (and (/= current node) (= current old))
          (setf current new)))))
  current)

これで、今回の修正は全て終了。

比較

新旧のx-check関数で処理速度を比較する。
比較方法は、5万個の単語を順にDoubleArrayに挿入し、掛かった処理時間を比べるというもの。
テストコードは以下の通り。

(defmacro a.while (expr &body body) 
  `(let (it)
     (loop while (setf it ,expr)
       ,@body)))  

;; 実行
(let ((da (make-da))
      (allocator:*memory* (allocator:init 100)))
  (time
   (with-open-file (in "/path/to/words")
      (a.while (read-line in nil nil)
        (handler-bind ((SB-IMPL::OCTETS-ENCODING-ERROR 
                         (lambda (c) (use-value #\? c))))
          (insert it da))))))


結果。※なお、旧x-check関数の結果は、『DoubleArray(1.5) - 文字エンコーディングの違いによる速度・サイズ差』での結果をそのまま引用している。

;; 旧x-check関数
Evaluation took:
  984.957 seconds of real time
  984.789546 seconds of total run time (981.905365 user, 2.884181 system)
  [ Run times consist of 42.770 seconds GC time, and 942.020 seconds non-GC time. ]
  99.98% CPU
  3,124,717,944,534 processor cycles
  276,843,319,608 bytes consed

;; 新x-check関数
Evaluation took:
  0.715 seconds of real time
  0.716045 seconds of total run time (0.700044 user, 0.016001 system)
  [ Run times consist of 0.012 seconds GC time, and 0.705 seconds non-GC time. ]
  100.14% CPU
  2,270,351,601 processor cycles
  21,974,144 bytes consed

比べ物にならないほど、高速化している(新しい関数の方が1000倍以上速い)。
やはり、もともとのx-check関数はかなり遅かったようだ。


今回の作成したx-check関数(というかmemory構造体とその操作関数群)も、まだまだ改善の余地はあるが、これ以上劇的に速くなることはないと思うので、しばらくはこの実装を使用することにする。

*1:実際には、CHECK配列だけでなくBASE配列も関係しているが、文脈上この二つの配列はほとんど等価なので、CHECK配列についてだけ言及する

*2:Jun-ichi Aoe, katushi Morimoto and Takashi Satou : An Efficient Implementation of Trie Structures, Software Practice & Experience, Vol.22, No.9, pp.695-721, 1992.