Jobsched:   Running jobs with multiple Racket instances
1 Example
2 Job
job
3 Server
scheduler?
worker?
make-scheduler
scheduler-add-job!
scheduler-start
processor-count
4 Utilities
make-racket-cmd
5 Worker
start-worker
6 Simple server /   worker
start-simple-worker
start-simple-server
7 Comparison with other Racket parallelism mechanisms
8.12

Jobsched: Running jobs with multiple Racket instances🔗ℹ

Laurent Orseau

 (require jobsched) package: levintreesearch_cm

Jobsched is a job scheduler with a server-worker architecture on a single machine, where the server dispatches a number of jobs to the workers.

All workers are identical and are able to process any job assigned by the server. The workers are independent racket programs, each with its own Racket instance. See Comparison with other Racket parallelism mechanisms for more information.

The workflow is as follows:
  1. The server (a Racket program) is started.

  2. N jobs are added to the job queue in the server. A job is specified as read-able data.

  3. The server starts K workers (independent Racket programs).

  4. Each worker signals on their output ports to the server that they are ready to receive jobs, and listens to events on its input port.

  5. As soon as a worker is ready, the server sends it one of the remaining jobs.

  6. When a worker has finished a job, it sends the result to the server, and waits for another job.

  7. When the server receives the result of a job, it processes this result and sends a new job (if any is remaining) to the worker.

  8. When all N jobs are finished, the server finishes.

If your machine has K cores and sufficient memory, jobsched can run K workers in parallel and benefit from a speedup of a factor K. Jobsched has been successfully used with more than 120 workers in parallel, with more than 100 000 fast-paced jobs to dispatch between them.

1 Example🔗ℹ

Here is a simple example of a server-worker architecture. The first file contains the definition of the server:

"adder-server.rkt"

#lang racket
(require jobsched)
 
;; This is the command to start a worker.
;; Creates a new Racket instance.
(define (make-worker-command _worker-index)
  (make-racket-cmd "adder-worker.rkt"))
 
;; Create a schedule, and tell it how to start a worker
(define sched (make-scheduler make-worker-command))
 
;; Schedule some jobs
(for* ([num1 (in-range 10)] [num2 (in-range 10)])
  (scheduler-add-job! sched #:data (list num1 num2)))
 
(define (process-result sched jb result)
  (match (job-data jb)
    [(list num1 num2)
     (printf "~a + ~a = ~a\n" num1 num2 result)]))
 
;; Start the server
(scheduler-start sched (processor-count) #:after-stop process-result)
And the second file contains the definition of the worker:

"adder-worker.rkt"

#lang racket
(require jobsched)
 
(define (run-job jb)
  (match (job-data jb)
    [(list num1 num2) (+ num1 num2)]
    [else 0]))
 
(module+ main
  (start-worker run-job))

All definitions exported by the various modules below are also exported by jobsched.

See also the one-file example in examples/server-worker.

2 Job🔗ℹ

The bindings in this section are also exported by jobsched.

struct

(struct job (index cost data start-ms stop-ms)
    #:transparent)
  index : nonnegative-integer?
  cost : number?
  data : readable?
  start-ms : nonnegative-integer?
  stop-ms : nonnegative-integer?
This structure is usually used only for accessing information by a worker or by the server.

The cost field is used by the priority queue of the server.

The data field contains readable? information that is sent to the worker for processing.

The fields start-ms and stop-ms are set automatically by the server when a job is sent to a worker and when the result is received.

3 Server🔗ℹ

The bindings in this section are also exported by jobsched.

procedure

(scheduler? v)  boolean?

  v : any/c

procedure

(worker? v)  boolean?

  v : any/c

procedure

(make-scheduler make-worker-command)  scheduler?

  make-worker-command : (-> nonnegative-integer? list?)
Returns a scheduler which will use make-worker-command to start the workers’ Racket processes.

See also make-racket-cmd.

procedure

(scheduler-add-job! sched    
  #:data data    
  [#:cost cost])  void?
  sched : scheduler?
  data : readable?
  cost : number? = 0
Adds a job to the scheduler’s queue. A job can be added either before starting the scheduler, or during, using the #:before-start and #:after-stop arguments of scheduler-start.

The data will be sent to the worker, who will receive it on its input port and will be accessible via job-data.

The cost is used for ordering the job in the priority queue, which is ordered by minimum cost.

procedure

(scheduler-start sched    
  n-workers    
  [#:before-start before-start    
  #:after-stop after-stop])  void?
  sched : scheduler?
  n-workers : nonnegative-integer?
  before-start : (-> scheduler? job? any) = void
  after-stop : (-> scheduler? job? readable? any) = void
Starts a scheduler. n-workers racket instances are started on the same machine.

The callback before-start is called before a job is sent to a worker. The callback after-stop is called when a job is finished and the result is received from the worker. Both callbacks can be used to add new jobs to the queue, using scheduler-add-job!.

procedure

(processor-count)  nonnegative-integer?

Re-exported from racket/future.

4 Utilities🔗ℹ

The bindings in this section are also exported by jobsched.

procedure

(make-racket-cmd path-to-prog 
  [#:submod submod 
  #:errortrace? errortrace?] 
  args ...) 
  (listof path-string?)
  path-to-prog : path-string?
  submod : (or/c symbol? #f) = #f
  errortrace? : any/c = #f
  args : path-string?
Creates a command line to call the racket program path-to-prog. If submod is specificied, the corresponding submodule is called instead. (For example I like to use a worker submodule.) By default, the main submodule is used if available, or the main function if available. The additional command-line arguments args are passed to the program, which may choose to parse them. Note that path? arguments are turned automatically into strings by Racket’s primitives.

5 Worker🔗ℹ

The bindings in this section are also exported by jobsched.

procedure

(start-worker run-job [#:silent? silent?])  void?

  run-job : (-> job? any)
  silent? : any/c = #f
Starts a worker which waits for jobs. Each time a job is received, the run-job procedure is called. The data of the job can be retrieved with (job-data job). If silent? is not #f, all output of run-job to its output port is suppressed—the error port remains untouched.

See example at the top.

NOTICE: Any output before start-worker is called is processed by the server, who is waiting for a ready signal from the worker. It is advised to avoid any output before start-worker.

6 Simple server / worker🔗ℹ

procedure

(start-simple-worker run [#:silent? silent?])  void?

  run : (-> readable? any)
  silent? : any/c = #f
Like start-worker except that run accepts the data of the job rather than the job. This masks the job object.

procedure

(start-simple-server #:worker-file worker-file    
  #:data-list data-list    
  #:process-result process-result    
  [#:submod-name submod-name    
  #:n-proc n-proc])  void?
  worker-file : path-string?
  data-list : (listof readable?)
  process-result : (procedure-arity-includes/c 2)
  submod-name : symbol? = 'worker
  n-proc : integer? = (min (length data-list) (processor-count))
Creates and starts a server, like scheduler-start, but hiding the scheduler and the job objects. The worker is assumed to be in the racket file worker-file in the submodule submod-name. See start-simple-worker.

The workers will receive one element of data-list at a time, and return a result to be processed by process-result.

The server starts n-proc workers in separate OS processes.

Note: By contrast to scheduler-start, the simple server does not allow to add more tasks while it is running.

Try the following example with:
racket -l- jobsched/examples/server-worker-simple

"server-worker-simple.rkt"

#lang racket
(require jobsched)
 
;=== Worker ===;
 
(define (worker-run data)
  (match data
    [(list x y)
     (* x y)]))
 
(module+ worker
  (start-simple-worker worker-run))
 
;=== Server ===;
 
(module+ main
  (require racket/runtime-path)
 
  (define-runtime-path this-file "server-worker-simple.rkt")
 
  (define (process-result data result)
    (printf "~a × ~a = ~a\n" (first data) (second data) result))
 
  (define data-list
    (for*/list ([x 5] [y 5]) (list x y)))
  
  (start-simple-server
   #:worker-file this-file
   #:data-list data-list
   #:process-result process-result))
 

7 Comparison with other Racket parallelism mechanisms🔗ℹ

Futures can make use of many cores and can share memory, but are limited in the kind of operations they can handle without blocking. Futures are well suited for numerical applications where garbage collection can be greatly reduced, allowing to make the most of the CPUs without requiring .

Places allow more free-form racket computation and can make use of multiple cores and can share memory, but are still constrained by a single garbage collector, which prevents from making full use of all the cores.

Distributed Places create separate Racket instances like jobsched but can also spawn workers on remote machines. When all workers are on the same machine, jobsched may be simpler to use.