読者です 読者をやめる 読者になる 読者になる

Lock-Free Queue

common lisp sbcl algorithm

compare-and-swap操作を用いたロックフリーなキューの実装。
SBCLでのみ動作*1

(defpackage lock-free-queue
  (:use :common-lisp)
  (:export queue
           make
           enq 
           deq
           empty-p 
           element-count       
           to-list))
(in-package :lock-free-queue)

;; compare-and-swap: 成功した場合はTを、失敗した場合はNILを返す
(defmacro compare-and-swap (place old new)
  `(eq (sb-ext:compare-and-swap ,place ,old ,new) ,old))

;; キュー構造体
(defstruct queue
  (head nil :type list) 
  (tail nil :type list))

;; リストへ変換/空判定/要素数取得
(defun to-list (que) (copy-seq (cdr (queue-head que))))
(defun empty-p (que) (endp (cdr (queue-head que))))
(defun element-count (que) (length (cdr (queue-head que))))

(defmethod print-object ((o queue) stream)
  (print-unreadable-object (o stream :type t)
    (format stream "~s ~s" :count (element-count o))))

;; キューを生成
(defun make (&optional initial-contents)
  (let ((contents (cons :initial-head initial-contents)))
    (make-queue :head contents
                :tail (last contents))))

;; キューの末尾に要素を追加する
;; => queue
(defun enq (x que)
  (loop WITH new-elem = (list x)
        FOR tail = (queue-tail que)
    DO
    (cond ((cdr tail)
           (compare-and-swap (queue-tail que) tail (cdr tail)))  ; tailの位置を調整
          ((compare-and-swap (cdr tail) nil new-elem)
           (return que)))))                                      ; 追加成功

;; キューの先頭から要素を取り出す
;; => (or (values 先頭要素 T)   ; キューに要素がある場合
;;        (values NIL NIL))     ; キューが空の場合
(defun deq (que)
  (let* ((head (queue-head que))
         (next (cdr head)))
    (cond ((null next)
           (values nil nil))       ; 空
          ((compare-and-swap (queue-head que) head next)
           (values (car next) t))  ; 取得成功
          (t
           (deq que)))))           ; 他スレッドと競合(リトライ)

実行例:

;; シングルスレッドでの例
(defparameter *que* (lock-free-queue:make))
=> *QUE*

(lock-free-queue:enq 1 *que*)
=> #<LOCK-FREE-QUEUE:QUEUE :COUNT 1>

(lock-free-queue:enq 2 *que*)
=> #<LOCK-FREE-QUEUE:QUEUE :COUNT 2>

(lock-free-queue:to-list *que*)
=> (1 2)

(lock-free-queue:deq *que*)
=> 1
   T

(lock-free-queue:deq *que*)
=> 2
   T

(lock-free-queue:deq *que*)
=> NIL
   NIL

;; マルチスレッドでの例
(let ((data (loop FOR i FROM 0 BELOW 10000 COLLECT i))
      (que (lock-free-queue:make))
      (thread-num 500))
  
  ;; enqueuers
  (loop REPEAT thread-num
        DO (sb-thread:make-thread 
            (lambda ()
              (dolist (e data)
                (lock-free-queue:enq e que)))))

  ;; dequeuer
  (list
   (length 
    (loop REPEAT (* thread-num (length data))
          COLLECT 
          (loop
           (multiple-value-bind (val ok?) (lock-free-queue:deq que)
             (when ok?
               (return val))))))
   que))
=> 5000000
   #<LOCK-FREE-QUEUE:QUEUE :COUNT 0>

*1:sb-ext:compare-and-swapを置き換えれば他の処理系でも動作可能