Sized queues in Multiprocessing

Allegro Common Lisp provides an mp:queue object for general multiprocessing use. This robust CLOS class has various generic functions defined on it, such as mp:enqueue and mp:dequeue. Queues are discussed in the section Queues in Multiprocessing.htm.

The basic queue has no restriction on size, so in any production environment its size needs to be regulated; otherwise its growth could exhaust available memory and crash your program.

To ensure that a queue cannot grow beyond an expected size, we can subclass the basic CLOS class so that it tracks its current size and records the maximum number of items we'll allow. Here is the definition of our finite-size queue:

;; Defined before the class so the :initform below refers to a known
;; special variable.
(defvar *default-finite-queue-max-size* 100
  "The default maximum size for a finite-queue.")

(defclass finite-queue (mp:queue)
  ((count :documentation "The current size of the queue."
          :initform 0
          :accessor queue-count)
   (max-size :documentation "The maximum size of the queue."
             :initarg :max-size
             :initform *default-finite-queue-max-size*
             :accessor queue-max-size))
  (:documentation "A mp:queue with finite size."))

If the queue is full when an enqueue is attempted, we signal a queue-full condition, defined as follows:

(define-condition queue-full (simple-error)
  ((queue :initarg :queue
          :reader queue-full-queue))
  (:report (lambda (condition stream)
             (let ((queue (queue-full-queue condition)))
               (format stream
                       "~A is at its maximum size of ~D."
                       queue
                       (queue-max-size queue))))))

We then wrap the default methods for mp:queue to ensure that our finite-queue tracks and holds to its boundaries:

(defmethod mp:enqueue :around ((queue finite-queue) what)
  (declare (ignorable what))
  ;; Refuse the item if the queue is already at its limit.
  (when (>= (queue-count queue) (queue-max-size queue))
    (error 'queue-full :queue queue))
  ;; Count the item, then let the standard method enqueue it.
  (incf-atomic (slot-value queue 'count) 1)
  (call-next-method))

(defmethod mp:dequeue :around ((queue finite-queue) &key wait empty-queue-results)
  (declare (ignorable wait empty-queue-results))
  ;; Let the standard method dequeue, then update the count.
  (let ((item (call-next-method)))
    (decf-atomic (slot-value queue 'count) 1)
    item))

Notice the use of incf-atomic and decf-atomic in the methods above. They ensure that simultaneous increments and decrements from multiple threads are all accounted for, but they do not eliminate every concurrency effect. Because no lock is held between the check of the count against max-size and the call to the next method, there is a small window in which those values can change: several threads can pass the check at the same moment and then each enqueue an item.
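
In single-threaded use that window cannot open, so the basic behavior is easy to check. The following is a minimal sketch, assuming the finite-queue class, the queue-full condition, and the two :around methods above are already loaded in Allegro CL; the function name demo-finite-queue is hypothetical and used only for illustration.

```lisp
;; Hypothetical demo; relies on the definitions given earlier in this note.
(defun demo-finite-queue ()
  "Fill a two-item finite-queue, then attempt one enqueue too many."
  (let ((q (make-instance 'finite-queue)))
    ;; Lower the limit so the queue fills quickly.
    (setf (queue-max-size q) 2)
    (mp:enqueue q :a)
    (mp:enqueue q :b)
    ;; The third enqueue should be refused with a queue-full error.
    (handler-case (mp:enqueue q :c)
      (queue-full (condition)
        (format t "Rejected: ~A~%" condition)))
    ;; Return the tracked size after the refused enqueue.
    (queue-count q)))
```

Handling queue-full with handler-case lets a producer decide whether to drop the item, retry later, or propagate the error.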

Depending on how sensitive our application is to the number of items on the queue, this technique may be satisfactory and may even perform better than strict locking. The non-locking technique has two further advantages: once enqueuing finishes we still have an accurate queue-count, and the queue never grows beyond a known bound given by this formula:

  maximum queue size + number of enqueuing threads - 1 = adjusted maximum
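
As a quick worked example of this bound, here is a minimal sketch in portable Common Lisp. The function name adjusted-maximum is hypothetical, introduced only to illustrate the formula, and the thread counts are made-up inputs.

```lisp
;; Hypothetical helper: worst-case queue length under the lock-free
;; enqueue method, computed from the formula above.
(defun adjusted-maximum (max-size enqueuing-threads)
  "Return MAX-SIZE + ENQUEUING-THREADS - 1, the bound that applies when
every enqueuing thread passes the size check at the same moment."
  (check-type max-size (integer 0))
  (check-type enqueuing-threads (integer 1))
  (+ max-size enqueuing-threads -1))

(adjusted-maximum 100 4)   ; => 103
(adjusted-maximum 100 1)   ; => 100
```

With a single enqueuing thread the bound equals the configured maximum, which is why the overshoot only matters when several threads enqueue concurrently.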

For many cases this type of queue limiting is sufficient. In the case that an absolute maximum must be maintained or if the maximum number of enqueuing threads is unknown, we can provide stricter locking that comes at the performance cost of acquiring and holding a lock. Here is an example of such a technique:

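  (defmethod mp:enqueue :around ((queue finite-queue) what)
    (declare (ignorable what))
    (let ((queue-full-p nil))
      ;; Check and update the count as one step under the queue's lock.
      (mp:with-process-lock ((mp:queue-lock queue))
        (if (>= (queue-count queue) (queue-max-size queue))
            (setf queue-full-p t)
            ;; Atomic increment, because mp:dequeue decrements the
            ;; count with decf-atomic without holding this lock.
            (incf-atomic (slot-value queue 'count) 1)))
      ;; Signal the error or enqueue only after releasing the lock.
      (if queue-full-p
          (error 'queue-full :queue queue)
          (call-next-method))))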

The lock is acquired to read, compare, and update the size of the queue. This prevents inaccuracies in the reported size of the queue and ensures that the queue stays within its designated size.

In all cases, programming in a multiprocessing environment, and particularly in an SMP environment, is an exercise in choosing a design that balances program requirements, thread safety, and satisfactory performance. We hope this note has introduced you to the mp:queue object, how to customize it, and the difference between loose and strict locking techniques.

Copyright © 2023 Franz Inc., All Rights Reserved