マルチプロセスで使用可能なロックフリーキュー

タイトルの通り、マルチプロセスで使用可能なロックフリーのFIFOキューを実装したので、その簡単な紹介。

作成物

github: ipc-msgque (0.0.4)

  • ロックフリーなFIFOキュー
    • 再入可能 かつ SIGKILLに対して安全*1
  • C++
  • 共有メモリ(mmap)を使用
  • マルチプロセス(and マルチスレッド)間の通信に使用可能
  • gcc(ver4.1以上)*2 かつ POSIX準拠環境*3でのみ使用可能

単機能な割に、内部では「まず(割合)汎用的な可変長ブロックメモリアロケータを作って、その上に固定長ブロックアロケータ、さらにその上にFIFOキューを実装」と地味に凝ったことをしている。

使用例

fork()と併用した例。

/**
 * 親子プロセスで共有するFIFOキューを作成し、子から親へメッセージを送信するサンプルプログラム
 * 
 * ファイル名: msgque-sample.cc
 * コンパイル: g++ -o msgque-sample msgque-sample.cc
 */
#include <imque/queue.hh>  // インクルードパスを通しておく

#include <unistd.h>    // fork, getpid
#include <sys/types.h>
#include <stdio.h>     // sprintf
#include <string.h>    // strlen
#include <iostream>
#include <string>

#define CHILD_COUNT 10        // 子プロセスの数
#define QUEUE_ENTRY_COUNT 32  // キューの最大要素数
#define SHM_SIZE 4096         // キューが使用可能な共有メモリのバイト数

int main(int argc, char** argv) {
  // 要素数と共有メモリサイズを指定してキューを作成
  imque::Queue que(QUEUE_ENTRY_COUNT, SHM_SIZE);  
  if(! que) {
    return 1;
  } 

  for(int i=0; i < CHILD_COUNT; i++) {
    if(fork() == 0) {
      // 子プロセスの処理
      char buf[1024]; 
      sprintf(buf, "Hello: %d", getpid());

      // enqueue
      que.enq(buf, strlen(buf));
      return 0;
    }
  }

  // 親プロセスの処理
  for(int i=0; i < CHILD_COUNT; i++) {
    std::string buf;

    // dequeue
    while(que.deq(buf) == false);  // キューが空の間はビジーループ
    std::cout << "[receive] " << buf << std::endl;
  }

  return 0;
}

実行結果:

$ ./msgque-sample 
[receive] Hello: 12736
[receive] Hello: 12737
[receive] Hello: 12738
[receive] Hello: 12740
[receive] Hello: 12739
[receive] Hello: 12742
[receive] Hello: 12744
[receive] Hello: 12743
[receive] Hello: 12745
[receive] Hello: 12741

気が向けば、内部で使用しているメモリアロケータのコードなども載せていくかもしれない。

*1:ただし、メモリを確保してから解放するまでの間にSIGKILL等でプロセスがダウンした場合は、その分のメモリはリークする

*2:__sync_bool_compare_and_swap 等の各種アトミック関数を使用しているため。

*3:共有メモリの仕組みとしてmmapを使用しているため。

ソート済みのリストに対する破壊的マージソートの改良

以前に載せたマージソート(をベースとしたもの)をSBCL(1.0.58)にコミットしてくれたPaul Khuongさんが、こんな記事を書いていて、なるほどなー、と思ったので、表題に関係する部分を参考にさせて貰って変更前後での比較を行ったメモ。

オリジナルのマージソート

まず、SBCL(1.0.58)のリストに対する破壊的マージソートの実装*1:

;; 二つのソート済みリストのマージ関数
(declaim (inline merge-lists*))
(defun merge-lists* (head list1 list2 test key &aux (tail head))
  (declare (type cons head list1 list2)
           (type function test key)
           (optimize speed))
  (macrolet ((merge-one (l1 l2)
               `(progn
                  (setf (cdr tail) ,l1
                        tail       ,l1)
                  (let ((rest (cdr ,l1)))
                    (cond (rest
                           (setf ,l1 rest))
                          (t
                           (setf (cdr ,l1) ,l2)
                           (return (cdr head))))))))
    (loop
     (if (funcall test (funcall key (car list2))  ; this way, equivalent
                       (funcall key (car list1))) ; values are first popped
         (merge-one list2 list1)                  ; from list1
         (merge-one list1 list2)))))

;; 実行
(merge-lists* '(:head) '(1 3 5) '(2 4 6) #'< #'identity))
=> (1 2 3 4 5 6)
;; リストのマージソート関数
(declaim (inline stable-sort-list))
(defun stable-sort-list (list test key &aux (head (cons :head list)))
  (declare (type list list)
           (type function test key)
           (dynamic-extent head))
  (labels ((recur (list size)
             (declare (optimize speed)
                      (type cons list)
                      (type (and fixnum unsigned-byte) size))
             (if (= 1 size)
                 (values list (shiftf (cdr list) nil))
                 (let ((half (ash size -1)))
                   (multiple-value-bind (list1 rest)
                       (recur list half)
                     (multiple-value-bind (list2 rest)
                         (recur rest (- size half))
                       (values (merge-lists* head list1 list2 test key)
                               rest)))))))
    (when list
      (values (recur list (length list))))))

;; 実行
(stable-sort-list '(8 73 2 40 0 3) #'< #'identity)
=> (0 2 3 8 40 73)

何種類かデータを用意して実行時間を計測:

;;; 計測用データ
;; 1] 400万要素のソート済みリスト
(defparameter *sorted-list* (loop FOR i FROM 0 BELOW 4000000 COLLECT i))

;; 2] 400万要素の逆順ソート済みリスト
(defparameter *reverse-sorted-list* (reverse *sorted-list*))

;; 3] 400万要素のほぼソート済みリスト1  ※ 千要素に一つがランダムな値
(defparameter *nearly-sorted-list1* (loop FOR i FROM 0 BELOW 4000000
                                         COLLECT (if (zerop (random 1000))
                                                     (random 4000000)
                                                   i)))

;; 4] 400万要素のほぼソート済みリスト2  ※ 複数のソート済みリストが連結
(defparameter *nearly-sorted-list2* (loop REPEAT 4 APPEND (loop FOR i FROM 0 BELOW 1000000 COLLECT i)))

;; 5] 400万要素のランダムなリスト
(defparameter *random-list* (loop REPEAT 4000000 COLLECT (random most-positive-fixnum)))


;;; 計測用マクロ
(defmacro sort-time (sort-fn-name list)
  `(let ((list~ (copy-list ,list)))
     (declare (optimize (speed 3) (safety 0)))
     (time (progn (,sort-fn-name list~ #'< #'identity)
                  t))))


;;; 計測
;; 1] ソート済みリスト
(sort-time stable-sort-list *sorted-list*)
Evaluation took:
  0.254 seconds of real time  ; 0.254秒
  0.252017 seconds of total run time (0.248016 user, 0.004001 system)
  99.21% CPU
  508,247,464 processor cycles
  0 bytes consed
=> T

;; 2] 逆順ソート済みリスト
(sort-time stable-sort-list *reverse-sorted-list*)
Evaluation took:
  0.235 seconds of real time  ; 0.235秒
  0.232015 seconds of total run time (0.232015 user, 0.000000 system)
  98.72% CPU
  468,869,834 processor cycles
  0 bytes consed
=> T

;; 3] ほぼソート済みリスト1  ※ 千要素に一つがランダムな値
(sort-time stable-sort-list *nearly-sorted-list1*)
Evaluation took:
  0.348 seconds of real time  ; 0.348秒
  0.348023 seconds of total run time (0.344022 user, 0.004001 system)
  100.00% CPU
  694,968,622 processor cycles
  0 bytes consed
=> T

;; 4] ほぼソート済みリスト2  ※ 複数のソート済みリストが連結
(sort-time stable-sort-list *nearly-sorted-list2*)
Evaluation took:
  0.271 seconds of real time  ; 0.271秒
  0.272017 seconds of total run time (0.272017 user, 0.000000 system)
  100.37% CPU
  538,952,732 processor cycles
  0 bytes consed
=> T

;; 5] ランダムリスト
(sort-time stable-sort-list *random-list*)
Evaluation took:
  2.171 seconds of real time  ; 2.171秒
  2.168135 seconds of total run time (2.160135 user, 0.008000 system)
  99.86% CPU
  4,332,215,938 processor cycles
  0 bytes consed
=> T

ソート済みのリストに対する改良を加えたマージソート

変更後のマージソート関数: ※ 変更内容はコメントを参照

;; 改良版マージソート関数
;; - fast-merge-lists*関数が追加されたこと以外は、もともとの関数とほとんど同様
;; - fast-merge-lists*関数は要素の範囲が重複しない二つのリストをO(1)でマージ可能
(declaim (inline stable-sort-list2))
(defun stable-sort-list2 (list test key &aux (head (cons :head list)))
  (declare (type list list)
           (type function test key)
           (dynamic-extent head))
        
           ;; マージ対象の二つのリスト内の片方が、もう片方に完全に先行している場合は、
           ;; 各要素の比較などは省略して、末尾のcdrの更新のみを行う。
  (labels ((fast-merge-lists* (try-fast-merge? list1 tail1 list2 tail2 rest)
             (when try-fast-merge?
                      ;; list1がlist2に完全に先行: (list1 .. tail1) <= (list2 .. tail2)
               (cond ((not (funcall test (funcall key (car list2))
                                         (funcall key (car tail1))))
                      (setf (cdr tail1) list2)
                      (return-from fast-merge-lists* (values list1 tail2 rest)))

                      ;; list2がlist1に完全に先行: (list2 .. tail2) < (list1 .. tail1)
                     ((funcall test (funcall key (car tail2))
                                    (funcall key (car list1)))
                      (setf (cdr tail2) list1)
                      (return-from fast-merge-lists* (values list2 tail1 rest)))))
             
             ;; その他: 通常のマージ
             (values (merge-lists* head list1 list2 test key)
                     (if (null (cdr tail1))
                         tail1
                       tail2)
                     rest))
                  
            ;; トップダウンマージリスト関数: リストの末尾を管理するようになったのとfast-merge-lists*関数を使うようになったこと以外は変更なし            
            (recur (list size)
             (declare (optimize speed)
                      (type cons list)
                      (type (and fixnum unsigned-byte) size))
             (if (= 1 size)
                 (values list list (shiftf (cdr list) nil))
                 (let ((half (ash size -1)))
                   (multiple-value-bind (list1 tail1 rest)
                       (recur list half)
                     (multiple-value-bind (list2 tail2 rest)
                         (recur rest (- size half))
                       (fast-merge-lists* (>= size 8)  ; オーバヘッドを少なくするために、一定サイズ以上のリストに対してのみ適用を試みる
                                          list1 tail1 list2 tail2 rest)))))))
    (when list
      (values (recur list (length list))))))

;; 実行
(stable-sort-list2 '(8 73 2 40 0 3) #'< #'identity)
=> (0 2 3 8 40 73)

処理時間計測:

;; 1] ソート済みリスト
(sort-time stable-sort-list2 *sorted-list*)
Evaluation took:
  0.086 seconds of real time  ; 0.086秒  (変更前: 0.254秒)
  0.088005 seconds of total run time (0.088005 user, 0.000000 system)
  102.33% CPU
  171,845,432 processor cycles
  0 bytes consed
=> T

;; 2] 逆順ソート済みリスト
(sort-time stable-sort-list2 *reverse-sorted-list*)
Evaluation took:
  0.087 seconds of real time  ; 0.0.87秒  (変更前: 0.235秒)
  0.088006 seconds of total run time (0.088006 user, 0.000000 system)
  101.15% CPU
  173,196,084 processor cycles
  0 bytes consed
=> T

;; 3] ほぼソート済みリスト1  ※ 千要素に一つがランダムな値
(sort-time stable-sort-list2 *nearly-sorted-list1*)
Evaluation took:
  0.293 seconds of real time  ; 0.293秒  (変更前: 0.348秒)
  0.292019 seconds of total run time (0.292019 user, 0.000000 system)
  99.66% CPU
  585,393,530 processor cycles
  0 bytes consed
=> T

;; 4] ほぼソート済みリスト2  ※ 複数のソート済みリストが連結
(sort-time stable-sort-list2 *nearly-sorted-list2*)
Evaluation took:
  0.122 seconds of real time  ; 0.122秒  (変更前: 0.271秒)
  0.120007 seconds of total run time (0.116007 user, 0.004000 system)
  98.36% CPU
  242,403,024 processor cycles
  0 bytes consed
=> T

;; 5] ランダムリスト
(sort-time stable-sort-list2 *random-list*)
Evaluation took:
  2.193 seconds of real time  ; 2.193秒  (変更前: 2.171秒)
  2.192138 seconds of total run time (2.164136 user, 0.028002 system)
  99.95% CPU
  4,376,336,316 processor cycles
  0 bytes consed
=> T

完全にランダムなリストに対するソートは心なしか改良版の方が(ごく若干)遅くなっているように思うが、入力リストにソート済みの部分が多ければ多いほど、確実に改良版の方が速くなっている。
確かに、二つのリストをマージする場合、それぞれの領域が独立しているなら、片方の先頭要素ともう片方の末尾要素を比較するだけで、リスト全体を完全に順序づけ可能なんだけど、自分が実装方法を考えている時には、そのことに思い至らなかった。
なるほどなー。

*1:sbcl-1.0.58/src/code/sort.lisp より引用

Lock-Free Queue

compare-and-swap操作を用いたロックフリーなキューの実装。
SBCLでのみ動作*1

(defpackage lock-free-queue
  (:use :common-lisp)
  (:export queue
           make
           enq 
           deq
           empty-p 
           element-count       
           to-list))
(in-package :lock-free-queue)

;; compare-and-swap: 成功した場合はTを、失敗した場合はNILを返す
(defmacro compare-and-swap (place old new)
  `(eq (sb-ext:compare-and-swap ,place ,old ,new) ,old))

;; キュー構造体
(defstruct queue
  (head nil :type list) 
  (tail nil :type list))

;; リストへ変換/空判定/要素数取得
(defun to-list (que) (copy-seq (cdr (queue-head que))))
(defun empty-p (que) (endp (cdr (queue-head que))))
(defun element-count (que) (length (cdr (queue-head que))))

(defmethod print-object ((o queue) stream)
  (print-unreadable-object (o stream :type t)
    (format stream "~s ~s" :count (element-count o))))

;; キューを生成
(defun make (&optional initial-contents)
  (let ((contents (cons :initial-head initial-contents)))
    (make-queue :head contents
                :tail (last contents))))

;; キューの末尾に要素を追加する
;; => queue
(defun enq (x que)
  (loop WITH new-elem = (list x)
        FOR tail = (queue-tail que)
    DO
    (cond ((cdr tail)
           (compare-and-swap (queue-tail que) tail (cdr tail)))  ; tailの位置を調整
          ((compare-and-swap (cdr tail) nil new-elem)
           (return que)))))                                      ; 追加成功

;; キューの先頭から要素を取り出す
;; => (or (values 先頭要素 T)   ; キューに要素がある場合
;;        (values NIL NIL))     ; キューが空の場合
(defun deq (que)
  (let* ((head (queue-head que))
         (next (cdr head)))
    (cond ((null next)
           (values nil nil))       ; 空
          ((compare-and-swap (queue-head que) head next)
           (values (car next) t))  ; 取得成功
          (t
           (deq que)))))           ; 他スレッドと競合(リトライ)

実行例:

;; シングルスレッドでの例
(defparameter *que* (lock-free-queue:make))
=> *QUE*

(lock-free-queue:enq 1 *que*)
=> #<LOCK-FREE-QUEUE:QUEUE :COUNT 1>

(lock-free-queue:enq 2 *que*)
=> #<LOCK-FREE-QUEUE:QUEUE :COUNT 2>

(lock-free-queue:to-list *que*)
=> (1 2)

(lock-free-queue:deq *que*)
=> 1
   T

(lock-free-queue:deq *que*)
=> 2
   T

(lock-free-queue:deq *que*)
=> NIL
   NIL

;; マルチスレッドでの例
(let ((data (loop FOR i FROM 0 BELOW 10000 COLLECT i))
      (que (lock-free-queue:make))
      (thread-num 500))
  
  ;; enqueuers
  (loop REPEAT thread-num
        DO (sb-thread:make-thread 
            (lambda ()
              (dolist (e data)
                (lock-free-queue:enq e que)))))

  ;; dequeuer
  (list
   (length 
    (loop REPEAT (* thread-num (length data))
          COLLECT 
          (loop
           (multiple-value-bind (val ok?) (lock-free-queue:deq que)
             (when ok?
               (return val))))))
   que))
=> 5000000
   #<LOCK-FREE-QUEUE:QUEUE :COUNT 0>

*1:sb-ext:compare-and-swapを置き換えれば他の処理系でも動作可能

複数プロセスで共有しているmutexのロック中にSIGKILLを投げたらどうなるか

結論: デッドロックになってしまう


自動的にロックを解放してくれたりはしないみたい。
以下、試した内容のメモ書き。

環境

$ cat /proc/version
Linux version 3.0.0-23-generic (buildd@komainu) (gcc version 4.6.1 (Ubuntu/Linaro 4.6.1-9ubuntu3) ) #38-Ubuntu SMP Fri Jul 6 14:43:30 UTC 2012

テスト用ソースコード

共有mutexに対して、ロック => スリープ(10秒) => アンロック、を行う子プロセスを四個作成するプログラム。
かなりテキトウ。

/**
 * フィル名: mutex-text.cc
 * コンパイル: g++ -o mutex-test mutex-test.cc
 */
#include <pthread.h>
#include <iostream>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include <assert.h>

// 共有メモリの管理クラス
class mem {
public:
  mem(int size) : ptr_(NULL) {
    int shmid = shmget(IPC_PRIVATE, sizeof(pthread_mutex_t), 0600);
    if(shmid == -1) { return; }
    
    ptr_ = shmat(shmid, NULL, 0);
    if(ptr_ == reinterpret_cast<void*>(-1)) {
      ptr_ = NULL;
    }
  }

  ~mem() {
    if(ptr_ != NULL) {
      shmdt(ptr_);
    }
  }

  operator bool() const { return ptr_ != NULL; }

  template <typename T>
  T* ptr() { return reinterpret_cast<T*>(ptr_); }
  
private:
  void* ptr_;
};

// 複数プロセスで共有可能なミューテックスクラス
class mutex_lock {
public:
  mutex_lock() : m_(sizeof(pthread_mutex_t)), valid_(false) {
    if(! m_) { return; }

    // プロセス間で共有可能にするためにPTHREAD_PROCESS_SHAREDを付与する
    pthread_mutexattr_t mattr;
    if(pthread_mutexattr_init(&mattr) != 0) { return; }
    if(pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) { return; }
    
    // 共有領域のmutexオブジェクトを初期化
    if(pthread_mutex_init(m_.ptr<pthread_mutex_t>(), &mattr) != 0) { return; }
    
    pthread_mutexattr_destroy(&mattr);
    
    valid_ = true;
  }
  
  ~mutex_lock() {
    if(valid_) {
      pthread_mutex_destroy(m_.ptr<pthread_mutex_t>());
    }
  }
  operator bool () const { return valid_; } 

  // ロック
  void lock() {
    assert(pthread_mutex_lock(m_.ptr<pthread_mutex_t>()) == 0);
  }

  // アンロック
  void unlock() {
    assert(pthread_mutex_unlock(m_.ptr<pthread_mutex_t>()) == 0);
  }

private:
  mem m_;
  bool valid_;
};

// main関数
int main(int argc, char** argv) {
  mutex_lock mutex;
  if(! mutex) {
    return 1;
  }
  
  pid_t parent = getpid();
  for(int i=0; i < 4; i++) {
    // 子プロセスのforkと lock => sleep => unlock 処理
    // 子プロセスは四個作成
    if(fork() == 0) {
      std::cout << "[" << getpid() << "] before lock" << std::endl;
      mutex.lock();

      std::cout << "[" << getpid() << "] in lock" << std::endl;
      sleep(10);  // 適当な時間sleep

      mutex.unlock();
      std::cout << "[" << getpid() << "] after lock" << std::endl;
      break;
    }
  }

  if(parent == getpid()) {
    for(int i=0; i < 4; i++) {
      waitid(P_ALL, 0, NULL, WEXITED);
    }
  }

  return 0;
}

実行結果

普通に実行した場合:

$ ./mutex-test
[31413] before lock
[31412] before lock
[31412] in lock
[31415] before lock
[31414] before lock
[31412] after lock
[31413] in lock
[31413] after lock
[31415] in lock
[31415] after lock
[31414] in lock
[31414] after lock

途中で子プロセスにSIGKILLを投げた場合:

$ ./mutex-test 
[31443] before lock
[31443] in lock       # <- このプロセスにSIGKILLを投げる (kill -9 31443)
[31444] before lock
[31445] before lock
[31446] before lock
# 以後 31443 が獲得したロックが解放されることなく、デッドロックに陥る

とりあえず手元の環境では、このような挙動となった。

エラトステネスの篩

loop*1を使って、エラトステネスの篩を実装してみたメモ。
以下、処理系にはSBCLのver1.0.54(x86-64bit)を使用。

;; 引数nまでの範囲の素数のシーケンス(ジェネレータ)を作成する
(declaim (inline make-prime-sequence))
(defun make-prime-sequence (n)
  (let ((arr (make-array (1+ n) :element-type 'bit :initial-element 1)))
    (flet ((prime? (i) (= (bit arr i) 1))       
           (not-prime! (i) (setf (bit arr i) 0))) 
      (declare (inline prime? not-prime!))

      (loop:each (lambda (i)
                   (when (prime? i)
                     (loop:each #'not-prime! (loop:from (* i 2) :to n :by i))))
                 (loop:from 2 :to (floor (sqrt n))))
    
      (loop:filter #'prime? (loop:from 2 :to n)))))

;;; 実行例
;; 100以下の素数
(loop:collect (make-prime-sequence 100))
=> (2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97)

;; 1001から1010番目の素数
(loop:collect (loop:take 10 (loop:drop 1000 (make-prime-sequence 10000000))))
=> (7927 7933 7937 7949 7951 7963 7993 8009 8011 8017)

通常のループ(loopマクロ)を使った場合との速度比較。

;; 比較用に素数の合計値を求める関数を用意
(defun prime-sum1 (n)
  (declare (fixnum n)
           (optimize (speed 3) (safety 0) (debug 0)))
  (loop:sum #'identity (make-prime-sequence n)))

;; 一億以下の素数の合計値
(time (prime-sum1 100000000))
Evaluation took:
  1.675 seconds of real time  ; 1.675秒
  1.676105 seconds of total run time (1.676105 user, 0.000000 system)
  100.06% CPU
  3,342,591,038 processor cycles
  12,500,032 bytes consed
=> 279209790387276
;; loopマクロ版
(defun prime-sum2 (n)
  (declare (fixnum n)
           (optimize (speed 3) (safety 0) (debug 0)))
  (let ((arr (make-array (1+ n) :element-type 'bit :initial-element 1)))
    (flet ((prime? (i) (= (bit arr i) 1))
           (not-prime! (i) (setf (bit arr i) 0)))
      (declare (inline prime? not-prime!))

      (loop FOR i fixnum FROM 2 TO (floor (sqrt n))
            WHEN (prime? i)
        DO
        (loop FOR j fixnum FROM (* i 2) TO n BY i
          DO
          (not-prime! j)))

      (loop WITH sum OF-TYPE (unsigned-byte 64)
            FOR i fixnum FROM 2 TO n
            WHEN (prime? i)
        DO (incf sum i)
        FINALLY (return sum)))))

;; 一億以下の素数の合計値
(time (prime-sum2 100000000))
Evaluation took:
  1.476 seconds of real time  ; 1.476秒
  1.472092 seconds of total run time (1.468092 user, 0.004000 system)
  99.73% CPU
  2,944,592,020 processor cycles
  12,500,032 bytes consed
=> 279209790387276

ループ処理を関数型っぽく書いてみる(2)

前回の続き。
githubにあるloopの簡易版を載せておく。

基本的な考え方

基本的なJava等のIteratorと似た*1インタフェースを通してループ処理を実現している。
異なるのは全ての関数をinline展開可能にすることで、同等のループを非関数型的に書いた場合と同じくらいに、コンパイラが最適化を行ってくれることを期待していることくらい。
後は、SBCLの最適化の制限上、構造体等は使用せず、極力lambdaで全てを表現するようにしている。

実装

まず、loopパッケージ用のシーケンス生成関数。

;; 数値の範囲を表現するシーケンス
(declaim (inline from))
(defun from (start &key to (by 1))  ; toがnilなら無限シーケンス
  ;; 全体をlambdaで囲む。このlambdaの呼び出しがシーケンスの初期化処理に相当する。
  (lambda () 
    (let ((cur start))
      ;; 以下の三つの関数を呼び出し元に返す
      (values (lambda () (incf cur by))          ; 1] 値更新関数
              (lambda () (and to(> cur to)))     ; 2] 終端判定関数
              (lambda (fn) (funcall fn cur)))))) ; 3] ループの本体実行関数

;; リスト用
(declaim (inline for-list))
(defun for-list (list)
  (lambda ()
    (let ((head list)) ; 初期値
      (values (lambda () (setf head (cdr head)))         ; 1] 値更新関数
              (lambda () (endp head))                    ; 2] 終端判定関数
              (lambda (fn) (funcall fn (car head)))))))  ; 3] ループの本体実行関数

;; 実行
> (from 1 :to 10)
#<CLOSURE (LAMBDA () :IN FROM) {1007855D3B}>

> (funcall (from 1 :to 10))
#<CLOSURE (LAMBDA () :IN FROM) {10078C432B}>   ; 1] 値更新関数
#<CLOSURE (LAMBDA () :IN FROM) {10078C434B}>   ; 2] 終端判定関数
#<CLOSURE (LAMBDA (FN) :IN FROM) {10078C436B}> ; 3] ループの本体実行関数

> (setf (values next end? call-body) (funcall (from 1 :to 10)))
> (funcall call-body (lambda (x) (list :val x)))
=> (:val 1)

> (funcall next)
> (funcall call-body (lambda (x) (list :val x)))
=> (:val 2)

上の関数で生成されたシーケンスを走査する関数。

;; 一番基本となる走査関数
(declaim (inline each))
(defun each (fn seq)
  (multiple-value-bind (next-fn end-fn call-fn) (funcall seq)  ; シーケンス初期化
    (loop UNTIL (funcall end-fn)   ; 終端判定
          DO (funcall call-fn fn)  ; 本体実行
             (funcall next-fn))))  ; 値更新

;; 畳み込み関数  ※ reduceはclパッケージとそれと名前が衝突するので、ここではfoldにしている
(declaim (inline fold))
(defun fold (fn init seq)
  (let ((acc init))
    (each (lambda (x)
            (setf acc (funcall fn acc x)))
          seq)
    acc))

;; シーケンスを集めたリストを返す
(declaim (inline collect))
(defun collect (seq)
  (nreverse (fold (lambda (acc x) (cons x acc))
                    '()
                    seq)))

;; 実行
> (collect (from 1 :to 20 :by 3))
=> (1 4 7 10 13 16 19)

; 合計値計算
> (fold (lambda (acc x) (+ acc x))
        0
        (from 1 :to 20 :by 3))
=> 70

mapとかfilterとかシーケンスを加工/制御する関数。

;; map関数
(declaim (inline map-seq))
(defun map-seq (map-fn seq)
  ;; ソースとなるシーケンスの情報を取得し、それをラップして返す
  (multiple-value-bind (next-fn end-fn call-fn) (funcall seq)
    (lambda ()
      (values next-fn  ; 値更新関数と終端判定関数はそのまま
              end-fn
              (lambda (body-fn)
                ;; 本体呼び出し前に、マップ処理用関数を差し込む
                (funcall call-fn (lambda (val) (funcall body-fn (funcall map-fn val)))))))))

;; filter関数: (funcall pred-fn val)がnilとなる要素をスキップする
(declaim (inline filter))
(defun filter (pred-fn seq)
  ;; ソースとなるシーケンスの情報を取得し、それをラップして返す
  (multiple-value-bind (next-fn end-fn call-fn) (funcall seq)
    (lambda ()
      (values next-fn ; 値更新関数と終端判定関数はそのまま
              end-fn
              (lambda (body-fn)
                ;; 本体呼び出し前に、フィルター処理用関数を差し込む
                (funcall call-fn (lambda (val)
                                   (unless (funcall pred-fn val)
                                     (funcall body-fn val)))))))))

;; 実行
; 二乗する
> (collect (map-seq (lambda (x) (* x x)) (from 1 :to 20)))
=> (1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361 400)

; 奇数の値だけフィルタして二乗する
> (collect (map-seq (lambda (x) (* x x)) (filter #'oddp (from 1 :to 20))))
=> (1 9 25 49 81 121 169 225 289 361)

これらの関数群を組み合わせてループ処理を表現すると、そこそこ良い感じのコードを生成してくれる。

;; 上で定義した関数群を用いたsum関数
;; - startからendの範囲の奇数値を-10した合計値を返す
(defun sum1 (start end)
 (declare (fixnum start end)
          (optimize (speed 3) (safety 0))
          (sb-ext:unmuffle-conditions sb-ext:compiler-note))
 (fold (lambda (acc n)
         (the fixnum (+ acc n)))
       0
       (map-seq (lambda (x) (- x 10)) 
                (filter #'oddp (from start :to end)))))

;; loopマクロを使用したsum関数
(defun sum2 (start end)
 (declare (fixnum start end)
          (optimize (speed 3) (safety 0))
          (sb-ext:unmuffle-conditions sb-ext:compiler-note))
 (loop WITH total fixnum = 0
       FOR i FROM start TO end
       WHEN (oddp i)
   DO (let ((n (- i 10)))
        (declare (fixnum n))
        (incf total n))
   FINALLY (return total)))

;; 一億要素に対するループ
> (time (sum1 1 100000000))
Evaluation took:
  0.134 seconds of real time  ; 0.134秒
  0.136009 seconds of total run time (0.136009 user, 0.000000 system)
  101.49% CPU
  267,335,373 processor cycles
  0 bytes consed
=> 2499999500000000

> (time (sum2 1 100000000))
Evaluation took:
  0.131 seconds of real time  ; 0.131秒
  0.132008 seconds of total run time (0.132008 user, 0.000000 system)
  100.76% CPU
  261,630,697 processor cycles
  0 bytes consed
=> 2499999500000000

;; disassemble結果
> (disassemble #'sum1)
; disassembly for SUM1
; 07AA1C28:       31D2             XOR EDX, EDX               ; no-arg-parsing entry point
;       2A:       EB1B             JMP L2
;       2C:       90               NOP
;       2D:       90               NOP
;       2E:       90               NOP
;       2F:       90               NOP
;       30: L0:   488BC1           MOV RAX, RCX
;       33:       488D1C4500000000 LEA RBX, [RAX*2]
;       3B:       4883E302         AND RBX, 2
;       3F:       4885DB           TEST RBX, RBX
;       42:       750E             JNE L3
;       44: L1:   48FFC1           INC RCX
;       47: L2:   4839F9           CMP RCX, RDI
;       4A:       7EE4             JLE L0
;       4C:       488BE5           MOV RSP, RBP
;       4F:       F8               CLC
;       50:       5D               POP RBP
;       51:       C3               RET
;       52: L3:   4883E80A         SUB RAX, 10
;       56:       48D1FA           SAR RDX, 1
;       59:       4801C2           ADD RDX, RAX
;       5C:       48D1E2           SHL RDX, 1
;       5F:       EBE3             JMP L1

> (disassemble #'sum2)
; disassembly for SUM2
; 07EF0DB8:       31D2             XOR EDX, EDX               ; no-arg-parsing entry point
;       BA:       EB2A             JMP L2
;       BC:       90               NOP
;       BD:       90               NOP
;       BE:       90               NOP
;       BF:       90               NOP
;       C0: L0:   488D044D00000000 LEA RAX, [RCX*2]
;       C8:       4883E002         AND RAX, 2
;       CC:       4885C0           TEST RAX, RAX
;       CF:       7412             JEQ L1
;       D1:       488D044D00000000 LEA RAX, [RCX*2]
;       D9:       488BD8           MOV RBX, RAX
;       DC:       4883EB14         SUB RBX, 20
;       E0:       4801DA           ADD RDX, RBX
;       E3: L1:   48FFC1           INC RCX
;       E6: L2:   4839F9           CMP RCX, RDI
;       E9:       7ED5             JLE L0
;       EB:       488BE5           MOV RSP, RBP
;       EE:       F8               CLC
;       EF:       5D               POP R

最後は複数シーケンスをまとめるzip関数。
これを使うと表現力はだいぶ上がるけど、性能は若干劣化する。

;; 二つのシーケンスをまとめる
(declaim (inline zip))
(defun zip (loop1 loop2 &aux (undef (gensym)))
  (multiple-value-bind (next-fn1 end-fn1 call-fn1) (funcall loop1)
    (multiple-value-bind (next-fn2 end-fn2 call-fn2) (funcall loop2)
      (let ((memo1 undef)
            (memo2 undef))
        (lambda ()
          (values (lambda () ; 値更新
                    (when (eq memo1 undef) (funcall next-fn1))
                    (when (eq memo2 undef) (funcall next-fn2)))

                  (lambda () ; 終端判定
                    (or (funcall end-fn1) (funcall end-fn2)))

                  (lambda (body-fn)  ; 本体呼び出し
                    ;; それぞれのシーケンスの次の値を取得する
                    ;; (次の値がfilterでスキップされた場合は memoX はundefのままになる)
                    (when (eq memo1 undef)
                      (funcall call-fn1 (lambda (val) (setf memo1 val))))

                    (when (eq memo2 undef)
                      (funcall call-fn2 (lambda (val) (setf memo2 val))))
    
                    ;; 両方のシーケンスの値が取得できたら、本体を呼び出す
                    (when (not (or (eq memo1 undef)
                                   (eq memo2 undef)))
                      (funcall fn (list memo1 memo2))  ; XXX: listで二つの値をまとめるのはconsingが発生するので効率が悪い (そのためloopパッケージでは、多引数を受け取るmapやfilterを用意している)
                      (setf memo1 undef
                            memo2 undef)))))))))

;; 実行
> (collect
    (zip (filter (lambda (n) (and (oddp n)  (zerop (mod n 3)))) (from 1))           ; 奇数かつ三の倍数
         (filter (lambda (n) (and (evenp n) (zerop (mod n 5)))) (from 1 :to 100)))) ; 偶数かつ五の倍数
=> ((3 10) (9 20) (15 30) (21 40) (27 50) (33 60) (39 70) (45 80) (51 90) (57 100))

zipはもう少し上手く実装したいところだけど、それでも関数型っぽく書いても実用上十分な性能がでるループ処理が実現できそうなことが分かったので、結構満足している。

*1:似てないかも

ループ処理を関数型っぽく書いてみる(1)

今週は、common lispでループ処理を関数型っぽく、かつ効率良く実装できるかどうかを試していたので、その結果を載せておく。
結論から云えば、処理系の十分な最適化を期待できれば、関数型っぽく書いても、手続き型的に書いた場合と比肩しえる性能が得られそうな感じだった。

成果物

作ったもの: loop
今回はこれの使用例とベンチマーク結果を、次回は実装方法を載せる予定。

使用例

シーケンス(or 無限シーケンス)とmapとかfilterとかを組み合わせてループを表現する。

;; 1から5までの数値を表示する
> (loop:each (lambda (n) (print n))
             (loop:from 1 :to 5))
1
2
3
4
5
=> NIL

> (loop:from 1 :to 5)
=> #<CLOSURE (LAMBDA () :IN LOOP:FROM) {10030E0E0B}> ; 実態はクロージャー

;; マッピング
> (loop:collect (loop:map (lambda (c) (list c (char-code c)))
                          (loop:for-string "mapping")))
=> ((#\m 109) (#\a 97) (#\p 112) (#\p 112) (#\i 105) (#\n 110) (#\g 103))

;; フィルター
> (loop:collect (loop:filter #'oddp (loop:from 1 :to 10)))
-> (1 3 5 7 9)

;;; fizzbuzz
> (defun fizzbuzz-seq ()
    (loop:filter #'consp
      (loop:map (lambda (n) (cond ((zerop (mod n 15)) (cons n :fizzbuzz))
                                  ((zerop (mod n  5)) (cons n :buzz))
                                  ((zerop (mod n  3)) (cons n :fizz))
                                  (t nil)))
                (loop:from 1))))

;; 先頭三つ
> (loop:collect (loop:take 3 (fizzbuzz-seq)))
=> ((3 . :FIZZ) (5 . :BUZZ) (6 . :FIZZ))

;; 10から12番目
> (loop:collect (loop:take 2 (loop:drop 10 (fizzbuzz-seq))))
=> ((24 . :FIZZ) (25 . :BUZZ))

;; 100以下かつ偶数のものだけ
> (loop:collect 
   (loop:take-while (lambda (x) (<= (car x) 100)) 
                    (loop:filter (lambda (x) (evenp (car x)))
                                 (fizzbuzz-seq))))
=> ((6 . :FIZZ) (10 . :BUZZ) (12 . :FIZZ) (18 . :FIZZ) (20 . :BUZZ) (24 . :FIZZ)
    (30 . :FIZZBUZZ) (36 . :FIZZ) (40 . :BUZZ) (42 . :FIZZ) (48 . :FIZZ)
    (50 . :BUZZ) (54 . :FIZZ) (60 . :FIZZBUZZ) (66 . :FIZZ) (70 . :BUZZ)
    (72 . :FIZZ) (78 . :FIZZ) (80 . :BUZZ) (84 . :FIZZ) (90 . :FIZZBUZZ)

;; zip
> (loop:collect 
    (loop:take 3
      (loop:map-n 3 (lambda (x y z) (list x y z))  ; zipと組み合わせる場合は XXX-n 系のマクロを使用して、引数の数を指定する
        (loop:zip (loop:filter #'oddp (loop:from 1))
                  (loop:down-from 100 :by 3)
                  (loop:map #'sqrt (loop:repeat (lambda () (random 1000))))))))
=> ((1 100 19.078785) 
    (3 97 19.052559) 
    (5 94 16.309507))

;; フィボナッチ数列を定義
(defun fib-seq () 
  (let ((n+1 1))
    (declare (fixnum n+1))
    (loop:make-generator 
     :init (lambda () 0)                             ; 初期値生成関数
     :next (lambda (n) (prog1 n+1 (incf n+1 n)))     ; 値更新関数
     :end? (lambda (n) (declare (ignore n)) nil))))  ; 終端判定関数

> (loop:collect (loop:take 20 (fib-seq)))
=> (0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181)

速度比較

loopマクロやdoを使って書いた場合との速度比較。
処理系はSBCL(1.0.54)

比較1: sum関数 (単純なループ)
;; 準備
(defparameter *fastest* '(optimize (speed 3) (safety 0) (debug 0) (compilation-speed 0)))
(defparameter *note* '(sb-ext:unmuffle-conditions sb-ext:compiler-note))

;; loopパッケージ版
(defun sum1 (start end)
  (declare (fixnum start end) #.*fastest* #.*note*)
  (loop:reduce (lambda (acc x) (the fixnum (+ acc x)))
               0
               (loop:from start :to end)))

;; loopマクロ版
(defun sum2 (start end)
  (declare (fixnum start end) #.*fastest* #.*note*)
  (loop WITH total fixnum = 0
        FOR i FROM start TO end
        DO (incf total i)
        FINALLY (return total)))  ; (loop ... SUM i) では最適化されない部分があるので、少し長くなるけどこちらを採用

;; do版
(defun sum3 (start end)
  (declare (fixnum start end) #.*fastest* #.*note*)
  (let ((total 0))
    (declare (fixnum total))
    (do ((i start (1+ i)))
        ((> i end) total)
      (incf total i))))

;; 実行
> (time (sum1 1 100000000))  ; loopパッケージ版: 0.084秒
Evaluation took:
  0.084 seconds of real time
  0.084006 seconds of total run time (0.084006 user, 0.000000 system)
  100.00% CPU
  167,231,593 processor cycles
  0 bytes consed
=> 5000000050000000

> (time (sum2 1 100000000)) ; loopマクロ版: 0.086秒
Evaluation took:
  0.086 seconds of real time
  0.088005 seconds of total run time (0.088005 user, 0.000000 system)
  102.33% CPU
  171,410,077 processor cycles
  0 bytes consed
=> 5000000050000000

> (time (sum3 1 100000000)) ; do版: 0.083秒
Evaluation took:
  0.083 seconds of real time
  0.084005 seconds of total run time (0.084005 user, 0.000000 system)
  101.20% CPU
  166,793,649 processor cycles
  0 bytes consed
=> 5000000050000000

単純なループ処理なら、どれも速度は同じくらい。

比較2: 数値リストの奇数番目の要素の平均値を求める (zipを使ったループ)
;; データ準備: 1000万要素のリスト
(defparameter *list* (loop REPEAT 10000000 COLLECT (random 100000)))

;; loopパッケージ版
(defun avg1 (list)
  (declare #.*fastest* #.*note*)
  (flet ((average (sequence)  ; シーケンスの生成と平均値を求める処理を分離することが可能
           (let ((total 0)
                 (count 0))
             (declare (fixnum total count))
             (loop:each (lambda (n)
                          (incf total n)
                          (incf count))
                        sequence)
             (float (/ total count)))))

    (let ((seq (loop:map-n 2 (lambda (_ n) n)
                 (loop:filter-n 2 (lambda (i _) (oddp i))
                   (loop:zip (loop:from 0 :to most-positive-fixnum)
                             (loop:for-list list :element-type fixnum))))))
      (average seq))))

;; loopマクロ版
(defun avg2 (list)
  (declare #.*fastest* #.*note*)
  (loop WITH total fixnum = 0
        WITH count fixnum = 0
        FOR i fixnum FROM 0
        FOR n fixnum IN list
        WHEN (oddp i)
    DO
    (incf total n)
    (incf count)
    FINALLY
    (return (float (/ total count)))))

;; do版
(defun avg3 (list)
  (declare #.*fastest* #.*note*)
  (let ((total 0)
        (count 0))
    (declare (fixnum total count))
    (do ((i 0 (1+ i))
         (head list (cdr head)))
        ((endp head))
      (declare (fixnum i))
      (when (oddp i)
        (incf count)
        (incf total (the fixnum (car head)))))
    (float (/ total count))))

;; 実行
> (time (avg1 *list*))  ; loopパッケージ版: 0.084秒
Evaluation took:
  0.084 seconds of real time
  0.084005 seconds of total run time (0.084005 user, 0.000000 system)
  100.00% CPU
  166,739,958 processor cycles
  0 bytes consed
=> 50003.64

> (time (avg2 *list*))  ; loopマクロ版: 0.036秒
Evaluation took:
  0.036 seconds of real time
  0.036002 seconds of total run time (0.036002 user, 0.000000 system)
  100.00% CPU
  72,645,764 processor cycles
  0 bytes consed
=> 50003.64

> (time (avg3 *list*))  ; do版: 0.037秒
Evaluation took:
  0.037 seconds of real time
  0.040003 seconds of total run time (0.040003 user, 0.000000 system)
  108.11% CPU
  75,246,648 processor cycles
  0 bytes consed 
=> 50003.64

loopパッケージ版はzipを通すと、loopマクロやdoを使った場合の半分以下になってしまう。

比較3: 比較2での、平均値算出部分と奇数番要素のフィルタ部分を、別関数に分けた場合
;; loopマクロで、平均値算出部分と奇数番要素のフィルタ部分を、別関数に分けた場合
(declaim (inline average-list))
(defun average-list (list)
  (declare #.*fastest* #.*note*)
  (loop WITH total fixnum = 0
        WITH count fixnum = 0
        FOR n fixnum IN list
    DO
    (incf total n)
    (incf count)
    FINALLY
    (return (float (/ total count)))))

(declaim (inline filter-list))
(defun filter-list (list)
  (declare #.*fastest* #.*note*)
  (loop FOR i fixnum FROM 0
        FOR x IN list
        WHEN (oddp i)
        COLLECT x))

> (time (average-list (filter-list *list*)))
Evaluation took:
  0.122 seconds of real time              ; 全体で0.122秒、GC抜きなら0.081秒
  0.120008 seconds of total run time (0.120008 user, 0.000000 system)
  [ Run times consist of 0.040 seconds GC time, and 0.081 seconds non-GC time. ]
  98.36% CPU
  244,986,221 processor cycles
  79,986,688 bytes consed
=> 50003.64

;; do版
;; ※ loopマクロ版とほとんど変わらないので省略

;; loopパッケージで、平均値算出部分と奇数番要素のフィルタ部分を、別関数に分けた場合
(declaim (inline average-loop))
(defun average-loop (sequence)
  (declare #.*fastest* #.*note*)
  (let ((total 0)
        (count 0))
    (declare (fixnum total count))
    (loop:each (lambda (n)
                 (incf total (the fixnum n))
                 (incf count))
               sequence)
    (float (/ total count))))

(declaim (inline filter-loop))
(defun filter-loop (list)
  (declare #.*fastest* #.*note*)
  (loop:map-n 2 (lambda (_ n) n)
    (loop:filter-n 2 (lambda (i _) (oddp i))
      (loop:zip (loop:from 0 :to most-positive-fixnum)
                (loop:for-list list :element-type fixnum)))))

> (time (average-loop (filter-loop *list*)))
Evaluation took:
  0.070 seconds of real time   ; 0.070秒
  0.072005 seconds of total run time (0.072005 user, 0.000000 system)
  102.86% CPU
  139,856,631 processor cycles
  0 bytes consed
=> 50003.64

;; loopパッケージでinline宣言を外した場合
(declaim (notinline average-loop))
(declaim (notinline filter-loop))
> (time (average-loop (filter-loop *list*)))
Evaluation took:
  0.378 seconds of real time  ; 0.378秒
  0.376024 seconds of total run time (0.376024 user, 0.000000 system)
  99.47% CPU
  754,498,049 processor cycles
  0 bytes consed
=> 50003.64

loopマクロやdoマクロでは、ループ処理の一部を自然な形で効率よく外だしするのが困難なので、そういった用途ではloopパッケージの方が性能が良い。
ただし、現状のloopパッケージは、inline展開による最適化に過度に依存しているので、展開が効かないケースでは、いっきに処理速度が遅くなってしまう。