Exploring core.async.flow as an Agent Executor
Using core.async.flow as an execution substrate for a graph-based AI agent runtime: what fit, what did not, and what I learned from the attempt
I have been building an experimental graph-based AI agent runtime in Clojure called Ayatori. It is a side project where I stay hands-on with new ideas in my favorite language. In Ayatori, agents are graphs: nodes are functions, edges define routing, and an executor ties them together.
This article explores whether core.async.flow could replace that executor.
The agent model
In Ayatori, an agent is a data structure. It declares nodes, edges, capabilities, and dependencies.
{:nodes {:preprocess (fn [input] {:result {...}})
         :llm        {:type :llm :client ... :prompt "..."}
         :search     (fn [input] {:result {...}})}
 :edges {:preprocess :llm
         :llm        {:search :search :done :ayatori/done}
         :search     :llm}
 :caps  {:chat {:entry :preprocess}}
 :deps  [:external-cap]}
Nodes are either plain functions or typed maps for built-in behaviors like LLM invocation or fan-out. Edges are either a keyword (unconditional routing) or a map from a route key to a target node (conditional routing). Caps declare which entry points are externally callable. Deps declare which external capabilities the agent needs at runtime, resolved by the system at start time.
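To make the two edge forms concrete, here is a small routing function. It is a sketch, not Ayatori's actual `resolve-route`: `next-node` is a hypothetical name, and it assumes a conditional node reports its choice under a hypothetical `:route` key in its result, with `:ayatori/done` marking the end of the graph.

```clojure
(defn next-node
  "Returns the node to run after `node`, or nil when the graph is finished.
  Assumes a conditional node puts its choice under a hypothetical :route key."
  [edges node result]
  (let [edge   (get edges node)
        target (if (map? edge)
                 (get edge (:route result)) ; conditional: route key picks the target
                 edge)]                     ; unconditional: the keyword is the target
    (when-not (= target :ayatori/done)
      target)))

(def edges
  {:preprocess :llm
   :llm        {:search :search :done :ayatori/done}
   :search     :llm})

(next-node edges :preprocess {})        ;; => :llm
(next-node edges :llm {:route :search}) ;; => :search
(next-node edges :llm {:route :done})   ;; => nil
```

An unknown route key also yields nil in this sketch; a real executor would probably want to treat that as an error rather than as completion.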
The executor’s job is to take this definition and run it.
What I was building by hand was a straightforward go-loop:
(require '[clojure.core.async :as async :refer [<!]])
;; invoke-node, resolve-route and extract-result are Ayatori helpers;
;; max-steps caps the number of hops so a routing cycle cannot run forever
(defn execute [graph input]
  (async/go
    (loop [node (:entry graph)
           data input
           step 0]
      (if (>= step max-steps)
        ;; an exception thrown inside a go block never reaches the caller,
        ;; so failures are returned as values on the result channel
        (ex-info "Max steps exceeded" {:node node :steps step})
        (let [result (<! (invoke-node graph node data))]
          (if (instance? Throwable result)
            result
            (if-let [route (resolve-route graph node result)]
              (recur (:next-node route) (:input route) (inc step))
              (extract-result result))))))))
This worked well enough for an experiment. But I kept accumulating concerns around it: What if I need to pause mid-execution? What if a node produces faster than the next can consume? What if I want to stop one node without tearing down the whole graph? Each concern meant more code in the executor.
For readers unfamiliar with core.async.flow, the overview and guide are good starting points.
Mapping to core.async.flow
Thinking about those concerns (node lifecycle, channel wiring, backpressure, routing), the problem started to feel familiar. These are concerns a runtime should handle, not application code. Then I remembered core.async.flow. I had not found the time to look at it carefully when it was first announced, and working on this executor turned out to be a good reason to go back.
core.async.flow describes itself as a library for building “concurrent, event driven data processing flows out of communication-free functions, while centralizing control, reporting, execution and error handling.” The model is a directed graph of processes communicating via channels. You define step functions that transform data. The flow manages channel creation, process lifecycle, backpressure, and error handling.
Those were the same concerns accumulating in the executor. At first glance, the agent DSL and flow’s topology model seem to map well onto each other. An agent is a directed graph of computation steps. A flow is a directed graph of processes communicating through channels. The unit of work in both is a function that takes input and produces output. The connections are explicit and declared separately from the logic. That structural similarity is what made me want to try this.
Ayatori → core.async.flow
Agent → Flow graph
Node → Process
Edge → Connection: [[from :out] [to :in]]
Conditional edge → Multi-output process, route key selects port
Cap entry point → flow/inject target
Dep (cross-agent call) → IO process, blocking resolver
Fan-out node → Multi-output process with step state correlation
Graph result → Collector sink
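The "route key selects port" row can be sketched as two small functions. Both function names and the convention that a conditional node returns `{:route k :data d}` are assumptions for illustration. The flow-specific facts relied on are that a step's transform returns its outputs as a map from output port id to a collection of messages, and that those ports are declared under `:outs` in the step's describe map.

```clojure
(defn conditional-outs
  "Hypothetical helper: every route key of a conditional edge becomes an
  output port, in the {port-id description} shape used by :outs."
  [edge-map]
  (into {} (map (fn [route] [route (str "route " (name route))])) (keys edge-map)))

(defn route->outputs
  "Hypothetical helper: sends the node's payload out on the port named by
  its :route key, carrying the corr-id along."
  [corr-id {:keys [route data]}]
  {route [{:data data :corr-id corr-id}]})

(conditional-outs {:search :search :done :ayatori/done})
;; => {:search "route search", :done "route done"}
(route->outputs "abc" {:route :search :data {:q "clojure"}})
;; => {:search [{:data {:q "clojure"}, :corr-id "abc"}]}
```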
The topology that I was managing imperatively in a go-loop would become a data structure passed to create-flow. Each node requires a step function and explicit port declarations, but the wiring and lifecycle are handled by the framework.
A simplified view of what that topology definition looks like:
{:procs {:preprocess {:proc (process #'preprocess-step)}
         :llm        {:proc (process #'llm-step) :args llm-config}
         :search     {:proc (process #'search-step)}
         :collector  {:proc (process (collector-step registry))}}
 ;; each connection is [[from-pid out-port] [to-pid in-port]]
 :conns [[[:preprocess :out] [:llm :in]]
         [[:llm :search]     [:search :in]]
         [[:search :out]     [:llm :in]]
         [[:llm :done]       [:collector :in]]]}
The nodes, edges, and routing from the Ayatori definition translate directly into procs and conns.
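Since both sides are data, the edge-to-connection translation can itself be a function. The sketch below is an assumption about naming, not Ayatori's code: `edges->conns` is hypothetical, unconditional edges leave on `:out`, conditional edges leave on the port named by their route key, every target listens on `:in`, and `:ayatori/done` is wired to the `:collector` process. Under those conventions it produces the same four connections as the topology above, in some order.

```clojure
(defn edges->conns
  "Derives flow :conns from an Ayatori-style :edges map (sketch)."
  [edges]
  (let [target-port (fn [to]
                      (if (= to :ayatori/done)
                        [:collector :in] ; terminal route goes to the collector
                        [to :in]))]
    (vec
     (for [[from edge] edges
           ;; a keyword edge is treated as a single route leaving on :out
           [out to]    (if (map? edge) edge {:out edge})]
       [[from out] (target-port to)]))))

(edges->conns {:preprocess :llm
               :llm        {:search :search :done :ayatori/done}
               :search     :llm})
```

Keeping this translation in one function would leave the agent definition as the single source of truth for routing.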
The mismatch
flow’s model is fire-and-forget. A caller injects a message and the graph processes it downstream. There is no built-in way to wait for a result.
Ayatori needs callers to wait for a result. This mismatch was not a surprise. Flow is a poor fit for RPC-style request paths, as Alex Miller noted on Ask Clojure. What I wanted to find out was what bridging that gap would actually cost in practice.
So I built request/reply semantics on top of flow using a correlation ID.
When a caller invokes a cap, a UUID is generated and stored in a registry alongside a promise channel. The message carries that ID through the entire flow. The terminal node is a dedicated collector process that delivers results directly to the caller’s channel. I initially considered using ::flow/report for result delivery, but the guide describes it as a channel for unified logging across the flow. Using it for results would be working against its purpose, so I went with the collector process instead.
;; inject side: register a promise-chan under a fresh corr-id, then inject
(let [corr-id   (str (random-uuid))
      result-ch (async/promise-chan)]
  (swap! registry assoc corr-id {:ch result-ch})
  (flow/inject flow [entry-key :in] [{:data input :corr-id corr-id}])
  result-ch)
;; collector step: the terminal process that hands results to waiting callers
(defn- collector-step [registry]
  (flow/map->step
   {:describe  (fn [] {:ins {:in ""}})
    :transform (fn [state _ msg]
                 (when-let [{:keys [ch]} (get @registry (:corr-id msg))]
                   (async/put! ch (:data msg))
                   (swap! registry dissoc (:corr-id msg)))
                 ;; no outputs: results leave the flow through the registry
                 [state {}])}))
;; topology: terminal outputs route to the collector
{:procs {...
         :collector {:proc (process (collector-step registry))}}
 :conns [...
         [[:llm :done] [:collector :in]]]}
Error handling follows the same pattern. Processes throw exceptions with the corr-id in ex-data. A consumer on the flow’s error channel extracts the corr-id and delivers the exception back to the correct caller. Without this, errors would go to the error channel but never reach the caller waiting on the promise.
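;; inside a step's transform: tag failures with the request's corr-id
(try
  ;; ... node work ...
  (catch Throwable e
    (throw (ex-info "Node failed" {:corr-id corr-id} e))))
;; error handler: drain the :error-chan returned by flow/start
(async/go-loop []
  (when-some [err (async/<! error-chan)]
    (when-let [corr-id (some-> err ::flow/ex ex-data :corr-id)]
      (deliver-result! registry corr-id (::flow/ex err)))
    (recur)))
Because the corr-id is a plain string, the same pattern can carry across process boundaries in multi-node work without structural change.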
This also introduces external mutable state. The registry is an atom that lives outside the flow. Flow’s own model assumes communication-free functions. The collector process breaks that assumption: it reads from and writes to the registry. This feels like forcing the model rather than working with it.
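One way to keep that external state small is to make removal from the registry atomic, so each corr-id is claimed by exactly one delivery (a result from the collector or an exception from the error handler) and the caller learns which one won. This is a sketch under assumptions, not Ayatori's code: it uses `swap-vals!` to take the entry out in one step, and a plain Clojure `promise` stands in for `promise-chan` so it runs without core.async. `register!` is a hypothetical name; `deliver-result!` borrows the helper name from the error handler above, but its body here is a guess.

```clojure
(defn register!
  "Hypothetical helper: stores a waiter under a fresh corr-id."
  [registry]
  (let [corr-id (str (random-uuid))
        waiter  (promise)] ; stands in for async/promise-chan
    (swap! registry assoc corr-id {:ch waiter})
    [corr-id waiter]))

(defn deliver-result!
  "Takes the entry out of the registry in one atomic step, then delivers v
  (a result or a Throwable). Returns true only for the first delivery."
  [registry corr-id v]
  (let [[old _] (swap-vals! registry dissoc corr-id)]
    (if-let [{:keys [ch]} (get old corr-id)]
      (do (deliver ch v) true)
      false)))

(def registry (atom {}))
(let [[corr-id waiter] (register! registry)]
  (deliver-result! registry corr-id {:answer 42})        ; => true
  (deliver-result! registry corr-id (ex-info "late" {})) ; => false
  @waiter)
;; => {:answer 42}
```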
Fan-out
Fan-out requires coordinating results from multiple branches before emitting a final output. A fan-out node broadcasts to N processes and waits for all of them to respond.
flow does not yet provide a built-in way to wait for results from multiple branches. Rich Hickey has noted a planned sync->map process that will handle exactly this. For now, step state serves as a workaround: the fan-out process accumulates branch results across invocations and emits only when all have arrived.
This comes with tradeoffs. If a branch never responds, the pending entry stays in state indefinitely. There is no timeout in this implementation. For an experiment on a single node this is acceptable, but it is something to address before going further.
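One possible way to add a timeout is a sweep over the step state's `:pending` map (the structure described next), assuming each pending entry also records a hypothetical `:started-at` timestamp when the broadcast happens. `expire-stale` is an illustrative name, not part of flow or Ayatori. Because it is a pure function, the step could call it at the start of each transform and fail the returned entries, for example by sending an error to the collector under their corr-ids.

```clojure
(defn expire-stale
  "Splits out fan-outs that have waited longer than ttl-ms.
  Returns [state' expired]; expired is a vector of the removed entries,
  whose :corr-id values identify the callers to fail.
  Assumes a hypothetical :started-at (epoch millis) on every entry."
  [state now-ms ttl-ms]
  (let [stale?  (fn [[_ {:keys [started-at]}]] (> (- now-ms started-at) ttl-ms))
        expired (into {} (filter stale?) (:pending state))]
    [(update state :pending #(apply dissoc % (keys expired)))
     (vec (vals expired))]))

(expire-stale {:pending {:a {:corr-id "1" :started-at 0}
                         :b {:corr-id "2" :started-at 900}}}
              1000 500)
;; => [{:pending {:b {:corr-id "2", :started-at 900}}} [{:corr-id "1", :started-at 0}]]
```

Because such a sweep only runs when a message reaches the step, an idle fan-out process would never expire anything; that case would still need some periodic trigger.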
Each broadcast generates a fan-out ID. The step state holds a pending map keyed by that ID. When all expected results arrive, the step emits the aggregate with the original corr-id.
;; fan-out step (excerpt; describe and transition arities omitted).
;; `branches` holds the branch port ids; `parse-branch` maps a result
;; port id back to its branch.
(defn fan-out-step
  ;; init: no fan-outs in flight
  ([_params] {:pending {}})
  ;; transform: dispatch on the port the message arrived on
  ([state in-id msg]
   (if (= in-id :in)
     ;; request: broadcast to every branch under a fresh fan-out id
     (let [fan-id  (random-uuid)
           outputs (into {} (map (fn [b] [b [{:data (:data msg) :fan-out-id fan-id}]])
                                 branches))]
       [(assoc-in state [:pending fan-id] {:expected (set branches)
                                           :results  {}
                                           :corr-id  (:corr-id msg)})
        outputs])
     ;; branch result: record it, emit the aggregate once all have arrived
     (let [fan-id  (:fan-out-id msg)
           branch  (parse-branch in-id)
           updated (update-in state [:pending fan-id :results] assoc branch (:data msg))
           {:keys [expected results corr-id]} (get-in updated [:pending fan-id])]
       (if (= expected (set (keys results)))
         [(update updated :pending dissoc fan-id)
          {:out [{:data results :corr-id corr-id}]}]
         [updated {}])))))
Streaming: the self-loop
The LLM node handles streaming responses. Tokens arrive on a channel as the provider sends them. The node needs to read from that channel continuously until the stream is done, then emit the final result.
The approach is a self-loop: the process routes output back to one of its own input ports. On each invocation, it reads one event from the stream channel and either loops back to read another or emits the final result.
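;; transform arity of the LLM step: dispatch on the input port
([state in-id msg]
 (case in-id
   ;; request arrives at :in: start the provider stream and kick off the loop
   :in
   (let [stream-ch (llm/start-stream config (:data msg) state)]
     [(assoc state :stream-ch stream-ch :corr-id (:corr-id msg))
      {::self-out [{:type :next}]}])
   ;; self-loop trigger at ::self-in: read one event from the stream
   ::self-in
   (let [event (async/<!! (:stream-ch state))]
     (case (:type event)
       :delta [(update state :accumulated str (:delta event))
               {::token-out [{:delta (:delta event) :corr-id (:corr-id state)}]
                ::self-out  [{:type :next}]}]
       :done  [(dissoc state :stream-ch :corr-id :accumulated)
               {::done-out [{:data (:message event) :corr-id (:corr-id state)}]}]
       ;; closed channel (nil event) or unknown type: fail with the corr-id
       (throw (ex-info "Stream ended unexpectedly"
                       {:corr-id (:corr-id state) :event event}))))))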
The process routes ::self-out back to ::self-in in the flow topology. This keeps the process self-contained: it does not need a reference to the flow itself, and the loop is visible in the topology definition rather than hidden inside imperative code.
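The self-loop can be exercised without flow at all, which also shows why the step needs no reference to the flow. In this sketch a `java.util.concurrent.LinkedBlockingQueue` stands in for the provider's stream channel (its blocking `.take` plays the role of `async/<!!`), the request message carries the queue under a hypothetical `:stream` key instead of calling `llm/start-stream`, and the hypothetical `run-self-loop` plays the part of a connection such as `[[:llm ::self-out] [:llm ::self-in]]` by feeding every `::self-out` message straight back in.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn llm-step
  "Transform arity of a self-looping step. One stream event is read per
  ::self-in message; the loop ends with a single ::done-out message."
  [state in-id msg]
  (case in-id
    :in
    [(assoc state :stream (:stream msg) :corr-id (:corr-id msg))
     {::self-out [{:type :next}]}]
    ::self-in
    (let [event (.take ^LinkedBlockingQueue (:stream state))] ; blocks like <!!
      (case (:type event)
        :delta [(update state :accumulated str (:delta event))
                {::token-out [{:delta (:delta event) :corr-id (:corr-id state)}]
                 ::self-out  [{:type :next}]}]
        :done  [(dissoc state :stream :corr-id :accumulated)
                {::done-out [{:data (:message event) :corr-id (:corr-id state)}]}]))))

(defn run-self-loop
  "Stands in for the flow connection from ::self-out back to ::self-in:
  feeds each ::self-out message back into the step, collecting tokens
  until a ::done-out message appears."
  [msg]
  (loop [[state outs] (llm-step {} :in msg)
         tokens       []]
    (let [tokens (into tokens (map :delta) (::token-out outs))]
      (if-let [done (first (::done-out outs))]
        {:tokens tokens :result done}
        (recur (llm-step state ::self-in (first (::self-out outs))) tokens)))))

(let [q (LinkedBlockingQueue.)]
  (doseq [e [{:type :delta :delta "Hel"}
             {:type :delta :delta "lo"}
             {:type :done :message "Hello"}]]
    (.put q e))
  (run-self-loop {:stream q :corr-id "c1"}))
;; => {:tokens ["Hel" "lo"], :result {:data "Hello", :corr-id "c1"}}
```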
Ayatori targets Java 21+. Processes declared with :workload :io run on virtual threads. That is why a blocking read inside the self-loop is not a practical concern. If flow were used on an older JVM, this implementation would need a different approach.
The flow guide says a step need not output at all: it can receive input, update its state, and emit nothing until it is ready. I think this self-loop pattern fits that intent. That said, the blocking read is working around flow’s async model rather than with it. It holds for now, but it is worth noting.
What I found
In this branch, lifecycle management that I would otherwise write by hand comes from the framework. Pause, resume, stop, and ping work across the entire graph or per process without additional code. Backpressure is automatic. The topology is still inspectable as data before execution begins.
(aya/pause-agent! sys :assistant)
(aya/ping-agent sys :assistant)
(aya/resume-agent! sys :assistant)
These things could certainly be added to the hand-written executor. But each one takes time, and none of them is what this experiment is about: each was a solved problem I was solving again. The execution model is now more explicit: the topology is a data structure, the connections are declared, and the lifecycle is managed in one place. And with core.async.flow-monitor, visualization comes for free as well:
(aya/describe-topology agent)
;; => {:procs {...} :conns [...] :entry-key :preprocess :deps #{...}}
Closing
Where flow worked well in this experiment: topology as data, lifecycle management, backpressure, observability. These came for free and replaced code I would have written by hand.
Where it required bridging: request/reply semantics, fan-out coordination, streaming. Each of those is outside what flow is designed for. The correlation pattern introduces external mutable state. The fan-out workaround has no timeout. The self-loop uses a blocking read inside the step function, bypassing flow’s assumption that step functions do not interact with channels directly.
Rich Hickey’s “Effective Programs” talk distinguishes between situated programs (long-running systems with state, context, and lifecycle) and transient programs (short-lived computations that start, do work, and finish). Ayatori’s runtime is situated, but its external API is transient: invoke a cap, get a result back. Flow handles the situated side well. That tension was always going to require bridging. Whether the bridge I built here is worth maintaining is what I am still working out.
Each workaround gave me the feeling of using flow outside its intended purpose. That might still be acceptable. I have not decided. What I did learn is what those costs actually look like in practice, which is what I set out to find.
The branch is not merged. The code is here: https://github.com/serefayar/ayatori/tree/flow
If you have worked with flow in a similar context, or think I am looking at this the wrong way, I would be glad to hear it.


