;;; Simulating Symmetric Multi-Processing with fork(): source code
;; -*- Mode: common-lisp; Package: common-lisp-user -*-
(in-package :user)
;;;;;;;;;;;;;;; DEBUGGING AND TIMING CODE ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmacro my-time (&body body)
  ;; Evaluate the forms of BODY in order, print the real (wall-clock) time
  ;; they took, in seconds, to standard output as "REAL TIME: <seconds>",
  ;; and return the primary value of the last form.
  (let ((start (gensym))
        (res (gensym))
        (time (gensym)))
    `(let* ((,start (get-internal-real-time))
            ;; PROGN lets BODY hold any number of forms.  Splicing BODY
            ;; straight into the binding only worked for exactly one form.
            (,res (progn ,@body))
            (,time (- (get-internal-real-time) ,start)))
       ;; Convert with the implementation's clock resolution rather than
       ;; assuming internal time is counted in milliseconds.
       (format t "REAL TIME: ~s~%"
               (float (/ ,time internal-time-units-per-second)))
       ;; Flush the stream FORMAT just wrote to (*standard-output*); a
       ;; stream designator of T would mean *terminal-io* instead.
       (force-output)
       ,res)))
(eval-when (compile load eval)
  ;; Changing the initial value of this variable to `t' will result in a
  ;; LOT of output.  Only needed when debugging problems.
  ;;
  ;; The .debug macro reads this variable at macroexpansion time, so it
  ;; must exist while this file is being compiled.  The `load' situation
  ;; is included as well so the variable is also bound after loading the
  ;; compiled file; otherwise any later expansion of .debug (at the REPL,
  ;; or while compiling another file) would hit an unbound variable.
  (defparameter *debug* nil))
(defmacro .debug (format-string &rest format-arguments)
  ;; The decision is made while expanding: when *debug* is true the
  ;; expansion is a call to ..DEBUG with the same arguments, otherwise it
  ;; is NIL, so disabled debugging statements cost nothing at run time.
  (if *debug*
      (list* '..debug format-string format-arguments)
      nil))
;; The seconds value recorded by the most recent call to
;; ..DEBUG-RESET-TIME-BASE.  ..DEBUG prints its timestamps relative to it.
(defvar *debug-time-base* nil)

(defun ..debug-reset-time-base ()
  ;; Remember the current time as the new origin for debug timestamps.
  ;; Only the first value returned by the clock function is kept.
  (multiple-value-bind (seconds)
      (excl::cl-internal-real-time)
    (setf *debug-time-base* seconds)))

;; Establish an initial time base as soon as this file is loaded.
(..debug-reset-time-base)
(defun ..debug (format-string &rest format-arguments)
  ;; Print one timestamped debugging line to standard output: the number
  ;; of seconds since *debug-time-base*, a colon, the milliseconds, and
  ;; then FORMAT-STRING processed with FORMAT-ARGUMENTS.  Returns NIL.
  ;;
  ;; Use an undocumented (not for long, promise) function to get the
  ;; current time that includes milliseconds.  The first value is the
  ;; number of seconds between now and 2015 (or something).  Date chosen to
  ;; be a fixnum in a 32-bit lisp when the lisp you are using was
  ;; released.
  (multiple-value-bind (seconds mseconds)
      (excl::cl-internal-real-time)
    (format t "~&~2,'0d:~3,'0d " (- seconds *debug-time-base*) mseconds))
  ;; APPLY spreads the &rest list into individual FORMAT arguments.
  ;; Passing the list itself handed FORMAT a single argument, so a
  ;; directive such as ~a printed the whole list (e.g. "(foo: 50)") and
  ;; any format string with two or more directives ran out of arguments.
  (apply #'format t format-string format-arguments)
  (format t "~&")
  ;; Flush the stream FORMAT wrote to (*standard-output*); a stream
  ;; designator of T would mean *terminal-io* instead.
  (force-output))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; General framework for High-level task and processor management
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; The object type that holds information about available processors.
(defstruct (task-manager
            (:conc-name task-manager-)
            (:constructor make-task-manager)
            (:copier copy-task-manager)
            (:predicate task-manager-p))
  ;; The pool of idle PROCESSOR objects, kept as a list.  Anything on this
  ;; list may be handed a new task.
  processors)
(defmethod print-object ((object task-manager) stream)
  ;; Show a task manager as #<TM ...> followed by its free-processor list.
  (let ((free-processors (task-manager-processors object)))
    (format stream "#<TM ~s>" free-processors)))
(defstruct (processor
            (:conc-name processor-)
            (:constructor make-processor)
            (:copier copy-processor)
            (:predicate processor-p))
  ;; Stream on which requests are written to the processor.
  to-stream
  ;; Stream from which the processor's replies are read.
  from-stream
  ;; Operating system process ID of the processor; needed in order to
  ;; terminate it.
  pid)
(defmethod print-object ((object processor) stream)
  ;; Show a processor by what the stream accessors below report for its
  ;; two streams (presumably their OS-level descriptors -- confirm against
  ;; the Allegro CL documentation) together with its process ID.
  (let ((output-fn (excl::stream-output-fn (processor-to-stream object)))
        (input-fn (excl::stream-input-fn (processor-from-stream object)))
        (pid (processor-pid object)))
    (format stream "#<PROC TO: ~s, FROM: ~s, PID ~s>"
            output-fn input-fn pid)))
;; The current TASK-MANAGER, or NIL when no processors are set up.
;; INITIALIZE-PROCESSORS stores a new one here and END-PROCESSORS resets
;; it to NIL.
(defvar *task-manager* nil)
(defun sigterm-handler-for-child (&rest signal-arguments)
  ;; Installed in every forked child as the handler for signal 15.
  ;; Whatever arguments the signal machinery passes are ignored; the
  ;; child just exits with status 1, quietly and without unwinding.
  (declare (ignore signal-arguments))
  (exit 1 :no-unwind t :quiet t))
(defun initialize-processors (child-worker &optional (n 4))
  ;; Fork N child processes and install a fresh task manager holding one
  ;; PROCESSOR per child.  Any existing task manager is shut down first.
  ;; In each child, CHILD-WORKER is called with two arguments: the stream
  ;; the child reads requests from and the stream it writes replies to.
  ;; Returns the new task manager.
  (when *task-manager* (end-processors))
  (labels ((serve-as-child (request-stream reply-stream)
             ;; Runs only in a forked child and never returns: once
             ;; CHILD-WORKER is done (normally or not) the child closes
             ;; its streams and exits.
             (unwind-protect
                 (funcall child-worker request-stream reply-stream)
               (ignore-errors (close request-stream))
               (ignore-errors (close reply-stream))
               (exit 0 :quiet t :no-unwind t)))
           (spawn ()
             ;; One pipe per direction, then fork.
             (multiple-value-bind (from-child-read from-child-write)
                 (excl.osi:pipe)
               (multiple-value-bind (to-child-read to-child-write)
                   (excl.osi:pipe)
                 (let ((pid (excl.osi:fork)))
                   (if (zerop pid)
                       ;; Child: install the SIGTERM handler, drop the
                       ;; parent's ends of both pipes, then serve.
                       (progn
                         (excl::add-signal-handler
                          15 'sigterm-handler-for-child)
                         (close from-child-read)
                         (close to-child-write)
                         (serve-as-child to-child-read from-child-write))
                       ;; Parent: drop the child's ends of both pipes and
                       ;; describe the child with a PROCESSOR.
                       (progn
                         (close from-child-write)
                         (close to-child-read)
                         (make-processor :to-stream to-child-write
                                         :from-stream from-child-read
                                         :pid pid))))))))
    (let ((pool '()))
      (dotimes (i n)
        (push (spawn) pool))
      (setq *task-manager* (make-task-manager :processors pool)))))
;; Held while popping from or pushing onto the free-processor list of
;; *task-manager*.
(defvar *processor-lock* (mp:make-process-lock :name "free processor lock"))

(defun find-free-processor ()
  ;; In the controlling process, we need a way to allocate a task to a new
  ;; processor.  This function removes the next available processor from
  ;; the free list and returns it, waiting until one is released when the
  ;; list is empty.  It is important that this function be efficient.
  ;;
  ;; Signals an error if INITIALIZE-PROCESSORS has not been called.
  (declare (optimize (speed 3)))
  (unless *task-manager* (error "You must call initialize-processors first."))
  (.debug "FIND FREE PROCESSOR:")
  (let ((tm *task-manager*))
    (loop
      ;; The emptiness test and the POP must happen under one acquisition
      ;; of the lock.  Testing outside the lock let two Lisp processes
      ;; both see a non-empty list; the slower one then popped NIL from
      ;; the emptied list and returned it as if it were a processor.
      (let ((processor (mp:with-process-lock (*processor-lock*)
                         (pop (task-manager-processors tm)))))
        (when processor
          (return processor)))
      ;; Nothing was free: sleep until the list looks non-empty, then go
      ;; around again and retry under the lock.
      (mp:process-wait "waiting for processor"
                       (lambda (tm)
                         (task-manager-processors tm))
                       tm))))
(defun release-processor (processor)
  ;; Hand PROCESSOR back: after this call it counts as idle and may be
  ;; given another task.  Signals an error when no task manager exists.
  (when (null *task-manager*)
    (error "You must call initialize-processors first."))
  (.debug "RELEASE-PROCESSOR: ~s" processor)
  (mp:with-process-lock (*processor-lock*)
    ;; Return it to the pool of free processors.
    (setf (task-manager-processors *task-manager*)
          (cons processor (task-manager-processors *task-manager*)))))
(defun end-processors ()
  ;; Called to shut down the child processors.  For every processor that
  ;; is currently on the free list: close both of its streams, send it
  ;; SIGTERM and reap it.  Processors that are checked out at the time of
  ;; the call are not on the list and are not touched.  Finally
  ;; *task-manager* is reset to NIL.  Returns NIL.
  ;;
  ;; Signals an error if INITIALIZE-PROCESSORS has not been called.
  ;;
  ;; Shutdown is best effort per processor.  This function runs as the
  ;; cleanup of CREATE-WORK and at the start of INITIALIZE-PROCESSORS, so
  ;; a failure on one child must not abort the loop: that would leave the
  ;; remaining children running and *task-manager* stale.
  (unless *task-manager* (error "You must call initialize-processors first."))
  (dolist (p (task-manager-processors *task-manager*))
    (.debug "shutting down processor: ~s" p)
    ;; Same tolerance the child applies when it closes its own ends.
    (ignore-errors (close (processor-to-stream p)))
    (ignore-errors (close (processor-from-stream p)))
    (handler-case
        (progn
          (excl.osi:kill (processor-pid p) excl.osi:*sigterm*)
          (system:reap-os-subprocess :pid (processor-pid p)))
      (error (c)
        ;; Report the failure and carry on with the other processors.
        (warn "Could not terminate processor with pid ~s: ~a"
              (processor-pid p) c))))
  (setq *task-manager* nil))
(defun wait-for-processor (function &rest arguments)
  ;; This is the function which does the work of communicating with child
  ;; subprocesses and implements the `PROCESSOR' abstraction.  FUNCTION
  ;; must be a symbol and ARGUMENTS must be symbols, strings or integers.
  ;;
  ;; A free processor is claimed, the call is written to it as the text of
  ;; a list "(FUNCTION arg ...)", and the single object the processor
  ;; sends back is read and returned.  Symbols are sent by name only,
  ;; without a package prefix.  The processor is released again when this
  ;; function exits, whether normally or not.
  ;;
  ;; Signals an error if an argument is of an unsupported type or if the
  ;; processor's output ends before a reply is read.  If the reply is an
  ;; uninterned symbol named ERROR a warning is signalled and NIL is
  ;; returned.
  ;;
  ;; Efficiency is important in this function, too.  We use
  ;; WAIT-FOR-INPUT-AVAILABLE to determine when the child subprocess is
  ;; done with their task and we can read the return value.
  (assert (symbolp function))
  (let ((processor (find-free-processor)))
    (unwind-protect
        (let ((to-stream (processor-to-stream processor))
              (from-stream (processor-from-stream processor)))
          (write-char #\( to-stream)
          (write-string (symbol-name function) to-stream)
          (write-char #\space to-stream)
          (dolist (argument arguments)
            (.debug "WAIT-FOR-PROCESSOR: arg: ~s" argument)
            (if* (stringp argument)
               then ;; The quotes go to TO-STREAM; the original wrote them
                    ;; to an undefined variable named STREAM.  Embedded
                    ;; double quotes and backslashes are escaped so the
                    ;; child's reader sees exactly one string.
                    (write-char #\" to-stream)
                    (loop for ch across argument
                          do (when (or (char= ch #\") (char= ch #\\))
                               (write-char #\\ to-stream))
                             (write-char ch to-stream))
                    (write-char #\" to-stream)
             elseif (symbolp argument)
               then (write-string (symbol-name argument) to-stream)
             elseif (integerp argument)
               then (write-string (prin1-to-string argument) to-stream)
               else (error "Cannot convert this argument: ~s." argument))
            (write-char #\space to-stream))
          (write-char #\) to-stream)
          (force-output to-stream)
          (mp:wait-for-input-available from-stream)
          ;; FROM-STREAM itself serves as the end-of-file marker.
          (let ((result (read from-stream nil from-stream)))
            (if* (eq result from-stream)
               then (error "Processor did not complete for ~s." function)
             elseif ;; Every #:error token read produces a fresh
                    ;; uninterned symbol, so an EQ test against a literal
                    ;; '#:error can never succeed.  Compare by name.
                    (and (symbolp result)
                         (null (symbol-package result))
                         (string-equal "error" (symbol-name result)))
               then (warn "Processor did not complete for ~s." function)
               else result)))
      (release-processor processor))))
(defun quiet-sigpipe-handler (sig continuable)
  ;; Allegro CL's default SIGPIPE handler is noisy.  This replacement
  ;; prints nothing and simply answers "handled" by returning T, which
  ;; amounts to ignoring SIGPIPE.  Neither argument is looked at.
  (declare (ignore continuable sig))
  (values t))
;; Install the quiet handler for signal 13 (SIGPIPE) when this file is
;; loaded.
(excl::add-signal-handler 13 'quiet-sigpipe-handler)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Example using framework
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; The symbol a child puts in the car of the cons it sends back when
;; evaluating a form signalled an error (the cdr is the error message).
;; The parent recognizes such replies by comparing the car with EQ.
(defvar *error-marker* 'sys::error-marker)
(defun child-subprocess (input-stream output-stream)
  ;; The request loop that runs inside a child subprocess: read a form
  ;; from INPUT-STREAM, evaluate it, write the result followed by a
  ;; newline to OUTPUT-STREAM, and repeat.
  ;;
  ;; When evaluation signals an error, the reply sent to the parent is a
  ;; cons of the symbol in *error-marker* and the error's message text.
  ;;
  ;; At end of input, "#:eof" is written to OUTPUT-STREAM and the loop
  ;; ends, returning NIL.
  (flet ((evaluate (form)
           (handler-case (eval form)
             (error (c)
               (..debug "CHILD ERROR: ~a" c)
               (cons *error-marker* (princ-to-string c))))))
    (loop
      ;; INPUT-STREAM itself serves as the end-of-file marker.
      (let ((form (read input-stream nil input-stream)))
        (.debug "CHILD-SUBPROCESS: form is ~s" form)
        (when (eq input-stream form)
          (write-string "#:eof" output-stream)
          (force-output output-stream)
          (return))
        (let ((result (evaluate form)))
          (.debug "CHILD-SUBPROCESS: result is ~s" result)
          (write result :stream output-stream)
          (terpri output-stream)
          (force-output output-stream))))))
(defun number-of-cpus ()
  ;; Return the number of CPUs to assume, always a positive integer.
  ;;
  ;; On Linux this is the number of lines in /proc/cpuinfo that begin with
  ;; "processor".  If the file is missing, or contains no such line, the
  ;; default of 4 is returned instead, so callers never receive 0.  On
  ;; every other platform the default is returned.
  ;;
  ;; The file is read directly instead of running "cat" in a subprocess,
  ;; and the prefix test uses STRING= instead of a regular expression.
  (let ((default 4))
    #+linux
    (let ((count 0)
          (prefix "processor"))
      (with-open-file (in "/proc/cpuinfo"
                          :direction :input
                          :if-does-not-exist nil)
        ;; IN is NIL when the file does not exist.
        (when in
          (loop for line = (read-line in nil nil)
                while line
                do (when (and (>= (length line) (length prefix))
                              (string= prefix line :end2 (length prefix)))
                     (incf count)))))
      (if (plusp count) count default))
    #-linux default))
(defun create-work (child-worker &key iterations
                                      work-estimate-seconds
                                      processors)
  ;; Start PROCESSORS child processors, run PROCESSORS Lisp processes
  ;; that each call CHILD-WORKER (a symbol) ITERATIONS times through
  ;; WAIT-FOR-PROCESSOR, print the real time the whole run took, and shut
  ;; the child processors down again.
  ;;
  ;; ITERATIONS and PROCESSORS are required.  When WORK-ESTIMATE-SECONDS
  ;; is given, the estimated total amount of work is printed first.
  ;;
  ;; CHILD-WORKER is called in the child with the iteration number J and
  ;; is expected to return J+1; any other reply is reported with ..DEBUG.
  (assert iterations)
  (assert processors)
  (when work-estimate-seconds
    (format t "WORK: ~s seconds, "
            (* iterations processors work-estimate-seconds))
    (force-output t))
  (initialize-processors #'child-subprocess processors)
  (unwind-protect
      (let ((gates '()))
        (my-time
         (progn
           ;; For each processor, start a Lisp process that will do the
           ;; given amount of work.  A gate per processor is used to
           ;; determine when all the work is done, so we can shut down.
           (dotimes (i processors)
             (let ((gate (mp:make-gate nil)))
               (mp:process-run-function (format nil "foo~d" i)
                 (lambda (gate i)
                   ;; Open the gate however this Lisp process ends.  If it
                   ;; were opened only after a normal finish, a worker
                   ;; that exits abnormally would leave the parent waiting
                   ;; on its gate forever.
                   (unwind-protect
                       (dotimes (j iterations)
                         (let ((result (wait-for-processor child-worker j)))
                           (if* (and (consp result)
                                     (eq *error-marker* (car result)))
                              then (..debug "PARENT: Got error: ~a"
                                            (cdr result))
                            elseif ;; Test for a number first: /= signals
                                   ;; a type error on anything else.
                                   (or (not (numberp result))
                                       (/= (1+ j) result))
                              then (..debug
                                    "PARENT: foo~d: invalid return value: ~s"
                                    i result))))
                     (mp:open-gate gate)))
                 gate
                 i)
               (push gate gates)))
           ;; This is a very efficient way to wait.
           (dolist (gate gates)
             (mp:process-wait "wait for processor to finish"
                              #'mp:gate-open-p gate)))))
    (end-processors)))
;;;; timing loop, for getting a certain amount of "work"
;;;;
(defun dummy (i)
  ;; One unit of busy work: raise I to the 100th power.
  (let ((power 100))
    (expt i power)))
(defun timing-loop (&key seconds)
  ;; Call DUMMY on 0, 1, 2, ... until the SECONDS timeout expires, and
  ;; return the value the counter had reached at that point.
  (let ((count 0))
    (mp:with-timeout (seconds)
      (loop
        (dummy count)
        (incf count)))
    count))
(defun loopit (n)
  ;; Call DUMMY on every integer from 0 up to, but not including, N.
  ;; Returns NIL.
  (loop for i from 0 below n
        do (dummy i)))
;; (setq n (timing-loop :seconds 1))
;; (loopit n) => should take 1 second
;; `defvar' instead of `defparameter' so it does not change in a Lisp
;; session, if we reload this file. That will make comparing times between
;; runs a rational thing to do.
;; The largest count reached by any of ten TIMING-LOOP runs of 0.1
;; seconds each.
(defvar *iterations*
  (loop repeat 10
        maximize (timing-loop :seconds .1)))
(defun example-child-worker (i)
  ;; Do *iterations* calls' worth of busy work, then answer with the
  ;; successor of I.
  (loopit *iterations*)
  (1+ i))
(defun example-application (&key processors iterations)
  ;; Run the timing example: EXAMPLE-CHILD-WORKER is the work function
  ;; and every call is estimated at 0.1 seconds.
  (create-work 'example-child-worker
               :work-estimate-seconds .1
               :processors processors
               :iterations iterations))
;; The value NUMBER-OF-CPUS returned when this variable was first
;; defined; RUN and RUN2 report it and RUN2 uses it as its processor
;; count.
(defvar *number-of-cpus* (number-of-cpus))
(defun run ()
  ;; Report the detected CPU count, reset the debug clock, then run the
  ;; timing example with 40 iterations on 2, 3, ..., 12 processors in
  ;; turn.  The value of the final run is returned.
  (format t "Detected ~d CPUs~%" *number-of-cpus*)
  (..debug-reset-time-base)
  (let ((iterations 40)
        (last-value nil))
    (loop for processors from 2 to 12
          do (format t "Iterations ~d, processors ~d, "
                     iterations processors)
             (force-output t)
             (setq last-value
                   (example-application :iterations iterations
                                        :processors processors)))
    last-value))
;;;;;;;;;;;;;;; Error handling example ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defun example-child-worker2 (i)
  ;; Answer with the successor of I, except that an I of exactly 50
  ;; signals an error instead.
  (when (eql i 50)
    (error "foo: ~a" i))
  (1+ i))
(defun example-application2 (&key processors iterations)
  ;; Run the error-handling example: EXAMPLE-CHILD-WORKER2 is the work
  ;; function and no work estimate is given.
  (create-work 'example-child-worker2
               :processors processors
               :iterations iterations))
(defun run2 ()
  ;; Report the detected CPU count, reset the debug clock, then run the
  ;; error-handling example with 100 iterations on one processor per
  ;; detected CPU.
  (format t "Detected ~d CPUs~%" *number-of-cpus*)
  (..debug-reset-time-base)
  (let ((cpus *number-of-cpus*))
    (example-application2 :processors cpus :iterations 100)))
#|
cl-user(6): (run2)
Detected 4 CPUs
00:037 CHILD ERROR: (foo: 50)
00:037 PARENT: Got error: (foo: 50)
00:038 CHILD ERROR: (foo: 50)
00:039 PARENT: Got error: (foo: 50)
00:039 CHILD ERROR: (foo: 50)
00:040 PARENT: Got error: (foo: 50)
00:041 CHILD ERROR: (foo: 50)
00:042 PARENT: Got error: (foo: 50)
REAL TIME: 0.033
nil
cl-user(7):
|#