
Clojure: Polling database periodically - core.async w/ timeout channel VS vanilla recursive Thread w/ sleep?


I have a Ring-based server that keeps its application state in an atom. The state is refreshed from the database periodically: every 10 seconds for frequently changing info and every 60 seconds for the rest.

(defn set-world-update-interval
  [f time-in-ms]
  (let [stop (async/chan)]
    (async/go-loop []
      (async/alt!
        (async/timeout time-in-ms) (do (async/<! (async/thread (f)))
                                       (recur))
        stop :stop))
    stop))

(mount/defstate world-listener
  :start (set-world-update-interval
           #(do (println "Checking data in db")
                (reset! world-atom (fetch-world-data)))
           10000)
  :stop (async/close! world-listener))

It works pretty well, and RAM usage is stable. But I'm wondering whether this is an improper use of core.async.

Perhaps it should be a regular Thread instead, like this?

(doto (Thread. (fn []
                 (loop []
                   (Thread/sleep 10000) ; 10 seconds, matching the core.async version
                   (println "Checking data in db")
                   (reset! world-atom (fetch-world-data))
                   (recur))))
  (.setUncaughtExceptionHandler
    (reify Thread$UncaughtExceptionHandler
      (uncaughtException [this thread exception]
        (println "Cleaning up!"))))
  (.start))

Solution

  • While there's nothing wrong with your core.async implementation of this pattern, I'd suggest using a java.util.concurrent.ScheduledExecutorService for this. It gives you precise control over the thread pool and the scheduling.

    Try something like this:

    (ns your-app.world
      (:require [clojure.tools.logging :as log]
                [mount.core :as mount])
      (:import
        (java.util.concurrent Executors ScheduledExecutorService ThreadFactory TimeUnit)))
    
    (defn ^ThreadFactory create-thread-factory
      [thread-name-prefix]
      (let [thread-number (atom 0)]
        (reify ThreadFactory
          (newThread [_ runnable]
            (Thread. runnable (str thread-name-prefix "-" (swap! thread-number inc)))))))
    
    (defn ^ScheduledExecutorService create-single-thread-scheduled-executor
      [thread-name-prefix]
      (let [thread-factory (create-thread-factory thread-name-prefix)]
        (Executors/newSingleThreadScheduledExecutor thread-factory)))
    
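    (defn schedule
      "Runs runnable immediately, then again `interval` units after each run completes."
      [^ScheduledExecutorService executor ^Runnable runnable interval ^TimeUnit unit]
      ;; Type hints avoid reflection; the Java method expects primitive longs.
      (.scheduleWithFixedDelay executor runnable 0 (long interval) unit))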
    
    (defn shutdown-executor
      "Industrial-strength executor shutdown, modify/simplify according to need."
      [^ScheduledExecutorService executor]
      (if (.isShutdown executor)
        (log/info "Executor already shut down")
        (do
          (log/info "Shutting down executor")
          (.shutdown executor)                                  ;; Disable new tasks from being scheduled
          (try
            ;; Wait a while for currently running tasks to finish
            (if-not (.awaitTermination executor 10 TimeUnit/SECONDS)
              (do
                (.shutdownNow executor)                         ;; Cancel currently running tasks
                (log/info "Still waiting to shut down executor. Sending interrupt to tasks.")
                ;; Wait a while for tasks to respond to being cancelled
                (when-not (.awaitTermination executor 10 TimeUnit/SECONDS)
                  (throw (ex-info "Executor could not be shut down" {}))))
              (log/info "Executor shutdown completed"))
            (catch InterruptedException _
              (log/info "Interrupted while shutting down. Sending interrupt to tasks.")
              ;; Re-cancel if current thread also interrupted
              (.shutdownNow executor)
              ;; Preserve interrupt status
              (.interrupt (Thread/currentThread)))))))
    
    (defn world-updating-fn
      []
      (log/info "Updating world atom")
      ;; Do your thing here
      )
    
    (mount/defstate world-listener
      :start (doto (create-single-thread-scheduled-executor "world-listener")
               (schedule world-updating-fn 10 TimeUnit/SECONDS))
      :stop (shutdown-executor world-listener))
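
One caveat with the executor approach: per the ScheduledExecutorService contract, if a run of a periodic task throws, all later runs are suppressed and nothing is logged. Below is a minimal sketch of guarding against that. The names `safe-task`, `run-for`, and `failing-update` are hypothetical helpers made up for this illustration, `println` stands in for a real logger, and the failing update only simulates a database error.

```clojure
(ns example.safe-schedule
  (:import (java.util.concurrent Executors ScheduledExecutorService TimeUnit)))

(defn safe-task
  "Hypothetical helper: wraps f so an exception is reported
  instead of escaping to the executor and cancelling the schedule."
  [f]
  (fn []
    (try
      (f)
      (catch Exception e
        (println "Scheduled task failed:" (.getMessage e))))))

(defn run-for
  "Schedules task with a fixed delay on a fresh single-thread executor,
  lets it run for run-ms milliseconds, then shuts the executor down."
  [task delay-ms run-ms]
  (let [^ScheduledExecutorService executor (Executors/newSingleThreadScheduledExecutor)]
    (.scheduleWithFixedDelay executor ^Runnable task 0 (long delay-ms) TimeUnit/MILLISECONDS)
    (Thread/sleep (long run-ms))
    (.shutdownNow executor)
    (.awaitTermination executor 1 TimeUnit/SECONDS)))

(def raw-calls (atom 0))
(def safe-calls (atom 0))

(defn failing-update
  "Simulates an update that always fails, counting each attempt."
  [counter]
  (fn []
    (swap! counter inc)
    (throw (ex-info "database unavailable" {}))))

;; Unwrapped: the first exception silently ends the schedule.
(run-for (failing-update raw-calls) 20 300)

;; Wrapped: every failure is reported and the schedule keeps going.
(run-for (safe-task (failing-update safe-calls)) 20 300)

(println "unwrapped runs:" @raw-calls)
(println "wrapped runs:" @safe-calls)
```

The unwrapped task runs exactly once, while the wrapped one keeps running until the executor is shut down. In the code above, the same effect could be had by passing `(safe-task world-updating-fn)` to `schedule`, or by putting the try/catch directly inside `world-updating-fn`.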