マルチプロセスで使用可能なロックフリーキュー

タイトルの通り、マルチプロセスで使用可能なロックフリーのFIFOキューを実装したので、その簡単な紹介。

作成物

github: ipc-msgque (0.0.4)

  • ロックフリーなFIFOキュー
    • 再入可能 かつ SIGKILLに対して安全*1
  • C++
  • 共有メモリ(mmap)を使用
  • マルチプロセス(and マルチスレッド)間の通信に使用可能
  • gcc(ver4.1以上)*2 かつ POSIX準拠環境*3でのみ使用可能

単機能な割に、内部では「まず(割合)汎用的な可変長ブロックメモリアロケータを作って、その上に固定長ブロックアロケータ、さらにその上にFIFOキューを実装」と地味に凝ったことをしている。

使用例

fork()と併用した例。

/**
 * 親子プロセスで共有するFIFOキューを作成し、子から親へメッセージを送信するサンプルプログラム
 * 
 * ファイル名: msgque-sample.cc
 * コンパイル: g++ -o msgque-sample msgque-sample.cc
 */
#include <imque/queue.hh>  // インクルードパスを通しておく

#include <unistd.h>    // fork, getpid
#include <sys/types.h>
#include <stdio.h>     // sprintf
#include <string.h>    // strlen
#include <iostream>
#include <string>

#define CHILD_COUNT 10        // 子プロセスの数
#define QUEUE_ENTRY_COUNT 32  // キューの最大要素数
#define SHM_SIZE 4096         // キューが使用可能な共有メモリのバイト数

int main(int argc, char** argv) {
  // 要素数と共有メモリサイズを指定してキューを作成
  imque::Queue que(QUEUE_ENTRY_COUNT, SHM_SIZE);  
  if(! que) {
    return 1;
  } 

  for(int i=0; i < CHILD_COUNT; i++) {
    if(fork() == 0) {
      // 子プロセスの処理
      char buf[1024]; 
      sprintf(buf, "Hello: %d", getpid());

      // enqueue
      que.enq(buf, strlen(buf));
      return 0;
    }
  }

  // 親プロセスの処理
  for(int i=0; i < CHILD_COUNT; i++) {
    std::string buf;

    // dequeue
    while(que.deq(buf) == false);  // キューが空の間はビジーループ
    std::cout << "[receive] " << buf << std::endl;
  }

  return 0;
}

実行結果:

$ ./msgque-sample 
[receive] Hello: 12736
[receive] Hello: 12737
[receive] Hello: 12738
[receive] Hello: 12740
[receive] Hello: 12739
[receive] Hello: 12742
[receive] Hello: 12744
[receive] Hello: 12743
[receive] Hello: 12745
[receive] Hello: 12741

気が向けば、内部で使用しているメモリアロケータのコードなども載せていくかもしれない。

*1:ただし、メモリを確保してから解放するまでの間にSIGKILL等でプロセスがダウンした場合は、その分のメモリはリークする

*2:__sync_bool_compare_and_swap 等の各種アトミック関数を使用しているため。

*3:共有メモリの仕組みとしてmmapを使用しているため。