読者です 読者をやめる 読者になる 読者になる

HAMT(Hash Array Mapped Trie)

common lisp algorithm

Ideal Hash Trees*1という論文を(必要なところだけ、だいたい)読み終わったので、そのメモ等。

概要

AMT(Array Mapped Trie)という基盤的なデータ構造を使って、ideal(nearly ideal)なHash Treesを作ろう、というような話。
AMTの応用例として、以下のようなものが説明されている。

  • Hash Array Mapped Trie(HAMT)
    • ハッシュマップ
      • 各種操作がO(1)
      • ハッシュテーブルの初期サイズを(あまり)気にする必要がない
      • 要素が増えた場合のリサイズのコストが小さい*2
        • リサイズ不要な実装も可能だがその場合はO(log N)に。※ Nは要素数。今回の実装はこっち。
    • 成功検索時、キーの比較は一回しか生じない
    • Clojureの組み込みのハッシュマップはHAMTの亜種らしい*3
  • Partition Hashing
    • 外部ストレージにレコードを格納するようなハッシュの効率的(ディスクI/Oが少ない?)実装
  • IP Routing table
  • Class/Selector dispath table

とりあえず今回はHAMTの簡単版を実装する。

AMT(Array Mapped Trie)

トライの一種。
トライの各ノードは32bit*4のビットマップを有しており、値nに対応する子ノードが存在するかどうかは、ビットマップのn番目のビットが1かどうかで判断する。※ 0 <= n < 32
実際の子ノードは配列として保持するが、配列のサイズはその時に有効な(存在する)子ノード分だけ確保し、子ノードが増えたらその度にリサイズする。
値nに対応する子ノードの配列中での位置を取得する場合は、ビットマップのnより前にある1bitの数を数え、それを配列の添字とする。
以下は、その実装例。
:hamtパッケージは後でHAMTを実装する際に定義する。

(in-package :hamt)

;;;;;;;;;;;;;
;;;; 補助関数
;; count population
;; bitmapのstartからendのビット位置にある1bitの数を数える
(defun ctpop (bitmap &key (start 0) (end 32))
  (logcount (ldb (byte (- end start) start) bitmap)))

;;;;;;;;;;;;;;;;;;;;
;; AMTのノード構造体
(defstruct amt-node
  (bitmap 0      :type (unsigned-byte 32))  ; 子ノードの有無を判定するためのビットマップ
  (entries   #() :type simple-vector))      ; 子ノードを格納する配列。HAMT用に名前はentriesとしている。

;; arcに対応する子ノード(entry※)が存在するかどうか
;; ※ HAMTではノードだけではなく、キーと値がペアとなった終端要素が格納されることもあるので、entryとしている
(defun valid-entry-p (node arc)
  (with-slots (bitmap) node
    (ldb-test (byte 1 arc) bitmap)))

;; arcに対応する子ノードを取得する
;; 子ノードが存在しない場合はnilを返す
(defun get-entry (node arc)
  (with-slots (bitmap entries) node
    (if (not (valid-entry-p node arc))
        nil
      (aref entries (ctpop bitmap :end arc)))))

;; arcに対応する子ノードを設定する
;; 返り値はnew-entry
(defun set-entry (node arc new-entry)
  (with-slots (bitmap entries) node
    (let ((new-entry-index (ctpop bitmap :end arc)))
      (unless (valid-entry-p node arc)
        ;; 新規に追加する場合
        (setf (ldb (byte 1 arc) bitmap) 1                           ; ビットマップを1に設定
              entries (adjust-array entries (1+ (length entries)))) ; 子ノードの配列を+1にリサイズ
        (loop FOR i FROM (1- (length entries)) DOWNTO (1+ new-entry-index)
          DO
          (setf (aref entries i) (aref entries (1- i))))) ; 新規追加用に、一つずつ子ノードを後ろにずらす
      (setf (aref entries new-entry-index) new-entry))))  ; 子ノード設定

;;;;;;;
;;;; 例
(defvar *n* (make-amt-node))
--> *N*

(set-entry *n* 17 :entry-17)  ; 出力を見やすくするために子ノードの代わりにシンボルを設定する
--> :ENTRY-17

(set-entry *n* 1 :entry-1) 
--> :ENTRY-1

*n*
--> #S(HAMT::AMT-NODE :BITMAP 131074 :ENTRIES #(:ENTRY-1 :ENTRY-17))

(format t "~b" 131074)
100000000000000010  ; 1番目と17番目のビットが1
--> NIL

HAMT(Hash Array Mapped Trie)

AMTを用いたハッシュマップ。
キーのハッシュ値(32bit整数)を求めて、その値の5bitずつをAMTのarcとして利用し、ノードを探索する。
※ ルートノードではハッシュ値の0〜4bitを使って、その子ノードでは5〜9bit、...、を終端(キーと値のペアが格納されたentry)に達するまで繰り返す
※ 論文では、ルートにはAMTのノードではなく通常のハッシュテーブルを使用している(かつ使用ビットを5bitに限定していない)が、今回の実装では簡潔性を優先して、全てAMTのノードとして扱う

;;;;;;;;;;;;;;;;;;;
;;;; パッケージ定義
(defpackage hamt
  (:use :common-lisp)
  (:shadow :common-lisp get set remove)
  (:export make
           get))
(in-package :hamt)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;; HAMTのキー関連の構造体/関数
;;;;  - キーのハッシュ値を算出し、それをarcのストリームとして扱う

;;; 定数定義
(defconstant +MAX-INT-LENGTH+ (integer-length most-positive-fixnum))  ; ハッシュ値の最大ビット数
(defconstant +PER-ARC-BIT-LENGTH+ 5)                                  ; arcのビット数

;;; arcストリーム構造体
(defstruct arc-stream
  (hash #'sxhash  :type function :read-only t)  ; ハッシュ関数
  (key  t         :type t        :read-only t)  ; キー
  (hash-code 0    :type fixnum)                 ; ハッシュ値
  (start 0        :type fixnum)                 ; arcのhash-code内での開始ビット
  (rehash-count 0 :type fixnum))                ; rehashした数

;;; インスタンス生成
(defun new-arc-stream (key &key (hash #'sxhash) (start 0) (rehash-count 0))
  (make-arc-stream :hash hash
                   :key key
                   ;; ハッシュ値を計算。(cons key rehash-count)にread-arc関数を参照
                   :hash-code (funcall hash (if (zerop rehash-count) key (cons key rehash-count)))
                   :start start
                   :rehash-count rehash-count))

;;; arcを一つ分読み込む
(defun read-arc (arc-stream)
  (with-slots (hash key hash-code start rehash-count) arc-stream
    (when (>= start +MAX-INT-LENGTH+)
      ;; ハッシュ値の全範囲を消費してしまった場合 (それでもまだ衝突が発生する場合)
      ;; rehash-countとkeyを組み合わせたconsをキーとして、再度ハッシュ値を算出する。
      ;; XXX(2010/09/27): (cons key (incf rehash-count))は、(sbcl-1.0.40では)文字列の場合は意味がなかった。例えば"あさ子"と"あえん"は同一のハッシュ値を生じるが、それにconsでrehash-countを付与しても、両者が異なるハッシュ値になることはなかった。
      (setf hash-code (funcall hash (cons key (incf rehash-count)))
            start 0))
    ;; startから+PER-ARC-BIT-LENGTH+だけ取り出す
    (prog1 (ldb (byte +PER-ARC-BIT-LENGTH+ start) hash-code)
      (incf start +PER-ARC-BIT-LENGTH+))))

;;;;;;;;;;
;;; 実行例
(loop WITH in = (new-arc-stream "キーサンプル")
      REPEAT 20
      COLLECT (read-arc in))
--> (11 18 29 14 11 5 3 30 0 29 4 7 23 29 0 29 4 7 7 29) ; 5bit値を延々と生成する
;;;;;;;;;
;;;; HAMT
;; HAMT構造体
(defstruct hamt
  (root (make-amt-node) :type amt-node)  ; ルートノード
  (test #'equal  :type function)         ; キーの等値性比較関数
  (hash #'sxhash :type function))        ; ハッシュ関数

;; ハッシュ関数と比較関数を渡して、hamtインスタンスを生成する
(defun make (&key (test #'equal) (hash #'sxhash))
  (make-hamt :test test :hash hash))

;; マップの要素
(defstruct key/value 
  key
  value)

;; キーに対応する値の取得
;; => (values 値 キーに対応する値が存在したかどうか)
(defun get (key hamt)
  (with-slots (root test hash) hamt
    ;; 探索ループ ※1
    (loop WITH node = root
          WITH in = (new-arc-stream key :hash hash)
          FOR arc = (read-arc in)
          FOR entry = (get-entry node arc)
      DO
      (typecase entry
        (null     (return (values nil nil)))  ; 対応する要素なし
        (amt-node (setf node entry))          ; 子ノードへ
        (key/value                            ; key/valueエントリを発見
         (return (if (funcall test key (key/value-key entry))  
                     (values (key/value-value entry) t)  ; キーと等しい
                   (values nil nil))))))))            ; キーと等しくない
#|
※1: 探索はキーのハッシュ値の全てのビットを消費せずとも良く、
   (nullなり終端エントリなり)キーを他の要素から区別し得るエントリに達した時点で終了する。
   
   ハッシュ関数が十分にバラツキあるハッシュ値を算出すると仮定すれば、
   キーを他の要素(挿入済みのキーと値のペア)から区別するには、
   前方から(log 要素数 2)個分のビットを参照すれば良い。

   read-arc関数はキーのハッシュ値から5bit単位でビットを読み込むので、
   計算量(探索ループの回数)は、O(/ (log 要素数 2) 5) => O(log 要素数)、となる。
|#

;; キーに対応する値を設定(要素の挿入)
(defsetf get (key hamt) (new-value)
  `(progn (set ,key ,new-value ,hamt)  ; set関数を呼ぶだけ
          ,new-value))

(defun set (key value hamt)
  (with-slots (root test hash) hamt
    (loop WITH node = root
          WITH in = (new-arc-stream key :hash hash)
          FOR arc = (read-arc in)
          FOR entry = (get-entry node arc)
      DO
      (typecase entry
        ;; 未使用のエントリに当たった場合は、key/valueエントリを設定する
        (null      (return (set-entry node arc (make-key/value :key key :value value)))) 
        (amt-node  (setf node entry))  ; 子ノードへ遷移
        (key/value 
         ;; 既にkey/valueエントリが存在する場合
         (return (if (funcall test key (key/value-key entry))
                     (setf (key/value-value entry) value)  ; キーが同じなら値を更新
                   ;; キーが異なる場合は、衝突を解消する必要有り
                   (resolve-collision (make-key/value :key key :value value)
                                      entry
                                      (arc-stream-start in)
                                      (arc-stream-rehash-count in)
                                      node
                                      arc
                                      hamt))))))))

;;; 以下二つは、resolve-collision関数用のユーティリティ
(defun symb (&rest args)
  (intern
   (with-output-to-string (out)
     (dolist (a args) (princ a out)))))

;; instance.field形式で構造体やクラスのスロットへのアクセスを可能にするためのマクロ
(defmacro with-fields ((&rest fields) instance &body body)
  `(with-slots ,(mapcar (lambda (f) `(,(symb instance"."f) ,f)) fields)
               ,instance
     ,@body))

;; key/valueエントリの衝突を解消する
;; 解消方法は以下の通り
;; 1] 既存のkey/valueエントリを新たに作成したamt-nodeエントリ(子ノード)で置換する
;; 2] 作成した子ノードに遷移し、二つのkey/valueエントリの衝突が起きないかをチェックする
;; 3] 衝突が起きる(= 二つのarcが等しい)場合は、そのarcに対応する子ノードを作成し、2から繰り返す
;; 4] 衝突が発生しない子ノードに達したら、二つのkey/valueエントリを格納する
(defun resolve-collision (kv1 kv2 in-start in-rehash-count node arc hamt)
  (with-slots (test hash) hamt
    (with-fields (key value) kv1
      (with-fields (key value) kv2
        (let ((in1 (new-arc-stream kv1.key :hash hash  ; 新規エントリ用のarcストリーム
                                           :start in-start 
                                           :rehash-count in-rehash-count))
              (in2 (new-arc-stream kv2.key :hash hash  ; 既存エントリ用のarcストリーム
                                           :start in-start 
                                           :rehash-count in-rehash-count)))
          (setf node (set-entry node arc (make-amt-node)))  ; 衝突が発生したkey/valueエントリは子ノード(amt-node)で置換
          (loop FOR a1 = (read-arc in1)
                FOR a2 = (read-arc in2)
                WHILE (= a1 a2)  ; 子ノードでも衝突が発生するかどうか
            DO 
            ;; 衝突する分だけ、子ノードを生成する
            (setf node (set-entry node a1 (make-amt-node)))
            
            FINALLY
            ;; 衝突が発生しないノードに達したら、二つのkey/valueエントリを設定する
            (set-entry node a1 kv1)
            (set-entry node a2 kv2)))))))
実行例
(in-package :common-lisp-user)

;; 作成
(defvar *h* (hamt:make))
--> *H*

*h*
--> #S(HAMT::HAMT
       :ROOT #S(HAMT::AMT-NODE :BITMAP 0 :ENTRIES #())
       :TEST #<FUNCTION EQUAL> :HASH #<FUNCTION SXHASH>)

;; 要素追加
(setf (hamt:get :key1 *h*) :value1)
--> :VALUE1

*h*
--> #S(HAMT::HAMT
       :ROOT #S(HAMT::AMT-NODE
                :BITMAP 2048
                :ENTRIES #(#S(HAMT::KEY/VALUE :KEY :KEY1 :VALUE :VALUE1)))
   :TEST #<FUNCTION EQUAL> :HASH #<FUNCTION SXHASH>)

;; 要素を10個追加 ※ (random 1000)に重複があれば10個未満
(loop REPEAT 10 DO (setf (hamt:get (random 1000) *h*) `(:rand ,(random 1000))))
--> NIL

*h*
--> #S(HAMT::HAMT
       :ROOT #S(HAMT::AMT-NODE
                :BITMAP 337152368
                :ENTRIES #(#S(HAMT::KEY/VALUE :KEY 957 :VALUE (:RAND 109))
                           #S(HAMT::KEY/VALUE :KEY 959 :VALUE (:RAND 503))
                           #S(HAMT::KEY/VALUE :KEY 792 :VALUE (:RAND 878))
                           #S(HAMT::AMT-NODE
                              :BITMAP 192
                              :ENTRIES #(#S(HAMT::KEY/VALUE :KEY 421 :VALUE (:RAND 112))
                                         #S(HAMT::KEY/VALUE :KEY 485 :VALUE (:RAND 798))))
                           #S(HAMT::KEY/VALUE :KEY :KEY1 :VALUE :VALUE1)
                           #S(HAMT::KEY/VALUE :KEY 970 :VALUE (:RAND 981))
                           #S(HAMT::KEY/VALUE :KEY 915 :VALUE (:RAND 639))
                           #S(HAMT::KEY/VALUE :KEY 157 :VALUE (:RAND 725))
                           #S(HAMT::KEY/VALUE :KEY 800 :VALUE (:RAND 297))
                           #S(HAMT::KEY/VALUE :KEY 141 :VALUE (:RAND 4))))
       :TEST #<FUNCTION EQUAL> :HASH #<FUNCTION SXHASH>)

;; 検索
(hamt:get :key1 *h*)
--> :VALUE1
    T

(hamt:get :key2 *h*)
--> NIL
    NIL

(hamt:get 421 *h*)
--> (:RAND 112)
    T

追記

上のソースコードの整理版をgithubv0.0.1として保存。

*1:Bagwell, P. (2001) Ideal Hash Trees. Technical Report, 2001

*2:「コストが小さい」というよりリサイズ処理を分散して償却することで、最悪の場合の処理コストを低く抑えることが可能、という方が正しい?

*3:上の論文で説明されているHAMTにpersistantな性質を加えたもの(?)

*4:必ずしも32bitである必要はない。int値を32bitで表現するアーキテクチャ上では32を使うのが一番自然、というだけ。