"Fork-Join in Papaline"

Papaline 0.3 introduced a new model "fork-join" for task execution. It allows you to split a task into smaller units, and execute them in parallel.

Before that, a task is processed as a single unit from the first stage to the second, the third and the last. Within a stage, all computing is done in a single thread.

linear execution

This model has limitation that you are required to execute any of your stage in serial. If your task has a few split-able units, it's always better to run them in parallel. Here we have (fork) command for the situation.

For example, you are using the fanout-on-write model to build an activity stream. Once a user posted a new status, you need to find all followers(stage 1) of that user and append the status to their timeline(stage 2).

In previous version of papaline, these two stages are:

(defn find-followers [id msg]
  (let [followers (query-db-for-followers id)]
    [followers msg]))

(defn fanout-to-user-timeline [user-ids msg]
  (doseq [user-id user-ids]
    (write-redis-list user-id msg)))

In the second task, the msg is appended to user's timeline one by one.

Using (fork), the fanout-to-user-timeline can be executed in parallel.

(defn find-followers [id msg]
  (let [followers (query-db-for-followers id)]
    (fork (map #(vector % msg) followers))))

(defn fanout-to-user-timeline [user-ids msg]
  (write-redis-list user-id msg))

After the find-followers function, the result will be splitted into (count followers) parts and sent into input channel of stage 2. So the tasks execution will be like:

forked execution

To collect the results of all forked sub-tasks, you can use (join). If the return value is wrapped with join, it won't trigger next stage immediately but to wait all forked tasks to finish.

join

So with (fork) and (join), it's very flexible to change execution model in Papaline. Internally, I use clojure's metadata to add flags for the return value, without ruining the non-invasive design of Papaline.