Hash λ Bye

Haskell, Clojure, MLなどの話

duct-frameworkに定時起動ジョブを仕込む

ただのtips。みんな気付いているけどショボすぎて言わないことだ。

ductをREPLから起動・停止くらいはできる人に向けている。 それも解らないよーという人は、なんというか、なんでこの記事を読もうと思ったのか。 そういう人はこちらを読むと良いと思う。

ClojureのDuctでWeb API開発してみた

結論

chimeをintegrantのライフサイクルに組み込むことでバックグラウンド処理するジョブが定義できるよという話。

構成要素

  • duct (core 0.6.2)
  • integrant
  • chime 0.2.2

やりかた

config.edn

まずは設定を書いておく

(抜粋)

 :example.job/channel {}
 :example.job/hello {:ch #ig/ref :example.job/channel}

:example.job/channel はジョブ起動の通知を送ってくれるchannel (core.asyncのchan) を保持する。

:example.job/hello は channel の通知を受け取って起動されるバックグラウンド処理そのもの。 ジョブの起動には通知をくれる channel が必要なので ig/ref で解決している。

clojure

(ns example.job
  (:require [integrant.core :as ig]
            [clojure.core.async :as a]
            [chime :refer [chime-ch]]
            [clj-time.core :as time]
            [clj-time.periodic :as periodic]))

(defmethod ig/init-key :example.job/channel [_ _]
  (chime-ch (periodic/periodic-seq (time/now) (time/seconds 5))
            {:ch (a/chan (a/dropping-buffer 1))}))

(defmethod ig/halt-key! :example.job/channel [_ ch]
  (a/close! ch))

(defmethod ig/init-key :example.job/hello [_ {:keys [ch]}]
  (a/go-loop []
    (when-let [time (a/<! ch)]
      (prn "Hello world at " time)
      (recur))))

少しかいつまんで解説。

channelの初期化

(defmethod ig/init-key :example.job/channel [_ _]
  (chime-ch (periodic/periodic-seq (time/now) (time/seconds 5))
            {:ch (a/chan (a/dropping-buffer 1))}))

chime-ch 指定された時刻にタイムスタンプデータを送信するchannelを生成する。

clj-time/periodicperiodic/periodic-seq を使ってchannelからメッセージを受け取る時刻の無限ストリームを取得する。 (個人的に時間の経過を無限ストリームで表現する設計大好き)

{:ch (a/chan (a/dropping-buffer 1))} ここは chime-ch で生成される channel のバッファに長さと廃棄ポリシを設定している。

channelの破棄

(defmethod ig/halt-key! :example.job/channel [_ ch]
  (a/close! ch))

channel を閉じるだけ。 channel からメッセージを受け取れなくなった軽量プロセスはparking状態のまま何もしなくなるので、実質的にバックグラウンドジョブは停止している。

channelを使ったジョブ

(defmethod ig/init-key :example.job/hello [_ {:keys [ch]}]
  (a/go-loop []
    (when-let [time (a/<! ch)]
      (prn "Hello world at " time)
      (recur))))

when-let を使って channel からメッセージが届く時のみ印字処理を行って次のメッセージがくるまで待機するようにループさせている。

Result

ductなのでREPLから (go) と入力するとサーバの起動とともにREPLに5秒間隔でHello worldとぬかしてくるジョブが起動する。

:initiated
"Hello world at " #object[org.joda.time.DateTime 0x86ddf48 "2018-04-15T07:49:08.518Z"]
"Hello world at " #object[org.joda.time.DateTime 0x7f4e3d0e "2018-04-15T07:49:23.518Z"]
"Hello world at " #object[org.joda.time.DateTime 0x6aa2035 "2018-04-15T07:49:28.518Z"]
"Hello world at " #object[org.joda.time.DateTime 0x549f285d "2018-04-15T07:49:33.518Z"

Web APIとバッチジョブ

モチベーションの話。

僕がやりたかったのは、Web API経由で操作可能なWebクローラみたいなもの。 クローラが巡回しにいくエンドポイントをCRUDで操作できる的なやつ。

ただ、Web APIとバッチジョブっていう組み合わせは 応用が効く ような気がしている。 ふつーの典型的なバッチジョブを作るにしても、ジョブの実行時間や結果の履歴などの統計情報をWeb API経由で取れるようにしておくと、他のAdmin向けサービスの統合がしやすくなったりする。

運用系APIがあるというのは僕みたいなインフラ屋からするととてもポイント高い。

(脱線) Finagleという実例

バックグラウンドジョブとは少し違うけど、僕がとても感心したのはfinagleというScala向けマイクロサービスフレームワーク

このfinagleはマイクロサービスのエンドポイントを作るたくさんのAPIを提供している。なかでも面白いのがサービスエンドポイントの統計情報もとれる仕組みが標準で提供されていること。

Metrics

ユーザのリクエストで起動するにせよ、定時起動するにせよ、サービスの稼働状況を抽出できるAPIを備えていると、デプロイ後のフィードバックが得やすくなる。 これはサービスの改善点を定量的に探ったり問題を検知したりするのに役立つ。

バッチジョブ作る人へ

こんな風にジョブの統計情報を外部にexposeできるような仕組みを仕込んでおくと、サービスが成長した時にありがたがられること間違いなし。

Web APIとバッチジョブは割といい組み合わせだと再認識した。

小さなバッチジョブを作ろうと思ったらまずは

lein new duct {プロジェクト名} +api +ataraxy

と打ってほしい。