Jobsched: Running jobs with multiple Racket instances
(require jobsched)    package: levintreesearch_cm
Jobsched is a job scheduler with a server-worker architecture on a single machine, where the server dispatches a number of jobs to the workers.
All workers are identical and able to process any job assigned by the server. The workers are independent Racket programs, each with its own Racket instance. See section 7, Comparison with other Racket parallelism mechanisms, for more information.
The server (a Racket program) is started.
N jobs are added to the server's job queue. Each job is specified as readable data, i.e., a value that can be written with write and read back with read.
The server starts K workers (independent Racket programs).
Each worker signals to the server, on its output port, that it is ready to receive jobs, and then listens for events on its input port.
As soon as a worker is ready, the server sends it one of the remaining jobs.
When a worker has finished a job, it sends the result to the server, and waits for another job.
When the server receives the result of a job, it processes this result and sends a new job (if any remain) to that worker.
When all N jobs are finished, the server finishes.
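The message flow in the steps above can be simulated inside a single Racket instance, with threads and asynchronous channels standing in for the worker processes and their ports. This is a hypothetical illustration, not jobsched's implementation: the procedure run-protocol and the messages 'ready, 'result and 'stop are invented here, the threads do not run in parallel, and there is no error handling.

```racket
#lang racket
(require racket/async-channel)

;; Hypothetical sketch of the server/worker protocol (NOT jobsched's code).
;; jobs: list of job data; k: number of simulated workers;
;; run-job: procedure applied by a worker to each job's data.
;; Returns a hash from job index to result.
(define (run-protocol jobs k run-job)
  (define to-server (make-async-channel))          ; worker -> server messages
  (define inboxes                                  ; server -> worker messages
    (for/vector ([w (in-range k)])
      (define inbox (make-async-channel))
      (thread
       (λ ()
         (async-channel-put to-server (list 'ready w)) ; signal readiness
         (let loop ()
           (match (async-channel-get inbox)
             ['stop (void)]                             ; no jobs left
             [(list idx data)
              ;; process the job and report the result to the server
              (async-channel-put to-server (list 'result w idx (run-job data)))
              (loop)]))))
      inbox))
  (define results (make-hash))
  (let loop ([remaining (for/list ([d (in-list jobs)] [i (in-naturals)]) (cons i d))]
             [stopped 0])
    (cond
      [(= stopped k) results]                      ; every worker was stopped
      [else
       ;; wait until some worker is ready or returns a result
       (define w
         (match (async-channel-get to-server)
           [(list 'ready from) from]
           [(list 'result from idx r) (hash-set! results idx r) from]))
       (match remaining
         ['()                                      ; nothing left: stop worker
          (async-channel-put (vector-ref inboxes w) 'stop)
          (loop remaining (add1 stopped))]
         [(cons (cons idx data) rest)              ; send the next job
          (async-channel-put (vector-ref inboxes w) (list idx data))
          (loop rest stopped)])])))

;; Example: add pairs of numbers with 4 simulated workers.
(define demo-results
  (run-protocol (for*/list ([a 3] [b 3]) (list a b)) 4 (λ (d) (apply + d))))
```

Each worker receives exactly one reply (a job or 'stop) per message it sends, so the server terminates once all K workers have been told to stop.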
If your machine has K cores and sufficient memory, jobsched can run K workers in parallel, for a speedup of up to a factor of K. Jobsched has been successfully used with more than 120 workers in parallel, with more than 100 000 fast-paced jobs to dispatch between them.
1 Example
"adder-server.rkt"
#lang racket
(require jobsched)

;; This is the command to start a worker.
;; Creates a new Racket instance.
(define (make-worker-command _worker-index)
  (make-racket-cmd "adder-worker.rkt"))

;; Create a scheduler, and tell it how to start a worker
(define sched (make-scheduler make-worker-command))

;; Schedule some jobs
(for* ([num1 (in-range 10)]
       [num2 (in-range 10)])
  (scheduler-add-job! sched #:data (list num1 num2)))

;; Called by the server each time a worker returns a result
(define (process-result sched jb result)
  (match (job-data jb)
    [(list num1 num2)
     (printf "~a + ~a = ~a\n" num1 num2 result)]))

;; Start the server
(scheduler-start sched (processor-count)
                 #:after-stop process-result)
"adder-worker.rkt"
#lang racket
(require jobsched)

;; Called by the worker for each job received from the server
(define (run-job jb)
  (match (job-data jb)
    [(list num1 num2) (+ num1 num2)]
    [_ 0]))

(module+ main
  (start-worker run-job))
All definitions exported by the various modules below are also exported by jobsched.
See also the one-file example in examples/server-worker.
2 Job
(require jobsched/job)    package: levintreesearch_cm
struct
(struct job (index cost data start-ms stop-ms)
    #:transparent)
index : nonnegative-integer?
cost : number?
data : readable?
start-ms : nonnegative-integer?
stop-ms : nonnegative-integer?
The cost field is used by the server's priority queue: jobs with a lower cost are dispatched first.
The data field contains readable? information that is sent to the worker for processing.
The fields start-ms and stop-ms are set automatically by the server when a job is sent to a worker and when the result is received.
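Because job data travels between separate Racket instances, it must survive being written by one process and read back by another. The helper round-trips? below is hypothetical (it is not part of jobsched, whose readable? may be defined differently); it checks this property with write and read.

```racket
#lang racket
;; Hypothetical helper (not part of jobsched): does v survive a
;; write/read round trip, as job data sent to a worker must?
(define (round-trips? v)
  (with-handlers ([exn:fail:read? (λ (_) #f)]) ; unreadable printed form
    (equal? v
            (read (open-input-string
                   (with-output-to-string (λ () (write v))))))))
```

Lists, numbers, strings, symbols and booleans pass. A procedure fails because its printed form starts with #<, which read rejects, and an instance of a transparent (non-prefab) struct is read back as a vector rather than as the original struct.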
3 Server
(require jobsched/server)    package: levintreesearch_cm
procedure
(scheduler? v) → boolean?
v : any/c
Returns #t if v is a scheduler (as created by make-scheduler), #f otherwise.
procedure
(make-scheduler make-worker-command) → scheduler?
make-worker-command : (-> nonnegative-integer? list?)
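Creates a scheduler. The procedure make-worker-command is called with the index of a worker and must return the command line, as a list, that starts this worker as a separate Racket program. See also make-racket-cmd, which builds such a list.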
procedure
(scheduler-add-job! sched #:data data [#:cost cost]) → void?
sched : scheduler?
data : readable?
cost : number? = 0
The data is sent to the worker, which receives it on its input port; inside the worker, it is accessible via job-data.
The cost is used for ordering the job in the priority queue, which is ordered by minimum cost.
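The minimum-cost ordering can be illustrated with a binary heap from Racket's data/heap library. This is a sketch under assumptions: the pending struct and the add-job! and next-job! helpers are hypothetical, and jobsched's actual queue (including how it breaks ties between equal costs) may differ.

```racket
#lang racket
(require data/heap)

;; Hypothetical queued job: only the fields needed for ordering.
(struct pending (cost data) #:transparent)

;; Min-heap: the job with the smallest cost is at the top.
(define queue
  (make-heap (λ (a b) (<= (pending-cost a) (pending-cost b)))))

;; Mirrors the shape of scheduler-add-job! (default cost 0).
(define (add-job! data #:cost [cost 0])
  (heap-add! queue (pending cost data)))

;; Remove and return the cheapest job (errors if the queue is empty).
(define (next-job!)
  (define j (heap-min queue))
  (heap-remove-min! queue)
  j)
```

For example, after adding jobs a, b and c with costs 5, 1 and 3, successive calls to next-job! return b, then c, then a.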
procedure
(scheduler-start sched
                 n-workers
                 [#:before-start before-start
                  #:after-stop after-stop]) → void?
sched : scheduler?
n-workers : nonnegative-integer?
before-start : (-> scheduler? job? any) = void
after-stop : (-> scheduler? job? readable? any) = void
The callback before-start is called before a job is sent to a worker. The callback after-stop is called when a job is finished and the result is received from the worker. Both callbacks can be used to add new jobs to the queue, using scheduler-add-job!.
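Adding jobs from after-stop lets a computation generate its own follow-up work. The sketch below illustrates the pattern with a hypothetical, sequential mock that runs every job in-process: mock-scheduler, mock-add-job! and mock-start are invented names, the queue is FIFO rather than cost-ordered, and the callback receives the job's data instead of the job struct that jobsched passes.

```racket
#lang racket
;; Hypothetical sequential mock of the after-stop semantics (NOT jobsched).
(struct mock-scheduler ([queue #:mutable]))

;; FIFO for simplicity; jobsched orders jobs by cost instead.
(define (mock-add-job! sched data)
  (set-mock-scheduler-queue! sched
                             (append (mock-scheduler-queue sched) (list data))))

;; Run jobs until the queue is empty; after-stop may add new jobs.
(define (mock-start sched run-job #:after-stop after-stop)
  (let loop ()
    (match (mock-scheduler-queue sched)
      ['() (void)]
      [(cons data rest)
       (set-mock-scheduler-queue! sched rest)
       (after-stop sched data (run-job data)) ; jobsched passes the job struct
       (loop)])))

;; Example: each result becomes the next job until it reaches 1.
(define (collatz-step n)
  (if (even? n) (quotient n 2) (+ (* 3 n) 1)))

(define trajectory '())
(define sched (mock-scheduler '()))
(mock-add-job! sched 6)
(mock-start sched collatz-step
            #:after-stop (λ (s data result)
                           (set! trajectory (cons result trajectory))
                           (unless (= result 1) (mock-add-job! s result))))
```

Starting from 6, the mock produces the results 3, 10, 5, 16, 8, 4, 2, 1 and then stops because no job remains.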
procedure
(processor-count) → nonnegative-integer?
4 Utilities
(require jobsched/utils)    package: levintreesearch_cm
procedure
(make-racket-cmd path-to-prog
                 [#:submod submod
                  #:errortrace? errortrace?]
                 args ...) → (listof path-string?)
path-to-prog : path-string?
submod : (or/c symbol? #f) = #f
errortrace? : any/c = #f
args : path-string?
5 Worker
(require jobsched/worker)    package: levintreesearch_cm
procedure
(start-worker run-job [#:silent? silent?]) → void?
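run-job : (-> job? any)
silent? : any/c = #f
Signals to the server that the worker is ready, then, for each job received from the server, calls run-job on it and sends the result back to the server.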
See the example in section 1.
NOTICE: Any output the worker produces before start-worker is called is read by the server, which is waiting for the worker's ready signal. Avoid printing anything before calling start-worker.
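One way to follow this advice, assuming the server reads only the worker's standard output and not its standard error, is to send any diagnostics printed before start-worker to standard error with eprintf. The helper announce-loading is hypothetical.

```racket
#lang racket
;; Hypothetical helper: diagnostics go to standard error (eprintf), so the
;; worker's standard output stays reserved for the protocol with the server.
;; Assumption: the server does not read the worker's standard error.
(define (announce-loading what)
  (eprintf "loading ~a\n" what))

;; In a worker file this could be used as, e.g.:
;;   (module+ main
;;     (announce-loading "model.bin")
;;     (start-worker run-job))
```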
6 Simple server / worker
procedure
(start-simple-worker run [#:silent? silent?]) → void?
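run : (-> readable? any)
silent? : any/c = #f
Like start-worker, except that run receives the job's data directly (a readable? value) rather than a job struct. Intended for workers started by start-simple-server.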
procedure
(start-simple-server #:worker-file worker-file
                     #:data-list data-list
                     #:process-result process-result
                    [#:submod-name submod-name
                     #:n-proc n-proc]) → void?
worker-file : path-string?
data-list : (listof readable?)
process-result : (procedure-arity-includes/c 2)
submod-name : symbol? = 'worker
n-proc : integer? = (min (length data-list) (processor-count))
The workers receive one element of data-list at a time and return a result, which is passed to process-result together with the corresponding data element.
The server starts n-proc workers in separate OS processes, each running the submodule submod-name of worker-file.
Note: Unlike scheduler-start, the simple server does not allow adding jobs while it is running.
The example below can be run with: racket -l- jobsched/examples/server-worker-simple
"server-worker-simple.rkt"
#lang racket
(require jobsched)

;=== Worker ===;
(define (worker-run data)
  (match data
    [(list x y) (* x y)]))

(module+ worker
  (start-simple-worker worker-run))

;=== Server ===;
(module+ main
  (require racket/runtime-path)
  (define-runtime-path this-file "server-worker-simple.rkt")
  (define (process-result data result)
    (printf "~a × ~a = ~a\n" (first data) (second data) result))
  (define data-list
    (for*/list ([x 5] [y 5])
      (list x y)))
  (start-simple-server #:worker-file this-file
                       #:data-list data-list
                       #:process-result process-result))
7 Comparison with other Racket parallelism mechanisms
Futures can make use of many cores and can share memory, but are limited in the kinds of operations they can perform without blocking. Futures are well suited for numerical applications in which garbage collection can be greatly reduced, making it possible to get the most out of the CPUs without requiring .
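For comparison, a minimal sketch of the kind of numeric workload futures handle well: a sum of squares split into chunks, one future per chunk. For moderate n the loop performs only fixnum arithmetic and allocates little, so the futures are unlikely to block; the names sum-of-squares and parallel-sum-of-squares are hypothetical.

```racket
#lang racket
(require racket/future)

;; Sum of i*i for i in [lo, hi); fixnum-only arithmetic for moderate ranges.
(define (sum-of-squares lo hi)
  (for/fold ([acc 0]) ([i (in-range lo hi)])
    (+ acc (* i i))))

;; Split [0, n) into k chunks (k >= 1) and compute each in its own future.
(define (parallel-sum-of-squares n k)
  (define step (quotient (+ n k -1) k)) ; chunk size, rounded up
  (define fs
    (for/list ([c (in-range k)])
      (define lo (* c step))
      (define hi (min n (+ lo step)))
      (future (λ () (sum-of-squares lo hi)))))
  ;; touch waits for each future (or runs it now if it has not started)
  (for/sum ([f (in-list fs)]) (touch f)))
```

For example, (parallel-sum-of-squares 1000 4) gives the same result as (sum-of-squares 0 1000), namely 332833500.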
Places allow more free-form Racket computation, can make use of multiple cores, and can share memory, but they are still constrained by a single garbage collector, which prevents them from making full use of all the cores.
Distributed Places create separate Racket instances like jobsched but can also spawn workers on remote machines. When all workers are on the same machine, jobsched may be simpler to use.