Lock-Free Queue

compare-and-swap操作を用いたロックフリーなキューの実装。
SBCLでのみ動作*1

(defpackage lock-free-queue
  (:use :common-lisp)
  (:export queue
           make
           enq 
           deq
           empty-p 
           element-count       
           to-list))
(in-package :lock-free-queue)

;; compare-and-swap: 成功した場合はTを、失敗した場合はNILを返す
(defmacro compare-and-swap (place old new)
  `(eq (sb-ext:compare-and-swap ,place ,old ,new) ,old))

;; キュー構造体
(defstruct queue
  (head nil :type list) 
  (tail nil :type list))

;; リストへ変換/空判定/要素数取得
(defun to-list (que) (copy-seq (cdr (queue-head que))))
(defun empty-p (que) (endp (cdr (queue-head que))))
(defun element-count (que) (length (cdr (queue-head que))))

(defmethod print-object ((o queue) stream)
  (print-unreadable-object (o stream :type t)
    (format stream "~s ~s" :count (element-count o))))

;; キューを生成
(defun make (&optional initial-contents)
  (let ((contents (cons :initial-head initial-contents)))
    (make-queue :head contents
                :tail (last contents))))

;; キューの末尾に要素を追加する
;; => queue
(defun enq (x que)
  (loop WITH new-elem = (list x)
        FOR tail = (queue-tail que)
    DO
    (cond ((cdr tail)
           (compare-and-swap (queue-tail que) tail (cdr tail)))  ; tailの位置を調整
          ((compare-and-swap (cdr tail) nil new-elem)
           (return que)))))                                      ; 追加成功

;; キューの先頭から要素を取り出す
;; => (or (values 先頭要素 T)   ; キューに要素がある場合
;;        (values NIL NIL))     ; キューが空の場合
(defun deq (que)
  (let* ((head (queue-head que))
         (next (cdr head)))
    (cond ((null next)
           (values nil nil))       ; 空
          ((compare-and-swap (queue-head que) head next)
           (values (car next) t))  ; 取得成功
          (t
           (deq que)))))           ; 他スレッドと競合(リトライ)

実行例:

;; シングルスレッドでの例
(defparameter *que* (lock-free-queue:make))
=> *QUE*

(lock-free-queue:enq 1 *que*)
=> #<LOCK-FREE-QUEUE:QUEUE :COUNT 1>

(lock-free-queue:enq 2 *que*)
=> #<LOCK-FREE-QUEUE:QUEUE :COUNT 2>

(lock-free-queue:to-list *que*)
=> (1 2)

(lock-free-queue:deq *que*)
=> 1
   T

(lock-free-queue:deq *que*)
=> 2
   T

(lock-free-queue:deq *que*)
=> NIL
   NIL

;; マルチスレッドでの例
(let ((data (loop FOR i FROM 0 BELOW 10000 COLLECT i))
      (que (lock-free-queue:make))
      (thread-num 500))
  
  ;; enqueuers
  (loop REPEAT thread-num
        DO (sb-thread:make-thread 
            (lambda ()
              (dolist (e data)
                (lock-free-queue:enq e que)))))

  ;; dequeuer
  (list
   (length 
    (loop REPEAT (* thread-num (length data))
          COLLECT 
          (loop
           (multiple-value-bind (val ok?) (lock-free-queue:deq que)
             (when ok?
               (return val))))))
   que))
=> 5000000
   #<LOCK-FREE-QUEUE:QUEUE :COUNT 0>

*1:sb-ext:compare-and-swapを置き換えれば他の処理系でも動作可能

複数プロセスで共有しているmutexのロック中にSIGKILLを投げたらどうなるか

結論: デッドロックになってしまう


自動的にロックを解放してくれたりはしないみたい。
以下、試した内容のメモ書き。

環境

$ cat /proc/version
Linux version 3.0.0-23-generic (buildd@komainu) (gcc version 4.6.1 (Ubuntu/Linaro 4.6.1-9ubuntu3) ) #38-Ubuntu SMP Fri Jul 6 14:43:30 UTC 2012

テスト用ソースコード

共有mutexに対して、ロック => スリープ(10秒) => アンロック、を行う子プロセスを四個作成するプログラム。
かなりテキトウ。

/**
 * フィル名: mutex-text.cc
 * コンパイル: g++ -o mutex-test mutex-test.cc
 */
#include <pthread.h>
#include <iostream>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include <assert.h>

// 共有メモリの管理クラス
class mem {
public:
  mem(int size) : ptr_(NULL) {
    int shmid = shmget(IPC_PRIVATE, sizeof(pthread_mutex_t), 0600);
    if(shmid == -1) { return; }
    
    ptr_ = shmat(shmid, NULL, 0);
    if(ptr_ == reinterpret_cast<void*>(-1)) {
      ptr_ = NULL;
    }
  }

  ~mem() {
    if(ptr_ != NULL) {
      shmdt(ptr_);
    }
  }

  operator bool() const { return ptr_ != NULL; }

  template <typename T>
  T* ptr() { return reinterpret_cast<T*>(ptr_); }
  
private:
  void* ptr_;
};

// 複数プロセスで共有可能なミューテックスクラス
class mutex_lock {
public:
  mutex_lock() : m_(sizeof(pthread_mutex_t)), valid_(false) {
    if(! m_) { return; }

    // プロセス間で共有可能にするためにPTHREAD_PROCESS_SHAREDを付与する
    pthread_mutexattr_t mattr;
    if(pthread_mutexattr_init(&mattr) != 0) { return; }
    if(pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) { return; }
    
    // 共有領域のmutexオブジェクトを初期化
    if(pthread_mutex_init(m_.ptr<pthread_mutex_t>(), &mattr) != 0) { return; }
    
    pthread_mutexattr_destroy(&mattr);
    
    valid_ = true;
  }
  
  ~mutex_lock() {
    if(valid_) {
      pthread_mutex_destroy(m_.ptr<pthread_mutex_t>());
    }
  }
  operator bool () const { return valid_; } 

  // ロック
  void lock() {
    assert(pthread_mutex_lock(m_.ptr<pthread_mutex_t>()) == 0);
  }

  // アンロック
  void unlock() {
    assert(pthread_mutex_unlock(m_.ptr<pthread_mutex_t>()) == 0);
  }

private:
  mem m_;
  bool valid_;
};

// main関数
int main(int argc, char** argv) {
  mutex_lock mutex;
  if(! mutex) {
    return 1;
  }
  
  pid_t parent = getpid();
  for(int i=0; i < 4; i++) {
    // 子プロセスのforkと lock => sleep => unlock 処理
    // 子プロセスは四個作成
    if(fork() == 0) {
      std::cout << "[" << getpid() << "] before lock" << std::endl;
      mutex.lock();

      std::cout << "[" << getpid() << "] in lock" << std::endl;
      sleep(10);  // 適当な時間sleep

      mutex.unlock();
      std::cout << "[" << getpid() << "] after lock" << std::endl;
      break;
    }
  }

  if(parent == getpid()) {
    for(int i=0; i < 4; i++) {
      waitid(P_ALL, 0, NULL, WEXITED);
    }
  }

  return 0;
}

実行結果

普通に実行した場合:

$ ./mutex-test
[31413] before lock
[31412] before lock
[31412] in lock
[31415] before lock
[31414] before lock
[31412] after lock
[31413] in lock
[31413] after lock
[31415] in lock
[31415] after lock
[31414] in lock
[31414] after lock

途中で子プロセスにSIGKILLを投げた場合:

$ ./mutex-test 
[31443] before lock
[31443] in lock       # <- このプロセスにSIGKILLを投げる (kill -9 31443)
[31444] before lock
[31445] before lock
[31446] before lock
# 以後 31443 が獲得したロックが解放されることなく、デッドロックに陥る

とりあえず手元の環境では、このような挙動となった。

エラトステネスの篩

loop*1を使って、エラトステネスの篩を実装してみたメモ。
以下、処理系にはSBCLのver1.0.54(x86-64bit)を使用。

;; 引数nまでの範囲の素数のシーケンス(ジェネレータ)を作成する
(declaim (inline make-prime-sequence))
(defun make-prime-sequence (n)
  (let ((arr (make-array (1+ n) :element-type 'bit :initial-element 1)))
    (flet ((prime? (i) (= (bit arr i) 1))       
           (not-prime! (i) (setf (bit arr i) 0))) 
      (declare (inline prime? not-prime!))

      (loop:each (lambda (i)
                   (when (prime? i)
                     (loop:each #'not-prime! (loop:from (* i 2) :to n :by i))))
                 (loop:from 2 :to (floor (sqrt n))))
    
      (loop:filter #'prime? (loop:from 2 :to n)))))

;;; 実行例
;; 100以下の素数
(loop:collect (make-prime-sequence 100))
=> (2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97)

;; 1001から1010番目の素数
(loop:collect (loop:take 10 (loop:drop 1000 (make-prime-sequence 10000000))))
=> (7927 7933 7937 7949 7951 7963 7993 8009 8011 8017)

通常のループ(loopマクロ)を使った場合との速度比較。

;; 比較用に素数の合計値を求める関数を用意
(defun prime-sum1 (n)
  (declare (fixnum n)
           (optimize (speed 3) (safety 0) (debug 0)))
  (loop:sum #'identity (make-prime-sequence n)))

;; 一億以下の素数の合計値
(time (prime-sum1 100000000))
Evaluation took:
  1.675 seconds of real time  ; 1.675秒
  1.676105 seconds of total run time (1.676105 user, 0.000000 system)
  100.06% CPU
  3,342,591,038 processor cycles
  12,500,032 bytes consed
=> 279209790387276
;; loopマクロ版
(defun prime-sum2 (n)
  (declare (fixnum n)
           (optimize (speed 3) (safety 0) (debug 0)))
  (let ((arr (make-array (1+ n) :element-type 'bit :initial-element 1)))
    (flet ((prime? (i) (= (bit arr i) 1))
           (not-prime! (i) (setf (bit arr i) 0)))
      (declare (inline prime? not-prime!))

      (loop FOR i fixnum FROM 2 TO (floor (sqrt n))
            WHEN (prime? i)
        DO
        (loop FOR j fixnum FROM (* i 2) TO n BY i
          DO
          (not-prime! j)))

      (loop WITH sum OF-TYPE (unsigned-byte 64)
            FOR i fixnum FROM 2 TO n
            WHEN (prime? i)
        DO (incf sum i)
        FINALLY (return sum)))))

;; 一億以下の素数の合計値
(time (prime-sum2 100000000))
Evaluation took:
  1.476 seconds of real time  ; 1.476秒
  1.472092 seconds of total run time (1.468092 user, 0.004000 system)
  99.73% CPU
  2,944,592,020 processor cycles
  12,500,032 bytes consed
=> 279209790387276

ループ処理を関数型っぽく書いてみる(2)

前回の続き。
githubにあるloopの簡易版を載せておく。

基本的な考え方

基本的なJava等のIteratorと似た*1インタフェースを通してループ処理を実現している。
異なるのは全ての関数をinline展開可能にすることで、同等のループを非関数型的に書いた場合と同じくらいに、コンパイラが最適化を行ってくれることを期待していることくらい。
後は、SBCLの最適化の制限上、構造体等は使用せず、極力lambdaで全てを表現するようにしている。

実装

まず、loopパッケージ用のシーケンス生成関数。

;; 数値の範囲を表現するシーケンス
(declaim (inline from))
(defun from (start &key to (by 1))  ; toがnilなら無限シーケンス
  ;; 全体をlambdaで囲む。このlambdaの呼び出しがシーケンスの初期化処理に相当する。
  (lambda () 
    (let ((cur start))
      ;; 以下の三つの関数を呼び出し元に返す
      (values (lambda () (incf cur by))          ; 1] 値更新関数
              (lambda () (and to(> cur to)))     ; 2] 終端判定関数
              (lambda (fn) (funcall fn cur)))))) ; 3] ループの本体実行関数

;; リスト用
(declaim (inline for-list))
(defun for-list (list)
  (lambda ()
    (let ((head list)) ; 初期値
      (values (lambda () (setf head (cdr head)))         ; 1] 値更新関数
              (lambda () (endp head))                    ; 2] 終端判定関数
              (lambda (fn) (funcall fn (car head)))))))  ; 3] ループの本体実行関数

;; 実行
> (from 1 :to 10)
#<CLOSURE (LAMBDA () :IN FROM) {1007855D3B}>

> (funcall (from 1 :to 10))
#<CLOSURE (LAMBDA () :IN FROM) {10078C432B}>   ; 1] 値更新関数
#<CLOSURE (LAMBDA () :IN FROM) {10078C434B}>   ; 2] 終端判定関数
#<CLOSURE (LAMBDA (FN) :IN FROM) {10078C436B}> ; 3] ループの本体実行関数

> (setf (values next end? call-body) (funcall (from 1 :to 10)))
> (funcall call-body (lambda (x) (list :val x)))
=> (:val 1)

> (funcall next)
> (funcall call-body (lambda (x) (list :val x)))
=> (:val 2)

上の関数で生成されたシーケンスを走査する関数。

;; 一番基本となる走査関数
(declaim (inline each))
(defun each (fn seq)
  (multiple-value-bind (next-fn end-fn call-fn) (funcall seq)  ; シーケンス初期化
    (loop UNTIL (funcall end-fn)   ; 終端判定
          DO (funcall call-fn fn)  ; 本体実行
             (funcall next-fn))))  ; 値更新

;; 畳み込み関数  ※ reduceはclパッケージとそれと名前が衝突するので、ここではfoldにしている
(declaim (inline fold))
(defun fold (fn init seq)
  (let ((acc init))
    (each (lambda (x)
            (setf acc (funcall fn acc x)))
          seq)
    acc))

;; シーケンスを集めたリストを返す
(declaim (inline collect))
(defun collect (seq)
  (nreverse (fold (lambda (acc x) (cons x acc))
                    '()
                    seq)))

;; 実行
> (collect (from 1 :to 20 :by 3))
=> (1 4 7 10 13 16 19)

; 合計値計算
> (fold (lambda (acc x) (+ acc x))
        0
        (from 1 :to 20 :by 3))
=> 70

mapとかfilterとかシーケンスを加工/制御する関数。

;; map関数
(declaim (inline map-seq))
(defun map-seq (map-fn seq)
  ;; ソースとなるシーケンスの情報を取得し、それをラップして返す
  (multiple-value-bind (next-fn end-fn call-fn) (funcall seq)
    (lambda ()
      (values next-fn  ; 値更新関数と終端判定関数はそのまま
              end-fn
              (lambda (body-fn)
                ;; 本体呼び出し前に、マップ処理用関数を差し込む
                (funcall call-fn (lambda (val) (funcall body-fn (funcall map-fn val)))))))))

;; filter関数: (funcall pred-fn val)がnilとなる要素をスキップする
(declaim (inline filter))
(defun filter (pred-fn seq)
  ;; ソースとなるシーケンスの情報を取得し、それをラップして返す
  (multiple-value-bind (next-fn end-fn call-fn) (funcall seq)
    (lambda ()
      (values next-fn ; 値更新関数と終端判定関数はそのまま
              end-fn
              (lambda (body-fn)
                ;; 本体呼び出し前に、フィルター処理用関数を差し込む
                (funcall call-fn (lambda (val)
                                   (unless (funcall pred-fn val)
                                     (funcall body-fn val)))))))))

;; 実行
; 二乗する
> (collect (map-seq (lambda (x) (* x x)) (from 1 :to 20)))
=> (1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361 400)

; 奇数の値だけフィルタして二乗する
> (collect (map-seq (lambda (x) (* x x)) (filter #'oddp (from 1 :to 20))))
=> (1 9 25 49 81 121 169 225 289 361)

これらの関数群を組み合わせてループ処理を表現すると、そこそこ良い感じのコードを生成してくれる。

;; 上で定義した関数群を用いたsum関数
;; - startからendの範囲の奇数値を-10した合計値を返す
(defun sum1 (start end)
 (declare (fixnum start end)
          (optimize (speed 3) (safety 0))
          (sb-ext:unmuffle-conditions sb-ext:compiler-note))
 (fold (lambda (acc n)
         (the fixnum (+ acc n)))
       0
       (map-seq (lambda (x) (- x 10)) 
                (filter #'oddp (from start :to end)))))

;; loopマクロを使用したsum関数
(defun sum2 (start end)
 (declare (fixnum start end)
          (optimize (speed 3) (safety 0))
          (sb-ext:unmuffle-conditions sb-ext:compiler-note))
 (loop WITH total fixnum = 0
       FOR i FROM start TO end
       WHEN (oddp i)
   DO (let ((n (- i 10)))
        (declare (fixnum n))
        (incf total n))
   FINALLY (return total)))

;; 一億要素に対するループ
> (time (sum1 1 100000000))
Evaluation took:
  0.134 seconds of real time  ; 0.134秒
  0.136009 seconds of total run time (0.136009 user, 0.000000 system)
  101.49% CPU
  267,335,373 processor cycles
  0 bytes consed
=> 2499999500000000

> (time (sum2 1 100000000))
Evaluation took:
  0.131 seconds of real time  ; 0.131秒
  0.132008 seconds of total run time (0.132008 user, 0.000000 system)
  100.76% CPU
  261,630,697 processor cycles
  0 bytes consed
=> 2499999500000000

;; disassemble結果
> (disassemble #'sum1)
; disassembly for SUM1
; 07AA1C28:       31D2             XOR EDX, EDX               ; no-arg-parsing entry point
;       2A:       EB1B             JMP L2
;       2C:       90               NOP
;       2D:       90               NOP
;       2E:       90               NOP
;       2F:       90               NOP
;       30: L0:   488BC1           MOV RAX, RCX
;       33:       488D1C4500000000 LEA RBX, [RAX*2]
;       3B:       4883E302         AND RBX, 2
;       3F:       4885DB           TEST RBX, RBX
;       42:       750E             JNE L3
;       44: L1:   48FFC1           INC RCX
;       47: L2:   4839F9           CMP RCX, RDI
;       4A:       7EE4             JLE L0
;       4C:       488BE5           MOV RSP, RBP
;       4F:       F8               CLC
;       50:       5D               POP RBP
;       51:       C3               RET
;       52: L3:   4883E80A         SUB RAX, 10
;       56:       48D1FA           SAR RDX, 1
;       59:       4801C2           ADD RDX, RAX
;       5C:       48D1E2           SHL RDX, 1
;       5F:       EBE3             JMP L1

> (disassemble #'sum2)
; disassembly for SUM2
; 07EF0DB8:       31D2             XOR EDX, EDX               ; no-arg-parsing entry point
;       BA:       EB2A             JMP L2
;       BC:       90               NOP
;       BD:       90               NOP
;       BE:       90               NOP
;       BF:       90               NOP
;       C0: L0:   488D044D00000000 LEA RAX, [RCX*2]
;       C8:       4883E002         AND RAX, 2
;       CC:       4885C0           TEST RAX, RAX
;       CF:       7412             JEQ L1
;       D1:       488D044D00000000 LEA RAX, [RCX*2]
;       D9:       488BD8           MOV RBX, RAX
;       DC:       4883EB14         SUB RBX, 20
;       E0:       4801DA           ADD RDX, RBX
;       E3: L1:   48FFC1           INC RCX
;       E6: L2:   4839F9           CMP RCX, RDI
;       E9:       7ED5             JLE L0
;       EB:       488BE5           MOV RSP, RBP
;       EE:       F8               CLC
;       EF:       5D               POP R

最後は複数シーケンスをまとめるzip関数。
これを使うと表現力はだいぶ上がるけど、性能は若干劣化する。

;; 二つのシーケンスをまとめる
(declaim (inline zip))
(defun zip (loop1 loop2 &aux (undef (gensym)))
  (multiple-value-bind (next-fn1 end-fn1 call-fn1) (funcall loop1)
    (multiple-value-bind (next-fn2 end-fn2 call-fn2) (funcall loop2)
      (let ((memo1 undef)
            (memo2 undef))
        (lambda ()
          (values (lambda () ; 値更新
                    (when (eq memo1 undef) (funcall next-fn1))
                    (when (eq memo2 undef) (funcall next-fn2)))

                  (lambda () ; 終端判定
                    (or (funcall end-fn1) (funcall end-fn2)))

                  (lambda (body-fn)  ; 本体呼び出し
                    ;; それぞれのシーケンスの次の値を取得する
                    ;; (次の値がfilterでスキップされた場合は memoX はundefのままになる)
                    (when (eq memo1 undef)
                      (funcall call-fn1 (lambda (val) (setf memo1 val))))

                    (when (eq memo2 undef)
                      (funcall call-fn2 (lambda (val) (setf memo2 val))))
    
                    ;; 両方のシーケンスの値が取得できたら、本体を呼び出す
                    (when (not (or (eq memo1 undef)
                                   (eq memo2 undef)))
                      (funcall fn (list memo1 memo2))  ; XXX: listで二つの値をまとめるのはconsingが発生するので効率が悪い (そのためloopパッケージでは、多引数を受け取るmapやfilterを用意している)
                      (setf memo1 undef
                            memo2 undef)))))))))

;; 実行
> (collect
    (zip (filter (lambda (n) (and (oddp n)  (zerop (mod n 3)))) (from 1))           ; 奇数かつ三の倍数
         (filter (lambda (n) (and (evenp n) (zerop (mod n 5)))) (from 1 :to 100)))) ; 偶数かつ五の倍数
=> ((3 10) (9 20) (15 30) (21 40) (27 50) (33 60) (39 70) (45 80) (51 90) (57 100))

zipはもう少し上手く実装したいところだけど、それでも関数型っぽく書いても実用上十分な性能がでるループ処理が実現できそうなことが分かったので、結構満足している。

*1:似てないかも

ループ処理を関数型っぽく書いてみる(1)

今週は、common lispでループ処理を関数型っぽく、かつ効率良く実装できるかどうかを試していたので、その結果を載せておく。
結論から云えば、処理系の十分な最適化を期待できれば、関数型っぽく書いても、手続き型的に書いた場合と比肩しえる性能が得られそうな感じだった。

成果物

作ったもの: loop
今回はこれの使用例とベンチマーク結果を、次回は実装方法を載せる予定。

使用例

シーケンス(or 無限シーケンス)とmapとかfilterとかを組み合わせてループを表現する。

;; 1から5までの数値を表示する
> (loop:each (lambda (n) (print n))
             (loop:from 1 :to 5))
1
2
3
4
5
=> NIL

> (loop:from 1 :to 5)
=> #<CLOSURE (LAMBDA () :IN LOOP:FROM) {10030E0E0B}> ; 実態はクロージャー

;; マッピング
> (loop:collect (loop:map (lambda (c) (list c (char-code c)))
                          (loop:for-string "mapping")))
=> ((#\m 109) (#\a 97) (#\p 112) (#\p 112) (#\i 105) (#\n 110) (#\g 103))

;; フィルター
> (loop:collect (loop:filter #'oddp (loop:from 1 :to 10)))
-> (1 3 5 7 9)

;;; fizzbuzz
> (defun fizzbuzz-seq ()
    (loop:filter #'consp
      (loop:map (lambda (n) (cond ((zerop (mod n 15)) (cons n :fizzbuzz))
                                  ((zerop (mod n  5)) (cons n :buzz))
                                  ((zerop (mod n  3)) (cons n :fizz))
                                  (t nil)))
                (loop:from 1))))

;; 先頭三つ
> (loop:collect (loop:take 3 (fizzbuzz-seq)))
=> ((3 . :FIZZ) (5 . :BUZZ) (6 . :FIZZ))

;; 10から12番目
> (loop:collect (loop:take 2 (loop:drop 10 (fizzbuzz-seq))))
=> ((24 . :FIZZ) (25 . :BUZZ))

;; 100以下かつ偶数のものだけ
> (loop:collect 
   (loop:take-while (lambda (x) (<= (car x) 100)) 
                    (loop:filter (lambda (x) (evenp (car x)))
                                 (fizzbuzz-seq))))
=> ((6 . :FIZZ) (10 . :BUZZ) (12 . :FIZZ) (18 . :FIZZ) (20 . :BUZZ) (24 . :FIZZ)
    (30 . :FIZZBUZZ) (36 . :FIZZ) (40 . :BUZZ) (42 . :FIZZ) (48 . :FIZZ)
    (50 . :BUZZ) (54 . :FIZZ) (60 . :FIZZBUZZ) (66 . :FIZZ) (70 . :BUZZ)
    (72 . :FIZZ) (78 . :FIZZ) (80 . :BUZZ) (84 . :FIZZ) (90 . :FIZZBUZZ)

;; zip
> (loop:collect 
    (loop:take 3
      (loop:map-n 3 (lambda (x y z) (list x y z))  ; zipと組み合わせる場合は XXX-n 系のマクロを使用して、引数の数を指定する
        (loop:zip (loop:filter #'oddp (loop:from 1))
                  (loop:down-from 100 :by 3)
                  (loop:map #'sqrt (loop:repeat (lambda () (random 1000))))))))
=> ((1 100 19.078785) 
    (3 97 19.052559) 
    (5 94 16.309507))

;; フィボナッチ数列を定義
(defun fib-seq () 
  (let ((n+1 1))
    (declare (fixnum n+1))
    (loop:make-generator 
     :init (lambda () 0)                             ; 初期値生成関数
     :next (lambda (n) (prog1 n+1 (incf n+1 n)))     ; 値更新関数
     :end? (lambda (n) (declare (ignore n)) nil))))  ; 終端判定関数

> (loop:collect (loop:take 20 (fib-seq)))
=> (0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181)

速度比較

loopマクロやdoを使って書いた場合との速度比較。
処理系はSBCL(1.0.54)

比較1: sum関数 (単純なループ)
;; 準備
(defparameter *fastest* '(optimize (speed 3) (safety 0) (debug 0) (compilation-speed 0)))
(defparameter *note* '(sb-ext:unmuffle-conditions sb-ext:compiler-note))

;; loopパッケージ版
(defun sum1 (start end)
  (declare (fixnum start end) #.*fastest* #.*note*)
  (loop:reduce (lambda (acc x) (the fixnum (+ acc x)))
               0
               (loop:from start :to end)))

;; loopマクロ版
(defun sum2 (start end)
  (declare (fixnum start end) #.*fastest* #.*note*)
  (loop WITH total fixnum = 0
        FOR i FROM start TO end
        DO (incf total i)
        FINALLY (return total)))  ; (loop ... SUM i) では最適化されない部分があるので、少し長くなるけどこちらを採用

;; do版
(defun sum3 (start end)
  (declare (fixnum start end) #.*fastest* #.*note*)
  (let ((total 0))
    (declare (fixnum total))
    (do ((i start (1+ i)))
        ((> i end) total)
      (incf total i))))

;; 実行
> (time (sum1 1 100000000))  ; loopパッケージ版: 0.084秒
Evaluation took:
  0.084 seconds of real time
  0.084006 seconds of total run time (0.084006 user, 0.000000 system)
  100.00% CPU
  167,231,593 processor cycles
  0 bytes consed
=> 5000000050000000

> (time (sum2 1 100000000)) ; loopマクロ版: 0.086秒
Evaluation took:
  0.086 seconds of real time
  0.088005 seconds of total run time (0.088005 user, 0.000000 system)
  102.33% CPU
  171,410,077 processor cycles
  0 bytes consed
=> 5000000050000000

> (time (sum3 1 100000000)) ; do版: 0.083秒
Evaluation took:
  0.083 seconds of real time
  0.084005 seconds of total run time (0.084005 user, 0.000000 system)
  101.20% CPU
  166,793,649 processor cycles
  0 bytes consed
=> 5000000050000000

単純なループ処理なら、どれも速度は同じくらい。

比較2: 数値リストの奇数番目の要素の平均値を求める (zipを使ったループ)
;; データ準備: 1000万要素のリスト
(defparameter *list* (loop REPEAT 10000000 COLLECT (random 100000)))

;; loopパッケージ版
(defun avg1 (list)
  (declare #.*fastest* #.*note*)
  (flet ((average (sequence)  ; シーケンスの生成と平均値を求める処理を分離することが可能
           (let ((total 0)
                 (count 0))
             (declare (fixnum total count))
             (loop:each (lambda (n)
                          (incf total n)
                          (incf count))
                        sequence)
             (float (/ total count)))))

    (let ((seq (loop:map-n 2 (lambda (_ n) n)
                 (loop:filter-n 2 (lambda (i _) (oddp i))
                   (loop:zip (loop:from 0 :to most-positive-fixnum)
                             (loop:for-list list :element-type fixnum))))))
      (average seq))))

;; loopマクロ版
(defun avg2 (list)
  (declare #.*fastest* #.*note*)
  (loop WITH total fixnum = 0
        WITH count fixnum = 0
        FOR i fixnum FROM 0
        FOR n fixnum IN list
        WHEN (oddp i)
    DO
    (incf total n)
    (incf count)
    FINALLY
    (return (float (/ total count)))))

;; do版
(defun avg3 (list)
  (declare #.*fastest* #.*note*)
  (let ((total 0)
        (count 0))
    (declare (fixnum total count))
    (do ((i 0 (1+ i))
         (head list (cdr head)))
        ((endp head))
      (declare (fixnum i))
      (when (oddp i)
        (incf count)
        (incf total (the fixnum (car head)))))
    (float (/ total count))))

;; 実行
> (time (avg1 *list*))  ; loopパッケージ版: 0.084秒
Evaluation took:
  0.084 seconds of real time
  0.084005 seconds of total run time (0.084005 user, 0.000000 system)
  100.00% CPU
  166,739,958 processor cycles
  0 bytes consed
=> 50003.64

> (time (avg2 *list*))  ; loopマクロ版: 0.036秒
Evaluation took:
  0.036 seconds of real time
  0.036002 seconds of total run time (0.036002 user, 0.000000 system)
  100.00% CPU
  72,645,764 processor cycles
  0 bytes consed
=> 50003.64

> (time (avg3 *list*))  ; do版: 0.037秒
Evaluation took:
  0.037 seconds of real time
  0.040003 seconds of total run time (0.040003 user, 0.000000 system)
  108.11% CPU
  75,246,648 processor cycles
  0 bytes consed 
=> 50003.64

loopパッケージ版はzipを通すと、loopマクロやdoを使った場合の半分以下になってしまう。

比較3: 比較2での、平均値算出部分と奇数番要素のフィルタ部分を、別関数に分けた場合
;; loopマクロで、平均値算出部分と奇数番要素のフィルタ部分を、別関数に分けた場合
(declaim (inline average-list))
(defun average-list (list)
  (declare #.*fastest* #.*note*)
  (loop WITH total fixnum = 0
        WITH count fixnum = 0
        FOR n fixnum IN list
    DO
    (incf total n)
    (incf count)
    FINALLY
    (return (float (/ total count)))))

(declaim (inline filter-list))
(defun filter-list (list)
  (declare #.*fastest* #.*note*)
  (loop FOR i fixnum FROM 0
        FOR x IN list
        WHEN (oddp i)
        COLLECT x))

> (time (average-list (filter-list *list*)))
Evaluation took:
  0.122 seconds of real time              ; 全体で0.122秒、GC抜きなら0.081秒
  0.120008 seconds of total run time (0.120008 user, 0.000000 system)
  [ Run times consist of 0.040 seconds GC time, and 0.081 seconds non-GC time. ]
  98.36% CPU
  244,986,221 processor cycles
  79,986,688 bytes consed
=> 50003.64

;; do版
;; ※ loopマクロ版とほとんど変わらないので省略

;; loopパッケージで、平均値算出部分と奇数番要素のフィルタ部分を、別関数に分けた場合
(declaim (inline average-loop))
(defun average-loop (sequence)
  (declare #.*fastest* #.*note*)
  (let ((total 0)
        (count 0))
    (declare (fixnum total count))
    (loop:each (lambda (n)
                 (incf total (the fixnum n))
                 (incf count))
               sequence)
    (float (/ total count))))

(declaim (inline filter-loop))
(defun filter-loop (list)
  (declare #.*fastest* #.*note*)
  (loop:map-n 2 (lambda (_ n) n)
    (loop:filter-n 2 (lambda (i _) (oddp i))
      (loop:zip (loop:from 0 :to most-positive-fixnum)
                (loop:for-list list :element-type fixnum)))))

> (time (average-loop (filter-loop *list*)))
Evaluation took:
  0.070 seconds of real time   ; 0.070秒
  0.072005 seconds of total run time (0.072005 user, 0.000000 system)
  102.86% CPU
  139,856,631 processor cycles
  0 bytes consed
=> 50003.64

;; loopパッケージでinline宣言を外した場合
(declaim (notinline average-loop))
(declaim (notinline filter-loop))
> (time (average-loop (filter-loop *list*)))
Evaluation took:
  0.378 seconds of real time  ; 0.378秒
  0.376024 seconds of total run time (0.376024 user, 0.000000 system)
  99.47% CPU
  754,498,049 processor cycles
  0 bytes consed
=> 50003.64

loopマクロやdoマクロでは、ループ処理の一部を自然な形で効率よく外だしするのが困難なので、そういった用途ではloopパッケージの方が性能が良い。
ただし、現状のloopパッケージは、inline展開による最適化に過度に依存しているので、展開が効かないケースでは、いっきに処理速度が遅くなってしまう。

逆FizzBuzz

逆FizzBuzz問題 (Inverse FizzBuzz)というものがあるのを知ったので解いてみた。
結構力技。
あと、本当に合っているかは不明。

;; 逆FizzBuzzを解く関数
;; listは fizz,buzz,fizzbuzz のいずれかを要素に持つリスト
;;
;; 処理内容は、
;;   - 1: 開始数値を(1から15の範囲で)設定して、とりあえず解いてみる
;;   - 2: その15個の解の中から、一番短いものを選択する
;;   という簡単なもの。
;;
;; 解がない場合はnilを返す。
(defun ifizzbuzz (list)
  (select-min 
   (delete '() (loop FOR start FROM 1 TO 15   ; 開始数値を1〜15の範囲を試せば、全パターン網羅できる(はず)
                     COLLECT (ifizzbuzz-impl start list '())))))

;; リスト内で、一番短いリストを返す
(defun select-min (list-of-list)
  (first (sort list-of-list #'< :key #'length)))

;; 数値をfizzbuzzを表すシンボルに変換する
(defun get-fizzbuzz-type (n)
  (cond ((= (mod n 15) 0) 'fizzbuzz)
        ((= (mod n  5) 0) 'buzz)
        ((= (mod n  3) 0) 'fizz)
        (t                'none)))

;; nを開始とする連続する数値列が、listが示すfizzbuzz列に一致するかを判定する関数
;; 一致する場合は、その一致した数値列を返す。
(defun ifizzbuzz-impl (n list acc)
  (if (null list)
      (reverse acc)  ; 一致した
    (let ((type (get-fizzbuzz-type n)))
      (if (eq type 'none)           ; fizzbuzzと関係ない数値の場合は無条件で許可
          (ifizzbuzz-impl (1+ n) list (cons n acc))
        (when (eq (car list) type)  ; fizzbuzz系の数値の場合は、listの先頭要素と一致するもののみ許可
          (ifizzbuzz-impl (1+ n) (cdr list) (cons n acc)))))))

動作例。

> (ifizzbuzz '(fizz))
=> (3)

> (ifizzbuzz '(fizz buzz))
=> (9 10)

> (ifizzbuzz '(fizz buzz fizz))
=> (3 4 5 6)

> (ifizzbuzz '(fizz fizz buzz fizz fizzbuzz fizz))
=> (6 7 8 9 10 11 12 13 14 15 16 17 18)

> (ifizzbuzz '(buzz buzz))
=>NIL  ; 解無し

合ってそうには見える。

Gomokuの形態素解析部をScalaで実装してみた

ここ数日はScalaのコップ本を読んでいて、何かまとまったプログラムをScalaで書いてみたくなったのでGomoku(Java形態素解析器。ver0.0.6)Scalaで実装してみた*1
github: scala-gomoku(ver0.0.1)

以下、使用例とJava版/Scala版の簡単な比較メモ。

使用例

$ scala -cp scala-gomoku-0.0.1.jar

// インタプリタ起動 & パッケージインポート
scala> import net.reduls.scala.gomoku._

// 分かち書き
scala> Tagger.wakati("Scalaはオブジェクト指向言語と関数型言語の特徴を統合したマルチパラダイムのプログラミング言語である。")
res0: List[String] = 
        List(Scala, は, オブジェクト, 指向, 言語, と, 関数, 型, 言語, の, 特徴, を, 統合, し, た, マルチパラダイム, の, プログラミング, 言語, で, ある, 。)

// 形態素解析
scala> Tagger.parse("Scalaはオブジェクト指向言語と関数型言語の特徴を統合したマルチパラダイムのプログラミング言語である。")
res1: List[net.reduls.scala.gomoku.Morpheme] =
         List(Morpheme(Scala,名詞,固有名詞,組織,*,*,*,0), Morpheme(は,助詞,係助詞,*,*,*,*,5), Morpheme(オブジェクト,名詞,一般,*,*,*,*,6), Morpheme(指向,名詞,サ変接続,*,*,*,*,12), Morpheme(言語,名詞,一般,*,*,*,*,14), 
              Morpheme(と,助詞,並立助詞,*,*,*,*,16), Morpheme(関数,名詞,一般,*,*,*,*,17), Morpheme(型,名詞,接尾,一般,*,*,*,19), Morpheme(言語,名詞,一般,*,*,*,*,20), Morpheme(の,助詞,連体化,*,*,*,*,22), Morpheme(特徴,名詞,一般,*,*,*,*,23), 
              Morpheme(を,助詞,格助詞,一般,*,*,*,25), Morpheme(統合,名詞,サ変接続,*,*,*,*,26), Morpheme(し,動詞,自立,*,*,サ変・スル,連用形,28), Morpheme(た,助動詞,*,*,*,特殊・タ,基本形,29), Morpheme(マルチパラダイム,名詞,一般,*,*,*,*,30), 
              Morpheme(の,助詞,連体化,*,*,*,*,38), Morpheme(プログラミング,名詞,サ変接続,*,*,*,*,39), Morpheme(言語,名詞,一般,*,*,*,*,46), Morpheme(で,助動詞,*,*,*,特殊・ダ,連用形,48), Morpheme(ある,助動詞,*,*,*,五段・ラ行アル,基本形,49), Morpheme(。,記号,句点,*,*,*,*,51))

// 名詞のみ取り出し
scala> for(m <- res1 if m.feature.startsWith("名詞")) yield m.surface
res2: List[String] = 
        List(Scala, オブジェクト, 指向, 言語, 関数, 型, 言語, 特徴, 統合, マルチパラダイム, プログラミング, 言語)

ソースコード行数

形態素解析部のソースコードの行数比較。

Java:

$ cd gomoku-0.0.6-src
$ wc -l `find . -name '*.java'`
  117 ./analyzer/src/net/reduls/gomoku/Tagger.java
   12 ./analyzer/src/net/reduls/gomoku/Morpheme.java
   23 ./analyzer/src/net/reduls/gomoku/util/ReadLine.java
   83 ./analyzer/src/net/reduls/gomoku/util/Misc.java
   38 ./analyzer/src/net/reduls/gomoku/bin/Gomoku.java
   32 ./analyzer/src/net/reduls/gomoku/dic/Unknown.java
   72 ./analyzer/src/net/reduls/gomoku/dic/Char.java
   23 ./analyzer/src/net/reduls/gomoku/dic/WordDic.java
   61 ./analyzer/src/net/reduls/gomoku/dic/SurfaceId.java
   43 ./analyzer/src/net/reduls/gomoku/dic/Morpheme.java
   26 ./analyzer/src/net/reduls/gomoku/dic/PartsOfSpeech.java
   23 ./analyzer/src/net/reduls/gomoku/dic/ViterbiNode.java
   26 ./analyzer/src/net/reduls/gomoku/dic/Matrix.java
  579 合計

Scala:

$ cd scala-gomoku-0.0.1-src
$ wc -l `find . -name '*.scala'`
   3 ./src/net/reduls/scala/gomoku/Morpheme.scala
  27 ./src/net/reduls/scala/gomoku/bin/Gomoku.scala
  15 ./src/net/reduls/scala/gomoku/dic/Matrix.scala
  13 ./src/net/reduls/scala/gomoku/dic/PartsOfSpeech.scala
  18 ./src/net/reduls/scala/gomoku/dic/Morpheme.scala
  22 ./src/net/reduls/scala/gomoku/dic/Char.scala
  32 ./src/net/reduls/scala/gomoku/dic/Util.scala
   9 ./src/net/reduls/scala/gomoku/dic/ViterbiNode.scala
  39 ./src/net/reduls/scala/gomoku/dic/SurfaceId.scala
  30 ./src/net/reduls/scala/gomoku/dic/Unknown.scala
  15 ./src/net/reduls/scala/gomoku/dic/WordDic.scala
  56 ./src/net/reduls/scala/gomoku/Tagger.scala
 279 合計

Scala版はJava版に対して、おおよそ半分程度の行数。

処理速度

以下のようなベンチマークスクリプトを書いて、両者の処理速度を比較してみた。

// ファイル名: Benchmark.scala

import scala.testing.Benchmark
import net.reduls.scala.gomoku.{Tagger=>ScalaTagger}
import net.reduls.gomoku.{Tagger=>JavaTagger}
import scala.io.Source

// ベンチマーク用データ: 使用したのは約17MBの日本語テキストデータ
object BenchmarkData {
  val lines = Source.fromFile("/path/to/testdata").getLines.toArray
}

// Scala用のベンチマークオブジェクト
object ScalaGomokuBenchmark extends Benchmark {
  // BenchmarkData.linesの各行を分かち書き
  override def run() { BenchmarkData.lines.foreach(ScalaTagger.wakati _) } 
}

// Scala用のベンチマークオブジェクト
object JavaGomokuBenchmark extends Benchmark {
  override def run() { BenchmarkData.lines.foreach(JavaTagger.wakati _) }
}

// ベンチマーク実行
println("[Data]")
println("  lines: " + BenchmarkData.lines.length)
println("")

val scalaRlt = ScalaGomokuBenchmark.runBenchmark(11).tail
println("[Scala]")
println("  result : " + scalaRlt.mkString(", "))
println("  average: " + (scalaRlt.sum / scalaRlt.length))
println("")

val javaRlt = JavaGomokuBenchmark.runBenchmark(11).tail
println("[Java]")
println("  result : " + javaRlt.mkString(", "))
println("  average: " + (javaRlt.sum / javaRlt.length))
println("")

実行結果:

# Scala: version 2.9.0.1 (OpenJDK 64-Bit Server VM, Java 1.6.0_23).
$ scala -cp scala-gomoku-0.0.1.jar:gomoku-0.0.6.jar Benchmark.scala
[Data]
  lines: 172088  # データの行数(約17万行)

[Scala]
  result : 4529, 4574, 4568, 4540, 4503, 4510, 4523, 4515, 4551, 4531  
  average: 4534  # 平均: 4.534秒

[Java]
  result : 3153, 3111, 3118, 3112, 3102, 3098, 3118, 3130, 3117, 3133
  average: 3119  # 平均: 3.119秒

自分の環境では、Scala版はJava版よりも1.5倍程度遅かった。
※ まだScalaでの効率の良い書き方とかが全然分かっていないので、その辺りを踏まえてちゃんと最適化を行えばもっと差は縮まるかもしれない


書きやすさはScalaの方が全然上だけど、(今回のケースでは)まだJavaに比べると若干遅い感じはする。

*1:形態素解析部のみ、バイナリ辞書データ構築部は未実装