簡易スタック型VM(JITコンパイラもどき)でのフィボナッチ数計算速度

前々々回でスタック型言語をバイトコードコンパイルする部分を、前々回でCommonLispアセンブラによるマシン語生成を、前回でそのアセンブラ上にスタック型言語のラップするところを扱った。
今回はそれらをまとめて、最初に作成したバイトコードインタプリタ(?)VMを、実行時にネイティブコードを生成するJIT(のようなもの)に置き換えて、実行速度を比較してみる。

バイトコード生成部

ここは前々回と全く同様なので省略。
以下にフィボナッチ数計算用のプログラムを再掲しておく。

(pvmc:compile-to-file
 "fib.bc"
 '(
   35    ; fib(35)
   (:addr fib-beg) :call ; fib(25)
   (:addr finish)  :jump
   
   fib-beg
   :dup 2  :less (:addr fib-end) :jump-if  ; if(n < 2) 
   :dup 2  :sub  (:addr fib-beg) :call     ; fib(n - 2)
   :swap 1 :sub  (:addr fib-beg) :call     ; fib(n - 1)
   :add
   fib-end
   :return
   
   finish))
#|
$ od -h fib.bc
0000000 2301 0000 0100 0011 0000 0113 003a 0000
0000020 0911 0201 0000 0800 3901 0000 1200 0109
0000040 0002 0000 0103 0011 0000 0b13 0101 0000
0000060 0300 1101 0000 1300 1402
|#

バイトコード実行(VM)部

前々回はこの部分をC++で作成したが、今回はCommonLispで実装する。
まずはバイトコード実行用の関数の定義。

;;; ファイル名: pvm-execute.lisp

;; アセンブラを読み込んでおく
(asdf:load-system :cl-asm)

;; パッケージ定義
(defpackage pvm-execute
  (:use :common-lisp :sb-alien)
  (:nicknames :pvme)
  (:export execute        ; バイトコードのファイルパスを受け取り実行結果を返す関数
           make-command)) ; バイトコード実行用のコマンドを生成する     
(in-package :pvm-execute)

;; 前回定義した@pushや@pop、その他の関数定義がここにくる
;; ... 省略 ...
;;

;; バイトコードのファイルパスを受け取り評価・実行する
(defun execute (filepath)
  (with-open-file (in filepath :element-type '(unsigned-byte 8))
    (cl-asm:execute (convert-to-executable (read-bytecodes in))
                    (function int))))

;; 入力ストリームからバイトコードを読み込み、cl-asmのニーモニック形式に変換する
(defun read-bytecodes (in)
  (loop FOR pos = (file-position in)
        FOR op = (read-op in)
        WHILE op
    COLLECT
    ;; 各バイトコードを(開始位置 ニーモニック)形式に変換する
    ;; 開始位置は、後にアドレス解決を行う際に使用される
    (list 
     pos
     (ecase op
       (1 `(@int ,(read-int in)))
       (2 '(@add))  ; @で始まる関数群は、前回定義したもの
       (3 '(@sub))
       (4 :mul (error "unsupported")) ; 未対応
       (5 :div (error "unsupported"))
       (6 :mod (error "unsupported"))
       (7 '(@eql))
       (8 '(@less))
       (9 '(@dup))
       (10 '(@drop))
       (11 '(@swap))
       (12 '(@over))
       (13 '(@rot))
       (14 :rpush (error "unsupported"))
       (15 :rpop (error "unsupported"))
       (16 :rcopy (error "unsupported"))
       (17 '(unresolve @jump))    ; アドレス解決が必要 (resolve-addrs関数内で行う)
       (18 '(unresolve @jump-if)) ; 同上
       (19 '(unresolve @call))    ; 同上
       (20 '(@return))))))

;; 読み込んだニーモニック(の中間形式)を、実行可能な(= cl-asm:executeに渡せる)に変換する
(defun convert-to-executable (mnemonics)
  (eval 
   `(body ,@(mapcar #'second (resolve-addrs mnemonics)) ; 本体
          (@pop %eax))))                                ; 結果を取り出して返す

;; 各種補助関数
(defun read-op (in)    ; バイト読み込み 
  (read-byte in nil nil))

(defun read-uint (in)  ; unsigned int読み込み
  (+ (ash (read-byte in) 00)
     (ash (read-byte in) 08)
     (ash (read-byte in) 16)
     (ash (read-byte in) 24)))

(defun read-int (in)   ; signed int読み込み
  (let ((n (read-uint in)))
    (if (< n #x80000000)
        n
      (- n #x100000000))))

(defun symb (&rest args)  ; シンボル生成: (symb "ABC" 1) => 'abc1
  (intern (format nil "~{~a~}" args)))

;; jump命令やcall命令が参照するアドレスをcl-asmが扱える形式に変換する
;; 
;; バイトコードでは遷移系の命令の直前に遷移先(絶対アドレス)が指定されているので、
;; mnemonics内の'((@int 10) (unresolve @call))のようになっている部分を '((@call &10)) のように置き換える。
;; ※ 変換時に生成したアドレス用のラベル(上の場合は'&10)は、最後にまとめてmnemonics内の適切な位置に挿入する。
(defun resolve-addrs (mnemonics)
  (labels ((recur (list acc addrs)
             (if (null list)
                 (values (nreverse acc) 
                         (remove-duplicates addrs))
               (let ((tag (first (second (car list)))))
                 (case tag
                   (unresolve 
                    (destructuring-bind ((_ (__ addr)) . acc2) acc
                      (declare (ignore _ __))
                      (let ((pos (first (car list)))
                            (op (second (second (car list)))))
                        (recur (cdr list) 
                               (cons `(,pos (,op ,(symb "&" addr))) acc2)
                               (cons addr addrs)))))
                   (otherwise
                    (recur (cdr list) (cons (car list) acc) addrs)))))))
    (multiple-value-bind (mnemonics refered-addrs)
                         (recur mnemonics '() '())
      (sort 
       (append mnemonics
               (loop FOR addr IN refered-addrs
                     COLLECT `(,(- addr 0.5) ,(symb "&" addr))))
       #'<
       :key #'first))))

resolve-addrs関数が若干複雑*1なことを除いては、バイトコードからのほぼ一対一の単純な変換となっている。

後は、前々回に合わせて実行部は通常のUnixコマンドとして使えるようにしておく。

;;; main関数作成用の補助関数
(eval-when (:compile-toplevel :load-toplevel :execute)
  ;; "/dir/file.ext" -> "file.ext"
  (defun basename (pathstring)
    (let ((path (parse-namestring pathstring)))
      (format nil "~A~@[.~A~]" (pathname-name path) (pathname-type path))))

  ;; '(a b c &optional c &key (d e)) -> '(a b c d)
  (defun collect-varsym (args)
    (mapcar (lambda (a)
	      (if (consp a) (car a) a))
	    (remove-if (lambda (a)
			 (and (symbolp a) (string= "&" a :end2 1)))
		       args))))

;;; main関数定義関数
(defmacro defmain (fn-name args &body body)
  (let ((usage nil))
    ;; If first expression of body is string type, it treated as command documentation
    (when (stringp (car body))
      (setf usage (car body)
	    body  (cdr body)))
    
    `(defun ,fn-name ()
       ;; Need to override *invoke-debugger-hook*
       (let ((sb-ext:*invoke-debugger-hook*
	      (lambda (condition hook)
		(declare (ignore hook))
		(format *error-output* "Error: ~A~%" condition)
		(sb-ext:quit :unix-status 1))))
         
	 ;; When failed arguments destructuring, show documentation and exit
	 ,(when usage
	    `(handler-case 
	      (destructuring-bind ,args (cdr sb-ext:*posix-argv*) 
	        (declare (ignore ,@(collect-varsym args))))
	      (error ()
	        (format *error-output* "~&~?~%~%" 
			,usage
			(list (basename (car sb-ext:*posix-argv*))))
		(sb-ext:quit :unix-status 1))))

         (destructuring-bind ,args (cdr sb-ext:*posix-argv*)
           ,@body
	   (sb-ext:quit :unix-status 0))))))

;;; main関数
;;; 引数で指定されたファイルパスに対してexecute関数を呼び出すだけ
(defmain main (bytecode-filepath)
  "Usage: ~a BYTECODE_FILEPTAH"
  (print (execute bytecode-filepath))
  (terpri))

;;; コマンド生成関数
(defun make-command (command-name)
  (sb-ext:save-lisp-and-die command-name :executable t :toplevel #'main))

コマンド生成&実行。

$ sbcl
> (load "pvm-execute.lisp")
> (pvme:make-command "pvm-jit")
[undoing binding stack and other enclosing state... done]
[saving current Lisp image into pvm-jit:
writing 6336 bytes from the read-only space at 0x20000000
writing 4000 bytes from the static space at 0x20100000
writing 46170112 bytes from the dynamic space at 0x1000000000
done]  ; pvm-jitコマンドが生成される

$ ./pvm-jit
Usage: pvm-jit BYTECODE_FILEPTAH

# フィボナッチ数計算
$ time ./pvm-jit fib.bc
9227465    # fib(35) = 9227465

real	0m0.169s
user	0m0.156s
sys	0m0.008s

# 前々回のコマンドの場合
$ time ./pvm fib.bc
[data stack]
 0# 9227465

[return stack]

real	0m3.636s
user	0m3.632s
sys	0m0.000s

比較

比較表に今回の結果を追記(pvm-jit)

言語 所要時間(最適化オプションなし) 所要時間(最適化オプションあり)
gcc-4.6.1 0.112s 0.056s
sbcl-1.0.54 0.320s 0.110s
pvm 3.600s
pvm-jit 0.156s
ruby1.9.1 2.816s
ruby1.8.7 14.497s
cl-asm 0.059s

不完全なアセンブラ及び最適化一切無しの単純な変換(バイトコード=>マシン語)という条件化でも、やはりインタプリタよりは桁違い(20倍程度)に速くなっている*2
データスタック操作周りで明らかに冗長な部分の最適化を簡単にでも行ったら、最適化オプション無しのgccになら結構すぐに追いつけるかもしれない。

*1:アドレス参照周りの仕様をなおざりにしすぎた・・・

*2:加えてVM部のソースコードも、インタプリタのものに比べて過度に複雑になっている、ということもない

CommonLispアセンブラ上にスタック型言語(っぽいもの)

前回のCommonLispアセンブラを使って、アセンブラ上に簡単なスタック型言語(っぽいもの)を組み立てて、それを使ってフィボナッチ数を計算するプログラムを書くと、どのような感じになるかを試してみた。
cl-asmはバージョンを更新して0.0.2を使用*1
0.0.1(前回)からの大きな変更点としては、ニモニック列をプログラムから操作しやすいように以下のような二つの機能を追加した。

;; 例示用のプログラム 
(cl-asm:execute
 '((:push %rbp) (:mov %rbp %rsp) (:push %rdi) (:push %rsi) (:push %rbx)  ; 関数呼び出し時の定形処理

   ;; 10 + 15
   (:mov %eax 10)
   (:mov %ebx 15)
   (:add %eax %ebx)

   (:pop %rbx) (:pop %rsi) (:pop %rdi) (:pop %rbp)  ; 関数から返る時の定形処理
   :ret)
  (function int))
--> 25

;;=======================================================================
;; 追加機能1: (:progn ...)
;;  - 複数のニモニックを一つにまとめることが可能
;;    => 追加機能2(eval)と合わせることでニモニック内に任意の関数・変数を埋め込むことが可能
(cl-asm:execute
 '((:progn
      (:push %rbp) (:mov %rbp %rsp) (:push %rdi) (:push %rsi) (:push %rbx)) ; 関数呼び出し時の定形処理

   ;; 10 + 15
   (:progn
    (:mov %eax 10)
    (:mov %ebx 15)
    (:add %eax %ebx))

   (:progn
      (:pop %rbx) (:pop %rsi) (:pop %rdi) (:pop %rbp))  ; 関数から返る時の定形処理
   :ret)
  (function int))
--> 25

;;=======================================================================
;; 追加機能2: eval
;;  - 以下の二つ以外がニモニック列に表れた場合はevalを適用
;;    a: 組み込みの命令(car部がキーワードのリスト)
;;    b: ラベル('&'で始まるシンボル)

;; 定形処理を関数にまとめる
(defun save-registers ()
  '(:progn (:push %rbp) (:mov %rbp %rsp) (:push %rdi) (:push %rsi) (:push %rbx)))

(defun restore-registers ()
  '(:progn (:pop %rbx) (:pop %rsi) (:pop %rdi) (:pop %rbp)))

;; 実行
(cl-asm:execute
  '((save-registers)  ; レジスタ退避
    
    ;; 10 + 15
    (:mov %eax 10)
    (:mov %ebx 15)
    (:add %eax %ebx)
    
    (restore-registers) ; レジスタ復元
    :ret)
  (function int))
--> 25

一応これで少しは、普通のlispプログラムっぽくアセンブリプログラムが書けるようになった。

スタック型言語

以下では、機能的に前々回とほぼ同等のスタック型言語(っぽいもの)アセンブラ上に作っていく。
まずはデータスタック周りの補助関数を用意。(リターンスタックにはx86の通常のスタックを使用)

;; データスタック用の領域をヒープに確保 & 解放
;; - スタックサイズは決め打ち
;; - スタックの先頭アドレスの保持にはRCXレジスタを使用
;;   (ちなみにRAX/RBXレジスタは、一時データ保持用に使用)

;; 確保
(defun ready-data-stack ()
   '(:progn (:push %rax) (:push %rdi)  ; レジスタ退避
            (:mov %edi 102400)         ; スタックサイズ
            (:mov %rax (:extern "malloc"))
            (:call %rax)               ; malloc(102400)
            (:mov %rcx %rax)           ; アドレスをRCXレジスタに保存
            (:pop %rdi) (:pop %rax)))  ; レジスタ復元

;; 解放
(defun destroy-data-stack ()
  '(:progn (:push %rax) (:push %rdi)  ; レジスタ退避
           (:mov %rdi %rcx)
           (:mov %rax (:extern "free"))
           (:call %rax)               ; free(RCX)
           (:pop %rdi) (:pop %rax)))  ; レジスタ復元

;; アセンブラ用関数(マクロ)定義マクロ
;; これを使えば引数のシンボルのクォートが不要となり、使用時に(xxx '%eax)ではなく(xxx %eax)のように書ける
(defmacro defop (name args &body body)
  `(defmacro ,name ,args
     (list 'quote (locally ,@body))))

;; データスタック用のアクセサ定義
(defop @ds-get (dst index) `(:mov ,dst (:refd %rcx ,(* index -4))))  ; getter
(defop @ds-set (index src) `(:mov (:refd %rcx ,(* index -4)) ,src))  ; setter
(defop @ds-inc (&optional (n 1)) `(:add %rcx ,(* 4 n)))  ; 先頭を進める
(defop @ds-dec (&optional (n 1)) `(:sub %rcx ,(* 4 n)))  ; 先頭を戻す

;; ついでに全ての定期処理をまとめて生成してくれるマクロを用意
(defmacro body (&rest mnemonics)
  `'(,(save-registers)      ; レジスタ退避
     ,(ready-data-stack)    ; データスタック用意
     ,@mnemonics         ; 本体処理
     ,(destroy-data-stack)  ; データスタック破棄
     ,(restore-registers)   ; レジスタ復元
     :ret))

(cl-asm:execute
  (body (:mov %eax 10))
  (function int))
--> 10

残りはひたすらスタック型言語用の命令(関数)を定義。

;; srcをスタックの先頭に追加
(defop @push (src) `(:progn (@ds-inc)
                            (@ds-set 0 ,src)))

;; スタックの先頭を取り出しdstに格納
(defop @pop (dst) `(:progn (@ds-get ,dst 0)
                           (@ds-dec)))

;; スタックの先頭から二つを要素を取り出し、dst1とdst2に格納
(defop @pop2 (dst1 dst2) `(:progn (@ds-get ,dst1 0)
                                  (@ds-get ,dst2 1)
                                  (@ds-dec 2)))

;; スタック[index1]とスタック[index2]の要素を交換
(defop @swap-impl (index1 index2) `(:progn (@ds-get %eax ,index1)
                                           (@ds-get %ebx ,index2)
                                           (@ds-set ,index1 %ebx)
                                           (@ds-set ,index2 %eax)))

;; スタックの先頭二つの要素を交換
(defop @swap () '(@swap-impl 0 1))

;; スタックの先頭要素を複製
(defop @dup () `(:progn (@ds-get %eax 0)
                        (@push %eax)))

;; スタックの先頭要素の破棄
(defop @drop () '(@ds-dec))

;; スタックの二番目の要素を先頭に複製
(defop @over () `(:progn (@ds-get %eax 1)
                         (@push %eax)))

;; スタックの先頭三つの要素をローテーション
(defop @rot () `(:progn (@swap-impl 2 0)
                        (@swap-impl 1 2)))

;; スタックの先頭二つを使った加算
(defop @add () `(:progn (@pop2 %ebx %eax)
                        (:add %eax %ebx)
                        (@push %eax)))

;; スタックの先頭二つを使った減算
(defop @sub () `(:progn (@pop2 %ebx %eax)
                        (:sub %eax %ebx)
                        (@push %eax)))

;; スタックの先頭二つの要素が等しいか (真なら非ゼロがスタックトップに格納)
(defop @eql ()  `(:progn (@pop2 %ebx %eax)
                         (:cmp %eax %ebx)
                         (:mov %eax 0)
                         (:sete %al)
                         (@push %eax)))
 
;; スタックの先頭要素が二番目の要素よりも小さいか (真なら非ゼロがスタックトップに格納)
(defop @less () `(:progn (@pop2 %ebx %eax)
                         (:cmp %eax %ebx)
                         (:mov %eax 0)
                         (:setl %al)
                         (@push %eax)))

;; スタックの先頭が真(非ゼロ)なら、指定位置に遷移
(defop @jump-if (pos) `(:progn (@pop %eax)
                               (:cmp %eax 0)
                               (:jne ,pos)))

;; 指定位置に遷移
(defop @jump (pos) `(:jmp ,pos))

;; 関数呼び出し
(defop @call (pos) `(:call ,pos))

;; 関数からの復帰
(defop @return ()  :ret)

;; int値を生成してスタックトップに積む
(defop @int (n)  `(@push ,n))

実行例。

(cl-asm:execute
 (body
   (@push 10)
   (@push 15)
   (@add)
   (@pop %eax))  ; 結果取り出し
 (function int))
--> 25 

フィボナッチ数

フィボナッチ数計算プログラム。
薄いラップの割にはスタック型言語っぽい見た目になっているように思う。

(cl-asm:execute
 (body
  (@push %edi) ; 引数取得
  (@call &fib-beg)  ; (fib 10)
  (@jump &finish)

  &fib-beg
  (@dup) (@int 2) (@less) (@jump-if &fib-end) ; (if (< arg 2) ... ....)
  (@dup) (@int 2) (@sub) (@call &fib-beg)     ; a = (fib (- arg 2))
  (@swap) (@int 1) (@sub) (@call &fib-beg)    ; b = (fib (- arg 1))
  (@add)                                      ; (+ a b)
  &fib-end
  (@return)

  &finish
  (@pop %eax))

 (function int int) 10)
--> 55

*1:いずれにせよ、まだまだ実用に堪えるものにはかなりほど遠いけど

アセンブリ言語でフィボナッチ数

前回は、C++で単純なVMを書いて、その上でのフィボナッチ数の計算時間を測定した。
そのVM部分をネイティブコードに置き換えたら、どの程度処理速度が改善するのかを測ってみたかったので、その前にまずネイティブコード(x86)の勉強も兼ねて、common lispアセンブラを書くことにした。
現状はまだまだ未完成で、以下のような制限があるが、一応フィボナッチ数が計算できるくらいまでには出来たので、その計算時間を参考までに残しておく。
制限:

  • 使用可能な命令は mov/ret/push/pop/add/sub/inc/dec/cmp/jmp/jcc/call のみ
  • 64bitのみ対応
  • エラーチェックとか不十分
  • SBCLのみで動作

github: cl-asm-0.0.1

コード

フィボナッチ数計算用のコード。

(use-package :sb-alien)

;; Fibonacci用のアセンブリコード
(defparameter *fib*
 '((:push %rbp) (:mov %rbp %rsp) (:push %rdi) (:push %rsi) (:push %rbx)  ; 関数呼び出し時の定形処理

   (:mov %eax %edi)  ; 引数取得
   (:call &fib-beg) 
   
   (:pop %rbx) (:pop %rsi) (:pop %rdi) (:pop %rbp)  ; 関数から返る時の定形処理
   :ret
  
  &fib-beg
  (:cmp %eax 2)      ; arg < 2
  (:jl &fib-end)
  
  (:push %rax)
  (:sub %eax 2)
  (:call &fib-beg)   ; x = (fib (- arg 2))
  (:pop %rbx)

  (:push %rax)
  (:mov %eax %ebx)
  (:dec %eax)
  (:call &fib-beg)   ; y = (fib (- arg 1))
  (:pop %rbx)
  
  (:add %eax %ebx)   ; (+ x y)
  &fib-end
  :ret))
--> *FIB*

;; 生成されるマシン語
(cl-asm:assemble *fib*)
--> (85 72 137 229 87 86 83 137 248 232 5 0 0 0 91 94 95 93 195 131 248 2 124 23 80
     131 232 2 232 242 255 255 255 91 80 137 216 255 200 232 231 255 255 255 91 1
     216 195)

;; 実行
(time
 (cl-asm:execute
   *fib*
  (function int int)  ; 関数の型
  35)                 ; 引数:  (fib 35)
Evaluation took:
  0.059 seconds of real time
  0.060003 seconds of total run time (0.060003 user, 0.000000 system)
  101.69% CPU
  117,246,804 processor cycles
  32,624 bytes consed
--> 9227465

比較

前回の他言語での測定結果に、上での計測結果を追加したもの。

言語 所要時間(最適化オプションなし) 所要時間(最適化オプションあり)
gcc-4.6.1 0.112s 0.056s
sbcl-1.0.54 0.320s 0.110s
pvm 3.600s
ruby1.9.1 2.816s
ruby1.8.7 14.497s
cl-asm 0.059s

やっぱりマシン語直出力は速い。最適化されたGCCよりは遅いけど。

簡易スタック型VM(バイトコードインタプリタ)でのフィボナッチ数計算速度

今年はlisp系のプログラミング言語(及びその処理系)を作ってみようと考えていて、かつ(少なくとも)当面の間はスタック型VMを基盤として実装していくことになると思われるので、まずは単純なスタックマシンのバイトコードインタプリタで、どの程度の処理速度がでるのかを計測してみた。

命令一覧と実行サンプル

現状のVMが備える命令一覧*1。必要最小限。
下記、命令セットに関してはForthを少し参考にしている。スタックマシンの動作の詳細に関しては、特に特殊な点もないので説明は割愛。

命令 コード値 in-stack out-stack 意味
int 1 n バイトコード中の後続の四バイト(little-endian)を取り出し、int値を生成
add 2 n1 n2 n3 n1 + n2
sub 3 n1 n2 n3 n1 - n2
mul 4 n1 n2 n3 n1 * n2
div 5 n1 n2 n3 n1 / n2
mod 6 n1 n2 n3 n1 % n2
eql 7 n1 n2 b(1 or 0) n1 == n2
less 8 n1 n2 b n1 < n2
dup 9 x x x スタックの先頭要素を複製
drop 10 x スタックの先頭要素を破棄
swap 11 x1 x2 x2 x1 スタックの先頭二つの要素を入れ替え
over 12 x1 x2 x1 x2 x1 スタックの二番目の要素を先頭に複製
rot 13 x1 x2 x3 x2 x3 x1 スタックの先頭三つの要素をローテーション
rpush 14 x スタック(データスタック)の先頭要素をリターンスタックの先頭に移す
rpop 15 x リターンスタックの先頭要素をスタックに移す
rcopy 16 x リターンスタックの先頭要素をスタックに複製
jump 17 n 無条件分岐。nは分岐先のアドレス
jump_if 18 b n 条件分岐。bが新(非ゼロ)なら分岐する
call 19 n 関数呼び出し。リターンスタックにプログラムカウンタを保存後、無条件分岐
return 20 関数からの復帰。リターンスタックからプログラムカウンタを取り出し、そこへ無条件分岐

末尾にソースコード全体を載せるが、バイトコードインタプリタの実行部は、バイトコードから上記命令に対応するコード値を取得し、命令を実行する、ということをひたすら繰り返すという単純なもの。

  // C++
  typedef unsigned char octet;

  /**
   * バイトコード実行用のクラス
   */
  class executor {
  public:
    void execute(const char* filepath) {
      bytecode_stream in(filepath);
      
      // バイトコードストリームの終端に達するまでループ
      while(in.eos() == false) {
        octet opcode = in.read_octet();  // 命令コード読み出し
        op::call(opcode, in, env);       // コードに対応する処理を実行 (envにはデータスタックとリターンスタックが保持されている)
      }
    }
  };

  class op {
  public:
    // コードに対応する命令を実行
    static void call(octet opcode, bytecode_stream& in, environment& env) {
      switch(opcode) {
      case  1: op_int(in, env); break; // int値構築
      case  2: op_add(in, env); break; // +
      case  3: op_sub(in, env); break; // -
      case  4: op_mul(in, env); break; // *
      case  5: op_div(in, env); break; // /
      case  6: op_mod(in, env); break; // %
      case  7: op_eql(in, env); break; // ==
      ... 省略 ...
        
      default:
        assert(false);
      }
    }
  }

VM部はC++で記述しているが、VMが解釈可能なバイトコード列を生成するためのアセンブラ(コンパイラ)はcommon lispで作成。

;; common lisp
;; 実行例
(load "pvm-compile")

;; 加算を行うバイトコード列を'add.bc'ファイルに出力する
;;  - キーワードは命令を表す
(pvmc:compile-to-file
 "add.bc"
 '(10 20 :add))  ; 10 + 20

;; 条件分岐を行うバイトコード列を'jump.bc'ファイルに出力する
;;
;; シンボルはアドレス参照用のラベルを表す
;; (:addr シンボル)形式で参照可能
;; ※ アドレスはコンパイル時に解決される
(pvmc:compile-to-file
 "jump.bc"
 '(10 10 :eql            ; n1 == n2 ?
   (:addr then) :jump-if ; 等しいなら then に移動
   else
   1 2     ; else: スタックに 1と2 を積む
   (:addr end) :jump
   then 
   3 4    ; then: スタックに 3と4 を積む
   end))

;; 上の例では以下のようなバイト列が生成される
(pvmc::compile-to-bytecodes
 '(10 10 :eql (:addr then) :jump-if else 1 2 (:addr end) :jump then 3 4 end))
 => #(1 10 0 0 0 1 10 0 0 0 7 1 33 0 0 0 18 1 1 0 0 0 1 2 0 0 0 1 43 0 0 0 17 1 3 0
      0 0 1 4 0 0 0)

生成したバイトコードはpvmコマンドで実行可能。

# pvmコマンド作成
$ g++ -O2 -o pvm pvm.cc

# add.bc
$ ./pvm add.bc
[data stack]    # 実行後のデータスタックとリターンスタックの中身が出力される
 0# 30   # 10 + 20

[return stack]

# jump.bc
$ ./pvm jump.bc
[data stack]
 0# 4     # then部が実行された
 1# 3

[return stack]

実行速度

上のVM上でのフィボナッチ数の計算に要した時間。
以下は35のフィボナッチ数計算用のコード。

(pvmc:compile-to-file
 "fib.bc"
 '(
   35    ; fib(35)
   (:addr fib-beg) :call ; fib(25)
   (:addr finish)  :jump
   
   fib-beg
   :dup 2  :less (:addr fib-end) :jump-if  ; if(n < 2) 
   :dup 2  :sub  (:addr fib-beg) :call     ; fib(n - 2)
   :swap 1 :sub  (:addr fib-beg) :call     ; fib(n - 1)
   :add
   fib-end
   :return
   
   finish))

#| 実行結果:
$ time ./pvm fib.bc 
[data stack]
 0# 9227465

[return stack]


real	0m3.605s
user	0m3.600s
sys	0m0.000s
|#

他言語との比較。

言語 所要時間(最適化オプションなし) 所要時間(最適化オプションあり)
gcc-4.6.1 0.112s 0.056s
sbcl-1.0.54 0.320s 0.110s
pvm 3.600s
ruby1.9.1 2.816s
ruby1.8.7 14.497s

現状は本当に単純なインタプリタなので仕方がないとはいえ、Ruby(1.9)よりも遅い・・・。

ちなみに各言語用のソースコードは以下の通り。

// C++
// ファイル名: fib.cc
// コンパイル: g++ -O2 -o fib fib.cc
// 実行: time fib 35
#include <iostream>
#include <cstdlib>

int fib(int n) {
  if(n < 2) {
    return n;
  }
  return fib(n-2) + fib(n-1);
}

int main(int argc, char** argv) {
  std::cout << fib(atoi(argv[1])) << std::endl;
  return 0;
}
;; sbcl
(defun fib (n)
  (declare (optimize (speed 3) (safety 0) (debug 0))
           (fixnum n))
  (if (< n 2)
      n
    (the fixnum (+ (fib (- n 2)) (fib (- n 1))))))

;; 実行
(time (fib 35))
# ruby
# ファイル名: fib.rb
# 実行: time fib.rb 35
def fib (n)
  return n if n < 2
  fib(n-2) + fib(n-1)
end

p fib(ARGV[0].to_i)

ソースコード

VM及びコンパイラ用のソースコード
それぞれ180行、80行程度。

// ファイル名: pvm.hh
/**
 * バイトコードインタプリタ
 */
#ifndef PVM_HH
#define PVM_HH

#include <iostream>
#include <fstream>
#include <cassert>
#include <vector>
#include <algorithm>

namespace pvm {
  typedef unsigned char octet;
  typedef std::vector<int> stack_t;

  
  /**
   * バイトコード読み込みストリーム
   */
  class bytecode_stream {
  public:
    bytecode_stream(const char* filepath) : bytecodes(NULL), position(0) {
      std::ifstream in(filepath);
      assert(in);

      length = in.rdbuf()->in_avail();
      bytecodes = new octet[length];
      in.read((char*)bytecodes, length);
    }
    
    ~bytecode_stream() { delete [] bytecodes; }
    
    bool eos() const { return position >= length; }
    
    octet read_octet () { return bytecodes[position++]; }

    // sizeof(int) == 4 と仮定
    int read_int() {
      int n = *(int*)(bytecodes + position);
      position += 4;
      return n;
    }

    // program counter
    unsigned pc() const { return position; }
    unsigned& pc() { return position; }
    
  private:
    octet* bytecodes;
    unsigned length;
    unsigned position;
  };


  /**
   * データスタックとリターンスタック
   */
  class environment {
  public:
    stack_t& dstack() { return data_stack; }
    stack_t& rstack() { return return_stack; }

    const stack_t& dstack() const { return data_stack; }
    const stack_t& rstack() const { return return_stack; }

  private:
    stack_t data_stack;
    stack_t return_stack;
  };


  /**
   * 各種操作(命令)
   */
  class op {
  public:
    static void call(octet opcode, bytecode_stream& in, environment& env) {
      switch(opcode) {
      case  1: op_int(in, env); break; // int値構築
      case  2: op_add(in, env); break; // +
      case  3: op_sub(in, env); break; // -
      case  4: op_mul(in, env); break; // *
      case  5: op_div(in, env); break; // /
      case  6: op_mod(in, env); break; // %
      case  7: op_eql(in, env); break; // ==
      case  8: op_less(in, env); break;// <

      case  9: op_dup(in, env); break;  // データスタックの先頭要素を複製
      case 10: op_drop(in, env); break; // データスタックの先頭要素を破棄
      case 11: op_swap(in, env); break; // データスタックの最初の二つの要素を入れ替え
      case 12: op_over(in, env); break; // データスタックの二番目の要素を先頭にコピーする
      case 13: op_rot(in, env); break;  // データスタックの先頭三つの要素をローテーションする
        
      case 14: op_rpush(in, env); break; // データスタックの先頭要素を取り出しリターンスタックに追加する
      case 15: op_rpop(in, env); break;  // リターンスタックの先頭要素を取り出しデータスタックに追加する
      case 16: op_rcopy(in, env); break; // リターンスタックの先頭要素をデータすタックに追加する

      case 17: op_jump(in, env); break;    // 無条件分岐
      case 18: op_jump_if(in, env); break; // 条件分岐
      case 19: op_call(in, env); break;    // 関数呼び出し
      case 20: op_return(in, env); break;  // 関数から復帰
        
      default:
        assert(false);
      }
    }

  private:
    typedef bytecode_stream bcs;
    typedef environment env;
    
#define DPUSH(x) e.dstack().push_back(x)
#define DPOP pop_back(e.dstack())
#define DNTH(nth) e.dstack()[e.dstack().size()-1-nth]

#define RPUSH(x) e.rstack().push_back(x)
#define RPOP pop_back(e.rstack())
#define RNTH(nth) e.rstack()[e.rstack().size()-1-nth]

    static void op_int(bcs& in, env& e) { DPUSH(in.read_int()); }
    static void op_add(bcs& in, env& e) { DPUSH(DPOP + DPOP); }
    static void op_sub(bcs& in, env& e) { int n = DPOP; DPUSH(DPOP - n); }
    static void op_mul(bcs& in, env& e) { DPUSH(DPOP * DPOP); }
    static void op_div(bcs& in, env& e) { int n = DPOP; DPUSH(DPOP / n); }
    static void op_mod(bcs& in, env& e) { int n = DPOP; DPUSH(DPOP % n); }
    static void op_eql(bcs& in, env& e) { DPUSH(DPOP == DPOP); }
    static void op_less(bcs& in, env& e) { DPUSH(DPOP > DPOP); }

    static void op_dup(bcs& in, env& e) { DPUSH(DNTH(0)); }
    static void op_drop(bcs& in, env& e) { DPOP; }
    static void op_swap(bcs& in, env& e) { std::swap(DNTH(0), DNTH(1)); }
    static void op_over(bcs& in, env& e) { DPUSH(DNTH(1)); }
    static void op_rot(bcs& in, env& e) { std::swap(DNTH(2), DNTH(0)); std::swap(DNTH(1), DNTH(2)); }

    static void op_rpush(bcs& in, env& e) { RPUSH(DPOP); }
    static void op_rpop(bcs& in, env& e) { DPUSH(RPOP); }
    static void op_rcopy(bcs& in, env& e) { DPUSH(RNTH(0)); }

    static void op_jump(bcs& in, env& e) { in.pc() = DPOP;}
    static void op_jump_if(bcs& in, env& e) { int p = DPOP; if(DPOP){ in.pc() = p;} }
    static void op_call(bcs& in, env& e) { RPUSH(in.pc()); in.pc() = DPOP; }
    static void op_return(bcs& in, env& e) { in.pc() = RPOP; }

#undef DPUSH
#undef DPOP
#undef DNTH

#undef RPUSH
#undef RPOP
#undef RNTH

  private:
    static int pop_back(stack_t& stack) {
      int x = stack.back();
      stack.pop_back();
      return x;
    }
  };


  /**
   * バイトコード実行
   */
  class executor {
  public:
    void execute(const char* filepath) {
      bytecode_stream in(filepath);
      
      while(in.eos() == false) {
        octet opcode = in.read_octet();
        op::call(opcode, in, env);
      }
    }
    
    const environment& get_env() const { return env; }

  private:
    environment env;
  };
}

#endif
// ファイル名: pvm.cc
// バイトコード実行用コマンド
#include "pvm.hh"
#include <iostream>

void show(const char* name, const pvm::stack_t& stack) {
  std::cout << "[" << name << "]" << std::endl;
  for(int i = stack.size()-1; i >= 0; i--) {
    std::cout << " " << (stack.size()-1-i) << "# " << stack[i] << std::endl;
  }
  std::cout << std::endl;  
}

int main(int argc, char** argv) {
  if(argc != 2) {
    std::cerr << "Usage: pvm BYTECODE_FILEPATH" << std::endl;
    return 1;
  }
  
  pvm::executor vm;
  vm.execute(argv[1]);

  const pvm::environment& rlt = vm.get_env();
  show("data stack", rlt.dstack());
  show("return stack", rlt.rstack());

  return 0;
}
;;; ファイル名: pvm-compile.lisp
;;; S式をVM用のバイトコードにコンパイル(アセンブル)する
(defpackage pvm-compile
  (:use :common-lisp)
  (:nicknames :pvmc)
  (:export compile-to-file))
(in-package :pvm-compile)

;; 利用可能な操作(命令)のリスト
(defparameter *op-list*
  '((1 :int)
    (2 :add)
    (3 :sub)
    (4 :mul)
    (5 :div)
    (6 :mod)
    (7 :eql)
    (8 :less)

    (9 :dup)
    (10 :drop)
    (11 :swap)
    (12 :over)
    (13 :rot)

    (14 :rpush)
    (15 :rpop)
    (16 :rcopy)
    
    (17 :jump)
    (18 :jump-if)
    (19 :call)
    (20 :return)))

;; 数値をリトルエンディアンのバイト列に変換する
;; n -> '(b1 b2 b3 b4)
(defun int-to-bytes (n)
  (loop FOR i FROM 0 BELOW 4
        COLLECT (ldb (byte 8 (* i 8)) n)))

;; 操作名に対するコード値を取得する
(defun opcode (op)
  (assert #1=(find op *op-list* :key #'second))
  (first #1#))

;; コンパイル
(defun compile-to-bytecodes (codes)
  (loop WITH unresolves = '()  ; 未解決のアドレス参照
        WITH labels = '()      ; ラベルとアドレスのマッピング
        FOR code IN codes
        FOR pos = (length tmps)
    APPEND
    (etypecase code
      (integer `(,(opcode :int) ,@(int-to-bytes code))) ; 整数値構築
      (keyword (list (opcode code)))                    ; 一般的な操作
      (symbol (push `(,code ,pos) labels)               ; アドレス(PC)参照用のラベル
              '())
      (cons (ecase (first code)                         ; アドレス参照
              (:addr (push `(,(second code) ,(1+ pos)) unresolves)
                     `(,(opcode :int) 0 0 0 0))))) ; この時点では実際のアドレスが不明なので 0 を設定しておく
    INTO tmps
    FINALLY
    (let ((bytecodes (coerce tmps 'vector)))
      ;; アドレス解決
      (loop FOR (label offset) IN unresolves
            FOR label-addr = (second (assoc label labels))
        DO
        (setf (subseq bytecodes offset (+ offset 4)) (int-to-bytes label-addr)))

      (return bytecodes))))

;; コンパイルして結果をファイルに出力する
(defun compile-to-file (filepath assembly-codes)
  (let ((bytecodes (compile-to-bytecodes assembly-codes)))
    (with-open-file (out filepath :direction :output
                                  :if-exists :supersede
                                  :element-type '(unsigned-byte 8))
      (write-sequence bytecodes out)))
  t)

*1:大別すると整数処理系、データスタック操作系、リターンスタック操作系、分岐系の四つ

fletとlabels

CommonLispのfletとlabels的なものを(あらかじめ使えるものはlambdaしかない状況で)自分で実装する必要が出てきたので、その際のメモ。
なお、以下では煩雑になるためfuncall呼び出しの記述を省略している(実際にはScheme処理系で動作確認を行っていた)

let

flet、labelsの前にまずはlambdaを使ったletの実現方法を考える。

;; 以下のように変換可能
(let ((a 10)
      (b 20)
      (c 30))
  (list a b c))((lambda (a b c)
   (list a b c))
 10 20 30))
; => (10 20 30)

flet

上のようにしてletが使えると仮定した場合、fletと同じ機能を実現するのは簡単。

;; 単に変数に(lambda ...)を束縛すれば良い
(let ((hello (lambda (x)
               (list 'hello x))))
  (hello 'world))
; => (HELLO WORLD)

labels

これがlabels(ローカル関数の再帰呼び出しが可能)になると少し難しくなる。
以下、fletと同様の方法で試した場合。

;; フィボナッチ数を計算
(let ((fib (lambda (n)
             (if (< n 2)
                 n
               ;; fibの再帰呼び出し
               (+ (fib (- n 2)) (fib (- n 1)))))))
  (fib 10))
; => ERROR: 再帰呼び出し部分でfib関数が見つからないと云われる

fib変数に束縛したlambdaの本体のコンテキストからは、fib変数が見つからないため、エラーとなる。
再帰呼び出しに使用するfib関数を明示的に引数を渡すようにすれば、問題は解決する。

(let ((fib (lambda (n fib) ; 第二引数に常にfib関数を渡すようにする
             (if (< n 2)
                 n
               (+ (fib (- n 2) fib) (fib (- n 1) fib))))))
  (fib 10 fib))
; => 55

ただ、上の方法の引数の形が変わってしまうのが難点。
若干複雑にはなるが、以下のようにクロージャを使って再帰関数を持ち回すようにすれば、引数及び本体の形はほぼ変わらないので、マクロなどで生成するのは楽になる(ように思う)

(let ((fib-rec (lambda (fib-rec) ; 再帰関数を引数に渡す 
                 (lambda (n)
                   (let ((fib (fib-rec fib-rec))) ; 再帰関数の情報を埋め込んだfib関数を返す
                     (if (< n 2)
                         n
                       (+ (fib (- n 2)) (fib (- n 1)))))))))
  (let ((fib (fib-rec fib-rec))) ; 再帰関数の情報を埋め込んだfib関数を返す
    (fib 10)))
; => 55

これでローカル関数の再帰呼び出しはできるようになったのでは、マクロを使ってシンタックスを整えれば、labelsが実現できることになる。

  • -

以前に、何でCommonLispにはfletとlabelsの二つがあるのか、とかOcamlでletとlet recが分かれているか、とか少し疑問に思ったことがあったように思うけど、少し理由が分かった気がする。

マインスイーパー

端末上で動作するマインスイーパーをCommonLisp(SBCL)で実装してみた。
github: cl-mine-0.0.2

端末操作

端末操作部分のソースコードは以下のような感じ。
基本的には端末のエスケープシーケンスで(カーソル移動や画面クリア、文字色等の)制御を行っている。
ただ、キー入力をリアルタイムで取得可能にするのはエスケープシーケンスでは無理そうだったので、その部分はtcsetattr等のシステムコール(?)を使用している。

(defpackage console
  (:use :common-lisp :sb-alien)
  (:shadow :common-lisp format)
  (:export with-raw-mode clear move set-pos
           format newline formatln style))
(in-package :console)

;;; types ;;;
(deftype direction () '(member :up :down :left :right))  ; カーソル移動の方向
(deftype color () '(member :black :red :green :yellow :blue :magenta :cyan :white :normal))  ; 文字色、背景色

;;; constants ;;;
(defparameter +ESC+ (common-lisp:format nil "~c[" (code-char #8r33)))  ; エスケープシーケンスの開始文字列
(defparameter +STDIN_FD+ (sb-sys:fd-stream-fd sb-sys:*stdin*))  ; 標準入力のファイルディスクリプタ

;;; internal functions ;;;
;; 文字色のコード値を取得
(defun color-code (color)
  (declare (color color))
  (ecase color 
    (:black   30)
    (:red     31)
    (:green   32)
    (:yellow  33)
    (:blue    34)
    (:magenta 35)
    (:cyan    36)
    (:white   37)
    (:normal  39)))

;; cfmakeraw関数(キー入力リアルタイム取得用)はsb-posixパッケージに存在しないようなので読み込む
(define-alien-routine ("cfmakeraw" %cfmakeraw) void (termios* (* t)))
(defun cfmakeraw ()
  (let ((termios (sb-posix::allocate-alien-termios)))

    (%cfmakeraw termios)
    (unwind-protect
        (sb-posix::alien-to-termios termios)
      (sb-posix::free-alien-termios termios))))

;;; exported functions ;;;
;; 標準のformat関数の薄いラッパー
(defmacro format (control-string &rest format-arguments)
  `(progn (common-lisp:format t ,control-string ,@format-arguments)
          (force-output)))

;; 改行付きのformat関数
(defmacro formatln (control-string &rest format-arguments)
  `(progn (format ,control-string ,@format-arguments)
          (newline)))

;; 改行: tcsetattr関数にcfmakerawの戻り値を渡した場合(rawモード?)、改行には #\Newlineと#\Return の両方が必要
(defun newline ()
  (format "~c~c" #\Newline #\Return))

;; 文字色、背景色、太字、下線、文字色背景色反転、等を指定した文字列を返す
(defun style (x &key (color :normal) (bgcolor :normal) bold inverse underline)
  (declare (color color bgcolor))
  (common-lisp:format nil "~a~{~d;~}~d;~dm~a~a0m"
    +ESC+
    (remove nil (list (and bold 1) (and underline 4) (and inverse 7)))
    (color-code color)
    (+ (color-code bgcolor) 10)
    x
    +ESC+))

;; 上下左右へのカーソル移動
(defun move (direction &optional (delta 1))
  (declare (direction direction))
  (when (plusp delta)
    (format "~a~d~a" +ESC+ delta
            (ecase direction
              (:up    "A")
              (:down  "B")
              (:left  "D")
              (:right "C")))))

;; 画面クリア。lineがtの場合はカーソル行のみをクリア。
(defun clear (&key line)
  (if line
      (format "~a2K" +ESC+)
    (format "~a2J" +ESC+)))

;; 任意の位置へのカーソル移動
(defun set-pos (x y)
  (format "~a~d;~dH" +ESC+ y x))

;; 端末をrawモード(?)に切り替えてbodyを評価する
(defmacro with-raw-mode (&body body)
  (let ((old (gensym)))
    `(locally
      (declare (sb-ext:muffle-conditions sb-ext:compiler-note))
      (let ((,old (sb-posix:tcgetattr +STDIN_FD+)))
        (unwind-protect
            (locally 
             (declare (sb-ext:unmuffle-conditions sb-ext:compiler-note))
             (sb-posix:tcsetattr +STDIN_FD+ sb-posix:tcsadrain (cfmakeraw))
             ,@body)
          (sb-posix:tcsetattr +STDIN_FD+ sb-posix:tcsanow ,old))))))

例えば、端末内をカーソル移動できるようにする場合は、以下のようなコードとなる。

;; 'e': ↑
;; 'd': ↓
;; 's': ←
;; 'f': →
;; 'c': exit
(console:with-raw-mode
 (loop
  (case (read-char)
    (#\e (console:move :up))
    (#\d (console:move :down))
    (#\s (console:move :left))
    (#\f (console:move :right))
    (#\c (return)))))

簡単なUIならこれで十分かもしれない。

N-Queen: 高速化

こちらの記事に刺激を受けて、以前に実装したN-Queenを高速化してみた(Common Lisp版のみ)

(defvar *fastest* '(optimize (speed 3) (safety 0) (debug 0) (compilation-speed 0)))
(deftype max-board-size () '(mod #x100))

(declaim (inline check))  ; inline宣言を追加
(defun check (row queens &optional (r 1) &aux (q (car queens)))
  (declare #.*fastest*
           (max-board-size r row q))
  (or (null queens) 
      (and (/= q (+ row r) (- row r))
	   (check row (cdr queens) (1+ r)))))

;; dolistの亜種
;; - リストの走査時に各要素を変数に束縛するのと同時に、走査中の要素を除いたリストも変数に束縛する
;;   ※ 先頭要素は走査対象外
(defmacro dolist2 ((x but-x list) &body body)
  (multiple-value-bind (recur prev cur next) (values #1=(gensym) #1# #1# #1#)
    `(let ((,but-x ,list))
       (labels ((,recur (,prev &aux (,cur (cdr ,prev)))
                  (when ,cur
                    (destructuring-bind (,x . ,next) ,cur
                      (setf (cdr ,prev) ,next)
                      (locally ,@body)
                      (setf (cdr ,prev) ,cur)
                      (,recur ,cur)))))
         (,recur ,but-x)))))
#|
ex:
> (dolist2 (x but-x '(:head 1 2 3 a b c))
    (print `(:x ,x :but-x ,but-x)))
(:X 1 :BUT-X (:HEAD 2 3 A B C)) 
(:X 2 :BUT-X (:HEAD 1 3 A B C)) 
(:X 3 :BUT-X (:HEAD 1 2 A B C)) 
(:X A :BUT-X (:HEAD 1 2 3 B C)) 
(:X B :BUT-X (:HEAD 1 2 3 A C)) 
(:X C :BUT-X (:HEAD 1 2 3 A B)) 
--> NIL
|#

(defun n-queen (n)                     
  (declare #.*fastest*
           (max-board-size n))
  (nlet-acc self (queens (rows (cons :head (loop FOR i FROM 0 BELOW n COLLECT i))))
    (if (null (cdr rows))   ; rows == '(:head) 
        (accumulate queens)
      (dolist2 (row rest-rows rows)
        (when (check row queens)
          (self (cons row queens) rest-rows))))))

処理時間

  処理時間(サイズ=11) 処理時間(サイズ=12) 処理時間(サイズ=13)
nqueen(Commonlisp:本記事) 0.025秒 0.126秒 0.722秒
nqueen(CommonLisp:前回) 0.061秒 0.336秒 2.043秒
nqueen(Haskell:前回) 0.076秒 0.420秒 2.524秒
nqueen(Haskell:tsumuji) 0.040秒 0.220秒 1.244秒

結構速くなった。
コードも複雑になったけど。