マルチプロセスで使用可能なロックフリーキュー

タイトルの通り、マルチプロセスで使用可能なロックフリーのFIFOキューを実装したので、その簡単な紹介。

作成物

github: ipc-msgque (0.0.4)

  • ロックフリーなFIFOキュー
    • 再入可能 かつ SIGKILLに対して安全*1
  • C++
  • 共有メモリ(mmap)を使用
  • マルチプロセス(and マルチスレッド)間の通信に使用可能
  • gcc(ver4.1以上)*2 かつ POSIX準拠環境*3でのみ使用可能

単機能な割に、内部では「まず(割合)汎用的な可変長ブロックメモリアロケータを作って、その上に固定長ブロックアロケータ、さらにその上にFIFOキューを実装」と地味に凝ったことをしている。

使用例

fork()と併用した例。

/**
 * 親子プロセスで共有するFIFOキューを作成し、子から親へメッセージを送信するサンプルプログラム
 * 
 * ファイル名: msgque-sample.cc
 * コンパイル: g++ -o msgque-sample msgque-sample.cc
 */
#include <imque/queue.hh>  // インクルードパスを通しておく

#include <unistd.h>    // fork, getpid
#include <sys/types.h>
#include <stdio.h>     // sprintf
#include <string.h>    // strlen
#include <iostream>
#include <string>

#define CHILD_COUNT 10        // 子プロセスの数
#define QUEUE_ENTRY_COUNT 32  // キューの最大要素数
#define SHM_SIZE 4096         // キューが使用可能な共有メモリのバイト数

int main(int argc, char** argv) {
  // 要素数と共有メモリサイズを指定してキューを作成
  imque::Queue que(QUEUE_ENTRY_COUNT, SHM_SIZE);  
  if(! que) {
    return 1;
  } 

  for(int i=0; i < CHILD_COUNT; i++) {
    if(fork() == 0) {
      // 子プロセスの処理
      char buf[1024]; 
      sprintf(buf, "Hello: %d", getpid());

      // enqueue
      que.enq(buf, strlen(buf));
      return 0;
    }
  }

  // 親プロセスの処理
  for(int i=0; i < CHILD_COUNT; i++) {
    std::string buf;

    // dequeue
    while(que.deq(buf) == false);  // キューが空の間はビジーループ
    std::cout << "[receive] " << buf << std::endl;
  }

  return 0;
}

実行結果:

$ ./msgque-sample 
[receive] Hello: 12736
[receive] Hello: 12737
[receive] Hello: 12738
[receive] Hello: 12740
[receive] Hello: 12739
[receive] Hello: 12742
[receive] Hello: 12744
[receive] Hello: 12743
[receive] Hello: 12745
[receive] Hello: 12741

気が向けば、内部で使用しているメモリアロケータのコードなども載せていくかもしれない。

*1:ただし、メモリを確保してから解放するまでの間にSIGKILL等でプロセスがダウンした場合は、その分のメモリはリークする

*2:__sync_bool_compare_and_swap 等の各種アトミック関数を使用しているため。

*3:共有メモリの仕組みとしてmmapを使用しているため。

複数プロセスで共有しているmutexのロック中にSIGKILLを投げたらどうなるか

結論: デッドロックになってしまう


自動的にロックを解放してくれたりはしないみたい。
以下、試した内容のメモ書き。

環境

$ cat /proc/version
Linux version 3.0.0-23-generic (buildd@komainu) (gcc version 4.6.1 (Ubuntu/Linaro 4.6.1-9ubuntu3) ) #38-Ubuntu SMP Fri Jul 6 14:43:30 UTC 2012

テスト用ソースコード

共有mutexに対して、ロック => スリープ(10秒) => アンロック、を行う子プロセスを四個作成するプログラム。
かなりテキトウ。

/**
 * フィル名: mutex-text.cc
 * コンパイル: g++ -o mutex-test mutex-test.cc
 */
#include <pthread.h>
#include <iostream>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include <assert.h>

// 共有メモリの管理クラス
class mem {
public:
  mem(int size) : ptr_(NULL) {
    int shmid = shmget(IPC_PRIVATE, sizeof(pthread_mutex_t), 0600);
    if(shmid == -1) { return; }
    
    ptr_ = shmat(shmid, NULL, 0);
    if(ptr_ == reinterpret_cast<void*>(-1)) {
      ptr_ = NULL;
    }
  }

  ~mem() {
    if(ptr_ != NULL) {
      shmdt(ptr_);
    }
  }

  operator bool() const { return ptr_ != NULL; }

  template <typename T>
  T* ptr() { return reinterpret_cast<T*>(ptr_); }
  
private:
  void* ptr_;
};

// 複数プロセスで共有可能なミューテックスクラス
class mutex_lock {
public:
  mutex_lock() : m_(sizeof(pthread_mutex_t)), valid_(false) {
    if(! m_) { return; }

    // プロセス間で共有可能にするためにPTHREAD_PROCESS_SHAREDを付与する
    pthread_mutexattr_t mattr;
    if(pthread_mutexattr_init(&mattr) != 0) { return; }
    if(pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) { return; }
    
    // 共有領域のmutexオブジェクトを初期化
    if(pthread_mutex_init(m_.ptr<pthread_mutex_t>(), &mattr) != 0) { return; }
    
    pthread_mutexattr_destroy(&mattr);
    
    valid_ = true;
  }
  
  ~mutex_lock() {
    if(valid_) {
      pthread_mutex_destroy(m_.ptr<pthread_mutex_t>());
    }
  }
  operator bool () const { return valid_; } 

  // ロック
  void lock() {
    assert(pthread_mutex_lock(m_.ptr<pthread_mutex_t>()) == 0);
  }

  // アンロック
  void unlock() {
    assert(pthread_mutex_unlock(m_.ptr<pthread_mutex_t>()) == 0);
  }

private:
  mem m_;
  bool valid_;
};

// main関数
int main(int argc, char** argv) {
  mutex_lock mutex;
  if(! mutex) {
    return 1;
  }
  
  pid_t parent = getpid();
  for(int i=0; i < 4; i++) {
    // 子プロセスのforkと lock => sleep => unlock 処理
    // 子プロセスは四個作成
    if(fork() == 0) {
      std::cout << "[" << getpid() << "] before lock" << std::endl;
      mutex.lock();

      std::cout << "[" << getpid() << "] in lock" << std::endl;
      sleep(10);  // 適当な時間sleep

      mutex.unlock();
      std::cout << "[" << getpid() << "] after lock" << std::endl;
      break;
    }
  }

  if(parent == getpid()) {
    for(int i=0; i < 4; i++) {
      waitid(P_ALL, 0, NULL, WEXITED);
    }
  }

  return 0;
}

実行結果

普通に実行した場合:

$ ./mutex-test
[31413] before lock
[31412] before lock
[31412] in lock
[31415] before lock
[31414] before lock
[31412] after lock
[31413] in lock
[31413] after lock
[31415] in lock
[31415] after lock
[31414] in lock
[31414] after lock

途中で子プロセスにSIGKILLを投げた場合:

$ ./mutex-test 
[31443] before lock
[31443] in lock       # <- このプロセスにSIGKILLを投げる (kill -9 31443)
[31444] before lock
[31445] before lock
[31446] before lock
# 以後 31443 が獲得したロックが解放されることなく、デッドロックに陥る

とりあえず手元の環境では、このような挙動となった。

簡易スタック型VM(バイトコードインタプリタ)でのフィボナッチ数計算速度

今年はlisp系のプログラミング言語(及びその処理系)を作ってみようと考えていて、かつ(少なくとも)当面の間はスタック型VMを基盤として実装していくことになると思われるので、まずは単純なスタックマシンのバイトコードインタプリタで、どの程度の処理速度がでるのかを計測してみた。

命令一覧と実行サンプル

現状のVMが備える命令一覧*1。必要最小限。
下記、命令セットに関してはForthを少し参考にしている。スタックマシンの動作の詳細に関しては、特に特殊な点もないので説明は割愛。

命令 コード値 in-stack out-stack 意味
int 1 n バイトコード中の後続の四バイト(little-endian)を取り出し、int値を生成
add 2 n1 n2 n3 n1 + n2
sub 3 n1 n2 n3 n1 - n2
mul 4 n1 n2 n3 n1 * n2
div 5 n1 n2 n3 n1 / n2
mod 6 n1 n2 n3 n1 % n2
eql 7 n1 n2 b(1 or 0) n1 == n2
less 8 n1 n2 b n1 < n2
dup 9 x x x スタックの先頭要素を複製
drop 10 x スタックの先頭要素を破棄
swap 11 x1 x2 x2 x1 スタックの先頭二つの要素を入れ替え
over 12 x1 x2 x1 x2 x1 スタックの二番目の要素を先頭に複製
rot 13 x1 x2 x3 x2 x3 x1 スタックの先頭三つの要素をローテーション
rpush 14 x スタック(データスタック)の先頭要素をリターンスタックの先頭に移す
rpop 15 x リターンスタックの先頭要素をスタックに移す
rcopy 16 x リターンスタックの先頭要素をスタックに複製
jump 17 n 無条件分岐。nは分岐先のアドレス
jump_if 18 b n 条件分岐。bが新(非ゼロ)なら分岐する
call 19 n 関数呼び出し。リターンスタックにプログラムカウンタを保存後、無条件分岐
return 20 関数からの復帰。リターンスタックからプログラムカウンタを取り出し、そこへ無条件分岐

末尾にソースコード全体を載せるが、バイトコードインタプリタの実行部は、バイトコードから上記命令に対応するコード値を取得し、命令を実行する、ということをひたすら繰り返すという単純なもの。

  // C++
  typedef unsigned char octet;

  /**
   * バイトコード実行用のクラス
   */
  class executor {
  public:
    void execute(const char* filepath) {
      bytecode_stream in(filepath);
      
      // バイトコードストリームの終端に達するまでループ
      while(in.eos() == false) {
        octet opcode = in.read_octet();  // 命令コード読み出し
        op::call(opcode, in, env);       // コードに対応する処理を実行 (envにはデータスタックとリターンスタックが保持されている)
      }
    }
  };

  class op {
  public:
    // コードに対応する命令を実行
    static void call(octet opcode, bytecode_stream& in, environment& env) {
      switch(opcode) {
      case  1: op_int(in, env); break; // int値構築
      case  2: op_add(in, env); break; // +
      case  3: op_sub(in, env); break; // -
      case  4: op_mul(in, env); break; // *
      case  5: op_div(in, env); break; // /
      case  6: op_mod(in, env); break; // %
      case  7: op_eql(in, env); break; // ==
      ... 省略 ...
        
      default:
        assert(false);
      }
    }
  }

VM部はC++で記述しているが、VMが解釈可能なバイトコード列を生成するためのアセンブラ(コンパイラ)はcommon lispで作成。

;; common lisp
;; 実行例
(load "pvm-compile")

;; 加算を行うバイトコード列を'add.bc'ファイルに出力する
;;  - キーワードは命令を表す
(pvmc:compile-to-file
 "add.bc"
 '(10 20 :add))  ; 10 + 20

;; 条件分岐を行うバイトコード列を'jump.bc'ファイルに出力する
;;
;; シンボルはアドレス参照用のラベルを表す
;; (:addr シンボル)形式で参照可能
;; ※ アドレスはコンパイル時に解決される
(pvmc:compile-to-file
 "jump.bc"
 '(10 10 :eql            ; n1 == n2 ?
   (:addr then) :jump-if ; 等しいなら then に移動
   else
   1 2     ; else: スタックに 1と2 を積む
   (:addr end) :jump
   then 
   3 4    ; then: スタックに 3と4 を積む
   end))

;; 上の例では以下のようなバイト列が生成される
(pvmc::compile-to-bytecodes
 '(10 10 :eql (:addr then) :jump-if else 1 2 (:addr end) :jump then 3 4 end))
 => #(1 10 0 0 0 1 10 0 0 0 7 1 33 0 0 0 18 1 1 0 0 0 1 2 0 0 0 1 43 0 0 0 17 1 3 0
      0 0 1 4 0 0 0)

生成したバイトコードはpvmコマンドで実行可能。

# pvmコマンド作成
$ g++ -O2 -o pvm pvm.cc

# add.bc
$ ./pvm add.bc
[data stack]    # 実行後のデータスタックとリターンスタックの中身が出力される
 0# 30   # 10 + 20

[return stack]

# jump.bc
$ ./pvm jump.bc
[data stack]
 0# 4     # then部が実行された
 1# 3

[return stack]

実行速度

上のVM上でのフィボナッチ数の計算に要した時間。
以下は35のフィボナッチ数計算用のコード。

(pvmc:compile-to-file
 "fib.bc"
 '(
   35    ; fib(35)
   (:addr fib-beg) :call ; fib(25)
   (:addr finish)  :jump
   
   fib-beg
   :dup 2  :less (:addr fib-end) :jump-if  ; if(n < 2) 
   :dup 2  :sub  (:addr fib-beg) :call     ; fib(n - 2)
   :swap 1 :sub  (:addr fib-beg) :call     ; fib(n - 1)
   :add
   fib-end
   :return
   
   finish))

#| 実行結果:
$ time ./pvm fib.bc 
[data stack]
 0# 9227465

[return stack]


real	0m3.605s
user	0m3.600s
sys	0m0.000s
|#

他言語との比較。

言語 所要時間(最適化オプションなし) 所要時間(最適化オプションあり)
gcc-4.6.1 0.112s 0.056s
sbcl-1.0.54 0.320s 0.110s
pvm 3.600s
ruby1.9.1 2.816s
ruby1.8.7 14.497s

現状は本当に単純なインタプリタなので仕方がないとはいえ、Ruby(1.9)よりも遅い・・・。

ちなみに各言語用のソースコードは以下の通り。

// C++
// ファイル名: fib.cc
// コンパイル: g++ -O2 -o fib fib.cc
// 実行: time fib 35
#include <iostream>
#include <cstdlib>

int fib(int n) {
  if(n < 2) {
    return n;
  }
  return fib(n-2) + fib(n-1);
}

int main(int argc, char** argv) {
  std::cout << fib(atoi(argv[1])) << std::endl;
  return 0;
}
;; sbcl
(defun fib (n)
  (declare (optimize (speed 3) (safety 0) (debug 0))
           (fixnum n))
  (if (< n 2)
      n
    (the fixnum (+ (fib (- n 2)) (fib (- n 1))))))

;; 実行
(time (fib 35))
# ruby
# ファイル名: fib.rb
# 実行: time fib.rb 35
def fib (n)
  return n if n < 2
  fib(n-2) + fib(n-1)
end

p fib(ARGV[0].to_i)

ソースコード

VM及びコンパイラ用のソースコード
それぞれ180行、80行程度。

// ファイル名: pvm.hh
/**
 * バイトコードインタプリタ
 */
#ifndef PVM_HH
#define PVM_HH

#include <iostream>
#include <fstream>
#include <cassert>
#include <vector>
#include <algorithm>

namespace pvm {
  typedef unsigned char octet;
  typedef std::vector<int> stack_t;

  
  /**
   * バイトコード読み込みストリーム
   */
  class bytecode_stream {
  public:
    bytecode_stream(const char* filepath) : bytecodes(NULL), position(0) {
      std::ifstream in(filepath);
      assert(in);

      length = in.rdbuf()->in_avail();
      bytecodes = new octet[length];
      in.read((char*)bytecodes, length);
    }
    
    ~bytecode_stream() { delete [] bytecodes; }
    
    bool eos() const { return position >= length; }
    
    octet read_octet () { return bytecodes[position++]; }

    // sizeof(int) == 4 と仮定
    int read_int() {
      int n = *(int*)(bytecodes + position);
      position += 4;
      return n;
    }

    // program counter
    unsigned pc() const { return position; }
    unsigned& pc() { return position; }
    
  private:
    octet* bytecodes;
    unsigned length;
    unsigned position;
  };


  /**
   * データスタックとリターンスタック
   */
  class environment {
  public:
    stack_t& dstack() { return data_stack; }
    stack_t& rstack() { return return_stack; }

    const stack_t& dstack() const { return data_stack; }
    const stack_t& rstack() const { return return_stack; }

  private:
    stack_t data_stack;
    stack_t return_stack;
  };


  /**
   * 各種操作(命令)
   */
  class op {
  public:
    static void call(octet opcode, bytecode_stream& in, environment& env) {
      switch(opcode) {
      case  1: op_int(in, env); break; // int値構築
      case  2: op_add(in, env); break; // +
      case  3: op_sub(in, env); break; // -
      case  4: op_mul(in, env); break; // *
      case  5: op_div(in, env); break; // /
      case  6: op_mod(in, env); break; // %
      case  7: op_eql(in, env); break; // ==
      case  8: op_less(in, env); break;// <

      case  9: op_dup(in, env); break;  // データスタックの先頭要素を複製
      case 10: op_drop(in, env); break; // データスタックの先頭要素を破棄
      case 11: op_swap(in, env); break; // データスタックの最初の二つの要素を入れ替え
      case 12: op_over(in, env); break; // データスタックの二番目の要素を先頭にコピーする
      case 13: op_rot(in, env); break;  // データスタックの先頭三つの要素をローテーションする
        
      case 14: op_rpush(in, env); break; // データスタックの先頭要素を取り出しリターンスタックに追加する
      case 15: op_rpop(in, env); break;  // リターンスタックの先頭要素を取り出しデータスタックに追加する
      case 16: op_rcopy(in, env); break; // リターンスタックの先頭要素をデータすタックに追加する

      case 17: op_jump(in, env); break;    // 無条件分岐
      case 18: op_jump_if(in, env); break; // 条件分岐
      case 19: op_call(in, env); break;    // 関数呼び出し
      case 20: op_return(in, env); break;  // 関数から復帰
        
      default:
        assert(false);
      }
    }

  private:
    typedef bytecode_stream bcs;
    typedef environment env;
    
#define DPUSH(x) e.dstack().push_back(x)
#define DPOP pop_back(e.dstack())
#define DNTH(nth) e.dstack()[e.dstack().size()-1-nth]

#define RPUSH(x) e.rstack().push_back(x)
#define RPOP pop_back(e.rstack())
#define RNTH(nth) e.rstack()[e.rstack().size()-1-nth]

    static void op_int(bcs& in, env& e) { DPUSH(in.read_int()); }
    static void op_add(bcs& in, env& e) { DPUSH(DPOP + DPOP); }
    static void op_sub(bcs& in, env& e) { int n = DPOP; DPUSH(DPOP - n); }
    static void op_mul(bcs& in, env& e) { DPUSH(DPOP * DPOP); }
    static void op_div(bcs& in, env& e) { int n = DPOP; DPUSH(DPOP / n); }
    static void op_mod(bcs& in, env& e) { int n = DPOP; DPUSH(DPOP % n); }
    static void op_eql(bcs& in, env& e) { DPUSH(DPOP == DPOP); }
    static void op_less(bcs& in, env& e) { DPUSH(DPOP > DPOP); }

    static void op_dup(bcs& in, env& e) { DPUSH(DNTH(0)); }
    static void op_drop(bcs& in, env& e) { DPOP; }
    static void op_swap(bcs& in, env& e) { std::swap(DNTH(0), DNTH(1)); }
    static void op_over(bcs& in, env& e) { DPUSH(DNTH(1)); }
    static void op_rot(bcs& in, env& e) { std::swap(DNTH(2), DNTH(0)); std::swap(DNTH(1), DNTH(2)); }

    static void op_rpush(bcs& in, env& e) { RPUSH(DPOP); }
    static void op_rpop(bcs& in, env& e) { DPUSH(RPOP); }
    static void op_rcopy(bcs& in, env& e) { DPUSH(RNTH(0)); }

    static void op_jump(bcs& in, env& e) { in.pc() = DPOP;}
    static void op_jump_if(bcs& in, env& e) { int p = DPOP; if(DPOP){ in.pc() = p;} }
    static void op_call(bcs& in, env& e) { RPUSH(in.pc()); in.pc() = DPOP; }
    static void op_return(bcs& in, env& e) { in.pc() = RPOP; }

#undef DPUSH
#undef DPOP
#undef DNTH

#undef RPUSH
#undef RPOP
#undef RNTH

  private:
    static int pop_back(stack_t& stack) {
      int x = stack.back();
      stack.pop_back();
      return x;
    }
  };


  /**
   * バイトコード実行
   */
  class executor {
  public:
    void execute(const char* filepath) {
      bytecode_stream in(filepath);
      
      while(in.eos() == false) {
        octet opcode = in.read_octet();
        op::call(opcode, in, env);
      }
    }
    
    const environment& get_env() const { return env; }

  private:
    environment env;
  };
}

#endif
// ファイル名: pvm.cc
// バイトコード実行用コマンド
#include "pvm.hh"
#include <iostream>

void show(const char* name, const pvm::stack_t& stack) {
  std::cout << "[" << name << "]" << std::endl;
  for(int i = stack.size()-1; i >= 0; i--) {
    std::cout << " " << (stack.size()-1-i) << "# " << stack[i] << std::endl;
  }
  std::cout << std::endl;  
}

int main(int argc, char** argv) {
  if(argc != 2) {
    std::cerr << "Usage: pvm BYTECODE_FILEPATH" << std::endl;
    return 1;
  }
  
  pvm::executor vm;
  vm.execute(argv[1]);

  const pvm::environment& rlt = vm.get_env();
  show("data stack", rlt.dstack());
  show("return stack", rlt.rstack());

  return 0;
}
;;; ファイル名: pvm-compile.lisp
;;; S式をVM用のバイトコードにコンパイル(アセンブル)する
(defpackage pvm-compile
  (:use :common-lisp)
  (:nicknames :pvmc)
  (:export compile-to-file))
(in-package :pvm-compile)

;; 利用可能な操作(命令)のリスト
(defparameter *op-list*
  '((1 :int)
    (2 :add)
    (3 :sub)
    (4 :mul)
    (5 :div)
    (6 :mod)
    (7 :eql)
    (8 :less)

    (9 :dup)
    (10 :drop)
    (11 :swap)
    (12 :over)
    (13 :rot)

    (14 :rpush)
    (15 :rpop)
    (16 :rcopy)
    
    (17 :jump)
    (18 :jump-if)
    (19 :call)
    (20 :return)))

;; 数値をリトルエンディアンのバイト列に変換する
;; n -> '(b1 b2 b3 b4)
(defun int-to-bytes (n)
  (loop FOR i FROM 0 BELOW 4
        COLLECT (ldb (byte 8 (* i 8)) n)))

;; 操作名に対するコード値を取得する
(defun opcode (op)
  (assert #1=(find op *op-list* :key #'second))
  (first #1#))

;; コンパイル
(defun compile-to-bytecodes (codes)
  (loop WITH unresolves = '()  ; 未解決のアドレス参照
        WITH labels = '()      ; ラベルとアドレスのマッピング
        FOR code IN codes
        FOR pos = (length tmps)
    APPEND
    (etypecase code
      (integer `(,(opcode :int) ,@(int-to-bytes code))) ; 整数値構築
      (keyword (list (opcode code)))                    ; 一般的な操作
      (symbol (push `(,code ,pos) labels)               ; アドレス(PC)参照用のラベル
              '())
      (cons (ecase (first code)                         ; アドレス参照
              (:addr (push `(,(second code) ,(1+ pos)) unresolves)
                     `(,(opcode :int) 0 0 0 0))))) ; この時点では実際のアドレスが不明なので 0 を設定しておく
    INTO tmps
    FINALLY
    (let ((bytecodes (coerce tmps 'vector)))
      ;; アドレス解決
      (loop FOR (label offset) IN unresolves
            FOR label-addr = (second (assoc label labels))
        DO
        (setf (subseq bytecodes offset (+ offset 4)) (int-to-bytes label-addr)))

      (return bytecodes))))

;; コンパイルして結果をファイルに出力する
(defun compile-to-file (filepath assembly-codes)
  (let ((bytecodes (compile-to-bytecodes assembly-codes)))
    (with-open-file (out filepath :direction :output
                                  :if-exists :supersede
                                  :element-type '(unsigned-byte 8))
      (write-sequence bytecodes out)))
  t)

*1:大別すると整数処理系、データスタック操作系、リターンスタック操作系、分岐系の四つ

ビットリバース

割合汎用的な、整数のビットを前後反転する関数を作成してみた。
2の乗数サイズの任意の整数型のビット反転が可能。

// 反転例
bit_reverse(0x0000FFFF) => 0xFFFF0000
// 実装

// バイト単位での変換表
const unsigned char REV_BYTE[]={
  0,128,64,192,32,160,96,224,16,144, 80,208,48,176,112,240,8,136,72,200,
  40,168,104,232,24,152,88,216,56,184,120,248,4,132,68,196,36,164,100,228,
  20,148,84,212,52,180,116,244,12,140,76,204,44,172,108,236,28,156,92,220,
  60,188,124,252,2,130,66,194,34,162,98,226,18,146,82,210,50,178,114,242,
  10,138,74,202,42,170,106,234,26,154,90,218,58,186,122,250,6,134,70,198,
  38,166,102,230,22,150,86,214,54,182,118,246,14,142,78,206,46,174,110,238,
  30,158,94,222,62,190,126,254,1,129,65,193,33,161,97,225,17,145,81,209,
  49,177,113,241,9,137,73,201,41,169,105,233,25,153,89,217,57,185,121,249,
  5,133,69,197,37,165,101,229,21,149,85,213,53,181,117,245,13,141,77,205,
  45,173,109,237,29,157,93,221,61,189,125,253,3,131,67,195,35,163,99,227,
  19,147,83,211,51,179,115,243,11,139,75,203,43,171,107,235,27,155,91,219,
  59,187,123,251,7,135,71,199,39,167,103,231,23,151,87,215,55,183,119,247,
  15,143,79,207,47,175,111,239,31,159,95,223,63,191,127,255};

// リバースクラス
template <int BYTE_SIZE> 
struct bit_rev {
  static const int HALF_SIZE = BYTE_SIZE/2;
  static const int HALF_BITS = HALF_SIZE*8;
  
  // BYTE_SIZEが1以外なら、上位ビットと下位ビットに再帰的に処理を適用した後に、二つを入れ替える
  template <typename T>
  static T reverse(T n) {
    return 
      (bit_rev<HALF_SIZE>::reverse(n) << HALF_BITS) | 
      (bit_rev<HALF_SIZE>::reverse(n>>HALF_BITS));
  }  
};

// BYTE_SIZEが1ならテーブルを参照して、バイトを変換する
template <> struct bit_rev<1> {
  template <typename T>
  static T reverse(T n) {
    return REV_BYTE[n&0xFF];
  }
};

// インターフェース関数
template <typename T>
T bit_reverse(T n) {
  return bit_rev<sizeof(T)>::reverse(n);
}

サンプルコマンド:

/**
 * ファイル名: rev.cc
 * コンパイル: g++ -o rev rev.cc
 */
#include <iostream>

/*
 bit_reverse関数定義
 */

int main() {
  unsigned char n08 = 0x0F;
  unsigned short n16 = 0x009F;
  unsigned int n32 = 0x0000699F;
  unsigned long long n64 = 0x00000000666699FF;
  
  std::cout.setf(std::ios::hex, std::ios::basefield);
  std::cout.setf( std::ios::showbase );

  std::cout << "n08: " << (long long)n08 << " => " << (long long)bit_reverse(n08) << std::endl;
  std::cout << "n16: " << (long long)n16 << " => " << (long long)bit_reverse(n16) << std::endl;
  std::cout << "n32: " << (long long)n32 << " => " << (long long)bit_reverse(n32) << std::endl;
  std::cout << "n64: " << (long long)n64 << " => " << (long long)bit_reverse(n64) << std::endl;
  return 0;
}

実行結果:

$ ./rev
n08: 0xf => 0xf0
n16: 0x9f => 0xf900
n32: 0x699f => 0xf9960000
n64: 0x666699ff => 0xff99666600000000

自分の環境で軽く試した感じでは、ビット演算のみを使用して手書きで最適化したものと同等以上の速度が出ていた。

cc-dict: ハッシュテーブル

ハッシュテーブルを実装してみた。
cc-dict-0.0.3
チェイン法を用いたハッシュテーブルで、リンクリスト(チェイン)のノードを割り当てる際に自前のアロケータを使っている点以外は、特に変わったところもないと思う。

ベンチマーク

一応、ベンチマークも取ったので載せておく。
比較対象はg++のunordered_mapとGoogleのsparse_hash_map及びdense_hash_map。
ベンチマークプログラムにはHash Table Benchmarksに記載のものを使用させてもらった*1
※ 実行環境は Ubuntu-11.11-64bit / Intel(R) Core(TM) i7 CPU L 620 @ 2.00GHz。

このベンチマーク結果を見る限りは、特別性能が悪い、ということはなさそう*2

*1:ただし以下の追加・修正を施した。
 1: 検索処理時間の計測の追加
 2: もともとは文字列のベンチマークではキーの型const char*が使用されていたのをstd::stringに変更(関連)
 3: 処理時間にキー配列を生成する時間も含まれていたので、キー配列は最初にまとめて生成しておき、その後から計時を開始するように修正

*2:ベンチマークデータが結構恣意的(連番 or 乱数)なので、必ずしも実データで同様の性能がでるかは分からないけど・・・

std::tr1::unordered_mapのキーの型をconst char*にした場合の挙動

g++(及びその他)のunordered_mapで文字列をキーにしたい場合の注意書き。
unordered_mapのようにconst char*を指定すると、キーが文字列ではなくポインタ(整数値)として解釈されてしまう*1
以下、その際の挙動の確認用のコード:

/**
 * ファイル名: str_map.cc
 * コンパイル: g++ -o str_map str_map.cc
 * gcc-4.6.1
 */
#include <string>
#include <cstring>
#include <tr1/unordered_map>
#include <iostream>

int main() {
  // ハッシュマップ作成
  std::tr1::unordered_map<const char*, int> cstr_map;  // const char*用
  std::tr1::unordered_map<std::string, int> str_map;   // std::string用
  
  // キーを用意
  char key1[] = "abc";
  char* key2 = new char[4];
  strcpy(key2, "abc");
  
  // キーの値とアドレスを出力
  std::cout << std::endl << "--" << std::endl;
  std::cout << "key1: '" << key1 << "' (addr: " << (long)(key1) << ")" << std::endl;
  std::cout << "key2: '" << key2 << "' (addr: " << (long)(key2) << ")" << std::endl;
  
  // key1を要素挿入
  cstr_map[key1] = 10;
  str_map[key1] = 10;

  // key1とkey2を検索
  std::cout << std::endl << "--" << std::endl;
  std::cout << "cstr_map: key1=" << cstr_map[key1] << ", key2=" << cstr_map[key2] << std::endl;
  std::cout << " str_map: key1=" << str_map[key1] << ", key2=" << str_map[key2] << std::endl;

  // key1の値を変えてみる
  key1[1] = '2';  // key1 = "a2c"

  // key1を検索
  std::cout << std::endl << "--" << std::endl;
  std::cout << "key1: '" << key1 << "' (addr: " << (long)(key1) << ")" << std::endl;
  std::cout << "cstr_map: key1=" << cstr_map[key1] << std::endl;
  std::cout << " str_map: key1=" << str_map[key1] << std::endl;    
  return 0;
}

実行結果:

--
key1: 'abc' (addr: 140736322707456)  # key1とkey2は同じ文字列だけど、アドレスは異なる
key2: 'abc' (addr: 30732528)

--
cstr_map: key1=10, key2=0   # key1のみヒット
 str_map: key1=10, key2=10  # key1とkey2の両方にヒット

--
key1: 'a2c' (addr: 140736035469872)
cstr_map: key1=10           # ヒット(ポインタ値が同じなため)
 str_map: key1=0            # ヒットしない(文字列が変わっているため)

分かってみれば、これはこれで自然な動作なので特に違和感もないけど、最近はC++がご無沙汰だったためか(文字列ではなくポインタ値として扱われていることに)すぐに気づかずに少しはまってしまった。

*1:もちろん const char* 用に自前で定義したハッシュ関数と比較関数をテンプレート引数に渡してあげれば特に問題はない(はず)

フィールドのポインタから、オブジェクト全体のポインタを取得するためのマクロ

構造体やクラスのインスタンスの特定のフィールドのアドレス(ポインタ)だけ分かっている場合に、そこからオブジェクト全体のポインタを取得したい場合に使うマクロ。
やっていることは単にフィールドのアドレスから、そのフィールドの(クラスor構造体の)先頭からのオフセットを引いているだけ。

// type.fieldが、typeの先頭の何バイト目に配置されているかを求める
#define field_offset(type, field) \
  ((char*)&(((type*)NULL)->field) - (char*)NULL)

// type.filedのアドレス(field_ptr)から、fieldのオフセット分マイナスし、typeのアドレスを求める
#define obj_ptr(type, field, field_ptr) \
  ((type*)((char*)field_ptr - field_offset(type, field)))

メモリアロケータを自作する際に、割当メモリ領域にタグ付けするのに利用できそうだと思い作成。

/* main.cc */
#include <iostream>

// タグ付きメモリチャンク
struct chunk {
  struct {            // タグ情報:
    const char* type; // - 型名
    int size;         // - サイズ
  } tag;
  void* buf;
};

// メモリ割り当て
inline void* mem_allocate(const char* type, int size) {
  chunk* c = (chunk*)new char[sizeof(chunk::tag)+size];

  // タグ情報を保存する
  c->tag.type = type;
  c->tag.size = size;

  std::cout << "[" << c->tag.type << "] allocate: " << c->tag.size << " bytes" << std::endl;

  // 割り当てアドレスを返す
  return &c->buf;
}

// メモリ解放
void mem_free(void* p) {
  // 解放するアドレス(chunk::buf)から、タグ情報(chunk)を取得する
  chunk* c = obj_ptr(chunk, buf, p);
  std::cout << "[" << c->tag.type << "] free: " << c->tag.size << " bytes" << std::endl;
  delete c;
}

struct abc {
  int a;
  char b;
  long c;
};

// main関数
int main() {
  void* ptrs[3];

  std::cout << "# allocate:" << std::endl;
  ptrs[0] = new (mem_allocate("int", sizeof(int))) int;
  ptrs[1] = new (mem_allocate("pointer", sizeof(void*))) void*;
  ptrs[2] = new (mem_allocate("struct", sizeof(abc))) abc;
  
  std::cout << std::endl;
  std::cout << "# free:" << std::endl;
  for(int i=2; i >=0; i--)
    mem_free(ptrs[i]);

  return 0;
}
$ g++ -o main main.cc
$ ./main
# allocate:
[int] allocate: 4 bytes
[pointer] allocate: 8 bytes
[struct] allocate: 16 bytes

# free:
[struct] free: 16 bytes
[pointer] free: 8 bytes
[int] free: 4 bytes