This commit is contained in:
2026-01-18 22:28:17 -05:00
parent 56dde9cf91
commit e2048d8b69
8 changed files with 7917 additions and 35 deletions
+1 -1
View File
@@ -11,7 +11,7 @@
metosin/reitit {:mvn/version "0.7.0-alpha7"}
;; WebSocket
info.sunng/ring-jetty9-adapter {:mvn/version "0.30.0"}
info.sunng/ring-jetty9-adapter {:mvn/version "0.14.3"}
;; Database
com.github.seancorfield/next.jdbc {:mvn/version "1.3.894"}
+37 -14
View File
@@ -48,16 +48,28 @@
(try
(let [data (json/read-value line mapper)]
(case (:type data)
;; System init message contains session_id
"system" (when (= (:subtype data) "init")
{:event :init
:session-id (:session_id data)
:cwd (:cwd data)})
;; User message (from history)
"user" {:role :user
:content (get-in data [:message :content])
:timestamp (:timestamp data)
:uuid (:uuid data)}
"assistant" {:role :assistant
:content (get-in data [:message :content])
:timestamp (:timestamp data)
:uuid (:uuid data)
:metadata {:model (get-in data [:message :model])
:stop-reason (get-in data [:message :stopReason])}}
;; Assistant response - extract text content
"assistant" (let [content-blocks (get-in data [:message :content])
text-content (->> content-blocks
(filter #(= (:type %) "text"))
(map :text)
(clojure.string/join ""))]
{:event :content-delta
:text text-content
:role :assistant
:uuid (:uuid data)
:metadata {:model (get-in data [:message :model])
:stop-reason (get-in data [:message :stop_reason])}})
;; Stream events from --output-format stream-json
"content_block_start" {:event :content-start
:index (:index data)
@@ -72,10 +84,11 @@
"message_delta" {:event :message-delta
:stop-reason (get-in data [:delta :stop_reason])}
"message_stop" {:event :message-stop}
;; Result contains final content and session_id
"result" {:event :result
:content (get-in data [:result :assistant :content])
:cost (:cost data)
:session-id (:session_id data)}
:content (:result data)
:session-id (:session_id data)
:cost (:total_cost_usd data)}
;; Unknown type
{:raw data}))
(catch Exception e
@@ -111,11 +124,14 @@
(spawn-session [_ session-id opts]
(let [{:keys [working-dir model permission-mode]} opts
;; Build base args - only include --resume if we have a session-id
;; --verbose is required when using --print with --output-format=stream-json
args (cond-> ["claude"
"--resume" session-id
"--output-format" "stream-json"
"--input-format" "stream-json"
"--verbose"
"--print"]
session-id (conj "--resume" session-id)
model (conj "--model" model)
permission-mode (conj "--permission-mode" permission-mode))
pb (ProcessBuilder. args)]
@@ -130,7 +146,10 @@
(send-message [_ {:keys [stdin]} message]
(try
(let [json-msg (json/write-value-as-string {:type "user" :content message})]
;; Claude stream-json format: {"type":"user","message":{"role":"user","content":"..."}}
(let [json-msg (json/write-value-as-string {:type "user"
:message {:role "user"
:content message}})]
(.write stdin json-msg)
(.newLine stdin)
(.flush stdin)
@@ -143,9 +162,13 @@
(try
(loop []
(when-let [line (.readLine stdout)]
(when-let [parsed (proto/parse-output this line)]
(callback parsed))
(recur)))
(let [parsed (proto/parse-output this line)]
(when parsed
(callback parsed))
;; Stop reading after result event - response is complete
(if (= :result (:event parsed))
(log/debug "Received result, stopping stream read")
(recur)))))
(catch Exception e
(log/debug "Stream ended:" (.getMessage e)))))
+18 -15
View File
@@ -82,25 +82,28 @@
(defn create-ws-listener
"Create a WebSocket listener"
[]
(reify WebSocketListener
(onWebSocketConnect [_ session]
(log/debug "WebSocket connected")
(.add all-connections session)
(send-to-ws session {:type "connected"}))
(let [session-atom (atom nil)]
(reify WebSocketListener
(onWebSocketConnect [_ session]
(reset! session-atom session)
(log/debug "WebSocket connected")
(.add all-connections session)
(send-to-ws session {:type "connected"}))
(onWebSocketText [_ session message]
(handle-message session message))
(onWebSocketText [_ message]
(handle-message @session-atom message))
(onWebSocketBinary [_ _session _payload _offset _len]
(log/debug "Binary WebSocket message ignored"))
(onWebSocketBinary [_ _payload _offset _len]
(log/debug "Binary WebSocket message ignored"))
(onWebSocketClose [_ session status-code reason]
(log/debug "WebSocket closed:" status-code reason)
(.remove all-connections session)
(unsubscribe-from-all session))
(onWebSocketClose [_ status-code reason]
(log/debug "WebSocket closed:" status-code reason)
(when-let [session @session-atom]
(.remove all-connections session)
(unsubscribe-from-all session)))
(onWebSocketError [_ _session cause]
(log/warn "WebSocket error:" (.getMessage cause)))))
(onWebSocketError [_ cause]
(log/warn "WebSocket error:" (.getMessage cause))))))
(defn ws-handler
"Ring handler for WebSocket upgrade"
+8 -2
View File
@@ -101,8 +101,14 @@
;; Accumulate text content
(when-let [text (:text event)]
(.append content-buffer text))
;; On message stop, save the accumulated message
(when (= :message-stop (:event event))
;; Capture external session-id from init or result event (for new sessions)
(when (and (contains? #{:init :result} (:event event))
(:session-id event)
(not (:external-id session)))
(log/debug "Capturing external session-id:" (:session-id event))
(db/update-session store session-id {:external-id (:session-id event)}))
;; On result event, save the accumulated message
(when (= :result (:event event))
(let [content (.toString content-buffer)]
(when (seq content)
(db/save-message store {:session-id session-id