Every few weeks something lands in the JarvisCore community that is worth stopping to look at properly. This is one of those.
A team building financial intelligence tooling shared their implementation of a three-agent daily market briefing pipeline. It is not a toy example. It exercises crash recovery, sequential dependency management, multi-tier memory, blob persistence, and episodic logging — all in a single file, all in under 350 lines. If you have been wondering what a real JarvisCore workflow looks like end-to-end, this is worth reading.
What They Built
The pipeline runs three AutoAgents in sequence: MarketDataAgent fetches and structures synthetic OHLCV data and news sentiment scores for a set of tickers. AnalysisAgent reads that output and computes momentum classifications, signal scores, and alerts. ReportAgent writes an executive Markdown briefing from both previous outputs and saves it to blob storage.
The workflow is declared explicitly with depends_on to enforce sequencing:
results = await mesh.workflow(WORKFLOW_ID, [
    {
        "id": "fetch",
        "agent": "market_data",
        "task": "Generate synthetic daily market data for AAPL, MSFT, NVDA...",
    },
    {
        "id": "analyse",
        "agent": "analyst",
        "task": "Analyse the market data from the fetch step...",
        "depends_on": ["fetch"],
    },
    {
        "id": "report",
        "agent": "reporter",
        "task": "Write an executive Markdown market briefing...",
        "depends_on": ["analyse"],
    },
])
Each step receives the previous step's output through the workflow context automatically. The analysis agent reads market data without being explicitly passed it. The report agent reads both market data and signals without being explicitly passed either. The workflow engine handles that.
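To make the idea concrete, here is a minimal sketch of how an engine could resolve depends_on and hand earlier outputs to later steps through a shared context. It is not JarvisCore's implementation: run_workflow, the (task, context) handler signature, and the toy handlers are all assumptions made for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical handler signature: (task, context) -> output.
Handler = Callable[[str, dict[str, Any]], Awaitable[Any]]


async def run_workflow(steps: list[dict], handlers: dict[str, Handler]) -> dict[str, Any]:
    """Run steps in dependency order, exposing earlier outputs via a context."""
    context: dict[str, Any] = {}  # step id -> output
    pending = list(steps)
    while pending:
        # A step is ready once every id in its depends_on has an output.
        ready = [s for s in pending
                 if all(d in context for d in s.get("depends_on", []))]
        if not ready:
            raise ValueError("unsatisfiable or cyclic depends_on")
        for step in ready:
            # Pass a copy so a handler cannot mutate the engine's state.
            handler = handlers[step["agent"]]
            context[step["id"]] = await handler(step["task"], dict(context))
            pending.remove(step)
    return context


# Toy handlers standing in for the three agents (illustrative data only).
async def market_data(task: str, ctx: dict[str, Any]) -> dict:
    return {"AAPL": {"close": 101.0, "prev_close": 100.0}}


async def analyst(task: str, ctx: dict[str, Any]) -> dict:
    quote = ctx["fetch"]["AAPL"]  # read from context, no explicit wiring
    return {"AAPL": "up" if quote["close"] > quote["prev_close"] else "down"}


async def reporter(task: str, ctx: dict[str, Any]) -> str:
    # Reads both earlier outputs although it only depends on "analyse".
    return f"AAPL closed at {ctx['fetch']['AAPL']['close']} ({ctx['analyse']['AAPL']})"


STEPS = [
    {"id": "fetch", "agent": "market_data", "task": "fetch"},
    {"id": "analyse", "agent": "analyst", "task": "analyse", "depends_on": ["fetch"]},
    {"id": "report", "agent": "reporter", "task": "report", "depends_on": ["analyse"]},
]
HANDLERS = {"market_data": market_data, "analyst": analyst, "reporter": reporter}

results = asyncio.run(run_workflow(STEPS, HANDLERS))
print(results["report"])
```

In this sketch the context is a plain dict keyed by step ID, which is why the reporter can reach the fetch output even though it only declares a dependency on analyse.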
The Detail That Made It Resilient
The pipeline uses a deterministic workflow_id:
WORKFLOW_ID = "financial-daily-001"
This is not cosmetic. When a workflow runs with a fixed ID, the WorkflowEngine stores each step's output in Redis keyed to that ID. If the process crashes mid-run — between analyse and report, for example — restarting it with the same ID picks up from the last completed step rather than starting over. Fetch and analyse are not re-executed. The team said this was the feature that made them confident enough to run it unattended overnight.
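The resume behaviour can be sketched in a few lines. This is an illustration under stated assumptions, not the WorkflowEngine's code: CheckpointStore is an in-memory stand-in for Redis, the "<workflow_id>:<step_id>" key layout is a guess, and run_with_resume is a hypothetical helper.

```python
import json


class CheckpointStore:
    """In-memory stand-in for Redis (assumed key layout: workflow_id:step_id)."""

    def __init__(self):
        self._data = {}

    def has(self, workflow_id, step_id):
        return f"{workflow_id}:{step_id}" in self._data

    def get(self, workflow_id, step_id):
        return json.loads(self._data[f"{workflow_id}:{step_id}"])

    def put(self, workflow_id, step_id, output):
        self._data[f"{workflow_id}:{step_id}"] = json.dumps(output)


def run_with_resume(workflow_id, steps, store):
    """Run (step_id, fn) pairs in order, skipping steps already checkpointed."""
    outputs, executed = {}, []
    for step_id, fn in steps:
        if store.has(workflow_id, step_id):
            # Completed in an earlier run: reuse the stored output.
            outputs[step_id] = store.get(workflow_id, step_id)
            continue
        outputs[step_id] = fn(outputs)
        # Checkpoint only after the step has finished successfully.
        store.put(workflow_id, step_id, outputs[step_id])
        executed.append(step_id)
    return outputs, executed


# Simulate a crash between analyse and report on the first run.
report_calls = {"count": 0}


def fetch(outputs):
    return {"AAPL": 101.0}


def analyse(outputs):
    return {"AAPL": "up"}


def flaky_report(outputs):
    report_calls["count"] += 1
    if report_calls["count"] == 1:
        raise RuntimeError("simulated crash")
    return "briefing"


store = CheckpointStore()
steps = [("fetch", fetch), ("analyse", analyse), ("report", flaky_report)]

try:
    run_with_resume("financial-daily-001", steps, store)
except RuntimeError:
    pass  # the first run dies before report completes

outputs, executed = run_with_resume("financial-daily-001", steps, store)
print(executed)
```

The second call reuses the same ID, so only report runs. With a fresh ID on every start there would be no checkpoints to find, and every step would run again.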
How Each Agent Handles Memory
Every agent initialises its own UnifiedMemory instance in setup(), using the stores that the Mesh injects automatically before setup is called:
async def setup(self):
    await super().setup()
    self.memory = UnifiedMemory(
        workflow_id=WORKFLOW_ID,
        step_id="fetch",
        agent_id=self.role,
        redis_store=self._redis_store,
        blob_storage=self._blob_storage,
    )
The team did not configure the stores themselves. self._redis_store and self._blob_storage arrive already connected because the Mesh injected them at start time. Their setup() just wires them into a UnifiedMemory instance and logs which tiers came online.
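The ordering matters here: stores are injected first, then setup() runs. A minimal sketch of that pattern follows, assuming a toy mesh and agent. SketchMesh, SketchAgent, and the tier names are hypothetical and only mirror the attribute names quoted above. They are not JarvisCore classes.

```python
import asyncio


class SketchAgent:
    """Toy agent: stores are attached by the mesh before setup() is called."""

    role = "market_data"

    def __init__(self):
        # None means the tier is unavailable.
        self._redis_store = None
        self._blob_storage = None
        self.tiers = []

    async def setup(self):
        # Record which tiers came online; in-process memory is always there.
        self.tiers = ["in_process"]
        if self._redis_store is not None:
            self.tiers.append("redis")
        if self._blob_storage is not None:
            self.tiers.append("blob")


class SketchMesh:
    """Toy mesh that owns the stores and injects them at start time."""

    def __init__(self, redis_store=None, blob_storage=None):
        self._redis_store = redis_store
        self._blob_storage = blob_storage
        self.agents = []

    def add(self, agent):
        self.agents.append(agent)
        return agent

    async def start(self):
        for agent in self.agents:
            # Inject first, then call setup(), so setup() can rely on the stores.
            agent._redis_store = self._redis_store
            agent._blob_storage = self._blob_storage
            await agent.setup()


# Plain dicts stand in for connected store clients.
full_mesh = SketchMesh(redis_store={}, blob_storage={})
full_agent = full_mesh.add(SketchAgent())
asyncio.run(full_mesh.start())

bare_mesh = SketchMesh()  # no Redis, no blob storage
bare_agent = bare_mesh.add(SketchAgent())
asyncio.run(bare_mesh.start())

print(full_agent.tiers, bare_agent.tiers)
```

The same setup() works in both configurations. When nothing is injected, the agent ends up with in-process memory only.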
At the end of the run, the report agent writes a completion event to the episodic ledger:
await report_agent.memory.episodic.append({
    "event": "pipeline_completed",
    "workflow_id": WORKFLOW_ID,
    "steps_completed": len([r for r in results if r.get("status") == "success"]),
})
The final Markdown briefing is saved to blob storage at reports/financial-daily-001.md. The file is named after the workflow ID, so a rerun with the same ID writes to the same path.
What They Said About It
The team shared a few observations after running it in their environment. The one that stood out: they expected the multi-step context passing to require explicit wiring between agents. It did not. The workflow engine propagates outputs through context automatically, so the analysis agent received structured market data without them writing any routing code. They described this as the biggest reduction in boilerplate compared to their previous setup.
They also noted that the capabilities list on each agent — ["market_data", "data_collection", "finance"] on the market data agent, ["analysis", "technical_analysis", "finance"] on the analyst — is not required for this single-process workflow. They kept it because they are planning to extend the pipeline to a distributed setup where agents on different nodes will need to discover each other by capability.
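For a feel of what discovery by capability could look like, here is a small in-process sketch. It assumes nothing about how JarvisCore does distributed discovery: CapabilityRegistry and its register and find methods are hypothetical, and a real multi-node setup would need a shared registry rather than a local dict.

```python
from collections import defaultdict


class CapabilityRegistry:
    """Hypothetical lookup table from capability name to agent ids."""

    def __init__(self):
        self._by_capability = defaultdict(set)

    def register(self, agent_id, capabilities):
        for capability in capabilities:
            self._by_capability[capability].add(agent_id)

    def find(self, *required):
        """Return sorted agent ids that advertise every required capability."""
        if not required:
            return []
        # .get avoids creating empty entries for unknown capabilities.
        candidates = [self._by_capability.get(c, set()) for c in required]
        return sorted(set.intersection(*candidates))


registry = CapabilityRegistry()
registry.register("market_data", ["market_data", "data_collection", "finance"])
registry.register("analyst", ["analysis", "technical_analysis", "finance"])

print(registry.find("finance"))
print(registry.find("finance", "analysis"))
```

Both agents share the finance capability, so a caller narrows the match by asking for a second capability.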
Try It
The full example is in the JarvisCore repository at examples/financial_pipeline.py. Redis is required for crash recovery and episodic logging. Without it the pipeline still runs, memory degrades gracefully to in-process only, and the workflow completes without persistence.
docker compose -f docker-compose.infra.yml up -d
cp .env.example .env # set your LLM API key
pip install -e ".[redis]"
python examples/financial_pipeline.py
If you are building something on JarvisCore that you think belongs here, open a thread in GitHub Discussions.