add git diffs and permission support

This commit is contained in:
2026-01-19 23:45:03 -05:00
parent 313ac44337
commit 61a2e9b8af
44 changed files with 2051 additions and 267 deletions
+6 -1
View File
@@ -32,4 +32,9 @@
:main-opts ["-m" "kaocha.runner"]}
:repl {:main-opts ["-m" "nrepl.cmdline" "-i"]
:extra-deps {nrepl/nrepl {:mvn/version "1.1.0"}
cider/cider-nrepl {:mvn/version "0.44.0"}}}}}
cider/cider-nrepl {:mvn/version "0.44.0"}}}
:dev {:extra-paths ["dev"]
:extra-deps {nrepl/nrepl {:mvn/version "1.1.0"}
cider/cider-nrepl {:mvn/version "0.44.0"}
org.clojure/tools.namespace {:mvn/version "1.5.0"}
hawk/hawk {:mvn/version "0.2.11"}}}}}
+76
View File
@@ -0,0 +1,76 @@
(ns user
(:require [clojure.tools.namespace.repl :as repl]
[clojure.tools.logging :as log]
[mount.core :as mount]
[hawk.core :as hawk]))
;; Only reload spiceflow namespaces
(repl/set-refresh-dirs "src")
(defonce watcher (atom nil))
(defn start
"Start the application."
[]
(require 'spiceflow.core)
((resolve 'spiceflow.core/-main)))
(defn stop
"Stop the application."
[]
(mount/stop))
(defn reset
"Stop, reload all changed namespaces, and restart."
[]
(stop)
(repl/refresh :after 'user/start))
(defn reload
"Reload all changed namespaces without restarting."
[]
(repl/refresh))
(defn reload-all
"Force reload all app namespaces."
[]
(stop)
(repl/refresh-all :after 'user/start))
(defn- clj-file? [_ {:keys [file]}]
(and file (.endsWith (.getName file) ".clj")))
(defn- on-file-change [_ _]
(log/info "File change detected, reloading namespaces...")
(log/info "Reloading workspaces...")
(try
(reset)
(log/info "Reload complete")
(catch Exception e
(log/error e "Reload failed"))))
(defn watch
"Start watching src directory for changes and auto-reload."
[]
(when @watcher
(hawk/stop! @watcher))
(reset! watcher
(hawk/watch! [{:paths ["src"]
:filter clj-file?
:handler on-file-change}]))
(log/info "File watcher started - will auto-reload on .clj changes"))
(defn unwatch
"Stop the file watcher."
[]
(when @watcher
(hawk/stop! @watcher)
(reset! watcher nil)
(log/info "File watcher stopped")))
(defn go
"Start the app and enable auto-reload on file changes."
[]
(start)
(watch)
:ready)
+5
View File
@@ -0,0 +1,5 @@
# My Haiku
Code finds its truth
In tests that catch the bugs
Software sleeps sound
+20 -2
View File
@@ -1,15 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- Console appender - INFO and above only -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="spiceflow" level="INFO"/>
<!-- File appender - DEBUG and above (captures all agent/user activity) -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/spiceflow.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/spiceflow.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Spiceflow at DEBUG to capture all agent responses and user actions -->
<logger name="spiceflow" level="DEBUG"/>
<logger name="org.eclipse.jetty" level="WARN"/>
<root level="INFO">
<root level="DEBUG">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
</root>
</configuration>
+141 -41
View File
@@ -1,5 +1,8 @@
(ns spiceflow.adapters.opencode
"Adapter for OpenCode CLI"
"Adapter for OpenCode CLI.
OpenCode works differently from Claude - it uses a single-shot execution model
where each message spawns a new `opencode run` process with the message as an argument."
(:require [spiceflow.adapters.protocol :as proto]
[clojure.java.io :as io]
[clojure.java.shell :as shell]
@@ -60,26 +63,64 @@
(log/warn "Failed to parse session export:" (.getMessage e))
nil)))
(defn- parse-stream-output
"Parse a line of streaming output from OpenCode"
(defn- parse-json-event
"Parse a JSON event from OpenCode's --format json output"
[line]
(try
(when (and line (not (str/blank? line)))
(if (str/starts-with? line "{")
(let [data (json/read-value line mapper)]
(cond
(:content data) {:event :content-delta
:text (:content data)}
(:done data) {:event :message-stop}
(:error data) {:event :error
:message (:error data)}
:else {:raw data}))
;; Plain text output
{:event :content-delta
:text line}))
(let [data (json/read-value line mapper)]
(log/debug "OpenCode JSON event:" (:type data))
(case (:type data)
;; Step start - beginning of a new step
"step_start" {:event :init
:session-id (:sessionID data)
:cwd (get-in data [:part :cwd])}
;; Text content - the actual response text
"text" {:event :content-delta
:text (get-in data [:part :text])}
;; Tool use events
"tool_start" {:event :tool-start
:tool (get-in data [:part :tool])
:input (get-in data [:part :input])}
"tool_finish" {:event :tool-result
:tool (get-in data [:part :tool])
:result (get-in data [:part :result])}
;; Step finish - end of current step
"step_finish" {:event :result
:session-id (:sessionID data)
:cost (get-in data [:part :cost])
:stop-reason (get-in data [:part :reason])}
;; Error events
"error" {:event :error
:message (:message data)}
;; Permission-related events (if OpenCode has them)
"permission_request" {:event :permission-request
:permission-request {:tools [(get-in data [:part :tool])]
:denials [{:tool (get-in data [:part :tool])
:input (get-in data [:part :input])
:description (get-in data [:part :description])}]}}
;; Default: pass through as raw (don't emit content-delta for unknown types)
(do
(log/debug "Unknown OpenCode event type:" (:type data))
{:raw data}))))
(catch Exception e
(log/debug "Failed to parse line:" line)
{:event :content-delta :text line})))
(log/debug "Failed to parse JSON line:" line (.getMessage e))
;; If it's not JSON, treat as plain text
(when (and line (not (str/blank? line)))
{:event :content-delta :text (str line "\n")}))))
(defn- parse-default-output
"Parse default (non-JSON) output from OpenCode"
[line]
(when (and line (not (str/blank? line)))
{:event :content-delta
:text (str line "\n")}))
(defrecord OpenCodeAdapter [command]
proto/AgentAdapter
@@ -92,44 +133,103 @@
[]))
(spawn-session [_ session-id opts]
(let [{:keys [working-dir]} opts
args [command "run" "--session" session-id]
pb (ProcessBuilder. args)]
(when working-dir
(.directory pb (io/file working-dir)))
(.redirectErrorStream pb false)
(let [process (.start pb)]
{:process process
:stdin (BufferedWriter. (OutputStreamWriter. (.getOutputStream process) StandardCharsets/UTF_8))
:stdout (BufferedReader. (InputStreamReader. (.getInputStream process) StandardCharsets/UTF_8))
:stderr (BufferedReader. (InputStreamReader. (.getErrorStream process) StandardCharsets/UTF_8))})))
;; For OpenCode, we don't start a process here - we just create a handle
;; that stores the configuration. The actual process is started when send-message is called.
(let [{:keys [working-dir allowed-tools]} opts]
{:session-id session-id
:working-dir working-dir
:allowed-tools allowed-tools
:command command
;; These will be populated when send-message is called
:process nil
:stdout nil
:stderr nil}))
(send-message [_ {:keys [stdin]} message]
(send-message [_ handle message]
(try
(.write stdin message)
(.newLine stdin)
(.flush stdin)
true
(let [{:keys [session-id working-dir command allowed-tools]} handle
;; Build command args
;; Use --format json for structured output
;; Build the opencode command string
opencode-cmd (str command " run --format json"
(when session-id (str " --session " session-id))
" " (pr-str message))
;; Wrap with script -qc to create a pseudo-terminal
;; This forces Go to flush stdout properly (Go binaries ignore stdbuf)
args ["script" "-qc" opencode-cmd "/dev/null"]
_ (log/info "Starting OpenCode with args:" args)
pb (ProcessBuilder. (vec args))]
;; Set working directory
(when working-dir
(.directory pb (io/file working-dir)))
;; Don't merge stderr into stdout - keep them separate
(.redirectErrorStream pb false)
;; Start the process
(let [process (.start pb)
stdout (BufferedReader. (InputStreamReader. (.getInputStream process) StandardCharsets/UTF_8))
stderr (BufferedReader. (InputStreamReader. (.getErrorStream process) StandardCharsets/UTF_8))]
;; Update the handle with the running process
;; Note: We're mutating the handle here by storing process info
;; The caller should use the returned handle
(assoc handle
:process process
:stdout stdout
:stderr stderr)))
(catch Exception e
(log/error "Failed to send message:" (.getMessage e))
false)))
(log/error "Failed to start OpenCode process:" (.getMessage e))
nil)))
(read-stream [this {:keys [stdout]} callback]
(read-stream [this {:keys [stdout stderr process]} callback]
(log/info "read-stream starting, stdout:" (boolean stdout) "process:" (boolean process) "process-alive:" (when process (.isAlive process)))
(try
;; Start a thread to log stderr
(when stderr
(future
(try
(loop []
(when-let [line (.readLine stderr)]
(log/info "[OpenCode stderr]" line)
(recur)))
(catch Exception e
(log/info "Stderr stream ended:" (.getMessage e))))))
;; Read stdout for JSON events
(log/info "Starting stdout read loop")
(loop []
(log/debug "Waiting for line from stdout...")
(when-let [line (.readLine stdout)]
(when-let [parsed (proto/parse-output this line)]
(callback parsed))
(recur)))
(log/info "[OpenCode stdout]" line)
(let [parsed (proto/parse-output this line)]
(when parsed
(log/info "Parsed event:" (:event parsed))
(callback parsed))
;; Continue reading unless we hit a terminal event
;; Note: step_finish with reason "tool-calls" is NOT terminal - OpenCode
;; continues after running tools. Only stop on :error or :result with
;; a non-tool-calls reason.
(if (or (= :error (:event parsed))
(and (= :result (:event parsed))
(not= "tool-calls" (:stop-reason parsed))))
(log/info "Received terminal event, stopping stream read. stop-reason:" (:stop-reason parsed))
(recur)))))
(log/info "stdout read loop ended (nil line)")
;; Wait for process to complete
(when process
(log/info "Waiting for process to complete")
(.waitFor process)
(log/info "Process completed with exit code:" (.exitValue process)))
(catch Exception e
(log/debug "Stream ended:" (.getMessage e)))))
(log/error "Stream error:" (.getMessage e) (class e)))))
(kill-process [_ {:keys [process]}]
(when process
(.destroyForcibly process)))
(parse-output [_ line]
(parse-stream-output line)))
(parse-json-event line)))
(defn create-adapter
"Create an OpenCode adapter"
+6 -1
View File
@@ -41,6 +41,7 @@
[store]
(fn [request]
(let [body (:body request)]
(log/debug "API request: create-session" {:body body})
(if (db/valid-session? body)
(let [session (db/save-session store body)]
(-> (json-response session)
@@ -51,6 +52,7 @@
[store]
(fn [request]
(let [id (get-in request [:path-params :id])]
(log/debug "API request: delete-session" {:session-id id})
(if (db/get-session store id)
(do
(manager/stop-session store id)
@@ -63,8 +65,9 @@
(fn [request]
(let [id (get-in request [:path-params :id])
body (:body request)]
(log/debug "API request: update-session" {:session-id id :body body})
(if (db/get-session store id)
(let [updated (db/update-session store id (select-keys body [:title]))]
(let [updated (db/update-session store id (select-keys body [:title :auto-accept-edits]))]
(json-response updated))
(error-response 404 "Session not found")))))
@@ -73,6 +76,7 @@
(fn [request]
(let [id (get-in request [:path-params :id])
message (get-in request [:body :message])]
(log/debug "API request: send-message" {:session-id id :message message})
(if-let [session (db/get-session store id)]
(try
;; Send message and start streaming in a separate thread
@@ -101,6 +105,7 @@
(let [id (get-in request [:path-params :id])
{:keys [response message]} (:body request)
response-type (keyword response)]
(log/debug "API request: permission-response" {:session-id id :response response :message message})
(if-let [session (db/get-session store id)]
(if (#{:accept :deny :steer} response-type)
(try
+18 -2
View File
@@ -8,6 +8,14 @@
(def ^:private mapper (json/object-mapper {:encode-key-fn name
:decode-key-fn keyword}))
;; Function to get pending permission for a session (set by core.clj)
(defonce ^:private pending-permission-fn (atom nil))
(defn set-pending-permission-fn!
"Set the function to retrieve pending permissions for a session"
[f]
(reset! pending-permission-fn f))
;; Connected WebSocket sessions: session-id -> #{sockets}
(defonce ^:private connections (ConcurrentHashMap.))
@@ -27,7 +35,8 @@
(defn broadcast-to-session
"Broadcast an event to all WebSocket connections subscribed to a session"
[session-id event]
(log/debug "Broadcasting to session:" session-id "event:" event)
(log/debug "Broadcasting to session:" session-id "event-type:" (:event event))
(log/debug "Broadcast full event data:" (pr-str event))
(when-let [subscribers (.get connections session-id)]
(let [message (assoc event :session-id session-id)]
(doseq [socket subscribers]
@@ -47,7 +56,14 @@
(let [new-set (ConcurrentHashMap/newKeySet)]
(.putIfAbsent connections session-id new-set)
(or (.get connections session-id) new-set)))]
(.add subscribers socket)))
(.add subscribers socket)
;; Send any pending permission request immediately after subscribing
(when-let [get-pending @pending-permission-fn]
(when-let [pending (get-pending session-id)]
(log/debug "Sending pending permission to newly subscribed client:" pending)
(send-to-ws socket {:event :permission-request
:permission-request pending
:session-id session-id})))))
(defn- unsubscribe-from-session
"Unsubscribe a WebSocket socket from a session"
+2
View File
@@ -25,6 +25,8 @@
(defstate server
:start (let [port (get-in config/config [:server :port] 3000)
host (get-in config/config [:server :host] "0.0.0.0")
;; Wire up pending permission function for WebSocket (partially apply store)
_ (ws/set-pending-permission-fn! (partial manager/get-pending-permission store))
api-app (routes/create-app store ws/broadcast-to-session)
;; Wrap the app to handle WebSocket upgrades on /api/ws
app (fn [request]
+4
View File
@@ -65,6 +65,10 @@
new-message))
(get-message [_ id]
(get @messages id))
(update-message [_ id data]
(swap! messages update id merge data)
(get @messages id)))
(defn create-store
+3 -1
View File
@@ -21,7 +21,9 @@
(save-message [this message]
"Save a new message. Returns the saved message with ID.")
(get-message [this id]
"Get a single message by ID"))
"Get a single message by ID")
(update-message [this id data]
"Update message fields. Returns updated message."))
(defn valid-session?
"Validate session data has required fields"
+120 -23
View File
@@ -16,6 +16,17 @@
(defn- now-iso []
(.toString (Instant/now)))
;; Status ID constants for foreign key references
(def status-ids
{:idle 1
:processing 2
:awaiting-permission 3})
(def status-names
{1 :idle
2 :processing
3 :awaiting-permission})
(defn- row->session
"Convert a database row to a session map"
[row]
@@ -43,14 +54,24 @@
(defn- session->row
"Convert a session map to database columns"
[{:keys [id provider external-id title working-dir status]}]
[{:keys [id provider external-id title working-dir spawn-dir status pending-permission auto-accept-edits]}]
(cond-> {}
id (assoc :id id)
provider (assoc :provider (name provider))
external-id (assoc :external_id external-id)
title (assoc :title title)
working-dir (assoc :working_dir working-dir)
status (assoc :status (name status))))
spawn-dir (assoc :spawn_dir spawn-dir)
status (assoc :status_id (get status-ids (keyword status) 1))
;; Handle pending-permission: can be a map (to serialize) or :clear (to set null)
(some? pending-permission)
(assoc :pending_permission
(if (= :clear pending-permission)
nil
(json/write-value-as-string pending-permission)))
;; Handle auto-accept-edits as integer (0 or 1)
(some? auto-accept-edits)
(assoc :auto_accept_edits (if auto-accept-edits 1 0))))
(defn- message->row
"Convert a message map to database columns"
@@ -70,14 +91,18 @@
["SELECT * FROM sessions ORDER BY updated_at DESC"]
{:builder-fn rs/as-unqualified-kebab-maps})]
(mapv (fn [row]
{:id (:id row)
:provider (keyword (:provider row))
:external-id (:external-id row)
:title (:title row)
:working-dir (:working-dir row)
:status (keyword (or (:status row) "idle"))
:created-at (:created-at row)
:updated-at (:updated-at row)})
(cond-> {:id (:id row)
:provider (keyword (:provider row))
:external-id (:external-id row)
:title (:title row)
:working-dir (:working-dir row)
:spawn-dir (:spawn-dir row)
:status (get status-names (:status-id row) :idle)
:auto-accept-edits (= 1 (:auto-accept-edits row))
:created-at (:created-at row)
:updated-at (:updated-at row)}
(:pending-permission row)
(assoc :pending-permission (json/read-value (:pending-permission row) mapper))))
rows)))
(get-session [_ id]
@@ -85,14 +110,18 @@
["SELECT * FROM sessions WHERE id = ?" id]
{:builder-fn rs/as-unqualified-kebab-maps})]
(when row
{:id (:id row)
:provider (keyword (:provider row))
:external-id (:external-id row)
:title (:title row)
:working-dir (:working-dir row)
:status (keyword (or (:status row) "idle"))
:created-at (:created-at row)
:updated-at (:updated-at row)})))
(cond-> {:id (:id row)
:provider (keyword (:provider row))
:external-id (:external-id row)
:title (:title row)
:working-dir (:working-dir row)
:spawn-dir (:spawn-dir row)
:status (get status-names (:status-id row) :idle)
:auto-accept-edits (= 1 (:auto-accept-edits row))
:created-at (:created-at row)
:updated-at (:updated-at row)}
(:pending-permission row)
(assoc :pending-permission (json/read-value (:pending-permission row) mapper))))))
(save-session [this session]
(let [id (or (:id session) (generate-id))
@@ -154,17 +183,28 @@
:content (:content row)
:metadata (when-let [m (:metadata row)]
(json/read-value m mapper))
:created-at (:created-at row)}))))
:created-at (:created-at row)})))
(update-message [this id data]
(let [row (message->row data)]
(sql/update! datasource :messages row {:id id})
(proto/get-message this id))))
(def schema
"SQLite schema for spiceflow"
["CREATE TABLE IF NOT EXISTS sessions (
["CREATE TABLE IF NOT EXISTS session_statuses (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE
)"
"CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
provider TEXT NOT NULL,
external_id TEXT,
title TEXT,
working_dir TEXT,
status TEXT DEFAULT 'idle',
spawn_dir TEXT,
status_id INTEGER DEFAULT 1 REFERENCES session_statuses(id),
pending_permission TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
)"
@@ -178,13 +218,70 @@
)"
"CREATE INDEX IF NOT EXISTS idx_messages_session_id ON messages(session_id)"
"CREATE INDEX IF NOT EXISTS idx_sessions_provider ON sessions(provider)"
"CREATE INDEX IF NOT EXISTS idx_sessions_external_id ON sessions(external_id)"])
"CREATE INDEX IF NOT EXISTS idx_sessions_external_id ON sessions(external_id)"
"CREATE INDEX IF NOT EXISTS idx_sessions_status_id ON sessions(status_id)"])
(def migrations
"Database migrations for existing databases"
[;; Add pending_permission column if it doesn't exist
"ALTER TABLE sessions ADD COLUMN pending_permission TEXT"
;; Add status_id column if it doesn't exist (for migration from TEXT status)
"ALTER TABLE sessions ADD COLUMN status_id INTEGER DEFAULT 1 REFERENCES session_statuses(id)"
;; Add spawn_dir column - the original directory used when spawning Claude
;; This is separate from working_dir which tracks current directory
"ALTER TABLE sessions ADD COLUMN spawn_dir TEXT"
;; Add auto_accept_edits column - when enabled, pre-grants Write/Edit tool permissions
"ALTER TABLE sessions ADD COLUMN auto_accept_edits INTEGER DEFAULT 0"])
(defn- seed-statuses!
"Seed the session_statuses table with initial values"
[datasource]
(doseq [[status-name status-id] status-ids]
(try
(jdbc/execute! datasource
["INSERT OR IGNORE INTO session_statuses (id, name) VALUES (?, ?)"
status-id (name status-name)])
(catch Exception _
;; Ignore - already exists
nil))))
(defn- migrate-status-column!
"Migrate existing TEXT status values to status_id foreign key"
[datasource]
;; Update status_id based on existing status TEXT column
(try
(jdbc/execute! datasource
["UPDATE sessions SET status_id = CASE status
WHEN 'idle' THEN 1
WHEN 'running' THEN 2
WHEN 'processing' THEN 2
WHEN 'awaiting-permission' THEN 3
WHEN 'awaiting_permission' THEN 3
ELSE 1
END
WHERE status_id IS NULL OR status IS NOT NULL"])
(catch Exception _
;; Ignore - status column may not exist or already migrated
nil)))
(defn- run-migrations!
"Run migrations, ignoring errors for already-applied migrations"
[datasource]
(doseq [stmt migrations]
(try
(jdbc/execute! datasource [stmt])
(catch Exception _
;; Ignore - migration likely already applied
nil))))
(defn init-schema!
"Initialize database schema"
[datasource]
(doseq [stmt schema]
(jdbc/execute! datasource [stmt])))
(jdbc/execute! datasource [stmt]))
(seed-statuses! datasource)
(run-migrations! datasource)
(migrate-status-column! datasource))
(defn create-store
"Create a SQLite store with the given database path"
+155 -39
View File
@@ -10,9 +10,6 @@
;; Active process handles for running sessions
(defonce ^:private active-processes (ConcurrentHashMap.))
;; Pending permission requests: session-id -> {:tools [...] :denials [...]}
(defonce ^:private pending-permissions (ConcurrentHashMap.))
(defn get-adapter
"Get the appropriate adapter for a provider"
[provider]
@@ -35,22 +32,33 @@
(defn start-session
"Start a CLI process for a session"
[store session-id]
(log/debug "User action: start-session" {:session-id session-id})
(let [session (db/get-session store session-id)]
(when-not session
(throw (ex-info "Session not found" {:session-id session-id})))
(when (session-running? session-id)
(throw (ex-info "Session already running" {:session-id session-id})))
;; Use spawn-dir for spawning (this is the original directory the session was created in)
;; Fall back to working-dir for existing sessions that don't have spawn-dir yet
(let [adapter (get-adapter (:provider session))
spawn-dir (or (:spawn-dir session) (:working-dir session))
;; Pre-grant Write/Edit tools if auto-accept-edits is enabled
allowed-tools (when (:auto-accept-edits session)
["Write" "Edit"])
_ (log/debug "Starting session with spawn-dir:" spawn-dir "external-id:" (:external-id session) "allowed-tools:" allowed-tools)
handle (adapter/spawn-session adapter
(:external-id session)
{:working-dir (:working-dir session)})]
(cond-> {:working-dir spawn-dir}
(seq allowed-tools)
(assoc :allowed-tools allowed-tools)))]
(.put active-processes session-id handle)
(db/update-session store session-id {:status :running})
(db/update-session store session-id {:status :processing})
handle)))
(defn stop-session
"Stop a running CLI process for a session"
[store session-id]
(log/debug "User action: stop-session" {:session-id session-id})
(when-let [handle (.remove active-processes session-id)]
(let [session (db/get-session store session-id)
adapter (get-adapter (:provider session))]
@@ -60,6 +68,7 @@
(defn send-message-to-session
"Send a message to a running session"
[store session-id message]
(log/debug "User action: send-message" {:session-id session-id :message message})
(let [session (db/get-session store session-id)
_ (when-not session
(throw (ex-info "Session not found" {:session-id session-id})))
@@ -71,40 +80,88 @@
(db/save-message store {:session-id session-id
:role :user
:content message})
;; Send to CLI
(adapter/send-message adapter handle message)))
;; Send to CLI - for OpenCode, this returns an updated handle with the process
(let [result (adapter/send-message adapter handle message)]
(log/info "send-message result type:" (type result) "has-process:" (boolean (:process result)))
;; If result is a map with :process, it's an updated handle (OpenCode)
;; Store it so stream-session-response can use it
(when (and (map? result) (:process result))
(log/info "Storing updated handle with process for session:" session-id)
(.put active-processes session-id result))
result)))
;; Permission handling - defined before stream-session-response which uses them
;; Permission handling - persisted to database
(defn set-pending-permission
"Store a pending permission request for a session"
[session-id permission-request]
(.put pending-permissions session-id permission-request))
"Store a pending permission request for a session in the database"
[store session-id permission-request]
(db/update-session store session-id {:pending-permission permission-request}))
(defn get-pending-permission
"Get pending permission request for a session"
[session-id]
(.get pending-permissions session-id))
"Get pending permission request for a session from the database"
[store session-id]
(:pending-permission (db/get-session store session-id)))
(defn clear-pending-permission
"Clear pending permission for a session"
[session-id]
(.remove pending-permissions session-id))
"Clear pending permission for a session in the database"
[store session-id]
(db/update-session store session-id {:pending-permission :clear}))
(defn- extract-path-from-string
"Extract a Unix path from a string, looking for patterns like /home/user/dir"
[s]
(when (string? s)
;; Look for absolute paths - match /something/something pattern
;; Paths typically don't contain spaces, newlines, or common sentence punctuation
(let [path-pattern #"(/(?:home|root|usr|var|tmp|opt|etc|mnt|media|Users)[^\s\n\r\"'<>|:;,!?\[\]{}()]*)"
;; Also match generic absolute paths that look like directories
generic-pattern #"^(/[^\s\n\r\"'<>|:;,!?\[\]{}()]+)$"]
(or
;; First try to find a path that looks like a home/project directory
(when-let [match (re-find path-pattern s)]
(if (vector? match) (first match) match))
;; If the entire trimmed string is a single absolute path, use it
(when-let [match (re-find generic-pattern (clojure.string/trim s))]
(if (vector? match) (second match) match))))))
(defn- extract-working-dir-from-tool-result
"Extract working directory from tool result if it looks like a path.
Tool results from pwd, cd, or bash commands may contain directory paths."
[content]
(when (and (vector? content) (seq content))
;; Look through tool results for path-like content
(->> content
(filter #(= "tool_result" (:type %)))
(map :content)
(filter string?)
;; Check each line of the output for a path
(mapcat clojure.string/split-lines)
(map clojure.string/trim)
(keep extract-path-from-string)
;; Prefer longer paths (more specific), take the last one found
(sort-by count)
last)))
(defn stream-session-response
"Stream response from a running session, calling callback for each event"
[store session-id callback]
(log/info "stream-session-response starting for session:" session-id)
(let [session (db/get-session store session-id)
_ (when-not session
(throw (ex-info "Session not found" {:session-id session-id})))
handle (get-active-process session-id)
_ (log/info "Got handle for session:" session-id "has-process:" (boolean (:process handle)) "has-stdout:" (boolean (:stdout handle)))
_ (when-not handle
(throw (ex-info "Session not running" {:session-id session-id})))
adapter (get-adapter (:provider session))
content-buffer (StringBuilder.)]
content-buffer (StringBuilder.)
last-working-dir (atom nil)]
;; Read stream and accumulate content
(log/info "Starting to read stream for session:" session-id)
(adapter/read-stream adapter handle
(fn [event]
(log/info "Received event:" (:event event) "text:" (when (:text event) (subs (str (:text event)) 0 (min 50 (count (str (:text event)))))))
(log/debug "Agent response full event:" (pr-str event))
(callback event)
;; Accumulate text content
(when-let [text (:text event)]
@@ -115,12 +172,24 @@
(not (:external-id session)))
(log/debug "Capturing external session-id:" (:session-id event))
(db/update-session store session-id {:external-id (:session-id event)}))
;; Also capture working directory from cwd
(when (and (:cwd event)
(or (nil? (:working-dir session))
(empty? (:working-dir session))))
(log/debug "Capturing working directory:" (:cwd event))
(db/update-session store session-id {:working-dir (:cwd event)})))
;; Capture spawn-dir from init cwd (only for new sessions that don't have it)
;; This is the directory where Claude's project is, used for resuming
(when (and (:cwd event) (not (:spawn-dir session)))
(log/debug "Capturing spawn-dir from init:" (:cwd event))
(db/update-session store session-id {:spawn-dir (:cwd event)}))
;; Also set working directory from init cwd
(when (:cwd event)
(log/debug "Capturing working directory from init:" (:cwd event))
(reset! last-working-dir (:cwd event))))
;; Track working directory from tool results (e.g., after cd && pwd)
(when (and (= :user (:role event))
(vector? (:content event)))
(when-let [dir (extract-working-dir-from-tool-result (:content event))]
(log/info "Detected working directory from tool result:" dir)
(reset! last-working-dir dir)
;; Emit working-dir-update event so UI can update in real-time
(callback {:event :working-dir-update
:working-dir dir})))
;; Also capture external-id from result event if not already set
(when (and (= :result (:event event))
(:session-id event)
@@ -129,21 +198,52 @@
(db/update-session store session-id {:external-id (:session-id event)}))
;; On result event, check for permission requests
(when (= :result (:event event))
(let [content (.toString content-buffer)]
;; Save accumulated message if any
;; Use accumulated content, or fall back to result content
;; (resumed sessions may not stream content-delta events)
(let [accumulated (.toString content-buffer)
result-content (:content event)
content (if (seq accumulated)
accumulated
result-content)]
;; If no streaming content but result has content, emit it for the client
(when (and (empty? accumulated) (seq result-content))
(callback {:event :content-delta :text result-content}))
;; Save message if any content
(when (seq content)
(db/save-message store {:session-id session-id
:role :assistant
:content content}))
;; If there's a permission request, store it and emit event
;; If there's a permission request, save it as a message and emit event
(when-let [perm-req (:permission-request event)]
(log/info "Permission request detected:" perm-req)
(set-pending-permission session-id perm-req)
(callback {:event :permission-request
:permission-request perm-req}))))))
;; Build description for the permission message content
(let [description (->> (:denials perm-req)
(map (fn [{:keys [tool description]}]
(str tool ": " description)))
(clojure.string/join "\n"))
;; Save permission request as a system message
perm-msg (db/save-message store
{:session-id session-id
:role :system
:content description
:metadata {:type "permission-request"
:denials (:denials perm-req)
:tools (:tools perm-req)}})
msg-id (:id perm-msg)]
;; Store pending permission with message ID for later update
(set-pending-permission store session-id
(assoc perm-req :message-id msg-id))
(callback {:event :permission-request
:permission-request perm-req
:message-id msg-id
:message perm-msg})))))))
;; Update session with last known working directory
(when @last-working-dir
(log/info "Updating session working directory to:" @last-working-dir)
(db/update-session store session-id {:working-dir @last-working-dir}))
;; Update session status when stream ends
;; If there's a pending permission, set status to awaiting-permission
(let [new-status (if (get-pending-permission session-id)
(let [new-status (if (get-pending-permission store session-id)
:awaiting-permission
:idle)]
(db/update-session store session-id {:status new-status}))
@@ -163,18 +263,27 @@
response-type: :accept, :deny, or :steer
message: optional message for :deny or :steer responses"
[store session-id response-type message]
(log/debug "User action: permission-response" {:session-id session-id :response-type response-type :message message})
(let [session (db/get-session store session-id)
_ (when-not session
(throw (ex-info "Session not found" {:session-id session-id})))
pending (get-pending-permission session-id)
pending (get-pending-permission store session-id)
_ (when-not pending
(throw (ex-info "No pending permission request" {:session-id session-id})))
adapter (get-adapter (:provider session))
;; Build spawn options based on response type
opts (cond-> {:working-dir (:working-dir session)}
;; For :accept, grant the requested tools
(= response-type :accept)
(assoc :allowed-tools (:tools pending)))
;; Use spawn-dir for spawning, fall back to working-dir for existing sessions
spawn-dir (or (:spawn-dir session) (:working-dir session))
;; Auto-accept tools from session setting (always included if enabled)
auto-accept-tools (when (:auto-accept-edits session)
["Write" "Edit"])
;; Tools granted from accepting the permission request
granted-tools (when (= response-type :accept) (:tools pending))
;; Combine both sets of allowed tools
all-allowed-tools (seq (distinct (concat auto-accept-tools granted-tools)))
;; Build spawn options
opts (cond-> {:working-dir spawn-dir}
all-allowed-tools
(assoc :allowed-tools (vec all-allowed-tools)))
;; Determine the message to send
send-msg (case response-type
:accept "continue"
@@ -184,11 +293,18 @@
handle (adapter/spawn-session adapter
(:external-id session)
opts)]
;; Update the permission message with the resolution status
(when-let [msg-id (:message-id pending)]
(let [current-msg (db/get-message store msg-id)
current-metadata (or (:metadata current-msg) {})
status (name response-type)]
(db/update-message store msg-id
{:metadata (assoc current-metadata :status status)})))
;; Clear pending permission
(clear-pending-permission session-id)
(clear-pending-permission store session-id)
;; Store new process handle
(.put active-processes session-id handle)
(db/update-session store session-id {:status :running})
(db/update-session store session-id {:status :processing})
;; Send the response message
(adapter/send-message adapter handle send-msg)
handle))
+1
View File
@@ -0,0 +1 @@
Hello from OpenCode test