init codebase

This commit is contained in:
2026-02-17 17:30:45 -05:00
parent a3b28549b4
commit f7e2755a91
175 changed files with 21600 additions and 232 deletions
+12
View File
@@ -0,0 +1,12 @@
FROM clojure:temurin-21-tools-deps AS builder
WORKDIR /app
COPY deps.edn build.clj ./
COPY shared/ shared/
COPY tui-sm/ tui-sm/
RUN clj -T:build uber :module tui-sm
FROM eclipse-temurin:21-jre-alpine
WORKDIR /app
COPY --from=builder /app/tui-sm/target/tui-sm.jar app.jar
EXPOSE 3003
CMD ["java", "-jar", "app.jar"]
+8
View File
@@ -0,0 +1,8 @@
{:server {:port 3003
:host "0.0.0.0"
:max-connections 5000}
:api {:base-url "http://localhost:3001"}
:nats {:url "nats://localhost:4222"}
:sse {:ping-interval-ms 30000
:backpressure-limit 1000}
:presence {:batch-interval-ms 60000}}
+166 -2
View File
@@ -1,5 +1,169 @@
(ns ajet.chat.tui-sm.core
"TUI session manager — http-kit WebSocket/SSE for terminal clients.")
"TUI session manager — http-kit SSE for terminal clients.
Manages the full lifecycle: NATS connection, connection tracker,
ping scheduler, presence flusher, and HTTP server.
System state held in a single atom for REPL-driven development.
No direct PostgreSQL access — all data flows through the API service
via the shared api-client, and real-time events via NATS."
(:refer-clojure :exclude [reset!])
(:require [clojure.tools.logging :as log]
[org.httpkit.server :as http-kit]
[ajet.chat.shared.config :as config]
[ajet.chat.shared.eventbus :as eventbus]
[ajet.chat.tui-sm.routes :as routes]
[ajet.chat.tui-sm.sse :as sse]
[ajet.chat.tui-sm.presence :as presence])
(:gen-class))
(defonce system (atom nil))
;;; ---------------------------------------------------------------------------
;;; Ping Scheduler
;;; ---------------------------------------------------------------------------
(defn- start-ping-scheduler!
"Start a background thread that sends keepalive pings to all connected
clients at the configured interval (default 30s).
Returns the future running the scheduler."
[interval-ms]
(let [running (volatile! true)]
(future
(log/info "Ping scheduler started, interval:" interval-ms "ms")
(while @running
(try
(Thread/sleep interval-ms)
(sse/ping-all!)
(catch InterruptedException _
(vreset! running false))
(catch Exception e
(log/error e "Error in ping scheduler"))))
(log/info "Ping scheduler stopped"))))
;;; ---------------------------------------------------------------------------
;;; Lifecycle
;;; ---------------------------------------------------------------------------
(defn start!
"Start the TUI session manager service.
Connects to NATS, initializes connection tracking, starts the ping
scheduler and presence flusher, then starts the HTTP server."
[& [{:keys [config-overrides]}]]
(when @system
(log/warn "System already started -- call (stop!) first")
(throw (ex-info "System already running" {})))
(let [cfg (config/load-config {:resource "tui-sm-config.edn"})
cfg (if config-overrides
(merge cfg config-overrides)
cfg)
_ (log/info "Loaded config:" (config/redact cfg))
;; NATS connection
nats (eventbus/connect! (:nats cfg))
_ (log/info "Connected to NATS")
;; Connection tracker (atom lives in sse ns)
_ (clojure.core/reset! sse/connections {})
_ (log/info "Connection tracker initialized")
;; System map (no DB, no S3 for TUI SM)
sys {:config cfg
:nats nats
:connections sse/connections}
;; HTTP server
handler (routes/app sys)
port (get-in cfg [:server :port] 3003)
host (get-in cfg [:server :host] "0.0.0.0")
server (http-kit/run-server handler {:port port :ip host
:max-body (* 1 1024 1024)})
_ (log/info (str "TUI SM HTTP server started on " host ":" port))
;; Ping scheduler
ping-ms (get-in cfg [:sse :ping-interval-ms] 30000)
ping-fut (start-ping-scheduler! ping-ms)
_ (log/info "Ping scheduler started")
;; Presence flush scheduler
pres-ms (get-in cfg [:presence :batch-interval-ms] 60000)
pres-fut (presence/start-flush-scheduler!
sse/broadcast-presence-changes!
pres-ms)
_ (log/info "Presence flush scheduler started")]
(clojure.core/reset! system (assoc sys
:server server
:port port
:ping-fut ping-fut
:pres-fut pres-fut))
(log/info (str "TUI session manager fully started on port " port))
@system))
(defn stop!
"Stop the TUI session manager service.
Shuts down HTTP server, ping scheduler, presence flusher, NATS,
and clears all connection state."
[]
(when-let [sys @system]
(log/info "Shutting down TUI session manager...")
;; Stop ping scheduler
(when-let [f (:ping-fut sys)]
(future-cancel f)
(log/info "Ping scheduler stopped"))
;; Stop presence flush scheduler
(presence/stop-flush-scheduler!)
(log/info "Presence flush scheduler stopped")
;; Stop HTTP server (wait up to 30s for in-flight requests)
(when-let [server (:server sys)]
(server :timeout 30000)
(log/info "HTTP server stopped"))
;; Clean up all SSE connections (unsubscribe NATS)
(when-let [nats (:nats sys)]
(sse/reset-connections! nats)
(log/info "All SSE connections cleaned up"))
;; Close NATS connection
(when-let [nats (:nats sys)]
(try
(eventbus/close! nats)
(log/info "NATS connection closed")
(catch Exception e
(log/error e "Error closing NATS connection"))))
;; Reset presence state
(presence/reset-state!)
(clojure.core/reset! system nil)
(log/info "TUI session manager stopped")))
(defn reset!
"Stop then start the system (REPL convenience)."
[]
(stop!)
(start!))
;;; ---------------------------------------------------------------------------
;;; Entry point
;;; ---------------------------------------------------------------------------
(defn -main [& _args]
(println "ajet-chat TUI session manager starting..."))
(start!)
;; Graceful shutdown hook
(.addShutdownHook
(Runtime/getRuntime)
(Thread. ^Runnable (fn []
(log/info "Shutdown hook triggered")
(stop!))))
;; Block main thread
@(promise))
+351
View File
@@ -0,0 +1,351 @@
(ns ajet.chat.tui-sm.handlers
"Signal handlers for TUI session manager.
All POST/GET /tui/* handlers (except SSE and health). Each handler
parses the JSON body, calls the API via the shared api-client, and
returns a JSON response.
Auth is handled by Auth GW upstream -- these handlers receive
X-User-Id, X-User-Role headers on every request."
(:require [clojure.data.json :as json]
[clojure.tools.logging :as log]
[ajet.chat.shared.api-client :as api]
[ajet.chat.shared.eventbus :as eventbus]
[ajet.chat.tui-sm.sse :as sse]))
;;; ---------------------------------------------------------------------------
;;; Response Helpers
;;; ---------------------------------------------------------------------------
(defn- json-response
"Build a Ring response with JSON content type."
([status body]
{:status status
:headers {"Content-Type" "application/json"}
:body (json/write-str body)})
([body]
(json-response 200 body)))
(defn- error-response
"Build a JSON error response."
([status code message]
(json-response status {:error {:code code :message message}}))
([status code message details]
(json-response status {:error {:code code :message message :details details}})))
(defn- require-user-id
"Check that user-id is present on the request. Returns nil if valid,
or an error response if missing."
[request]
(when-not (:user-id request)
(error-response 401 "UNAUTHORIZED" "Missing X-User-Id header")))
(defn- api-ctx
"Build API client context from the request."
[request]
(let [config (get-in request [:system :config])
base-url (get-in config [:api :base-url] "http://localhost:3001")]
(api/request->ctx request base-url)))
(defn- with-api-call
"Execute an API call, returning the result as JSON or an error response."
[request api-fn]
(or (require-user-id request)
(try
(let [result (api-fn (api-ctx request))]
(json-response 200 result))
(catch clojure.lang.ExceptionInfo e
(let [data (ex-data e)]
(if (= :ajet.chat/api-error (:type data))
(json-response (:status data) (or (:body data)
{:error {:code "API_ERROR"
:message (ex-message e)}}))
(do
(log/error e "Handler error" (pr-str data))
(error-response 500 "INTERNAL_ERROR" "An internal error occurred")))))
(catch Exception e
(log/error e "Unexpected handler error")
(error-response 500 "INTERNAL_ERROR" "An internal error occurred")))))
;;; ---------------------------------------------------------------------------
;;; Message Handlers
;;; ---------------------------------------------------------------------------
(defn send-message
"POST /tui/messages - Send a message to a channel.
Body: {:channel-id uuid, :body-md string, :parent-id uuid?}"
[request]
(with-api-call request
(fn [ctx]
(let [body (:body-params request)
channel-id (:channel-id body)]
(when-not channel-id
(throw (ex-info "channel-id is required"
{:type :ajet.chat/validation-error})))
(when-not (:body-md body)
(throw (ex-info "body-md is required"
{:type :ajet.chat/validation-error})))
(api/send-message ctx channel-id
(select-keys body [:body-md :parent-id]))))))
(defn edit-message
"POST /tui/messages/:id/edit - Edit an existing message.
Body: {:body-md string}"
[request]
(with-api-call request
(fn [ctx]
(let [message-id (get-in request [:path-params :id])
body (:body-params request)]
(when-not (:body-md body)
(throw (ex-info "body-md is required"
{:type :ajet.chat/validation-error})))
(api/edit-message ctx message-id (select-keys body [:body-md]))))))
(defn delete-message
"POST /tui/messages/:id/delete - Delete a message.
No body required."
[request]
(with-api-call request
(fn [ctx]
(let [message-id (get-in request [:path-params :id])]
(api/delete-message ctx message-id)))))
;;; ---------------------------------------------------------------------------
;;; Reaction Handlers
;;; ---------------------------------------------------------------------------
(defn add-reaction
"POST /tui/reactions - Add a reaction to a message.
Body: {:message-id uuid, :emoji string}"
[request]
(with-api-call request
(fn [ctx]
(let [body (:body-params request)
message-id (:message-id body)
emoji (:emoji body)]
(when-not message-id
(throw (ex-info "message-id is required"
{:type :ajet.chat/validation-error})))
(when-not emoji
(throw (ex-info "emoji is required"
{:type :ajet.chat/validation-error})))
(api/add-reaction ctx message-id emoji)))))
(defn remove-reaction
"POST /tui/reactions/remove - Remove a reaction from a message.
Body: {:message-id uuid, :emoji string}"
[request]
(with-api-call request
(fn [ctx]
(let [body (:body-params request)
message-id (:message-id body)
emoji (:emoji body)]
(when-not message-id
(throw (ex-info "message-id is required"
{:type :ajet.chat/validation-error})))
(when-not emoji
(throw (ex-info "emoji is required"
{:type :ajet.chat/validation-error})))
(api/remove-reaction ctx message-id emoji)))))
;;; ---------------------------------------------------------------------------
;;; Navigation
;;; ---------------------------------------------------------------------------
(defn navigate
"POST /tui/navigate - Switch the user's active community/channel.
Body: {:community-id uuid, :channel-id uuid?}
Updates connection tracking and re-subscribes NATS subjects.
Returns the channel's messages if a channel-id is provided."
[request]
(or (require-user-id request)
(try
(let [ctx (api-ctx request)
body (:body-params request)
user-id (:user-id request)
community-id (:community-id body)
channel-id (:channel-id body)
nats-conn (get-in request [:system :nats])
config (get-in request [:system :config])
bp-limit (get-in config [:sse :backpressure-limit] 1000)]
(when-not community-id
(throw (ex-info "community-id is required"
{:type :ajet.chat/validation-error})))
;; Re-subscribe NATS to the new community
(sse/resubscribe! nats-conn user-id community-id bp-limit)
;; Update active channel in connection tracker
(when (sse/get-connection user-id)
(swap! sse/connections assoc-in [user-id :active-channel] channel-id))
;; Fetch channel messages if channel-id provided
(let [result (if channel-id
(let [messages (api/get-messages ctx channel-id {:limit 50})]
{:community-id community-id
:channel-id channel-id
:messages messages})
{:community-id community-id
:channel-id nil
:messages []})]
(json-response 200 result)))
(catch clojure.lang.ExceptionInfo e
(let [data (ex-data e)]
(if (= :ajet.chat/api-error (:type data))
(json-response (:status data) (or (:body data)
{:error {:code "API_ERROR"
:message (ex-message e)}}))
(if (= :ajet.chat/validation-error (:type data))
(error-response 422 "VALIDATION_ERROR" (ex-message e))
(do
(log/error e "Navigate error" (pr-str data))
(error-response 500 "INTERNAL_ERROR" "An internal error occurred"))))))
(catch Exception e
(log/error e "Unexpected navigate error")
(error-response 500 "INTERNAL_ERROR" "An internal error occurred")))))
;;; ---------------------------------------------------------------------------
;;; Read Tracking
;;; ---------------------------------------------------------------------------
(defn mark-read
"POST /tui/read - Mark a channel as read.
Body: {:channel-id uuid, :last-message-id uuid?}"
[request]
(with-api-call request
(fn [ctx]
(let [body (:body-params request)
channel-id (:channel-id body)]
(when-not channel-id
(throw (ex-info "channel-id is required"
{:type :ajet.chat/validation-error})))
(api/mark-read ctx channel-id
(select-keys body [:last-message-id]))))))
;;; ---------------------------------------------------------------------------
;;; Typing Indicator
;;; ---------------------------------------------------------------------------
(defn typing
"POST /tui/typing - Send a typing indicator.
Body: {:channel-id uuid, :community-id uuid}
Publishes directly to NATS for low latency (bypasses API)."
[request]
(or (require-user-id request)
(try
(let [body (:body-params request)
user-id (:user-id request)
channel-id (:channel-id body)
community-id (:community-id body)
nats-conn (get-in request [:system :nats])]
(when-not channel-id
(throw (ex-info "channel-id is required"
{:type :ajet.chat/validation-error})))
(when-not community-id
(throw (ex-info "community-id is required"
{:type :ajet.chat/validation-error})))
;; Publish typing indicator directly to NATS
(eventbus/publish! nats-conn
(str "chat.typing." community-id "." channel-id)
:typing/started
{:user-id user-id
:channel-id channel-id
:community-id community-id})
(json-response 200 {:ok true}))
(catch clojure.lang.ExceptionInfo e
(let [data (ex-data e)]
(if (= :ajet.chat/validation-error (:type data))
(error-response 422 "VALIDATION_ERROR" (ex-message e))
(do
(log/error e "Typing handler error")
(error-response 500 "INTERNAL_ERROR" "An internal error occurred")))))
(catch Exception e
(log/error e "Unexpected typing error")
(error-response 500 "INTERNAL_ERROR" "An internal error occurred")))))
;;; ---------------------------------------------------------------------------
;;; Heartbeat
;;; ---------------------------------------------------------------------------
(defn heartbeat
"POST /tui/heartbeat - Client heartbeat. Proxied to the API."
[request]
(with-api-call request
(fn [ctx]
(api/heartbeat ctx))))
;;; ---------------------------------------------------------------------------
;;; Fetch Messages
;;; ---------------------------------------------------------------------------
(defn fetch-messages
"GET /tui/messages - Fetch messages for a channel (paginated).
Query params: channel-id (required), before (uuid?), limit (int?)"
[request]
(with-api-call request
(fn [ctx]
(let [params (:query-params request)
channel-id (get params "channel-id")]
(when-not channel-id
(throw (ex-info "channel-id query parameter is required"
{:type :ajet.chat/validation-error})))
(api/get-messages ctx channel-id
(cond-> {}
(get params "before") (assoc :before (get params "before"))
(get params "limit") (assoc :limit (parse-long (get params "limit")))))))))
;;; ---------------------------------------------------------------------------
;;; Search
;;; ---------------------------------------------------------------------------
(defn search
"GET /tui/search - Search messages. Proxied to API search.
Query params: q (required), type, community-id, channel-id, from, after, before, limit"
[request]
(with-api-call request
(fn [ctx]
(let [params (:query-params request)
q (get params "q")]
(when-not q
(throw (ex-info "q query parameter is required"
{:type :ajet.chat/validation-error})))
(api/search ctx
(cond-> {:q q}
(get params "type") (assoc :type (keyword (get params "type")))
(get params "community-id") (assoc :community-id (get params "community-id"))
(get params "channel-id") (assoc :channel-id (get params "channel-id"))
(get params "from") (assoc :from (get params "from"))
(get params "after") (assoc :after (get params "after"))
(get params "before") (assoc :before (get params "before"))
(get params "limit") (assoc :limit (parse-long (get params "limit")))))))))
;;; ---------------------------------------------------------------------------
;;; Slash Commands
;;; ---------------------------------------------------------------------------
(defn slash-command
"POST /tui/command - Execute a slash command. Proxied to API.
Body: {:command string, :args string?, :channel-id uuid?, :community-id uuid?}"
[request]
(with-api-call request
(fn [ctx]
(let [body (:body-params request)]
(when-not (:command body)
(throw (ex-info "command is required"
{:type :ajet.chat/validation-error})))
(api/execute-command ctx body)))))
+130
View File
@@ -0,0 +1,130 @@
(ns ajet.chat.tui-sm.presence
"Presence batching for the TUI session manager.
Collects presence events over configurable windows (default 60s),
then diffs against the previous state and sends only changes as
presence.update SSE events to connected clients.
Typing indicators are forwarded immediately (not batched)."
(:require [clojure.tools.logging :as log])
(:import [java.time Instant]))
;;; ---------------------------------------------------------------------------
;;; State
;;; ---------------------------------------------------------------------------
;; Current window accumulator: {community-id {user-id {:status "online" :last-seen instant}}}
(defonce ^:private current-window (atom {}))
;; Previous flush snapshot: same shape as current-window
(defonce ^:private previous-snapshot (atom {}))
;; The scheduled flush future
(defonce ^:private flush-future (atom nil))
;;; ---------------------------------------------------------------------------
;;; Accumulate
;;; ---------------------------------------------------------------------------
(defn record-presence!
"Record a presence event into the current batch window.
community-id: string UUID of the community
user-id: string UUID of the user
status: string like \"online\", \"idle\", \"offline\""
[community-id user-id status]
(swap! current-window assoc-in [community-id user-id]
{:status status
:last-seen (.toString (Instant/now))}))
;;; ---------------------------------------------------------------------------
;;; Diff & Flush
;;; ---------------------------------------------------------------------------
(defn- diff-presence
"Compare current and previous presence maps. Returns a map of
{community-id [{:user-id ... :status ... :last-seen ...}]} containing
only entries that changed."
[current previous]
(reduce-kv
(fn [acc community-id users]
(let [prev-users (get previous community-id {})
changes (reduce-kv
(fn [ch user-id state]
(let [prev-state (get prev-users user-id)]
(if (= state prev-state)
ch
(conj ch (assoc state :user-id user-id)))))
[]
users)]
(if (seq changes)
(assoc acc community-id changes)
acc)))
{}
current))
(defn flush!
"Flush the current presence window. Computes diff against the previous
snapshot and returns the changes. Resets the window for the next cycle.
Returns: {community-id [{:user-id ... :status ... :last-seen ...}]}
or nil if no changes."
[]
(let [current @current-window
previous @previous-snapshot
changes (diff-presence current previous)]
(reset! previous-snapshot current)
(reset! current-window {})
(when (seq changes)
changes)))
;;; ---------------------------------------------------------------------------
;;; Scheduled Flush
;;; ---------------------------------------------------------------------------
(defn start-flush-scheduler!
"Start a background thread that flushes presence every interval-ms
milliseconds and calls broadcast-fn with the changes.
broadcast-fn: (fn [changes]) where changes is the diff map from flush!
interval-ms: flush interval in milliseconds (default 60000)
Returns: the future running the scheduler."
[broadcast-fn interval-ms]
(let [running (volatile! true)
f (future
(log/info "Presence flush scheduler started, interval:" interval-ms "ms")
(while @running
(try
(Thread/sleep interval-ms)
(when-let [changes (flush!)]
(try
(broadcast-fn changes)
(catch Exception e
(log/error e "Error broadcasting presence changes"))))
(catch InterruptedException _
(vreset! running false))
(catch Exception e
(log/error e "Error in presence flush loop"))))
(log/info "Presence flush scheduler stopped"))]
(reset! flush-future f)
f))
(defn stop-flush-scheduler!
"Stop the presence flush scheduler."
[]
(when-let [f @flush-future]
(future-cancel f)
(reset! flush-future nil)
(log/info "Presence flush scheduler cancelled")))
;;; ---------------------------------------------------------------------------
;;; Reset (for REPL)
;;; ---------------------------------------------------------------------------
(defn reset-state!
"Reset all presence state. For use during REPL development."
[]
(stop-flush-scheduler!)
(reset! current-window {})
(reset! previous-snapshot {}))
+186
View File
@@ -0,0 +1,186 @@
(ns ajet.chat.tui-sm.routes
"Reitit router for the TUI session manager.
All routes are under /tui. SSE events stream at /tui/sse/events,
all other endpoints are JSON request/response.
Middleware pipeline: exception handling -> request logging ->
system injection -> JSON body parsing -> user context extraction ->
route dispatch."
(:require [reitit.ring :as ring]
[reitit.ring.middleware.parameters :as parameters]
[clojure.data.json :as json]
[clojure.string :as str]
[clojure.tools.logging :as log]
[ajet.chat.shared.eventbus :as eventbus]
[ajet.chat.shared.logging :as logging]
[ajet.chat.tui-sm.sse :as sse]
[ajet.chat.tui-sm.handlers :as handlers]))
;;; ---------------------------------------------------------------------------
;;; Response Helpers
;;; ---------------------------------------------------------------------------
(defn- json-response
[status body]
{:status status
:headers {"Content-Type" "application/json"}
:body (json/write-str body)})
(defn- error-response
[status code message]
(json-response status {:error {:code code :message message}}))
;;; ---------------------------------------------------------------------------
;;; Middleware
;;; ---------------------------------------------------------------------------
(defn- wrap-exception-handler
"Catch-all exception handler."
[handler]
(fn [request]
(try
(handler request)
(catch clojure.lang.ExceptionInfo e
(let [data (ex-data e)
typ (:type data)]
(cond
(= typ :ajet.chat/validation-error)
(error-response 422 "VALIDATION_ERROR" (ex-message e))
(= typ :ajet.chat/api-error)
(json-response (:status data)
(or (:body data)
{:error {:code "API_ERROR" :message (ex-message e)}}))
:else
(do
(log/error e "Unhandled exception" (pr-str data))
(error-response 500 "INTERNAL_ERROR" "An internal error occurred")))))
(catch Exception e
(log/error e "Unhandled exception")
(error-response 500 "INTERNAL_ERROR" "An internal error occurred")))))
(defn- wrap-system
"Inject system map into the request."
[handler system]
(fn [request]
(handler (assoc request :system system))))
(defn- wrap-json-body
"Parse JSON request body into :body-params."
[handler]
(fn [request]
(let [content-type (get-in request [:headers "content-type"] "")]
(if (and (:body request)
(or (.contains content-type "application/json")
(.contains content-type "text/json")))
(try
(let [body-str (slurp (:body request))
parsed (when-not (str/blank? body-str)
(json/read-str body-str :key-fn keyword))]
(handler (assoc request :body-params (or parsed {}))))
(catch Exception _
(error-response 400 "INVALID_JSON" "Request body is not valid JSON")))
(handler (assoc request :body-params {}))))))
(defn- wrap-user-context
"Extract user context from Auth GW headers."
[handler]
(fn [request]
(let [headers (:headers request)
user-id (get headers "x-user-id")
user-role (get headers "x-user-role")
community-id (get headers "x-community-id")]
(handler (cond-> request
user-id (assoc :user-id user-id)
user-role (assoc :user-role user-role)
community-id (assoc :community-id community-id))))))
;;; ---------------------------------------------------------------------------
;;; Health Check
;;; ---------------------------------------------------------------------------
(defn health-check
"GET /tui/health - Health check endpoint."
[request]
(let [nats-conn (get-in request [:system :nats])
nats-ok (try
(and nats-conn
(eventbus/connected? nats-conn))
(catch Exception _ false))]
(json-response (if nats-ok 200 503)
{:status (if nats-ok "healthy" "degraded")
:connections (sse/connection-count)
:nats (if nats-ok "connected" "disconnected")})))
;;; ---------------------------------------------------------------------------
;;; Router
;;; ---------------------------------------------------------------------------
(defn router
"Build the reitit router with all TUI routes."
[]
(ring/router
[["/tui"
;; Health check - no auth required
["/health" {:get {:handler health-check}}]
;; SSE event stream
["/sse/events" {:get {:handler sse/sse-handler}}]
;; Messages
["/messages"
["" {:get {:handler handlers/fetch-messages}
:post {:handler handlers/send-message}}]
["/:id/edit" {:post {:handler handlers/edit-message}}]
["/:id/delete" {:post {:handler handlers/delete-message}}]]
;; Reactions
["/reactions"
["" {:post {:handler handlers/add-reaction}}]
["/remove" {:post {:handler handlers/remove-reaction}}]]
;; Navigation
["/navigate" {:post {:handler handlers/navigate}}]
;; Read tracking
["/read" {:post {:handler handlers/mark-read}}]
;; Typing indicator
["/typing" {:post {:handler handlers/typing}}]
;; Heartbeat
["/heartbeat" {:post {:handler handlers/heartbeat}}]
;; Search
["/search" {:get {:handler handlers/search}}]
;; Slash commands
["/command" {:post {:handler handlers/slash-command}}]]]
{:data {:middleware [parameters/parameters-middleware]}}))
;;; ---------------------------------------------------------------------------
;;; App
;;; ---------------------------------------------------------------------------
(defn app
"Build the full Ring handler with middleware stack.
system: map with :config, :nats, :connections keys."
[system]
(let [handler (ring/ring-handler
(router)
(ring/create-default-handler
{:not-found
(constantly (error-response 404 "NOT_FOUND" "Route not found"))
:method-not-allowed
(constantly (error-response 405 "METHOD_NOT_ALLOWED" "Method not allowed"))}))]
(-> handler
wrap-user-context
wrap-json-body
(wrap-system system)
logging/wrap-request-logging
wrap-exception-handler)))
+387
View File
@@ -0,0 +1,387 @@
(ns ajet.chat.tui-sm.sse
"SSE endpoint for TUI clients.
Uses http-kit's with-channel for async streaming. Each connected client
gets a dedicated SSE stream with JSON-structured events.
Connection lifecycle:
1. Client connects to GET /tui/sse/events
2. Server builds init event (fetches communities, channels, DMs, user from API)
3. Server subscribes to relevant NATS subjects
4. Events flow: NATS -> convert to SSE event type -> send to client
5. On disconnect: unsubscribe NATS, remove from connection tracker
Connection tracking:
{user-id {:sse-channel ch
:active-community uuid
:active-channel uuid
:nats-subs [dispatchers]
:last-event-id int
:connected-at instant}}"
(:require [org.httpkit.server :as http-kit]
[clojure.data.json :as json]
[clojure.tools.logging :as log]
[ajet.chat.shared.api-client :as api]
[ajet.chat.shared.eventbus :as eventbus]
[ajet.chat.tui-sm.presence :as presence])
(:import [java.time Instant]))
;;; ---------------------------------------------------------------------------
;;; Connection Tracker
;;; ---------------------------------------------------------------------------
;; Atom holding all active connections: {user-id conn-info-map}
(defonce connections (atom {}))
(defn connection-count
"Return the current number of active SSE connections."
[]
(count @connections))
(defn get-connection
"Retrieve connection info for a user."
[user-id]
(get @connections user-id))
(defn- update-connection!
"Update a field in a user's connection tracking entry."
[user-id k v]
(swap! connections assoc-in [user-id k] v))
;;; ---------------------------------------------------------------------------
;;; SSE Wire Format
;;; ---------------------------------------------------------------------------
(defn- format-sse-event
"Format a single SSE event as a string.
Returns: \"event: <type>\\nid: <id>\\ndata: <json>\\n\\n\""
[event-type event-id data]
(str "event: " event-type "\n"
"id: " event-id "\n"
"data: " (json/write-str data) "\n\n"))
(defn- next-event-id!
"Atomically increment and return the next event ID for a connection."
[user-id]
(let [result (swap! connections update-in [user-id :last-event-id] (fnil inc 0))]
(get-in result [user-id :last-event-id])))
(defn send-event!
"Send an SSE event to a specific user's connection.
Returns true if sent, false if the connection is gone or buffer exceeded."
[user-id event-type data]
(when-let [conn (get-connection user-id)]
(let [ch (:sse-channel conn)]
(when (and ch (http-kit/open? ch))
(let [event-id (next-event-id! user-id)
payload (format-sse-event event-type event-id data)]
(http-kit/send! ch payload false)
true)))))
(defn send-ping!
"Send a keepalive ping event to a user's connection."
[user-id]
(send-event! user-id "ping" {}))
;;; ---------------------------------------------------------------------------
;;; NATS Event Mapping
;;; ---------------------------------------------------------------------------
(def ^:private nats-type->sse-type
"Map NATS event :type keywords to SSE event type strings."
{:message/created "message.new"
:message/updated "message.edit"
:message/deleted "message.delete"
:reaction/added "reaction.add"
:reaction/removed "reaction.remove"
:typing/started "typing.start"
:typing/stopped "typing.stop"
:presence/updated "presence.update"
:channel/created "channel.new"
:channel/updated "channel.update"
:channel/deleted "channel.delete"
:member/joined "member.join"
:member/left "member.leave"
:notification/new "notification"
:unread/updated "unread.update"})
(defn- nats-event->sse
"Convert a NATS event envelope to [sse-event-type data-map].
Returns nil if the event type is unknown."
[nats-event]
(let [event-type (:type nats-event)
sse-type (get nats-type->sse-type event-type)]
(when sse-type
[sse-type (:payload nats-event)])))
;;; ---------------------------------------------------------------------------
;;; NATS Subscription Management
;;; ---------------------------------------------------------------------------
(defn- make-nats-handler
"Create a NATS message handler that converts events and sends to the SSE client.
Implements backpressure: disconnects client if buffer exceeds limit."
[user-id backpressure-limit]
(fn [nats-event]
(try
(let [conn (get-connection user-id)]
;; Check backpressure -- disconnect if exceeded
(if (and conn (> (:last-event-id conn 0) backpressure-limit))
(do
(log/warn "Backpressure limit exceeded for user" user-id
"- disconnecting client")
(when-let [ch (:sse-channel conn)]
(http-kit/close ch)))
;; Normal event processing
(do
;; Accumulate presence events for batching
(when (= :presence/updated (:type nats-event))
(let [payload (:payload nats-event)]
(presence/record-presence!
(:community-id payload)
(:user-id payload)
(:status payload))))
;; Convert and send (skip presence -- those are batched)
(when (not= :presence/updated (:type nats-event))
(when-let [[sse-type data] (nats-event->sse nats-event)]
(send-event! user-id sse-type data))))))
(catch Exception e
(log/error e "Error processing NATS event for user" user-id)))))
(defn- subscribe-user!
"Subscribe a user to their relevant NATS subjects.
Returns a vector of dispatcher handles."
[nats-conn user-id community-id backpressure-limit]
(let [handler (make-nats-handler user-id backpressure-limit)
community-sub (when community-id
(eventbus/subscribe!
nats-conn
(str "chat.events." community-id)
handler))
presence-sub (when community-id
(eventbus/subscribe!
nats-conn
(str "chat.presence." community-id)
handler))
typing-sub (when community-id
(eventbus/subscribe!
nats-conn
(str "chat.typing." community-id ".>")
handler))
notif-sub (eventbus/subscribe!
nats-conn
(str "chat.notifications." user-id)
handler)]
(filterv some? [community-sub presence-sub typing-sub notif-sub])))
(defn- unsubscribe-all!
"Unsubscribe from all NATS dispatchers for a connection."
[nats-conn dispatchers]
(doseq [d dispatchers]
(try
(eventbus/unsubscribe! nats-conn d)
(catch Exception e
(log/error e "Error unsubscribing NATS dispatcher")))))
;;; ---------------------------------------------------------------------------
;;; Init Event
;;; ---------------------------------------------------------------------------
(defn- build-init-event
"Fetch initial data from the API and build the init event payload.
Returns a map with communities, channels, dms, user, and unread counts."
[ctx]
(let [user (api/get-me ctx)
communities (api/get-communities ctx)
;; For each community, fetch channels
comm-channels (reduce
(fn [acc comm]
(let [cid (or (:id comm) (:community_id comm))]
(try
(assoc acc (str cid) (api/get-channels ctx cid))
(catch Exception e
(log/warn "Failed to fetch channels for community"
cid (ex-message e))
acc))))
{}
communities)
dms (api/get-dms ctx)
unread (try (api/get-unread-count ctx)
(catch Exception _e {:count 0}))]
{:user user
:communities communities
:channels comm-channels
:dms dms
:unread unread
:server-time (.toString (Instant/now))}))
;;; ---------------------------------------------------------------------------
;;; Resubscription (for navigate)
;;; ---------------------------------------------------------------------------
(defn resubscribe!
"Unsubscribe from old NATS subjects and subscribe to new ones for the
user's active community. Called when the user navigates to a different
community."
[nats-conn user-id new-community-id backpressure-limit]
(let [conn (get-connection user-id)]
(when conn
;; Unsubscribe old
(unsubscribe-all! nats-conn (:nats-subs conn))
;; Subscribe new
(let [new-subs (subscribe-user! nats-conn user-id new-community-id
backpressure-limit)]
(swap! connections update user-id assoc
:nats-subs new-subs
:active-community new-community-id)
new-subs))))
;;; ---------------------------------------------------------------------------
;;; Disconnect
;;; ---------------------------------------------------------------------------
(defn- disconnect!
"Clean up a user's connection: unsubscribe NATS, remove from tracker."
[nats-conn user-id]
(when-let [conn (get-connection user-id)]
(log/info "Disconnecting user" user-id)
(unsubscribe-all! nats-conn (:nats-subs conn))
(swap! connections dissoc user-id)))
;;; ---------------------------------------------------------------------------
;;; SSE Endpoint Handler
;;; ---------------------------------------------------------------------------
(defn sse-handler
"Ring handler for GET /tui/sse/events.
Opens an SSE stream via http-kit's with-channel. On connect:
1. Checks max connections
2. Builds init event from API
3. Subscribes to NATS
4. Sends init event
On close: unsubscribes NATS, removes from tracker."
[request]
(let [system (:system request)
nats-conn (:nats system)
config (:config system)
max-conns (get-in config [:server :max-connections] 5000)
bp-limit (get-in config [:sse :backpressure-limit] 1000)
api-base-url (get-in config [:api :base-url] "http://localhost:3001")
user-id (:user-id request)]
(if-not user-id
{:status 401
:headers {"Content-Type" "application/json"}
:body (json/write-str {:error {:code "UNAUTHORIZED"
:message "Missing X-User-Id header"}})}
(if (>= (connection-count) max-conns)
{:status 503
:headers {"Content-Type" "application/json"}
:body (json/write-str {:error {:code "SERVICE_UNAVAILABLE"
:message "Maximum connections exceeded"}})}
(http-kit/with-channel request ch
(if (not (http-kit/open? ch))
(log/warn "Channel closed immediately for user" user-id)
(do
(log/info "SSE connection opened for user" user-id)
;; Set SSE headers
(http-kit/send! ch
{:status 200
:headers {"Content-Type" "text/event-stream"
"Cache-Control" "no-cache"
"Connection" "keep-alive"
"X-Accel-Buffering" "no"}}
false)
;; Build context for API calls
(let [ctx (api/request->ctx request api-base-url)]
;; Register on-close before doing anything async
(http-kit/on-close ch
(fn [_status]
(log/info "SSE connection closed for user" user-id)
(disconnect! nats-conn user-id)))
;; Build init event
(let [init-data (try
(build-init-event ctx)
(catch Exception e
(log/error e "Failed to build init event for user"
user-id)
{:error "Failed to load initial data"
:server-time (.toString (Instant/now))}))]
;; Track the connection
(swap! connections assoc user-id
{:sse-channel ch
:active-community nil
:active-channel nil
:nats-subs []
:last-event-id 0
:connected-at (.toString (Instant/now))})
;; Send init event
(let [event-id (next-event-id! user-id)]
(http-kit/send! ch
(format-sse-event "init" event-id init-data)
false))
;; Subscribe to user-level NATS subjects (notifications)
;; Community-level subs happen when user navigates
(let [notif-handler (make-nats-handler user-id bp-limit)
notif-sub (eventbus/subscribe!
nats-conn
(str "chat.notifications." user-id)
notif-handler)]
(update-connection! user-id :nats-subs [notif-sub])))))))))))
;;; ---------------------------------------------------------------------------
;;; Broadcast Helpers
;;; ---------------------------------------------------------------------------
(defn broadcast-to-community!
"Send an SSE event to all users currently viewing a given community."
[community-id event-type data]
(doseq [[user-id conn] @connections]
(when (= (str community-id) (str (:active-community conn)))
(send-event! user-id event-type data))))
(defn broadcast-presence-changes!
"Broadcast batched presence changes to connected clients.
Called by the presence flush scheduler.
changes: {community-id [{:user-id ... :status ... :last-seen ...}]}"
[changes]
(doseq [[community-id user-changes] changes]
(broadcast-to-community! community-id "presence.update"
{:community-id community-id
:users user-changes})))
;;; ---------------------------------------------------------------------------
;;; Ping All
;;; ---------------------------------------------------------------------------
(defn ping-all!
"Send a keepalive ping to every connected client."
[]
(doseq [[user-id _conn] @connections]
(try
(send-ping! user-id)
(catch Exception e
(log/error e "Error sending ping to user" user-id)))))
;;; ---------------------------------------------------------------------------
;;; Reset (for REPL)
;;; ---------------------------------------------------------------------------
(defn reset-connections!
"Close all connections and clear tracking state."
[nats-conn]
(doseq [[user-id _conn] @connections]
(disconnect! nats-conn user-id))
(reset! connections {}))