長さ制限つきのハフマン符号化(The Coin Collector's Problem)

冬休みの取り組みとして、ZIP*1圧縮/解凍器を作ってみることにした。
ZIPで使われている圧縮アルゴリズム(圧縮フォーマット?)は、DEFLATEアルゴリズムと云って、簡単に云えばLZ77とハフマン符号化を組み合わせたもののようだ。日本語版仕様(RFC1951)
ハフマン符号化は、以前に簡単なものを実装したことがあるが、DEFLATEアルゴリズムで使用するハフマン符号化では、一般的なそれとは異なり、(符号化される)各コードを表現するビット列の長さに制限(15ビットまで)を付けられる必要があるらしい。
なので今回はまず、その長さ制限付きのハフマン符号化アルゴリズムの実装を試してみることにする。

The Coin Collector's Problem

参考にしたのは、以下のWikipediaの記事。

詳しくはリンク先を読んでもらえれば分かると思うが、長さ制限付きハフマン符号化には、package-mergeアルゴリズムというものを使えば良いらしい。
上の記事では、まずpackage-mergeアルゴリズムを用いて、「The (binary) Coin Collector's Problem」という問題を解くことから始めているので、僕もそれに習うことにする。


・「The Coin Collector's Problem」設問 ※正確なものは上記リンク先を参照

  1. コインの収集家がいる
  2. コインは、額面上の価値(金額)と古銭的な価値の両方を有する
  3. 収集家は価格Nの商品を、手持ちのコインを消費して買いたいと思っている ※ Nは整数
  4. 額面上の金額を足してNになるコインの組み合わせの内で、古銭的な価値の総計が最も低い組み合わせを選択したい
  5. 前提: 額面上の価値の値は、二の乗数であるとする。1ドル、1/2ドル、1/4ドル、1/8ドル、など。

この問題の解法(package-mergeアルゴリズム)common lispで書いたのが以下のコード。
参照: nlet-acc

;;;;;;;;
;;;; パッケージ
(defpackage package-merge
  (:use :common-lisp)
  (:export :coin
           :coin-denom
           :coin-numis
           :make-coin
           :select-loss-smallest-coin-set))
(in-package :package-merge)

;; [The Coin Collector's Problem]
;; 金額N(整数)の商品を買いたいとして、denomを足してNになる組み合わせのうち、numisの合計が最小となるコインを選択する
;; denomは二の乗数とする。
;;  - また、最大のdenomと最小のdenomの間に、抜けている二の乗数値はないとものとする。
;;       e.g '(#S(coin :denom 1) #S(coin :denom 1) #S(coin :denom 1/4)) は不正 
;;              ==> #S(coin :denom 1/2)が一つ以上必要   ※ これは実装上の制約で本質的には不要
;;  - denomの最大値は1であるのが、望ましい
;;
;; 参照: http://en.wikipedia.org/wiki/Package-merge_algorithm

;;;;;;;;
;;;; 構造体: coin, packaged-coin
(defstruct coin
  (denom 0 :type number)  ; denomination:     額面金額
  (numis 0 :type number)) ; numismatic value: 古銭的な価値

(defstruct (packaged-coin (:include coin))
  (pair '() :type list))  ; (パッケージされた)コインのペア

(defun package-coin-pair (coin1 coin2)
  (make-packaged-coin :denom (+ (coin-denom coin1) (coin-denom coin2))
                      :numis (+ (coin-numis coin1) (coin-numis coin2))
                      :pair (list coin1 coin2)))

;;;;;;;;
;;;; 補助関数群
;; denomごとにコインを分割する: (coin ...) -> ((A-denom-coin ...) (B-denom-coin ...))
;;  1] 分割後のリストは、denomの昇順にソートされる
;;  2] 分割された各リストは、numisの昇順にソートされる
(defun separate-by-denom (coins)
  (let ((denom-map (make-hash-table)))
    (dolist (coin coins)
      (push coin (gethash (coin-denom coin) denom-map))) ;; denomで分割
    
    (let ((acc '()))
      (maphash (lambda (denom coins)
                 (declare (ignore denom))
                 (push (sort coins #'< :key #'coin-numis) acc)) ;; numisの昇順
               denom-map)
      (sort acc #'< :key (lambda (coins) (coin-denom (car coins))))))) ;; denomの昇順

;; denomが最小(2^i)のコイン群を、numisの小さい順に二つずつまとめて、
;; その結果を、二番目にdenomが小さい(2^i+1)コイン群とマージする 
(defun package-then-merge-one (separated-coins-list)
  (let ((smallest-denom-coins      (first  separated-coins-list))
        (next-smallest-denom-coins (second separated-coins-list))
        (rest                      (cddr   separated-coins-list)))
    (let ((merged-coins 
           (merge
            'list
            (loop FOR (fst snd) ON smallest-denom-coins BY #'cddr
                  WHEN snd  ; smallest-denom-coins数が奇数の場合、端数は切り捨てる ※1
                  COLLECT (package-coin-pair fst snd)) ; 古銭的な価値が低い順に、二つのコインをまとめる
            next-smallest-denom-coins
            #'< :key #'coin-numis)))
      (cons merged-coins rest))))
;;※1 余った(numisが最大の)コインのdenomをどのように組み合わせても、(二の乗数という性質上)整数にはならないため不要(?)

(defun coin-flatten (coins)
  (nlet-acc self ((coins coins))
    (dolist (coin coins)
      (if (packaged-coin-p coin)
          (self (packaged-coin-pair coin))
        (accumulate coin)))))

;;;;;;;;
;;;; メインの関数
;; 額面金額がcostを満たすコインの組み合わせの内で、古銭的な価値が最小となるものを選択して返す
(defun select-loss-smallest-coin-set (cost coin-list &optional flat)
         ;; 額面金額が1になる組み合わせのコインを、古銭的な価値が低い順に並べて取得する
  (let* ((numis-ascending-order-coins 
          (do* ((#1=separated-coins (separate-by-denom coin-list) 
                                    (package-then-merge-one #1#))
                (first-coin (caar #1#) (caar #1#)))
               ((= 1 (coin-denom first-coin)) (car #1#)))) ;; 最も低い額面価値が1になるまで続ける  ※ 最も高い額面価値も1

         ;; 古銭的な価値が低い順に、cost個のコインを取り出す
         (selected-coins (subseq numis-ascending-order-coins 0 cost)))
    (if flat 
        (coin-flatten selected-coins) ; unpackする
      selected-coins)))

;;;;;;;;
;;;; 表示関連
(defmethod print-object ((o coin) stream)
  ;; [額面金額 古銭的な価値]
  (format stream "[~A ~A]" (coin-denom o) (coin-numis o)))   

(defmethod print-object ((o packaged-coin) stream)
  ;; [額面金額(総計) 古銭的な価値(総計) {まとめられているコインの数}]
  (format stream "[~A ~A(~D)]" (coin-denom o) (coin-numis o) (length (coin-flatten (packaged-coin-pair o)))))

基本的には、以下の二つを繰り返しているだけ。※ 全てのコインの額面金額(or 額面金額の合計)が1になるまで繰り返す

  1. 額面金額の一番小さいコイン群を、古銭的な価値が小さい順に二つずつまとめる。※ まとめられたコインの額面金額(の合計)は、二番目に小さい額面金額と等しくなる。ex: 1/8+1/8=1/4
  2. 上でまとめたコイン群を、二番目に額面価値が小さいコイン群とマージする。


実行例は以下の通り。

> (in-package :common-lisp-user)

;; テストデータ
> (defvar *coins* (loop repeat 25 collect (package-merge:make-coin :denom (expt 2 (- (random 5)))
                                                                   :numis (expt 2 (- (random 5))))))
> (format t "~{~A~%~}" (sort (copy-list *coins*) #'< :key #'package-merge:coin-denom))
[1/16 1/2]
[1/16 1]
[1/16 1/2]
[1/16 1/4]
[1/8 1]
[1/8 1]
[1/8 1/16]
[1/4 1/16]
[1/4 1/16]
[1/4 1/8]
[1/4 1/16]
[1/4 1/8]
[1/4 1/4]
[1/4 1/4]
[1/4 1/16]
[1/4 1/2]
[1/2 1/4]
[1/2 1/4]
[1/2 1]
[1/2 1/2]
[1/2 1/16]
[1/2 1/16]
[1 1/16]
[1 1/16]
[1 1]

;; 額面金額が足して4になるコインの組み合わせを選択
> (package-merge:select-loss-smallest-coin-set 4 *coins*)
--> ([1 1/16] [1 1/16] [1 1/8{2}] [1 1/4{4}])     ; 額面金額が1のコインが2個と、パッケージにまとめられたコインが計6個

> (package-merge:select-loss-smallest-coin-set 4 *coins* t)  
--> ([1 1/16]   [1 1/16]   [1/2 1/16] [1/2 1/16]  ; 上の展開版
     [1/4 1/16] [1/4 1/16] [1/4 1/16] [1/4 1/16])

> (reduce #'+ * :key #'package-merge:coin-numis)
--> 1/2  ; 額面金額4に対して、古銭的な価値は1/2

長さ制限付きハフマン符号化

次は、長さ制限付きのハフマン符号化。
既にpackage-mergeアルゴリズムは使えるようになったので、後はこれをハフマン符号化に応用するだけ。

(defpackage :length-limited-huffman 
  (:use :common-lisp)
  (:export :calc-code-bit-length))
(in-package :length-limited-huffman)

(defstruct (coin-with-code (:include package-merge:coin))
  code)

;; 入力ストリームのデータから、各コードのハフマン符号化に必要なビット長を計算する
;; --> #(コード0のビット長 コード1のビット長 コード2のビット長 ... コード255のビット長)
(defun calc-code-bit-length (binary-input-stream bit-length-limit &aux (in binary-input-stream))
  ;; 出現回数を数える
  (let ((code-freq (make-array #x100 :initial-element 0)))
    (loop FOR   code = (read-byte in nil nil)
          WHILE code
          DO    (incf (aref code-freq code)))

    ;; package-mergeアルゴリズムを用いて、各コードの符号化に必要なビット数を算出する
    (let ((coins (loop FOR code FROM 0 BELOW #x100 
                       UNLESS (zerop (aref code-freq code)) ; 一回でも出現しているなら
                       APPEND (loop FOR i FROM -1 DOWNTO (- bit-length-limit) ; コインをbit-lenght-limit個作成する
                                    COLLECT (make-coin-with-code 
                                              :code  code
                                              :denom (expt 2 i)                ; 2^-1 から 2^-bit-length-limitまで
                                              :numis (aref code-freq code))))) ; 出現回数
          (bit-lengths (make-array #x100 :initial-element 0))) 

      ;; 各コードの符号化に必要なビット数をカウントする
      (dolist (coin (package-merge:select-loss-smallest-coin-set ; packge-mergeアルゴリズム
                     (1- (count-if #'plusp code-freq)) coins t))
        ;; 選択されたcoin(コード)の数 = 符号ビット数
        (incf (aref bit-lengths (coin-with-code-code coin))))
      bit-lengths)))

例:

> (in-package :common-lisp-user)

;; 『こころ』(UTF-8)の符号化に最適な、各コードの符号ビット長を取得する
> (with-open-file (in "/home/ohta/kokoro" :element-type '(unsigned-byte 8))
    (length-limited-huffman:calc-code-bit-length in 15)) ; ビット長さの最大は15
--> #(0 0 0 0 0 0 0 0 0 0 8 0 0 8 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 15 0 0 0 0 0
      0 0 0 0 0 0 0 11 15 15 15 15 15 15 15 15 15 15 15 15 15 0 0 0 0 0 0 0 0 0 0 0
      0 0 0 15 15 0 0 0 0 0 0 0 0 15 0 0 0 0 15 0 0 0 0 0 0 0 0 15 0 0 0 0 0 15 15
      15 15 0 0 15 0 15 15 0 15 0 15 15 0 15 0 15 15 0 0 0 0 0 4 3 4 8 6 8 7 8 7 7
      6 5 6 7 9 7 8 8 7 7 8 8 8 6 9 7 9 8 9 7 10 6 8 8 9 7 8 8 6 6 7 8 7 6 9 8 6 7
      8 9 9 9 9 9 9 9 7 9 7 8 8 8 7 8 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
      0 0 0 0 0 0 0 0 0 0 0 11 2 7 5 6 6 7 7 0 0 0 0 0 10 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0)

> (aref * (char-code #\a))
--> 15  ; #\aは15ビットで符号化される

> (defvar *bit-lengths* **) ; 後で使うので保存しておく

これで、符号化対象データ(入力ストリーム)から各コードの符号ビット長を(長さ制限付きで)計算することができるようになった。
上の関数で取得できるのは各コードの符号ビット長だけで、実際の符号化で用いるビット列は不明なままだが、これはビット長さえ分かれば、(例えば)以下のようなコードで簡単に求めることができる(詳しくはRFC1951を参照)

;; ポストインクリメント
(defmacro post-incf (place &optional (num 1))
  `(prog1 ,place (incf ,place ,num)))

;; 値とビット長から、bit配列を作成する
(defun make-bit-vector (code length)
  (let ((bits (make-array length :element-type 'bit)))
    (dotimes (i length bits)
      (setf (bit bits (- length i 1)) (ldb (byte 1 i) code)))))

;; ビット長の配列から、符号化に用いるビット列を作成するコード
;; ビット長の最大が15だということが前提のコード
;; XXX: 未整理
> (let ((lengths (make-array 16 :initial-element 0)))  ; nビットで符号化されるコードの個数を数える
    (loop FOR    len ACROSS *bit-lengths*
          UNLESS (zerop len)
          DO     (incf (aref lengths len)))

   (let ((base-codes (make-array 16 :initial-element 0))) ; それぞれの長さのビット列のベースとなる値を設定する
    (loop FOR i FROM 1 BELOW 16 DO
      (setf (aref base-codes i) 
            (ash (+ (aref base-codes (1- i))
                    (aref lengths (1- i)))
                 1)))

    (let ((bits (make-array #x100 :initial-element #*))) ; それぞれのコードにビット列を割り振る
      (loop FOR i FROM 0 BELOW #x100 
            UNLESS (zerop (aref *huf* i))
            DO (setf (aref bits i) 
                     (make-bit-vector (post-incf (aref base-codes (aref *bit-lengths* i)))
                                      (aref *bit-lengths* i))))
      bits)))
--> #(#* #* #* #* #* #* #* #* #* #* #*11100010 #* #* #*11100011 #* #* #* #* #* #*
      #* #* #* #* #* #* #* #* #* #* #* #* #*111111111100000 #* #* #* #* #* #* #* #*
      #* #* #* #* #*11111111100 #*111111111100001 #*111111111100010
      #*111111111100011 #*111111111100100 #*111111111100101 #*111111111100110
      #*111111111100111 #*111111111101000 #*111111111101001 #*111111111101010
      #*111111111101011 #*111111111101100 #*111111111101101 #* #* #* #* #* #* #* #*
      #* #* #* #* #* #* #*111111111101110 #*111111111101111 #* #* #* #* #* #* #* #*
      #*111111111110000 #* #* #* #* #*111111111110001 #* #* #* #* #* #* #* #*
      #*111111111110010 #* #* #* #* #* #*111111111110011 #*111111111110100
      #*111111111110101 #*111111111110110 #* #* #*111111111110111 #*
      #*111111111111000 #*111111111111001 #* #*111111111111010 #* #*111111111111011
      #*111111111111100 #* #*111111111111101 #* #*111111111111110 #*111111111111111
      #* #* #* #* #* #*0110 #*010 #*0111 #*11100100 #*100100 #*11100101 #*1011110
      #*11100110 #*1011111 #*1100000 #*100101 #*10000 #*100110 #*1100001
      #*111110000 #*1100010 #*11100111 #*11101000 #*1100011 #*1100100 #*11101001
      #*11101010 #*11101011 #*100111 #*111110001 #*1100101 #*111110010 #*11101100
      #*111110011 #*1100110 #*1111111100 #*101000 #*11101101 #*11101110 #*111110100
      #*1100111 #*11101111 #*11110000 #*101001 #*101010 #*1101000 #*11110001
      #*1101001 #*101011 #*111110101 #*11110010 #*101100 #*1101010 #*11110011
      #*111110110 #*111110111 #*111111000 #*111111001 #*111111010 #*111111011
      #*111111100 #*1101011 #*111111101 #*1101100 #*11110100 #*11110101 #*11110110
      #*1101101 #*11110111 #* #* #* #* #* #* #* #* #* #* #* #* #* #* #* #* #* #* #*
      #* #* #* #* #* #* #* #* #* #* #* #* #* #* #* #*11111111101 #*00 #*1101110
      #*10001 #*101101 #*101110 #*1101111 #*1110000 #* #* #* #* #* #*1111111101 #*
      #* #* #* #* #* #* #* #* #* #* #* #* #* #* #*)

> (aref * (char-code #\a))
--> #*111111111110010  ; #\aの符号ビット列は111111111110010

2009/01/05: 修正版length-limited-huffman

上で作成したlength-limited-huffmanパッケージは対象がバイト列(値が0~255の列)に固定されていたので、その制限をなくした版を載せておく。
また、(コードに対応する)符号ビット長の列からハフマン符号表およびハフマン符号木を作成する関数も追加した。
参照: hash-trie

(defpackage :length-limited-huffman 
  (:use :common-lisp)
  (:export :calc-code-bit-length
           :make-trie
           :make-code->bits-table))
(in-package :length-limited-huffman)

(rename-package :hash-trie :hash-trie '(:trie))

(defstruct (coin-with-code (:include package-merge:coin))
  code)

;; 以前は、入力ストリームを引数に取るようにしていたが、
;; 符号化の対象となるコードの出現数を保持する配列を受けとるように変更。
;; ※ 配列の添字=コード値
(defun calc-code-bit-length (code-frequency-table bit-length-limit)
  (let* ((code-freq code-frequency-table)
         (code-limit (length code-frequency-table))
         (coins (loop FOR code FROM 0 BELOW code-limit
                      UNLESS (zerop (aref code-freq code))
                      APPEND (loop FOR i FROM -1 DOWNTO (- bit-length-limit)
                                   COLLECT (make-coin-with-code 
                                            :code  code
                                            :denom (expt 2 i)               
                                            :numis (aref code-freq code)))))
         (bit-lengths (make-array code-limit :initial-element 0))) 
    
    (dolist (coin (package-merge:select-loss-smallest-coin-set
                   (1- (count-if #'plusp code-freq)) coins t))
      (incf (aref bit-lengths (coin-with-code-code coin))))
    bit-lengths))

;; ポストインクリメント
(defmacro post-incf (place &optional (num 1))
  `(prog1 ,place (incf ,place ,num)))

;; 整数とビット長から、ビット配列を作成する
;; ex. (bits #b10 4) ==> #*0010
(defun bits (code length)
  (let ((bits (make-array length :element-type 'bit)))
    (dotimes (i length bits)
      (setf (bit bits (- length i 1)) (ldb (byte 1 i) code)))))

;; 配列の中で値が最大の要素を取得する
(defun max-value (ary)
  (loop FOR v ACROSS ary MAXIMIZE v))

;; ビット長を要素とする配列を受け取り、各コードに対応する符号ビット列を保持する配列を返す
(defun make-code->bits-table (bit-lens &aux (max-len (max-value bit-lens)))
  (let ((lengths    (make-array (1+ max-len) :initial-element 0))
        (code-limit (length bit-lens)))
    (loop FOR    len ACROSS bit-lens
          UNLESS (zerop len)
          DO     (incf (aref lengths len)))

   (let ((base-codes (make-array (1+ max-len) :initial-element 0)))
     (loop FOR i FROM 1 TO max-len  DO
       (setf (aref base-codes i) 
             (ash (+ (aref base-codes (1- i))
                     (aref lengths (1- i)))
                  1)))

    (let ((bits (make-array code-limit :initial-element #*)))
      (loop FOR i FROM 0 BELOW code-limit
            UNLESS (zerop (aref bit-lens i))
            DO (setf (aref bits i) 
                     (bits (post-incf (aref base-codes (aref bit-lens i)))
                           (aref bit-lens i))))
      bits))))

;; 符号ビット列の配列から、ハフマン木(トライ)を作成する
(defun make-trie (bits-table)
  (let ((trie       (trie:make-trie))
        (code-limit (length bits-table)))
    (dotimes (code code-limit)
      (let ((bits (aref bits-table code)))
        (unless (zerop (length bits))
          (setf (trie:get-elem bits trie) code))))
    trie))

*1:gzipに変更。2009/12/29