Market Data Pipeline: Virtual Threads Capstone
"The system that would not be watched, would not be trusted."
-- Gary Stevenson
Stevenson was talking about financial markets, but the same principle applies to the software that processes them. A pipeline you can't observe, can't compose, and can't reason about is one you'll never trust in production. That's why this capstone example doesn't just work; it's built so you can see exactly what each stage does, swap any piece out, and know that failures will not hide.
What you'll learn:
- How to build a complete, concurrent data pipeline from feed generation to alert dispatch
- Progressive introduction of fourteen HKJ features, each solving a concrete problem
- Design principles that make the pipeline composable, safe, and resilient
- When and why to choose each concurrency primitive
Run the demo:

./gradlew :hkj-examples:run \
-PmainClass=org.higherkindedj.example.market.runner.MarketDataDemo
The Scenario
It's Monday morning. You're the lead engineer on the market data team at a mid-size trading firm. Three exchange feeds (NYSE, LSE, and the Tokyo Stock Exchange) push tens of thousands of price ticks per second into your system. Each tick is raw: just a symbol, a bid, an ask, a volume, and a timestamp. Before anyone can act on this data, your pipeline must:
- Merge the feeds into a single stream in true arrival order
- Enrich each tick with instrument metadata and an FX conversion rate
- Assess risk: flag ticks with anomalous spreads, volume spikes, or inversions
- Aggregate into time windows, computing VWAP and other summary statistics
- Detect anomalies: compare aggregated views against thresholds
- Dispatch alerts to multiple channels simultaneously (log, email, webhook)
- Protect the pipeline with rate limiting, circuit breakers, and feed failover
The legacy system handles this with a tangle of thread pools, CompletableFuture chains,
manual try-catch propagation, and synchronized blocks that nobody dares refactor.
You've been asked to rebuild it.
Here's what the new pipeline looks like with HKJ:
List<Alert> alerts = pipeline.fullPipeline().toList().run();
One line to run it. The rest is composition.
Architecture Overview
Each stage is a lazy stream transformation. Nothing executes until the terminal
.toList().run() drives evaluation from the end. Data flows left-to-right through
composable operators, each one a single method call.
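To make that concrete, here is an illustrative sketch of how the stages might compose. The operator names are the ones listed in this chapter's feature map, but the exact signatures, arities, and helper methods (nyseFeed, enrich, and so on) are assumptions for illustration, not the library's confirmed API:

```
// Illustrative sketch only: operator names come from this chapter's feature map;
// signatures and the helpers (nyseFeed, enrich, ...) are assumed, not HKJ's API.
VStream<Alert> fullPipeline() {
  return VStreamPar.merge(List.of(nyseFeed(), lseFeed(), tokyoFeed())) // Merge
      .parEvalMap(8, this::enrich)              // Enrich: bounded, order-preserving
      .parEvalMapUnordered(8, this::assessRisk) // Risk: completion order
      .chunk(100)                               // Window: fixed-size batches
      .map(this::aggregate)                     // Aggregate: chunk -> summary
      .flatMap(this::detectAnomalies)           // Detect: zero or more alerts per view
      .mapTask(this::dispatch)                  // Dispatch: effect per alert
      .take(10_000);                            // Limit: safety valve
}
```

Each line is one stage; reordering stages is reordering method calls.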
What Makes This Different
| Concern | Legacy Approach | HKJ Approach |
|---|---|---|
| Concurrency | Thread pools, ExecutorService, manual task submission | Virtual threads via Par.map2, parEvalMap, Scope |
| Error handling | Nested try-catch, CompletableFuture.exceptionally | Fail-fast propagation, recoverWith for failover |
| Backpressure | Reactive streams protocol (request(n)) | Pull-based: consumer drives evaluation naturally |
| Resource cleanup | finally blocks, easy to miss | Structured concurrency: scope cancels all children |
| Composition | Callback chains, hard to reorder stages | Each stage is a function VStream<A> → VStream<B> |
Design Principles
These four principles guide every design decision in the pipeline. Each step in the walkthrough calls back to them.
Lazy Composition
Every operator returns a description of work, not a result. The pipeline definition
allocates no threads, opens no connections, and generates no ticks. Execution only begins
when a terminal operation like .toList().run() starts pulling.
This means you can build, test, and compose pipeline fragments independently. Assemble the full pipeline from smaller pieces that each work in isolation.
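The same pull-until-terminal behaviour can be seen in the JDK's own java.util.stream, which shares this lazy design. A minimal, self-contained demonstration (plain JDK, not HKJ):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Laziness demo: intermediate operations only describe work; nothing runs
// until a terminal operation pulls elements through the pipeline.
public class LazyDemo {
    static final AtomicInteger evaluations = new AtomicInteger();

    static Stream<Integer> describePipeline() {
        // Building this pipeline performs no work: map is an intermediate op.
        return Stream.of(1, 2, 3, 4, 5)
                .map(n -> { evaluations.incrementAndGet(); return n * 10; });
    }

    public static void main(String[] args) {
        Stream<Integer> pipeline = describePipeline();
        int before = evaluations.get();           // still 0: nothing evaluated
        List<Integer> result = pipeline.toList(); // terminal op drives evaluation
        System.out.println(before + " " + evaluations.get() + " " + result);
    }
}
```

The counter stays at zero until `toList()` runs, which is exactly the property that lets pipeline fragments be built and composed without side effects.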
Bounded Concurrency Without Backpressure
VStream is pull-based: the consumer drives evaluation by requesting elements. The parallel
operations add bounded concurrency on top of this pull model: at most a configured number
of elements (the concurrency parameter) are in flight at any time. Because virtual threads block cheaply, no explicit backpressure
protocol is needed; if the consumer is slow, the producer simply blocks until the consumer
pulls the next element.
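A pure-JDK sketch of the same idea: bounded in-flight work on virtual threads, with blocked producers instead of a backpressure protocol. This mirrors what parEvalMap is described as doing; the Semaphore-based implementation, the class name, and the counters are illustrative assumptions, and it requires Java 21+:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Bounded parallel map, JDK-only sketch: at most `concurrency` calls to f run
// at once; excess tasks simply block (cheaply, on virtual threads).
public class BoundedMap {
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger(); // observed max in flight

    static <A, B> List<B> parMap(List<A> items, int concurrency, Function<A, B> f)
            throws Exception {
        Semaphore permits = new Semaphore(concurrency);
        try (ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<B>> futures = items.stream()
                .map(a -> exec.submit(() -> {
                    permits.acquire(); // virtual thread blocks cheaply here
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        return f.apply(a);
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                }))
                .toList();
            List<B> results = new ArrayList<>();
            for (Future<B> fut : futures) {
                results.add(fut.get()); // collect in input order
            }
            return results;
        }
    }
}
```

Note that ordering is preserved because results are collected from the futures in submission order, the same guarantee the feature map attributes to parEvalMap.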
Structured Concurrency Throughout
All parallelism in the pipeline uses StructuredTaskScope:
- Par.map2 forks two tasks within a scope
- parEvalMap forks a batch of tasks within a scope
- Scope.allSucceed forks alert channels within a scope
- VStreamPar.merge forks one consumer per source within a scope
If any task fails, the scope cancels all remaining tasks. No orphaned threads, no leaked resources.
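Those semantics — fork everything, and on the first failure cancel the surviving siblings — can be sketched with stable JDK APIs alone. This mirrors the behaviour attributed to Scope.allSucceed; it is not HKJ's implementation, and it requires Java 21+:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// "All must succeed" fork/join sketch: fork every task on a virtual thread;
// if any task fails, cancel the remaining siblings before rethrowing.
public class AllSucceed {
    static <T> List<T> allSucceed(List<Callable<T>> tasks) throws Exception {
        try (ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<T>> futures = tasks.stream().map(exec::submit).toList();
            List<T> results = new ArrayList<>();
            try {
                for (Future<T> f : futures) results.add(f.get());
            } catch (ExecutionException e) {
                futures.forEach(f -> f.cancel(true)); // cancel surviving siblings
                throw new RuntimeException("a forked task failed", e.getCause());
            }
            return results;
        } // try-with-resources: no thread outlives this scope
    }
}
```

The try-with-resources on the executor is what makes the concurrency structured: no forked thread can outlive the block that created it.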
Fail-Fast Error Handling
Errors propagate immediately at every level:
- If any enrichment lookup fails, the entire batch fails
- If any risk calculation throws, the pipeline surfaces the error
- If any alert channel fails, the dispatch for that alert fails
- If any source stream errors during merge, the merged stream fails
No silent failures, no swallowed exceptions. Recovery is explicit (recoverWith), not
accidental.
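As a single-value illustration of "recovery is explicit", here is the Supplier-level analogue of switching to a fallback on failure. On VStream, recoverWith applies the same idea to whole streams; this helper is a hypothetical sketch, not the library's API:

```java
import java.util.function.Supplier;

// Explicit recovery, single-value analogue: try the primary source and,
// only on failure, switch to the fallback. The failure is observed at one
// deliberate point rather than swallowed somewhere in a callback chain.
public class Recover {
    static <T> T recoverWith(Supplier<T> primary, Supplier<T> fallback) {
        try {
            return primary.get();
        } catch (RuntimeException e) {
            // Recovery happens here, and only here -- by choice, not accident.
            return fallback.get();
        }
    }
}
```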
The Domain Model
The pipeline operates on a progression of record types. Each stage adds a layer of information:
// Raw tick from an exchange
record PriceTick(Symbol symbol, Price bid, Price ask, Volume volume,
Exchange exchange, Instant timestamp) {
Price mid() { ... } // (bid + ask) / 2
Price spread() { ... } // ask - bid
}
// Tick enriched with instrument metadata and FX rate
record EnrichedTick(PriceTick tick, Instrument instrument, BigDecimal fxRate) {
Price midInUsd() { ... } // mid * fxRate
}
// Tick with risk assessment attached
record RiskAssessment(EnrichedTick tick, double riskScore, List<String> flags) {
boolean isHighRisk() { return riskScore > 0.7; }
}
// Aggregate view over a window of ticks
record AggregatedView(Symbol symbol, Price vwap, Price bestBid, Price bestAsk,
Volume totalVolume, int tickCount, double maxRiskScore) {
Price spread() { return bestAsk.minus(bestBid); }
}
// Alert raised when an anomaly is detected
record Alert(Symbol symbol, Severity severity, String message, Instant timestamp) {
enum Severity { CRITICAL, WARNING, INFO }
}
Each record is immutable. Transformation flows forward only; no stage mutates a previous stage's output.
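As a concrete instance of what the Aggregate stage computes, VWAP (volume-weighted average price) over a window is sum(mid × volume) / sum(volume). A minimal sketch using simplified tick records — the plain BigDecimal fields here stand in for the chapter's Price and Volume wrappers:

```java
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.List;

// VWAP over a window of ticks: sum(mid * volume) / sum(volume).
// Simplified types; the chapter's domain model wraps these in Price/Volume.
public class Vwap {
    record Tick(BigDecimal bid, BigDecimal ask, BigDecimal volume) {
        BigDecimal mid() {
            return bid.add(ask).divide(BigDecimal.valueOf(2), MathContext.DECIMAL64);
        }
    }

    static BigDecimal vwap(List<Tick> window) {
        BigDecimal notional = BigDecimal.ZERO;    // running sum of mid * volume
        BigDecimal totalVolume = BigDecimal.ZERO; // running sum of volume
        for (Tick t : window) {
            notional = notional.add(t.mid().multiply(t.volume()));
            totalVolume = totalVolume.add(t.volume());
        }
        return notional.divide(totalVolume, MathContext.DECIMAL64);
    }
}
```

For example, a window of (bid 10, ask 12, volume 2) and (bid 20, ask 22, volume 3) has mids 11 and 21, so VWAP = (11·2 + 21·3) / 5 = 17.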
HKJ Feature Map
Every HKJ feature used in the pipeline, mapped to the stage where it appears:
| Stage | HKJ Feature | What It Does | Page |
|---|---|---|---|
| Generate | VStream.unfold | Stateful infinite stream from a seed | Building |
| Merge | VStreamPar.merge | Concurrent multi-source merging | Building |
| Enrich | Par.map2 | Run two VTasks in parallel, combine | Building |
| Enrich | VStreamPar.parEvalMap | Bounded parallel map, order preserved | Building |
| Risk | parEvalMapUnordered | Bounded parallel map, completion order | Building |
| Window | VStream.chunk | Group elements into fixed-size batches | Building |
| Aggregate | VStream.map | Transform each chunk into a summary | Building |
| Detect | VStream.flatMap | Each element produces zero or more results | Alerts |
| Dispatch | VStream.mapTask | Apply effectful function per element | Alerts |
| Dispatch | Scope.allSucceed | Fork/join, all channels must succeed | Alerts |
| Throttle | VStreamThrottle.throttle | Rate-limit emissions | Alerts |
| Recover | VStream.recoverWith | Switch to fallback stream on failure | Alerts |
| Protect | CircuitBreaker | Trip open after repeated failures | Alerts |
| Limit | VStream.take | Safety valve: caps total elements | Building |
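For a flavour of the unfold pattern behind the Generate stage, here is a pure-JDK analogue: grow a stream from a seed, where each step either emits an element plus the next state or ends the stream. It mirrors the idea behind VStream.unfold; the actual HKJ signature may differ:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

// unfold: build a (possibly infinite) stream from a seed. Each step returns
// Optional.of(element, nextState) to emit, or Optional.empty() to stop.
public class Unfold {
    static <S, A> Stream<A> unfold(S seed, Function<S, Optional<Map.Entry<A, S>>> step) {
        return Stream.iterate(step.apply(seed),
                        Optional::isPresent,
                        prev -> step.apply(prev.get().getValue()))
                .map(opt -> opt.get().getKey());
    }

    public static void main(String[] args) {
        // Toy "tick sequence": emit n and advance the state while below a limit.
        Function<Integer, Optional<Map.Entry<Integer, Integer>>> step =
                n -> n < 5 ? Optional.of(Map.entry(n, n + 1)) : Optional.empty();
        List<Integer> out = unfold(0, step).toList();
        System.out.println(out); // [0, 1, 2, 3, 4]
    }
}
```

With no limit in the step function the stream is infinite, which is why the pipeline ends with take as a safety valve.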
Chapter Contents
- Building the Pipeline: Steps 1–5, covering tick generation, feed merging, concurrent enrichment, risk assessment, and windowed aggregation
- Alerts and Resilience: Steps 6–9, covering anomaly detection, multi-channel dispatch, rate limiting, circuit breakers, and the full end-to-end pipeline
- Quick Reference: Feature summary, source file index, and running instructions
Next: Building the Pipeline →