読者です 読者をやめる 読者になる 読者になる

ソート済みファイルからO(1)のメモリ使用量でDoubleArrayを構築する方法 #APPENDIX

common lisp algorithm

前半および後半で実装を省略していたパッケージのソースコードをここに載せておく。
buffered-outputパッケージとnode-allocatorパッケージの二つ。

buffered-output

ランダムアクセス可能なバイナリファイル出力用のパッケージ。
DoubleArray構築時に使われる。
※ DoubleArrayは一旦オンメモリに構築されることなく、このパッケージを使って直接(随時)ファイルに書き出してしまう
DoubleArray構築時のファイルアクセスパターンに特化して効率化(シークの軽減等)を図っている。
アクセスパターン:

  • 大局的に見れば、ほぼシーケンシャルな書込アクセス
    • ノードは添字の若い方から順に割当られていく傾向があるため ※ もちろんノード割当ロジックに依存するが
    • 狭い範囲内(ブロック)でのランダムアクセスは多々ある
      • バッファ(キャッシュ)を用意して、ブロックでの書込みは、まずそこに行われるようにする
      • バッファの内容は次のブロックに移る前に、ファイルに書き出される
  • まれに大きく離れた場所への書込アクセスが発生する可能性があるが、性能に与える影響は小さいので無視

具体的な数値は手元には無いが、このbuffered-outputパッケージを使った場合と、通常(?)の「まずオンメモリでDoubleArrayを作成し、再度にまとめてファイルに書き出す」方法を以前に比べた時、ほとんど気になるような処理速度の差はなかったように思う。
所要メモリはO(1)。 ※ ブロック・バッファサイズに依存: +BUFFER_SIZE+ * 配列の要素のサイズ

(defpackage buffered-output
  (:use :common-lisp)
  (:export buffered-output
           with-output
           write-uint))
(in-package :buffered-output)

;;;;;;;;
;;; type
(deftype array-index () `(mod ,array-dimension-limit))
(deftype positive-fixnum () `(integer 0 ,most-positive-fixnum))

;;;;;;;;;;;;
;;; constant
(defconstant +BUFFER_SIZE+ 819200)
(defconstant +CODE_LIMIT+ #x100)

;;;;;;;;;;;;;;;;;;;
;;; buffered-output
(defstruct buffered-output
  (binary-output nil :type file-stream)
  (buffer        #() :type simple-array)
  (offset          0 :type array-index))

;;;;;;;;;;;;;;;;;;;;;
;;; external function
(defmacro with-output ((out path &key (byte-width 1)) &body body)
  (declare ((mod 9) byte-width))
  `(with-open-file (,out ,path :element-type #1='(unsigned-byte ,(* 8 byte-width))
                               :direction :output
                               :if-exists :supersede)
     (let ((,out (make-buffered-output 
                  :binary-output ,out
                  :buffer (make-array ,+BUFFER_SIZE+ :element-type #1#
                                                     :initial-element 0))))
       (unwind-protect
           (locally ,@body)
         (flush ,out :final t)))))

;; unsigned-int型の数値をpositionの位置に書き込む
(defun write-uint (uint out &key (position 0))
  (declare (buffered-output out)
           (positive-fixnum position))
  (with-slots (binary-output buffer offset) out
    (cond ((< position offset)  ; a] 現在のブロックよりも前方に書き込む場合: 通常のランダムアクセス書込み(シーク発生)
           (file-position binary-output position)
           (write-byte uint binary-output))
          ((< position (+ offset +BUFFER_SIZE+)) ; b] ブロック内の書き込む場合: バッファに書込み
           (setf (aref buffer (- position offset)) uint))
          (t            ; c] 現在のブロックよりも後方に書き込む場合: バッファの内容を全て出力した後に、ブロックを移動
           (flush out)
           (incf offset +BUFFER_SIZE+)
           (fill buffer 0)
           (write-uint uint out :position position)))))

;;;;;;;;;;;;;;;;;;;;;
;;; internal function
(defun flush (out &key final)
  (declare (buffered-output out))
  (with-slots (binary-output buffer offset) out
    (file-position binary-output offset)
    (if (null final)
        (write-sequence buffer binary-output)
      (let ((end (or (position-if-not #'zerop buffer :from-end t)
                     (1- +BUFFER_SIZE+))))
        (write-sequence buffer binary-output :end (1+ end))
        ;; da:member?関数内で 範囲外アクセスチェック を行わないで済むように余裕を設けておく
        (loop REPEAT +CODE_LIMIT+ DO (write-byte 0 binary-output))))))  

node-allocator

ノードの割当を管理するパッケージ。
子ノード(のラベルの)リストを受け取って、使用可能なノード(BASE)のインデックスを返す。
まだソースコードが未整理なので、ややこしい・・・。
基本的な割当ロジックは『An Efficient Implementation of Trie Structures』にも出てくるリンクリストによる実装とほとんど変わらない*1
ただ、このパッケージでも(シーケンシャルな?)ブロック的な考え方を採用していて「まずはブロック1(0 〜 +BUFFER_SIZE+)の中でノードを割当」て、その範囲で足りなくなったら「前のブロックは捨てて(もう使わない)、次のブロック(+BUFFER_SIZE+ 〜 +BUFFER_SIZE+*2)内で割当を行う」といったことを繰り返している。
こちらも所要メモリはO(1)。 ※ ブロックサイズに依存: +BUFFER_SIZE+ * 定数値

(defpackage node-allocator
  (:use :common-lisp)
  (:export node-allocator
           make
           allocate))
(in-package :node-allocator)

;;;;;;;;;;;;;;;
;;; declaration
(declaim (inline get-next can-allocate?))

;;;;;;;;
;;; type
(deftype array-index () `(mod ,array-dimension-limit))
(deftype octet () '(unsigned-byte 8))

;;;;;;;;;;;;
;;; constant
(eval-when (:compile-toplevel :load-toplevel :execute)
  (defconstant +BUFFER_SIZE+ 89120))

;;;;;;;;;;;;;;;;;;
;;; node-allocator
(defstruct node-allocator 
  (head #x100 :type array-index)
  (bits   #*  :type (simple-bit-vector #.+BUFFER_SIZE+))
  (nexts #()  :type (simple-array fixnum (#.+BUFFER_SIZE+)))
  (prevs #()  :type (simple-array fixnum (#.+BUFFER_SIZE+)))
  (offset  0  :type array-index))

(defmethod print-object ((o node-allocator) stream)
  (print-unreadable-object (o stream :type t :identity t)))

;;;;;;;;;;;;;;;
;;; constructor
(defun make ()
  (let ((bits  (make-array +BUFFER_SIZE+ :element-type 'bit :initial-element 0))
        (nexts (make-array +BUFFER_SIZE+ :element-type 'fixnum))
        (prevs (make-array +BUFFER_SIZE+ :element-type 'fixnum)))
    (loop FOR i FROM 0 BELOW +BUFFER_SIZE+ 
      DO
      (setf (aref nexts i) (1+ i)
            (aref prevs i) (1- i)))
    (make-node-allocator :nexts nexts :prevs prevs :bits bits)))

;;;;;;;;;;;;;;;;;;;;;;
;;; auxiliary function
(defun shift (alloca)
  (with-slots (bits nexts prevs offset head) (the node-allocator alloca)
    (let ((new-offset head))
      (loop WHILE (< new-offset (+ offset (- +BUFFER_SIZE+ (* #x100 2))))
        DO
        (setf new-offset (aref nexts (- new-offset offset))))
      (let* ((delta (- new-offset offset))
             (use-len (- +BUFFER_SIZE+ delta)))
        (shiftf (subseq bits 0 use-len) (subseq bits delta))
        (fill bits 0 :start use-len)

        (setf offset new-offset)
        
        (shiftf (subseq nexts 0 use-len) (subseq nexts delta))
        (shiftf (subseq prevs 0 use-len) (subseq prevs delta))
        (loop FOR i FROM (+ offset use-len) BELOW (+ offset +BUFFER_SIZE+)
          DO
          (setf (aref nexts (- i offset)) (1+ i)
                (aref prevs (- i offset)) (1- i)))

        (setf head offset)
        (loop WHILE (< head (+ offset #x100))
          DO
          (setf head (aref nexts (- head offset)))))))
  alloca)

(defun ref (alloca index)
  (declare (array-index index))
  (with-slots (offset nexts) (the node-allocator alloca)
    (if (<= (+ offset +BUFFER_SIZE+) index)
        (ref (shift alloca) index) 
      (aref nexts (- index offset)))))

(defun bref (alloca index)
  (declare (array-index index))
  (with-slots (bits offset) (the node-allocator alloca)
    (if (> offset index)
        1
      (if (<= (+ offset +BUFFER_SIZE+) index)
          (bref (shift alloca) index)
        (bit bits (- index offset))))))

(defun get-next (alloca index)
  (ref alloca index))

(defun can-allocate? (alloca index arcs)
  (declare (list arcs)
           (array-index index))
  (and (zerop (bref alloca index))
       (every (lambda (arc)
                (declare (octet arc))
                (/= -1 (ref alloca (+ index arc))))
              arcs)))

(defun allocate-impl (alloca index arcs)
  (declare (array-index index))
  (with-slots (bits head prevs nexts offset) (the node-allocator alloca)
    (when (<= offset index)
      (setf (bit bits (- index offset)) 1))
    (loop WITH base = index
          FOR arc OF-TYPE (mod #x100) IN arcs
          FOR index OF-TYPE fixnum = (+ base arc)
      DO
      (when (<= offset index)
        (ref alloca index)

        (let ((prev (aref prevs (- index offset)))
              (next (aref nexts (- index offset))))
          (setf (aref prevs (- index offset)) -1
                (aref nexts (- index offset)) -1)
          
          (when (= head index)
            (setf head next))

          (when (<= offset prev)
            (setf (aref nexts (- prev offset)) next))

          (when (<= offset next)
            (ref alloca next)
            (setf (aref prevs (- next offset)) prev)))))))

;;;;;;;;;;;;;;;;;;;;;
;;; external function
(defun allocate (alloca arcs)
  (with-slots (head) (the node-allocator alloca)
    (loop WITH front OF-TYPE (mod #x100) = (car arcs)
          FOR cur = (get-next alloca head) THEN (get-next alloca cur)
          FOR base OF-TYPE fixnum = (- cur front)
          UNTIL (and (plusp base) (can-allocate? alloca base (cdr arcs)))
      FINALLY
      (allocate-impl alloca base arcs)
      (return base))))

*1:異なるのはCHECK配列に格納されているのが遷移元ノードのインデックスではなく、遷移文字(ラベル)なため、BASE値が重複して使われているのを禁止している点くらいだと思う