Open  AI Chat  GPT Client
1 Introduction
2 Plugin System
<core-plugin-system>
3 Context and Protocol
<export>
<loggers>
<utilities>
<handlers>
<dispatcher>
<accumulator>
<context>
4 Test
<test>
5 Configuration
<configuration>
6 Communication
<communication>
7 Default Core
<default>
8 Commandline
<commandline>
<limit>
<input>
<main>
9 Outline
<*>
10 Release Notes
8.12

OpenAI ChatGPT Client🔗ℹ

1 Introduction🔗ℹ

[这个应用实际上是针对SICP 第三章“从函数式编程的视角看待时间”这个观点的一次实践。本文采用了文学式编程的风格, 包含了此应用绝大多数源码。还有一小部分源码位于private目录下,由于未加contract,因此本文档未涉及。]

这个应用实现了基于chatGPT API的简单的文本补全,用户可以优雅的处理输入流、对话上下文和token使用量。

由于难以统计prompt的token数量,这个应用并没有实现流式传输。

2 Plugin System🔗ℹ

(module core-pkg racket/base
  (require racket/contract)
  (provide (contract-out
            #:forall (message? request? response? content?)
            (struct core
              ((system-prompt message?)
               (make-request-json (-> (listof message?) request?))
               (retrieve-content-from-json (-> response? (listof content?)))
               (retrieve-usage-from-json (-> response? (values exact-nonnegative-integer?
                                                               exact-nonnegative-integer?
                                                               exact-nonnegative-integer?)))
               (merge-new-content-to-history (-> (or/c 'request 'response)
                                                 (listof content?)
                                                 (listof message?)
                                                 (listof message?)))
               (send/recv (-> (-> (or/c #f string?) any) request? response?))))
            (put (-> (and/c string? (lambda (nm) (not (hash-has-key? pkgs nm))))
                     core?
                     any))
            (get (-> string? core?))))
  (struct core (system-prompt
                make-request-json
                retrieve-content-from-json
                retrieve-usage-from-json
                merge-new-content-to-history
                send/recv))
  (define pkgs (make-hash))
  (define (put name core) (hash-set! pkgs name core))
  (define (get name) (hash-ref pkgs name)))

所有核心组件都通过这个包管理系统来注册和使用。在这里通过contract定义了core

3 Context and Protocol🔗ℹ

接下来我们要绑定如下这些标识符。

(provide token-logger total-token-logger prompt-token-logger completion-token-logger retry-logger
         (contract-out (context% (class/c (init-field (input (stream/c (or/c 'reset string? (listof string?))))
                                                      (probe (-> any/c any))
                                                      (retry-limit exact-nonnegative-integer?)
                                                      (core-structure core?)))))
 
         (struct-out exn:fail:chat)
         (struct-out exn:fail:chat:retry-limit))

导出这些logger是为了让用户更方便地获取token的使用量。 context%是这里的核心,在这里我们用contract的方式规定了各个初始化字段的内容。 此外还导出了一些新的异常以便用户作异常处理。

; 其他logger的parent logger
(define token-logger (make-logger #f (current-logger)))
; token总数
(define total-token-logger (make-logger #f token-logger))
; 提示的token数量
(define prompt-token-logger (make-logger #f token-logger))
; 补全的token数量
(define completion-token-logger (make-logger #f token-logger))
; 重试原因
(define retry-logger (make-logger #f (current-logger)))

(define (return-fail msg)
  (define (report str) (log-message retry-logger 'info 'Retry str))
  (define (message->string msg) (if msg msg "unknown"))
  (define str (message->string msg))
  (report str)
  ; The value field is always a string
  (Left str))
(define (log-tokens t p c #:prefix (prefix ""))
  (define (add-prefix sym) (string->symbol (string-append prefix (symbol->string sym))))
 
  (log-message total-token-logger 'info (add-prefix 'Tokens) (format "~a" t))
  (log-message prompt-token-logger 'info (add-prefix 'PromptTokens) (format "~a" p))
  (log-message completion-token-logger 'info (add-prefix 'CompletionTokens) (format "~a" c)))

报告token使用和异常的实用函数。

(define (normal history requests)
  (let/cc cc
    ; Send and receive data
    (define new-history (merge-new-content-to-history 'request requests (cdr history)))
    (define response
      (send/recv
       ; Retry occurs only when send/recv wants to raise an exception
       (lambda ((msg #f)) (cc (return-fail msg)))
       (make-request-json new-history)))
    ; Inform probes and loggers, and return updated history
    (let-values (((contents) (retrieve-content-from-json response))
                 ((total prompt completion) (retrieve-usage-from-json response)))
      (probe contents)
      (log-tokens total prompt completion)
 
      (Right (cons (map + (list total prompt completion) (car history))
                   (merge-new-content-to-history 'response contents new-history))))))
(define (reset history _)
  ; Conversations are discarded while token usage is preserved
  (Right (list (car history) system-prompt)))

在这里定义两种事件。

(define (dispatch history request)
  ; Dispatch in terms of the request
  (cond ((list? request) (normal history request))
        ((string? request) (normal history (list request)))
        (else (reset history 'reset))))

调度两种事件的处理。具体协议见前文contract。

(define (make-history-stream input)
  ; An accumulator represented as a stream
  (letrec ((history-stream
            (stream-cons #:eager
                         (list (list 0 0 0) system-prompt)
                         (stream-map*
                          (lambda (hs rq) (retry retry-limit (lambda () (dispatch hs rq))))
                          history-stream
                          input))))
    history-stream))

通过递归定义的stream建立反馈回路,整个stream就是一个usage和对话的收集器。

以下是context%的完整定义。

(module* context racket/base
  (require racket/base racket/class racket/stream racket/contract racket/match
           (submod ".." core-pkg)
           "private/stream.rkt" "private/error.rkt" "private/retry.rkt")
  <export>
  <loggers>
 
  (define context%
    (class object%
      (init-field input probe retry-limit
                  core-structure)
 
      (super-new)
 
      (match-define (core system-prompt
                          make-request-json
                          retrieve-content-from-json
                          retrieve-usage-from-json
                          merge-new-content-to-history
                          send/recv)
        core-structure)
 
      <utilities>
      <handlers>
      <dispatcher>
      <accumulator>
 
      ; Log the total amount of tokens
      (keyword-apply log-tokens '(#:prefix) '("All") (car (stream-last (make-history-stream input)))))))

4 Test🔗ℹ

可以参考下面这个测试用例使用logger和设计新的core。你也可以根据它理解整个程序的工作流程。

<test> ::=
(module* test racket/base
  ; Any code in this `test` submodule runs when this file is run using DrRacket
  ; or with `raco test`. The code here does not run when this file is
  ; required by another module.
 
  (require rackunit
           racket/generator racket/vector racket/class racket/list
           (submod ".." context) (submod ".." core-pkg))
 
  (define retry-log-receiver (make-log-receiver retry-logger 'info))
  (define token-log-receiver (make-log-receiver token-logger 'info))
 
  (define int-generator (generator () (let loop ((i 0)) (yield i) (loop (add1 i)))))
  (define (send/recv left js)
    (define int (int-generator))
    (if (and (>= int 3) (<= int 4))
        js
        (left "a")))
  (define input (in-list (list "1" (list "2" "3") 'reset)))
  (define input-generator (generator () (for ((v (in-list '("1" "3")))) (yield v))))
  (define (probe v) (check-equal? (list (input-generator)) v))
 
  (define tt 12)
  (define pt 6)
  (define ct 6)
 
  (define core-structure
    (core
     "system prompt"
     (lambda (history) history)
     (compose1 list last)
     (lambda (_) (values tt pt ct))
     (lambda (_ requests history)
       (append history requests))
     send/recv))
  (define name (symbol->string (gensym 'core)))
  (put name core-structure)
 
  (define (make-context limit)
    (new context%
         (input input)
         (probe probe)
         (retry-limit limit)
         (core-structure (get name))))
 
  (void (make-context 3))
 
  (define (log-message=? v1 v2) (check-equal? (vector-copy v1 0 2) v2))
  (log-message=? (sync retry-log-receiver) (vector 'info "Retry: a"))
  (log-message=? (sync retry-log-receiver) (vector 'info "Retry: a"))
  (log-message=? (sync retry-log-receiver) (vector 'info "Retry: a"))
  (log-message=? (sync token-log-receiver) (vector 'info (format "Tokens: ~a" tt)))
  (log-message=? (sync token-log-receiver) (vector 'info (format "PromptTokens: ~a" pt)))
  (log-message=? (sync token-log-receiver) (vector 'info (format "CompletionTokens: ~a" ct)))
  (log-message=? (sync token-log-receiver) (vector 'info (format "Tokens: ~a" tt)))
  (log-message=? (sync token-log-receiver) (vector 'info (format "PromptTokens: ~a" pt)))
  (log-message=? (sync token-log-receiver) (vector 'info (format "CompletionTokens: ~a" ct)))
  (log-message=? (sync token-log-receiver) (vector 'info (format "AllTokens: ~a" (* 2 tt))))
  (log-message=? (sync token-log-receiver) (vector 'info (format "AllPromptTokens: ~a" (* 2 pt))))
  (log-message=? (sync token-log-receiver) (vector 'info (format "AllCompletionTokens: ~a" (* 2 ct))))
 
  (check-exn
   (lambda (e)
     (and (exn:fail:chat:retry-limit? e)
          (string=? (exn-message e)
                    "make-retry: hit the limit\nDepth: 7\nIts last attempt fails due to:\n\ta")))
   (lambda () (make-context 6))))

5 Configuration🔗ℹ

这个部分主要是配置程序运行的环境,包括很多参数,可以通过这些源代码了解哪些参数必须提供、哪些参数有默认值以及各个参数应该设置为什么值。

之所以单独设置一个模块,一方面是为了提供除命令行参数以外另一种配置方式,使配置更灵活(例如extra-proxiesmoduleprobe);另一方面则是为了便于检查。

(module config racket/base
  (require racket/contract net/http-easy)
  (provide (contract-out
            (url-prefix (box/c string?))
            (core-name (box/c string?))
            (extra-proxies (box/c (listof proxy?)))
            (model (box/c string?))
            (system (box/c string?))
            (interact? (box/c boolean?))
            (token (box/c (or/c #f string?)))
            (module (box/c (or/c #f module-path?)))
            (probe (box/c (-> any/c any)))
            (request-timeout (box/c (and/c real? positive?)))
            (idle-timeout (box/c (and/c real? positive?)))
            (rate-limit (box/c (and/c real? positive?)))
            (retry-limit (box/c exact-nonnegative-integer?))))
  (define url-prefix (box "https://api.openai.com"))
  (define core-name (box "default"))
  (define extra-proxies (box null))
  (define model (box "gpt-3.5-turbo"))
  (define system (box "You are a helpful assistant."))
  (define interact? (box #t))
  (define token (box #f))
  (define module (box #f))
  (define probe (box (lambda (l) (map displayln l))))
  (define request-timeout (box 600))
  (define idle-timeout (box 600))
  (define rate-limit (box 2))
  (define retry-limit (box 2)))

6 Communication🔗ℹ

我们使用http-easy库绑定与服务器交互的相关函数。

send/recv的例子见Default Core一节。

(module* communication racket/base
  (require racket/match racket/list racket/contract
           net/http-easy net/url
           (submod ".." config))
  (provide (contract-out
            (make-send/recv
             (->i ((method method/c)
                   (auth (-> string? auth-procedure/c))
                   (payload (-> any/c payload-procedure/c))
                   (handle (-> input-port? any))
                   (kws (listof keyword?))
                   (args (kws) (lambda (v) (and (list? v) (= (length v) (length kws))))))
                  any))))
 
  ; A procedure used to wrap make-send/recv
  ; Currently HTTP(S) proxies are supported
  (define (make-send/recv method auth payload handle other-kws other-args)
      (let* (; Proxies
             ; Records: (listof (list/c <scheme> (or/c #f <server>) <maker>))
             ; Server: any/c
             ; Maker: (-> <server> proxy?)
             (format-http*-proxy-server
              (lambda (server)
                (match server
                  ((list scheme host port)
                   (url->string
                    (make-url scheme
                              #f
                              host
                              port
                              #f
                              null
                              null
                              #f))))))
             (proxy-records (list (list "http" (proxy-server-for "http") (compose1 make-http-proxy format-http*-proxy-server))
                                  (list "https" (proxy-server-for "https") (compose1 make-https-proxy format-http*-proxy-server))))
             (proxies
              (append (unbox extra-proxies)
                      (filter-map
                       (lambda (record)
                         (and (cadr record) ((caddr record) (cadr record))))
                       proxy-records)))
             ; Pool and session configuration
             (pool-config (make-pool-config #:idle-timeout (unbox idle-timeout)))
             (session (make-session #:proxies proxies #:pool-config pool-config))
             ; Timeout configuration
             (timeout-config (make-timeout-config #:request (unbox request-timeout)))
             (call/handler
              (lambda (fail proc)
                (with-handlers ((exn:fail:http-easy:timeout?
                                 (lambda (exn) (fail (format "~a: timed out" (exn:fail:http-easy:timeout-kind exn)))))
                                (exn:fail:http-easy? (lambda (exn) (fail (exn-message exn)))))
                  (proc))))
             ; The url constant
             (url (string->url (string-append (unbox url-prefix) "/v1/chat/completions")))
             ; The token constant
             (token-string (unbox token))
             ; How to handle the response
             (stream?
              (let ((index (index-of other-kws '#:stream)))
                (and index (list-ref other-args index))))
             (close?
              (let ((index (index-of other-kws '#:close)))
                (and index (list-ref other-args index))))
             ; Sorted keyword arguments
             (sorted-pairs (sort (map cons other-kws other-args) keyword<? #:key car))
             (sorted-kws (map car sorted-pairs))
             (sorted-args (map cdr sorted-pairs))
             ; A normal send/recv function
             (call/response/safe (lambda (handle response)
                                   (dynamic-wind void
                                                 (lambda () (handle response))
                                                 (lambda () (response-close! response)))))
             (send/recv
              (lambda (fail input)
                (let ((response (keyword-apply
                                 session-request
                                 sorted-kws
                                 sorted-args
                                 session url
                                 null
                                 #:method method
                                 #:timeouts timeout-config
                                 #:auth (auth token-string)
                                 #:data (payload input))))
                  (define status-code (response-status-code response))
                  (cond ((= status-code 200)
                         (call/response/safe
                          (lambda (response)
                            (handle (if (or close? (not stream?))
                                        (open-input-bytes (response-body response))
                                        (response-output response))))
                          response))
                        (else
                         (call/response/safe
                          (lambda (response)
                            (define body (response-body response))
                            (fail (format "Status code: ~a\nBody: ~s" status-code body)))
                          response)))))))
        (plumber-add-flush! (current-plumber) (lambda (_) (session-close! session)))
        ; A send/recv function running in a proper environment
        (lambda (fail input)
          (call/handler
           fail
           (lambda ()
             (parameterize ((current-session session))
               (send/recv fail input))))))))

7 Default Core🔗ℹ

默认的core实现如下。这个core实现了与openai api的非流交互。 因为流式传输无法使用racket实现完整的token统计。

如果用户需要自定义一套实用的core,可以参考这个模块。

(module* default-core-pkg racket/base
  (require
   net/http-easy
   json
   (submod ".." communication)
   (rename-in (only-in (submod ".." core-pkg) core put) (put pkg-put))
   (submod ".." config))
  (provide install-default)
 
  ; Utilities
  (define (make-message role cont)
    (hasheq 'role role 'content cont))
  (define (retrieve-content js)
    (list
     (hash-ref
      (hash-ref
       (list-ref
        (hash-ref js 'choices)
        0)
       'message)
      'content)))
  (define (retrieve-usage js)
    (let ((table (hash-ref js 'usage)))
      (values (hash-ref table 'total_tokens)
              (hash-ref table 'prompt_tokens)
              (hash-ref table 'completion_tokens))))
  (define (make-request model history)
    (hasheq 'model model 'messages history))
  (define (merge mode items history)
    (append history
            (map
             (lambda (item) (make-message (if (eq? mode 'request) "user" "assistant") item))
             items)))
 
  (define (install-default)
    (define core-structure (core (make-message "system" (unbox system))
                                 (lambda (history) (make-request (unbox model) history))
                                 retrieve-content
                                 retrieve-usage
                                 merge
                                 (make-send/recv
                                  'post bearer-auth json-payload read-json
                                  '(#:close?) '(#t))))
    (pkg-put "default" core-structure)))

8 Commandline🔗ℹ

接下来这个部分是命令行程序,已在info.rkt中注册为
  raco
命令,若要直接运行请使用
  racket -t

首先解析命令行参数。这里规定了三种模式。

flags

input

interactive?

-I

(current-input-port)

N

None

(current-input-port)

Y

-p <mod>

input-stream

N

在这里对比一下modulepatch

类型

作用

要求

加载方式

module

提供输入流

必须provide一个input-stream作为输入流

命令行设置或直接设置参数

patch

任意用途

无要求,只要是个racket模块即可

只能通过命令行设置

关于加载的过程,见dynamic-require

必需的参数这里也进行了检查。 patch在检查之前执行,因此无法脱离检查。 而module在检查后导入,因此在下面构建输入流时,需使用stream-lazy,需要使用输入流时再导入模块,这样module当中设置的参数就无效了。

(command-line
 #:program (short-program+command-name)
 #:once-each
 [("-u" "--url") u "Specify the URL prefix." (set-box! url-prefix u)]
 [("-C" "--core") c "Specify the core." (set-box! core-name c)]
 [("-m" "--model") m "Specify the model." (set-box! model m)]
 [("-s" "--system") s "Specify the system prompt." (set-box! system s)]
 [("-I" "--no-interact") "Turn off the interactive mode." (set-box! interact? #f)]
 [("-t" "--token") s "Specify the openai token." (set-box! token s)]
 [("-p" "--module") p "Specify the module to be imported dynamically." (set-box! module (list 'file p))]
 [("-c" "--patch") p "Specify the patch to be executed dynamically." (dynamic-require (list 'file p) #f)]
 [("-r" "--request-timeout") r "Specify how long to wait on a request." (set-box! request-timeout (string->number r))]
 [("-i" "--idle-timeout") i "Specify how long to wait on an idle connection." (set-box! idle-timeout (string->number i))]
 [("-l" "--rate-limit") l "Specify the number of times the client can access the server within a minute." (set-box! rate-limit (string->number l))]
 [("-n" "--retry-limit") l "Specify the number of times the client can re-send a request." (set-box! retry-limit (string->number l))]
 #:ps
 "1. The interactive mode is automatically turned off when `-p` or `--module-path` is supplied."
 "2. The module to be dynamically imported must provide `input-stream` which is a stream of strings, `'reset`s or lists of strings."
 "3. The patch to be dynamically executed is an arbitrary racket module and you can configure the program in this file."
 "4. You can use commas to input commands and blocks when running the driver loop in the interactive mode."
 #:args ()
 ; Additional checks
 (cond ((not (unbox token)) (raise (make-exn:fail:user "You must provide your openai token." (current-continuation-marks))))))

接下来对输入流作速率限制。这里又使用了stream,实际上限制的是每一次输入及其之前输入的平均速率。

; Make input streams with rate limits
(define (make-limited-stream input limit)
  (letrec ((delayed-start (delay (current-inexact-milliseconds)))
           (least-interval (/ 60000.0 limit))
           (record-stream
            (stream-cons
             ; Adding the interval is unnecessary in the beginning
             #:eager (list 0 'reset)
             (stream-map*
              (lambda (r i)
                (cond ((or (string? i) (list? i))
                       (sync (handle-evt (alarm-evt (+ (force delayed-start) (* least-interval (car r))))
                                         (lambda (_) (list (add1 (car r)) i)))))
                      (else (list (car r) i))))
              record-stream
              input))))
    (stream-map cadr record-stream)))

driver loop在这里直接用输入流表示,如前所述,一种是通过模块导入,一种是从标准输入读取。

(define (handle-line line)
  (match line
    ((regexp #rx"^,(.*)$" (list _ datum-string)) (read (open-input-string datum-string)))
    (_ line)))
(define input-stream
  (cond ((unbox module)
         ; The module is loaded when the stream is needed.
         (stream-lazy (dynamic-require (unbox module) 'input-stream)))
        (else
         ; The interactive mode works only when `(unbox module)` returns false
         (cond ((unbox interact?) (displayln (format "I'm ~a. Can I help you?" (unbox model)))))
         (letrec ((read-requests (lambda (in)
                                   (cond ((unbox interact?) (display "> ")))
                                   (define line (handle-line (read-line in)))
                                   (if (eof-object? line) empty-stream (stream-cons #:eager line (read-requests in))))))
           (read-requests (current-input-port))))))

最后让我们把main模块组装好吧!

<main> ::=
(module* main racket/base
  ; (Optional) main submodule. Put code here if you need it to be executed when
  ; this file is run using DrRacket or the `racket` executable. The code here
  ; does not run when this file is required by another module. Documentation:
  ; http://docs.racket-lang.org/guide/Module_Syntax.html#%28part._main-and-test%29
 
  (require racket/cmdline racket/match racket/class racket/stream racket/promise
           (submod ".." context) (submod ".." config) (submod ".." default-core-pkg)
           (rename-in (only-in (submod ".." core-pkg) get) (get pkg-get))
           "private/stream.rkt"
           raco/command-name)
 
  <commandline>
 
  ; Install the default package
  (install-default)
 
  <limit>
  <input>
 
  (void
   (new context%
        (retry-limit (unbox retry-limit))
        (input (make-limited-stream input-stream (unbox rate-limit)))
        (probe (unbox probe))
        (core-structure (pkg-get (unbox core-name))))))

从这里可以发现,想要从程序中安全退出有且只有一种方式,即终止输入流。

9 Outline🔗ℹ

Racket的文学式编程语言要求要有一个提纲把文档所有内容收集起来,从而汇编为可运行的程序。

<*> ::=

10 Release Notes🔗ℹ