add tmux sessions
This commit is contained in:
+6
-1
@@ -23,7 +23,12 @@
|
||||
aero/aero {:mvn/version "1.1.6"}
|
||||
mount/mount {:mvn/version "0.1.18"}
|
||||
org.clojure/tools.logging {:mvn/version "1.2.4"}
|
||||
ch.qos.logback/logback-classic {:mvn/version "1.4.11"}}
|
||||
ch.qos.logback/logback-classic {:mvn/version "1.4.11"}
|
||||
|
||||
;; Web Push notifications
|
||||
buddy/buddy-core {:mvn/version "1.11.423"}
|
||||
buddy/buddy-sign {:mvn/version "3.5.351"}
|
||||
clj-http/clj-http {:mvn/version "3.12.3"}}
|
||||
|
||||
:aliases
|
||||
{:run {:main-opts ["-m" "spiceflow.core"]}
|
||||
|
||||
@@ -102,6 +102,12 @@
|
||||
"Write" (str "create " (:file_path tool_input))
|
||||
"Edit" (str "edit " (:file_path tool_input))
|
||||
"Read" (str "read " (:file_path tool_input))
|
||||
"NotebookEdit" (str (:edit_mode tool_input "edit") " cell in " (:notebook_path tool_input))
|
||||
"WebFetch" (str "fetch " (:url tool_input))
|
||||
"WebSearch" (str "search: " (:query tool_input))
|
||||
"Skill" (str "run /" (:skill tool_input) (when-let [args (:args tool_input)] (str " " args)))
|
||||
"ExitPlanMode" "exit plan mode"
|
||||
"Task" (str "spawn " (:subagent_type tool_input) " agent")
|
||||
(pr-str tool_input))})
|
||||
denials)})))
|
||||
;; Unknown type
|
||||
@@ -167,6 +173,7 @@
|
||||
(let [json-msg (json/write-value-as-string {:type "user"
|
||||
:message {:role "user"
|
||||
:content message}})]
|
||||
(log/debug "[Claude stdin]" json-msg)
|
||||
(.write stdin json-msg)
|
||||
(.newLine stdin)
|
||||
(.flush stdin)
|
||||
@@ -177,17 +184,21 @@
|
||||
|
||||
(read-stream [this {:keys [stdout]} callback]
|
||||
(try
|
||||
(log/debug "[Claude] Starting stdout read loop")
|
||||
(loop []
|
||||
(when-let [line (.readLine stdout)]
|
||||
(log/debug "[Claude stdout]" line)
|
||||
(let [parsed (proto/parse-output this line)]
|
||||
(when parsed
|
||||
(log/debug "[Claude parsed]" (:event parsed))
|
||||
(callback parsed))
|
||||
;; Stop reading after result event - response is complete
|
||||
(if (= :result (:event parsed))
|
||||
(log/debug "Received result, stopping stream read")
|
||||
(log/debug "[Claude] Received result, stopping stream read")
|
||||
(recur)))))
|
||||
(log/debug "[Claude] stdout read loop ended")
|
||||
(catch Exception e
|
||||
(log/debug "Stream ended:" (.getMessage e)))))
|
||||
(log/debug "[Claude] Stream ended:" (.getMessage e)))))
|
||||
|
||||
(kill-process [_ {:keys [process]}]
|
||||
(when process
|
||||
|
||||
@@ -157,7 +157,7 @@
|
||||
;; Wrap with script -qc to create a pseudo-terminal
|
||||
;; This forces Go to flush stdout properly (Go binaries ignore stdbuf)
|
||||
args ["script" "-qc" opencode-cmd "/dev/null"]
|
||||
_ (log/info "Starting OpenCode with args:" args)
|
||||
_ (log/debug "[OpenCode] Starting with args:" args)
|
||||
pb (ProcessBuilder. (vec args))]
|
||||
;; Set working directory
|
||||
(when working-dir
|
||||
@@ -169,6 +169,7 @@
|
||||
(let [process (.start pb)
|
||||
stdout (BufferedReader. (InputStreamReader. (.getInputStream process) StandardCharsets/UTF_8))
|
||||
stderr (BufferedReader. (InputStreamReader. (.getErrorStream process) StandardCharsets/UTF_8))]
|
||||
(log/debug "[OpenCode] Process started successfully")
|
||||
;; Update the handle with the running process
|
||||
;; Note: We're mutating the handle here by storing process info
|
||||
;; The caller should use the returned handle
|
||||
@@ -181,7 +182,7 @@
|
||||
nil)))
|
||||
|
||||
(read-stream [this {:keys [stdout stderr process]} callback]
|
||||
(log/info "read-stream starting, stdout:" (boolean stdout) "process:" (boolean process) "process-alive:" (when process (.isAlive process)))
|
||||
(log/debug "[OpenCode] read-stream starting, stdout:" (boolean stdout) "process:" (boolean process) "process-alive:" (when process (.isAlive process)))
|
||||
(try
|
||||
;; Start a thread to log stderr
|
||||
(when stderr
|
||||
@@ -189,20 +190,19 @@
|
||||
(try
|
||||
(loop []
|
||||
(when-let [line (.readLine stderr)]
|
||||
(log/info "[OpenCode stderr]" line)
|
||||
(log/debug "[OpenCode stderr]" line)
|
||||
(recur)))
|
||||
(catch Exception e
|
||||
(log/info "Stderr stream ended:" (.getMessage e))))))
|
||||
(log/debug "[OpenCode] Stderr stream ended:" (.getMessage e))))))
|
||||
|
||||
;; Read stdout for JSON events
|
||||
(log/info "Starting stdout read loop")
|
||||
(log/debug "[OpenCode] Starting stdout read loop")
|
||||
(loop []
|
||||
(log/debug "Waiting for line from stdout...")
|
||||
(when-let [line (.readLine stdout)]
|
||||
(log/info "[OpenCode stdout]" line)
|
||||
(log/debug "[OpenCode stdout]" line)
|
||||
(let [parsed (proto/parse-output this line)]
|
||||
(when parsed
|
||||
(log/info "Parsed event:" (:event parsed))
|
||||
(log/debug "[OpenCode parsed]" (:event parsed))
|
||||
(callback parsed))
|
||||
;; Continue reading unless we hit a terminal event
|
||||
;; Note: step_finish with reason "tool-calls" is NOT terminal - OpenCode
|
||||
@@ -211,15 +211,15 @@
|
||||
(if (or (= :error (:event parsed))
|
||||
(and (= :result (:event parsed))
|
||||
(not= "tool-calls" (:stop-reason parsed))))
|
||||
(log/info "Received terminal event, stopping stream read. stop-reason:" (:stop-reason parsed))
|
||||
(log/debug "[OpenCode] Received terminal event, stopping stream read. stop-reason:" (:stop-reason parsed))
|
||||
(recur)))))
|
||||
(log/info "stdout read loop ended (nil line)")
|
||||
(log/debug "[OpenCode] stdout read loop ended")
|
||||
|
||||
;; Wait for process to complete
|
||||
(when process
|
||||
(log/info "Waiting for process to complete")
|
||||
(log/debug "[OpenCode] Waiting for process to complete")
|
||||
(.waitFor process)
|
||||
(log/info "Process completed with exit code:" (.exitValue process)))
|
||||
(log/debug "[OpenCode] Process completed with exit code:" (.exitValue process)))
|
||||
|
||||
(catch Exception e
|
||||
(log/error "Stream error:" (.getMessage e) (class e)))))
|
||||
|
||||
@@ -0,0 +1,319 @@
|
||||
(ns spiceflow.adapters.tmux
|
||||
"Adapter for tmux terminal sessions.
|
||||
|
||||
Provides shell access without an LLM. Users can interact with tmux sessions
|
||||
as if they were chat sessions - commands are sent to the shell and output
|
||||
is streamed back as messages."
|
||||
(:require [spiceflow.adapters.protocol :as proto]
|
||||
[clojure.java.io :as io]
|
||||
[clojure.java.shell :as shell]
|
||||
[clojure.string :as str]
|
||||
[clojure.tools.logging :as log])
|
||||
(:import [java.io BufferedReader FileReader File RandomAccessFile]
|
||||
[java.util UUID]))
|
||||
|
||||
(def ^:private session-prefix "spiceflow-")
|
||||
(def ^:private output-dir "/tmp")
|
||||
(def ^:private end-marker-prefix "---SPICEFLOW-END-")
|
||||
|
||||
;; Word lists for generating readable random session names (like Docker containers)
|
||||
(def ^:private adjectives
|
||||
["brave" "calm" "clever" "cool" "eager" "fancy" "happy" "jolly" "kind"
|
||||
"lively" "merry" "neat" "nice" "proud" "quick" "sharp" "swift" "warm"
|
||||
"wise" "bold" "bright" "fair" "keen" "mild" "pure" "rare" "safe" "sure"])
|
||||
|
||||
(def ^:private nouns
|
||||
["fox" "owl" "bee" "elk" "ant" "bat" "cat" "dog" "eel" "hen" "jay"
|
||||
"koi" "ram" "yak" "ape" "cod" "cow" "doe" "gnu" "hog" "orb" "oak"
|
||||
"pine" "fern" "moss" "leaf" "rose" "wave" "star" "moon" "sun" "wind"])
|
||||
|
||||
(defn- generate-random-name
|
||||
"Generate a random adjective-noun name for a tmux session"
|
||||
[]
|
||||
(let [adj (rand-nth adjectives)
|
||||
noun (rand-nth nouns)
|
||||
suffix (format "%04d" (rand-int 10000))]
|
||||
(str adj "-" noun "-" suffix)))
|
||||
|
||||
;; Pattern to match ANSI escape sequences
|
||||
(def ^:private ansi-pattern #"\u001b\[[0-9;?]*[a-zA-Z]|\u001b\].*?\u0007")
|
||||
|
||||
(defn- strip-ansi
|
||||
"Remove ANSI escape sequences from a string"
|
||||
[s]
|
||||
(str/replace s ansi-pattern ""))
|
||||
|
||||
(defn- tmux-session-name
|
||||
"Generate tmux session name from spiceflow session id"
|
||||
[session-id]
|
||||
(str session-prefix session-id))
|
||||
|
||||
(defn- output-file-path
|
||||
"Get path to output file for a session"
|
||||
[session-name]
|
||||
(str output-dir "/spiceflow-tmux-" session-name ".log"))
|
||||
|
||||
(defn- run-tmux
|
||||
"Run a tmux command and return result"
|
||||
[& args]
|
||||
(let [result (apply shell/sh "tmux" args)]
|
||||
(when (zero? (:exit result))
|
||||
(str/trim (:out result)))))
|
||||
|
||||
(defn- tmux-session-exists?
|
||||
"Check if a tmux session exists"
|
||||
[session-name]
|
||||
(some-> (run-tmux "has-session" "-t" session-name)
|
||||
(= "")))
|
||||
|
||||
(defn- parse-session-list
|
||||
"Parse output from 'tmux list-sessions'"
|
||||
[output]
|
||||
(when output
|
||||
(->> (str/split-lines output)
|
||||
(filter #(str/starts-with? % session-prefix))
|
||||
(map (fn [line]
|
||||
;; Format: "session_name:path"
|
||||
(let [[name path] (str/split line #":" 2)]
|
||||
{:external-id name
|
||||
:provider :tmux
|
||||
:title name
|
||||
:working-dir (or path (System/getProperty "user.home"))}))))))
|
||||
|
||||
(defn- is-prompt-line?
|
||||
"Check if a line looks like a shell prompt (ends with $ or #)"
|
||||
[line]
|
||||
(boolean (re-find #"[$#]\s*$" (str/trim line))))
|
||||
|
||||
(defn- is-echo-command?
|
||||
"Check if a line is our echo command for the end marker"
|
||||
[line]
|
||||
(str/includes? line (str "echo '" end-marker-prefix)))
|
||||
|
||||
(defn- tail-file
|
||||
"Tail a file for new content, calling callback with each new line.
|
||||
Stops when end-marker is seen. Returns the marker UUID if found."
|
||||
[file-path end-marker callback timeout-ms original-cmd]
|
||||
(let [file (File. ^String file-path)
|
||||
start-time (System/currentTimeMillis)
|
||||
marker-found (atom nil)
|
||||
seen-first-line (atom false)]
|
||||
;; Wait for file to exist
|
||||
(while (and (not (.exists file))
|
||||
(< (- (System/currentTimeMillis) start-time) timeout-ms))
|
||||
(Thread/sleep 50))
|
||||
(when (.exists file)
|
||||
(with-open [raf (RandomAccessFile. file "r")]
|
||||
(loop [last-pos 0]
|
||||
(let [current-len (.length raf)]
|
||||
(when (> current-len last-pos)
|
||||
(.seek raf last-pos)
|
||||
(let [new-bytes (byte-array (- current-len last-pos))]
|
||||
(.readFully raf new-bytes)
|
||||
(let [new-content (String. new-bytes "UTF-8")
|
||||
lines (str/split-lines new-content)]
|
||||
(doseq [line lines]
|
||||
(let [clean-line (strip-ansi line)]
|
||||
(cond
|
||||
;; Found end marker - extract UUID and stop
|
||||
(str/includes? clean-line end-marker-prefix)
|
||||
(let [marker-match (re-find (re-pattern (str end-marker-prefix "([a-f0-9-]+)---")) clean-line)]
|
||||
(when marker-match
|
||||
(reset! marker-found (second marker-match))))
|
||||
|
||||
;; Skip blank lines, prompt lines, echo command, and echoed user command
|
||||
(or (str/blank? clean-line)
|
||||
(is-prompt-line? clean-line)
|
||||
(is-echo-command? clean-line)
|
||||
;; Skip the first line if it's the echoed command
|
||||
(and (not @seen-first-line)
|
||||
original-cmd
|
||||
(str/includes? clean-line original-cmd)))
|
||||
(reset! seen-first-line true)
|
||||
|
||||
;; Regular output line
|
||||
:else
|
||||
(do
|
||||
(reset! seen-first-line true)
|
||||
(callback {:event :content-delta :text (str clean-line "\n")}))))))))
|
||||
;; Check if we should stop
|
||||
(if @marker-found
|
||||
@marker-found
|
||||
(if (< (- (System/currentTimeMillis) start-time) timeout-ms)
|
||||
(do
|
||||
(Thread/sleep 50)
|
||||
(recur (.length raf)))
|
||||
nil))))))))
|
||||
|
||||
(defrecord TmuxAdapter []
|
||||
proto/AgentAdapter
|
||||
|
||||
(provider-name [_] :tmux)
|
||||
|
||||
(discover-sessions [_]
|
||||
;; List only spiceflow-managed tmux sessions
|
||||
(if-let [output (run-tmux "list-sessions" "-F" "#{session_name}:#{pane_current_path}")]
|
||||
(vec (parse-session-list output))
|
||||
[]))
|
||||
|
||||
(spawn-session [_ session-id opts]
|
||||
(let [{:keys [working-dir]} opts
|
||||
;; If session-id is already a spiceflow- prefixed name, use it directly
|
||||
;; Otherwise, create a new session with a random readable name
|
||||
session-name (if (and session-id (str/starts-with? session-id session-prefix))
|
||||
session-id
|
||||
(tmux-session-name (or session-id (generate-random-name))))
|
||||
output-file (output-file-path session-name)
|
||||
work-dir (or working-dir (System/getProperty "user.home"))]
|
||||
;; Check if session already exists
|
||||
(if (run-tmux "has-session" "-t" session-name)
|
||||
;; Session exists, just set up pipe-pane
|
||||
(do
|
||||
(log/debug "[Tmux] Reconnecting to existing session:" session-name)
|
||||
;; Clear and restart pipe-pane
|
||||
(run-tmux "pipe-pane" "-t" session-name)
|
||||
(spit output-file "") ;; Clear the log file
|
||||
(run-tmux "pipe-pane" "-t" session-name (str "cat >> " output-file)))
|
||||
;; Create new session
|
||||
(do
|
||||
(log/debug "[Tmux] Creating new session:" session-name "in" work-dir)
|
||||
;; Create output file
|
||||
(spit output-file "")
|
||||
;; Create tmux session in detached mode
|
||||
(let [result (shell/sh "tmux" "new-session" "-d" "-s" session-name "-c" work-dir)]
|
||||
(when-not (zero? (:exit result))
|
||||
(throw (ex-info "Failed to create tmux session" {:error (:err result)}))))
|
||||
;; Verify the session was created with the expected name
|
||||
(when-not (run-tmux "has-session" "-t" session-name)
|
||||
(throw (ex-info "Tmux session created but not found with expected name"
|
||||
{:expected-name session-name})))
|
||||
;; Double-check by querying the actual session name from tmux
|
||||
(let [actual-name (run-tmux "display-message" "-t" session-name "-p" "#{session_name}")]
|
||||
(when (and actual-name (not= actual-name session-name))
|
||||
(log/warn "[Tmux] Session name mismatch! Expected:" session-name "Actual:" actual-name)
|
||||
(throw (ex-info "Tmux session name mismatch"
|
||||
{:expected-name session-name :actual-name actual-name}))))
|
||||
(log/debug "[Tmux] Verified session exists with correct name:" session-name)
|
||||
;; Set up pipe-pane to capture output
|
||||
(run-tmux "pipe-pane" "-t" session-name (str "cat >> " output-file))))
|
||||
;; Return handle
|
||||
{:session-name session-name
|
||||
:output-file output-file
|
||||
:working-dir work-dir
|
||||
:process nil}))
|
||||
|
||||
(send-message [_ handle message]
|
||||
(let [{:keys [session-name output-file]} handle
|
||||
marker-id (str (UUID/randomUUID))
|
||||
end-marker (str end-marker-prefix marker-id "---")]
|
||||
(log/debug "[Tmux] Sending command to" session-name ":" message)
|
||||
;; Clear output file before sending command to avoid reading stale output
|
||||
(spit output-file "")
|
||||
;; Send the user's command
|
||||
(run-tmux "send-keys" "-t" session-name message "Enter")
|
||||
;; Send end marker after a tiny delay to let command start
|
||||
(Thread/sleep 100)
|
||||
(run-tmux "send-keys" "-t" session-name (str "echo '" end-marker "'") "Enter")
|
||||
;; Return handle with marker and original command for read-stream to use
|
||||
(assoc handle
|
||||
:end-marker end-marker
|
||||
:marker-id marker-id
|
||||
:original-cmd message)))
|
||||
|
||||
(read-stream [_ handle callback]
|
||||
(let [{:keys [output-file end-marker marker-id original-cmd]} handle
|
||||
timeout-ms 300000] ;; 5 minute timeout
|
||||
(log/debug "[Tmux] Starting to read output from" output-file "waiting for marker:" marker-id)
|
||||
(when end-marker
|
||||
(let [found-marker (tail-file output-file end-marker callback timeout-ms original-cmd)]
|
||||
(log/debug "[Tmux] Read complete, found marker:" found-marker)
|
||||
;; Emit message-stop event
|
||||
(callback {:event :message-stop})))))
|
||||
|
||||
(kill-process [_ handle]
|
||||
(let [{:keys [session-name output-file]} handle]
|
||||
(log/debug "[Tmux] Killing session:" session-name)
|
||||
;; Kill the tmux session
|
||||
(run-tmux "kill-session" "-t" session-name)
|
||||
;; Clean up output file
|
||||
(when output-file
|
||||
(let [f (File. ^String output-file)]
|
||||
(when (.exists f)
|
||||
(.delete f))))))
|
||||
|
||||
(parse-output [_ line]
|
||||
;; For tmux, we parse in read-stream directly
|
||||
;; This is here for protocol compliance
|
||||
(when (and line (not (str/blank? line)))
|
||||
{:event :content-delta :text line})))
|
||||
|
||||
(defn create-adapter
|
||||
"Create a tmux adapter"
|
||||
[]
|
||||
(->TmuxAdapter))
|
||||
|
||||
(defn capture-pane
|
||||
"Capture the current content of a tmux pane.
|
||||
Returns the visible terminal content as a string, or nil if session doesn't exist."
|
||||
[session-name]
|
||||
(when session-name
|
||||
;; Use capture-pane with -p to print to stdout, -e to include escape sequences (then strip them)
|
||||
;; -S - and -E - captures the entire scrollback history
|
||||
(let [result (shell/sh "tmux" "capture-pane" "-t" session-name "-p" "-S" "-1000")]
|
||||
(when (zero? (:exit result))
|
||||
(strip-ansi (:out result))))))
|
||||
|
||||
(defn get-session-name
|
||||
"Get the tmux session name for a spiceflow session.
|
||||
Checks if the external-id is already a session name or needs the prefix."
|
||||
[external-id]
|
||||
(when external-id
|
||||
(if (str/starts-with? external-id session-prefix)
|
||||
external-id
|
||||
(tmux-session-name external-id))))
|
||||
|
||||
(defn session-alive?
|
||||
"Check if a tmux session is still alive"
|
||||
[session-name]
|
||||
(when session-name
|
||||
(let [result (shell/sh "tmux" "has-session" "-t" session-name)]
|
||||
(zero? (:exit result)))))
|
||||
|
||||
(defn get-actual-session-name
|
||||
"Get the actual session name from tmux for a given session.
|
||||
This queries tmux directly to ensure the name matches what's on the device."
|
||||
[session-name]
|
||||
(when session-name
|
||||
(let [result (shell/sh "tmux" "display-message" "-t" session-name "-p" "#{session_name}")]
|
||||
(when (zero? (:exit result))
|
||||
(str/trim (:out result))))))
|
||||
|
||||
(defn send-keys-raw
|
||||
"Send raw input to a tmux session without waiting for output.
|
||||
Used for stdin-style input to running processes."
|
||||
[session-name input]
|
||||
(when session-name
|
||||
(log/debug "[Tmux] send-keys-raw to" session-name "input:" (pr-str input))
|
||||
(cond
|
||||
;; Carriage return should be sent as Enter key name
|
||||
(= input "\r")
|
||||
(run-tmux "send-keys" "-t" session-name "Enter")
|
||||
;; Literal newline character
|
||||
(= input "\n")
|
||||
(run-tmux "send-keys" "-t" session-name "-l" "\n")
|
||||
;; Control characters (like Ctrl+C = \u0003) - send without -l
|
||||
(and (= 1 (count input))
|
||||
(< (int (first input)) 32))
|
||||
(run-tmux "send-keys" "-t" session-name input)
|
||||
;; Regular text - send literal
|
||||
:else
|
||||
(run-tmux "send-keys" "-t" session-name "-l" input))
|
||||
true))
|
||||
|
||||
(defn rename-session
|
||||
"Rename a tmux session. Returns the new session name on success, nil on failure."
|
||||
[old-name new-name]
|
||||
(when (and old-name new-name)
|
||||
(let [result (shell/sh "tmux" "rename-session" "-t" old-name new-name)]
|
||||
(when (zero? (:exit result))
|
||||
new-name))))
|
||||
@@ -7,6 +7,9 @@
|
||||
[ring.util.response :as response]
|
||||
[spiceflow.db.protocol :as db]
|
||||
[spiceflow.session.manager :as manager]
|
||||
[spiceflow.adapters.protocol :as adapter]
|
||||
[spiceflow.adapters.tmux :as tmux]
|
||||
[spiceflow.push.protocol :as push-proto]
|
||||
[clojure.tools.logging :as log]))
|
||||
|
||||
(defn- json-response
|
||||
@@ -23,42 +26,101 @@
|
||||
(response/content-type "application/json")))
|
||||
|
||||
;; Session handlers
|
||||
(defn- now-iso
|
||||
"Get current time as ISO string for JSON serialization"
|
||||
[]
|
||||
(str (java.time.Instant/now)))
|
||||
|
||||
(defn list-sessions-handler
|
||||
"List all sessions - merges DB sessions with live tmux sessions"
|
||||
[store]
|
||||
(fn [_request]
|
||||
(json-response (db/get-sessions store))))
|
||||
;; Get DB sessions (claude, opencode)
|
||||
(let [db-sessions (db/get-sessions store)
|
||||
;; Get live tmux sessions
|
||||
tmux-adapter (manager/get-adapter :tmux)
|
||||
tmux-sessions (adapter/discover-sessions tmux-adapter)
|
||||
;; Convert tmux sessions to session format with id = external-id
|
||||
tmux-formatted (map (fn [s]
|
||||
{:id (:external-id s)
|
||||
:provider "tmux"
|
||||
:title (:title s)
|
||||
:working-dir (:working-dir s)
|
||||
:created-at (now-iso)
|
||||
:updated-at (now-iso)})
|
||||
tmux-sessions)]
|
||||
(json-response (concat db-sessions tmux-formatted)))))
|
||||
|
||||
(defn- tmux-session-id?
|
||||
"Check if a session ID is a tmux session (starts with spiceflow-)"
|
||||
[id]
|
||||
(and id (clojure.string/starts-with? id "spiceflow-")))
|
||||
|
||||
(defn get-session-handler
|
||||
[store]
|
||||
(fn [request]
|
||||
(let [id (get-in request [:path-params :id])]
|
||||
(if-let [session (db/get-session store id)]
|
||||
(let [messages (db/get-messages store id)]
|
||||
(json-response (assoc session :messages messages)))
|
||||
(error-response 404 "Session not found")))))
|
||||
(if (tmux-session-id? id)
|
||||
;; Tmux session - check if it's alive
|
||||
(if (tmux/session-alive? id)
|
||||
(json-response {:id id
|
||||
:provider "tmux"
|
||||
:title id
|
||||
:messages []})
|
||||
(error-response 404 "Session not found"))
|
||||
;; Regular DB session
|
||||
(if-let [session (db/get-session store id)]
|
||||
(let [messages (db/get-messages store id)]
|
||||
(json-response (assoc session :messages messages)))
|
||||
(error-response 404 "Session not found"))))))
|
||||
|
||||
(defn create-session-handler
|
||||
[store]
|
||||
(fn [request]
|
||||
(let [body (:body request)]
|
||||
(let [body (:body request)
|
||||
provider (keyword (:provider body))]
|
||||
(log/debug "API request: create-session" {:body body})
|
||||
(if (db/valid-session? body)
|
||||
(let [session (db/save-session store body)]
|
||||
(-> (json-response session)
|
||||
(if (= :tmux provider)
|
||||
;; Tmux session - spawn directly without DB persistence
|
||||
(let [tmux-adapter (manager/get-adapter :tmux)
|
||||
handle (adapter/spawn-session tmux-adapter nil {})
|
||||
session-name (:session-name handle)]
|
||||
(-> (json-response {:id session-name
|
||||
:provider "tmux"
|
||||
:title session-name
|
||||
:created-at (now-iso)
|
||||
:updated-at (now-iso)})
|
||||
(response/status 201)))
|
||||
(error-response 400 "Invalid session data")))))
|
||||
;; Regular session - save to DB
|
||||
(if (db/valid-session? body)
|
||||
(let [session (db/save-session store body)]
|
||||
(-> (json-response session)
|
||||
(response/status 201)))
|
||||
(error-response 400 "Invalid session data"))))))
|
||||
|
||||
(defn delete-session-handler
|
||||
[store]
|
||||
(fn [request]
|
||||
(let [id (get-in request [:path-params :id])]
|
||||
(log/debug "API request: delete-session" {:session-id id})
|
||||
(if (db/get-session store id)
|
||||
(do
|
||||
(manager/stop-session store id)
|
||||
(db/delete-session store id)
|
||||
(response/status (response/response nil) 204))
|
||||
(error-response 404 "Session not found")))))
|
||||
(if (tmux-session-id? id)
|
||||
;; Tmux session - just kill the tmux session
|
||||
(if (tmux/session-alive? id)
|
||||
(do
|
||||
(log/debug "Killing tmux session:" id)
|
||||
(let [tmux-adapter (manager/get-adapter :tmux)]
|
||||
(adapter/kill-process tmux-adapter {:session-name id
|
||||
:output-file (str "/tmp/spiceflow-tmux-" id ".log")}))
|
||||
(response/status (response/response nil) 204))
|
||||
(error-response 404 "Session not found"))
|
||||
;; Regular DB session
|
||||
(if-let [session (db/get-session store id)]
|
||||
(do
|
||||
;; Stop any active process
|
||||
(manager/stop-session store id)
|
||||
(db/delete-session store id)
|
||||
(response/status (response/response nil) 204))
|
||||
(error-response 404 "Session not found"))))))
|
||||
|
||||
(defn update-session-handler
|
||||
[store]
|
||||
@@ -66,17 +128,37 @@
|
||||
(let [id (get-in request [:path-params :id])
|
||||
body (:body request)]
|
||||
(log/debug "API request: update-session" {:session-id id :body body})
|
||||
(if (db/get-session store id)
|
||||
(let [updated (db/update-session store id (select-keys body [:title :auto-accept-edits]))]
|
||||
(json-response updated))
|
||||
(error-response 404 "Session not found")))))
|
||||
(if (tmux-session-id? id)
|
||||
;; Tmux session - rename via tmux CLI, no DB changes
|
||||
(if (tmux/session-alive? id)
|
||||
(if-let [new-title (:title body)]
|
||||
;; Ensure new name has spiceflow- prefix so it's still discoverable
|
||||
(let [new-name (if (clojure.string/starts-with? new-title "spiceflow-")
|
||||
new-title
|
||||
(str "spiceflow-" new-title))]
|
||||
(if (tmux/rename-session id new-name)
|
||||
(json-response {:id new-name
|
||||
:provider "tmux"
|
||||
:title new-name})
|
||||
(error-response 500 "Failed to rename tmux session")))
|
||||
;; No title change, just return current info
|
||||
(json-response {:id id
|
||||
:provider "tmux"
|
||||
:title id}))
|
||||
(error-response 404 "Session not found"))
|
||||
;; Regular DB session
|
||||
(if (db/get-session store id)
|
||||
(let [updated (db/update-session store id (select-keys body [:title :auto-accept-edits]))]
|
||||
(json-response updated))
|
||||
(error-response 404 "Session not found"))))))
|
||||
|
||||
(defn send-message-handler
|
||||
[store broadcast-fn]
|
||||
(fn [request]
|
||||
(let [id (get-in request [:path-params :id])
|
||||
message (get-in request [:body :message])]
|
||||
(log/debug "API request: send-message" {:session-id id :message message})
|
||||
(log/debug "API request: send-message" {:session-id id})
|
||||
(log/debug "API request message content:" message)
|
||||
(if-let [session (db/get-session store id)]
|
||||
(try
|
||||
;; Send message and start streaming in a separate thread
|
||||
@@ -128,14 +210,89 @@
|
||||
(error-response 400 "Invalid response type. Must be: accept, deny, or steer"))
|
||||
(error-response 404 "Session not found")))))
|
||||
|
||||
;; Tmux terminal handlers
|
||||
(defn terminal-capture-handler
|
||||
"Get the current terminal content for a tmux session.
|
||||
For ephemeral tmux sessions, the session ID IS the tmux session name."
|
||||
[_store]
|
||||
(fn [request]
|
||||
(let [id (get-in request [:path-params :id])]
|
||||
(if (tmux-session-id? id)
|
||||
;; Ephemeral tmux session - ID is the session name
|
||||
(if (tmux/session-alive? id)
|
||||
(let [content (tmux/capture-pane id)]
|
||||
(json-response {:content (or content "")
|
||||
:alive true
|
||||
:session-name id}))
|
||||
(error-response 404 "Session not found"))
|
||||
(error-response 400 "Not a tmux session")))))
|
||||
|
||||
(defn terminal-input-handler
|
||||
"Send raw input to a tmux session (stdin-style)"
|
||||
[_store broadcast-fn]
|
||||
(fn [request]
|
||||
(let [id (get-in request [:path-params :id])
|
||||
input (get-in request [:body :input])]
|
||||
(if (tmux-session-id? id)
|
||||
;; Ephemeral tmux session - ID is the session name
|
||||
(if (tmux/session-alive? id)
|
||||
(do
|
||||
(tmux/send-keys-raw id input)
|
||||
;; Broadcast terminal update after input
|
||||
(future
|
||||
(Thread/sleep 100) ;; Small delay to let terminal update
|
||||
(let [content (tmux/capture-pane id)]
|
||||
(broadcast-fn id {:event :terminal-update
|
||||
:content (or content "")})))
|
||||
(json-response {:status "sent"}))
|
||||
(error-response 400 "Tmux session not alive"))
|
||||
(error-response 400 "Not a tmux session")))))
|
||||
|
||||
;; Health check
|
||||
(defn health-handler
|
||||
[_request]
|
||||
(json-response {:status "ok" :service "spiceflow"}))
|
||||
|
||||
;; Push notification handlers
|
||||
(defn vapid-key-handler
|
||||
"Return the public VAPID key for push subscriptions"
|
||||
[push-store]
|
||||
(fn [_request]
|
||||
(if-let [vapid-keys (push-proto/get-vapid-keys push-store)]
|
||||
(json-response {:publicKey (:public-key vapid-keys)})
|
||||
(error-response 500 "VAPID keys not configured"))))
|
||||
|
||||
(defn subscribe-handler
|
||||
"Save a push subscription"
|
||||
[push-store]
|
||||
(fn [request]
|
||||
(let [body (:body request)
|
||||
subscription {:endpoint (:endpoint body)
|
||||
:p256dh (get-in body [:keys :p256dh])
|
||||
:auth (get-in body [:keys :auth])
|
||||
:user-agent (get-in request [:headers "user-agent"])}]
|
||||
(log/debug "Push subscribe request:" {:endpoint (:endpoint subscription)})
|
||||
(if (push-proto/valid-subscription? subscription)
|
||||
(let [saved (push-proto/save-subscription push-store subscription)]
|
||||
(-> (json-response {:id (:id saved)})
|
||||
(response/status 201)))
|
||||
(error-response 400 "Invalid subscription: endpoint, p256dh, and auth are required")))))
|
||||
|
||||
(defn unsubscribe-handler
|
||||
"Remove a push subscription"
|
||||
[push-store]
|
||||
(fn [request]
|
||||
(let [endpoint (get-in request [:body :endpoint])]
|
||||
(log/debug "Push unsubscribe request:" {:endpoint endpoint})
|
||||
(if endpoint
|
||||
(do
|
||||
(push-proto/delete-subscription-by-endpoint push-store endpoint)
|
||||
(response/status (response/response nil) 204))
|
||||
(error-response 400 "Endpoint is required")))))
|
||||
|
||||
(defn create-routes
|
||||
"Create API routes with the given store and broadcast function"
|
||||
[store broadcast-fn]
|
||||
"Create API routes with the given store, broadcast function, and push store"
|
||||
[store broadcast-fn push-store]
|
||||
[["/api"
|
||||
["/health" {:get health-handler}]
|
||||
["/sessions" {:get (list-sessions-handler store)
|
||||
@@ -144,13 +301,18 @@
|
||||
:patch (update-session-handler store)
|
||||
:delete (delete-session-handler store)}]
|
||||
["/sessions/:id/send" {:post (send-message-handler store broadcast-fn)}]
|
||||
["/sessions/:id/permission" {:post (permission-response-handler store broadcast-fn)}]]])
|
||||
["/sessions/:id/permission" {:post (permission-response-handler store broadcast-fn)}]
|
||||
["/sessions/:id/terminal" {:get (terminal-capture-handler store)}]
|
||||
["/sessions/:id/terminal/input" {:post (terminal-input-handler store broadcast-fn)}]
|
||||
["/push/vapid-key" {:get (vapid-key-handler push-store)}]
|
||||
["/push/subscribe" {:post (subscribe-handler push-store)}]
|
||||
["/push/unsubscribe" {:post (unsubscribe-handler push-store)}]]])
|
||||
|
||||
(defn create-app
|
||||
"Create the Ring application"
|
||||
[store broadcast-fn]
|
||||
[store broadcast-fn push-store]
|
||||
(-> (ring/ring-handler
|
||||
(ring/router (create-routes store broadcast-fn))
|
||||
(ring/router (create-routes store broadcast-fn push-store))
|
||||
(ring/create-default-handler))
|
||||
(wrap-json-body {:keywords? true})
|
||||
wrap-json-response
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
[spiceflow.api.routes :as routes]
|
||||
[spiceflow.api.websocket :as ws]
|
||||
[spiceflow.session.manager :as manager]
|
||||
[spiceflow.push.store :as push-store]
|
||||
[mount.core :as mount :refer [defstate]]
|
||||
[clojure.tools.logging :as log])
|
||||
(:gen-class))
|
||||
@@ -18,6 +19,13 @@
|
||||
(sqlite/create-store db-path)))
|
||||
:stop nil)
|
||||
|
||||
;; Push notification store (shares datasource with main store)
|
||||
(defstate push
|
||||
:start (do
|
||||
(log/info "Initializing push notification store...")
|
||||
(push-store/create-push-store (:datasource store)))
|
||||
:stop nil)
|
||||
|
||||
;; Atom to hold the Jetty server instance
|
||||
(defonce ^:private jetty-server (atom nil))
|
||||
|
||||
@@ -27,7 +35,9 @@
|
||||
host (get-in config/config [:server :host] "0.0.0.0")
|
||||
;; Wire up pending permission function for WebSocket (partially apply store)
|
||||
_ (ws/set-pending-permission-fn! (partial manager/get-pending-permission store))
|
||||
api-app (routes/create-app store ws/broadcast-to-session)
|
||||
;; Wire up push store for notifications (used by manager)
|
||||
_ (manager/set-push-store! push)
|
||||
api-app (routes/create-app store ws/broadcast-to-session push)
|
||||
;; Wrap the app to handle WebSocket upgrades on /api/ws
|
||||
app (fn [request]
|
||||
(if (and (jetty/ws-upgrade-request? request)
|
||||
|
||||
@@ -29,7 +29,7 @@
|
||||
"Validate session data has required fields"
|
||||
[{:keys [provider]}]
|
||||
(and provider
|
||||
(contains? #{:claude :opencode "claude" "opencode"} provider)))
|
||||
(contains? #{:claude :opencode :tmux "claude" "opencode" "tmux"} provider)))
|
||||
|
||||
(defn valid-message?
|
||||
"Validate message data has required fields"
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
(ns spiceflow.push.protocol
|
||||
"Protocol for push notification subscription storage")
|
||||
|
||||
(defprotocol PushStore
|
||||
"Protocol for managing push subscriptions and VAPID keys"
|
||||
|
||||
;; Subscription operations
|
||||
(get-subscriptions [this]
|
||||
"Get all push subscriptions")
|
||||
(get-subscription [this id]
|
||||
"Get a subscription by ID")
|
||||
(get-subscription-by-endpoint [this endpoint]
|
||||
"Get a subscription by endpoint URL")
|
||||
(save-subscription [this subscription]
|
||||
"Save a new push subscription. Returns the saved subscription with ID.")
|
||||
(delete-subscription [this id]
|
||||
"Delete a subscription by ID")
|
||||
(delete-subscription-by-endpoint [this endpoint]
|
||||
"Delete a subscription by endpoint URL")
|
||||
|
||||
;; VAPID key operations
|
||||
(get-vapid-keys [this]
|
||||
"Get the VAPID key pair (returns {:public-key :private-key} or nil)")
|
||||
(save-vapid-keys [this keys]
|
||||
"Save VAPID key pair. Only called once on first use."))
|
||||
|
||||
(defn valid-subscription?
|
||||
"Validate subscription data has required fields"
|
||||
[{:keys [endpoint p256dh auth]}]
|
||||
(and endpoint p256dh auth
|
||||
(string? endpoint)
|
||||
(string? p256dh)
|
||||
(string? auth)))
|
||||
@@ -0,0 +1,337 @@
|
||||
(ns spiceflow.push.sender
|
||||
"Web Push message encryption and delivery.
|
||||
Implements RFC 8291 (Message Encryption for Web Push) and RFC 8292 (VAPID)."
|
||||
(:require [clj-http.client :as http]
|
||||
[jsonista.core :as json]
|
||||
[spiceflow.push.protocol :as proto]
|
||||
[spiceflow.push.vapid :as vapid]
|
||||
[clojure.tools.logging :as log])
|
||||
(:import [java.security KeyPairGenerator SecureRandom KeyFactory]
|
||||
[java.security.spec ECGenParameterSpec ECPublicKeySpec ECPoint]
|
||||
[javax.crypto Cipher KeyAgreement Mac]
|
||||
[javax.crypto.spec SecretKeySpec GCMParameterSpec]
|
||||
[java.util Base64 Arrays]
|
||||
[java.nio ByteBuffer]
|
||||
[java.math BigInteger]
|
||||
[org.bouncycastle.jce ECNamedCurveTable]
|
||||
[org.bouncycastle.jce.spec ECNamedCurveSpec]))
|
||||
|
||||
;; Note: We implement Web Push encryption using Java crypto primitives
|
||||
;; to avoid additional dependencies beyond buddy-core
|
||||
|
||||
(defn- base64url-decode
|
||||
"Decode URL-safe base64 string to bytes"
|
||||
[^String s]
|
||||
(.decode (Base64/getUrlDecoder) s))
|
||||
|
||||
(defn- base64url-encode
|
||||
"Encode bytes to URL-safe base64 without padding"
|
||||
[^bytes b]
|
||||
(-> (Base64/getUrlEncoder)
|
||||
(.withoutPadding)
|
||||
(.encodeToString b)))
|
||||
|
||||
(defn- generate-salt
|
||||
"Generate 16 random bytes for encryption salt"
|
||||
[]
|
||||
(let [salt (byte-array 16)
|
||||
random (SecureRandom.)]
|
||||
(.nextBytes random salt)
|
||||
salt))
|
||||
|
||||
(defn- generate-ephemeral-keypair
|
||||
"Generate an ephemeral ECDH key pair for message encryption"
|
||||
[]
|
||||
(let [kpg (KeyPairGenerator/getInstance "EC")
|
||||
_ (.initialize kpg (ECGenParameterSpec. "secp256r1") (SecureRandom.))]
|
||||
(.generateKeyPair kpg)))
|
||||
|
||||
(defn- public-key->uncompressed-bytes
|
||||
"Convert EC public key to uncompressed point format (0x04 || x || y)"
|
||||
[public-key]
|
||||
(let [point (.getW public-key)
|
||||
x-bytes (.toByteArray (.getAffineX point))
|
||||
y-bytes (.toByteArray (.getAffineY point))
|
||||
x-padded (byte-array 32)
|
||||
y-padded (byte-array 32)]
|
||||
(let [x-len (min 32 (alength x-bytes))
|
||||
x-offset (max 0 (- (alength x-bytes) 32))
|
||||
y-len (min 32 (alength y-bytes))
|
||||
y-offset (max 0 (- (alength y-bytes) 32))]
|
||||
(System/arraycopy x-bytes x-offset x-padded (- 32 x-len) x-len)
|
||||
(System/arraycopy y-bytes y-offset y-padded (- 32 y-len) y-len))
|
||||
(let [result (byte-array 65)]
|
||||
(aset-byte result 0 (unchecked-byte 0x04))
|
||||
(System/arraycopy x-padded 0 result 1 32)
|
||||
(System/arraycopy y-padded 0 result 33 32)
|
||||
result)))
|
||||
|
||||
(defn- uncompressed-bytes->public-key
|
||||
"Convert uncompressed point bytes (0x04 || x || y) to EC public key"
|
||||
[^bytes point-bytes]
|
||||
(when (and (= 65 (alength point-bytes))
|
||||
(= 0x04 (aget point-bytes 0)))
|
||||
(let [x-bytes (byte-array 32)
|
||||
y-bytes (byte-array 32)
|
||||
_ (System/arraycopy point-bytes 1 x-bytes 0 32)
|
||||
_ (System/arraycopy point-bytes 33 y-bytes 0 32)
|
||||
x (BigInteger. 1 x-bytes)
|
||||
y (BigInteger. 1 y-bytes)
|
||||
;; Get EC parameters for P-256
|
||||
kpg (doto (KeyPairGenerator/getInstance "EC")
|
||||
(.initialize (ECGenParameterSpec. "secp256r1")))
|
||||
temp-pair (.generateKeyPair kpg)
|
||||
params (.getParams (.getPublic temp-pair))
|
||||
point (ECPoint. x y)
|
||||
spec (ECPublicKeySpec. point params)]
|
||||
(-> (KeyFactory/getInstance "EC")
|
||||
(.generatePublic spec)))))
|
||||
|
||||
(defn- ecdh-derive-secret
|
||||
"Perform ECDH to derive shared secret"
|
||||
[private-key public-key]
|
||||
(let [ka (KeyAgreement/getInstance "ECDH")]
|
||||
(.init ka private-key)
|
||||
(.doPhase ka public-key true)
|
||||
(.generateSecret ka)))
|
||||
|
||||
(defn- hmac-sha256
|
||||
"Compute HMAC-SHA256"
|
||||
[^bytes key ^bytes data]
|
||||
(let [mac (Mac/getInstance "HmacSHA256")
|
||||
secret-key (SecretKeySpec. key "HmacSHA256")]
|
||||
(.init mac secret-key)
|
||||
(.doFinal mac data)))
|
||||
|
||||
(defn- hkdf-extract
|
||||
"HKDF extract step"
|
||||
[salt ikm]
|
||||
(let [salt (if (and salt (pos? (alength salt))) salt (byte-array 32))]
|
||||
(hmac-sha256 salt ikm)))
|
||||
|
||||
(defn- hkdf-expand
|
||||
"HKDF expand step"
|
||||
[prk info length]
|
||||
(let [hash-len 32
|
||||
n (int (Math/ceil (/ length hash-len)))
|
||||
okm (byte-array (* n hash-len))
|
||||
prev (byte-array 0)]
|
||||
(loop [i 1
|
||||
prev prev]
|
||||
(when (<= i n)
|
||||
(let [input (byte-array (+ (alength prev) (alength info) 1))
|
||||
_ (System/arraycopy prev 0 input 0 (alength prev))
|
||||
_ (System/arraycopy info 0 input (alength prev) (alength info))
|
||||
_ (aset-byte input (dec (alength input)) (unchecked-byte i))
|
||||
output (hmac-sha256 prk input)]
|
||||
(System/arraycopy output 0 okm (* (dec i) hash-len) hash-len)
|
||||
(recur (inc i) output))))
|
||||
(Arrays/copyOf okm length)))
|
||||
|
||||
(defn- hkdf
|
||||
"Full HKDF key derivation"
|
||||
[salt ikm info length]
|
||||
(let [prk (hkdf-extract salt ikm)]
|
||||
(hkdf-expand prk info length)))
|
||||
|
||||
(defn- build-info
|
||||
"Build the info parameter for HKDF according to RFC 8291"
|
||||
[^String type ^bytes client-public ^bytes server-public]
|
||||
(let [type-bytes (.getBytes type "UTF-8")
|
||||
;; Info structure: "Content-Encoding: <type>" || 0x00 || "P-256" || 0x00
|
||||
;; || client public key length (2 bytes) || client public key
|
||||
;; || server public key length (2 bytes) || server public key
|
||||
info-len (+ (count "Content-Encoding: ") (alength type-bytes) 1
|
||||
5 1 ;; "P-256" + null
|
||||
2 65 ;; client key length + key
|
||||
2 65) ;; server key length + key
|
||||
info (byte-array info-len)
|
||||
buf (ByteBuffer/wrap info)]
|
||||
(.put buf (.getBytes (str "Content-Encoding: " type) "UTF-8"))
|
||||
(.put buf (byte 0))
|
||||
(.put buf (.getBytes "P-256" "UTF-8"))
|
||||
(.put buf (byte 0))
|
||||
(.putShort buf (short 65))
|
||||
(.put buf client-public)
|
||||
(.putShort buf (short 65))
|
||||
(.put buf server-public)
|
||||
info))
|
||||
|
||||
(defn- aes-128-gcm-encrypt
|
||||
"Encrypt data using AES-128-GCM"
|
||||
[^bytes key ^bytes nonce ^bytes plaintext]
|
||||
(let [cipher (Cipher/getInstance "AES/GCM/NoPadding")
|
||||
secret-key (SecretKeySpec. key "AES")
|
||||
gcm-spec (GCMParameterSpec. 128 nonce)]
|
||||
(.init cipher Cipher/ENCRYPT_MODE secret-key gcm-spec)
|
||||
(.doFinal cipher plaintext)))
|
||||
|
||||
(defn- pad-plaintext
|
||||
"Add padding to plaintext according to RFC 8291.
|
||||
Padding: 2 bytes length prefix (big-endian) + padding bytes"
|
||||
[^bytes plaintext]
|
||||
;; For simplicity, use minimal padding (just the required 2-byte header)
|
||||
;; The format is: padding_length (2 bytes, big-endian) || zeros || plaintext
|
||||
(let [plaintext-len (alength plaintext)
|
||||
;; Use 0 bytes of actual padding
|
||||
padding-len 0
|
||||
result (byte-array (+ 2 padding-len plaintext-len))]
|
||||
;; Write padding length as big-endian 16-bit integer
|
||||
(aset-byte result 0 (unchecked-byte (bit-shift-right padding-len 8)))
|
||||
(aset-byte result 1 (unchecked-byte (bit-and padding-len 0xFF)))
|
||||
;; Copy plaintext after padding header
|
||||
(System/arraycopy plaintext 0 result (+ 2 padding-len) plaintext-len)
|
||||
result))
|
||||
|
||||
(defn encrypt-payload
|
||||
"Encrypt a push message payload using Web Push encryption (RFC 8291).
|
||||
|
||||
Parameters:
|
||||
- p256dh: Client's ECDH public key (base64url encoded)
|
||||
- auth: Client's auth secret (base64url encoded)
|
||||
- plaintext: The message to encrypt (bytes)
|
||||
|
||||
Returns a map with:
|
||||
- :ciphertext - The encrypted payload
|
||||
- :salt - The encryption salt (for Content-Encoding header)
|
||||
- :public-key - Server's ephemeral public key"
|
||||
[p256dh auth plaintext]
|
||||
(let [;; Decode client keys
|
||||
client-public-bytes (base64url-decode p256dh)
|
||||
auth-secret (base64url-decode auth)
|
||||
client-public-key (uncompressed-bytes->public-key client-public-bytes)
|
||||
|
||||
;; Generate ephemeral server key pair
|
||||
server-keypair (generate-ephemeral-keypair)
|
||||
server-private-key (.getPrivate server-keypair)
|
||||
server-public-key (.getPublic server-keypair)
|
||||
server-public-bytes (public-key->uncompressed-bytes server-public-key)
|
||||
|
||||
;; Generate salt
|
||||
salt (generate-salt)
|
||||
|
||||
;; ECDH to derive shared secret
|
||||
ecdh-secret (ecdh-derive-secret server-private-key client-public-key)
|
||||
|
||||
;; Derive PRK using auth secret
|
||||
;; PRK = HKDF-Extract(auth_secret, ecdh_secret)
|
||||
auth-info (.getBytes "Content-Encoding: auth\u0000" "UTF-8")
|
||||
prk-key (hkdf auth-secret ecdh-secret auth-info 32)
|
||||
|
||||
;; Derive content encryption key (CEK)
|
||||
cek-info (build-info "aes128gcm" client-public-bytes server-public-bytes)
|
||||
cek (hkdf salt prk-key cek-info 16)
|
||||
|
||||
;; Derive nonce
|
||||
nonce-info (build-info "nonce" client-public-bytes server-public-bytes)
|
||||
nonce (hkdf salt prk-key nonce-info 12)
|
||||
|
||||
;; Pad and encrypt
|
||||
padded-plaintext (pad-plaintext plaintext)
|
||||
ciphertext (aes-128-gcm-encrypt cek nonce padded-plaintext)]
|
||||
{:ciphertext ciphertext
|
||||
:salt salt
|
||||
:public-key server-public-bytes}))
|
||||
|
||||
(defn build-encrypted-body
|
||||
"Build the full encrypted body with header for aes128gcm Content-Encoding.
|
||||
|
||||
Format: salt (16 bytes) || rs (4 bytes) || idlen (1 byte) || keyid || ciphertext"
|
||||
[^bytes salt ^bytes server-public ^bytes ciphertext]
|
||||
(let [rs 4096 ;; Record size
|
||||
idlen (alength server-public)
|
||||
body (byte-array (+ 16 4 1 idlen (alength ciphertext)))
|
||||
buf (ByteBuffer/wrap body)]
|
||||
(.put buf salt)
|
||||
(.putInt buf rs)
|
||||
(.put buf (unchecked-byte idlen))
|
||||
(.put buf server-public)
|
||||
(.put buf ciphertext)
|
||||
body))
|
||||
|
||||
(defn send-notification
|
||||
"Send a push notification to a subscription.
|
||||
|
||||
Parameters:
|
||||
- subscription: Map with :endpoint, :p256dh, :auth
|
||||
- payload: Map to be JSON-encoded as the notification payload
|
||||
- vapid-keys: Map with :public-key, :private-key (base64url encoded)
|
||||
- options: Optional map with :ttl (seconds), :urgency, :topic
|
||||
|
||||
Returns:
|
||||
- {:success true} on success
|
||||
- {:success false :status <code> :body <body>} on failure
|
||||
- {:success false :error <message>} on exception"
|
||||
[subscription payload vapid-keys & [{:keys [ttl urgency topic subject]
|
||||
:or {ttl 86400
|
||||
subject "mailto:spiceflow@localhost"}}]]
|
||||
(try
|
||||
(let [endpoint (:endpoint subscription)
|
||||
plaintext (.getBytes (json/write-value-as-string payload) "UTF-8")
|
||||
|
||||
;; Encrypt payload
|
||||
{:keys [ciphertext salt public-key]} (encrypt-payload
|
||||
(:p256dh subscription)
|
||||
(:auth subscription)
|
||||
plaintext)
|
||||
|
||||
;; Build encrypted body
|
||||
body (build-encrypted-body salt public-key ciphertext)
|
||||
|
||||
;; Build VAPID authorization header
|
||||
auth-header (vapid/vapid-authorization-header endpoint subject vapid-keys)
|
||||
|
||||
;; Build request headers
|
||||
headers {"Authorization" auth-header
|
||||
"Content-Type" "application/octet-stream"
|
||||
"Content-Encoding" "aes128gcm"
|
||||
"TTL" (str ttl)}
|
||||
headers (cond-> headers
|
||||
urgency (assoc "Urgency" urgency)
|
||||
topic (assoc "Topic" topic))
|
||||
|
||||
;; Send request
|
||||
response (http/post endpoint
|
||||
{:headers headers
|
||||
:body body
|
||||
:throw-exceptions false})]
|
||||
(if (<= 200 (:status response) 299)
|
||||
{:success true}
|
||||
{:success false
|
||||
:status (:status response)
|
||||
:body (:body response)}))
|
||||
(catch Exception e
|
||||
(log/error e "Failed to send push notification")
|
||||
{:success false
|
||||
:error (.getMessage e)})))
|
||||
|
||||
(defn send-to-all-subscriptions
|
||||
"Send a notification to all subscriptions in the push store.
|
||||
|
||||
Parameters:
|
||||
- push-store: PushStore instance
|
||||
- payload: Notification payload map
|
||||
|
||||
Returns a sequence of results for each subscription."
|
||||
[push-store payload]
|
||||
(let [subscriptions (proto/get-subscriptions push-store)
|
||||
vapid-keys (proto/get-vapid-keys push-store)]
|
||||
(when (and (seq subscriptions) vapid-keys)
|
||||
(log/info "Sending push notification to" (count subscriptions) "subscription(s)")
|
||||
(doall
|
||||
(for [sub subscriptions]
|
||||
(let [result (send-notification sub payload vapid-keys)]
|
||||
;; If subscription is gone (410) or invalid (404), remove it
|
||||
(when (#{404 410} (:status result))
|
||||
(log/info "Removing invalid/expired subscription:" (:endpoint sub))
|
||||
(proto/delete-subscription push-store (:id sub)))
|
||||
(assoc result :subscription-id (:id sub))))))))
|
||||
|
||||
(comment
|
||||
;; Test encryption
|
||||
(def test-p256dh "BNcRdreALRFXTkOOUHK1EtK2wtaz5Ry4YfYCA_0QTpQtUbVlUls0VJXg7A8u-Ts1XbjhazAkj7I99e8QcYP7DkM")
|
||||
(def test-auth "tBHItJI5svbpez7KI4CCXg")
|
||||
(def test-payload {:title "Test" :body "Hello"})
|
||||
|
||||
(encrypt-payload test-p256dh test-auth
|
||||
(.getBytes (json/write-value-as-string test-payload) "UTF-8")))
|
||||
@@ -0,0 +1,148 @@
|
||||
(ns spiceflow.push.store
|
||||
"SQLite implementation of PushStore protocol for managing push subscriptions and VAPID keys"
|
||||
(:require [next.jdbc :as jdbc]
|
||||
[next.jdbc.result-set :as rs]
|
||||
[next.jdbc.sql :as sql]
|
||||
[spiceflow.push.protocol :as proto]
|
||||
[spiceflow.push.vapid :as vapid]
|
||||
[clojure.tools.logging :as log])
|
||||
(:import [java.util UUID]
|
||||
[java.time Instant]))
|
||||
|
||||
(defn- generate-id []
|
||||
(str (UUID/randomUUID)))
|
||||
|
||||
(defn- now-iso []
|
||||
(.toString (Instant/now)))
|
||||
|
||||
(defn- row->subscription
|
||||
"Convert a database row to a subscription map"
|
||||
[row]
|
||||
(when row
|
||||
{:id (:id row)
|
||||
:endpoint (:endpoint row)
|
||||
:p256dh (:p256dh row)
|
||||
:auth (:auth row)
|
||||
:user-agent (:user-agent row)
|
||||
:created-at (:created-at row)}))
|
||||
|
||||
(defn- row->vapid-keys
|
||||
"Convert a database row to VAPID keys map"
|
||||
[row]
|
||||
(when row
|
||||
{:public-key (:public-key row)
|
||||
:private-key (:private-key row)
|
||||
:created-at (:created-at row)}))
|
||||
|
||||
(defrecord SQLitePushStore [datasource]
|
||||
proto/PushStore
|
||||
|
||||
(get-subscriptions [_]
|
||||
(let [rows (jdbc/execute! datasource
|
||||
["SELECT * FROM push_subscriptions ORDER BY created_at DESC"]
|
||||
{:builder-fn rs/as-unqualified-kebab-maps})]
|
||||
(mapv row->subscription rows)))
|
||||
|
||||
(get-subscription [_ id]
|
||||
(let [row (jdbc/execute-one! datasource
|
||||
["SELECT * FROM push_subscriptions WHERE id = ?" id]
|
||||
{:builder-fn rs/as-unqualified-kebab-maps})]
|
||||
(row->subscription row)))
|
||||
|
||||
(get-subscription-by-endpoint [_ endpoint]
|
||||
(let [row (jdbc/execute-one! datasource
|
||||
["SELECT * FROM push_subscriptions WHERE endpoint = ?" endpoint]
|
||||
{:builder-fn rs/as-unqualified-kebab-maps})]
|
||||
(row->subscription row)))
|
||||
|
||||
(save-subscription [this subscription]
|
||||
(let [id (or (:id subscription) (generate-id))
|
||||
now (now-iso)]
|
||||
;; Check if subscription with this endpoint already exists
|
||||
(if-let [existing (proto/get-subscription-by-endpoint this (:endpoint subscription))]
|
||||
;; Update existing subscription
|
||||
(do
|
||||
(sql/update! datasource :push_subscriptions
|
||||
{:p256dh (:p256dh subscription)
|
||||
:auth (:auth subscription)
|
||||
:user_agent (:user-agent subscription)}
|
||||
{:endpoint (:endpoint subscription)})
|
||||
(proto/get-subscription-by-endpoint this (:endpoint subscription)))
|
||||
;; Insert new subscription
|
||||
(do
|
||||
(sql/insert! datasource :push_subscriptions
|
||||
{:id id
|
||||
:endpoint (:endpoint subscription)
|
||||
:p256dh (:p256dh subscription)
|
||||
:auth (:auth subscription)
|
||||
:user_agent (:user-agent subscription)
|
||||
:created_at now})
|
||||
(proto/get-subscription this id)))))
|
||||
|
||||
(delete-subscription [_ id]
|
||||
(jdbc/execute! datasource ["DELETE FROM push_subscriptions WHERE id = ?" id])
|
||||
nil)
|
||||
|
||||
(delete-subscription-by-endpoint [_ endpoint]
|
||||
(jdbc/execute! datasource ["DELETE FROM push_subscriptions WHERE endpoint = ?" endpoint])
|
||||
nil)
|
||||
|
||||
(get-vapid-keys [_]
|
||||
(let [row (jdbc/execute-one! datasource
|
||||
["SELECT * FROM vapid_keys WHERE id = 1"]
|
||||
{:builder-fn rs/as-unqualified-kebab-maps})]
|
||||
(row->vapid-keys row)))
|
||||
|
||||
(save-vapid-keys [this keys]
|
||||
(let [now (now-iso)]
|
||||
;; Only save if no keys exist (singleton)
|
||||
(when-not (proto/get-vapid-keys this)
|
||||
(sql/insert! datasource :vapid_keys
|
||||
{:id 1
|
||||
:public_key (:public-key keys)
|
||||
:private_key (:private-key keys)
|
||||
:created_at now}))
|
||||
(proto/get-vapid-keys this))))
|
||||
|
||||
(def push-schema
|
||||
"SQLite schema for push notifications"
|
||||
["CREATE TABLE IF NOT EXISTS push_subscriptions (
|
||||
id TEXT PRIMARY KEY,
|
||||
endpoint TEXT NOT NULL UNIQUE,
|
||||
p256dh TEXT NOT NULL,
|
||||
auth TEXT NOT NULL,
|
||||
user_agent TEXT,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
)"
|
||||
"CREATE TABLE IF NOT EXISTS vapid_keys (
|
||||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||
public_key TEXT NOT NULL,
|
||||
private_key TEXT NOT NULL,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
)"
|
||||
"CREATE INDEX IF NOT EXISTS idx_push_subscriptions_endpoint ON push_subscriptions(endpoint)"])
|
||||
|
||||
(defn init-push-schema!
|
||||
"Initialize push notification tables"
|
||||
[datasource]
|
||||
(doseq [stmt push-schema]
|
||||
(jdbc/execute! datasource [stmt])))
|
||||
|
||||
(defn ensure-vapid-keys!
|
||||
"Ensure VAPID keys exist, generating them if needed"
|
||||
[push-store]
|
||||
(if-let [existing (proto/get-vapid-keys push-store)]
|
||||
(do
|
||||
(log/info "Using existing VAPID keys")
|
||||
existing)
|
||||
(let [keys (vapid/generate-keypair)]
|
||||
(log/info "Generated new VAPID keys")
|
||||
(proto/save-vapid-keys push-store keys))))
|
||||
|
||||
(defn create-push-store
|
||||
"Create a SQLite push store using the given datasource"
|
||||
[datasource]
|
||||
(init-push-schema! datasource)
|
||||
(let [store (->SQLitePushStore datasource)]
|
||||
(ensure-vapid-keys! store)
|
||||
store))
|
||||
@@ -0,0 +1,129 @@
|
||||
(ns spiceflow.push.vapid
|
||||
"VAPID (Voluntary Application Server Identification) authentication for Web Push.
|
||||
Generates ECDSA P-256 key pairs and creates JWT tokens for push service authentication."
|
||||
(:require [buddy.core.keys :as keys]
|
||||
[buddy.sign.jwt :as jwt]
|
||||
[clojure.tools.logging :as log])
|
||||
(:import [java.security KeyPairGenerator SecureRandom]
|
||||
[java.security.spec ECGenParameterSpec]
|
||||
[java.util Base64]))
|
||||
|
||||
(defn- bytes->base64url
|
||||
"Convert bytes to URL-safe base64 without padding"
|
||||
[^bytes b]
|
||||
(-> (Base64/getUrlEncoder)
|
||||
(.withoutPadding)
|
||||
(.encodeToString b)))
|
||||
|
||||
(defn- base64url->bytes
|
||||
"Convert URL-safe base64 string to bytes"
|
||||
[^String s]
|
||||
(.decode (Base64/getUrlDecoder) s))
|
||||
|
||||
(defn- ec-public-key->uncompressed-bytes
|
||||
"Convert EC public key to uncompressed point format (0x04 || x || y).
|
||||
This is the format expected by the Push API for applicationServerKey."
|
||||
[public-key]
|
||||
(let [point (.getW public-key)
|
||||
x-bytes (.toByteArray (.getAffineX point))
|
||||
y-bytes (.toByteArray (.getAffineY point))
|
||||
;; Ensure exactly 32 bytes for each coordinate
|
||||
x-padded (byte-array 32)
|
||||
y-padded (byte-array 32)]
|
||||
;; Handle BigInteger byte arrays (may have leading zero or be shorter)
|
||||
(let [x-len (min 32 (alength x-bytes))
|
||||
x-offset (max 0 (- (alength x-bytes) 32))
|
||||
y-len (min 32 (alength y-bytes))
|
||||
y-offset (max 0 (- (alength y-bytes) 32))]
|
||||
(System/arraycopy x-bytes x-offset x-padded (- 32 x-len) x-len)
|
||||
(System/arraycopy y-bytes y-offset y-padded (- 32 y-len) y-len))
|
||||
;; Create uncompressed point: 0x04 || x || y
|
||||
(let [result (byte-array 65)]
|
||||
(aset-byte result 0 (unchecked-byte 0x04))
|
||||
(System/arraycopy x-padded 0 result 1 32)
|
||||
(System/arraycopy y-padded 0 result 33 32)
|
||||
result)))
|
||||
|
||||
(defn generate-keypair
|
||||
"Generate a new ECDSA P-256 key pair for VAPID.
|
||||
Returns {:public-key <base64url> :private-key <base64url>}"
|
||||
[]
|
||||
(let [kpg (KeyPairGenerator/getInstance "EC")
|
||||
_ (.initialize kpg (ECGenParameterSpec. "secp256r1") (SecureRandom.))
|
||||
keypair (.generateKeyPair kpg)
|
||||
public-key (.getPublic keypair)
|
||||
private-key (.getPrivate keypair)
|
||||
;; Public key in uncompressed format for Push API
|
||||
public-bytes (ec-public-key->uncompressed-bytes public-key)
|
||||
;; Private key as raw 32-byte scalar
|
||||
private-bytes (.getS private-key)]
|
||||
{:public-key (bytes->base64url public-bytes)
|
||||
:private-key (bytes->base64url (.toByteArray private-bytes))}))
|
||||
|
||||
(defn- reconstruct-private-key
|
||||
"Reconstruct an EC private key from raw bytes"
|
||||
[^bytes private-bytes]
|
||||
(let [s (java.math.BigInteger. 1 private-bytes)
|
||||
curve-params (-> (java.security.KeyFactory/getInstance "EC")
|
||||
(.generatePrivate
|
||||
(java.security.spec.ECPrivateKeySpec.
|
||||
s
|
||||
(.getParams
|
||||
(-> (KeyPairGenerator/getInstance "EC")
|
||||
(doto (.initialize (ECGenParameterSpec. "secp256r1")))
|
||||
(.generateKeyPair)
|
||||
(.getPrivate))))))
|
||||
spec (java.security.spec.ECPrivateKeySpec. s (.getParams curve-params))]
|
||||
(-> (java.security.KeyFactory/getInstance "EC")
|
||||
(.generatePrivate spec))))
|
||||
|
||||
(defn create-vapid-jwt
|
||||
"Create a VAPID JWT for authenticating to a push service.
|
||||
|
||||
Parameters:
|
||||
- audience: The origin of the push service (e.g., https://fcm.googleapis.com)
|
||||
- subject: Contact info (mailto: or https: URL)
|
||||
- private-key-b64: Base64url-encoded private key
|
||||
|
||||
Returns a signed JWT valid for 12 hours."
|
||||
[audience subject private-key-b64]
|
||||
(let [now (quot (System/currentTimeMillis) 1000)
|
||||
exp (+ now (* 12 60 60)) ;; 12 hours
|
||||
claims {:aud audience
|
||||
:exp exp
|
||||
:sub subject}
|
||||
private-bytes (base64url->bytes private-key-b64)
|
||||
private-key (reconstruct-private-key private-bytes)]
|
||||
(jwt/sign claims private-key {:alg :es256})))
|
||||
|
||||
(defn vapid-authorization-header
|
||||
"Create the Authorization header value for VAPID authentication.
|
||||
|
||||
Parameters:
|
||||
- endpoint: The push subscription endpoint URL
|
||||
- subject: Contact info (mailto: or https: URL)
|
||||
- vapid-keys: Map with :public-key and :private-key (base64url encoded)
|
||||
|
||||
Returns the value for the Authorization header."
|
||||
[endpoint subject vapid-keys]
|
||||
(let [url (java.net.URL. endpoint)
|
||||
audience (str (.getProtocol url) "://" (.getHost url))
|
||||
jwt (create-vapid-jwt audience subject (:private-key vapid-keys))]
|
||||
(str "vapid t=" jwt ",k=" (:public-key vapid-keys))))
|
||||
|
||||
(comment
|
||||
;; Test key generation
|
||||
(def kp (generate-keypair))
|
||||
kp
|
||||
|
||||
;; Test JWT creation
|
||||
(create-vapid-jwt
|
||||
"https://fcm.googleapis.com"
|
||||
"mailto:test@example.com"
|
||||
(:private-key kp))
|
||||
|
||||
;; Test authorization header
|
||||
(vapid-authorization-header
|
||||
"https://fcm.googleapis.com/fcm/send/abc123"
|
||||
"mailto:test@example.com"
|
||||
kp))
|
||||
@@ -4,18 +4,54 @@
|
||||
[spiceflow.adapters.protocol :as adapter]
|
||||
[spiceflow.adapters.claude :as claude]
|
||||
[spiceflow.adapters.opencode :as opencode]
|
||||
[spiceflow.adapters.tmux :as tmux]
|
||||
[spiceflow.push.sender :as push-sender]
|
||||
[clojure.tools.logging :as log])
|
||||
(:import [java.util.concurrent ConcurrentHashMap]))
|
||||
|
||||
;; Active process handles for running sessions
|
||||
(defonce ^:private active-processes (ConcurrentHashMap.))
|
||||
|
||||
;; Push store for sending notifications (set by core.clj)
|
||||
(defonce ^:private push-store (atom nil))
|
||||
|
||||
;; Forward declaration for use in send-permission-notification-delayed!
|
||||
(declare get-pending-permission)
|
||||
|
||||
(defn set-push-store!
|
||||
"Set the push store for sending notifications"
|
||||
[store]
|
||||
(reset! push-store store))
|
||||
|
||||
(defn- send-permission-notification-delayed!
|
||||
"Send push notification for a permission request after a delay, only if still pending"
|
||||
[store session-id session perm-req delay-ms]
|
||||
(when-let [pstore @push-store]
|
||||
(future
|
||||
(try
|
||||
(Thread/sleep delay-ms)
|
||||
;; Check if permission is still pending
|
||||
(when-let [pending (get-pending-permission store session-id)]
|
||||
;; Verify same permission request (by message-id) to avoid race conditions
|
||||
(when (= (:message-id pending) (:message-id perm-req))
|
||||
(let [tools (:tools perm-req)
|
||||
payload {:title "Permission Required"
|
||||
:body (str "Claude wants to use: " (clojure.string/join ", " tools))
|
||||
:sessionId (:id session)
|
||||
:sessionTitle (or (:title session) "Untitled Session")
|
||||
:tools tools}]
|
||||
(log/debug "Sending push notification for permission request:" (:tools perm-req))
|
||||
(push-sender/send-to-all-subscriptions pstore payload))))
|
||||
(catch Exception e
|
||||
(log/error e "Failed to send push notification"))))))
|
||||
|
||||
(defn get-adapter
|
||||
"Get the appropriate adapter for a provider"
|
||||
[provider]
|
||||
(case (keyword provider)
|
||||
:claude (claude/create-adapter)
|
||||
:opencode (opencode/create-adapter)
|
||||
:tmux (tmux/create-adapter)
|
||||
(throw (ex-info "Unknown provider" {:provider provider}))))
|
||||
|
||||
(defn get-active-process
|
||||
@@ -42,15 +78,10 @@
|
||||
;; Fall back to working-dir for existing sessions that don't have spawn-dir yet
|
||||
(let [adapter (get-adapter (:provider session))
|
||||
spawn-dir (or (:spawn-dir session) (:working-dir session))
|
||||
;; Pre-grant Write/Edit tools if auto-accept-edits is enabled
|
||||
allowed-tools (when (:auto-accept-edits session)
|
||||
["Write" "Edit"])
|
||||
_ (log/debug "Starting session with spawn-dir:" spawn-dir "external-id:" (:external-id session) "allowed-tools:" allowed-tools)
|
||||
_ (log/debug "Starting session with spawn-dir:" spawn-dir "external-id:" (:external-id session))
|
||||
handle (adapter/spawn-session adapter
|
||||
(:external-id session)
|
||||
(cond-> {:working-dir spawn-dir}
|
||||
(seq allowed-tools)
|
||||
(assoc :allowed-tools allowed-tools)))]
|
||||
{:working-dir spawn-dir})]
|
||||
(.put active-processes session-id handle)
|
||||
(db/update-session store session-id {:status :processing})
|
||||
handle)))
|
||||
@@ -68,7 +99,8 @@
|
||||
(defn send-message-to-session
|
||||
"Send a message to a running session"
|
||||
[store session-id message]
|
||||
(log/debug "User action: send-message" {:session-id session-id :message message})
|
||||
(log/debug "User action: send-message" {:session-id session-id})
|
||||
(log/debug "User message content:" message)
|
||||
(let [session (db/get-session store session-id)
|
||||
_ (when-not session
|
||||
(throw (ex-info "Session not found" {:session-id session-id})))
|
||||
@@ -82,11 +114,11 @@
|
||||
:content message})
|
||||
;; Send to CLI - for OpenCode, this returns an updated handle with the process
|
||||
(let [result (adapter/send-message adapter handle message)]
|
||||
(log/info "send-message result type:" (type result) "has-process:" (boolean (:process result)))
|
||||
(log/debug "send-message result type:" (type result) "has-process:" (boolean (:process result)))
|
||||
;; If result is a map with :process, it's an updated handle (OpenCode)
|
||||
;; Store it so stream-session-response can use it
|
||||
(when (and (map? result) (:process result))
|
||||
(log/info "Storing updated handle with process for session:" session-id)
|
||||
(log/debug "Storing updated handle with process for session:" session-id)
|
||||
(.put active-processes session-id result))
|
||||
result)))
|
||||
|
||||
@@ -142,26 +174,36 @@
|
||||
(sort-by count)
|
||||
last)))
|
||||
|
||||
(defn- should-auto-accept?
|
||||
"Check if a permission request should be auto-accepted.
|
||||
Returns true if auto-accept-edits is enabled and all requested tools are Write/Edit."
|
||||
[session perm-req]
|
||||
(and (:auto-accept-edits session)
|
||||
(every? #{"Write" "Edit"} (:tools perm-req))))
|
||||
|
||||
(declare stream-session-response respond-to-permission)
|
||||
|
||||
(defn stream-session-response
|
||||
"Stream response from a running session, calling callback for each event"
|
||||
[store session-id callback]
|
||||
(log/info "stream-session-response starting for session:" session-id)
|
||||
(log/debug "stream-session-response starting for session:" session-id)
|
||||
(let [session (db/get-session store session-id)
|
||||
_ (when-not session
|
||||
(throw (ex-info "Session not found" {:session-id session-id})))
|
||||
handle (get-active-process session-id)
|
||||
_ (log/info "Got handle for session:" session-id "has-process:" (boolean (:process handle)) "has-stdout:" (boolean (:stdout handle)))
|
||||
_ (log/debug "Got handle for session:" session-id "has-process:" (boolean (:process handle)) "has-stdout:" (boolean (:stdout handle)))
|
||||
_ (when-not handle
|
||||
(throw (ex-info "Session not running" {:session-id session-id})))
|
||||
adapter (get-adapter (:provider session))
|
||||
content-buffer (StringBuilder.)
|
||||
last-working-dir (atom nil)]
|
||||
last-working-dir (atom nil)
|
||||
auto-accepted (atom false)]
|
||||
;; Read stream and accumulate content
|
||||
(log/info "Starting to read stream for session:" session-id)
|
||||
(log/debug "Starting to read stream for session:" session-id)
|
||||
(adapter/read-stream adapter handle
|
||||
(fn [event]
|
||||
(log/info "Received event:" (:event event) "text:" (when (:text event) (subs (str (:text event)) 0 (min 50 (count (str (:text event)))))))
|
||||
(log/debug "Agent response full event:" (pr-str event))
|
||||
(log/debug "LLM event:" (:event event) (when (:text event) (str "text-length:" (count (str (:text event))))))
|
||||
(log/debug "LLM response full event:" (pr-str event))
|
||||
(callback event)
|
||||
;; Accumulate text content
|
||||
(when-let [text (:text event)]
|
||||
@@ -185,7 +227,7 @@
|
||||
(when (and (= :user (:role event))
|
||||
(vector? (:content event)))
|
||||
(when-let [dir (extract-working-dir-from-tool-result (:content event))]
|
||||
(log/info "Detected working directory from tool result:" dir)
|
||||
(log/debug "Detected working directory from tool result:" dir)
|
||||
(reset! last-working-dir dir)
|
||||
;; Emit working-dir-update event so UI can update in real-time
|
||||
(callback {:event :working-dir-update
|
||||
@@ -210,44 +252,80 @@
|
||||
(callback {:event :content-delta :text result-content}))
|
||||
;; Save message if any content
|
||||
(when (seq content)
|
||||
(log/debug "LLM response complete - saving assistant message")
|
||||
(log/debug "LLM response content:" content)
|
||||
(db/save-message store {:session-id session-id
|
||||
:role :assistant
|
||||
:content content}))
|
||||
;; If there's a permission request, save it as a message and emit event
|
||||
(when-let [perm-req (:permission-request event)]
|
||||
(log/info "Permission request detected:" perm-req)
|
||||
(log/debug "LLM permission request detected:" (pr-str perm-req))
|
||||
;; Build description for the permission message content
|
||||
(let [description (->> (:denials perm-req)
|
||||
(map (fn [{:keys [tool description]}]
|
||||
(str tool ": " description)))
|
||||
(clojure.string/join "\n"))
|
||||
;; Check if we should auto-accept this permission
|
||||
auto-accept? (should-auto-accept? session perm-req)
|
||||
;; Save permission request as a system message
|
||||
;; If auto-accepting, mark as "accept" status immediately
|
||||
perm-msg (db/save-message store
|
||||
{:session-id session-id
|
||||
:role :system
|
||||
:content description
|
||||
:metadata {:type "permission-request"
|
||||
:denials (:denials perm-req)
|
||||
:tools (:tools perm-req)}})
|
||||
:metadata (cond-> {:type "permission-request"
|
||||
:denials (:denials perm-req)
|
||||
:tools (:tools perm-req)}
|
||||
auto-accept?
|
||||
(assoc :status "accept"))})
|
||||
msg-id (:id perm-msg)]
|
||||
;; Store pending permission with message ID for later update
|
||||
(set-pending-permission store session-id
|
||||
(assoc perm-req :message-id msg-id))
|
||||
(callback {:event :permission-request
|
||||
:permission-request perm-req
|
||||
:message-id msg-id
|
||||
:message perm-msg})))))))
|
||||
(if auto-accept?
|
||||
;; Auto-accept: set up for continuation after stream ends
|
||||
(do
|
||||
(log/debug "Auto-accepting permission request for tools:" (:tools perm-req))
|
||||
;; Store pending permission so respond-to-permission can use it
|
||||
(set-pending-permission store session-id
|
||||
(assoc perm-req :message-id msg-id))
|
||||
;; Emit permission event with auto-accept flag so UI knows it was auto-accepted
|
||||
(callback {:event :permission-request
|
||||
:permission-request perm-req
|
||||
:message-id msg-id
|
||||
:message perm-msg
|
||||
:auto-accepted true})
|
||||
;; Mark that we should auto-accept after stream ends
|
||||
(reset! auto-accepted true))
|
||||
;; Normal flow: store pending and wait for user response
|
||||
(let [perm-req-with-id (assoc perm-req :message-id msg-id)]
|
||||
(set-pending-permission store session-id perm-req-with-id)
|
||||
;; Send push notification for permission request after 15s delay
|
||||
(send-permission-notification-delayed! store session-id session perm-req-with-id 15000)
|
||||
(callback {:event :permission-request
|
||||
:permission-request perm-req
|
||||
:message-id msg-id
|
||||
:message perm-msg})))))))))
|
||||
;; Update session with last known working directory
|
||||
(when @last-working-dir
|
||||
(log/info "Updating session working directory to:" @last-working-dir)
|
||||
(log/debug "Updating session working directory to:" @last-working-dir)
|
||||
(db/update-session store session-id {:working-dir @last-working-dir}))
|
||||
;; Update session status when stream ends
|
||||
;; If there's a pending permission, set status to awaiting-permission
|
||||
(let [new-status (if (get-pending-permission store session-id)
|
||||
:awaiting-permission
|
||||
:idle)]
|
||||
(db/update-session store session-id {:status new-status}))
|
||||
(.remove active-processes session-id)))
|
||||
;; Handle auto-accept continuation
|
||||
(if @auto-accepted
|
||||
(do
|
||||
(log/debug "Processing auto-accept for session:" session-id)
|
||||
;; Remove the old process handle (stream has ended)
|
||||
(.remove active-processes session-id)
|
||||
;; Respond to permission (spawns new process)
|
||||
(respond-to-permission store session-id :accept nil)
|
||||
;; Continue streaming the new response
|
||||
(stream-session-response store session-id callback))
|
||||
;; Normal flow: update status and cleanup
|
||||
(do
|
||||
;; Update session status when stream ends
|
||||
;; If there's a pending permission, set status to awaiting-permission
|
||||
(let [new-status (if (get-pending-permission store session-id)
|
||||
:awaiting-permission
|
||||
:idle)]
|
||||
(db/update-session store session-id {:status new-status}))
|
||||
(.remove active-processes session-id)))))
|
||||
|
||||
(defn cleanup-all
|
||||
"Stop all running sessions"
|
||||
@@ -263,7 +341,8 @@
|
||||
response-type: :accept, :deny, or :steer
|
||||
message: optional message for :deny or :steer responses"
|
||||
[store session-id response-type message]
|
||||
(log/debug "User action: permission-response" {:session-id session-id :response-type response-type :message message})
|
||||
(log/debug "User action: permission-response" {:session-id session-id :response-type response-type})
|
||||
(when message (log/debug "User steer message:" message))
|
||||
(let [session (db/get-session store session-id)
|
||||
_ (when-not session
|
||||
(throw (ex-info "Session not found" {:session-id session-id})))
|
||||
@@ -273,17 +352,11 @@
|
||||
adapter (get-adapter (:provider session))
|
||||
;; Use spawn-dir for spawning, fall back to working-dir for existing sessions
|
||||
spawn-dir (or (:spawn-dir session) (:working-dir session))
|
||||
;; Auto-accept tools from session setting (always included if enabled)
|
||||
auto-accept-tools (when (:auto-accept-edits session)
|
||||
["Write" "Edit"])
|
||||
;; Tools granted from accepting the permission request
|
||||
granted-tools (when (= response-type :accept) (:tools pending))
|
||||
;; Combine both sets of allowed tools
|
||||
all-allowed-tools (seq (distinct (concat auto-accept-tools granted-tools)))
|
||||
;; Build spawn options
|
||||
;; Build spawn options based on response type
|
||||
opts (cond-> {:working-dir spawn-dir}
|
||||
all-allowed-tools
|
||||
(assoc :allowed-tools (vec all-allowed-tools)))
|
||||
;; For :accept, grant the requested tools
|
||||
(= response-type :accept)
|
||||
(assoc :allowed-tools (:tools pending)))
|
||||
;; Determine the message to send
|
||||
send-msg (case response-type
|
||||
:accept "continue"
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
Hello from OpenCode test
|
||||
Reference in New Issue
Block a user