DoubleArray(1)
しばらくDoubleArrayでいろいろ試してみようと思っているので、今日はそのベースとなるソースコード(+覚え書き)を掲載。
『An Efficient Implementation of Trie Structures』*1を参考に実装した。
前置き
DoubleArrayの簡単(かつテキトウ)な説明。
- Trieの実装方法の一種
- BASEとCHECKという二つの配列を使う
- (一つの)配列での実装に比べて、スペース効率が良い
- リストでの実装に比べて、検索速度が速い
- ただ、挿入速度は若干遅い
各項目(+データ構造)についての詳細な説明は、上記論文(と多分他のサイト)に載っているので割愛する。
前置き2
とりあえず、最初なので効率云々は置いておいて、シンプルに実装する。
キーは文字列(正確にはoctet列?)限定で、set(集合)的な機能のみを持つ。
可能な操作は、挿入・検索・削除。
依存関数の定義は、nlet、a.whenを参照。
また、実験的(?)にここで定義している可変配列用リードマクロを使っている。
実装
定数・構造体・スペシャル変数定義
(defconstant EOS #x01) ; 今回の実装では、#x01終端の文字列(octets)を用いる (defconstant NULL #x00) ; BASEとCHECK配列の初期値 (defconstant MAX-CODE #xFF) ; 文字のコード表現の最大値 ;; DoubleArray構造体 (defstruct (double-array (:conc-name da-)) (base (make-array 32 :element-type 'fixnum :initial-element NULL)) (chck (make-array 32 :element-type 'fixnum :initial-element NULL)) (tail (make-array 32 :element-type '(unsigned-byte 8) :adjustable t :fill-pointer 1))) (defun make-da () (let ((da (make-double-array))) ;; BASE配列の1番目を1に設定。 ;; BASE,CHECK,TAILのいずれも0番目は未使用。 (setf @(da-base da)#1 1) da)) (defmethod print-object ((o double-array) stream) (print-unreadable-object (o stream :type t) (format stream "base:~A check:~A tail:~A" (length (da-base o)) (length (da-chck o)) (length (da-tail o))))) ;; special変数: (defvar *da*) ; double-array (defvar *octs*) ; 対象octet列 ;; 省略記法 (define-symbol-macro *chck* (da-chck *da*)) (define-symbol-macro *base* (da-base *da*)) (define-symbol-macro *tail* (da-tail *da*))
検索
補助関数定義
;; octet列をEOS終端にする (defun eos-terminated-octets (octs &aux (len (length octs))) (adjust-array octs (1+ len) :initial-element EOS)) ;; 文字列をoctet列に変換する(もともとoctet列ならそのまま) (defun make-octets (#1=string-or-octets) (if (stringp #1#) (string-to-octets #1#) #1#)) ;; 引数のoctetsと*tail*内のoctets列が一致するかをテスト (defun tail=(octs start tail-start &aux (len (length octs))) (do ((i start (1+ i)) (j tail-start (1+ j))) ((= i len) t) (if (/= @octs#i @*tail*#j) (return nil)))) ;; 文字のコード値とnode番号(index)を元に、次のnodeを取得する (defun get-next (code node) (+ @*base*#node code)) ;;;; メモ ;;;; ;; BASE[node]+code => next-node ;; CHECK[node] => prev-node ;; next_node=BASE[node]+code、として ;; node==CHECK[next_node]、が成り立つなら、その遷移は正しい。 ;; CHECK[node]==0、ならBASE[node]は未使用(使用可能) ;; BASE[node]< 0、なら-BASE[node]がTAILに格納されている残りの文字列の開始インデックス ;; ex. string="abcd" ※ うち"cd"がTAILに格納されている場合 ;; pos = get-next(98, get-next(97, 1)) ;; TAIL[pos]=99, TAIL[pos+1]=100, TAIL[pos+2]=1
検索(集合に含まれるかどうか)
;; target: 検索する文字列(or octet列) ;; *da*: DoubleArray構造体 ;; 集合内にtargetが存在した場合は、一番最後に用いたnode番号を返す(-BASE[node]=TAIL内でのインデックス)。 (defun member? (target *da*) (let* ((octets (eos-terminated-octets (make-octets target))) (limit (1- (length octets)))) (nlet self ((node 1) (i 0)) ;; node=1から始めて、get-nextを繰り返す (let ((next (get-next @octets#i node))) (and (= node @*chck*?next) ; 正しい遷移かチェック (cond ((plusp @*base*?next) (when (< i limit) (self next (1+ i)))) ; 次のnodeへ ;; ((zerop @*base*#next)) <- DoubleArrayが適切に構築されているなら不要なチェック。ただ、本当は入れておいた方が良い。 ((tail= octets (1+ i) (- @*base*#next)) ;; BASE[next]<0 かつ octets[i+1..]==TAIL[-BASE[next]..]の場合は、検索成功 next)))))))
削除
削除は簡単
;; 検索して、初期化 (defun %delete (target da) (a.when (member? target da) (setf @*base*#nodeit NULL @*chck*#it NULL)))
挿入
挿入が一番ややこしい。
まずは、メインループ。
(defun insert (target da) (let ((*da* da) (*octs* (eos-terminated-octets (make-octets target)))) (nlet self ((node 1) (i 0)) (let ((next (get-next @*octs*#i node))) ;; ノードnextは利用可能(CHECK[next]==0)なので、ここに残りの文字列を挿入する (cond ((zerop @*chck*?next) (set-check-and-insert-tail node next (1+ i))) ;; 正しい遷移先 ((= node @*chck*#next) ;; 次のノードへ (cond ((plusp @*base*?next) (self next (1+ i))) ;; ありえない分岐 -> member?ではコメントアウトしていたもの ((zerop @*base*#next) (error "error")) ;; targetは既に存在する ((tail= *octs* (1+ i) (- @*base*#next))) ;; 末端(BASE[node]<0)での衝突発生 ;; "bachelor"の後に、"badge"を挿入した場合のように、先頭を共有している文字列を挿入した場合にここに来る (t (tail-collision-case (1+ i) next)))) ;; 不正な遷移先(衝突発生) ;; 例えば、"bachelor"の後に、"baby"のような、先頭を共有して、かつ ;; "b"のように同じ文字列内に複数回出てくる文字がある文字列を挿入した時に、割合良くここ来る (t (collision-case i node next)))))))
BASE、CHECK、TAIL関連補助関数
;; BASE[prev]に値を書き込む ;; その際に、prev==CHECK[next]となるように、CHECKの値も更新する ;; 変数xは、冒頭に挙げた論文のX_CHECK関数に由来する (defun set-node (code prev x &aux (next (+ x code))) (setf @*base*#prev x @*chck*#next prev) next) ;; TAILへのインデックスを再設定する (defun reset-tail-pos (node new-tail-pos) (setf @*base*?node (- new-tail-pos))) ;; TAILの末尾に文字列(octets)を挿入する (defun insert-tail (node i) (setf @*base*?node (- (fill-pointer *tail*))) (loop for i from i to (1- (length *octs*)) do (vector-push-extend @*octs*#i *tail*))) ;; CHECKを正しく設定した後に、TAILの末尾に文字列(octets)を挿入する (defun set-check-and-insert-tail (prev node i) (setf @*chck*?node prev) (insert-tail node i))
衝突解消1
末端での、衝突を解消するケース。
まずは、補助関数。
;; set: 文字のコード値の集合。ex. '(97 98 99 100)。 ;; setに含まれる(=衝突が発生した)コード値の全てを、安全に挿入できるnode番号が返される。 ;; ※ 挿入時の一番のボトルネックはここ。次回以降に解消予定。 (defun x-check (set) (loop for i from 1 do (when (every (lambda (c) (zerop @*chck*?(+ c i))) set) (return-from x-check i)))) ;; 二つのoctetsの共通の接頭部分を返す ;; ※ どちらかが片方を完全に包含している場合には、範囲外アクセスエラーが発生する (defun common-prefixes (o1 start1 o2 start2) (do ((i start1 (1+ i)) (j start2 (1+ j))) ((/= @o1#i @o2#j) (coerce (subseq o1 start1 i) 'list))))
衝突解消関数
;; 共通部分のnodeを作成 ;; "abcd"と"abce"で衝突が発生した場合は、"abc"まではnodeを共有できる。 ;; なお、"a"は("abce"挿入時に)もともと存在していたので、ここで作成されるのは"bc"の部分。 (defun set-common-nodes (i node &aux (tail-pos (- @*base*#node))) (let* ((prs (common-prefixes *octs* i *tail* tail-pos)) (len (length prs))) (dolist (p prs) (setf node (set-node p node (x-check `(,p))))) (values (+ i len) node (+ tail-pos len)))) ;; (BASEの)末端での衝突解消 ;; 1] 共通のnode部分の処理(set-common-nodes) ;; 2] TAILの設定・再設定 (defun tail-collision-case (i node) (multiple-value-bind (i node tail-pos) (set-common-nodes i node) (let ((x (x-check (list @*octs*#i @*tail*#tail-pos)))) (reset-tail-pos (set-node @*tail*#tail-pos node x) (1+ tail-pos)) ; branch1 (insert-tail (set-node @*octs*#i node x) (1+ i))))) ; branch2
衝突解消2
末端以外での衝突を解消するケース。
補助関数。
;; nodeに関連する(nodeから可能な遷移先を持つ)文字のコード値を集めて、リストで返す。 ;; 末端以外のnodeを修正する時には、関連する遷移情報も合わせて修正する必要があるので、この関数が必要(多分)。 (defun correspond-codes (node &aux (base @*base*#node)) (loop for i from (1+ base) to (1- (min (length *chck*) (+ base MAX-CODE))) when (= node @*chck*#i) collect (- i base)))
衝突解消関数
(defun collision-case (i node next) ;; 1] 衝突が発生した二つのnodeの関連する(修正が必要な)コード値を集めて (let ((lst-a (correspond-codes node)) (lst-b (correspond-codes @*chck*#next))) ;; 2] 修正が少なくて済む方を修正して、衝突を解消 (if (< (1+ (length lst-a)) (length lst-b)) (setf node (modify-nodes node node lst-a @*octs*#i)) (setf node (modify-nodes node @*chck*#next lst-b))) ;; 3] 最後に、新しい文字列(insert関数のtarget引数)を挿入する (set-check-and-insert-tail node (get-next @*octs*#i node) (1+ i)))) ;; 実際にnodeの修正を行う関数 (defun modify-nodes (current node codes &optional c &aux (old-base @*base*#node)) (let ((new-base (x-check (if c (cons c codes) codes)))) ;; 1] 起点となるnodeを更新 (setf @*base*#node new-base) ;; 2] 修正が必要な各コード値に対して (dolist (code codes) (let ((old (+ old-base code)) (new (+ new-base code))) ;; 3] 古いnodeから新しいnodeへ値引き継ぎ and 古いnodeの初期化 (shiftf @*base*?new @*base*#old NULL) (shiftf @*chck*?new @*chck*#old NULL) ;; 4] 新しいnodeが終端ではないなら (when (plusp @*base*#new) (setf *chck* ;; 5] そのnodeに関連するCHECKを全て更新する (nsubstitute new old *chck* :start (1+ @*base*#new) :end (min (length *chck*) (+ @*base*#new MAX-CODE))))) ;; 6] 後、(上でlst-bを用いた場合は)衝突が発生したnode自体を初期化してしまうことがあるので、チェックして更新する (when (and (/= current node) (= current old)) (setf current new))))) current)
終了
何とか動くものはできたので、今日は終了。