python · orchestration · voice · latency · reliability

My Three‑Phase Parallel Orchestrator: Typed Results, Exception‑Proof Phases, and a Rollout That Never Flaps

Daniel Anthony Romitelli Jr. · March 11, 2026


I noticed it the first time I watched a voice session stall: the user had already finished speaking, but the system was still “thinking” like it was reading a novel line-by-line.

That was the moment I stopped treating orchestration like a chain of calls and started treating it like a compiler.

A voice-first intelligence assistant doesn’t get to be precious. It can’t crash because one sub-agent threw. It can’t block because one call got slow. And it definitely can’t depend on loosely-typed blobs passed around like notes on a sticky pad.

So I built a three-phase parallel orchestrator with typed intermediate results, strict latency budgets, and a fallback cascade. It replaced a ~3,500ms linear pipeline with a 600ms parallel architecture by splitting execution into phases that behave like compilation stages: each phase produces a well-defined intermediate representation, and the next phase consumes it without re-doing work.

The key insight: orchestration is a compilation pipeline

The non-obvious part of “multi-agent” work isn’t agents talking to each other.

The non-obvious part is that you need typed intermediate results so downstream stages can be fast, deterministic, and resilient. If Phase 2 has to re-interpret raw text because Phase 1 returned an unstructured blob (or nothing at all), you’ve already lost your latency budget.

In my design, Phase 1 runs three independent analyses in parallel:

  • Query parsing → intent + entities + filters
  • Embedding generation → vector for semantic search
  • Location extraction → geo filter expression

Each one produces a typed dataclass. That typed shape is the “IR” (intermediate representation) that Phase 2 consumes.

Per-phase budgets are explicit:

  • Phase 1 total: <150ms (parallel execution)
  • Phase 2: <300ms (search with pre-computed inputs)
  • Phase 3: <100ms (response formatting)
  • Total P95 target: <600ms (down from ~3,500ms in the linear pipeline)

Those numbers aren’t motivational quotes. They’re the shape of the system.
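Those budgets are also enforceable, not just aspirational. Here's a minimal sketch of one way to do it with `asyncio.wait_for`; the helper name and constants are mine, not the production code:

```python
import asyncio

# Illustrative per-phase budget in seconds, mirroring the Phase 1 target above.
PHASE1_BUDGET_S = 0.150

async def run_with_budget(coro, budget_s, fallback):
    """Enforce a phase budget: on timeout, cancel the work and return the fallback."""
    try:
        return await asyncio.wait_for(coro, timeout=budget_s)
    except asyncio.TimeoutError:
        return fallback

async def demo():
    async def slow_phase():
        await asyncio.sleep(1.0)  # pretend this phase blew its budget
        return "real result"
    return await run_with_budget(slow_phase(), PHASE1_BUDGET_S, "fallback result")

print(asyncio.run(demo()))  # → fallback result
```

The important property: a slow dependency costs you at most the budget, never the whole response.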

Phase 1 contracts: the typed intermediate results

When I say “typed contracts,” I mean I literally want Phase 1 to hand Phase 2 a stable object with defaults that make sense when something fails.

These are the three results I pass forward.

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class QueryParserResult:
    intent: str
    entities: Dict[str, Any] = field(default_factory=dict)
    filters: Dict[str, Any] = field(default_factory=dict)
    confidence: float = 0.0
    raw_query: str = ""
    processing_time_ms: float = 0.0

@dataclass
class EmbeddingResult:
    embedding: List[float] = field(default_factory=list)
    cache_hit: bool = False
    model: str = "text-embedding-ada-002"
    dimensions: int = 1536
    processing_time_ms: float = 0.0

@dataclass
class LocationResult:
    state_code: Optional[str] = None
    state_name: Optional[str] = None
    city: Optional[str] = None
    filter_expression: Optional[str] = None
    cache_hit: bool = False
    processing_time_ms: float = 0.0

I like how boring these are.

That’s the point. When something goes wrong in a voice pipeline, “boring defaults” beat “clever exceptions” every time.
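Concretely, a default-constructed result is a fully usable object, so downstream code branches on explicit missingness instead of catching exceptions. A self-contained sketch, repeating a trimmed copy of `EmbeddingResult` so it runs standalone:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class EmbeddingResult:  # trimmed copy of the contract above
    embedding: List[float] = field(default_factory=list)
    cache_hit: bool = False

# A failed embedding agent yields a default instance, not an exception.
fallback = EmbeddingResult()

# Downstream code branches on explicit missingness:
has_vector = bool(fallback.embedding)  # False → degrade to non-semantic search
```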

Phase 1 execution: parallel, exception-tolerant, and still typed

The naive approach is to run the three Phase 1 steps sequentially. That’s how you end up with “it works on my laptop” latency.

The second naive approach is to parallelize them, but let one exception tear down the whole gather. That’s how you end up with a fast system that’s down.

What I actually do is run them concurrently and treat exceptions as data. If an agent fails, I return a default instance of the expected dataclass and keep moving.

import asyncio
import logging
import time

logger = logging.getLogger(__name__)

async def _execute_phase1(self, query, context, metrics):
    start = time.time()
    parser_task = self._run_query_parser(query, context)
    embedding_task = self._run_embedding_agent(query, context)
    location_task = self._run_location_agent(query, context)

    results = await asyncio.gather(
        parser_task, embedding_task, location_task,
        return_exceptions=True,
    )

    parser_result = self._handle_agent_result(results[0], QueryParserResult)
    embedding_result = self._handle_agent_result(results[1], EmbeddingResult)
    location_result = self._handle_agent_result(results[2], LocationResult)
    metrics.phase1_total_ms = (time.time() - start) * 1000
    return parser_result, embedding_result, location_result

def _handle_agent_result(self, result, expected_type):
    if isinstance(result, Exception):
        logger.warning(f"Agent returned exception: {result}")
        return expected_type()  # Degrade to default dataclass instance
    if isinstance(result, expected_type):
        return result
    logger.warning(f"Agent returned unexpected type: {type(result).__name__}")
    return expected_type()  # Unknown shape gets the same safe default

The thing that surprised me the first time I ran this wasn’t the latency win—it was how much calmer everything downstream became once failures stopped being “special.” A failure just turns into “empty embedding” or “no location filter,” which Phase 2 already knows how to handle.

Phase 2: search consumes precomputed inputs (no re-derivation)

Phase 2 is where most systems accidentally light money and time on fire.

If you don’t treat Phase 1 outputs as real inputs, you’ll end up recomputing or re-parsing inside search. That’s how you get a pipeline that looks modular but behaves like spaghetti.

My Phase 2 takes all three Phase 1 results and builds a single dictionary of optimized search parameters.

async def _execute_phase2(self, query, parser_result, embedding_result, location_result, user_id, context):
    search_params = {
        "query": query,
        "embedding": embedding_result.embedding,
        "location_filter": location_result.filter_expression,
        "state_code": location_result.state_code,
        "filters": parser_result.filters,
        "entities": parser_result.entities,
        "user_id": user_id,
        "limit": context.get("limit", 10),
    }
    result = await self._run_search_orchestrator(search_params, context)
    return result

I’m opinionated about this: Phase 2 should feel like calling a compiler backend. It gets a structured input bundle and produces results. It should not be doing “just one more quick parse.”
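To make that concrete, here's a sketch of how a search backend might pick its strategy purely from which Phase 1 inputs actually arrived. The tier names and the `select_search_tier` helper are illustrative; the article only promises that metrics record a `search_tier`:

```python
def select_search_tier(search_params):
    """Pick a search strategy based on which Phase 1 inputs actually arrived."""
    if search_params.get("embedding"):
        return "primary"   # full semantic search with the precomputed vector
    if search_params.get("filters") or search_params.get("entities"):
        return "filtered"  # structured filtering without semantic ranking
    return "keyword"       # last resort: plain keyword match on the raw query

# A failed embedding agent left an empty vector, but parsing succeeded:
params = {"query": "find CPAs in Texas", "embedding": [],
          "filters": {"designation": "CPA"}}
tier = select_search_tier(params)  # → "filtered"
```

Phase 2 never asks "did Phase 1 throw?" — it only asks "what did I get?"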

Phase 3: response formatting (and why it’s a phase, not a helper)

Phase 3 exists because formatting is not a footnote in voice.

Voice output has different constraints than web output, and if you let formatting creep into Phase 2 you’ll destroy both your latency accounting and your ability to evolve the system safely.

Here’s the actual implementation:

import time

async def _execute_phase3(
    self,
    candidates: List[Dict[str, Any]],
    parser_result: QueryParserResult,
    user_id: str,
    context: Dict[str, Any],
) -> List[Dict[str, Any]]:
    start = time.time()

    try:
        formatted = await self._run_response_agent(
            candidates=candidates,
            intent=parser_result.intent,
            user_id=user_id,
            context=context,
        )

        elapsed = (time.time() - start) * 1000
        logger.debug(f"Response formatting took {elapsed:.0f}ms")

        return formatted

    except Exception:
        logger.exception("Phase 3 formatting failed")
        return candidates  # Degrade: return unformatted candidates instead of nothing

The ResponseAgent that does the real work has four steps: apply a 30-day exclusion window (candidates shown recently get filtered), format each candidate into a typed card with action buttons, track the delivery for future exclusion, and generate a voice-friendly summary.

async def format_response(self, results, conversation_id=None, user_id=None,
                          preview_mode=False, apply_exclusion=True, intent=None):
    candidates = results.candidates
    excluded_count = 0

    # Step 1: 30-day exclusion window
    if apply_exclusion and user_id and self._db_manager:
        candidates, excluded_count = await self._apply_exclusion_window(
            candidates=candidates, user_email=user_id, conversation_id=conversation_id)

    # Step 2: Format candidate cards with action buttons
    formatted_cards = []
    for candidate in candidates:
        card = await self._format_single_card(candidate, include_actions=True)
        formatted_cards.append(card)

    # Step 3: Track delivery (non-blocking — failure doesn't break response)
    delivery_id = None
    if not preview_mode and conversation_id and self._delivery_tracker:
        try:
            delivery_id = await self._track_delivery(
                conversation_id=conversation_id, candidates=candidates, user_id=user_id)
        except Exception as e:
            logger.warning(f"Delivery tracking failed (non-blocking): {e}")

    # Step 4: Generate voice summary
    voice_summary = await self._generate_voice_summary(
        total_count=len(formatted_cards),
        filters=results.filters_applied,
        top_candidates=[c.full_name for c in formatted_cards[:3] if c.full_name])

    return VaultChatResponse(
        candidates=formatted_cards, total_count=len(formatted_cards),
        excluded_count=excluded_count, conversation_id=conversation_id,
        delivery_id=delivery_id, voice_summary=voice_summary, intent=intent or {})

The important part is the boundary: Phase 2 returns “what we found,” Phase 3 returns “what we say.” And the fallback in _execute_phase3 means if formatting explodes, the pipeline still returns raw candidates instead of nothing.

The fallback cascade: every runner degrades instead of crashing

Most orchestration failures I’ve seen in the wild aren’t “the model was wrong.” They’re operational: timeouts, transient API issues, unexpected response shapes.

So every agent runner follows the same pattern:

  1. Try the real agent
  2. If it errors or returns nothing usable, fall back to a simpler implementation

Here’s the real pattern for query parsing.

import time
import re

async def _run_query_parser(self, query, context):
    start = time.time()
    try:
        if self._query_parser_agent:
            response = await self._query_parser_agent.process(query, context)
            if hasattr(response, 'result') and response.result:
                return QueryParserResult(
                    intent=response.result.get("intent", "search"),
                    entities=response.result.get("entities", {}),
                    filters=response.result.get("filters", {}),
                    confidence=response.result.get("confidence", 0.7),
                    raw_query=query,
                    processing_time_ms=(time.time() - start) * 1000,
                )
        return await self._fallback_query_parser(query, start)
    except Exception as e:
        logger.warning(f"QueryParser error, using fallback: {e}")
        return await self._fallback_query_parser(query, start)

I like this structure because it’s brutally readable under pressure. When I’m debugging a production incident, I don’t want a clever abstraction. I want to see the exact decision points.

The query parser fallback: regex, deterministic, no dependency on “smart”

The fallback parser is intentionally simple: regex-based pattern matching, no LLM call.

That means when the “smart” path fails, the system still produces an intent and keeps the rest of the pipeline moving.

import re
import time

async def _fallback_query_parser(self, query, start):
    query_lower = query.lower()
    if re.search(r'\bhow\s+many\b', query_lower):
        intent = "count"
    elif re.search(r'\b(find|search|show|get|list)\b', query_lower):
        intent = "search"
    elif re.search(r'\b(filter|with|having)\b', query_lower):
        intent = "filter"
    else:
        intent = "search"
    # ... extract designations, experience, remote from regex
    return QueryParserResult(intent=intent, raw_query=query, confidence=0.7,
                             processing_time_ms=(time.time() - start) * 1000)

The part I had to learn the hard way: fallbacks aren’t a “backup plan,” they’re part of the primary design. A voice system that only works when everything is healthy is a demo, not a product.

Metrics: timing every phase like it owes you money

If you want to hit a <600ms P95 target, you don’t get to measure “total time” and call it observability.

I track per-phase timings (and a few key booleans) in a dedicated metrics dataclass.

from dataclasses import dataclass

@dataclass
class OrchestratorMetrics:
    phase1_query_parser_ms: float = 0.0
    phase1_embedding_ms: float = 0.0
    phase1_location_ms: float = 0.0
    phase1_total_ms: float = 0.0
    phase2_search_ms: float = 0.0
    phase3_response_ms: float = 0.0
    total_ms: float = 0.0
    embedding_cache_hit: bool = False
    location_cache_hit: bool = False
    candidates_found: int = 0
    search_tier: str = "primary"

What I like about this shape is that it matches the architecture. When a phase goes long, it’s obvious which one. When we’re “fast but dumb,” cache hit flags and search tier tell me why.
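Because the metrics object is a flat dataclass, shipping it to a structured log is one `asdict` call. A sketch with a trimmed copy of the dataclass so it runs standalone; the JSON log line is illustrative, not the production sink:

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class OrchestratorMetrics:  # trimmed copy of the dataclass above
    phase1_total_ms: float = 0.0
    phase2_search_ms: float = 0.0
    phase3_response_ms: float = 0.0
    total_ms: float = 0.0
    search_tier: str = "primary"

metrics = OrchestratorMetrics(phase1_total_ms=142.0, phase2_search_ms=281.0,
                              phase3_response_ms=77.0, total_ms=512.0)

# One flat dict → one structured log line per request.
line = json.dumps(asdict(metrics))
```

Flat fields mean no nested parsing when you compute P95s later — every phase is already a column.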

Rollout: consistent user assignment via hash (no flapping)

A parallel orchestrator is an architectural change, not a refactor. Shipping it behind a gate is how you avoid learning about edge cases from your entire user base at once.

The rollout logic uses a feature flag and a percentage, but the key is consistent assignment: if a user is “in,” they stay in.

import hashlib
import random

def should_use_multi_agent_orchestrator(user_id=None):
    if not USE_MULTI_AGENT_ORCHESTRATOR:
        return False
    if MULTI_AGENT_ROLLOUT_PERCENTAGE >= 100:
        return True
    if MULTI_AGENT_ROLLOUT_PERCENTAGE <= 0:
        return False
    if user_id:
        # Stable bucket: the same user always hashes to the same 0-99 slot
        user_hash = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        return (user_hash % 100) < MULTI_AGENT_ROLLOUT_PERCENTAGE
    # Anonymous sessions: fall back to a true dice roll
    return random.randint(0, 99) < MULTI_AGENT_ROLLOUT_PERCENTAGE

I’ve shipped enough systems to be allergic to rollout “dice rolls.” The md5-based bucketing is simple, stable, and it keeps experiments honest.
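The stability property is easy to check: the same user_id always lands in the same 0–99 bucket, so raising the percentage only adds users, never swaps them. A self-contained restatement of the bucketing:

```python
import hashlib

def bucket(user_id: str) -> int:
    """Map a user to a stable 0-99 slot — same math as the rollout gate."""
    return int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100

assert bucket("user-42") == bucket("user-42")  # deterministic across calls and processes
assert all(0 <= bucket(f"u{i}") < 100 for i in range(1000))
# A user with bucket < 10 is "in" at 10% — and still "in" at 25%, 50%, 100%.
```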

Architecture, end-to-end

Here's the whole dataflow, including the error-handling paths that make this interesting:

    query
      │
      ▼
    Phase 1 (parallel, <150ms)
      ├─ query parser ──fail──► regex fallback ──► QueryParserResult
      ├─ embedding    ──fail──► empty vector   ──► EmbeddingResult
      └─ location     ──fail──► no geo filter  ──► LocationResult
      │
      ▼
    Phase 2 (<300ms): search on precomputed inputs (tier degrades if inputs are missing)
      │
      ▼
    Phase 3 (<100ms): formatting ──fail──► return raw candidates

Every failure degrades capability, never availability.

Nuances: why this works when “agent soup” doesn’t

A lot of multi-agent systems fail for one of two reasons:

1) Untyped messages create hidden coupling

If Phase 1 returns an unstructured dict and Phase 2 expects a certain key, you’ve built a runtime landmine. The failure mode isn’t “it returns worse results.” The failure mode is “it crashes in production when an upstream shape shifts.”

Typed dataclasses with defaults force a discipline: every phase can assume the object exists, and the “missingness” is explicit (empty embedding, no filter expression, low confidence).

2) Exceptions are treated as exceptional

In voice, exceptions are normal. Networks wobble. Dependencies degrade. That’s why I run Phase 1 with return_exceptions=True and immediately normalize results back into typed objects.

The orchestrator never blocks on a failed agent; it degrades to fallback and keeps moving.

The tradeoff: graceful degradation means occasionally less-smart answers

This design chooses availability over perfection.

If the embedding path fails and returns an empty vector, Phase 2 still runs. If location extraction fails, we lose geo filtering. If parsing falls back to regex, intent may be less nuanced.

That’s not a bug. That’s the contract: the system is always available, just sometimes less smart.

And in a voice-first experience, “always answers” beats “answers perfectly, sometimes.”

I don’t think of this orchestrator as agents cooperating—I think of it as a compilation pipeline that never panics, because every stage produces a typed artifact and every failure has a deterministic shape.