Monitoring process-wait-with-timeout

When mp:process-wait-with-timeout is used with too long a timeout, a system can be very unresponsive to error situations, but if the timeout is too short, it may react incorrectly to expected behavior.

One solution can be to record the actual wait times over a range of application behavior and choose the final timeout value based on these statistics. This approach has been very useful when developing various Allegro CL features. Extending mp:process-wait-with-timeout so it has a monitoring feature has been considered, but this does not seem practical. If all mp:process-wait-with-timeout calls are monitored, there will be a lot of uninteresting data in the table. It seems more practical for an application to use a variant of mp:process-wait-with-timeout that does the monitoring only in places where it is needed. This variant can be customized to the exact needs of the application.

Here is a variant of mp:process-wait-with-timeout which includes monitoring code. It might be useful to application developers and it is an interesting example of SMP programming. The example attempts to minimize interference among threads by using atomic operations where it is safe to do so and a lock where that is the only way to synchronize access.

The function defined, process-wait-and-report, takes the same arguments as mp:process-wait-with-timeout and only behaves differently if the variable *record-timeout-data* is non-nil. The function test-report is a sample application -- users would write their own version of test-report which uses code from their application. The results look like the following. The units are milliseconds. For each whostate, the report shows the number of timeouts, the maximum wait, and the actual wait times of the calls that returned without timing out. In the first run, each process timed out at least once. In the second (with a longer delay), no process timed out.

CL-USER(36): (test-report)


 Process-wait times: 
Wait table busy entries: 0 
   Whostate        Timeouts MaxWait     Wait times 
 tr0                      1  1757  (1757)
 tr1                      1  1757  (420 1757)
 tr3                      1  1757  (0 100 420 1757)
 tr4                      1  1751  (0 320 0 0 1751)
 tr2                      1  1757  (100 420 1757)
 Process-wait times done. 


#<EQUALP hash-table with 6 entries @ #x2129916a>
CL-USER(37): (test-report :delay 5)


 Process-wait times: 
Wait table busy entries: 0 
   Whostate        Timeouts MaxWait     Wait times 
 tr0                      0  2650  (2650 110)
 tr1                      0  2700  (2700 2650 110)
 tr3                      0  2700  (0 820 2700 2650 110)
 tr4                      0  2600  (0 2600 0 2550 0 110)
 tr2                      0  2700  (820 2700 2650 110)
 Process-wait times done. 


#<EQUALP hash-table with 6 entries @ #x212af0f2>
CL-USER(38): 

Here are the definitions of process-wait-and-report, process-wait-report, test-report, and the supporting variables.

(in-package :user)


;; Recording switch and storage for process-wait-and-report.
;;   nil        -> no recording; process-wait-and-report just calls
;;                 mp:process-wait-with-timeout.
;;   hash table -> the live table of recorded waits, keyed by whostate.
;;   other      -> recording requested; the first call to
;;                 process-wait-and-report replaces this value with a fresh
;;                 EQUALP hash table.  If the value is a positive number it
;;                 is copied to *record-timeout-max*.
(defvar *record-timeout-data* nil)
;; When non-nil, process-wait-and-report passes this value to
;; mp:process-wait-with-timeout in place of the caller's timeout while
;; recording is on.  It is set when the table is created.
(defvar *record-timeout-max* nil)
;; Lock held while the table is created and while a new whostate entry
;; is added to it.
(defvar *record-timeout-lock* (make-basic-lock :name "record-timeout-data"))

(defun process-wait-and-report (whostate timeout fn &rest args &aux nt na spec)
  "Call mp:process-wait-with-timeout and, when recording is on, log the wait.

WHOSTATE, TIMEOUT, FN and ARGS are passed on to
mp:process-wait-with-timeout, and its result is returned.

When *record-timeout-data* is nil nothing else happens.  Otherwise:
  - the first recording call replaces *record-timeout-data* with an
    EQUALP hash table and sets *record-timeout-max*;
  - *record-timeout-max*, when non-nil, is used instead of TIMEOUT;
  - a wait that returns non-nil pushes its elapsed time (in internal
    time units) onto the entry for WHOSTATE and raises the entry's
    maximum if needed; a wait that returns nil bumps the entry's
    timeout count.
Each table entry is a list (max-wait timeout-count wait...)."
  ;; Read the control variable exactly once.  process-wait-report may
  ;; reset it from another thread at any moment, so re-reading it later
  ;; could hand us nil or T where a hash table is required.
  (let ((tt *record-timeout-data*))
    (cond
     ((null tt)
      (apply #'mp:process-wait-with-timeout whostate timeout fn args))
     (t
      (when (not (hash-table-p tt))
        ;; Do any needed consing outside the lock.
        (setq na (list 0 0))
        (with-locked-structure
         (*record-timeout-lock* :non-smp :without-scheduling)
         ;; Once the lock is acquired, we must test the value again.
         (cond
          ((hash-table-p (setq spec *record-timeout-data*))
           ;; Another thread installed the table first; use that one.
           (setq tt spec))
          (t
           (setq nt (make-hash-table :test #'equalp))
           (setf (gethash "active_users" nt) na)
           (setq *record-timeout-max*
                 (when (and (numberp spec) (< 0 spec)) spec))
           (setq *record-timeout-data* nt)
           (setq tt nt)))))
      (let* ((au (gethash "active_users" tt))
             (lock *record-timeout-lock*)
             entry dt max inc ne)
        (unwind-protect
            (let (res)
              (with-delayed-interrupts
               ;; We must avoid a throw between the incrementing
               ;; and the setting of the flag.
               ;; The incrementing must be atomic since many
               ;; threads could be calling this function.
               (incf-atomic (the fixnum (car (the cons au))))
               (setq inc t))
              ;; entry -> (max-wait timeout-count wait...)
              (when (null (setq entry (gethash whostate tt)))
                ;; Do any needed consing outside the lock.
                (setq ne (list 0 0))
                (with-locked-structure
                 (lock :non-smp :without-scheduling)
                 ;; A new entry must be added under the lock because
                 ;; we want only one (and the same) entry for each key.
                 (when (null (setq entry (gethash whostate tt)))
                   (setf (gethash whostate tt) (setq entry ne)))))
              (setq dt (get-internal-real-time))
              (setq res (apply #'mp:process-wait-with-timeout
                               whostate (or *record-timeout-max* timeout) fn args))
              (setq dt (- (get-internal-real-time) dt))
              (if res
                  (progn
                    (push-atomic dt (cddr (the cons entry)))
                    ;; Raise the recorded maximum with a compare-and-swap
                    ;; loop; retry if another thread changed it meanwhile.
                    (loop
                     (setq max (first entry))
                     (cond
                      ((not (< max dt)) (return))
                      ((atomic-conditional-setf (car (the cons entry)) dt max) (return)))))
                ;; Timed out: count it in the second slot of the entry.
                (incf-atomic (the fixnum (car (the cons (cdr entry))))))
              res)
          ;; Undo the active-user count only if we actually incremented it.
          (with-delayed-interrupts
           (when inc (decf-atomic (the fixnum (car (the cons au)))))))))))

(defun process-wait-report (&key (wait 5) start)
  "Take the current recording out of circulation and print a report of it.

*record-timeout-data* is set to START (nil by default, which turns
recording off).  If the previous value was a hash table, this function
waits until its active_users count is zero, polling once a second and
signalling a continuable error after roughly WAIT seconds, then prints
one line per whostate: timeout count, maximum wait, and either the list
of wait times or, for ten or more, a count/min/average summary.
Returns the previous value of *record-timeout-data*."
  (let* ((tt *record-timeout-data*)
         ;; Test the captured value rather than re-reading the global, so
         ;; the type check and the lookup always see the same object.
         (au (when (hash-table-p tt) (gethash "active_users" tt))))
    (setq *record-timeout-data* start)
    (format t "~&~2% Process-wait times: ~%")
    (when au
      (format t "~&Wait table busy entries: ~A ~%" (car au))
      ;; Even though the table has been taken out of circulation,
      ;; there can be threads still pointing to entries in the table.
      ;; When the  "active_users" count goes to zero, we know that
      ;; the table will not change any more.
      (loop (when (atomic-conditional-setf
                   ;; This test for zero will force a hardware memory
                   ;; barrier so we see the latest value immediately.
                   (car au) 0 0)
              (return))
            (sleep 1)
            (when (< (decf wait) 0)
              (cerror "wait 5 more seconds" "busy entries in timeout table")
              (setq wait 5)))
      (format t "~&   Whostate        Timeouts MaxWait     Wait times ~%")
      (maphash
       (lambda (k v &aux (w (if (stringp k) (length k) 0))
                  (max-wait (first v))
                  (timeout-count (second v))
                  (wait-times (cddr v))
                  (n (length wait-times)))
         ;; The bookkeeping entry is not a whostate; skip it.
         (or (equal k "active_users")
             (format t "~& ~20A ~5,' D ~5,' D  ~A~%"
                     (cond ((not (stringp k)) (list k))
                           ((< 20 w) (subseq k 0 20))
                           (t k))
                     timeout-count max-wait
                     (if (< n 10)
                         wait-times
                       ;; REDUCE rather than APPLY: the list can grow
                       ;; without bound and APPLY is limited by
                       ;; call-arguments-limit.
                       (format nil "~A entries min=~A average=~A"
                               n (reduce #'min wait-times)
                               (ceiling (reduce #'+ wait-times) n))))))
       tt))
    (format t "~& Process-wait times done. ~3%")
    tt))

  

(eval-when (compile load eval) (require :smputil))

(defun test-report (&key (ply 5) (run 10) (delay 2))
  "Sample driver for process-wait-and-report.

Turns recording on, starts a \"trigger\" process that repeatedly closes
and reopens a gate, and starts PLY worker processes named tr0, tr1, ...
Worker i waits on the gate (min (1+ i) RUN) times with timeout DELAY,
passes a barrier shared with the caller, then starts one more wait.
Once every worker has reached the barrier the caller stops the trigger
and returns the result of process-wait-report."
  (let ((signal-gate (mp:make-gate nil))
        ;; PLY workers plus the calling thread meet here.
        (rendezvous (mp:make-barrier (1+ ply)))
        (finished nil))
    ;; Any non-nil value switches recording on.
    (setq *record-timeout-data* t)
    (flet ((toggle-gate ()
             ;; Keep the gate closed for 0.1 to 2.09 seconds, open it,
             ;; and stop after an opening once FINISHED is set.
             (loop
               (mp:close-gate signal-gate)
               (sleep (+ 0.1 (* 0.01 (random 200))))
               (mp:open-gate signal-gate)
               (when finished (return))
               (sleep 0.1)))
           (worker (whostate rounds)
             (unwind-protect
                 (loop repeat (min rounds run)
                       do (process-wait-and-report
                           whostate delay #'mp:gate-open-p signal-gate)
                          (sleep 0.1))
               (mp:barrier-pass-through rendezvous)
               ;; Leave one dangling wait
               (process-wait-and-report
                whostate delay #'mp:gate-open-p signal-gate))))
      (mp:process-run-function "trigger" #'toggle-gate)
      (dotimes (i ply)
        ;; The same string serves as process name and as whostate.
        (let ((label (format nil "tr~A" i)))
          (mp:process-run-function label #'worker label (1+ i)))))
    (mp:barrier-wait rendezvous)
    (setq finished t)
    (process-wait-report)))
Copyright © 2023 Franz Inc., All Rights Reserved | Privacy Statement Twitter