Jun 14

Clojure Websockets server and event sourcing

In my previous post I explained what event sourcing is and why it is so interesting. Today I’m sharing a minimal multi-threaded implementation of a WebSocket server that uses event sourcing. My language of choice is Clojure. I used core.async channels to enable communication between threads. Alternatively, you could use blocking queues and create the threads manually; it may be more performant, but I haven’t tested it. Another alternative is to use Disruptor for inter-thread communication; it is probably the most performant method, but I haven’t checked it yet either.

(ns event-sourcing-sample.core
  (:require [org.httpkit.server :refer [run-server with-channel send! on-close on-receive]]
            [clojure.edn :as edn]
            [clojure.core.async :refer [chan <! >! go go-loop]]))

(def conn-hub (atom {}))

(def unmarshal-ch (chan 10))
(def journaling-ch (chan 10))
(def logic-ch (chan 10))
(def marshal-ch (chan 10))

(defn dummy-business-logic [state data user-id]
  {:state (update-in state [:id] (fnil inc 0))
   :events {:event :all-ok :broadcast user-id}})

;unmarshaller loop
(go-loop []
         (when-let [{:keys [body user-id]} (<! unmarshal-ch)]
           (try
             (let [data (edn/read-string body)]
               ;pass unmarshalled data to journaler
               (>! journaling-ch data)
               ;pass events to business logic
               (>! logic-ch {:data data :user-id user-id}))
             (catch Exception e
               ;malformed input: tell the client, if still connected
               (when-let [ch (@conn-hub user-id)]
                 (send! ch "Bad request"))))
           (recur)))

;journaler loop
(go-loop []
         (when-let [req (<! journaling-ch)]
           ;replace with logging to a file
           (println req)
           (recur)))

;business logic loop
(go-loop [state {}]
         (when-let [{:keys [data user-id]} (<! logic-ch)]
           (let [logic-out (dummy-business-logic state data user-id)
                 new-state (:state logic-out)
                 events (:events logic-out)]
             ;if there are any resulting events pass them to marshaller
             (when events (>! marshal-ch events)) 
             ;repeat loop with a new state
             (recur new-state)))) 

;marshaller loop
(go-loop []
         (when-let [e (<! marshal-ch)]
           ;get user-id from resulting event
           (let [user-id (:broadcast e)
                 ;get rid of broadcasting information and form a message
                 msg (dissoc e :broadcast)]
             ;get channel associated to user-id
             (if-let [ch (@conn-hub user-id)]
               (send! ch (pr-str msg)) ;send response
               (println "User" user-id "no longer connected")))
           ;wait for the next event
           (recur)))

(defn handler
  "Websocket connection handler"
  [request]
  (with-channel request httpkit-ch
    ;user-id is hard-coded in this sample; a real server needs a unique id per connection
    (let [user-id 1]
      (swap! conn-hub assoc user-id httpkit-ch)
      (send! httpkit-ch (str "Hi " user-id))

      (on-close httpkit-ch (fn [status]
                             (swap! conn-hub dissoc user-id)))

      (on-receive httpkit-ch (fn [raw] (go (>! unmarshal-ch {:body raw :user-id user-id})))))))

(defn start []
  (run-server handler {:port 9090})
  (println "Server started, listening on port 9090"))


In the code above there are four channels that serve as pipes between threads. Each channel has a dedicated go-loop that waits until new data is available for processing. In this implementation I chose to first unmarshal the data and send it to the journaler as a Clojure data structure, which differs from the diagram in the last post. The business logic is trivial: it does nothing apart from returning a well-formed value containing a dummy event and a state with just one field incremented (:id).
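The journaler above only prints each request. As a rough sketch of what logging to a file and replaying it might look like, assuming one EDN form per line: the names append-event!, read-journal, apply-event and replay below are hypothetical and not part of the server code, and apply-event is only a stand-in with the same return shape as dummy-business-logic.

```clojure
(require '[clojure.edn :as edn]
         '[clojure.java.io :as io])

;; Hypothetical helper: append one event to the journal as a single EDN line.
(defn append-event! [path event]
  (spit path (str (pr-str event) "\n") :append true))

;; Hypothetical helper: read every journaled event back, in order.
;; mapv is eager, so all lines are read before the reader is closed.
(defn read-journal [path]
  (with-open [rdr (io/reader path)]
    (mapv edn/read-string (line-seq rdr))))

;; Stand-in for the business logic: returns the new state under :state,
;; like dummy-business-logic does.
(defn apply-event [state data]
  {:state (update-in state [:id] (fnil inc 0))})

;; Rebuild the state by folding the whole journal through the business logic.
(defn replay [path]
  (reduce (fn [state data] (:state (apply-event state data)))
          {}
          (read-journal path)))
```

Because the state is just the result of folding the journaled events through the logic function, calling replay on the journal file after a restart should bring the server back to the state it had before.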

All the client connection channels are stored in the conn-hub atom, which is accessible from all threads. For extra performance it’s better to avoid accessing this atom from more than one thread. In practice this is hard to achieve, as both the HTTP handler and the marshaller need to do something with the HTTP channel. One solution might be to send the HTTP channel reference as part of the message passed between threads and not store it anywhere. It might get tricky, though, if access to more than one HTTP channel is required to broadcast a message to many clients.
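The idea of sending the HTTP channel along with the message could be sketched roughly as follows. This is only an illustration under my own assumptions: each message carries a hypothetical :reply! function instead of a user id, and the functions unmarshal, business-logic and marshal-and-send! are made-up stand-ins for the pipeline stages. In the real handler, :reply! could close over the http-kit channel, for example #(send! httpkit-ch %); here an atom plays that role so the sketch runs without a server.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical: unmarshal a raw message, keeping the reply function attached.
(defn unmarshal [{:keys [body reply!]}]
  {:data (edn/read-string body) :reply! reply!})

;; Hypothetical business logic: the resulting event keeps the reply function
;; of the request that caused it, so no shared lookup table is needed.
(defn business-logic [state {:keys [data reply!]}]
  {:state  (update-in state [:id] (fnil inc 0))
   :events {:event :all-ok :reply! reply!}})

;; Hypothetical marshaller: strip the routing info, serialize, and reply.
(defn marshal-and-send! [event]
  (let [reply! (:reply! event)
        msg    (dissoc event :reply!)]
    (reply! (pr-str msg))))

;; Stand-in for an http-kit channel: collect sent messages in an atom.
(def sent (atom []))

;; Push one request through all three stages.
(let [request {:body "{:cmd :ping}" :reply! #(swap! sent conj %)}
      out     (business-logic {} (unmarshal request))]
  (marshal-and-send! (:events out)))
```

This covers only the request/response case. Broadcasting to many clients would still need some registry of connections, which is exactly the tricky part mentioned above.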

My impressions

I created a server that uses the code above with a slightly more complex business logic function. It handles 500,000 (half a million) concurrent connections while running on my local machine. This is only the beginning of my adventure with event sourcing, but I must say I am pretty excited by the ease of development and testing it enables. The performance gains make the whole exercise even more rewarding.

I need to explore the possible applications of event sourcing more thoroughly. I think it should do well in applications that are usually implemented with a standard CRUD stack. Sounds like another idea to try out!
