Multi-Model Orchestration with MCP

Coordinating multiple AI models across different providers and domains through MCP, enabling sophisticated workflows where models collaborate and share results.

Overview

MCP enables building complex AI systems in which multiple models access shared data sources, pass results between stages, and coordinate decisions. The orchestrator connects as an MCP client to each model's backend server, so every capability (NLP, vision, recommendations, storage) is reached through the same tool-calling interface.
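The examples in this section use a simplified Client helper that hides connection setup behind a single call_tool method. One way such a wrapper might be built on the official mcp Python SDK's stdio transport is sketched below (saved here as a hypothetical mcp_helpers.py); the per-call session and the "python <name>.py" launch convention are illustrative assumptions, and a production wrapper would keep sessions open across calls.

Python - A Minimal Client Wrapper (mcp_helpers.py)
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

class Client:
    """Thin convenience wrapper: one short-lived session per tool call."""

    def __init__(self, name: str, transport: str = "stdio"):
        # Only stdio is supported in this sketch; the launch command is assumed
        self.name = name
        self.params = StdioServerParameters(command="python", args=[f"{name}.py"])

    async def call_tool(self, name: str, arguments: dict):
        # Open a session, perform the MCP handshake, invoke the tool, tear down
        async with stdio_client(self.params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                return await session.call_tool(name, arguments=arguments)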

Multi-Model Pipeline Architecture

Python - Orchestrating Multiple Models
from mcp_helpers import Client  # simplified wrapper; see sketch above
import asyncio
import json
from typing import Dict, Any

class ModelOrchestrator:
    def __init__(self):
        # Initialize clients to different MCP servers
        self.nlp_client = Client("nlp-backend", transport="stdio")
        self.vision_client = Client("vision-backend", transport="stdio")
        self.recommendation_client = Client("recommendation-backend", transport="stdio")
        self.data_client = Client("data-server", transport="stdio")
    
    async def process_product_listing(self, product_data: Dict[str, Any]):
        """Process product through multiple models"""
        
        # Stage 1: Extract text and categorize (NLP model)
        description = product_data["description"]
        nlp_result = await self.nlp_client.call_tool(
            name="classify_text",
            arguments={"text": description}
        )
        category = json.loads(nlp_result.content[0].text)
        
        # Stage 2: Analyze product image (Vision model)
        image_data = product_data.get("image_base64", "")
        if image_data:
            vision_result = await self.vision_client.call_tool(
                name="extract_features",
                arguments={"image_base64": image_data}
            )
            visual_features = json.loads(vision_result.content[0].text)
        else:
            visual_features = {}
        
        # Stage 3: Get recommendations (Recommendation model)
        rec_result = await self.recommendation_client.call_tool(
            name="get_related_products",
            arguments={
                "category": category["class"],
                "features": visual_features.get("dominant_colors", [])
            }
        )
        recommendations = json.loads(rec_result.content[0].text)
        
        # Stage 4: Store enriched data on the shared data server
        enriched_data = {
            "original": product_data,
            "nlp_analysis": category,
            "vision_analysis": visual_features,
            "recommendations": recommendations
        }
        await self.data_client.call_tool(
            name="store_product",  # illustrative tool name on the data server
            arguments={"product_id": product_data["id"], "data": json.dumps(enriched_data)}
        )
        
        return enriched_data

async def main():
    orchestrator = ModelOrchestrator()
    
    product = {
        "id": "prod_123",
        "name": "Smart Watch",
        "description": "Advanced fitness tracking wearable with heart rate monitor",
        "image_base64": "..."
    }
    
    result = await orchestrator.process_product_listing(product)
    print(json.dumps(result, indent=2))

asyncio.run(main())
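Stage 1 (NLP) and Stage 2 (vision) do not depend on each other, so their tool calls can run concurrently. A minimal sketch of the same front half using asyncio.gather, reusing the ModelOrchestrator clients and tool names from above:

Python - Running Independent Stages Concurrently
import asyncio
import json

async def analyze_concurrently(orchestrator: ModelOrchestrator, product_data: dict):
    """Schedule the NLP and vision calls together; latency is roughly the max of the two."""
    nlp_result, vision_result = await asyncio.gather(
        orchestrator.nlp_client.call_tool(
            name="classify_text",
            arguments={"text": product_data["description"]}
        ),
        orchestrator.vision_client.call_tool(
            # Assumes the vision tool tolerates an empty image; otherwise guard as above
            name="extract_features",
            arguments={"image_base64": product_data.get("image_base64", "")}
        )
    )
    category = json.loads(nlp_result.content[0].text)
    visual_features = json.loads(vision_result.content[0].text)
    return category, visual_features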

Complex Workflow with Model Consensus

Python - Multi-Model Voting and Consensus
from mcp_helpers import Client  # simplified wrapper; see sketch above
import asyncio
import json
from typing import List
from collections import Counter

class ConsensusOrchestrator:
    def __init__(self):
        # Multiple models for same task - ensemble approach
        self.classifier_models = [
            Client("classifier-model-1", transport="stdio"),
            Client("classifier-model-2", transport="stdio"),
            Client("classifier-model-3", transport="stdio")
        ]
        self.sentiment_models = [
            Client("sentiment-model-1", transport="stdio"),
            Client("sentiment-model-2", transport="stdio")
        ]
    
    async def classify_with_consensus(self, text: str, top_k: int = 3):
        """Get classification from multiple models and find consensus"""
        
        results = []
        for model_client in self.classifier_models:
            try:
                result = await model_client.call_tool(
                    name="classify",
                    arguments={"text": text}
                )
                classification = json.loads(result.content[0].text)
                results.append(classification["category"])
            except Exception as e:
                print(f"Model failed: {e}")
        
        if not results:
            return {"error": "All models failed"}
        
        # Find consensus
        counter = Counter(results)
        top_predictions = counter.most_common(top_k)
        
        return {
            "consensus": top_predictions[0][0],
            "confidence": top_predictions[0][1] / len(results),
            "all_votes": dict(counter),
            "total_models": len(self.classifier_models)
        }
    
    async def analyze_sentiment_ensemble(self, text: str):
        """Combine sentiment analysis from multiple models"""
        
        sentiments = []
        scores = []
        
        for model_client in self.sentiment_models:
            try:
                result = await model_client.call_tool(
                    name="analyze_sentiment",
                    arguments={"text": text}
                )
                sentiment = json.loads(result.content[0].text)
                sentiments.append(sentiment["sentiment"])
                scores.append(sentiment["score"])
            except Exception as e:
                print(f"Model error: {e}")
        
        # Calculate ensemble metrics; bail out if every model failed
        if not scores:
            return {"error": "All sentiment models failed"}
        
        avg_score = sum(scores) / len(scores)
        sentiment_votes = Counter(sentiments)
        
        return {
            "average_score": avg_score,
            "consensus_sentiment": sentiment_votes.most_common(1)[0][0],
            "model_agreement": sentiment_votes.most_common(1)[0][1] / len(scores),
            "individual_votes": dict(sentiment_votes)
        }

async def main():
    orchestrator = ConsensusOrchestrator()
    
    text = "This product is amazing and works perfectly!"
    
    classification = await orchestrator.classify_with_consensus(text)
    print("Classification consensus:", classification)
    
    sentiment = await orchestrator.analyze_sentiment_ensemble(text)
    print("Sentiment ensemble:", sentiment)

asyncio.run(main())
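classify_with_consensus queries each classifier in turn, so total latency grows with the number of models. Because the models are independent, the fan-out can instead happen concurrently with asyncio.gather and return_exceptions=True, which keeps one failing model from sinking the ensemble. A sketch against the same classifier_models list:

Python - Concurrent Ensemble Fan-Out
import asyncio
import json

async def classify_concurrently(orchestrator: ConsensusOrchestrator, text: str):
    """Query every classifier at once; failed models are skipped, not fatal."""
    outcomes = await asyncio.gather(
        *[model.call_tool(name="classify", arguments={"text": text})
          for model in orchestrator.classifier_models],
        return_exceptions=True  # exceptions come back as values, not raised
    )
    votes = []
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            print(f"Model failed: {outcome}")
            continue
        votes.append(json.loads(outcome.content[0].text)["category"])
    return votes

The resulting votes list can be fed straight into the same Counter-based consensus logic shown above.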

Adaptive Model Selection

Python - Choose Model Based on Context
from mcp_helpers import Client  # simplified wrapper; see sketch above
import asyncio
import json
import time

class AdaptiveOrchestrator:
    def __init__(self):
        # Different models for different needs
        self.models = {
            "fast": Client("lightweight-model", transport="stdio"),
            "accurate": Client("large-model", transport="stdio"),
            "specialized": Client("domain-specific-model", transport="stdio")
        }
        self.metrics_server = Client("metrics-server", transport="stdio")
    
    async def process_with_latency_constraint(self, text: str, max_latency_ms: int = 100):
        """Choose model based on latency requirements"""
        
        # Assumes the latency ordering: lightweight < large < domain-specific
        if max_latency_ms < 50:
            model_key = "fast"
        elif max_latency_ms < 200:
            model_key = "accurate"
        else:
            model_key = "specialized"
        
        client = self.models[model_key]
        
        start = time.perf_counter()  # monotonic clock for latency measurement
        result = await client.call_tool(
            name="process",
            arguments={"text": text}
        )
        latency = (time.perf_counter() - start) * 1000
        
        # Log metrics
        await self.metrics_server.call_tool(
            name="log_inference",
            arguments={
                "model": model_key,
                "latency_ms": latency,
                "text_length": len(text)
            }
        )
        
        return {
            "result": json.loads(result.content[0].text),
            "model_used": model_key,
            "actual_latency_ms": latency
        }
    
    async def route_to_specialized_model(self, text: str, domain: str | None = None):
        """Route to specialized model for specific domains"""
        
        # If no domain specified, detect it
        if not domain:
            classifier_result = await self.models["fast"].call_tool(
                name="detect_domain",
                arguments={"text": text}
            )
            domain = json.loads(classifier_result.content[0].text)["domain"]
        
        # Use specialized model for detected domain
        result = await self.models["specialized"].call_tool(
            name="process_domain",
            arguments={"text": text, "domain": domain}
        )
        
        return {
            "detected_domain": domain,
            "specialized_result": json.loads(result.content[0].text)
        }

async def main():
    orchestrator = AdaptiveOrchestrator()
    
    text = "The patient shows symptoms of pneumonia"
    
    # Fast inference needed
    fast_result = await orchestrator.process_with_latency_constraint(text, max_latency_ms=50)
    print("Fast processing:", fast_result)
    
    # Route to specialized medical model
    specialized = await orchestrator.route_to_specialized_model(text, domain="medical")
    print("Specialized processing:", specialized)

asyncio.run(main())
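The thresholds in process_with_latency_constraint are static, but every call already logs its latency to the metrics server, so observed measurements could drive the routing instead. The sketch below assumes a hypothetical get_model_stats tool that returns recent p95 latency per model; the tool name and payload shape are illustrative, not part of any real server.

Python - Metrics-Driven Model Selection
import json

async def pick_model_by_observed_latency(orchestrator: AdaptiveOrchestrator,
                                         max_latency_ms: int) -> str:
    """Pick the most capable model whose recent p95 latency fits the budget."""
    result = await orchestrator.metrics_server.call_tool(
        name="get_model_stats",  # hypothetical: {"fast": {"p95_ms": 12}, ...}
        arguments={"window_minutes": 15}
    )
    stats = json.loads(result.content[0].text)
    # Try heavier models first; fall back to lighter ones as the budget shrinks
    for key in ("specialized", "accurate", "fast"):
        if stats.get(key, {}).get("p95_ms", float("inf")) <= max_latency_ms:
            return key
    return "fast"  # nothing fits the budget; degrade to the lightweight model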

Cross-Model Data Flow

Python - Passing Results Between Models
from mcp_helpers import Client  # simplified wrapper; see sketch above
import asyncio
import json

class DataFlowOrchestrator:
    def __init__(self):
        self.extraction_model = Client("extraction-backend", transport="stdio")
        self.summarization_model = Client("summarization-backend", transport="stdio")
        self.translation_model = Client("translation-backend", transport="stdio")
        self.storage_service = Client("storage-service", transport="stdio")
    
    async def process_document_pipeline(self, document: str):
        """Multi-stage pipeline with data flow"""
        
        # Stage 1: Extract key information
        extraction_result = await self.extraction_model.call_tool(
            name="extract_entities",
            arguments={"text": document}
        )
        entities = json.loads(extraction_result.content[0].text)
        
        # Stage 2: Summarize using extracted entities
        summary_result = await self.summarization_model.call_tool(
            name="summarize",
            arguments={
                "text": document,
                "key_entities": entities["entities"]
            }
        )
        summary = json.loads(summary_result.content[0].text)
        
        # Stage 3: Translate summary to multiple languages
        translations = {}
        for lang in ["es", "fr", "de"]:
            translation_result = await self.translation_model.call_tool(
                name="translate",
                arguments={
                    "text": summary["summary"],
                    "target_language": lang
                }
            )
            translations[lang] = json.loads(translation_result.content[0].text)["translated"]
        
        # Stage 4: Store complete result
        final_output = {
            "document_id": "doc_123",
            "entities": entities,
            "summary": summary["summary"],
            "translations": translations
        }
        
        await self.storage_service.call_tool(
            name="store_document",
            arguments={
                "data": json.dumps(final_output),
                "index_fields": ["document_id", "entities"]
            }
        )
        
        return final_output

async def main():
    orchestrator = DataFlowOrchestrator()
    
    document = "Apple Inc. was founded in 1976 by Steve Jobs in California..."
    result = await orchestrator.process_document_pipeline(document)
    print(json.dumps(result, indent=2))

asyncio.run(main())
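Stage 3 translates one language at a time even though the three calls are independent; the same fan-out pattern from earlier applies. A sketch that issues all translations concurrently, using the tool and argument names from the pipeline above:

Python - Concurrent Translation Fan-Out
import asyncio
import json

async def translate_concurrently(orchestrator: DataFlowOrchestrator,
                                 summary_text: str,
                                 languages=("es", "fr", "de")):
    """Issue one translate call per language and gather the results."""
    results = await asyncio.gather(*[
        orchestrator.translation_model.call_tool(
            name="translate",
            arguments={"text": summary_text, "target_language": lang}
        )
        for lang in languages
    ])
    # gather preserves input order, so languages and results line up
    return {
        lang: json.loads(result.content[0].text)["translated"]
        for lang, result in zip(languages, results)
    }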

Fault Tolerance and Fallbacks

Python - Resilient Multi-Model System
from mcp_helpers import Client  # simplified wrapper; see sketch above
import asyncio
import json

class ResilientOrchestrator:
    def __init__(self):
        self.primary_model = Client("primary-backend", transport="stdio")
        self.fallback_model = Client("fallback-backend", transport="stdio")
        self.cache_service = Client("cache-service", transport="stdio")
        self.max_retries = 3
    
    async def call_with_fallback(self, text: str, tool_name: str):
        """Call model with fallback and caching"""
        
        # Try cache first
        try:
            cache_result = await self.cache_service.call_tool(
                name="get",
                arguments={"key": f"{tool_name}:{text[:100]}"}
            )
            cached = json.loads(cache_result.content[0].text)
            if cached:
                print("Cache hit!")
                return cached
        except Exception:
            pass  # cache miss or cache service unavailable; fall through
        
        # Try primary model with retries
        for attempt in range(self.max_retries):
            try:
                result = await asyncio.wait_for(
                    self.primary_model.call_tool(
                        name=tool_name,
                        arguments={"text": text}
                    ),
                    timeout=5.0
                )
                output = json.loads(result.content[0].text)
                
                # Cache successful result
                await self.cache_service.call_tool(
                    name="set",
                    arguments={
                        "key": f"{tool_name}:{text[:100]}",
                        "value": json.dumps(output),
                        "ttl": 3600
                    }
                )
                
                return output
            except asyncio.TimeoutError:
                print(f"Primary model timeout (attempt {attempt + 1})")
            except Exception as e:
                print(f"Primary model error: {e}")
            
            await asyncio.sleep(0.5 * 2 ** attempt)  # exponential backoff: 0.5s, 1s, 2s
        
        # Fall back to secondary model
        print("Using fallback model")
        try:
            result = await self.fallback_model.call_tool(
                name=tool_name,
                arguments={"text": text}
            )
            return json.loads(result.content[0].text)
        except Exception as e:
            return {"error": f"Both models failed: {e}"}

async def main():
    orchestrator = ResilientOrchestrator()
    
    text = "This is a test message"
    result = await orchestrator.call_with_fallback(text, "process")
    print(result)

asyncio.run(main())
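One caveat in call_with_fallback: keying the cache on text[:100] makes any two inputs that share their first 100 characters collide. Hashing the full input yields a stable, collision-resistant key. A minimal sketch, suitable as a drop-in replacement for the keys in both the get and set calls above:

Python - Collision-Resistant Cache Keys
import hashlib

def cache_key(tool_name: str, text: str) -> str:
    """Derive the cache key from a digest of the full input, not a prefix."""
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return f"{tool_name}:{digest}"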

Best Practices