init commit

This commit is contained in:
2026-01-18 22:07:48 -05:00
parent 9c019e3d41
commit 56dde9cf91
43 changed files with 2925 additions and 0 deletions
+36
View File
@@ -0,0 +1,36 @@
{:paths ["src" "resources"]
:deps {org.clojure/clojure {:mvn/version "1.11.1"}
;; Web server
ring/ring-core {:mvn/version "1.10.0"}
ring/ring-jetty-adapter {:mvn/version "1.10.0"}
ring/ring-json {:mvn/version "0.5.1"}
ring-cors/ring-cors {:mvn/version "0.1.13"}
;; Routing
metosin/reitit {:mvn/version "0.7.0-alpha7"}
;; WebSocket
info.sunng/ring-jetty9-adapter {:mvn/version "0.30.0"}
;; Database
com.github.seancorfield/next.jdbc {:mvn/version "1.3.894"}
org.xerial/sqlite-jdbc {:mvn/version "3.44.1.0"}
;; JSON
metosin/jsonista {:mvn/version "0.3.8"}
;; Utilities
aero/aero {:mvn/version "1.1.6"}
mount/mount {:mvn/version "0.1.18"}
org.clojure/tools.logging {:mvn/version "1.2.4"}
ch.qos.logback/logback-classic {:mvn/version "1.4.11"}}
:aliases
{:run {:main-opts ["-m" "spiceflow.core"]}
:test {:extra-paths ["test"]
:extra-deps {lambdaisland/kaocha {:mvn/version "1.87.1366"}}
:main-opts ["-m" "kaocha.runner"]}
:repl {:main-opts ["-m" "nrepl.cmdline" "-i"]
:extra-deps {nrepl/nrepl {:mvn/version "1.1.0"}
cider/cider-nrepl {:mvn/version "0.44.0"}}}}}
+10
View File
@@ -0,0 +1,10 @@
{:server {:port #long #or [#env SPICEFLOW_PORT 3000]
:host #or [#env SPICEFLOW_HOST "0.0.0.0"]}
:database {:type :sqlite
:dbname #or [#env SPICEFLOW_DB "spiceflow.db"]}
:claude {:sessions-dir #or [#env CLAUDE_SESSIONS_DIR
#join [#env HOME "/.claude/projects"]]}
:opencode {:command #or [#env OPENCODE_CMD "opencode"]}}
+15
View File
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="spiceflow" level="INFO"/>
<logger name="org.eclipse.jetty" level="WARN"/>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
+171
View File
@@ -0,0 +1,171 @@
(ns spiceflow.adapters.claude
"Adapter for Claude Code CLI"
(:require [spiceflow.adapters.protocol :as proto]
[clojure.java.io :as io]
[clojure.string :as str]
[jsonista.core :as json]
[clojure.tools.logging :as log])
(:import [java.io BufferedReader InputStreamReader BufferedWriter OutputStreamWriter]
[java.nio.file Files Paths]
[java.net URLDecoder]
[java.nio.charset StandardCharsets]))
(def ^:private mapper (json/object-mapper {:decode-key-fn keyword}))
(defn- decode-path
"Decode a URL-encoded path segment"
[encoded]
(-> encoded
(str/replace "%" "%25") ; Handle already-encoded % signs
(str/replace "%25" "%")
(URLDecoder/decode "UTF-8")))
(defn- encoded-path->real-path
"Convert Claude's encoded path format back to real path.
e.g., '-home-user-project' -> '/home/user/project'"
[encoded]
(-> encoded
(str/replace #"^-" "/")
(str/replace "-" "/")))
(defn- discover-project-sessions
"Discover sessions from a single project directory"
[project-dir]
(let [project-path (encoded-path->real-path (.getName project-dir))]
(->> (.listFiles project-dir)
(filter #(str/ends-with? (.getName %) ".jsonl"))
(map (fn [session-file]
(let [session-id (str/replace (.getName session-file) ".jsonl" "")]
{:external-id session-id
:provider :claude
:working-dir project-path
:title (str "Session " (subs session-id 0 8) "...")
:file-path (.getAbsolutePath session-file)}))))))
(defn- parse-jsonl-message
"Parse a JSONL message from Claude Code"
[line]
(try
(let [data (json/read-value line mapper)]
(case (:type data)
"user" {:role :user
:content (get-in data [:message :content])
:timestamp (:timestamp data)
:uuid (:uuid data)}
"assistant" {:role :assistant
:content (get-in data [:message :content])
:timestamp (:timestamp data)
:uuid (:uuid data)
:metadata {:model (get-in data [:message :model])
:stop-reason (get-in data [:message :stopReason])}}
;; Stream events from --output-format stream-json
"content_block_start" {:event :content-start
:index (:index data)
:content-type (get-in data [:content_block :type])}
"content_block_delta" {:event :content-delta
:index (:index data)
:text (get-in data [:delta :text])}
"content_block_stop" {:event :content-stop
:index (:index data)}
"message_start" {:event :message-start
:message-id (get-in data [:message :id])}
"message_delta" {:event :message-delta
:stop-reason (get-in data [:delta :stop_reason])}
"message_stop" {:event :message-stop}
"result" {:event :result
:content (get-in data [:result :assistant :content])
:cost (:cost data)
:session-id (:session_id data)}
;; Unknown type
{:raw data}))
(catch Exception e
(log/debug "Failed to parse JSONL line:" line (.getMessage e))
nil)))
(defn- read-session-messages
"Read messages from a session JSONL file"
[file-path]
(try
(with-open [reader (io/reader file-path)]
(->> (line-seq reader)
(keep parse-jsonl-message)
(filter #(contains? #{:user :assistant} (:role %)))
vec))
(catch Exception e
(log/warn "Failed to read session file:" file-path (.getMessage e))
[])))
(defrecord ClaudeAdapter [sessions-dir]
proto/AgentAdapter
(provider-name [_] :claude)
(discover-sessions [_]
(let [base-dir (io/file sessions-dir)]
(if (.exists base-dir)
(->> (.listFiles base-dir)
(filter #(.isDirectory %))
(mapcat discover-project-sessions)
vec)
[])))
(spawn-session [_ session-id opts]
(let [{:keys [working-dir model permission-mode]} opts
args (cond-> ["claude"
"--resume" session-id
"--output-format" "stream-json"
"--input-format" "stream-json"
"--print"]
model (conj "--model" model)
permission-mode (conj "--permission-mode" permission-mode))
pb (ProcessBuilder. args)]
(when working-dir
(.directory pb (io/file working-dir)))
(.redirectErrorStream pb false)
(let [process (.start pb)]
{:process process
:stdin (BufferedWriter. (OutputStreamWriter. (.getOutputStream process) StandardCharsets/UTF_8))
:stdout (BufferedReader. (InputStreamReader. (.getInputStream process) StandardCharsets/UTF_8))
:stderr (BufferedReader. (InputStreamReader. (.getErrorStream process) StandardCharsets/UTF_8))})))
(send-message [_ {:keys [stdin]} message]
(try
(let [json-msg (json/write-value-as-string {:type "user" :content message})]
(.write stdin json-msg)
(.newLine stdin)
(.flush stdin)
true)
(catch Exception e
(log/error "Failed to send message:" (.getMessage e))
false)))
(read-stream [this {:keys [stdout]} callback]
(try
(loop []
(when-let [line (.readLine stdout)]
(when-let [parsed (proto/parse-output this line)]
(callback parsed))
(recur)))
(catch Exception e
(log/debug "Stream ended:" (.getMessage e)))))
(kill-process [_ {:keys [process]}]
(when process
(.destroyForcibly process)))
(parse-output [_ line]
(when (and line (not (str/blank? line)))
(parse-jsonl-message line))))
(defn create-adapter
"Create a Claude Code adapter"
([]
(create-adapter (str (System/getProperty "user.home") "/.claude/projects")))
([sessions-dir]
(->ClaudeAdapter sessions-dir)))
(defn get-session-messages
"Read messages from a discovered session"
[session]
(when-let [file-path (:file-path session)]
(read-session-messages file-path)))
+145
View File
@@ -0,0 +1,145 @@
(ns spiceflow.adapters.opencode
"Adapter for OpenCode CLI"
(:require [spiceflow.adapters.protocol :as proto]
[clojure.java.io :as io]
[clojure.java.shell :as shell]
[clojure.string :as str]
[jsonista.core :as json]
[clojure.tools.logging :as log])
(:import [java.io BufferedReader InputStreamReader BufferedWriter OutputStreamWriter]
[java.nio.charset StandardCharsets]))
(def ^:private mapper (json/object-mapper {:decode-key-fn keyword}))
(defn- run-command
"Run an opencode command and return parsed output"
[command & args]
(let [result (apply shell/sh command args)]
(when (zero? (:exit result))
(:out result))))
(defn- parse-session-list
"Parse output from 'opencode session list'"
[output]
(try
(let [data (json/read-value output mapper)]
(->> data
(map (fn [session]
{:external-id (or (:id session) (:session_id session))
:provider :opencode
:title (:title session)
:working-dir (:working_dir session)
:created-at (get-in session [:time :created])
:updated-at (get-in session [:time :updated])}))))
(catch Exception e
(log/debug "Failed to parse session list, trying line-by-line")
;; Fallback: parse line by line if it's not JSON
(->> (str/split-lines output)
(filter #(str/starts-with? % "ses_"))
(map (fn [line]
(let [[id title] (str/split line #"\s+" 2)]
{:external-id id
:provider :opencode
:title (or title "Untitled")})))))))
(defn- parse-session-export
"Parse output from 'opencode export <session-id>'"
[output]
(try
(let [data (json/read-value output mapper)]
{:info (:info data)
:messages (->> (:messages data)
(map (fn [msg]
{:role (keyword (get-in msg [:info :role] "assistant"))
:content (->> (:parts msg)
(filter #(= (:type %) "text"))
(map :text)
(str/join "\n"))
:metadata {:parts (:parts msg)}})))})
(catch Exception e
(log/warn "Failed to parse session export:" (.getMessage e))
nil)))
(defn- parse-stream-output
"Parse a line of streaming output from OpenCode"
[line]
(try
(when (and line (not (str/blank? line)))
(if (str/starts-with? line "{")
(let [data (json/read-value line mapper)]
(cond
(:content data) {:event :content-delta
:text (:content data)}
(:done data) {:event :message-stop}
(:error data) {:event :error
:message (:error data)}
:else {:raw data}))
;; Plain text output
{:event :content-delta
:text line}))
(catch Exception e
(log/debug "Failed to parse line:" line)
{:event :content-delta :text line})))
(defrecord OpenCodeAdapter [command]
proto/AgentAdapter
(provider-name [_] :opencode)
(discover-sessions [_]
(if-let [output (run-command command "session" "list" "--json")]
(vec (parse-session-list output))
[]))
(spawn-session [_ session-id opts]
(let [{:keys [working-dir]} opts
args [command "run" "--session" session-id]
pb (ProcessBuilder. args)]
(when working-dir
(.directory pb (io/file working-dir)))
(.redirectErrorStream pb false)
(let [process (.start pb)]
{:process process
:stdin (BufferedWriter. (OutputStreamWriter. (.getOutputStream process) StandardCharsets/UTF_8))
:stdout (BufferedReader. (InputStreamReader. (.getInputStream process) StandardCharsets/UTF_8))
:stderr (BufferedReader. (InputStreamReader. (.getErrorStream process) StandardCharsets/UTF_8))})))
(send-message [_ {:keys [stdin]} message]
(try
(.write stdin message)
(.newLine stdin)
(.flush stdin)
true
(catch Exception e
(log/error "Failed to send message:" (.getMessage e))
false)))
(read-stream [this {:keys [stdout]} callback]
(try
(loop []
(when-let [line (.readLine stdout)]
(when-let [parsed (proto/parse-output this line)]
(callback parsed))
(recur)))
(catch Exception e
(log/debug "Stream ended:" (.getMessage e)))))
(kill-process [_ {:keys [process]}]
(when process
(.destroyForcibly process)))
(parse-output [_ line]
(parse-stream-output line)))
(defn create-adapter
"Create an OpenCode adapter"
([]
(create-adapter "opencode"))
([command]
(->OpenCodeAdapter command)))
(defn export-session
"Export a session's full history"
[adapter session-id]
(when-let [output (run-command (:command adapter) "export" session-id)]
(parse-session-export output)))
@@ -0,0 +1,38 @@
(ns spiceflow.adapters.protocol)
(defprotocol AgentAdapter
"Protocol for interacting with AI coding assistants (Claude Code, OpenCode, etc.)"
(provider-name [this]
"Return the provider name as a keyword (:claude or :opencode)")
(discover-sessions [this]
"Discover existing sessions from the CLI's storage.
Returns a sequence of session maps with :external-id, :title, :working-dir, etc.")
(spawn-session [this session-id opts]
"Spawn a new CLI process for the given session.
opts may include :working-dir, :model, :permission-mode
Returns a process handle map with :process, :stdin, :stdout, :stderr")
(send-message [this process-handle message]
"Send a message to a running CLI process.
Returns true if sent successfully.")
(read-stream [this process-handle callback]
"Read streamed output from the CLI process.
callback is called with each parsed message/event.
Returns when the process completes or stream ends.")
(kill-process [this process-handle]
"Kill a running CLI process.")
(parse-output [this line]
"Parse a line of CLI output into a structured message.
Returns nil if the line should be ignored."))
(defn running?
"Check if a process handle represents a running process"
[{:keys [process]}]
(when process
(.isAlive process)))
+141
View File
@@ -0,0 +1,141 @@
(ns spiceflow.api.routes
"REST API routes"
(:require [reitit.ring :as ring]
[reitit.ring.middleware.parameters :as parameters]
[ring.middleware.json :refer [wrap-json-body wrap-json-response]]
[ring.middleware.cors :refer [wrap-cors]]
[ring.util.response :as response]
[spiceflow.db.protocol :as db]
[spiceflow.session.manager :as manager]
[spiceflow.adapters.claude :as claude]
[clojure.tools.logging :as log]))
(defn- json-response
"Create a JSON response"
[body]
(-> (response/response body)
(response/content-type "application/json")))
(defn- error-response
"Create an error response"
[status message]
(-> (response/response {:error message})
(response/status status)
(response/content-type "application/json")))
;; Session handlers
(defn list-sessions-handler
[store]
(fn [_request]
(json-response (db/get-sessions store))))
(defn get-session-handler
[store]
(fn [request]
(let [id (get-in request [:path-params :id])]
(if-let [session (db/get-session store id)]
(let [messages (db/get-messages store id)]
(json-response (assoc session :messages messages)))
(error-response 404 "Session not found")))))
(defn create-session-handler
[store]
(fn [request]
(let [body (:body request)]
(if (db/valid-session? body)
(let [session (db/save-session store body)]
(-> (json-response session)
(response/status 201)))
(error-response 400 "Invalid session data")))))
(defn delete-session-handler
[store]
(fn [request]
(let [id (get-in request [:path-params :id])]
(if (db/get-session store id)
(do
(manager/stop-session store id)
(db/delete-session store id)
(response/status (response/response nil) 204))
(error-response 404 "Session not found")))))
(defn send-message-handler
[store broadcast-fn]
(fn [request]
(let [id (get-in request [:path-params :id])
message (get-in request [:body :message])]
(if-let [session (db/get-session store id)]
(try
;; Send message and start streaming in a separate thread
(manager/send-message-to-session store id message)
(future
(try
(manager/stream-session-response store id
(fn [event]
(broadcast-fn id event)))
(catch Exception e
(log/error "Streaming error:" (.getMessage e))
(broadcast-fn id {:event :error :message (.getMessage e)}))))
(json-response {:status "sent"})
(catch Exception e
(error-response 500 (.getMessage e))))
(error-response 404 "Session not found")))))
;; Discovery handlers
(defn discover-claude-handler
[_store]
(fn [_request]
(let [adapter (claude/create-adapter)
sessions (manager/discover-all-sessions)]
(json-response (->> sessions
(filter #(= :claude (:provider %)))
vec)))))
(defn discover-opencode-handler
[_store]
(fn [_request]
(let [sessions (manager/discover-all-sessions)]
(json-response (->> sessions
(filter #(= :opencode (:provider %)))
vec)))))
(defn import-session-handler
[store]
(fn [request]
(let [body (:body request)]
(if (:external-id body)
(let [session (manager/import-session store body)]
(-> (json-response session)
(response/status 201)))
(error-response 400 "Missing external-id")))))
;; Health check
(defn health-handler
[_request]
(json-response {:status "ok" :service "spiceflow"}))
(defn create-routes
"Create API routes with the given store and broadcast function"
[store broadcast-fn]
[["/api"
["/health" {:get health-handler}]
["/sessions" {:get (list-sessions-handler store)
:post (create-session-handler store)}]
["/sessions/:id" {:get (get-session-handler store)
:delete (delete-session-handler store)}]
["/sessions/:id/send" {:post (send-message-handler store broadcast-fn)}]
["/discover/claude" {:get (discover-claude-handler store)}]
["/discover/opencode" {:get (discover-opencode-handler store)}]
["/import" {:post (import-session-handler store)}]]])
(defn create-app
"Create the Ring application"
[store broadcast-fn]
(-> (ring/ring-handler
(ring/router (create-routes store broadcast-fn))
(ring/create-default-handler))
(wrap-json-body {:keywords? true})
wrap-json-response
(wrap-cors :access-control-allow-origin [#".*"]
:access-control-allow-methods [:get :post :put :delete :options]
:access-control-allow-headers [:content-type :authorization])))
+108
View File
@@ -0,0 +1,108 @@
(ns spiceflow.api.websocket
"WebSocket handlers for real-time updates"
(:require [jsonista.core :as json]
[clojure.tools.logging :as log])
(:import [org.eclipse.jetty.websocket.api Session WebSocketListener]
[java.util.concurrent ConcurrentHashMap]))
(def ^:private mapper (json/object-mapper {:encode-key-fn name}))
;; Connected WebSocket sessions: session-id -> #{ws-sessions}
(defonce ^:private connections (ConcurrentHashMap.))
;; All connected WebSocket sessions for broadcast
(defonce ^:private all-connections (ConcurrentHashMap/newKeySet))
(defn- send-to-ws
"Send a message to a WebSocket session"
[^Session ws-session message]
(try
(when (.isOpen ws-session)
(let [json-str (json/write-value-as-string message mapper)]
(.sendString (.getRemote ws-session) json-str)))
(catch Exception e
(log/debug "Failed to send to WebSocket:" (.getMessage e)))))
(defn broadcast-to-session
"Broadcast an event to all WebSocket connections subscribed to a session"
[session-id event]
(when-let [subscribers (.get connections session-id)]
(let [message (assoc event :session-id session-id)]
(doseq [ws-session subscribers]
(send-to-ws ws-session message)))))
(defn broadcast-all
"Broadcast an event to all connected WebSocket sessions"
[event]
(doseq [ws-session all-connections]
(send-to-ws ws-session event)))
(defn- subscribe-to-session
"Subscribe a WebSocket session to updates for a session"
[ws-session session-id]
(.compute connections session-id
(fn [_k existing]
(let [subscribers (or existing (ConcurrentHashMap/newKeySet))]
(.add subscribers ws-session)
subscribers))))
(defn- unsubscribe-from-session
"Unsubscribe a WebSocket session from a session"
[ws-session session-id]
(when-let [subscribers (.get connections session-id)]
(.remove subscribers ws-session)
(when (.isEmpty subscribers)
(.remove connections session-id))))
(defn- unsubscribe-from-all
"Unsubscribe a WebSocket session from all sessions"
[ws-session]
(doseq [[session-id _] connections]
(unsubscribe-from-session ws-session session-id)))
(defn- handle-message
"Handle an incoming WebSocket message"
[ws-session message]
(try
(let [data (json/read-value message mapper)]
(case (:type data)
"subscribe" (do
(subscribe-to-session ws-session (:session-id data))
(send-to-ws ws-session {:type "subscribed"
:session-id (:session-id data)}))
"unsubscribe" (do
(unsubscribe-from-session ws-session (:session-id data))
(send-to-ws ws-session {:type "unsubscribed"
:session-id (:session-id data)}))
"ping" (send-to-ws ws-session {:type "pong"})
(log/debug "Unknown WebSocket message type:" (:type data))))
(catch Exception e
(log/warn "Failed to handle WebSocket message:" (.getMessage e)))))
(defn create-ws-listener
"Create a WebSocket listener"
[]
(reify WebSocketListener
(onWebSocketConnect [_ session]
(log/debug "WebSocket connected")
(.add all-connections session)
(send-to-ws session {:type "connected"}))
(onWebSocketText [_ session message]
(handle-message session message))
(onWebSocketBinary [_ _session _payload _offset _len]
(log/debug "Binary WebSocket message ignored"))
(onWebSocketClose [_ session status-code reason]
(log/debug "WebSocket closed:" status-code reason)
(.remove all-connections session)
(unsubscribe-from-all session))
(onWebSocketError [_ _session cause]
(log/warn "WebSocket error:" (.getMessage cause)))))
(defn ws-handler
"Ring handler for WebSocket upgrade"
[_request]
{:ring.websocket/listener (create-ws-listener)})
+28
View File
@@ -0,0 +1,28 @@
(ns spiceflow.config
(:require [aero.core :as aero]
[clojure.java.io :as io]
[mount.core :refer [defstate]]))
(defn load-config
"Load configuration from config.edn or environment"
([]
(load-config :default))
([profile]
(let [config-file (io/resource "config.edn")]
(if config-file
(aero/read-config config-file {:profile profile})
;; Default config if no file exists
{:server {:port 3000
:host "0.0.0.0"}
:database {:type :sqlite
:dbname "spiceflow.db"}
:claude {:sessions-dir (str (System/getProperty "user.home") "/.claude/projects")}
:opencode {:command "opencode"}}))))
(defstate config
:start (load-config (keyword (or (System/getenv "SPICEFLOW_ENV") "default"))))
(defn get-in-config
"Get a nested value from config"
[ks]
(get-in config ks))
+56
View File
@@ -0,0 +1,56 @@
(ns spiceflow.core
"Main entry point for Spiceflow server"
(:require [ring.adapter.jetty9 :as jetty]
[spiceflow.config :as config]
[spiceflow.db.sqlite :as sqlite]
[spiceflow.api.routes :as routes]
[spiceflow.api.websocket :as ws]
[spiceflow.session.manager :as manager]
[mount.core :as mount :refer [defstate]]
[clojure.tools.logging :as log])
(:gen-class))
;; Database store
(defstate store
:start (do
(log/info "Initializing database...")
(let [db-path (get-in config/config [:database :dbname] "spiceflow.db")]
(sqlite/create-store db-path)))
:stop nil)
;; HTTP Server
(defstate server
:start (let [port (get-in config/config [:server :port] 3000)
host (get-in config/config [:server :host] "0.0.0.0")
app (routes/create-app store ws/broadcast-to-session)]
(log/info "Starting Spiceflow server on" (str host ":" port))
(jetty/run-jetty app
{:port port
:host host
:join? false
:websockets {"/api/ws" ws/ws-handler}}))
:stop (do
(log/info "Stopping Spiceflow server...")
(manager/cleanup-all store)
(.stop server)))
(defn -main
"Main entry point"
[& _args]
(log/info "Starting Spiceflow - The spice must flow!")
(mount/start)
(log/info "Spiceflow is ready.")
;; Keep the main thread alive
@(promise))
(comment
;; Development helpers
(mount/start)
(mount/stop)
;; Test database
(require '[spiceflow.db.protocol :as db])
(db/get-sessions store)
;; Test discovery
(manager/discover-all-sessions))
+79
View File
@@ -0,0 +1,79 @@
(ns spiceflow.db.memory
"In-memory implementation of DataStore for testing"
(:require [spiceflow.db.protocol :as proto])
(:import [java.util UUID]
[java.time Instant]))
(defn- generate-id []
(str (UUID/randomUUID)))
(defn- now-iso []
(.toString (Instant/now)))
(defrecord MemoryStore [sessions messages]
proto/DataStore
(get-sessions [_]
(->> @sessions
vals
(sort-by :updated-at)
reverse
vec))
(get-session [_ id]
(get @sessions id))
(save-session [this session]
(let [id (or (:id session) (generate-id))
now (now-iso)
new-session (assoc session
:id id
:status (or (:status session) :idle)
:created-at now
:updated-at now)]
(swap! sessions assoc id new-session)
new-session))
(update-session [_ id data]
(let [now (now-iso)]
(swap! sessions update id merge data {:updated-at now})
(get @sessions id)))
(delete-session [_ id]
(swap! messages (fn [msgs]
(into {} (remove #(= (:session-id (val %)) id) msgs))))
(swap! sessions dissoc id)
nil)
(get-messages [_ session-id]
(->> @messages
vals
(filter #(= (:session-id %) session-id))
(sort-by :created-at)
vec))
(save-message [this message]
(let [id (or (:id message) (generate-id))
now (now-iso)
new-message (assoc message
:id id
:created-at now)]
(swap! messages assoc id new-message)
;; Update session's updated-at
(when-let [session-id (:session-id message)]
(swap! sessions update session-id assoc :updated-at now))
new-message))
(get-message [_ id]
(get @messages id)))
(defn create-store
"Create an in-memory store for testing"
[]
(->MemoryStore (atom {}) (atom {})))
(defn clear-store!
"Clear all data from an in-memory store"
[store]
(reset! (:sessions store) {})
(reset! (:messages store) {}))
+38
View File
@@ -0,0 +1,38 @@
(ns spiceflow.db.protocol)
(defprotocol DataStore
"Protocol for database operations. Implementations can be SQLite, in-memory, etc."
;; Session operations
(get-sessions [this]
"Get all sessions")
(get-session [this id]
"Get a single session by ID")
(save-session [this session]
"Save a new session. Returns the saved session with ID.")
(update-session [this id data]
"Update session fields. Returns updated session.")
(delete-session [this id]
"Delete a session and its messages")
;; Message operations
(get-messages [this session-id]
"Get all messages for a session, ordered by created_at")
(save-message [this message]
"Save a new message. Returns the saved message with ID.")
(get-message [this id]
"Get a single message by ID"))
(defn valid-session?
"Validate session data has required fields"
[{:keys [provider]}]
(and provider
(contains? #{:claude :opencode "claude" "opencode"} provider)))
(defn valid-message?
"Validate message data has required fields"
[{:keys [session-id role content]}]
(and session-id
role
(contains? #{:user :assistant :system "user" "assistant" "system"} role)
content))
+194
View File
@@ -0,0 +1,194 @@
(ns spiceflow.db.sqlite
(:require [next.jdbc :as jdbc]
[next.jdbc.result-set :as rs]
[next.jdbc.sql :as sql]
[spiceflow.db.protocol :as proto]
[jsonista.core :as json]
[clojure.string :as str])
(:import [java.util UUID]
[java.time Instant]))
(def ^:private mapper (json/object-mapper {:decode-key-fn keyword}))
(defn- generate-id []
(str (UUID/randomUUID)))
(defn- now-iso []
(.toString (Instant/now)))
(defn- row->session
"Convert a database row to a session map"
[row]
(when row
{:id (:sessions/id row)
:provider (keyword (:sessions/provider row))
:external-id (:sessions/external_id row)
:title (:sessions/title row)
:working-dir (:sessions/working_dir row)
:status (keyword (or (:sessions/status row) "idle"))
:created-at (:sessions/created_at row)
:updated-at (:sessions/updated_at row)}))
(defn- row->message
"Convert a database row to a message map"
[row]
(when row
{:id (:messages/id row)
:session-id (:messages/session_id row)
:role (keyword (:messages/role row))
:content (:messages/content row)
:metadata (when-let [m (:messages/metadata row)]
(json/read-value m mapper))
:created-at (:messages/created_at row)}))
(defn- session->row
"Convert a session map to database columns"
[{:keys [id provider external-id title working-dir status]}]
(cond-> {}
id (assoc :id id)
provider (assoc :provider (name provider))
external-id (assoc :external_id external-id)
title (assoc :title title)
working-dir (assoc :working_dir working-dir)
status (assoc :status (name status))))
(defn- message->row
"Convert a message map to database columns"
[{:keys [id session-id role content metadata]}]
(cond-> {}
id (assoc :id id)
session-id (assoc :session_id session-id)
role (assoc :role (name role))
content (assoc :content content)
metadata (assoc :metadata (json/write-value-as-string metadata))))
(defrecord SQLiteStore [datasource]
proto/DataStore
(get-sessions [_]
(let [rows (jdbc/execute! datasource
["SELECT * FROM sessions ORDER BY updated_at DESC"]
{:builder-fn rs/as-unqualified-kebab-maps})]
(mapv (fn [row]
{:id (:id row)
:provider (keyword (:provider row))
:external-id (:external-id row)
:title (:title row)
:working-dir (:working-dir row)
:status (keyword (or (:status row) "idle"))
:created-at (:created-at row)
:updated-at (:updated-at row)})
rows)))
(get-session [_ id]
(let [row (jdbc/execute-one! datasource
["SELECT * FROM sessions WHERE id = ?" id]
{:builder-fn rs/as-unqualified-kebab-maps})]
(when row
{:id (:id row)
:provider (keyword (:provider row))
:external-id (:external-id row)
:title (:title row)
:working-dir (:working-dir row)
:status (keyword (or (:status row) "idle"))
:created-at (:created-at row)
:updated-at (:updated-at row)})))
(save-session [this session]
(let [id (or (:id session) (generate-id))
now (now-iso)
row (-> (session->row session)
(assoc :id id
:created_at now
:updated_at now))]
(sql/insert! datasource :sessions row)
(proto/get-session this id)))
(update-session [this id data]
(let [row (-> (session->row data)
(assoc :updated_at (now-iso)))]
(sql/update! datasource :sessions row {:id id})
(proto/get-session this id)))
(delete-session [_ id]
(jdbc/execute! datasource ["DELETE FROM messages WHERE session_id = ?" id])
(jdbc/execute! datasource ["DELETE FROM sessions WHERE id = ?" id])
nil)
(get-messages [_ session-id]
(let [rows (jdbc/execute! datasource
["SELECT * FROM messages WHERE session_id = ? ORDER BY created_at ASC"
session-id]
{:builder-fn rs/as-unqualified-kebab-maps})]
(mapv (fn [row]
{:id (:id row)
:session-id (:session-id row)
:role (keyword (:role row))
:content (:content row)
:metadata (when-let [m (:metadata row)]
(json/read-value m mapper))
:created-at (:created-at row)})
rows)))
(save-message [this message]
(let [id (or (:id message) (generate-id))
now (now-iso)
row (-> (message->row message)
(assoc :id id
:created_at now))]
(sql/insert! datasource :messages row)
;; Update session's updated_at
(jdbc/execute! datasource
["UPDATE sessions SET updated_at = ? WHERE id = ?"
now (:session-id message)])
(proto/get-message this id)))
(get-message [_ id]
(let [row (jdbc/execute-one! datasource
["SELECT * FROM messages WHERE id = ?" id]
{:builder-fn rs/as-unqualified-kebab-maps})]
(when row
{:id (:id row)
:session-id (:session-id row)
:role (keyword (:role row))
:content (:content row)
:metadata (when-let [m (:metadata row)]
(json/read-value m mapper))
:created-at (:created-at row)}))))
(def schema
"SQLite schema for spiceflow"
["CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
provider TEXT NOT NULL,
external_id TEXT,
title TEXT,
working_dir TEXT,
status TEXT DEFAULT 'idle',
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
)"
"CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
session_id TEXT REFERENCES sessions(id),
role TEXT NOT NULL,
content TEXT,
metadata TEXT,
created_at TEXT DEFAULT (datetime('now'))
)"
"CREATE INDEX IF NOT EXISTS idx_messages_session_id ON messages(session_id)"
"CREATE INDEX IF NOT EXISTS idx_sessions_provider ON sessions(provider)"
"CREATE INDEX IF NOT EXISTS idx_sessions_external_id ON sessions(external_id)"])
(defn init-schema!
"Initialize database schema"
[datasource]
(doseq [stmt schema]
(jdbc/execute! datasource [stmt])))
(defn create-store
"Create a SQLite store with the given database path"
[db-path]
(let [datasource (jdbc/get-datasource {:dbtype "sqlite" :dbname db-path})]
(init-schema! datasource)
(->SQLiteStore datasource)))
+122
View File
@@ -0,0 +1,122 @@
(ns spiceflow.session.manager
"Session lifecycle management"
(:require [spiceflow.db.protocol :as db]
[spiceflow.adapters.protocol :as adapter]
[spiceflow.adapters.claude :as claude]
[spiceflow.adapters.opencode :as opencode]
[clojure.tools.logging :as log])
(:import [java.util.concurrent ConcurrentHashMap]))
;; Active process handles for running sessions
(defonce ^:private active-processes (ConcurrentHashMap.))
(defn get-adapter
"Get the appropriate adapter for a provider"
[provider]
(case (keyword provider)
:claude (claude/create-adapter)
:opencode (opencode/create-adapter)
(throw (ex-info "Unknown provider" {:provider provider}))))
(defn discover-all-sessions
"Discover sessions from all configured providers"
[]
(let [claude-sessions (adapter/discover-sessions (claude/create-adapter))
opencode-sessions (adapter/discover-sessions (opencode/create-adapter))]
(concat claude-sessions opencode-sessions)))
(defn import-session
"Import a discovered session into the database"
[store session]
(db/save-session store session))
(defn get-active-process
"Get the active process handle for a session"
[session-id]
(.get active-processes session-id))
(defn session-running?
"Check if a session has an active process"
[session-id]
(when-let [handle (get-active-process session-id)]
(adapter/running? handle)))
(defn start-session
"Start a CLI process for a session"
[store session-id]
(let [session (db/get-session store session-id)]
(when-not session
(throw (ex-info "Session not found" {:session-id session-id})))
(when (session-running? session-id)
(throw (ex-info "Session already running" {:session-id session-id})))
(let [adapter (get-adapter (:provider session))
handle (adapter/spawn-session adapter
(:external-id session)
{:working-dir (:working-dir session)})]
(.put active-processes session-id handle)
(db/update-session store session-id {:status :running})
handle)))
(defn stop-session
"Stop a running CLI process for a session"
[store session-id]
(when-let [handle (.remove active-processes session-id)]
(let [session (db/get-session store session-id)
adapter (get-adapter (:provider session))]
(adapter/kill-process adapter handle)
(db/update-session store session-id {:status :idle}))))
(defn send-message-to-session
"Send a message to a running session"
[store session-id message]
(let [session (db/get-session store session-id)
_ (when-not session
(throw (ex-info "Session not found" {:session-id session-id})))
handle (get-active-process session-id)
;; Start session if not running
handle (or handle (start-session store session-id))
adapter (get-adapter (:provider session))]
;; Save user message
(db/save-message store {:session-id session-id
:role :user
:content message})
;; Send to CLI
(adapter/send-message adapter handle message)))
(defn stream-session-response
"Stream response from a running session, calling callback for each event"
[store session-id callback]
(let [session (db/get-session store session-id)
_ (when-not session
(throw (ex-info "Session not found" {:session-id session-id})))
handle (get-active-process session-id)
_ (when-not handle
(throw (ex-info "Session not running" {:session-id session-id})))
adapter (get-adapter (:provider session))
content-buffer (StringBuilder.)]
;; Read stream and accumulate content
(adapter/read-stream adapter handle
(fn [event]
(callback event)
;; Accumulate text content
(when-let [text (:text event)]
(.append content-buffer text))
;; On message stop, save the accumulated message
(when (= :message-stop (:event event))
(let [content (.toString content-buffer)]
(when (seq content)
(db/save-message store {:session-id session-id
:role :assistant
:content content}))))))
;; Update session status when stream ends
(db/update-session store session-id {:status :idle})
(.remove active-processes session-id)))
(defn cleanup-all
"Stop all running sessions"
[store]
(doseq [session-id (keys active-processes)]
(try
(stop-session store session-id)
(catch Exception e
(log/warn "Failed to stop session:" session-id (.getMessage e))))))
+80
View File
@@ -0,0 +1,80 @@
(ns spiceflow.adapters-test
(:require [clojure.test :refer [deftest testing is]]
[spiceflow.adapters.protocol :as proto]
[spiceflow.adapters.claude :as claude]
[spiceflow.adapters.opencode :as opencode]))
(deftest test-claude-adapter
(testing "Provider name"
(let [adapter (claude/create-adapter)]
(is (= :claude (proto/provider-name adapter)))))
(testing "Parse JSONL messages"
(let [adapter (claude/create-adapter)]
;; User message
(let [parsed (proto/parse-output adapter
"{\"type\":\"user\",\"message\":{\"content\":\"Hello\"},\"timestamp\":\"2024-01-01T00:00:00Z\"}")]
(is (= :user (:role parsed)))
(is (= "Hello" (:content parsed))))
;; Assistant message
(let [parsed (proto/parse-output adapter
"{\"type\":\"assistant\",\"message\":{\"content\":\"Hi!\"},\"timestamp\":\"2024-01-01T00:00:00Z\"}")]
(is (= :assistant (:role parsed)))
(is (= "Hi!" (:content parsed))))
;; Content delta (streaming)
(let [parsed (proto/parse-output adapter
"{\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"text\":\"Hello\"}}")]
(is (= :content-delta (:event parsed)))
(is (= "Hello" (:text parsed))))
;; Message stop
(let [parsed (proto/parse-output adapter
"{\"type\":\"message_stop\"}")]
(is (= :message-stop (:event parsed))))
;; Empty line
(is (nil? (proto/parse-output adapter "")))
(is (nil? (proto/parse-output adapter " "))))))
(deftest test-opencode-adapter
(testing "Provider name"
(let [adapter (opencode/create-adapter)]
(is (= :opencode (proto/provider-name adapter)))))
(testing "Parse stream output"
(let [adapter (opencode/create-adapter)]
;; JSON content
(let [parsed (proto/parse-output adapter "{\"content\":\"Hello\"}")]
(is (= :content-delta (:event parsed)))
(is (= "Hello" (:text parsed))))
;; Plain text
(let [parsed (proto/parse-output adapter "Some output text")]
(is (= :content-delta (:event parsed)))
(is (= "Some output text" (:text parsed))))
;; Done message
(let [parsed (proto/parse-output adapter "{\"done\":true}")]
(is (= :message-stop (:event parsed)))))))
(deftest test-discover-sessions
(testing "Claude adapter discovers sessions (may be empty)"
(let [adapter (claude/create-adapter)
sessions (proto/discover-sessions adapter)]
(is (vector? sessions))
(doseq [session sessions]
(is (:external-id session))
(is (= :claude (:provider session))))))
(testing "OpenCode adapter discovers sessions (may fail if not installed)"
(let [adapter (opencode/create-adapter)]
;; This may fail if opencode is not installed, so we just check
;; that it doesn't throw an unexpected exception
(try
(let [sessions (proto/discover-sessions adapter)]
(is (or (nil? sessions) (vector? sessions))))
(catch Exception _
;; OK if opencode is not installed
(is true))))))
+86
View File
@@ -0,0 +1,86 @@
(ns spiceflow.db-test
(:require [clojure.test :refer [deftest testing is use-fixtures]]
[spiceflow.db.protocol :as db]
[spiceflow.db.memory :as memory]
[spiceflow.db.sqlite :as sqlite]))
;; Test both implementations
(def ^:dynamic *store* nil)
(defn memory-fixture [f]
(binding [*store* (memory/create-store)]
(f)))
(defn sqlite-fixture [f]
(let [db-path (str "/tmp/spiceflow-test-" (System/currentTimeMillis) ".db")]
(binding [*store* (sqlite/create-store db-path)]
(try
(f)
(finally
(clojure.java.io/delete-file db-path true))))))
;; Session tests
(deftest test-session-crud
(testing "Create and retrieve session"
(let [session (db/save-session *store* {:provider :claude
:external-id "test-123"
:title "Test Session"
:working-dir "/home/test"})]
(is (:id session))
(is (= :claude (:provider session)))
(is (= "test-123" (:external-id session)))
(is (:created-at session))
;; Get session
(let [retrieved (db/get-session *store* (:id session))]
(is (= (:id session) (:id retrieved)))
(is (= "Test Session" (:title retrieved))))))
(testing "List sessions"
(let [sessions (db/get-sessions *store*)]
(is (>= (count sessions) 1))))
(testing "Update session"
(let [session (first (db/get-sessions *store*))
updated (db/update-session *store* (:id session) {:title "Updated Title"
:status :running})]
(is (= "Updated Title" (:title updated)))
(is (= :running (:status updated)))))
(testing "Delete session"
(let [session (first (db/get-sessions *store*))]
(db/delete-session *store* (:id session))
(is (nil? (db/get-session *store* (:id session)))))))
;; Message tests
(deftest test-message-crud
(testing "Create and retrieve messages"
(let [session (db/save-session *store* {:provider :claude
:external-id "msg-test"
:title "Message Test"})
msg1 (db/save-message *store* {:session-id (:id session)
:role :user
:content "Hello"})
msg2 (db/save-message *store* {:session-id (:id session)
:role :assistant
:content "Hi there!"
:metadata {:model "claude-3"}})]
(is (:id msg1))
(is (:id msg2))
;; Get messages
(let [messages (db/get-messages *store* (:id session))]
(is (= 2 (count messages)))
(is (= :user (:role (first messages))))
(is (= :assistant (:role (second messages))))
(is (= {:model "claude-3"} (:metadata (second messages)))))
;; Get single message
(let [retrieved (db/get-message *store* (:id msg1))]
(is (= "Hello" (:content retrieved)))))))
;; Run with memory store
(use-fixtures :each memory-fixture)
;; Uncomment to run with SQLite
;; (use-fixtures :each sqlite-fixture)
+4
View File
@@ -0,0 +1,4 @@
#kaocha/v1
{:tests [{:id :unit
:test-paths ["test"]}]
:reporter [kaocha.report/dots]}