managed sessions only. allow for rename/delete

This commit is contained in:
2026-01-19 19:34:58 -05:00
parent e2048d8b69
commit 313ac44337
32 changed files with 1759 additions and 331 deletions
+2 -3
View File
@@ -2,8 +2,7 @@
:deps {org.clojure/clojure {:mvn/version "1.11.1"}
;; Web server
ring/ring-core {:mvn/version "1.10.0"}
ring/ring-jetty-adapter {:mvn/version "1.10.0"}
ring/ring-core {:mvn/version "1.12.2"}
ring/ring-json {:mvn/version "0.5.1"}
ring-cors/ring-cors {:mvn/version "0.1.13"}
@@ -11,7 +10,7 @@
metosin/reitit {:mvn/version "0.7.0-alpha7"}
;; WebSocket
info.sunng/ring-jetty9-adapter {:mvn/version "0.14.3"}
info.sunng/ring-jetty9-adapter {:mvn/version "0.33.4"}
;; Database
com.github.seancorfield/next.jdbc {:mvn/version "1.3.894"}
+25 -8
View File
@@ -85,10 +85,25 @@
:stop-reason (get-in data [:delta :stop_reason])}
"message_stop" {:event :message-stop}
;; Result contains final content and session_id
"result" {:event :result
:content (:result data)
:session-id (:session_id data)
:cost (:total_cost_usd data)}
"result" (let [denials (:permission_denials data)]
(cond-> {:event :result
:content (:result data)
:session-id (:session_id data)
:cost (:total_cost_usd data)}
;; If there are permission denials, emit permission-request event
(seq denials)
(assoc :permission-request
{:tools (vec (distinct (map :tool_name denials)))
:denials (mapv (fn [{:keys [tool_name tool_input]}]
{:tool tool_name
:input tool_input
:description (case tool_name
"Bash" (:command tool_input)
"Write" (str "create " (:file_path tool_input))
"Edit" (str "edit " (:file_path tool_input))
"Read" (str "read " (:file_path tool_input))
(pr-str tool_input))})
denials)})))
;; Unknown type
{:raw data}))
(catch Exception e
@@ -123,7 +138,7 @@
[])))
(spawn-session [_ session-id opts]
(let [{:keys [working-dir model permission-mode]} opts
(let [{:keys [working-dir model permission-mode allowed-tools]} opts
;; Build base args - only include --resume if we have a session-id
;; --verbose is required when using --print with --output-format=stream-json
args (cond-> ["claude"
@@ -133,10 +148,12 @@
"--print"]
session-id (conj "--resume" session-id)
model (conj "--model" model)
permission-mode (conj "--permission-mode" permission-mode))
permission-mode (conj "--permission-mode" permission-mode)
;; Add allowed tools for permission grants
(seq allowed-tools) (into (cons "--allowedTools" allowed-tools)))
pb (ProcessBuilder. args)]
(when working-dir
(.directory pb (io/file working-dir)))
;; Default to home directory if no working-dir specified
(.directory pb (io/file (or working-dir (System/getProperty "user.home"))))
(.redirectErrorStream pb false)
(let [process (.start pb)]
{:process process
+44 -31
View File
@@ -7,7 +7,6 @@
[ring.util.response :as response]
[spiceflow.db.protocol :as db]
[spiceflow.session.manager :as manager]
[spiceflow.adapters.claude :as claude]
[clojure.tools.logging :as log]))
(defn- json-response
@@ -59,6 +58,16 @@
(response/status (response/response nil) 204))
(error-response 404 "Session not found")))))
(defn update-session-handler
[store]
(fn [request]
(let [id (get-in request [:path-params :id])
body (:body request)]
(if (db/get-session store id)
(let [updated (db/update-session store id (select-keys body [:title]))]
(json-response updated))
(error-response 404 "Session not found")))))
(defn send-message-handler
[store broadcast-fn]
(fn [request]
@@ -73,41 +82,46 @@
(manager/stream-session-response store id
(fn [event]
(broadcast-fn id event)))
;; Always send message-stop when stream ends to ensure client clears progress indicator
(broadcast-fn id {:event :message-stop})
(catch Exception e
(log/error "Streaming error:" (.getMessage e))
(broadcast-fn id {:event :error :message (.getMessage e)}))))
(broadcast-fn id {:event :error :message (.getMessage e)})
;; Also send message-stop on error to clear progress indicator
(broadcast-fn id {:event :message-stop}))))
(json-response {:status "sent"})
(catch Exception e
(error-response 500 (.getMessage e))))
(error-response 404 "Session not found")))))
;; Discovery handlers
(defn discover-claude-handler
[_store]
(fn [_request]
(let [adapter (claude/create-adapter)
sessions (manager/discover-all-sessions)]
(json-response (->> sessions
(filter #(= :claude (:provider %)))
vec)))))
(defn discover-opencode-handler
[_store]
(fn [_request]
(let [sessions (manager/discover-all-sessions)]
(json-response (->> sessions
(filter #(= :opencode (:provider %)))
vec)))))
(defn import-session-handler
[store]
(defn permission-response-handler
"Handle permission response: accept, deny, or steer"
[store broadcast-fn]
(fn [request]
(let [body (:body request)]
(if (:external-id body)
(let [session (manager/import-session store body)]
(-> (json-response session)
(response/status 201)))
(error-response 400 "Missing external-id")))))
(let [id (get-in request [:path-params :id])
{:keys [response message]} (:body request)
response-type (keyword response)]
(if-let [session (db/get-session store id)]
(if (#{:accept :deny :steer} response-type)
(try
;; Respond to permission and get new process handle
(manager/respond-to-permission store id response-type message)
;; Stream the response in background
(future
(try
(manager/stream-session-response store id
(fn [event]
(broadcast-fn id event)))
(broadcast-fn id {:event :message-stop})
(catch Exception e
(log/error "Streaming error after permission response:" (.getMessage e))
(broadcast-fn id {:event :error :message (.getMessage e)})
(broadcast-fn id {:event :message-stop}))))
(json-response {:status "permission-response-sent"})
(catch Exception e
(error-response 400 (.getMessage e))))
(error-response 400 "Invalid response type. Must be: accept, deny, or steer"))
(error-response 404 "Session not found")))))
;; Health check
(defn health-handler
@@ -122,11 +136,10 @@
["/sessions" {:get (list-sessions-handler store)
:post (create-session-handler store)}]
["/sessions/:id" {:get (get-session-handler store)
:patch (update-session-handler store)
:delete (delete-session-handler store)}]
["/sessions/:id/send" {:post (send-message-handler store broadcast-fn)}]
["/discover/claude" {:get (discover-claude-handler store)}]
["/discover/opencode" {:get (discover-opencode-handler store)}]
["/import" {:post (import-session-handler store)}]]])
["/sessions/:id/permission" {:post (permission-response-handler store broadcast-fn)}]]])
(defn create-app
"Create the Ring application"
+55 -65
View File
@@ -1,111 +1,101 @@
(ns spiceflow.api.websocket
"WebSocket handlers for real-time updates"
(:require [jsonista.core :as json]
[ring.websocket :as ring-ws]
[clojure.tools.logging :as log])
(:import [org.eclipse.jetty.websocket.api Session WebSocketListener]
[java.util.concurrent ConcurrentHashMap]))
(:import [java.util.concurrent ConcurrentHashMap]))
(def ^:private mapper (json/object-mapper {:encode-key-fn name}))
(def ^:private mapper (json/object-mapper {:encode-key-fn name
:decode-key-fn keyword}))
;; Connected WebSocket sessions: session-id -> #{ws-sessions}
;; Connected WebSocket sessions: session-id -> #{sockets}
(defonce ^:private connections (ConcurrentHashMap.))
;; All connected WebSocket sessions for broadcast
;; All connected WebSocket sockets for broadcast
(defonce ^:private all-connections (ConcurrentHashMap/newKeySet))
(defn- send-to-ws
"Send a message to a WebSocket session"
[^Session ws-session message]
"Send a message to a WebSocket socket"
[socket message]
(try
(when (.isOpen ws-session)
(let [json-str (json/write-value-as-string message mapper)]
(.sendString (.getRemote ws-session) json-str)))
(let [json-str (json/write-value-as-string message mapper)]
(log/debug "Sending WS message:" json-str)
(ring-ws/send socket json-str))
(catch Exception e
(log/debug "Failed to send to WebSocket:" (.getMessage e)))))
(log/error "Failed to send to WebSocket:" (.getMessage e)))))
(defn broadcast-to-session
"Broadcast an event to all WebSocket connections subscribed to a session"
[session-id event]
(log/debug "Broadcasting to session:" session-id "event:" event)
(when-let [subscribers (.get connections session-id)]
(let [message (assoc event :session-id session-id)]
(doseq [ws-session subscribers]
(send-to-ws ws-session message)))))
(doseq [socket subscribers]
(send-to-ws socket message)))))
(defn broadcast-all
"Broadcast an event to all connected WebSocket sessions"
"Broadcast an event to all connected WebSocket sockets"
[event]
(doseq [ws-session all-connections]
(send-to-ws ws-session event)))
(doseq [socket all-connections]
(send-to-ws socket event)))
(defn- subscribe-to-session
"Subscribe a WebSocket session to updates for a session"
[ws-session session-id]
(.compute connections session-id
(fn [_k existing]
(let [subscribers (or existing (ConcurrentHashMap/newKeySet))]
(.add subscribers ws-session)
subscribers))))
"Subscribe a WebSocket socket to updates for a session"
[socket session-id]
(log/debug "Subscribing socket to session:" session-id)
(let [subscribers (or (.get connections session-id)
(let [new-set (ConcurrentHashMap/newKeySet)]
(.putIfAbsent connections session-id new-set)
(or (.get connections session-id) new-set)))]
(.add subscribers socket)))
(defn- unsubscribe-from-session
"Unsubscribe a WebSocket session from a session"
[ws-session session-id]
"Unsubscribe a WebSocket socket from a session"
[socket session-id]
(when-let [subscribers (.get connections session-id)]
(.remove subscribers ws-session)
(.remove subscribers socket)
(when (.isEmpty subscribers)
(.remove connections session-id))))
(defn- unsubscribe-from-all
"Unsubscribe a WebSocket session from all sessions"
[ws-session]
"Unsubscribe a WebSocket socket from all sessions"
[socket]
(doseq [[session-id _] connections]
(unsubscribe-from-session ws-session session-id)))
(unsubscribe-from-session socket session-id)))
(defn- handle-message
"Handle an incoming WebSocket message"
[ws-session message]
[socket message]
(try
(log/debug "Received WS message:" message)
(let [data (json/read-value message mapper)]
(case (:type data)
"subscribe" (do
(subscribe-to-session ws-session (:session-id data))
(send-to-ws ws-session {:type "subscribed"
:session-id (:session-id data)}))
(subscribe-to-session socket (:session-id data))
(send-to-ws socket {:type "subscribed"
:session-id (:session-id data)}))
"unsubscribe" (do
(unsubscribe-from-session ws-session (:session-id data))
(send-to-ws ws-session {:type "unsubscribed"
:session-id (:session-id data)}))
"ping" (send-to-ws ws-session {:type "pong"})
(unsubscribe-from-session socket (:session-id data))
(send-to-ws socket {:type "unsubscribed"
:session-id (:session-id data)}))
"ping" (send-to-ws socket {:type "pong"})
(log/debug "Unknown WebSocket message type:" (:type data))))
(catch Exception e
(log/warn "Failed to handle WebSocket message:" (.getMessage e)))))
(defn create-ws-listener
"Create a WebSocket listener"
[]
(let [session-atom (atom nil)]
(reify WebSocketListener
(onWebSocketConnect [_ session]
(reset! session-atom session)
(log/debug "WebSocket connected")
(.add all-connections session)
(send-to-ws session {:type "connected"}))
(onWebSocketText [_ message]
(handle-message @session-atom message))
(onWebSocketBinary [_ _payload _offset _len]
(log/debug "Binary WebSocket message ignored"))
(onWebSocketClose [_ status-code reason]
(log/debug "WebSocket closed:" status-code reason)
(when-let [session @session-atom]
(.remove all-connections session)
(unsubscribe-from-all session)))
(onWebSocketError [_ cause]
(log/warn "WebSocket error:" (.getMessage cause))))))
(defn ws-handler
"Ring handler for WebSocket upgrade"
"Ring handler for WebSocket upgrade - returns websocket listener map"
[_request]
{:ring.websocket/listener (create-ws-listener)})
{:ring.websocket/listener
{:on-open (fn [socket]
(log/debug "WebSocket connected")
(.add all-connections socket)
(send-to-ws socket {:type "connected"}))
:on-message (fn [socket message]
(handle-message socket message))
:on-close (fn [socket _status-code _reason]
(log/debug "WebSocket closed")
(.remove all-connections socket)
(unsubscribe-from-all socket))
:on-error (fn [_socket error]
(log/warn "WebSocket error:" (.getMessage error)))}})
+21 -11
View File
@@ -18,21 +18,34 @@
(sqlite/create-store db-path)))
:stop nil)
;; Atom to hold the Jetty server instance
(defonce ^:private jetty-server (atom nil))
;; HTTP Server
(defstate server
:start (let [port (get-in config/config [:server :port] 3000)
host (get-in config/config [:server :host] "0.0.0.0")
app (routes/create-app store ws/broadcast-to-session)]
api-app (routes/create-app store ws/broadcast-to-session)
;; Wrap the app to handle WebSocket upgrades on /api/ws
app (fn [request]
(if (and (jetty/ws-upgrade-request? request)
(= "/api/ws" (:uri request)))
(ws/ws-handler request)
(api-app request)))
srv (jetty/run-jetty app
{:port port
:host host
:join? false
:allow-null-path-info true})]
(log/info "Starting Spiceflow server on" (str host ":" port))
(jetty/run-jetty app
{:port port
:host host
:join? false
:websockets {"/api/ws" ws/ws-handler}}))
(reset! jetty-server srv)
srv)
:stop (do
(log/info "Stopping Spiceflow server...")
(manager/cleanup-all store)
(.stop server)))
(when-let [srv @jetty-server]
(.stop srv)
(reset! jetty-server nil))))
(defn -main
"Main entry point"
@@ -50,7 +63,4 @@
;; Test database
(require '[spiceflow.db.protocol :as db])
(db/get-sessions store)
;; Test discovery
(manager/discover-all-sessions))
(db/get-sessions store))
+84 -18
View File
@@ -10,6 +10,9 @@
;; Active process handles for running sessions
(defonce ^:private active-processes (ConcurrentHashMap.))
;; Pending permission requests: session-id -> {:tools [...] :denials [...]}
(defonce ^:private pending-permissions (ConcurrentHashMap.))
(defn get-adapter
"Get the appropriate adapter for a provider"
[provider]
@@ -18,18 +21,6 @@
:opencode (opencode/create-adapter)
(throw (ex-info "Unknown provider" {:provider provider}))))
(defn discover-all-sessions
"Discover sessions from all configured providers"
[]
(let [claude-sessions (adapter/discover-sessions (claude/create-adapter))
opencode-sessions (adapter/discover-sessions (opencode/create-adapter))]
(concat claude-sessions opencode-sessions)))
(defn import-session
"Import a discovered session into the database"
[store session]
(db/save-session store session))
(defn get-active-process
"Get the active process handle for a session"
[session-id]
@@ -83,6 +74,23 @@
;; Send to CLI
(adapter/send-message adapter handle message)))
;; Permission handling - defined before stream-session-response which uses them
(defn set-pending-permission
"Store a pending permission request for a session"
[session-id permission-request]
(.put pending-permissions session-id permission-request))
(defn get-pending-permission
"Get pending permission request for a session"
[session-id]
(.get pending-permissions session-id))
(defn clear-pending-permission
"Clear pending permission for a session"
[session-id]
(.remove pending-permissions session-id))
(defn stream-session-response
"Stream response from a running session, calling callback for each event"
[store session-id callback]
@@ -101,21 +109,44 @@
;; Accumulate text content
(when-let [text (:text event)]
(.append content-buffer text))
;; Capture external session-id from init or result event (for new sessions)
(when (and (contains? #{:init :result} (:event event))
;; Capture external session-id and cwd from init event (for new sessions)
(when (= :init (:event event))
(when (and (:session-id event)
(not (:external-id session)))
(log/debug "Capturing external session-id:" (:session-id event))
(db/update-session store session-id {:external-id (:session-id event)}))
;; Also capture working directory from cwd
(when (and (:cwd event)
(or (nil? (:working-dir session))
(empty? (:working-dir session))))
(log/debug "Capturing working directory:" (:cwd event))
(db/update-session store session-id {:working-dir (:cwd event)})))
;; Also capture external-id from result event if not already set
(when (and (= :result (:event event))
(:session-id event)
(not (:external-id session)))
(log/debug "Capturing external session-id:" (:session-id event))
(log/debug "Capturing external session-id from result:" (:session-id event))
(db/update-session store session-id {:external-id (:session-id event)}))
;; On result event, save the accumulated message
;; On result event, check for permission requests
(when (= :result (:event event))
(let [content (.toString content-buffer)]
;; Save accumulated message if any
(when (seq content)
(db/save-message store {:session-id session-id
:role :assistant
:content content}))))))
:content content}))
;; If there's a permission request, store it and emit event
(when-let [perm-req (:permission-request event)]
(log/info "Permission request detected:" perm-req)
(set-pending-permission session-id perm-req)
(callback {:event :permission-request
:permission-request perm-req}))))))
;; Update session status when stream ends
(db/update-session store session-id {:status :idle})
;; If there's a pending permission, set status to awaiting-permission
(let [new-status (if (get-pending-permission session-id)
:awaiting-permission
:idle)]
(db/update-session store session-id {:status new-status}))
(.remove active-processes session-id)))
(defn cleanup-all
@@ -126,3 +157,38 @@
(stop-session store session-id)
(catch Exception e
(log/warn "Failed to stop session:" session-id (.getMessage e))))))
(defn respond-to-permission
"Respond to a permission request.
response-type: :accept, :deny, or :steer
message: optional message for :deny or :steer responses"
[store session-id response-type message]
(let [session (db/get-session store session-id)
_ (when-not session
(throw (ex-info "Session not found" {:session-id session-id})))
pending (get-pending-permission session-id)
_ (when-not pending
(throw (ex-info "No pending permission request" {:session-id session-id})))
adapter (get-adapter (:provider session))
;; Build spawn options based on response type
opts (cond-> {:working-dir (:working-dir session)}
;; For :accept, grant the requested tools
(= response-type :accept)
(assoc :allowed-tools (:tools pending)))
;; Determine the message to send
send-msg (case response-type
:accept "continue"
:deny "Permission denied. Find another approach without using that tool."
:steer message)
;; Spawn new process with --resume and possibly --allowedTools
handle (adapter/spawn-session adapter
(:external-id session)
opts)]
;; Clear pending permission
(clear-pending-permission session-id)
;; Store new process handle
(.put active-processes session-id handle)
(db/update-session store session-id {:status :running})
;; Send the response message
(adapter/send-message adapter handle send-msg)
handle))