読者です 読者をやめる 読者になる 読者になる

スキップリスト

common lisp algorithm

スキップリスト(Wikipedia)を実装してみた。

説明等は一切抜きでコードだけ。

(defpackage skiplist
  (:use :common-lisp)
  (:shadow :common-lisp get rem)
  (:export *p*
           make
           get
           rem))
(in-package :skiplist)

;;;;;;;;;
;;;; 宣言
(declaim (inline make-node new-node node-level decide-node-level))

;;;;;;;;;;;;;;;;;
;;;; スペシャル変数
;; スキップリストのレベルiにある特定のノード(要素)が、レベルi+1にも存在する確立
;; 個々のノードの作成時に、そのレベルを決定するのに用いられる
(defvar *p* 0.5)  

;;;;;;;;;;;;;;
;;;; ノード定義
;; スキップリストのノード(要素)
;; 基本的にはただの連結リスト
;;  - ただし、次のノードへのリンクを一つではなく、各レベルごとに有する点が異なる
;;  - nexts[0]だけを利用するなら、ただの(ソート済みの)連結リストとなる
(defstruct node
  key
  value
  (nexts #() :type simple-vector)) ; 次のノードへのリンク

;; レベル(の上限)を与えてノードを作成
(defun new-node (level &key key value)
  (make-node :nexts (make-array level :initial-element nil)
             :key   key
             :value value))

;; ノードのレベル(の上限)を取得
(defun node-level (node)
  (length (node-nexts node)))

;; レベルを指定して、次のリンク先ノードを取得する
(defmacro next-node (node level)
  `(aref (node-nexts ,node) (1- ,level)))

;;;;;;;;;;;;;;;;;;;;;
;;;; スキップリスト定義
(defstruct skiplist
  (max-level 1           :type fixnum)    ; 全体でのレベルの上限。簡単のためにインスタンス生成時に固定とする。
  (head-node (make-node) :type node)      ; 先頭ノード
  (test      #'<         :type function)) ; キーの比較関数。引数1が引数2より小さければ真を返す。

;; スキップリストに格納する(予定の)要素数をもとに、最適っぽい最大レベルを算出する
(defun calc-max-level (expected-entry-size)
  (ceiling (log expected-entry-size (/ 1 *p*))))

;; スキップリストに格納する(予定の)要素数とキーの比較関数を与えて、
;; スキップリストのインスタンスを生成する。
(defun make (expected-entry-size &key (test #'<))
  (let ((max-level (calc-max-level expected-entry-size)))
    (make-skiplist :max-level max-level
                   :head-node (new-node max-level)
                   :test    test)))

;; ノードのレベル(の上限)を決定して返す
(defun decide-node-level (skiplist)
  (with-slots (max-level) skiplist
    (do ((i 1 (1+ i)))
        ((or (= i max-level)
             (>= (random 1.0) *p*))
         i))))

;; キーの比較処理補助マクロ
;; 比較結果によって、:less、:greater、:equal、で指定された式を実行する
(defmacro keys-compare-case ((key1 key2 skiplist) &key less greater equal)
  `(with-slots ((#1=#:test test)) ,skiplist
     (cond ((funcall #1# ,key1 ,key2) ,less)
           ((funcall #1# ,key2 ,key1) ,greater)
           (t                         ,equal))))

;; 与えられたキーでノードの検索を行い、
;; 成功したら:suceed、失敗したら:fail、で指定された式を実行する。
;; 
;; 引数の一つ目のS式の各シンボルは、:suceed/:fail式評価時に、
;; 見つかったノード等が束縛される変数名
;;   - found-node: キーに対応するノード
;;   - prev-node: found-nodeの前方のノード(リンク元ノード)
;;   - path: 検索パス。探索したノードが、探索の逆順に格納されている
(defmacro find-node-case ((prev-node found-node path)
                          (skiplist level node key)
                          &key succeed fail)
  (let ((recur (gensym)) (lvl   (gensym))
        (cur   prev-node) (next  found-node))
    `(labels ((,recur (,lvl ,cur ,path)
                (declare (optimize (speed 3) (safety 0) (debug 0))
                         (skiplist ,skiplist) (fixnum ,lvl))
                (if (zerop ,lvl)
                    ,fail  ; 検索失敗
                  (let ((,next (next-node ,cur ,lvl)))
                    (if ,next
                        (keys-compare-case ((node-key ,next) ,key ,skiplist)
                          :equal      ,succeed  ; 検索成功
                          :less       (,recur ,lvl ,next ,path) ; 次のノードを検索
                          :greater #1=(,recur (1- ,lvl) ,cur (cons ,cur ,path))) ; レベルを下げて再試行
                      #1#))))) ; レベルを下げて再試行
       (,recur ,level ,node '())))) 

;; 検索関数実装
(defun get-impl (skiplist level node key)
  (find-node-case (prev found path) (skiplist level node key)
    :succeed (values (node-value found) t)
    :fail    (values nil nil)))

;; 挿入関数実装
(defun set-impl (skiplist level node key value)
  (find-node-case (prev found path) (skiplist level node key) 
    :succeed (setf (node-value found) value) ;; キーに紐付く値を更新
    :fail 
    ;; 要素が存在しない場合は、新しく追加する
    (loop WITH new = (new-node (decide-node-level skiplist) :key key :value value)
          ;; 各レベルでのリンクを張り替える (新規ノードを挿入する)
          FOR lv FROM 1 TO (node-level new)
          FOR prev IN path
      DO
      (shiftf (next-node new lv) (next-node prev lv) new))))

;; 削除関数実装
(defun rem-impl (skiplist level node key)
  (find-node-case (prev found path) (skiplist level node key)
    :fail nil
    :succeed 
    (with-slots (test) (the skiplist skiplist)
      ;; 各レベルでのリンクを張り替える
      (loop FOR lv FROM (node-level found) DOWNTO 1
        DO
        ;; レベルが下がるとprevとfoundの間に、別のノードが挟まっていることがあるので、
        ;; その場合はprevを必要な分だけ前に進める
        (loop UNTIL (eq (next-node prev lv) found)
              DO (setf prev (next-node prev lv)))
        ;; foundノードを飛ばすようにする
        (setf (next-node prev lv) (next-node found lv)))
      t)))

;; 検索関数
;; => (values (or キーに紐付く値 nil) 検索に成功したかどうか)
(defun get (key skiplist)
  (with-slots (max-level head-node) (the skiplist skiplist)
    (get-impl skiplist max-level head-node key)))

;; 挿入(値更新)関数
;; => new-value
(defsetf get (key skiplist) (new-value)
  `(with-slots (max-level head-node) (the skiplist ,skiplist)
     (set-impl ,skiplist max-level head-node ,key ,new-value)
     ,new-value))

;; 削除関数
;; => keyに紐付く要素が存在していたかどうか
(defun rem (key skiplist)
  (with-slots (max-level head-node) (the skiplist skiplist)
    (rem-impl skiplist max-level head-node key)))

実行例

;; sbcl-1.0.40

;; 作成
(defvar *sl* (skiplist:make 5))
--> *SL*

*sl*
--> #S(SKIPLIST::SKIPLIST
       :MAX-LEVEL 3
       :HEAD-NODE #S(SKIPLIST::NODE :KEY NIL :VALUE NIL :NEXTS #(NIL NIL NIL))
       :TEST #<FUNCTION <>)

;; 挿入
(setf (skiplist:get 10 *sl*) :a
      (skiplist:get 3  *sl*) :b
      (skiplist:get 50 *sl*) :c)
--> :C

(setf *print-circle* t)
*sl*
--> #S(SKIPLIST::SKIPLIST
       :MAX-LEVEL 3
       :HEAD-NODE #S(SKIPLIST::NODE
                     :KEY NIL
                     :VALUE NIL
                     :NEXTS #(#1=#S(SKIPLIST::NODE
                                    :KEY 3
                                    :VALUE :B
                                    :NEXTS #(#S(SKIPLIST::NODE
                                                :KEY 10
                                                :VALUE :A
                                                :NEXTS #(#S(SKIPLIST::NODE
                                                            :KEY 50
                                                            :VALUE :C
                                                            :NEXTS #(NIL))))
                                             NIL NIL))
                              #1# #1#))
       :TEST #<FUNCTION <>)

;; 検索
(skiplist:get 10 *sl*)
--> :a
    T

(skiplist:get 20 *sl*)
--> NIL
    NIL

;; 削除
(progn
  (skiplist:rem 3 *sl*)
  (skiplist:rem 10 *sl*))
--> T

*sl*
--> #S(SKIPLIST::SKIPLIST
       :MAX-LEVEL 3
       :HEAD-NODE #S(SKIPLIST::NODE
                     :KEY NIL
                     :VALUE NIL
                     :NEXTS #(#S(SKIPLIST::NODE :KEY 50 :VALUE :C :NEXTS #(NIL))
                              NIL NIL))
       :TEST #<FUNCTION <>)
;; hash-tableとの速度比較 (最適化されていない実装なので、あまり参考にはならないけど ...)

;; 比較関数定義
(defun fixnum< (a b)
  (declare (optimize (speed 3) (safety 0) (debug 0))
           (fixnum a b))
  (< a b))

;; データ生成関数
(defun gen-keys (count)
  (let ((keys (coerce (loop FOR i FROM 0 BELOW count COLLECT i)
                      'vector)))
    (loop REPEAT 2
      DO
      (loop FOR i FROM 0 BELOW count 
        DO
        (rotatef (aref keys i) (aref keys (random count)))))
    keys))
(gen-keys 10)
--> #(7 9 5 2 0 8 6 3 4 1)

;; データ数 / データ
(defvar *data-size* 1000000) ; 100万 
(defvar *keys* (gen-keys *data-size*))

;; スキップリスト: 挿入
(time
  (let ((sl (skiplist:make *data-size* :test #'fixnum<)))
    (loop FOR key ACROSS *keys*
          DO (setf (skiplist:get key sl) t))
    (defparameter *sl* sl)))
Evaluation took:
  4.670 seconds of real time
  4.664292 seconds of total run time (4.496281 user, 0.168011 system)
  [ Run times consist of 1.896 seconds GC time, and 2.769 seconds non-GC time. ]
  99.87% CPU
  9,314,648,895 processor cycles
  218,651,512 bytes consed

;; hash-table: 挿入
(time
  (let ((hash (make-hash-table :size *data-size*)))
    (loop FOR key ACROSS *keys*
          DO (setf (gethash key hash) t))
    (defparameter *hash* hash)))
Evaluation took:
  0.395 seconds of real time
  0.388025 seconds of total run time (0.360023 user, 0.028002 system)
  [ Run times consist of 0.128 seconds GC time, and 0.261 seconds non-GC time. ]
  98.23% CPU
  787,607,349 processor cycles
  31,457,528 bytes consed

;; スキップリスト: 検索
(time 
 (loop FOR key ACROSS *keys*
       DO
       (skiplist:get key *sl*)))
Evaluation took:
  3.141 seconds of real time
  3.112195 seconds of total run time (3.112195 user, 0.000000 system)
  [ Run times consist of 0.376 seconds GC time, and 2.737 seconds non-GC time. ]
  99.08% CPU
  6,266,176,248 processor cycles
  144,006,744 bytes consed

;; hash-table: 検索
(time 
 (loop FOR key ACROSS *keys*
       DO
       (gethash key *hash*)))
Evaluation took:
  0.013 seconds of real time
  0.012001 seconds of total run time (0.012001 user, 0.000000 system)
  92.31% CPU
  25,548,175 processor cycles
  0 bytes consed

;; スキップリスト: 削除
(time 
 (loop FOR key ACROSS *keys*
       DO
       (skiplist:rem key *sl*)))
Evaluation took:
  3.275 seconds of real time
  3.236203 seconds of total run time (3.208201 user, 0.028002 system)
  [ Run times consist of 0.452 seconds GC time, and 2.785 seconds non-GC time. ]
  98.81% CPU
  6,532,661,067 processor cycles
  144,005,048 bytes consed

;; hash-table: 削除
(time 
 (loop FOR key ACROSS *keys*
       DO
       (remhash key *hash*)))
Evaluation took:
  0.255 seconds of real time
  0.248015 seconds of total run time (0.244015 user, 0.004000 system)
  97.25% CPU
  508,396,815 processor cycles
  0 bytes consed