fix last session

This commit is contained in:
2026-01-21 14:26:30 -05:00
parent 3d5ae8efca
commit 051e3dfcb4
28 changed files with 699 additions and 1295 deletions
+1 -3
View File
@@ -31,7 +31,6 @@ server/
│ ├── adapters/ # CLI integrations
│ ├── api/ # HTTP & WebSocket
│ ├── session/ # Session lifecycle
│ ├── push/ # Push notifications
│ └── terminal/ # Terminal diff caching
├── dev/user.clj # REPL helpers
├── test/ # Unit tests
@@ -42,8 +41,7 @@ server/
## Mount States (start order)
1. `store` - SQLite database
2. `push` - Push notification store
3. `server` - Jetty HTTP server
2. `server` - Jetty HTTP server
## Namespaces
+3 -6
View File
@@ -11,14 +11,12 @@
Mount states (start order):
1. `store` - SQLite via `sqlite/create-store`
2. `push` - Push store using same DB connection
3. `server` - Jetty with WebSocket support
2. `server` - Jetty with WebSocket support
Server wiring:
```clojure
(ws/set-pending-permission-fn! (partial manager/get-pending-permission store))
(manager/set-push-store! push)
(routes/create-app store ws/broadcast-to-session push)
(routes/create-app store ws/broadcast-to-session)
```
## Subdirectories
@@ -29,14 +27,13 @@ Server wiring:
| `adapters/` | AgentAdapter protocol + CLI integrations |
| `api/` | HTTP routes + WebSocket |
| `session/` | Session state machine |
| `push/` | Push notifications |
| `terminal/` | Terminal diff caching |
## Patterns
**Dependency injection via currying:**
```clojure
(defn create-app [store broadcast-fn push-store]
(defn create-app [store broadcast-fn]
(let [handlers (make-handlers store broadcast-fn)]
(ring/ring-handler (router handlers))))
```
+29 -23
View File
@@ -256,31 +256,37 @@
;; \x1b[H moves cursor home, \x1b[2J clears screen, \x1b[3J clears scrollback
(def ^:private clear-screen-pattern #"\u001b\[H\u001b\[2J|\u001b\[2J\u001b\[H|\u001b\[2J|\u001b\[3J")
(defn- content-after-last-clear
"Return content after the last clear screen sequence, or full content if no clear found."
[content]
(if (str/blank? content)
content
(let [matcher (re-matcher clear-screen-pattern content)]
(loop [last-end nil]
(if (.find matcher)
(recur (.end matcher))
(if last-end
(subs content last-end)
content))))))
(defn- find-clear-line-indices
"Find 0-based line indices where clear screen sequences occur.
Returns a vector of line indices."
[lines]
(vec (keep-indexed
(fn [idx line]
(when (re-find clear-screen-pattern line)
idx))
lines)))
(defn capture-pane
"Capture the current content of a tmux pane.
Returns the visible terminal content as a string, or nil if session doesn't exist.
Preserves ANSI escape sequences for color rendering in the client.
Content before the last clear screen sequence is stripped."
"Capture the full content of a tmux pane including scrollback history.
Returns a map with:
:content - Full terminal content as a string
:clear-indices - Vector of 0-based line indices where clears occurred
:last-clear-line - The last line index where a clear occurred (for scroll positioning)
Returns nil if session doesn't exist.
Preserves ANSI escape sequences for color rendering in the client."
[session-name]
(when session-name
;; Use capture-pane with -p to print to stdout, -e to include escape sequences
;; -S -1000 captures scrollback history
(let [result (shell/sh "tmux" "capture-pane" "-t" session-name "-p" "-e" "-S" "-1000")]
;; Use -S - to capture from start of scrollback history (infinite scroll up)
(let [result (shell/sh "tmux" "capture-pane" "-t" session-name "-p" "-e" "-S" "-")]
(when (zero? (:exit result))
(content-after-last-clear (:out result))))))
(let [content (:out result)
lines (str/split-lines content)
clear-indices (find-clear-line-indices lines)
last-clear (when (seq clear-indices) (last clear-indices))]
{:content content
:clear-indices clear-indices
:last-clear-line last-clear})))))
(defn get-session-name
"Get the tmux session name for a spiceflow session.
@@ -339,10 +345,10 @@
;; Screen size presets for different device orientations
(def ^:private screen-sizes
{:fullscreen {:width 260 :height 36}
:desktop {:width 120 :height 36}
:landscape {:width 86 :height 24}
:portrait {:width 42 :height 24}})
{:fullscreen {:width 260 :height 24}
:desktop {:width 120 :height 24}
:landscape {:width 86 :height 16}
:portrait {:width 42 :height 16}})
(defn resize-session
"Resize a tmux session window to a preset size.
+15 -50
View File
@@ -10,7 +10,6 @@
[spiceflow.adapters.protocol :as adapter]
[spiceflow.adapters.tmux :as tmux]
[spiceflow.terminal.diff :as terminal-diff]
[spiceflow.push.protocol :as push-proto]
[clojure.tools.logging :as log]))
(defn- json-response
@@ -239,13 +238,16 @@
;; If fresh=true, invalidate cache to ensure full content is returned
(when fresh?
(terminal-diff/invalidate-cache id))
(let [{:keys [content diff]} (terminal-diff/capture-with-diff id tmux/capture-pane)
(let [{:keys [content diff clear-indices scroll-to-line]}
(terminal-diff/capture-with-diff id tmux/capture-pane)
layout (tmux/detect-layout-mode id)]
(json-response-no-cache {:content (or content "")
:alive true
:session-name id
:diff diff
:layout (when layout (name layout))})))
:layout (when layout (name layout))
:clear-indices (or clear-indices [])
:scroll-to-line scroll-to-line})))
(do
;; Session died - invalidate cache
(terminal-diff/invalidate-cache id)
@@ -268,10 +270,13 @@
;; Always broadcast to ensure client receives update, even if content unchanged
(future
(Thread/sleep 100) ;; Small delay to let terminal update
(let [{:keys [content diff]} (terminal-diff/capture-with-diff id tmux/capture-pane)]
(let [{:keys [content diff clear-indices scroll-to-line]}
(terminal-diff/capture-with-diff id tmux/capture-pane)]
(broadcast-fn id {:event :terminal-update
:content (or content "")
:diff diff})))
:diff diff
:clear-indices (or clear-indices [])
:scroll-to-line scroll-to-line})))
(json-response {:status "sent"}))
(error-response 400 "Tmux session not alive"))
(error-response 400 "Not a tmux session")))))
@@ -334,46 +339,9 @@
[_request]
(json-response {:status "ok" :service "spiceflow"}))
;; Push notification handlers
(defn vapid-key-handler
"Return the public VAPID key for push subscriptions"
[push-store]
(fn [_request]
(if-let [vapid-keys (push-proto/get-vapid-keys push-store)]
(json-response {:publicKey (:public-key vapid-keys)})
(error-response 500 "VAPID keys not configured"))))
(defn subscribe-handler
"Save a push subscription"
[push-store]
(fn [request]
(let [body (:body request)
subscription {:endpoint (:endpoint body)
:p256dh (get-in body [:keys :p256dh])
:auth (get-in body [:keys :auth])
:user-agent (get-in request [:headers "user-agent"])}]
(log/debug "Push subscribe request:" {:endpoint (:endpoint subscription)})
(if (push-proto/valid-subscription? subscription)
(let [saved (push-proto/save-subscription push-store subscription)]
(-> (json-response {:id (:id saved)})
(response/status 201)))
(error-response 400 "Invalid subscription: endpoint, p256dh, and auth are required")))))
(defn unsubscribe-handler
"Remove a push subscription"
[push-store]
(fn [request]
(let [endpoint (get-in request [:body :endpoint])]
(log/debug "Push unsubscribe request:" {:endpoint endpoint})
(if endpoint
(do
(push-proto/delete-subscription-by-endpoint push-store endpoint)
(response/status (response/response nil) 204))
(error-response 400 "Endpoint is required")))))
(defn create-routes
"Create API routes with the given store, broadcast function, and push store"
[store broadcast-fn push-store]
"Create API routes with the given store and broadcast function"
[store broadcast-fn]
[["/api"
["/health" {:get health-handler}]
["/sessions" {:get (list-sessions-handler store)
@@ -388,16 +356,13 @@
["/sessions/:id/terminal/resize" {:post (terminal-resize-handler store)}]
["/sessions/:id/eject" {:post (eject-tmux-handler store)}]
["/tmux/external" {:get list-external-tmux-handler}]
["/tmux/import" {:post import-tmux-handler}]
["/push/vapid-key" {:get (vapid-key-handler push-store)}]
["/push/subscribe" {:post (subscribe-handler push-store)}]
["/push/unsubscribe" {:post (unsubscribe-handler push-store)}]]])
["/tmux/import" {:post import-tmux-handler}]]])
(defn create-app
"Create the Ring application"
[store broadcast-fn push-store]
[store broadcast-fn]
(-> (ring/ring-handler
(ring/router (create-routes store broadcast-fn push-store)
(ring/router (create-routes store broadcast-fn)
{:data {:middleware [parameters/parameters-middleware]}})
(ring/create-default-handler))
(wrap-json-body {:keywords? true})
+1 -11
View File
@@ -6,7 +6,6 @@
[spiceflow.api.routes :as routes]
[spiceflow.api.websocket :as ws]
[spiceflow.session.manager :as manager]
[spiceflow.push.store :as push-store]
[mount.core :as mount :refer [defstate]]
[clojure.tools.logging :as log])
(:gen-class))
@@ -19,13 +18,6 @@
(sqlite/create-store db-path)))
:stop nil)
;; Push notification store (shares datasource with main store)
(defstate push
:start (do
(log/info "Initializing push notification store...")
(push-store/create-push-store (:datasource store)))
:stop nil)
;; Atom to hold the Jetty server instance
(defonce ^:private jetty-server (atom nil))
@@ -35,9 +27,7 @@
host (get-in config/config [:server :host] "0.0.0.0")
;; Wire up pending permission function for WebSocket (partially apply store)
_ (ws/set-pending-permission-fn! (partial manager/get-pending-permission store))
;; Wire up push store for notifications (used by manager)
_ (manager/set-push-store! push)
api-app (routes/create-app store ws/broadcast-to-session push)
api-app (routes/create-app store ws/broadcast-to-session)
;; Wrap the app to handle WebSocket upgrades on /api/ws
app (fn [request]
(if (and (jetty/ws-upgrade-request? request)
-33
View File
@@ -1,33 +0,0 @@
(ns spiceflow.push.protocol
"Protocol for push notification subscription storage")
(defprotocol PushStore
"Protocol for managing push subscriptions and VAPID keys"
;; Subscription operations
(get-subscriptions [this]
"Get all push subscriptions")
(get-subscription [this id]
"Get a subscription by ID")
(get-subscription-by-endpoint [this endpoint]
"Get a subscription by endpoint URL")
(save-subscription [this subscription]
"Save a new push subscription. Returns the saved subscription with ID.")
(delete-subscription [this id]
"Delete a subscription by ID")
(delete-subscription-by-endpoint [this endpoint]
"Delete a subscription by endpoint URL")
;; VAPID key operations
(get-vapid-keys [this]
"Get the VAPID key pair (returns {:public-key :private-key} or nil)")
(save-vapid-keys [this keys]
"Save VAPID key pair. Only called once on first use."))
(defn valid-subscription?
"Validate subscription data has required fields"
[{:keys [endpoint p256dh auth]}]
(and endpoint p256dh auth
(string? endpoint)
(string? p256dh)
(string? auth)))
-337
View File
@@ -1,337 +0,0 @@
(ns spiceflow.push.sender
"Web Push message encryption and delivery.
Implements RFC 8291 (Message Encryption for Web Push) and RFC 8292 (VAPID)."
(:require [clj-http.client :as http]
[jsonista.core :as json]
[spiceflow.push.protocol :as proto]
[spiceflow.push.vapid :as vapid]
[clojure.tools.logging :as log])
(:import [java.security KeyPairGenerator SecureRandom KeyFactory]
[java.security.spec ECGenParameterSpec ECPublicKeySpec ECPoint]
[javax.crypto Cipher KeyAgreement Mac]
[javax.crypto.spec SecretKeySpec GCMParameterSpec]
[java.util Base64 Arrays]
[java.nio ByteBuffer]
[java.math BigInteger]
[org.bouncycastle.jce ECNamedCurveTable]
[org.bouncycastle.jce.spec ECNamedCurveSpec]))
;; Note: We implement Web Push encryption using Java crypto primitives
;; to avoid additional dependencies beyond buddy-core
(defn- base64url-decode
"Decode URL-safe base64 string to bytes"
[^String s]
(.decode (Base64/getUrlDecoder) s))
(defn- base64url-encode
"Encode bytes to URL-safe base64 without padding"
[^bytes b]
(-> (Base64/getUrlEncoder)
(.withoutPadding)
(.encodeToString b)))
(defn- generate-salt
"Generate 16 random bytes for encryption salt"
[]
(let [salt (byte-array 16)
random (SecureRandom.)]
(.nextBytes random salt)
salt))
(defn- generate-ephemeral-keypair
"Generate an ephemeral ECDH key pair for message encryption"
[]
(let [kpg (KeyPairGenerator/getInstance "EC")
_ (.initialize kpg (ECGenParameterSpec. "secp256r1") (SecureRandom.))]
(.generateKeyPair kpg)))
(defn- public-key->uncompressed-bytes
"Convert EC public key to uncompressed point format (0x04 || x || y)"
[public-key]
(let [point (.getW public-key)
x-bytes (.toByteArray (.getAffineX point))
y-bytes (.toByteArray (.getAffineY point))
x-padded (byte-array 32)
y-padded (byte-array 32)]
(let [x-len (min 32 (alength x-bytes))
x-offset (max 0 (- (alength x-bytes) 32))
y-len (min 32 (alength y-bytes))
y-offset (max 0 (- (alength y-bytes) 32))]
(System/arraycopy x-bytes x-offset x-padded (- 32 x-len) x-len)
(System/arraycopy y-bytes y-offset y-padded (- 32 y-len) y-len))
(let [result (byte-array 65)]
(aset-byte result 0 (unchecked-byte 0x04))
(System/arraycopy x-padded 0 result 1 32)
(System/arraycopy y-padded 0 result 33 32)
result)))
(defn- uncompressed-bytes->public-key
"Convert uncompressed point bytes (0x04 || x || y) to EC public key"
[^bytes point-bytes]
(when (and (= 65 (alength point-bytes))
(= 0x04 (aget point-bytes 0)))
(let [x-bytes (byte-array 32)
y-bytes (byte-array 32)
_ (System/arraycopy point-bytes 1 x-bytes 0 32)
_ (System/arraycopy point-bytes 33 y-bytes 0 32)
x (BigInteger. 1 x-bytes)
y (BigInteger. 1 y-bytes)
;; Get EC parameters for P-256
kpg (doto (KeyPairGenerator/getInstance "EC")
(.initialize (ECGenParameterSpec. "secp256r1")))
temp-pair (.generateKeyPair kpg)
params (.getParams (.getPublic temp-pair))
point (ECPoint. x y)
spec (ECPublicKeySpec. point params)]
(-> (KeyFactory/getInstance "EC")
(.generatePublic spec)))))
(defn- ecdh-derive-secret
"Perform ECDH to derive shared secret"
[private-key public-key]
(let [ka (KeyAgreement/getInstance "ECDH")]
(.init ka private-key)
(.doPhase ka public-key true)
(.generateSecret ka)))
(defn- hmac-sha256
"Compute HMAC-SHA256"
[^bytes key ^bytes data]
(let [mac (Mac/getInstance "HmacSHA256")
secret-key (SecretKeySpec. key "HmacSHA256")]
(.init mac secret-key)
(.doFinal mac data)))
(defn- hkdf-extract
"HKDF extract step"
[salt ikm]
(let [salt (if (and salt (pos? (alength salt))) salt (byte-array 32))]
(hmac-sha256 salt ikm)))
(defn- hkdf-expand
"HKDF expand step"
[prk info length]
(let [hash-len 32
n (int (Math/ceil (/ length hash-len)))
okm (byte-array (* n hash-len))
prev (byte-array 0)]
(loop [i 1
prev prev]
(when (<= i n)
(let [input (byte-array (+ (alength prev) (alength info) 1))
_ (System/arraycopy prev 0 input 0 (alength prev))
_ (System/arraycopy info 0 input (alength prev) (alength info))
_ (aset-byte input (dec (alength input)) (unchecked-byte i))
output (hmac-sha256 prk input)]
(System/arraycopy output 0 okm (* (dec i) hash-len) hash-len)
(recur (inc i) output))))
(Arrays/copyOf okm length)))
(defn- hkdf
"Full HKDF key derivation"
[salt ikm info length]
(let [prk (hkdf-extract salt ikm)]
(hkdf-expand prk info length)))
(defn- build-info
"Build the info parameter for HKDF according to RFC 8291"
[^String type ^bytes client-public ^bytes server-public]
(let [type-bytes (.getBytes type "UTF-8")
;; Info structure: "Content-Encoding: <type>" || 0x00 || "P-256" || 0x00
;; || client public key length (2 bytes) || client public key
;; || server public key length (2 bytes) || server public key
info-len (+ (count "Content-Encoding: ") (alength type-bytes) 1
5 1 ;; "P-256" + null
2 65 ;; client key length + key
2 65) ;; server key length + key
info (byte-array info-len)
buf (ByteBuffer/wrap info)]
(.put buf (.getBytes (str "Content-Encoding: " type) "UTF-8"))
(.put buf (byte 0))
(.put buf (.getBytes "P-256" "UTF-8"))
(.put buf (byte 0))
(.putShort buf (short 65))
(.put buf client-public)
(.putShort buf (short 65))
(.put buf server-public)
info))
(defn- aes-128-gcm-encrypt
"Encrypt data using AES-128-GCM"
[^bytes key ^bytes nonce ^bytes plaintext]
(let [cipher (Cipher/getInstance "AES/GCM/NoPadding")
secret-key (SecretKeySpec. key "AES")
gcm-spec (GCMParameterSpec. 128 nonce)]
(.init cipher Cipher/ENCRYPT_MODE secret-key gcm-spec)
(.doFinal cipher plaintext)))
(defn- pad-plaintext
"Add padding to plaintext according to RFC 8291.
Padding: 2 bytes length prefix (big-endian) + padding bytes"
[^bytes plaintext]
;; For simplicity, use minimal padding (just the required 2-byte header)
;; The format is: padding_length (2 bytes, big-endian) || zeros || plaintext
(let [plaintext-len (alength plaintext)
;; Use 0 bytes of actual padding
padding-len 0
result (byte-array (+ 2 padding-len plaintext-len))]
;; Write padding length as big-endian 16-bit integer
(aset-byte result 0 (unchecked-byte (bit-shift-right padding-len 8)))
(aset-byte result 1 (unchecked-byte (bit-and padding-len 0xFF)))
;; Copy plaintext after padding header
(System/arraycopy plaintext 0 result (+ 2 padding-len) plaintext-len)
result))
(defn encrypt-payload
"Encrypt a push message payload using Web Push encryption (RFC 8291).
Parameters:
- p256dh: Client's ECDH public key (base64url encoded)
- auth: Client's auth secret (base64url encoded)
- plaintext: The message to encrypt (bytes)
Returns a map with:
- :ciphertext - The encrypted payload
- :salt - The encryption salt (for Content-Encoding header)
- :public-key - Server's ephemeral public key"
[p256dh auth plaintext]
(let [;; Decode client keys
client-public-bytes (base64url-decode p256dh)
auth-secret (base64url-decode auth)
client-public-key (uncompressed-bytes->public-key client-public-bytes)
;; Generate ephemeral server key pair
server-keypair (generate-ephemeral-keypair)
server-private-key (.getPrivate server-keypair)
server-public-key (.getPublic server-keypair)
server-public-bytes (public-key->uncompressed-bytes server-public-key)
;; Generate salt
salt (generate-salt)
;; ECDH to derive shared secret
ecdh-secret (ecdh-derive-secret server-private-key client-public-key)
;; Derive PRK using auth secret
;; PRK = HKDF-Extract(auth_secret, ecdh_secret)
auth-info (.getBytes "Content-Encoding: auth\u0000" "UTF-8")
prk-key (hkdf auth-secret ecdh-secret auth-info 32)
;; Derive content encryption key (CEK)
cek-info (build-info "aes128gcm" client-public-bytes server-public-bytes)
cek (hkdf salt prk-key cek-info 16)
;; Derive nonce
nonce-info (build-info "nonce" client-public-bytes server-public-bytes)
nonce (hkdf salt prk-key nonce-info 12)
;; Pad and encrypt
padded-plaintext (pad-plaintext plaintext)
ciphertext (aes-128-gcm-encrypt cek nonce padded-plaintext)]
{:ciphertext ciphertext
:salt salt
:public-key server-public-bytes}))
(defn build-encrypted-body
"Build the full encrypted body with header for aes128gcm Content-Encoding.
Format: salt (16 bytes) || rs (4 bytes) || idlen (1 byte) || keyid || ciphertext"
[^bytes salt ^bytes server-public ^bytes ciphertext]
(let [rs 4096 ;; Record size
idlen (alength server-public)
body (byte-array (+ 16 4 1 idlen (alength ciphertext)))
buf (ByteBuffer/wrap body)]
(.put buf salt)
(.putInt buf rs)
(.put buf (unchecked-byte idlen))
(.put buf server-public)
(.put buf ciphertext)
body))
(defn send-notification
"Send a push notification to a subscription.
Parameters:
- subscription: Map with :endpoint, :p256dh, :auth
- payload: Map to be JSON-encoded as the notification payload
- vapid-keys: Map with :public-key, :private-key (base64url encoded)
- options: Optional map with :ttl (seconds), :urgency, :topic
Returns:
- {:success true} on success
- {:success false :status <code> :body <body>} on failure
- {:success false :error <message>} on exception"
[subscription payload vapid-keys & [{:keys [ttl urgency topic subject]
:or {ttl 86400
subject "mailto:spiceflow@localhost"}}]]
(try
(let [endpoint (:endpoint subscription)
plaintext (.getBytes (json/write-value-as-string payload) "UTF-8")
;; Encrypt payload
{:keys [ciphertext salt public-key]} (encrypt-payload
(:p256dh subscription)
(:auth subscription)
plaintext)
;; Build encrypted body
body (build-encrypted-body salt public-key ciphertext)
;; Build VAPID authorization header
auth-header (vapid/vapid-authorization-header endpoint subject vapid-keys)
;; Build request headers
headers {"Authorization" auth-header
"Content-Type" "application/octet-stream"
"Content-Encoding" "aes128gcm"
"TTL" (str ttl)}
headers (cond-> headers
urgency (assoc "Urgency" urgency)
topic (assoc "Topic" topic))
;; Send request
response (http/post endpoint
{:headers headers
:body body
:throw-exceptions false})]
(if (<= 200 (:status response) 299)
{:success true}
{:success false
:status (:status response)
:body (:body response)}))
(catch Exception e
(log/error e "Failed to send push notification")
{:success false
:error (.getMessage e)})))
(defn send-to-all-subscriptions
"Send a notification to all subscriptions in the push store.
Parameters:
- push-store: PushStore instance
- payload: Notification payload map
Returns a sequence of results for each subscription."
[push-store payload]
(let [subscriptions (proto/get-subscriptions push-store)
vapid-keys (proto/get-vapid-keys push-store)]
(when (and (seq subscriptions) vapid-keys)
(log/info "Sending push notification to" (count subscriptions) "subscription(s)")
(doall
(for [sub subscriptions]
(let [result (send-notification sub payload vapid-keys)]
;; If subscription is gone (410) or invalid (404), remove it
(when (#{404 410} (:status result))
(log/info "Removing invalid/expired subscription:" (:endpoint sub))
(proto/delete-subscription push-store (:id sub)))
(assoc result :subscription-id (:id sub))))))))
(comment
;; Test encryption
(def test-p256dh "BNcRdreALRFXTkOOUHK1EtK2wtaz5Ry4YfYCA_0QTpQtUbVlUls0VJXg7A8u-Ts1XbjhazAkj7I99e8QcYP7DkM")
(def test-auth "tBHItJI5svbpez7KI4CCXg")
(def test-payload {:title "Test" :body "Hello"})
(encrypt-payload test-p256dh test-auth
(.getBytes (json/write-value-as-string test-payload) "UTF-8")))
-148
View File
@@ -1,148 +0,0 @@
(ns spiceflow.push.store
"SQLite implementation of PushStore protocol for managing push subscriptions and VAPID keys"
(:require [next.jdbc :as jdbc]
[next.jdbc.result-set :as rs]
[next.jdbc.sql :as sql]
[spiceflow.push.protocol :as proto]
[spiceflow.push.vapid :as vapid]
[clojure.tools.logging :as log])
(:import [java.util UUID]
[java.time Instant]))
(defn- generate-id []
(str (UUID/randomUUID)))
(defn- now-iso []
(.toString (Instant/now)))
(defn- row->subscription
"Convert a database row to a subscription map"
[row]
(when row
{:id (:id row)
:endpoint (:endpoint row)
:p256dh (:p256dh row)
:auth (:auth row)
:user-agent (:user-agent row)
:created-at (:created-at row)}))
(defn- row->vapid-keys
"Convert a database row to VAPID keys map"
[row]
(when row
{:public-key (:public-key row)
:private-key (:private-key row)
:created-at (:created-at row)}))
(defrecord SQLitePushStore [datasource]
proto/PushStore
(get-subscriptions [_]
(let [rows (jdbc/execute! datasource
["SELECT * FROM push_subscriptions ORDER BY created_at DESC"]
{:builder-fn rs/as-unqualified-kebab-maps})]
(mapv row->subscription rows)))
(get-subscription [_ id]
(let [row (jdbc/execute-one! datasource
["SELECT * FROM push_subscriptions WHERE id = ?" id]
{:builder-fn rs/as-unqualified-kebab-maps})]
(row->subscription row)))
(get-subscription-by-endpoint [_ endpoint]
(let [row (jdbc/execute-one! datasource
["SELECT * FROM push_subscriptions WHERE endpoint = ?" endpoint]
{:builder-fn rs/as-unqualified-kebab-maps})]
(row->subscription row)))
(save-subscription [this subscription]
(let [id (or (:id subscription) (generate-id))
now (now-iso)]
;; Check if subscription with this endpoint already exists
(if-let [existing (proto/get-subscription-by-endpoint this (:endpoint subscription))]
;; Update existing subscription
(do
(sql/update! datasource :push_subscriptions
{:p256dh (:p256dh subscription)
:auth (:auth subscription)
:user_agent (:user-agent subscription)}
{:endpoint (:endpoint subscription)})
(proto/get-subscription-by-endpoint this (:endpoint subscription)))
;; Insert new subscription
(do
(sql/insert! datasource :push_subscriptions
{:id id
:endpoint (:endpoint subscription)
:p256dh (:p256dh subscription)
:auth (:auth subscription)
:user_agent (:user-agent subscription)
:created_at now})
(proto/get-subscription this id)))))
(delete-subscription [_ id]
(jdbc/execute! datasource ["DELETE FROM push_subscriptions WHERE id = ?" id])
nil)
(delete-subscription-by-endpoint [_ endpoint]
(jdbc/execute! datasource ["DELETE FROM push_subscriptions WHERE endpoint = ?" endpoint])
nil)
(get-vapid-keys [_]
(let [row (jdbc/execute-one! datasource
["SELECT * FROM vapid_keys WHERE id = 1"]
{:builder-fn rs/as-unqualified-kebab-maps})]
(row->vapid-keys row)))
(save-vapid-keys [this keys]
(let [now (now-iso)]
;; Only save if no keys exist (singleton)
(when-not (proto/get-vapid-keys this)
(sql/insert! datasource :vapid_keys
{:id 1
:public_key (:public-key keys)
:private_key (:private-key keys)
:created_at now}))
(proto/get-vapid-keys this))))
(def push-schema
"SQLite schema for push notifications"
["CREATE TABLE IF NOT EXISTS push_subscriptions (
id TEXT PRIMARY KEY,
endpoint TEXT NOT NULL UNIQUE,
p256dh TEXT NOT NULL,
auth TEXT NOT NULL,
user_agent TEXT,
created_at TEXT DEFAULT (datetime('now'))
)"
"CREATE TABLE IF NOT EXISTS vapid_keys (
id INTEGER PRIMARY KEY CHECK (id = 1),
public_key TEXT NOT NULL,
private_key TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
)"
"CREATE INDEX IF NOT EXISTS idx_push_subscriptions_endpoint ON push_subscriptions(endpoint)"])
(defn init-push-schema!
"Initialize push notification tables"
[datasource]
(doseq [stmt push-schema]
(jdbc/execute! datasource [stmt])))
(defn ensure-vapid-keys!
"Ensure VAPID keys exist, generating them if needed"
[push-store]
(if-let [existing (proto/get-vapid-keys push-store)]
(do
(log/info "Using existing VAPID keys")
existing)
(let [keys (vapid/generate-keypair)]
(log/info "Generated new VAPID keys")
(proto/save-vapid-keys push-store keys))))
(defn create-push-store
"Create a SQLite push store using the given datasource"
[datasource]
(init-push-schema! datasource)
(let [store (->SQLitePushStore datasource)]
(ensure-vapid-keys! store)
store))
-129
View File
@@ -1,129 +0,0 @@
(ns spiceflow.push.vapid
"VAPID (Voluntary Application Server Identification) authentication for Web Push.
Generates ECDSA P-256 key pairs and creates JWT tokens for push service authentication."
(:require [buddy.core.keys :as keys]
[buddy.sign.jwt :as jwt]
[clojure.tools.logging :as log])
(:import [java.security KeyPairGenerator SecureRandom]
[java.security.spec ECGenParameterSpec]
[java.util Base64]))
(defn- bytes->base64url
"Convert bytes to URL-safe base64 without padding"
[^bytes b]
(-> (Base64/getUrlEncoder)
(.withoutPadding)
(.encodeToString b)))
(defn- base64url->bytes
"Convert URL-safe base64 string to bytes"
[^String s]
(.decode (Base64/getUrlDecoder) s))
(defn- ec-public-key->uncompressed-bytes
"Convert EC public key to uncompressed point format (0x04 || x || y).
This is the format expected by the Push API for applicationServerKey."
[public-key]
(let [point (.getW public-key)
x-bytes (.toByteArray (.getAffineX point))
y-bytes (.toByteArray (.getAffineY point))
;; Ensure exactly 32 bytes for each coordinate
x-padded (byte-array 32)
y-padded (byte-array 32)]
;; Handle BigInteger byte arrays (may have leading zero or be shorter)
(let [x-len (min 32 (alength x-bytes))
x-offset (max 0 (- (alength x-bytes) 32))
y-len (min 32 (alength y-bytes))
y-offset (max 0 (- (alength y-bytes) 32))]
(System/arraycopy x-bytes x-offset x-padded (- 32 x-len) x-len)
(System/arraycopy y-bytes y-offset y-padded (- 32 y-len) y-len))
;; Create uncompressed point: 0x04 || x || y
(let [result (byte-array 65)]
(aset-byte result 0 (unchecked-byte 0x04))
(System/arraycopy x-padded 0 result 1 32)
(System/arraycopy y-padded 0 result 33 32)
result)))
(defn generate-keypair
"Generate a new ECDSA P-256 key pair for VAPID.
Returns {:public-key <base64url> :private-key <base64url>}"
[]
(let [kpg (KeyPairGenerator/getInstance "EC")
_ (.initialize kpg (ECGenParameterSpec. "secp256r1") (SecureRandom.))
keypair (.generateKeyPair kpg)
public-key (.getPublic keypair)
private-key (.getPrivate keypair)
;; Public key in uncompressed format for Push API
public-bytes (ec-public-key->uncompressed-bytes public-key)
;; Private key as raw 32-byte scalar
private-bytes (.getS private-key)]
{:public-key (bytes->base64url public-bytes)
:private-key (bytes->base64url (.toByteArray private-bytes))}))
(defn- reconstruct-private-key
"Reconstruct an EC private key from raw bytes"
[^bytes private-bytes]
(let [s (java.math.BigInteger. 1 private-bytes)
curve-params (-> (java.security.KeyFactory/getInstance "EC")
(.generatePrivate
(java.security.spec.ECPrivateKeySpec.
s
(.getParams
(-> (KeyPairGenerator/getInstance "EC")
(doto (.initialize (ECGenParameterSpec. "secp256r1")))
(.generateKeyPair)
(.getPrivate))))))
spec (java.security.spec.ECPrivateKeySpec. s (.getParams curve-params))]
(-> (java.security.KeyFactory/getInstance "EC")
(.generatePrivate spec))))
(defn create-vapid-jwt
"Create a VAPID JWT for authenticating to a push service.
Parameters:
- audience: The origin of the push service (e.g., https://fcm.googleapis.com)
- subject: Contact info (mailto: or https: URL)
- private-key-b64: Base64url-encoded private key
Returns a signed JWT valid for 12 hours."
[audience subject private-key-b64]
(let [now (quot (System/currentTimeMillis) 1000)
exp (+ now (* 12 60 60)) ;; 12 hours
claims {:aud audience
:exp exp
:sub subject}
private-bytes (base64url->bytes private-key-b64)
private-key (reconstruct-private-key private-bytes)]
(jwt/sign claims private-key {:alg :es256})))
(defn vapid-authorization-header
"Create the Authorization header value for VAPID authentication.
Parameters:
- endpoint: The push subscription endpoint URL
- subject: Contact info (mailto: or https: URL)
- vapid-keys: Map with :public-key and :private-key (base64url encoded)
Returns the value for the Authorization header."
[endpoint subject vapid-keys]
(let [url (java.net.URL. endpoint)
audience (str (.getProtocol url) "://" (.getHost url))
jwt (create-vapid-jwt audience subject (:private-key vapid-keys))]
(str "vapid t=" jwt ",k=" (:public-key vapid-keys))))
(comment
;; Test key generation
(def kp (generate-keypair))
kp
;; Test JWT creation
(create-vapid-jwt
"https://fcm.googleapis.com"
"mailto:test@example.com"
(:private-key kp))
;; Test authorization header
(vapid-authorization-header
"https://fcm.googleapis.com/fcm/send/abc123"
"mailto:test@example.com"
kp))
-36
View File
@@ -5,46 +5,12 @@
[spiceflow.adapters.claude :as claude]
[spiceflow.adapters.opencode :as opencode]
[spiceflow.adapters.tmux :as tmux]
[spiceflow.push.sender :as push-sender]
[clojure.tools.logging :as log])
(:import [java.util.concurrent ConcurrentHashMap]))
;; Active process handles for running sessions
(defonce ^:private active-processes (ConcurrentHashMap.))
;; Push store for sending notifications (set by core.clj)
(defonce ^:private push-store (atom nil))
;; Forward declaration for use in send-permission-notification-delayed!
(declare get-pending-permission)
(defn set-push-store!
"Set the push store for sending notifications"
[store]
(reset! push-store store))
(defn- send-permission-notification-delayed!
"Send push notification for a permission request after a delay, only if still pending"
[store session-id session perm-req delay-ms]
(when-let [pstore @push-store]
(future
(try
(Thread/sleep delay-ms)
;; Check if permission is still pending
(when-let [pending (get-pending-permission store session-id)]
;; Verify same permission request (by message-id) to avoid race conditions
(when (= (:message-id pending) (:message-id perm-req))
(let [tools (:tools perm-req)
payload {:title "Permission Required"
:body (str "Claude wants to use: " (clojure.string/join ", " tools))
:sessionId (:id session)
:sessionTitle (or (:title session) "Untitled Session")
:tools tools}]
(log/debug "Sending push notification for permission request:" (:tools perm-req))
(push-sender/send-to-all-subscriptions pstore payload))))
(catch Exception e
(log/error e "Failed to send push notification"))))))
(defn get-adapter
"Get the appropriate adapter for a provider"
[provider]
@@ -297,8 +263,6 @@
;; Normal flow: store pending and wait for user response
(let [perm-req-with-id (assoc perm-req :message-id msg-id)]
(set-pending-permission store session-id perm-req-with-id)
;; Send push notification for permission request after 15s delay
(send-permission-notification-delayed! store session-id session perm-req-with-id 15000)
(callback {:event :permission-request
:permission-request perm-req
:message-id msg-id
+22 -7
View File
@@ -152,18 +152,31 @@
Arguments:
- session-name: tmux session identifier
- capture-fn: function that takes session-name and returns content string
- capture-fn: function that takes session-name and returns a map with:
:content - full content string
:clear-indices - vector of line indices where clears occurred
:last-clear-line - the last clear line index (for scroll positioning)
Returns map with:
- :content - full content string (always included for GET requests)
- :diff - diff payload for WebSocket updates (includes :frame-id)
- :changed - boolean indicating if content changed"
- :diff - diff payload for WebSocket updates (includes :frame-id and :clear-indices)
- :changed - boolean indicating if content changed
- :clear-indices - vector of line indices where clears occurred
- :scroll-to-line - suggested scroll position (line after last clear)"
[session-name capture-fn]
(let [new-content (capture-fn session-name)
(let [capture-result (capture-fn session-name)
;; Handle both old string format and new map format for compatibility
{:keys [content clear-indices last-clear-line]}
(if (map? capture-result)
capture-result
{:content capture-result :clear-indices [] :last-clear-line nil})
new-content (or content "")
cached (.get terminal-cache session-name)
diff-result (compute-diff cached new-content)
is-full? (= :full (:type diff-result))
now (System/currentTimeMillis)]
now (System/currentTimeMillis)
;; Include clear-indices in diff result
diff-with-clear (assoc diff-result :clear-indices (or clear-indices []))]
;; Update cache if changed or full frame sent
(when (not= :unchanged (:type diff-result))
(.put terminal-cache session-name
@@ -173,8 +186,10 @@
;; Track when last full frame was sent for periodic resync
:last-full-ms (if is-full? now (or (:last-full-ms cached) now))}))
{:content new-content
:diff diff-result
:changed (not= :unchanged (:type diff-result))}))
:diff diff-with-clear
:changed (not= :unchanged (:type diff-result))
:clear-indices (or clear-indices [])
:scroll-to-line last-clear-line}))
(defn get-cached-content
"Get cached content for a session without capturing.