;;;; Simulating Symmetric Multi-Processing with fork(): source code

;; -*- Mode: common-lisp; Package: common-lisp-user -*-

(in-package :user)

;;;;;;;;;;;;;;; DEBUGGING AND TIMING CODE ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defmacro my-time (&body body)
  ;; Evaluate the forms of BODY in order, print the elapsed real time in
  ;; seconds to standard output as "REAL TIME: <seconds>", and return the
  ;; primary value of the last form in BODY.
  ;;
  ;; We just want to know a single thing: the real time spent in BODY.
  (let ((start (gensym))
        (res (gensym))
        (time (gensym)))
    `(let* ((,start (get-internal-real-time))
            ;; PROGN so BODY may hold any number of forms; splicing the
            ;; forms straight into the binding is only well-formed when
            ;; there is exactly one.
            (,res (progn ,@body))
            (,time (- (get-internal-real-time) ,start)))
       ;; Scale by the implementation's clock resolution instead of
       ;; assuming the clock ticks in milliseconds.
       (format t "REAL TIME: ~s~%"
               (float (/ ,time internal-time-units-per-second)))
       (force-output t)
       ,res)))

(eval-when (:compile-toplevel :load-toplevel :execute)
  ;; Changing the initial value of this variable to `t' will result in a
  ;; LOT of output.  Only needed when debugging problems.
  ;;
  ;; The .debug macro reads this variable while it is being expanded, so
  ;; it has to exist at compile time.  :load-toplevel is included as well
  ;; so the variable is also bound after loading a compiled file; without
  ;; it, expanding .debug in such a session would hit an unbound variable.
  (defparameter *debug* nil))

(defmacro .debug (format-string &rest format-arguments)
  ;; The choice is made while the macro is expanded, not while the program
  ;; runs: with *debug* false the call expands to NIL, so debugging
  ;; statements cost nothing at run time.  With *debug* true the call
  ;; becomes a call to ..debug with the same arguments.
  (if *debug*
      (list* '..debug format-string format-arguments)
      nil))

;; The seconds value captured by the most recent call to
;; ..debug-reset-time-base.  ..debug subtracts it from the current seconds
;; value when printing a timestamp.
(defvar *debug-time-base* nil)

(defun ..debug-reset-time-base ()
  ;; Remember the current clock reading as the new base.  Only the first
  ;; value returned by the clock function is kept.
  (let ((now (excl::cl-internal-real-time)))
    (setq *debug-time-base* now)))

;; Establish an initial base when this file is loaded.
(..debug-reset-time-base)

(defun ..debug (format-string &rest format-arguments)
  ;; Print one timestamped debugging line on standard output: the seconds
  ;; elapsed since *debug-time-base*, a colon, the millisecond value of
  ;; the current clock reading, and then FORMAT-STRING processed with
  ;; FORMAT-ARGUMENTS.  Output is forced before returning.
  ;;
  ;; Use an undocumented (not for long, promise) function to get the
  ;; current time that includes milliseconds.  The first value is the
  ;; number of seconds between now and 2015 (or something).  Date chosen to
  ;; be a fixnum in a 32-bit lisp when the lisp you are using was
  ;; released.
  (multiple-value-bind (seconds mseconds)
      (excl::cl-internal-real-time)
    (format t "~&~2,'0d:~3,'0d " (- seconds *debug-time-base*) mseconds))
  ;; APPLY spreads the &rest list so that each directive in FORMAT-STRING
  ;; consumes its own argument.  Passing the list itself would hand FORMAT
  ;; a single argument (the whole list) no matter how many were supplied.
  (apply #'format t format-string format-arguments)
  (format t "~&")
  (force-output t))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; General framework for High-level task and processor management
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; A TASK-MANAGER records which processors are currently available.
(defstruct task-manager
  ;; The pool of idle PROCESSOR objects.  Anything on this list may be
  ;; handed a task; the slot starts out empty.
  (processors nil))

(defmethod print-object ((tm task-manager) out)
  ;; Print as #<TM ...> showing the list of free processors.
  (let ((free (task-manager-processors tm)))
    (format out "#<TM ~s>" free)))

(defstruct processor
  ;; Stream on which requests are written to the processor.
  (to-stream nil)
  ;; Stream from which the processor's results are read.
  (from-stream nil)
  ;; Operating system process ID of the processor; needed in order to
  ;; terminate it.
  (pid nil))

(defmethod print-object ((proc processor) out)
  ;; Print as #<PROC TO: ..., FROM: ..., PID ...>.  The TO and FROM fields
  ;; show whatever the internal stream accessors return for the two
  ;; streams (presumably the underlying descriptors -- confirm).
  (let ((writer (excl::stream-output-fn (processor-to-stream proc)))
        (reader (excl::stream-input-fn (processor-from-stream proc)))
        (pid (processor-pid proc)))
    (format out "#<PROC TO: ~s, FROM: ~s, PID ~s>" writer reader pid)))

;; The TASK-MANAGER currently in effect, or NIL when no processors have
;; been initialized (or after they have been shut down).
(defvar *task-manager* nil)

(defun sigterm-handler-for-child (&rest ignored)
  ;; Installed in each forked child as the handler for signal 15.  The
  ;; arguments are not used; the child just exits with status 1, passing
  ;; :quiet and :no-unwind to EXIT.
  (declare (ignore ignored))
  (exit 1 :no-unwind t :quiet t))

(defun initialize-processors (child-worker &optional (n 4))
  ;; Fork N child processes, each connected to this process by two pipes,
  ;; and make *task-manager* a fresh task manager whose free list holds one
  ;; PROCESSOR per child.  In each child CHILD-WORKER is called with two
  ;; arguments: the stream to read from the parent and the stream to write
  ;; back to it; when it returns (or unwinds) the child exits.  If a task
  ;; manager already exists it is shut down first.  Returns the new task
  ;; manager.
  (when *task-manager* (end-processors))
  (flet ((spawn ()
           ;; Both pipes are created before the fork, so parent and child
           ;; each start out holding all four ends and must close the two
           ;; that belong to the other side.
           (multiple-value-bind (result-in result-out)
               (excl.osi:pipe)
             (multiple-value-bind (request-in request-out)
                 (excl.osi:pipe)
               (let ((pid (excl.osi:fork)))
                 (if (zerop pid)
                     ;; Child side.
                     (progn
                       (excl::add-signal-handler
                        15 'sigterm-handler-for-child)
                       ;; Drop the ends only the parent uses.
                       (close result-in)
                       (close request-out)
                       ;; Whatever happens in the worker, close our ends
                       ;; and leave; the child must never return into the
                       ;; parent's code.
                       (unwind-protect
                           (funcall child-worker request-in result-out)
                         (ignore-errors (close request-in))
                         (ignore-errors (close result-out))
                         (exit 0 :quiet t :no-unwind t)))
                     ;; Parent side.
                     (progn
                       ;; Drop the ends only the child uses.
                       (close result-out)
                       (close request-in)
                       (make-processor :to-stream request-out
                                       :from-stream result-in
                                       :pid pid))))))))
    (let ((pool '()))
      ;; PUSH leaves the most recently forked child at the head of the
      ;; list.
      (dotimes (i n)
        (push (spawn) pool))
      (setq *task-manager* (make-task-manager :processors pool)))))

(defvar *processor-lock* (mp:make-process-lock :name "free processor lock"))

(defun find-free-processor ()
  ;; In the controlling process, we need a way to allocate a task to a new
  ;; processor.  This function removes the next available processor from
  ;; the task manager's free list and returns it, waiting until one is
  ;; released if the list is empty.  It is important that this function be
  ;; efficient.
  ;;
  ;; Signals an error if INITIALIZE-PROCESSORS has not been called.
  (declare (optimize (speed 3)))
  (unless *task-manager* (error "You must call initialize-processors first."))
  (.debug "FIND FREE PROCESSOR:")
  (let ((tm *task-manager*))
    (loop
      ;; Test and take in a single step while holding the lock.  Checking
      ;; the list first and popping under the lock afterwards leaves a
      ;; window in which another Lisp process can take the last processor,
      ;; making POP return NIL to the caller.
      (let ((processor (mp:with-process-lock (*processor-lock*)
                         (pop (task-manager-processors tm)))))
        (when processor (return processor)))
      ;; Nothing free: sleep until the list looks non-empty, then go
      ;; around again and retry the locked POP.
      (mp:process-wait "waiting for processor"
                       (lambda (tm)
                         (task-manager-processors tm))
                       tm))))

(defun release-processor (processor)
  ;; Hand PROCESSOR back: it goes onto the front of the task manager's
  ;; free list, under the processor lock, and may then be given another
  ;; task.  Signals an error if there is no task manager.
  (if (null *task-manager*)
      (error "You must call initialize-processors first.")
      (progn
        (.debug "RELEASE-PROCESSOR: ~s" processor)
        (mp:with-process-lock (*processor-lock*)
          (setf (task-manager-processors *task-manager*)
                (cons processor
                      (task-manager-processors *task-manager*)))))))

(defun end-processors ()
  ;; Shut down the child processors and discard the task manager.  Only
  ;; the processors currently on the free list are visited.  For each one:
  ;; close both streams, send it SIGTERM, and reap the subprocess.
  ;; Signals an error if there is no task manager.
  (when (null *task-manager*)
    (error "You must call initialize-processors first."))
  (mapc (lambda (p)
          (.debug "shutting down processor: ~s" p)
          (close (processor-to-stream p))
          (close (processor-from-stream p))
          (let ((pid (processor-pid p)))
            (excl.osi:kill pid excl.osi:*sigterm*)
            (system:reap-os-subprocess :pid pid)))
        (task-manager-processors *task-manager*))
  (setq *task-manager* nil))

(defun wait-for-processor (function &rest arguments)
  ;; This is the function which does the work of communicating with child
  ;; subprocesses and implements the `PROCESSOR' abstraction.  FUNCTION
  ;; must be a symbol and each of ARGUMENTS must be a symbol, a string or
  ;; an integer; anything else signals an error.
  ;;
  ;; A free processor is acquired, the call is written to it as the text
  ;; of a list "(FUNCTION arg ...)", and the value read back from the
  ;; processor is returned.  The processor is released again on the way
  ;; out, whether or not the exchange succeeded.  An error is signalled if
  ;; end of file is reached while reading the reply.
  ;;
  ;; Efficiency is important in this function, too.  We use
  ;; WAIT-FOR-INPUT-AVAILABLE to determine when the child subprocess is
  ;; done with their task and we can read the return value.
  (assert (symbolp function))
  (let ((processor (find-free-processor)))
    (unwind-protect
	(let ((to-stream (processor-to-stream processor))
	      (from-stream (processor-from-stream processor)))
	  (write-char #\( to-stream)
	  (write-string (symbol-name function) to-stream)
	  (write-char #\space to-stream)
	  (dolist (argument arguments)
	    (.debug "WAIT-FOR-PROCESSOR: arg: ~s" argument)
	    ;; Strings go out through PRIN1, which writes the surrounding
	    ;; double quotes to TO-STREAM and escapes any double quote or
	    ;; backslash inside the string, so the reader on the other end
	    ;; gets the same string back.
	    (if* (stringp argument)
	       then (prin1 argument to-stream)
	     elseif (symbolp argument)
	       then (write-string (symbol-name argument) to-stream)
	     elseif (integerp argument)
	       then (write-string (prin1-to-string argument) to-stream)
	       else (error "Cannot convert this argument: ~s." argument))
	    (write-char #\space to-stream))
	  (write-char #\) to-stream)
	  (force-output to-stream)

	  (mp:wait-for-input-available from-stream)

	  ;; FROM-STREAM itself serves as the end-of-file marker.
	  (let ((result (read from-stream nil from-stream)))
	    ;; NOTE(review): the '#:error test below compares against an
	    ;; uninterned symbol literal, which is never EQ to anything READ
	    ;; returns, so the WARN branch cannot be taken as written.  The
	    ;; intended check is unclear; left as is.
	    (if* (eq result from-stream)
	       then (error "Processor did not complete for ~s." function)
	     elseif (eq '#:error result)
	       then (warn "Processor did not complete for ~s." function)
	       else result)))
      (release-processor processor))))

(defun quiet-sigpipe-handler (signal-number continuable-p)
  ;; Allegro CL's default SIGPIPE handler is noisy.  This replacement
  ;; prints nothing and simply reports the signal as handled by returning
  ;; T -- in effect, SIGPIPE is ignored.  Neither argument is used.
  (declare (ignore signal-number continuable-p))
  t)

;; Install the quiet handler for signal 13 (SIGPIPE, going by the
;; handler's own description) when this file is loaded.
(excl::add-signal-handler 13 'quiet-sigpipe-handler)

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Example using framework
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; Symbol placed in the car of a result to mark it as an error report.
(defvar *error-marker* 'sys::error-marker)

(defun child-subprocess (input-stream output-stream)
  ;; The read-eval-print loop that runs inside a child subprocess.  Each
  ;; form read from INPUT-STREAM is evaluated and its value is written to
  ;; OUTPUT-STREAM followed by a newline.
  ;;
  ;; If evaluation signals an error, the value written is a cons of the
  ;; *error-marker* symbol and the error's message text, which is how the
  ;; parent learns about the failure.
  ;;
  ;; At end of input the text "#:eof" is written and the loop ends.
  (loop
    ;; INPUT-STREAM itself serves as the end-of-file marker.
    (let ((form (read input-stream nil input-stream)))
      (.debug "CHILD-SUBPROCESS: form is ~s" form)
      (when (eq form input-stream)
        (write-string "#:eof" output-stream)
        (force-output output-stream)
        (return))
      (let ((result (handler-case (eval form)
                      (error (condition)
                        (..debug "CHILD ERROR: ~a" condition)
                        (cons *error-marker*
                              (princ-to-string condition))))))
        (.debug "CHILD-SUBPROCESS: result is ~s" result)
        (write result :stream output-stream)
        (terpri output-stream)
        (force-output output-stream)))))

(defun number-of-cpus ()
  ;; Return the number of CPUs to plan for, always a positive integer.
  ;;
  ;; On Linux this is the number of lines in /proc/cpuinfo that begin with
  ;; "processor".  The default of 4 is returned on other platforms, when
  ;; /proc/cpuinfo does not exist, and when it contains no such lines (so
  ;; the result is never 0).
  (let ((default 4))
    #+linux
    (let ((processors 0)
          (prefix "processor"))
      ;; Read the file directly; no subprocess is needed just to get at
      ;; its lines.  A missing file yields a NIL stream instead of an
      ;; error.
      (with-open-file (cpuinfo "/proc/cpuinfo"
                               :direction :input
                               :if-does-not-exist nil)
        (when cpuinfo
          (loop for line = (read-line cpuinfo nil nil)
                while line
                do (when (and (>= (length line) (length prefix))
                              (string= prefix line
                                       :end2 (length prefix)))
                     (incf processors)))))
      (if (plusp processors) processors default))
    #-linux default))

(defun create-work (child-worker &key iterations
				      work-estimate-seconds
				      processors)
  ;; Run a timed batch of work on PROCESSORS forked child processors.
  ;;
  ;; CHILD-WORKER is the symbol naming the function each child is asked to
  ;; call.  One Lisp process is started per processor; each makes
  ;; ITERATIONS requests, calling CHILD-WORKER with 0, 1, ... in turn, and
  ;; expects J+1 back for argument J.  Error reports from the child and
  ;; unexpected return values are printed with ..debug.
  ;;
  ;; ITERATIONS and PROCESSORS are required.  When WORK-ESTIMATE-SECONDS
  ;; is given, the total estimated work is printed first.  The elapsed
  ;; real time is printed by MY-TIME, and the child processors are shut
  ;; down on the way out, even on a non-local exit.
  (assert iterations)
  (assert processors)

  (when work-estimate-seconds
    (format t "WORK: ~s seconds, "
	    (* iterations processors work-estimate-seconds))
    (force-output t))

  (initialize-processors #'child-subprocess processors)

  (unwind-protect
      (let ((gates '()))
	(my-time
	 (progn
	   ;; For each processor, start a Lisp process that will do the
	   ;; given amount of work.  A gate per processor is used to
	   ;; determine when all the work is done, so we can shut down.
	   (dotimes (i processors)
	     (let ((gate (mp:make-gate nil)))
	       (mp:process-run-function (format nil "foo~d" i)
		 (lambda (gate i)
		   ;; Open the gate however the loop ends.  If it were
		   ;; opened only after a normal finish, a worker that
		   ;; exits abnormally would leave the parent waiting on
		   ;; its gate forever.
		   (unwind-protect
		       (dotimes (j iterations)
			 (let ((result (wait-for-processor child-worker j)))
			   (if* (and (consp result)
				     (eq *error-marker* (car result)))
			      then (..debug "PARENT: Got error: ~a"
					    (cdr result))
			    ;; A non-number is reported as an invalid
			    ;; value rather than handed to /=, which
			    ;; would signal a type error.
			    elseif (or (not (numberp result))
				       (/= (1+ j) result))
			      then (..debug
				    "PARENT: foo~d: invalid return value: ~s"
				    i result))))
		     (mp:open-gate gate)))
		 gate
		 i)
	       (push gate gates)))
	   ;; This is a very efficient way to wait.
	   (dolist (gate gates)
	     (mp:process-wait "wait for processor to finish"
			      #'mp:gate-open-p gate)))))

    (end-processors)))

;;;; timing loop, for getting a certain amount of "work"
;;;;
(defun dummy (i)
  ;; One unit of busy-work: I raised to the 100th power.
  (expt i 100))

(defun timing-loop (&key seconds)
  ;; Call DUMMY on 0, 1, 2, ... inside a timeout of SECONDS and return the
  ;; value the counter had reached when the timeout ended the loop.
  (let ((count 0))
    (mp:with-timeout (seconds)
      (loop
        (dummy count)
        (incf count)))
    count))

(defun loopit (n)
  ;; Call DUMMY on each of 0 .. N-1.  Returns NIL.
  (loop for k from 0 below n
        do (dummy k))
  nil)

;; (setq n (timing-loop :seconds 1))
;; (loopit n) => should take 1 second

;; This is a `defvar' rather than a `defparameter' on purpose: reloading
;; this file in the same Lisp session must not change the value, so that
;; times from different runs can be compared sensibly.
;;
;; The value is the largest count returned by ten TIMING-LOOP runs of 0.1
;; second each.
(defvar *iterations*
    (let ((best 0))
      (loop repeat 10
            do (setq best (max best (timing-loop :seconds .1))))
      best))

(defun example-child-worker (i)
  ;; The task run in the child: do *iterations* rounds of busy-work, then
  ;; answer with I plus one.
  (loopit *iterations*)
  (1+ i))

(defun example-application (&key processors iterations)
  ;; Run EXAMPLE-CHILD-WORKER through CREATE-WORK with the given number of
  ;; PROCESSORS and ITERATIONS, estimating each task at 0.1 second.
  (create-work 'example-child-worker
               :processors processors
               :iterations iterations
               :work-estimate-seconds .1))

;; CPU count, computed once when this file is loaded.
(defvar *number-of-cpus* (number-of-cpus))

(defun run ()
  ;; Report the detected CPU count, reset the debugging time base, then
  ;; run the example application with 40 iterations on 2, 3, ... 12
  ;; processors in turn, printing a header before each run.  Returns the
  ;; value of the last run.
  (format t "Detected ~d CPUs~%" *number-of-cpus*)
  (..debug-reset-time-base)
  (let ((iterations 40)
        (last-value nil))
    (loop for processors from 2 to 12
          do (format t "Iterations ~d, processors ~d, "
                     iterations processors)
             (force-output t)
             (setq last-value
               (example-application :iterations iterations
                                    :processors processors)))
    last-value))

;;;;;;;;;;;;;;; Error handling example ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun example-child-worker2 (i)
  ;; Like the first example worker but without the busy-work, and it
  ;; deliberately signals an error for the argument 50.  Every other
  ;; argument yields I plus one.
  (when (eql i 50)
    (error "foo: ~a" i))
  (1+ i))

(defun example-application2 (&key processors iterations)
  ;; Run EXAMPLE-CHILD-WORKER2 through CREATE-WORK.  No work estimate is
  ;; passed, so no estimate is printed.
  (create-work 'example-child-worker2
               :processors processors
               :iterations iterations))

(defun run2 ()
  ;; Error handling demonstration: one processor per detected CPU, 100
  ;; iterations each, so every processor's worker is asked for the failing
  ;; argument 50 once.
  (format t "Detected ~d CPUs~%" *number-of-cpus*)
  (..debug-reset-time-base)
  (let ((cpus *number-of-cpus*))
    (example-application2 :processors cpus :iterations 100)))

#|
cl-user(6): (run2)
Detected 4 CPUs
00:037 CHILD ERROR: (foo: 50)
00:037 PARENT: Got error: (foo: 50)
00:038 CHILD ERROR: (foo: 50)
00:039 PARENT: Got error: (foo: 50)
00:039 CHILD ERROR: (foo: 50)
00:040 PARENT: Got error: (foo: 50)
00:041 CHILD ERROR: (foo: 50)
00:042 PARENT: Got error: (foo: 50)
REAL TIME: 0.033
nil
cl-user(7): 
|#

;;;; Copyright © 2023 Franz Inc., All Rights Reserved