Multi-Model Orchestration with MCP
Coordinating multiple AI models across different providers and domains through MCP, enabling sophisticated workflows where models collaborate and share results.
Overview
MCP enables building complex AI systems where multiple models access shared data sources, pass results between stages, and coordinate decisions. In the patterns below, each model backend is exposed as an MCP server, and a central orchestrator connects to them as an MCP client.
Multi-Model Pipeline Architecture
Python - Orchestrating Multiple Models
```python
from mcp.client import Client
import asyncio
import json
from typing import Dict, Any


class ModelOrchestrator:
    def __init__(self):
        # Initialize clients to different MCP servers
        self.nlp_client = Client("nlp-backend", transport="stdio")
        self.vision_client = Client("vision-backend", transport="stdio")
        self.recommendation_client = Client("recommendation-backend", transport="stdio")
        self.data_client = Client("data-server", transport="stdio")

    async def process_product_listing(self, product_data: Dict[str, Any]):
        """Process a product through multiple models in sequence."""
        # Stage 1: Extract text and categorize (NLP model)
        description = product_data["description"]
        nlp_result = await self.nlp_client.call_tool(
            name="classify_text",
            arguments={"text": description}
        )
        category = json.loads(nlp_result.content[0].text)

        # Stage 2: Analyze product image (vision model)
        image_data = product_data.get("image_base64", "")
        if image_data:
            vision_result = await self.vision_client.call_tool(
                name="extract_features",
                arguments={"image_base64": image_data}
            )
            visual_features = json.loads(vision_result.content[0].text)
        else:
            visual_features = {}

        # Stage 3: Get recommendations (recommendation model)
        rec_result = await self.recommendation_client.call_tool(
            name="get_related_products",
            arguments={
                "category": category["class"],
                "features": visual_features.get("dominant_colors", [])
            }
        )
        recommendations = json.loads(rec_result.content[0].text)

        # Stage 4: Assemble the enriched record
        enriched_data = {
            "original": product_data,
            "nlp_analysis": category,
            "vision_analysis": visual_features,
            "recommendations": recommendations
        }
        return enriched_data


async def main():
    orchestrator = ModelOrchestrator()
    product = {
        "id": "prod_123",
        "name": "Smart Watch",
        "description": "Advanced fitness tracking wearable with heart rate monitor",
        "image_base64": "..."
    }
    result = await orchestrator.process_product_listing(product)
    print(json.dumps(result, indent=2))


asyncio.run(main())
```
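Stages 1 and 2 above are independent of each other (one reads the description, the other the image), so they can be awaited concurrently instead of sequentially. A minimal sketch, as a hypothetical helper method on `ModelOrchestrator`, using the same assumed `classify_text` and `extract_features` tools:

```python
async def classify_and_extract(self, product_data: Dict[str, Any]):
    """Run Stage 1 (NLP) and Stage 2 (vision) concurrently - they are independent."""
    nlp_task = self.nlp_client.call_tool(
        name="classify_text",
        arguments={"text": product_data["description"]}
    )
    vision_task = self.vision_client.call_tool(
        name="extract_features",
        arguments={"image_base64": product_data.get("image_base64", "")}
    )
    # Both calls are in flight at the same time; gather preserves order
    nlp_result, vision_result = await asyncio.gather(nlp_task, vision_task)
    return (
        json.loads(nlp_result.content[0].text),
        json.loads(vision_result.content[0].text),
    )
```

The pipeline's latency then becomes the slower of the two stages rather than their sum; Stage 3 still has to wait, since it consumes both outputs.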
Complex Workflow with Model Consensus
Python - Multi-Model Voting and Consensus
```python
from mcp.client import Client
import asyncio
import json
from collections import Counter


class ConsensusOrchestrator:
    def __init__(self):
        # Multiple models for the same task - ensemble approach
        self.classifier_models = [
            Client("classifier-model-1", transport="stdio"),
            Client("classifier-model-2", transport="stdio"),
            Client("classifier-model-3", transport="stdio")
        ]
        self.sentiment_models = [
            Client("sentiment-model-1", transport="stdio"),
            Client("sentiment-model-2", transport="stdio")
        ]

    async def classify_with_consensus(self, text: str, top_k: int = 3):
        """Get classifications from multiple models and find consensus."""
        results = []
        for model_client in self.classifier_models:
            try:
                result = await model_client.call_tool(
                    name="classify",
                    arguments={"text": text}
                )
                classification = json.loads(result.content[0].text)
                results.append(classification["category"])
            except Exception as e:
                print(f"Model failed: {e}")

        if not results:
            return {"error": "All models failed"}

        # Find consensus by majority vote
        counter = Counter(results)
        top_predictions = counter.most_common(top_k)
        return {
            "consensus": top_predictions[0][0],
            "confidence": top_predictions[0][1] / len(results),
            "all_votes": dict(counter),
            "total_models": len(self.classifier_models)
        }

    async def analyze_sentiment_ensemble(self, text: str):
        """Combine sentiment analysis from multiple models."""
        sentiments = []
        scores = []
        for model_client in self.sentiment_models:
            try:
                result = await model_client.call_tool(
                    name="analyze_sentiment",
                    arguments={"text": text}
                )
                sentiment = json.loads(result.content[0].text)
                sentiments.append(sentiment["sentiment"])
                scores.append(sentiment["score"])
            except Exception as e:
                print(f"Model error: {e}")

        # Guard against every model failing before computing metrics
        if not scores:
            return {"error": "All models failed"}

        # Calculate ensemble metrics
        avg_score = sum(scores) / len(scores)
        sentiment_votes = Counter(sentiments)
        return {
            "average_score": avg_score,
            "consensus_sentiment": sentiment_votes.most_common(1)[0][0],
            "model_agreement": sentiment_votes.most_common(1)[0][1] / len(scores),
            "individual_votes": dict(sentiment_votes)
        }


async def main():
    orchestrator = ConsensusOrchestrator()
    text = "This product is amazing and works perfectly!"
    classification = await orchestrator.classify_with_consensus(text)
    print("Classification consensus:", classification)
    sentiment = await orchestrator.analyze_sentiment_ensemble(text)
    print("Sentiment ensemble:", sentiment)


asyncio.run(main())
```
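The loops above query ensemble members one at a time. Because the votes are independent, the calls can also be issued concurrently, with `gather(..., return_exceptions=True)` playing the role of the per-model `try/except`. A sketch, as a hypothetical method on `ConsensusOrchestrator`, under the same assumed `classify` tool:

```python
async def classify_concurrently(self, text: str):
    """Fan out to all classifier models at once; failed calls surface as exceptions."""
    calls = [
        client.call_tool(name="classify", arguments={"text": text})
        for client in self.classifier_models
    ]
    outcomes = await asyncio.gather(*calls, return_exceptions=True)
    results = []
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            print(f"Model failed: {outcome}")
            continue
        results.append(json.loads(outcome.content[0].text)["category"])
    # Feed `results` into the same Counter-based consensus logic as above
    return results
```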
Adaptive Model Selection
Python - Choose Model Based on Context
```python
from mcp.client import Client
import asyncio
import json
import time
from typing import Optional


class AdaptiveOrchestrator:
    def __init__(self):
        # Different models for different needs
        self.models = {
            "fast": Client("lightweight-model", transport="stdio"),
            "accurate": Client("large-model", transport="stdio"),
            "specialized": Client("domain-specific-model", transport="stdio")
        }
        self.metrics_server = Client("metrics-server", transport="stdio")

    async def process_with_latency_constraint(self, text: str, max_latency_ms: int = 100):
        """Choose a model based on the caller's latency budget."""
        # Tight budgets get the lightweight model; looser budgets can
        # afford the larger or domain-specific backends
        if max_latency_ms <= 50:
            model_key = "fast"
        elif max_latency_ms < 200:
            model_key = "accurate"
        else:
            model_key = "specialized"

        client = self.models[model_key]
        start = time.time()
        result = await client.call_tool(
            name="process",
            arguments={"text": text}
        )
        latency = (time.time() - start) * 1000

        # Log metrics
        await self.metrics_server.call_tool(
            name="log_inference",
            arguments={
                "model": model_key,
                "latency_ms": latency,
                "text_length": len(text)
            }
        )
        return {
            "result": json.loads(result.content[0].text),
            "model_used": model_key,
            "actual_latency_ms": latency
        }

    async def route_to_specialized_model(self, text: str, domain: Optional[str] = None):
        """Route to a specialized model for a specific domain."""
        # If no domain is specified, detect it with the fast model
        if not domain:
            classifier_result = await self.models["fast"].call_tool(
                name="detect_domain",
                arguments={"text": text}
            )
            domain = json.loads(classifier_result.content[0].text)["domain"]

        # Use the specialized model for the detected domain
        result = await self.models["specialized"].call_tool(
            name="process_domain",
            arguments={"text": text, "domain": domain}
        )
        return {
            "detected_domain": domain,
            "specialized_result": json.loads(result.content[0].text)
        }


async def main():
    orchestrator = AdaptiveOrchestrator()
    text = "The patient shows symptoms of pneumonia"

    # Fast inference needed
    fast_result = await orchestrator.process_with_latency_constraint(text, max_latency_ms=50)
    print("Fast processing:", fast_result)

    # Route to the specialized medical model
    specialized = await orchestrator.route_to_specialized_model(text, domain="medical")
    print("Specialized processing:", specialized)


asyncio.run(main())
```
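The thresholds above are fixed at design time. A natural extension is to let observed latencies steer routing, so a backend that slows down stops winning requests it can no longer serve within budget. A minimal sketch using an exponentially weighted moving average per model (the `alpha=0.2` smoothing factor and the class itself are illustrative, not part of any MCP API):

```python
from typing import Dict

class LatencyTracker:
    """Keep a smoothed latency estimate per model to drive routing decisions."""

    def __init__(self, alpha: float = 0.2):
        self.alpha = alpha                    # weight given to the newest sample
        self.estimates: Dict[str, float] = {}

    def record(self, model_key: str, latency_ms: float) -> None:
        # Blend the new sample into the running estimate
        prev = self.estimates.get(model_key, latency_ms)
        self.estimates[model_key] = (1 - self.alpha) * prev + self.alpha * latency_ms

    def pick(self, budget_ms: float) -> str:
        # Choose the fastest model whose estimate fits the budget;
        # if none fits, fall back to the overall fastest.
        # Assumes at least one latency sample has been recorded.
        within = {k: v for k, v in self.estimates.items() if v <= budget_ms}
        pool = within or self.estimates
        return min(pool, key=pool.get)
```

`record` would be fed the `latency` measured in `process_with_latency_constraint`, and `pick` would replace the hard-coded threshold ladder.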
Cross-Model Data Flow
Python - Passing Results Between Models
```python
from mcp.client import Client
import asyncio
import json


class DataFlowOrchestrator:
    def __init__(self):
        self.extraction_model = Client("extraction-backend", transport="stdio")
        self.summarization_model = Client("summarization-backend", transport="stdio")
        self.translation_model = Client("translation-backend", transport="stdio")
        self.storage_service = Client("storage-service", transport="stdio")

    async def process_document_pipeline(self, document: str):
        """Multi-stage pipeline where each stage feeds the next."""
        # Stage 1: Extract key information
        extraction_result = await self.extraction_model.call_tool(
            name="extract_entities",
            arguments={"text": document}
        )
        entities = json.loads(extraction_result.content[0].text)

        # Stage 2: Summarize using the extracted entities
        summary_result = await self.summarization_model.call_tool(
            name="summarize",
            arguments={
                "text": document,
                "key_entities": entities["entities"]
            }
        )
        summary = json.loads(summary_result.content[0].text)

        # Stage 3: Translate the summary into multiple languages
        translations = {}
        for lang in ["es", "fr", "de"]:
            translation_result = await self.translation_model.call_tool(
                name="translate",
                arguments={
                    "text": summary["summary"],
                    "target_language": lang
                }
            )
            translations[lang] = json.loads(translation_result.content[0].text)["translated"]

        # Stage 4: Store the complete result
        final_output = {
            "document_id": "doc_123",
            "entities": entities,
            "summary": summary["summary"],
            "translations": translations
        }
        await self.storage_service.call_tool(
            name="store_document",
            arguments={
                "data": json.dumps(final_output),
                "index_fields": ["document_id", "entities"]
            }
        )
        return final_output


async def main():
    orchestrator = DataFlowOrchestrator()
    document = "Apple Inc. was founded in 1976 by Steve Jobs in California..."
    result = await orchestrator.process_document_pipeline(document)
    print(json.dumps(result, indent=2))


asyncio.run(main())
```
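Because each stage consumes the previous stage's output verbatim, the shape of that output is an implicit contract between models (the first item under Best Practices below). A sketch of making it explicit with `TypedDict`, with field names assumed from the pipeline above:

```python
from typing import Dict, List, TypedDict

class ExtractionResult(TypedDict):
    entities: List[str]           # assumed shape of the extract_entities output

class PipelineOutput(TypedDict):
    document_id: str
    entities: ExtractionResult
    summary: str
    translations: Dict[str, str]  # language code -> translated summary

# Annotating process_document_pipeline(...) -> PipelineOutput lets a type
# checker flag contract drift when a backend changes its response schema.
```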
Fault Tolerance and Fallbacks
Python - Resilient Multi-Model System
```python
from mcp.client import Client
import asyncio
import json


class ResilientOrchestrator:
    def __init__(self):
        self.primary_model = Client("primary-backend", transport="stdio")
        self.fallback_model = Client("fallback-backend", transport="stdio")
        self.cache_service = Client("cache-service", transport="stdio")
        self.max_retries = 3

    async def call_with_fallback(self, text: str, tool_name: str):
        """Call a model with caching, retries, and a fallback backend."""
        # Try the cache first
        try:
            cache_result = await self.cache_service.call_tool(
                name="get",
                arguments={"key": f"{tool_name}:{text[:100]}"}
            )
            cached = json.loads(cache_result.content[0].text)
            if cached:
                print("Cache hit!")
                return cached
        except Exception:
            pass  # a cache miss or cache outage should never fail the call

        # Try the primary model with retries
        for attempt in range(self.max_retries):
            try:
                result = await asyncio.wait_for(
                    self.primary_model.call_tool(
                        name=tool_name,
                        arguments={"text": text}
                    ),
                    timeout=5.0
                )
                output = json.loads(result.content[0].text)

                # Cache the successful result
                await self.cache_service.call_tool(
                    name="set",
                    arguments={
                        "key": f"{tool_name}:{text[:100]}",
                        "value": json.dumps(output),
                        "ttl": 3600
                    }
                )
                return output
            except asyncio.TimeoutError:
                print(f"Primary model timeout (attempt {attempt + 1})")
            except Exception as e:
                print(f"Primary model error: {e}")
            if attempt < self.max_retries - 1:
                await asyncio.sleep(0.5 * (2 ** attempt))  # exponential backoff

        # Fall back to the secondary model
        print("Using fallback model")
        try:
            result = await self.fallback_model.call_tool(
                name=tool_name,
                arguments={"text": text}
            )
            return json.loads(result.content[0].text)
        except Exception as e:
            return {"error": f"Both models failed: {e}"}


async def main():
    orchestrator = ResilientOrchestrator()
    text = "This is a test message"
    result = await orchestrator.call_with_fallback(text, "process")
    print(result)


asyncio.run(main())
```
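One caveat in the sketch above: keying the cache on `text[:100]` means two long inputs that share a 100-character prefix collide and return each other's results. Hashing the full text avoids this, at the cost of unreadable keys:

```python
import hashlib

def cache_key(tool_name: str, text: str) -> str:
    # Hash the full input so distinct texts with a shared prefix
    # can never map to the same cache entry.
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return f"{tool_name}:{digest}"
```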
Best Practices
- Design clear data contracts between models
- Implement model versioning for compatibility
- Add monitoring and logging at each pipeline stage
- Use consensus/ensemble approaches for critical decisions
- Implement circuit breakers for failing models (see the sketch after this list)
- Cache intermediate results to reduce latency
- Route requests adaptively based on latency/accuracy needs
- Test full pipelines end-to-end before production
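For the circuit-breaker item, a minimal sketch: after a threshold of consecutive failures the breaker opens and calls are skipped until a cooldown elapses, after which a single probe is allowed through. The threshold and cooldown values here are illustrative:

```python
import time
from typing import Optional

class CircuitBreaker:
    """Skip calls to a backend that keeps failing, then probe it again later."""

    def __init__(self, failure_threshold: int = 5, reset_after_s: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_after_s = reset_after_s
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: permit a probe call once the cooldown has elapsed
        return time.monotonic() - self.opened_at >= self.reset_after_s

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()

# Usage: wrap each backend call in `if breaker.allow(): ...`, report the
# outcome with record_success()/record_failure(), and route straight to the
# fallback model whenever allow() returns False.
```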