message-loop

David K. Storrs

 (require message-loop) package: message-loop

1 Introduction

Provides a single-threaded message loop that can be used as an aggregation point to trigger arbitrary actions. Essentially, it is a thread that listens on an async channel, sends each received message through a dispatch table, and then loops.

The message loop can be started before or after listeners are added. When a message is posted, every listener that has registered for that message type gets a chance to react, in an unspecified order.
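
The mechanism described above can be sketched with nothing but Racket's standard library. This is a minimal illustration under stated assumptions, not the package's implementation: the names inbox, dispatch-table, sketch-add-listener!, sketch-post!, and sketch-start! are hypothetical, and messages are plain pairs rather than message structs.

```racket
#lang racket/base
(require racket/async-channel)

;; All names below are hypothetical; this is not the package's source.
(define inbox (make-async-channel))   ; posted messages wait here
(define dispatch-table (make-hash))   ; message type -> list of actions

;; Register an action to run for messages of the given type.
(define (sketch-add-listener! type action)
  (hash-update! dispatch-table type
                (λ (actions) (cons action actions))
                '()))

;; Post a message, represented here as a (type . data) pair.
(define (sketch-post! type data)
  (async-channel-put inbox (cons type data)))

;; Start the loop: take a message, run every matching action, repeat.
(define (sketch-start!)
  (thread
   (λ ()
     (let loop ()
       (define msg (async-channel-get inbox))
       (for ([action (in-list (hash-ref dispatch-table (car msg) '()))])
         (action (cdr msg)))
       (loop)))))

;; Usage: the message is posted before the loop starts and stays queued.
(define out (make-async-channel))
(sketch-add-listener! 'birth
                      (λ (name)
                        (async-channel-put out (format "~a was born" name))))
(sketch-post! 'birth "Bob")
(define loop-thread (sketch-start!))
(define result (async-channel-get out))  ; blocks until the listener has run
(kill-thread loop-thread)
```

Because the inbox is an async channel, posting never blocks the caller, and anything posted before the thread exists simply waits in the channel's queue.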

2 Examples

> (require struct-plus-plus racket/async-channel rx-tx-async-channel)
> (require message-loop)
> (struct++ person (name age) #:transparent)
> (start-message-loop)
> (define comm-ch (rx-tx-async-channel++))
> (define ch (make-async-channel))
; Listen for 'birth messages and send out a nicely formatted string
> (add-listener (listener++ #:listen-for '(birth)
                            #:id         'listener1
                            #:action     (λ (l msg)
                                             (define p (message.data msg))
                                             (async-channel-put ch
                                                                (format "listener ~a, message ~v. ~a was born"
                                                                        (listener.id l)
                                                                        (message.type msg)
                                                                        (person-name p))))))
> (post-message (message++ #:type 'birth #:data (person "Bob" 17)))
; listener notices that bob was born, puts that fact on the channel
> (println (sync ch))

"listener listener1, message 'birth. Bob was born"

; Add multiple listeners at once: one each for 'birthday, 'birthday-modify, and 'type-only messages
> (add-listeners
     (listener++ #:listen-for '(birthday)
                 #:id         'listener2
                 #:action     (λ (l msg)
                                (define p (message.data msg))
                                (async-channel-put
                                 ch
                                 (format "listener ~a heard that ~a had a birthday at time ~a"
                                         (listener.id l)
                                         (person.name p)
                                         1234))))
     (listener++ #:listen-for '(birthday-modify)
                 #:id         'listener3
                 #:comm       comm-ch
                 #:action     (λ (l msg)
                                (match-define (struct* message
                                                       ([data (and prsn
                                                                   (struct person (name age)))]))
                                  msg)
                                (define new-age (add1 age))
                                (async-channel-put (rx-tx-async-channel.to-parent
                                                    (listener.comm l))
                                                   (set-person-age prsn new-age))))
     (listener++ #:listen-for '(type-only)
                 #:id 'type-only-listener
                 #:action (λ (l msg) (async-channel-put ch msg))))
; post a birthday for bob, get back a string describing it
> (post-message (message++ #:type 'birthday #:data (person "Bob" 17)))
> (println (async-channel-get ch))

"listener listener2 heard that Bob had a birthday at time 1234"

; post a birthday-modify for bob, get back an updated version of bob
> (post-message (message++ #:type 'birthday-modify #:data (person "Bob" 17)))
> (println (sync (rx-tx-async-channel.to-parent comm-ch)))

(person "Bob" 18)

; post a birthday for Alice to demonstrate that the listener didn't stop after running once
> (post-message (message++ #:type 'birthday #:data (person "Alice" 24)))
> (println (async-channel-get ch))

"listener listener2 heard that Alice had a birthday at time 1234"

; ditto for birthday-modify
> (post-message (message++ #:type 'birthday-modify #:data (person "Alice" 24)))
> (println (sync (rx-tx-async-channel.to-parent comm-ch)))

(person "Alice" 25)

; post a bare message type; the message struct will be created automatically
> (post-message 'type-only)
> (println (async-channel-get ch))

#<message>

; multiple listeners can trigger from a single message
> (define ch (make-async-channel))
> (add-listeners (listener++ #:listen-for '(multi)
                             #:id         'listener-X
                             #:action     (λ (l msg)
                                            (async-channel-put
                                             ch
                                             (listener.id l))))
                 (listener++ #:listen-for '(multi)
                             #:id         'listener-Y
                             #:action     (λ (l msg)
                                            (async-channel-put
                                             ch
                                             (listener.id l)))))
; We will get back the IDs of the listeners that receive the message
> (post-message (message++ #:type 'multi))
> (println (sort (for/list ([i 2]) (sync ch)) symbol<?))

'(listener-X listener-Y)

> (define ch (make-async-channel))
> (define bob (person "Bob" 42))
; One listener that listens for both 'matriculate and 'graduate messages
> (add-listener (listener++ #:listen-for '(matriculate graduate)
                            #:id         'multiple-type-listener
                            #:action     (λ (l msg)
                                           (define p (message.data msg))
                                           (async-channel-put ch
                                                              (format "listener ~a, message type ~v for ~a"
                                                                      (listener.id l)
                                                                      (message.type msg)
                                                                      (person.name p))))))
; Receive a 'matriculate message
> (post-message (message++ #:type 'matriculate #:data bob))
> (println (sync ch))

"listener multiple-type-listener, message type 'matriculate for Bob"

; Receive a 'graduate message
> (post-message (message++ #:type 'graduate #:data bob))
> (println (sync ch))

"listener multiple-type-listener, message type 'graduate for Bob"

3 API

3.1 Processing

procedure

(start-message-loop) → any

Begins message processing. Listeners can be added before this is called. Messages posted before it is called remain queued until the loop starts.

procedure

(stop-message-loop) → any

Terminates the message processing thread.

3.2 Messages

The message structure is used to signal that interested listeners should activate.

procedure

(message-type? arg) → boolean?

  arg : any/c
An alias for symbol?. Used for futureproofing.

procedure

(message++  #:type type
           [#:source source
            #:data data]) → message?
  type : message-type?
  source : any/c = #f
  data : any/c = #f
Keyword constructor for the message struct. source is intended to specify who created the message while data can carry any message-specific information to be used by the listener.

procedure

(message.type msg) → message-type?
  msg : message?
(message.source msg) → any/c
  msg : message?
(message.data msg) → any/c
  msg : message?
Accessors for each of the fields in the message struct.

procedure

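(post-message msg) → any

  msg : message?
Sends a message to the processing thread, which then dispatches it to every listener registered for its type. If start-message-loop has not been called, the message is queued until the loop is started. As the 'type-only example in section 2 shows, a bare message type is also accepted, in which case the message struct is created for you.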
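
The queueing behavior can be seen by posting before the loop is running. The following is a sketch built only from the procedures documented here; the 'ping type and the names ch and received are made up for illustration, and it assumes no message loop was started earlier in the same program.

```racket
#lang racket/base
(require racket/async-channel message-loop)

(define ch (make-async-channel))

;; A listener that forwards the message's data to ch.
(add-listener (listener++ #:listen-for '(ping)
                          #:action     (λ (l msg)
                                         (async-channel-put ch (message.data msg)))))

;; The loop is not running yet, so this message should wait in the queue.
(post-message (message++ #:type 'ping #:data "queued before start"))

(start-message-loop)
(define received (sync ch))   ; blocks until the listener has handled the message
(stop-message-loop)
```

Nothing reaches ch until start-message-loop is called, so a sync placed before that call would block indefinitely.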

3.3 Listeners

The listener structure defines what message types to listen for and what to do with them.

procedure

(listener++  #:listen-for message-types
             #:action action
            [#:id id
             #:comm comm]) → listener?
  message-types : (or/c (listof message-type?) message-type?)
  action : (-> listener? message? any)
  id : symbol? = (gensym "listener-")
  comm : rx-tx-async-channel? = (rx-tx-async-channel++)
Keyword constructor for the listener struct.

listen-for is a list of message types to listen for. NOTE: As a convenience, you may specify a single message type and it will be converted to a one-element list in the process of creating the struct.

action is the procedure that will be called when the relevant message type comes in. It is called with the listener itself and with the message that triggered the listener.

id allows you to easily distinguish between listeners.

comm provides a pair of async channels that can be used to communicate to and from the listener.
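
A short sketch of the single-type convenience and the default id, using only the constructor and accessors documented in this section. The 'ping type and the names l, types, and id are illustrative.

```racket
#lang racket/base
(require message-loop)

;; 'ping is a single message type rather than a list.
(define l (listener++ #:listen-for 'ping
                      #:action     (λ (l msg) (void))))

(define types (listener.listen-for l)) ; per the note above, a one-element list
(define id    (listener.id l))         ; not supplied, so a generated symbol
```

Since the conversion happens inside the constructor, code that reads listener.listen-for can treat the field as a list regardless of how the listener was created.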

procedure

(listener.listen-for l) → (listof message-type?)
  l : listener?
(listener.action l) → (-> listener? message? any)
  l : listener?
(listener.id l) → symbol?
  l : listener?
(listener.comm l) → rx-tx-async-channel?
  l : listener?
Accessors for the various fields of a listener.

procedure

(add-listener l) → any

  l : listener?
Tells the message processor to use this listener. Listeners can be added before the message processing loop is started (see start-message-loop), but no messages will be processed until then.

procedure

(add-listeners l ...) → any

  l : listener?
Adds multiple listeners at once.