RustとSBCLでのDAWG構築性能の比較メモ

Rustの勉強を兼ねて、cl-dawgというDAWGのCommon Lisp実装を移植して、rust-dawgというライブラリを作ってみた。
(DAWGは末尾部分を共有可能にしたトライの亜種。上記ライブラリでは、そのトライ木をDoubleArray形式で表現している。DAWGやDoubleArrayの構築方法自体に関しては、過去に何度か記事を書いているのでここでは省略する)

両者の性能比較を軽く行ったので、ここにその結果メモを残しておく。

環境

  • CPU: Intel(R) Core(TM) i7-5600U CPU @ 2.60GHz (論理四コア)
  • メモリ: 8GB
  • OS: Ubuntu-15.04 (64bit)
  • rust-dawg: v0.1.0
  • cl-dawg: v0.3.1
  • Rust: rustc-1.5.0
  • Common Lisp: SBCL-1.2.14

測定

内容

  • DAWGインデックスファイルの構築性能を比較
    • 実行時間とメモリ消費量
  • 入力は4500万個程度の文字Nグラム
  • 内部的には、以下の二つの処理が行われているので、そのそれぞれに要した時間を計測した:
    1. 入力を読み込み、メモリ上に二分木トライ(DAWG)の構築
    2. その二分木をDoubleArray形式に変換しつつ、ファイルに書き出し
  • なお上記の1に関してはrust-dawgとcl-dawgでほぼ同様のコードとなっているが、2に関してはrust-dawgの実装中に、実行速度的により効率的(と思われる)な方法を思いついて実装してしまったのでrust版の方が有利な計測となってしまっている可能性が高い
    • メモリ消費量には影響がない

準備とコマンド実行

入力ファイルの準備

# Nグラムコーパスの取得
$ wget -xnH -i http://dist.s-yata.jp/corpus/nwc2010/ngrams/char/over99/filelist
$ xz -d corpus/nwc2010/ngrams/char/over999/*/*.xz
$ head -5 corpus/nwc2010/ngrams/char/over999/2gms/2gm-0000
" "     123067
" #     2867
" $     1047
" %     2055
" &     3128

# 文字列部分のみを抜き出してソートする
$ cut -f 1 corpus/nwc2010/ngrams/char/over999/*gms/* | sed -e 's/ //g' > words.tmp
$ LC_ALL=c sort words.tmp -o words

$ wc -l words
44280717 words # 約4500万ワード

$ du -h words
652M    words

rust-dawgのビルドと実行

# ビルド
$ git clone git@github.com:sile/rust-dawg.git
$ cd rust-dawg
$ patch -p1 < rust-dawg.patch # 計測コードを埋め込むためのパッチを当てる
$ cargo build --release

# 実行
$ /usr/bin/time -v target/release/dawg_build rust-dawg.idx < words
Building binary-tree trie ... done: elapsed=49999 ms
Building double-array trie ... done: elapsed=11402 ms
DONE
        Command being timed: "target/release/dawg_build rust-dawg.idx"
        User time (seconds): 60.90
        System time (seconds): 0.53
        Percent of CPU this job got: 100%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 1:01.40
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 1921288
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 3195
        Voluntary context switches: 1
        Involuntary context switches: 84
        Swaps: 0
        File system inputs: 0
        File system outputs: 409824
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0
$ du -h rust-dawg.idx
201M    rust-dawg.idx

cl-dawgのビルドと実行

# ビルド
$ git clone git@github.com:sile/cl-dawg.git
$ cd cl-dawg
$ patch -p1 < cl-dawg.patch # 計測コードを埋め込むためのパッチを当てる
$ sbcl --noinform --dynamic-space-size 7500

;; dawgパッケージをビルド
* (require :asdf)
* (setf asdf:*central-registry* (list (directory ".") (directory "lib/dict-0.2.0/")))
* (asdf:load-system :dict-0.2.0)
* (asdf:load-system :dawg)

;; 実行可能ファイルを作成
* (defun main ()
   (let ((input-file (second sb-ext:*posix-argv*))
        (output-file (third sb-ext:*posix-argv*)))
    (time
     (dawg:build :input input-file :output output-file))))
* (sb-ext:save-lisp-and-die "dawg-build" :toplevel #'main :executable 't :save-runtime-options t)

# 実行
$ /usr/bin/time -v ./dawg-build words cl-dawg.idx

:BUILD-BINARY-TREE-TRIE
Evaluation took:
  37.298 seconds of real time
  37.304000 seconds of total run time (35.300000 user, 2.004000 system)
  [ Run times consist of 12.180 seconds GC time, and 25.124 seconds non-GC time. ]
  100.02% CPU
  96,735,665,025 processor cycles
  12 page faults
  11,938,296,176 bytes consed

:BUILD-DOUBLE-ARRAY-TRIE
Evaluation took:
  33.487 seconds of real time
  33.496000 seconds of total run time (32.436000 user, 1.060000 system)
  [ Run times consist of 6.484 seconds GC time, and 27.012 seconds non-GC time. ]
  100.03% CPU
  86,850,782,635 processor cycles
  2 page faults
  10,786,507,072 bytes consed

        Command being timed: "./dawg-build words cl-dawg.idx"
        User time (seconds): 67.74
        System time (seconds): 3.09
        Percent of CPU this job got: 100%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 1:10.82
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 4622404
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 25
        Minor (reclaiming a frame) page faults: 1190638
        Voluntary context switches: 30
        Involuntary context switches: 595
        Swaps: 0
        File system inputs: 4896
        File system outputs: 820952
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

$ du -h cl-dawg.idx
201M    cl-dawg.idx

結果(まとめ)

上の結果を表形式にまとめたもの。

所要時間

二分木構築 DoubleArray構築 合計
rust-dawg 49.99s 11.40s 61.40s
cl-dawg 37.30s (非GC: 25.12s) 33.49s (非GC: 27.01s) 70.80s (非GC: 52.13s)

二分木構築に要した時間はcl-dawgの方が、DoubleArray構築に要した時間はrust-dawgの方が、短くなっている。
後者の影響が大きく合計時間もrust-dawgの方が短くなっているが、上で書いたようにDoubleArray構築部分はRust版の方が有利な実装になっているので、 その辺りの条件を合わせれば、おそらく合計でもcl版の方が速い結果となると思う。

また、もしGCに要した時間を除外したとすればcl-dawg(SBCL)の方がだいぶ良好な結果となっている。

メモリ消費量

最大メモリ消費量
rust-dawg 1.921GB
cl-dawg 4.622GB

メモリ消費量に関しては、(GC無し言語とGC有り言語の比較なので当然と言えば当然だが)Rust版の方が半分以下の消費量となっている。

感想

rust-dawgのメモリ消費量に関しては期待通り。

二分木構築部分の処理時間は、cl-dawg(SBCL)の「非GC部分の時間+α」くらいに収まってくれるかと予想していたが、意外と振るわない結果となった。

SBCLはかなり高速な処理系 and CL版はかなり最適化されている and Rust初心者」とcl-dawg版に有利な条件も多々あるけど、 Rustは「静的型付け and GC不要」な分だけ実行時のコストを安く出来ても良いはずなのに、とは思う。
(ただし、GC不要と言いつつrust-dawg内では(GCの一種と見做せなくもない)RCモジュールは多用されている)

特に直近でやる予定はないけど、気が向いたら何が原因で速度差が生じているのかの特定を行ってみたいかもしれない。
(ex. RCを生ポインタに置き換えたらどうなるか、とか、Optionをやめて番兵値でundefinedを表現してみる、とか、内部で多用しているハッシュマップの実装を変えたらどうなるか、とか、そもそもコンパイラの最適化が不十分、とか)

測定用に当てたパッチ

rust-dawg.patch

diff --git a/Cargo.toml b/Cargo.toml
index a1daa96..f829e37 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,2 +7,3 @@  [dependencies]
 bit-vec = "*"
 byteorder = "*"
+time = "*"
diff --git a/src/bin/dawg_build.rs b/src/bin/dawg_build.rs
index 7677951..880fd5f 100644
--- a/src/bin/dawg_build.rs
+++ b/src/bin/dawg_build.rs
@@ -4,6 +4,7 @@
 // see the LICENSE file at the top-level directory.

 extern crate dawg;
+extern crate time;

 use std::env;
 use std::process;
@@ -12,6 +13,11 @@ use std::io::BufRead;
 use dawg::binary_tree::Builder as BinaryTreeBuilder;
 use dawg::double_array::Builder as DoubleArrayBuilder;

+fn now_ms() -> u64 {
+    let t = time::now().to_timespec();
+    (t.sec as u64 * 1000 + t.nsec as u64 / 1000 / 1000)
+}
+
 fn main() {
     let args: Vec<_> = env::args().collect();
     if args.len() != 2 {
@@ -21,17 +27,25 @@ fn main() {

     let stdin = io::stdin();
     let output_file = &args[1];
+
+    print!("Building binary-tree trie ... ");
+    let start = now_ms();
     let trie = BinaryTreeBuilder::new()
                    .build(stdin.lock().lines())
                    .unwrap_or_else(|e| {
                        println!("[ERROR] Can't build DAWG: reason={}", e);
                        process::exit(1);
                    });
+    println!("done: elapsed={} ms", now_ms() - start);
+
+    print!("Building double-array trie ... ");
+    let start = now_ms();
     let trie = DoubleArrayBuilder::new().build(trie);
     if let Err(e) = trie.save(output_file) {
         println!("[ERROR] Can't save dawg index: path={}, reason={}", output_file, e);
         process::exit(1);
     }
+    println!("done: elapsed={} ms", now_ms() - start);

     println!("DONE");
 }

cl-dawg.patch

diff --git a/dawg.lisp b/dawg.lisp
index aa5ea40..095dcbd 100644
--- a/dawg.lisp
+++ b/dawg.lisp
@@ -61,11 +61,14 @@
   (declare ((or string pathname list) input)
            ((or string pathname) output)
            ((member :native :little :big) byte-order))
-  (let ((trie (if (listp input)
+  (let ((trie (time (progn (print :build-binary-tree-trie) (if (listp input)
                   (dawg.bintrie-builder:build-from-list input :show-progress show-progress)
                 (dawg.bintrie-builder:build-from-file input :show-progress show-progress))))
+        ))
+    (time (progn (print :build-double-array-trie)
     (dawg.double-array-builder:build-from-bintrie
      trie :output-file output :byte-order byte-order :show-progress show-progress))
+    ))
   t)

 (defun load (index-path &key (byte-order :native))

ソート済みのリストに対する破壊的マージソートの改良

以前に載せたマージソート(をベースとしたもの)をSBCL(1.0.58)にコミットしてくれたPaul Khuongさんが、こんな記事を書いていて、なるほどなー、と思ったので、表題に関係する部分を参考にさせて貰って変更前後での比較を行ったメモ。

オリジナルのマージソート

まず、SBCL(1.0.58)のリストに対する破壊的マージソートの実装*1:

;; 二つのソート済みリストのマージ関数
(declaim (inline merge-lists*))
(defun merge-lists* (head list1 list2 test key &aux (tail head))
  (declare (type cons head list1 list2)
           (type function test key)
           (optimize speed))
  (macrolet ((merge-one (l1 l2)
               `(progn
                  (setf (cdr tail) ,l1
                        tail       ,l1)
                  (let ((rest (cdr ,l1)))
                    (cond (rest
                           (setf ,l1 rest))
                          (t
                           (setf (cdr ,l1) ,l2)
                           (return (cdr head))))))))
    (loop
     (if (funcall test (funcall key (car list2))  ; this way, equivalent
                       (funcall key (car list1))) ; values are first popped
         (merge-one list2 list1)                  ; from list1
         (merge-one list1 list2)))))

;; 実行
(merge-lists* '(:head) '(1 3 5) '(2 4 6) #'< #'identity))
=> (1 2 3 4 5 6)
;; リストのマージソート関数
(declaim (inline stable-sort-list))
(defun stable-sort-list (list test key &aux (head (cons :head list)))
  (declare (type list list)
           (type function test key)
           (dynamic-extent head))
  (labels ((recur (list size)
             (declare (optimize speed)
                      (type cons list)
                      (type (and fixnum unsigned-byte) size))
             (if (= 1 size)
                 (values list (shiftf (cdr list) nil))
                 (let ((half (ash size -1)))
                   (multiple-value-bind (list1 rest)
                       (recur list half)
                     (multiple-value-bind (list2 rest)
                         (recur rest (- size half))
                       (values (merge-lists* head list1 list2 test key)
                               rest)))))))
    (when list
      (values (recur list (length list))))))

;; 実行
(stable-sort-list '(8 73 2 40 0 3) #'< #'identity)
=> (0 2 3 8 40 73)

何種類かデータを用意して実行時間を計測:

;;; 計測用データ
;; 1] 400万要素のソート済みリスト
(defparameter *sorted-list* (loop FOR i FROM 0 BELOW 4000000 COLLECT i))

;; 2] 400万要素の逆順ソート済みリスト
(defparameter *reverse-sorted-list* (reverse *sorted-list*))

;; 3] 400万要素のほぼソート済みリスト1  ※ 千要素に一つがランダムな値
(defparameter *nearly-sorted-list1* (loop FOR i FROM 0 BELOW 4000000
                                         COLLECT (if (zerop (random 1000))
                                                     (random 4000000)
                                                   i)))

;; 4] 400万要素のほぼソート済みリスト2  ※ 複数のソート済みリストが連結
(defparameter *nearly-sorted-list2* (loop REPEAT 4 APPEND (loop FOR i FROM 0 BELOW 1000000 COLLECT i)))

;; 5] 400万要素のランダムなリスト
(defparameter *random-list* (loop REPEAT 4000000 COLLECT (random most-positive-fixnum)))


;;; 計測用マクロ
(defmacro sort-time (sort-fn-name list)
  `(let ((list~ (copy-list ,list)))
     (declare (optimize (speed 3) (safety 0)))
     (time (progn (,sort-fn-name list~ #'< #'identity)
                  t))))


;;; 計測
;; 1] ソート済みリスト
(sort-time stable-sort-list *sorted-list*)
Evaluation took:
  0.254 seconds of real time  ; 0.254秒
  0.252017 seconds of total run time (0.248016 user, 0.004001 system)
  99.21% CPU
  508,247,464 processor cycles
  0 bytes consed
=> T

;; 2] 逆順ソート済みリスト
(sort-time stable-sort-list *reverse-sorted-list*)
Evaluation took:
  0.235 seconds of real time  ; 0.235秒
  0.232015 seconds of total run time (0.232015 user, 0.000000 system)
  98.72% CPU
  468,869,834 processor cycles
  0 bytes consed
=> T

;; 3] ほぼソート済みリスト1  ※ 千要素に一つがランダムな値
(sort-time stable-sort-list *nearly-sorted-list1*)
Evaluation took:
  0.348 seconds of real time  ; 0.348秒
  0.348023 seconds of total run time (0.344022 user, 0.004001 system)
  100.00% CPU
  694,968,622 processor cycles
  0 bytes consed
=> T

;; 4] ほぼソート済みリスト2  ※ 複数のソート済みリストが連結
(sort-time stable-sort-list *nearly-sorted-list2*)
Evaluation took:
  0.271 seconds of real time  ; 0.271秒
  0.272017 seconds of total run time (0.272017 user, 0.000000 system)
  100.37% CPU
  538,952,732 processor cycles
  0 bytes consed
=> T

;; 5] ランダムリスト
(sort-time stable-sort-list *random-list*)
Evaluation took:
  2.171 seconds of real time  ; 2.171秒
  2.168135 seconds of total run time (2.160135 user, 0.008000 system)
  99.86% CPU
  4,332,215,938 processor cycles
  0 bytes consed
=> T

ソート済みのリストに対する改良を加えたマージソート

変更後のマージソート関数: ※ 変更内容はコメントを参照

;; 改良版マージソート関数
;; - fast-merge-lists*関数が追加されたこと以外は、もともとの関数とほとんど同様
;; - fast-merge-lists*関数は要素の範囲が重複しない二つのリストをO(1)でマージ可能
(declaim (inline stable-sort-list2))
(defun stable-sort-list2 (list test key &aux (head (cons :head list)))
  (declare (type list list)
           (type function test key)
           (dynamic-extent head))
        
           ;; マージ対象の二つのリスト内の片方が、もう片方に完全に先行している場合は、
           ;; 各要素の比較などは省略して、末尾のcdrの更新のみを行う。
  (labels ((fast-merge-lists* (try-fast-merge? list1 tail1 list2 tail2 rest)
             (when try-fast-merge?
                      ;; list1がlist2に完全に先行: (list1 .. tail1) <= (list2 .. tail2)
               (cond ((not (funcall test (funcall key (car list2))
                                         (funcall key (car tail1))))
                      (setf (cdr tail1) list2)
                      (return-from fast-merge-lists* (values list1 tail2 rest)))

                      ;; list2がlist1に完全に先行: (list2 .. tail2) < (list1 .. tail1)
                     ((funcall test (funcall key (car tail2))
                                    (funcall key (car list1)))
                      (setf (cdr tail2) list1)
                      (return-from fast-merge-lists* (values list2 tail1 rest)))))
             
             ;; その他: 通常のマージ
             (values (merge-lists* head list1 list2 test key)
                     (if (null (cdr tail1))
                         tail1
                       tail2)
                     rest))
                  
            ;; トップダウンマージリスト関数: リストの末尾を管理するようになったのとfast-merge-lists*関数を使うようになったこと以外は変更なし            
            (recur (list size)
             (declare (optimize speed)
                      (type cons list)
                      (type (and fixnum unsigned-byte) size))
             (if (= 1 size)
                 (values list list (shiftf (cdr list) nil))
                 (let ((half (ash size -1)))
                   (multiple-value-bind (list1 tail1 rest)
                       (recur list half)
                     (multiple-value-bind (list2 tail2 rest)
                         (recur rest (- size half))
                       (fast-merge-lists* (>= size 8)  ; オーバヘッドを少なくするために、一定サイズ以上のリストに対してのみ適用を試みる
                                          list1 tail1 list2 tail2 rest)))))))
    (when list
      (values (recur list (length list))))))

;; 実行
(stable-sort-list2 '(8 73 2 40 0 3) #'< #'identity)
=> (0 2 3 8 40 73)

処理時間計測:

;; 1] ソート済みリスト
(sort-time stable-sort-list2 *sorted-list*)
Evaluation took:
  0.086 seconds of real time  ; 0.086秒  (変更前: 0.254秒)
  0.088005 seconds of total run time (0.088005 user, 0.000000 system)
  102.33% CPU
  171,845,432 processor cycles
  0 bytes consed
=> T

;; 2] 逆順ソート済みリスト
(sort-time stable-sort-list2 *reverse-sorted-list*)
Evaluation took:
  0.087 seconds of real time  ; 0.0.87秒  (変更前: 0.235秒)
  0.088006 seconds of total run time (0.088006 user, 0.000000 system)
  101.15% CPU
  173,196,084 processor cycles
  0 bytes consed
=> T

;; 3] ほぼソート済みリスト1  ※ 千要素に一つがランダムな値
(sort-time stable-sort-list2 *nearly-sorted-list1*)
Evaluation took:
  0.293 seconds of real time  ; 0.293秒  (変更前: 0.348秒)
  0.292019 seconds of total run time (0.292019 user, 0.000000 system)
  99.66% CPU
  585,393,530 processor cycles
  0 bytes consed
=> T

;; 4] ほぼソート済みリスト2  ※ 複数のソート済みリストが連結
(sort-time stable-sort-list2 *nearly-sorted-list2*)
Evaluation took:
  0.122 seconds of real time  ; 0.122秒  (変更前: 0.271秒)
  0.120007 seconds of total run time (0.116007 user, 0.004000 system)
  98.36% CPU
  242,403,024 processor cycles
  0 bytes consed
=> T

;; 5] ランダムリスト
(sort-time stable-sort-list2 *random-list*)
Evaluation took:
  2.193 seconds of real time  ; 2.193秒  (変更前: 2.171秒)
  2.192138 seconds of total run time (2.164136 user, 0.028002 system)
  99.95% CPU
  4,376,336,316 processor cycles
  0 bytes consed
=> T

完全にランダムなリストに対するソートは心なしか改良版の方が(ごく若干)遅くなっているように思うが、入力リストにソート済みの部分が多ければ多いほど、確実に改良版の方が速くなっている。
確かに、二つのリストをマージする場合、それぞれの領域が独立しているなら、片方の先頭要素ともう片方の末尾要素を比較するだけで、リスト全体を完全に順序づけ可能なんだけど、自分が実装方法を考えている時には、そのことに思い至らなかった。
なるほどなー。

*1:sbcl-1.0.58/src/code/sort.lisp より引用

Lock-Free Queue

compare-and-swap操作を用いたロックフリーなキューの実装。
SBCLでのみ動作*1

(defpackage lock-free-queue
  (:use :common-lisp)
  (:export queue
           make
           enq 
           deq
           empty-p 
           element-count       
           to-list))
(in-package :lock-free-queue)

;; compare-and-swap: 成功した場合はTを、失敗した場合はNILを返す
(defmacro compare-and-swap (place old new)
  `(eq (sb-ext:compare-and-swap ,place ,old ,new) ,old))

;; キュー構造体
(defstruct queue
  (head nil :type list) 
  (tail nil :type list))

;; リストへ変換/空判定/要素数取得
(defun to-list (que) (copy-seq (cdr (queue-head que))))
(defun empty-p (que) (endp (cdr (queue-head que))))
(defun element-count (que) (length (cdr (queue-head que))))

(defmethod print-object ((o queue) stream)
  (print-unreadable-object (o stream :type t)
    (format stream "~s ~s" :count (element-count o))))

;; キューを生成
(defun make (&optional initial-contents)
  (let ((contents (cons :initial-head initial-contents)))
    (make-queue :head contents
                :tail (last contents))))

;; キューの末尾に要素を追加する
;; => queue
(defun enq (x que)
  (loop WITH new-elem = (list x)
        FOR tail = (queue-tail que)
    DO
    (cond ((cdr tail)
           (compare-and-swap (queue-tail que) tail (cdr tail)))  ; tailの位置を調整
          ((compare-and-swap (cdr tail) nil new-elem)
           (return que)))))                                      ; 追加成功

;; キューの先頭から要素を取り出す
;; => (or (values 先頭要素 T)   ; キューに要素がある場合
;;        (values NIL NIL))     ; キューが空の場合
(defun deq (que)
  (let* ((head (queue-head que))
         (next (cdr head)))
    (cond ((null next)
           (values nil nil))       ; 空
          ((compare-and-swap (queue-head que) head next)
           (values (car next) t))  ; 取得成功
          (t
           (deq que)))))           ; 他スレッドと競合(リトライ)

実行例:

;; シングルスレッドでの例
(defparameter *que* (lock-free-queue:make))
=> *QUE*

(lock-free-queue:enq 1 *que*)
=> #<LOCK-FREE-QUEUE:QUEUE :COUNT 1>

(lock-free-queue:enq 2 *que*)
=> #<LOCK-FREE-QUEUE:QUEUE :COUNT 2>

(lock-free-queue:to-list *que*)
=> (1 2)

(lock-free-queue:deq *que*)
=> 1
   T

(lock-free-queue:deq *que*)
=> 2
   T

(lock-free-queue:deq *que*)
=> NIL
   NIL

;; マルチスレッドでの例
(let ((data (loop FOR i FROM 0 BELOW 10000 COLLECT i))
      (que (lock-free-queue:make))
      (thread-num 500))
  
  ;; enqueuers
  (loop REPEAT thread-num
        DO (sb-thread:make-thread 
            (lambda ()
              (dolist (e data)
                (lock-free-queue:enq e que)))))

  ;; dequeuer
  (list
   (length 
    (loop REPEAT (* thread-num (length data))
          COLLECT 
          (loop
           (multiple-value-bind (val ok?) (lock-free-queue:deq que)
             (when ok?
               (return val))))))
   que))
=> 5000000
   #<LOCK-FREE-QUEUE:QUEUE :COUNT 0>

*1:sb-ext:compare-and-swapを置き換えれば他の処理系でも動作可能

エラトステネスの篩

loop*1を使って、エラトステネスの篩を実装してみたメモ。
以下、処理系にはSBCLのver1.0.54(x86-64bit)を使用。

;; 引数nまでの範囲の素数のシーケンス(ジェネレータ)を作成する
(declaim (inline make-prime-sequence))
(defun make-prime-sequence (n)
  (let ((arr (make-array (1+ n) :element-type 'bit :initial-element 1)))
    (flet ((prime? (i) (= (bit arr i) 1))       
           (not-prime! (i) (setf (bit arr i) 0))) 
      (declare (inline prime? not-prime!))

      (loop:each (lambda (i)
                   (when (prime? i)
                     (loop:each #'not-prime! (loop:from (* i 2) :to n :by i))))
                 (loop:from 2 :to (floor (sqrt n))))
    
      (loop:filter #'prime? (loop:from 2 :to n)))))

;;; 実行例
;; 100以下の素数
(loop:collect (make-prime-sequence 100))
=> (2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97)

;; 1001から1010番目の素数
(loop:collect (loop:take 10 (loop:drop 1000 (make-prime-sequence 10000000))))
=> (7927 7933 7937 7949 7951 7963 7993 8009 8011 8017)

通常のループ(loopマクロ)を使った場合との速度比較。

;; 比較用に素数の合計値を求める関数を用意
(defun prime-sum1 (n)
  (declare (fixnum n)
           (optimize (speed 3) (safety 0) (debug 0)))
  (loop:sum #'identity (make-prime-sequence n)))

;; 一億以下の素数の合計値
(time (prime-sum1 100000000))
Evaluation took:
  1.675 seconds of real time  ; 1.675秒
  1.676105 seconds of total run time (1.676105 user, 0.000000 system)
  100.06% CPU
  3,342,591,038 processor cycles
  12,500,032 bytes consed
=> 279209790387276
;; loopマクロ版
(defun prime-sum2 (n)
  (declare (fixnum n)
           (optimize (speed 3) (safety 0) (debug 0)))
  (let ((arr (make-array (1+ n) :element-type 'bit :initial-element 1)))
    (flet ((prime? (i) (= (bit arr i) 1))
           (not-prime! (i) (setf (bit arr i) 0)))
      (declare (inline prime? not-prime!))

      (loop FOR i fixnum FROM 2 TO (floor (sqrt n))
            WHEN (prime? i)
        DO
        (loop FOR j fixnum FROM (* i 2) TO n BY i
          DO
          (not-prime! j)))

      (loop WITH sum OF-TYPE (unsigned-byte 64)
            FOR i fixnum FROM 2 TO n
            WHEN (prime? i)
        DO (incf sum i)
        FINALLY (return sum)))))

;; 一億以下の素数の合計値
(time (prime-sum2 100000000))
Evaluation took:
  1.476 seconds of real time  ; 1.476秒
  1.472092 seconds of total run time (1.468092 user, 0.004000 system)
  99.73% CPU
  2,944,592,020 processor cycles
  12,500,032 bytes consed
=> 279209790387276

簡易スタック型VM(JITコンパイラもどき)でのフィボナッチ数計算速度

前々々回でスタック型言語をバイトコードコンパイルする部分を、前々回でCommonLispアセンブラによるマシン語生成を、前回でそのアセンブラ上にスタック型言語のラップするところを扱った。
今回はそれらをまとめて、最初に作成したバイトコードインタプリタ(?)VMを、実行時にネイティブコードを生成するJIT(のようなもの)に置き換えて、実行速度を比較してみる。

バイトコード生成部

ここは前々回と全く同様なので省略。
以下にフィボナッチ数計算用のプログラムを再掲しておく。

(pvmc:compile-to-file
 "fib.bc"
 '(
   35    ; fib(35)
   (:addr fib-beg) :call ; fib(25)
   (:addr finish)  :jump
   
   fib-beg
   :dup 2  :less (:addr fib-end) :jump-if  ; if(n < 2) 
   :dup 2  :sub  (:addr fib-beg) :call     ; fib(n - 2)
   :swap 1 :sub  (:addr fib-beg) :call     ; fib(n - 1)
   :add
   fib-end
   :return
   
   finish))
#|
$ od -h fib.bc
0000000 2301 0000 0100 0011 0000 0113 003a 0000
0000020 0911 0201 0000 0800 3901 0000 1200 0109
0000040 0002 0000 0103 0011 0000 0b13 0101 0000
0000060 0300 1101 0000 1300 1402
|#

バイトコード実行(VM)部

前々回はこの部分をC++で作成したが、今回はCommonLispで実装する。
まずはバイトコード実行用の関数の定義。

;;; ファイル名: pvm-execute.lisp

;; アセンブラを読み込んでおく
(asdf:load-system :cl-asm)

;; パッケージ定義
(defpackage pvm-execute
  (:use :common-lisp :sb-alien)
  (:nicknames :pvme)
  (:export execute        ; バイトコードのファイルパスを受け取り実行結果を返す関数
           make-command)) ; バイトコード実行用のコマンドを生成する     
(in-package :pvm-execute)

;; 前回定義した@pushや@pop、その他の関数定義がここにくる
;; ... 省略 ...
;;

;; バイトコードのファイルパスを受け取り評価・実行する
(defun execute (filepath)
  (with-open-file (in filepath :element-type '(unsigned-byte 8))
    (cl-asm:execute (convert-to-executable (read-bytecodes in))
                    (function int))))

;; 入力ストリームからバイトコードを読み込み、cl-asmのニーモニック形式に変換する
(defun read-bytecodes (in)
  (loop FOR pos = (file-position in)
        FOR op = (read-op in)
        WHILE op
    COLLECT
    ;; 各バイトコードを(開始位置 ニーモニック)形式に変換する
    ;; 開始位置は、後にアドレス解決を行う際に使用される
    (list 
     pos
     (ecase op
       (1 `(@int ,(read-int in)))
       (2 '(@add))  ; @で始まる関数群は、前回定義したもの
       (3 '(@sub))
       (4 :mul (error "unsupported")) ; 未対応
       (5 :div (error "unsupported"))
       (6 :mod (error "unsupported"))
       (7 '(@eql))
       (8 '(@less))
       (9 '(@dup))
       (10 '(@drop))
       (11 '(@swap))
       (12 '(@over))
       (13 '(@rot))
       (14 :rpush (error "unsupported"))
       (15 :rpop (error "unsupported"))
       (16 :rcopy (error "unsupported"))
       (17 '(unresolve @jump))    ; アドレス解決が必要 (resolve-addrs関数内で行う)
       (18 '(unresolve @jump-if)) ; 同上
       (19 '(unresolve @call))    ; 同上
       (20 '(@return))))))

;; 読み込んだニーモニック(の中間形式)を、実行可能な(= cl-asm:executeに渡せる)に変換する
(defun convert-to-executable (mnemonics)
  (eval 
   `(body ,@(mapcar #'second (resolve-addrs mnemonics)) ; 本体
          (@pop %eax))))                                ; 結果を取り出して返す

;; 各種補助関数
(defun read-op (in)    ; バイト読み込み 
  (read-byte in nil nil))

(defun read-uint (in)  ; unsigned int読み込み
  (+ (ash (read-byte in) 00)
     (ash (read-byte in) 08)
     (ash (read-byte in) 16)
     (ash (read-byte in) 24)))

(defun read-int (in)   ; signed int読み込み
  (let ((n (read-uint in)))
    (if (< n #x80000000)
        n
      (- n #x100000000))))

(defun symb (&rest args)  ; シンボル生成: (symb "ABC" 1) => 'abc1
  (intern (format nil "~{~a~}" args)))

;; jump命令やcall命令が参照するアドレスをcl-asmが扱える形式に変換する
;; 
;; バイトコードでは遷移系の命令の直前に遷移先(絶対アドレス)が指定されているので、
;; mnemonics内の'((@int 10) (unresolve @call))のようになっている部分を '((@call &10)) のように置き換える。
;; ※ 変換時に生成したアドレス用のラベル(上の場合は'&10)は、最後にまとめてmnemonics内の適切な位置に挿入する。
(defun resolve-addrs (mnemonics)
  (labels ((recur (list acc addrs)
             (if (null list)
                 (values (nreverse acc) 
                         (remove-duplicates addrs))
               (let ((tag (first (second (car list)))))
                 (case tag
                   (unresolve 
                    (destructuring-bind ((_ (__ addr)) . acc2) acc
                      (declare (ignore _ __))
                      (let ((pos (first (car list)))
                            (op (second (second (car list)))))
                        (recur (cdr list) 
                               (cons `(,pos (,op ,(symb "&" addr))) acc2)
                               (cons addr addrs)))))
                   (otherwise
                    (recur (cdr list) (cons (car list) acc) addrs)))))))
    (multiple-value-bind (mnemonics refered-addrs)
                         (recur mnemonics '() '())
      (sort 
       (append mnemonics
               (loop FOR addr IN refered-addrs
                     COLLECT `(,(- addr 0.5) ,(symb "&" addr))))
       #'<
       :key #'first))))

resolve-addrs関数が若干複雑*1なことを除いては、バイトコードからのほぼ一対一の単純な変換となっている。

後は、前々回に合わせて実行部は通常のUnixコマンドとして使えるようにしておく。

;;; main関数作成用の補助関数
(eval-when (:compile-toplevel :load-toplevel :execute)
  ;; "/dir/file.ext" -> "file.ext"
  (defun basename (pathstring)
    (let ((path (parse-namestring pathstring)))
      (format nil "~A~@[.~A~]" (pathname-name path) (pathname-type path))))

  ;; '(a b c &optional c &key (d e)) -> '(a b c d)
  (defun collect-varsym (args)
    (mapcar (lambda (a)
	      (if (consp a) (car a) a))
	    (remove-if (lambda (a)
			 (and (symbolp a) (string= "&" a :end2 1)))
		       args))))

;;; main関数定義関数
(defmacro defmain (fn-name args &body body)
  (let ((usage nil))
    ;; If first expression of body is string type, it treated as command documentation
    (when (stringp (car body))
      (setf usage (car body)
	    body  (cdr body)))
    
    `(defun ,fn-name ()
       ;; Need to override *invoke-debugger-hook*
       (let ((sb-ext:*invoke-debugger-hook*
	      (lambda (condition hook)
		(declare (ignore hook))
		(format *error-output* "Error: ~A~%" condition)
		(sb-ext:quit :unix-status 1))))
         
	 ;; When failed arguments destructuring, show documentation and exit
	 ,(when usage
	    `(handler-case 
	      (destructuring-bind ,args (cdr sb-ext:*posix-argv*) 
	        (declare (ignore ,@(collect-varsym args))))
	      (error ()
	        (format *error-output* "~&~?~%~%" 
			,usage
			(list (basename (car sb-ext:*posix-argv*))))
		(sb-ext:quit :unix-status 1))))

         (destructuring-bind ,args (cdr sb-ext:*posix-argv*)
           ,@body
	   (sb-ext:quit :unix-status 0))))))

;;; main関数
;;; 引数で指定されたファイルパスに対してexecute関数を呼び出すだけ
(defmain main (bytecode-filepath)
  "Usage: ~a BYTECODE_FILEPTAH"
  (print (execute bytecode-filepath))
  (terpri))

;;; コマンド生成関数
(defun make-command (command-name)
  (sb-ext:save-lisp-and-die command-name :executable t :toplevel #'main))

コマンド生成&実行。

$ sbcl
> (load "pvm-execute.lisp")
> (pvme:make-command "pvm-jit")
[undoing binding stack and other enclosing state... done]
[saving current Lisp image into pvm-jit:
writing 6336 bytes from the read-only space at 0x20000000
writing 4000 bytes from the static space at 0x20100000
writing 46170112 bytes from the dynamic space at 0x1000000000
done]  ; pvm-jitコマンドが生成される

$ ./pvm-jit
Usage: pvm-jit BYTECODE_FILEPTAH

# フィボナッチ数計算
$ time ./pvm-jit fib.bc
9227465    # fib(35) = 9227465

real	0m0.169s
user	0m0.156s
sys	0m0.008s

# 前々回のコマンドの場合
$ time ./pvm fib.bc
[data stack]
 0# 9227465

[return stack]

real	0m3.636s
user	0m3.632s
sys	0m0.000s

比較

比較表に今回の結果を追記(pvm-jit)

言語 所要時間(最適化オプションなし) 所要時間(最適化オプションあり)
gcc-4.6.1 0.112s 0.056s
sbcl-1.0.54 0.320s 0.110s
pvm 3.600s
pvm-jit 0.156s
ruby1.9.1 2.816s
ruby1.8.7 14.497s
cl-asm 0.059s

不完全なアセンブラ及び最適化一切無しの単純な変換(バイトコード=>マシン語)という条件化でも、やはりインタプリタよりは桁違い(20倍程度)に速くなっている*2
データスタック操作周りで明らかに冗長な部分の最適化を簡単にでも行ったら、最適化オプション無しのgccになら結構すぐに追いつけるかもしれない。

*1:アドレス参照周りの仕様をなおざりにしすぎた・・・

*2:加えてVM部のソースコードも、インタプリタのものに比べて過度に複雑になっている、ということもない

CommonLispアセンブラ上にスタック型言語(っぽいもの)

前回のCommonLispアセンブラを使って、アセンブラ上に簡単なスタック型言語(っぽいもの)を組み立てて、それを使ってフィボナッチ数を計算するプログラムを書くと、どのような感じになるかを試してみた。
cl-asmはバージョンを更新して0.0.2を使用*1
0.0.1(前回)からの大きな変更点としては、ニモニック列をプログラムから操作しやすいように以下のような二つの機能を追加した。

;; 例示用のプログラム 
(cl-asm:execute
 '((:push %rbp) (:mov %rbp %rsp) (:push %rdi) (:push %rsi) (:push %rbx)  ; 関数呼び出し時の定形処理

   ;; 10 + 15
   (:mov %eax 10)
   (:mov %ebx 15)
   (:add %eax %ebx)

   (:pop %rbx) (:pop %rsi) (:pop %rdi) (:pop %rbp)  ; 関数から返る時の定形処理
   :ret)
  (function int))
--> 25

;;=======================================================================
;; 追加機能1: (:progn ...)
;;  - 複数のニモニックを一つにまとめることが可能
;;    => 追加機能2(eval)と合わせることでニモニック内に任意の関数・変数を埋め込むことが可能
(cl-asm:execute
 '((:progn
      (:push %rbp) (:mov %rbp %rsp) (:push %rdi) (:push %rsi) (:push %rbx)) ; 関数呼び出し時の定形処理

   ;; 10 + 15
   (:progn
    (:mov %eax 10)
    (:mov %ebx 15)
    (:add %eax %ebx))

   (:progn
      (:pop %rbx) (:pop %rsi) (:pop %rdi) (:pop %rbp))  ; 関数から返る時の定形処理
   :ret)
  (function int))
--> 25

;;=======================================================================
;; 追加機能2: eval
;;  - 以下の二つ以外がニモニック列に表れた場合はevalを適用
;;    a: 組み込みの命令(car部がキーワードのリスト)
;;    b: ラベル('&'で始まるシンボル)

;; 定形処理を関数にまとめる
(defun save-registers ()
  '(:progn (:push %rbp) (:mov %rbp %rsp) (:push %rdi) (:push %rsi) (:push %rbx)))

(defun restore-registers ()
  '(:progn (:pop %rbx) (:pop %rsi) (:pop %rdi) (:pop %rbp)))

;; 実行
(cl-asm:execute
  '((save-registers)  ; レジスタ退避
    
    ;; 10 + 15
    (:mov %eax 10)
    (:mov %ebx 15)
    (:add %eax %ebx)
    
    (restore-registers) ; レジスタ復元
    :ret)
  (function int))
--> 25

一応これで少しは、普通のlispプログラムっぽくアセンブリプログラムが書けるようになった。

スタック型言語

以下では、機能的に前々回とほぼ同等のスタック型言語(っぽいもの)アセンブラ上に作っていく。
まずはデータスタック周りの補助関数を用意。(リターンスタックにはx86の通常のスタックを使用)

;; データスタック用の領域をヒープに確保 & 解放
;; - スタックサイズは決め打ち
;; - スタックの先頭アドレスの保持にはRCXレジスタを使用
;;   (ちなみにRAX/RBXレジスタは、一時データ保持用に使用)

;; 確保
(defun ready-data-stack ()
   '(:progn (:push %rax) (:push %rdi)  ; レジスタ退避
            (:mov %edi 102400)         ; スタックサイズ
            (:mov %rax (:extern "malloc"))
            (:call %rax)               ; malloc(102400)
            (:mov %rcx %rax)           ; アドレスをRCXレジスタに保存
            (:pop %rdi) (:pop %rax)))  ; レジスタ復元

;; 解放
(defun destroy-data-stack ()
  '(:progn (:push %rax) (:push %rdi)  ; レジスタ退避
           (:mov %rdi %rcx)
           (:mov %rax (:extern "free"))
           (:call %rax)               ; free(RCX)
           (:pop %rdi) (:pop %rax)))  ; レジスタ復元

;; アセンブラ用関数(マクロ)定義マクロ
;; これを使えば引数のシンボルのクォートが不要となり、使用時に(xxx '%eax)ではなく(xxx %eax)のように書ける
(defmacro defop (name args &body body)
  `(defmacro ,name ,args
     (list 'quote (locally ,@body))))

;; データスタック用のアクセサ定義
(defop @ds-get (dst index) `(:mov ,dst (:refd %rcx ,(* index -4))))  ; getter
(defop @ds-set (index src) `(:mov (:refd %rcx ,(* index -4)) ,src))  ; setter
(defop @ds-inc (&optional (n 1)) `(:add %rcx ,(* 4 n)))  ; 先頭を進める
(defop @ds-dec (&optional (n 1)) `(:sub %rcx ,(* 4 n)))  ; 先頭を戻す

;; ついでに全ての定期処理をまとめて生成してくれるマクロを用意
(defmacro body (&rest mnemonics)
  `'(,(save-registers)      ; レジスタ退避
     ,(ready-data-stack)    ; データスタック用意
     ,@mnemonics         ; 本体処理
     ,(destroy-data-stack)  ; データスタック破棄
     ,(restore-registers)   ; レジスタ復元
     :ret))

(cl-asm:execute
  (body (:mov %eax 10))
  (function int))
--> 10

残りはひたすらスタック型言語用の命令(関数)を定義。

;; srcをスタックの先頭に追加
(defop @push (src) `(:progn (@ds-inc)
                            (@ds-set 0 ,src)))

;; スタックの先頭を取り出しdstに格納
(defop @pop (dst) `(:progn (@ds-get ,dst 0)
                           (@ds-dec)))

;; スタックの先頭から二つを要素を取り出し、dst1とdst2に格納
(defop @pop2 (dst1 dst2) `(:progn (@ds-get ,dst1 0)
                                  (@ds-get ,dst2 1)
                                  (@ds-dec 2)))

;; スタック[index1]とスタック[index2]の要素を交換
(defop @swap-impl (index1 index2) `(:progn (@ds-get %eax ,index1)
                                           (@ds-get %ebx ,index2)
                                           (@ds-set ,index1 %ebx)
                                           (@ds-set ,index2 %eax)))

;; スタックの先頭二つの要素を交換
(defop @swap () '(@swap-impl 0 1))

;; スタックの先頭要素を複製
(defop @dup () `(:progn (@ds-get %eax 0)
                        (@push %eax)))

;; スタックの先頭要素の破棄
(defop @drop () '(@ds-dec))

;; スタックの二番目の要素を先頭に複製
(defop @over () `(:progn (@ds-get %eax 1)
                         (@push %eax)))

;; スタックの先頭三つの要素をローテーション
(defop @rot () `(:progn (@swap-impl 2 0)
                        (@swap-impl 1 2)))

;; スタックの先頭二つを使った加算
(defop @add () `(:progn (@pop2 %ebx %eax)
                        (:add %eax %ebx)
                        (@push %eax)))

;; スタックの先頭二つを使った減算
(defop @sub () `(:progn (@pop2 %ebx %eax)
                        (:sub %eax %ebx)
                        (@push %eax)))

;; スタックの先頭二つの要素が等しいか (真なら非ゼロがスタックトップに格納)
(defop @eql ()  `(:progn (@pop2 %ebx %eax)
                         (:cmp %eax %ebx)
                         (:mov %eax 0)
                         (:sete %al)
                         (@push %eax)))
 
;; スタックの先頭要素が二番目の要素よりも小さいか (真なら非ゼロがスタックトップに格納)
(defop @less () `(:progn (@pop2 %ebx %eax)
                         (:cmp %eax %ebx)
                         (:mov %eax 0)
                         (:setl %al)
                         (@push %eax)))

;; スタックの先頭が真(非ゼロ)なら、指定位置に遷移
(defop @jump-if (pos) `(:progn (@pop %eax)
                               (:cmp %eax 0)
                               (:jne ,pos)))

;; 指定位置に遷移
(defop @jump (pos) `(:jmp ,pos))

;; 関数呼び出し
(defop @call (pos) `(:call ,pos))

;; 関数からの復帰
(defop @return ()  :ret)

;; int値を生成してスタックトップに積む
(defop @int (n)  `(@push ,n))

実行例。

(cl-asm:execute
 (body
   (@push 10)
   (@push 15)
   (@add)
   (@pop %eax))  ; 結果取り出し
 (function int))
--> 25 

フィボナッチ数

フィボナッチ数計算プログラム。
薄いラップの割にはスタック型言語っぽい見た目になっているように思う。

(cl-asm:execute
 (body
  (@push %edi) ; 引数取得
  (@call &fib-beg)  ; (fib 10)
  (@jump &finish)

  &fib-beg
  (@dup) (@int 2) (@less) (@jump-if &fib-end) ; (if (< arg 2) ... ....)
  (@dup) (@int 2) (@sub) (@call &fib-beg)     ; a = (fib (- arg 2))
  (@swap) (@int 1) (@sub) (@call &fib-beg)    ; b = (fib (- arg 1))
  (@add)                                      ; (+ a b)
  &fib-end
  (@return)

  &finish
  (@pop %eax))

 (function int int) 10)
--> 55

*1:いずれにせよ、まだまだ実用に堪えるものにはかなりほど遠いけど

マインスイーパー

端末上で動作するマインスイーパーをCommonLisp(SBCL)で実装してみた。
github: cl-mine-0.0.2

端末操作

端末操作部分のソースコードは以下のような感じ。
基本的には端末のエスケープシーケンスで(カーソル移動や画面クリア、文字色等の)制御を行っている。
ただ、キー入力をリアルタイムで取得可能にするのはエスケープシーケンスでは無理そうだったので、その部分はtcsetattr等のシステムコール(?)を使用している。

(defpackage console
  (:use :common-lisp :sb-alien)
  (:shadow :common-lisp format)
  (:export with-raw-mode clear move set-pos
           format newline formatln style))
(in-package :console)

;;; types ;;;
(deftype direction () '(member :up :down :left :right))  ; カーソル移動の方向
(deftype color () '(member :black :red :green :yellow :blue :magenta :cyan :white :normal))  ; 文字色、背景色

;;; constants ;;;
(defparameter +ESC+ (common-lisp:format nil "~c[" (code-char #8r33)))  ; エスケープシーケンスの開始文字列
(defparameter +STDIN_FD+ (sb-sys:fd-stream-fd sb-sys:*stdin*))  ; 標準入力のファイルディスクリプタ

;;; internal functions ;;;
;; 文字色のコード値を取得
(defun color-code (color)
  (declare (color color))
  (ecase color 
    (:black   30)
    (:red     31)
    (:green   32)
    (:yellow  33)
    (:blue    34)
    (:magenta 35)
    (:cyan    36)
    (:white   37)
    (:normal  39)))

;; cfmakeraw関数(キー入力リアルタイム取得用)はsb-posixパッケージに存在しないようなので読み込む
(define-alien-routine ("cfmakeraw" %cfmakeraw) void (termios* (* t)))
(defun cfmakeraw ()
  (let ((termios (sb-posix::allocate-alien-termios)))

    (%cfmakeraw termios)
    (unwind-protect
        (sb-posix::alien-to-termios termios)
      (sb-posix::free-alien-termios termios))))

;;; exported functions ;;;
;; 標準のformat関数の薄いラッパー
(defmacro format (control-string &rest format-arguments)
  `(progn (common-lisp:format t ,control-string ,@format-arguments)
          (force-output)))

;; 改行付きのformat関数
(defmacro formatln (control-string &rest format-arguments)
  `(progn (format ,control-string ,@format-arguments)
          (newline)))

;; 改行: tcsetattr関数にcfmakerawの戻り値を渡した場合(rawモード?)、改行には #\Newlineと#\Return の両方が必要
(defun newline ()
  (format "~c~c" #\Newline #\Return))

;; 文字色、背景色、太字、下線、文字色背景色反転、等を指定した文字列を返す
(defun style (x &key (color :normal) (bgcolor :normal) bold inverse underline)
  (declare (color color bgcolor))
  (common-lisp:format nil "~a~{~d;~}~d;~dm~a~a0m"
    +ESC+
    (remove nil (list (and bold 1) (and underline 4) (and inverse 7)))
    (color-code color)
    (+ (color-code bgcolor) 10)
    x
    +ESC+))

;; 上下左右へのカーソル移動
(defun move (direction &optional (delta 1))
  (declare (direction direction))
  (when (plusp delta)
    (format "~a~d~a" +ESC+ delta
            (ecase direction
              (:up    "A")
              (:down  "B")
              (:left  "D")
              (:right "C")))))

;; 画面クリア。lineがtの場合はカーソル行のみをクリア。
(defun clear (&key line)
  (if line
      (format "~a2K" +ESC+)
    (format "~a2J" +ESC+)))

;; 任意の位置へのカーソル移動
(defun set-pos (x y)
  (format "~a~d;~dH" +ESC+ y x))

;; 端末をrawモード(?)に切り替えてbodyを評価する
(defmacro with-raw-mode (&body body)
  (let ((old (gensym)))
    `(locally
      (declare (sb-ext:muffle-conditions sb-ext:compiler-note))
      (let ((,old (sb-posix:tcgetattr +STDIN_FD+)))
        (unwind-protect
            (locally 
             (declare (sb-ext:unmuffle-conditions sb-ext:compiler-note))
             (sb-posix:tcsetattr +STDIN_FD+ sb-posix:tcsadrain (cfmakeraw))
             ,@body)
          (sb-posix:tcsetattr +STDIN_FD+ sb-posix:tcsanow ,old))))))

例えば、端末内をカーソル移動できるようにする場合は、以下のようなコードとなる。

;; 'e': ↑
;; 'd': ↓
;; 's': ←
;; 'f': →
;; 'c': exit
(console:with-raw-mode
 (loop
  (case (read-char)
    (#\e (console:move :up))
    (#\d (console:move :down))
    (#\s (console:move :left))
    (#\f (console:move :right))
    (#\c (return)))))

簡単なUIならこれで十分かもしれない。