Lock-Free Queue
compare-and-swap操作を用いたロックフリーなキューの実装。
SBCLでのみ動作*1。
(defpackage lock-free-queue (:use :common-lisp) (:export queue make enq deq empty-p element-count to-list)) (in-package :lock-free-queue) ;; compare-and-swap: 成功した場合はTを、失敗した場合はNILを返す (defmacro compare-and-swap (place old new) `(eq (sb-ext:compare-and-swap ,place ,old ,new) ,old)) ;; キュー構造体 (defstruct queue (head nil :type list) (tail nil :type list)) ;; リストへ変換/空判定/要素数取得 (defun to-list (que) (copy-seq (cdr (queue-head que)))) (defun empty-p (que) (endp (cdr (queue-head que)))) (defun element-count (que) (length (cdr (queue-head que)))) (defmethod print-object ((o queue) stream) (print-unreadable-object (o stream :type t) (format stream "~s ~s" :count (element-count o)))) ;; キューを生成 (defun make (&optional initial-contents) (let ((contents (cons :initial-head initial-contents))) (make-queue :head contents :tail (last contents)))) ;; キューの末尾に要素を追加する ;; => queue (defun enq (x que) (loop WITH new-elem = (list x) FOR tail = (queue-tail que) DO (cond ((cdr tail) (compare-and-swap (queue-tail que) tail (cdr tail))) ; tailの位置を調整 ((compare-and-swap (cdr tail) nil new-elem) (return que))))) ; 追加成功 ;; キューの先頭から要素を取り出す ;; => (or (values 先頭要素 T) ; キューに要素がある場合 ;; (values NIL NIL)) ; キューが空の場合 (defun deq (que) (let* ((head (queue-head que)) (next (cdr head))) (cond ((null next) (values nil nil)) ; 空 ((compare-and-swap (queue-head que) head next) (values (car next) t)) ; 取得成功 (t (deq que))))) ; 他スレッドと競合(リトライ)
実行例:
;; シングルスレッドでの例 (defparameter *que* (lock-free-queue:make)) => *QUE* (lock-free-queue:enq 1 *que*) => #<LOCK-FREE-QUEUE:QUEUE :COUNT 1> (lock-free-queue:enq 2 *que*) => #<LOCK-FREE-QUEUE:QUEUE :COUNT 2> (lock-free-queue:to-list *que*) => (1 2) (lock-free-queue:deq *que*) => 1 T (lock-free-queue:deq *que*) => 2 T (lock-free-queue:deq *que*) => NIL NIL ;; マルチスレッドでの例 (let ((data (loop FOR i FROM 0 BELOW 10000 COLLECT i)) (que (lock-free-queue:make)) (thread-num 500)) ;; enqueuers (loop REPEAT thread-num DO (sb-thread:make-thread (lambda () (dolist (e data) (lock-free-queue:enq e que))))) ;; dequeuer (list (length (loop REPEAT (* thread-num (length data)) COLLECT (loop (multiple-value-bind (val ok?) (lock-free-queue:deq que) (when ok? (return val)))))) que)) => 5000000 #<LOCK-FREE-QUEUE:QUEUE :COUNT 0>
*1:sb-ext:compare-and-swapを置き換えれば他の処理系でも動作可能