Data Source Integration with MCP
Exposing databases, files, and other data sources as MCP resources, enabling AI models to query and access structured data through a standardized protocol.
Overview
MCP resources represent data sources that clients can read. Each resource has a URI, a name and description, and optionally a schema describing its contents. A single MCP server can expose multiple data sources, giving clients one unified access point.
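For orientation, the listing a client sees is just structured metadata. A minimal sketch of what a server might advertise follows; the field names mirror an MCP resource descriptor (uri, name, description, mimeType), and the entries themselves are illustrative:

# Illustrative only: the metadata a client receives when it lists resources.
example_resource_listing = [
    {
        "uri": "postgres://products",
        "name": "Products Table",
        "description": "Access product database",
        "mimeType": "application/json",
    },
    {
        "uri": "file://config.json",
        "name": "config.json",
        "description": "File: /data/config.json",
        "mimeType": "application/json",
    },
]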
Basic Database Resource Server
Python - PostgreSQL via MCP
from mcp.server import Server, Resource
from mcp.types import TextContent
from typing import Optional
import psycopg2
import json
server = Server("postgres-data-server")
# Define resources
products_resource = Resource(
uri="postgres://products",
name="Products Table",
description="Access product database"
)
users_resource = Resource(
uri="postgres://users",
name="Users Table",
description="Access user database"
)
@server.resource(products_resource)
def read_products(limit: int = 10) -> TextContent:
    """Read products from the database."""
    try:
        conn = psycopg2.connect(
            host="localhost",
            database="myapp",
            user="admin",
            password="password"  # placeholder credentials; load from config/env in practice
        )
        cursor = conn.cursor()
        cursor.execute("SELECT id, name, price FROM products LIMIT %s", (limit,))
        rows = cursor.fetchall()
        products = [
            # float() in case price is NUMERIC (psycopg2 returns Decimal, which json.dumps rejects)
            {"id": row[0], "name": row[1], "price": float(row[2])}
            for row in rows
        ]
        cursor.close()
        conn.close()
        return TextContent(text=json.dumps(products))
    except Exception as e:
        return TextContent(text=f"Error: {str(e)}")
@server.resource(users_resource)
def read_users(filter_country: Optional[str] = None) -> TextContent:
    """Read users from the database, optionally filtered by country."""
    try:
        conn = psycopg2.connect(
            host="localhost",
            database="myapp",
            user="admin",
            password="password"
        )
        cursor = conn.cursor()
        if filter_country:
            cursor.execute(
                "SELECT id, name, email, country FROM users WHERE country = %s",
                (filter_country,)
            )
        else:
            cursor.execute("SELECT id, name, email, country FROM users")
        rows = cursor.fetchall()
        users = [
            {"id": row[0], "name": row[1], "email": row[2], "country": row[3]}
            for row in rows
        ]
        cursor.close()
        conn.close()
        return TextContent(text=json.dumps(users))
    except Exception as e:
        return TextContent(text=f"Error: {str(e)}")
File System Resource Server
Python - Expose Files and Directories
from mcp.server import Server, Resource
from mcp.types import TextContent
import os
import json
from pathlib import Path
server = Server("file-system-server")
# Optional helper for registering individual files as static resources
# (not used by the template handlers below)
class FileResource(Resource):
    def __init__(self, path: str):
        super().__init__(
            uri=f"file://{path}",
            name=Path(path).name,
            description=f"File: {path}"
        )
        self.path = path
@server.resource_template("file://{path}")
def read_file(path: str, format: str = "text") -> str:
"""Read file content"""
try:
full_path = f"/data/{path}"
# Security check
if ".." in path or not os.path.exists(full_path):
return TextContent(text="Access denied or file not found")
with open(full_path, "r") as f:
content = f.read()
if format == "json":
data = json.loads(content)
return TextContent(text=json.dumps(data, indent=2))
elif format == "lines":
lines = content.split("\n")
return TextContent(text=f"File has {len(lines)} lines")
else:
return TextContent(text=content[:5000])
except Exception as e:
return TextContent(text=f"Error: {str(e)}")
@server.resource_template("directory://{path}")
def list_directory(path: str) -> str:
"""List directory contents"""
try:
full_path = f"/data/{path}"
if not os.path.isdir(full_path):
return TextContent(text="Not a directory")
items = []
for item in os.listdir(full_path):
item_path = os.path.join(full_path, item)
is_dir = os.path.isdir(item_path)
size = os.path.getsize(item_path) if not is_dir else 0
items.append({
"name": item,
"type": "directory" if is_dir else "file",
"size": size
})
return TextContent(text=json.dumps(items))
except Exception as e:
return TextContent(text=f"Error: {str(e)}")
API Data Resource
Python - Expose API as Resource
from mcp.server import Server, Resource
from mcp.types import TextContent
import requests
import json
from datetime import datetime, timedelta
server = Server("api-data-server")
class CachedResource:
    """Wraps a fetch function and caches its result for cache_ttl seconds."""

    def __init__(self, uri: str, fetch_func, cache_ttl: int = 300):
        self.resource = Resource(uri=uri, name=uri)
        self.fetch_func = fetch_func
        self.cache_ttl = cache_ttl
        self.cache = None
        self.cache_time = None

    def get_data(self):
        now = datetime.now()
        # Refresh when there is no cached value or the cache has expired
        if (self.cache is None or
                (self.cache_time and (now - self.cache_time).total_seconds() > self.cache_ttl)):
            self.cache = self.fetch_func()
            self.cache_time = now
        return self.cache
def fetch_github_trends():
    """Fetch trending repositories."""
    try:
        url = "https://api.github.com/search/repositories"
        params = {
            "q": "stars:>10000",
            "sort": "stars",
            "order": "desc",
            "per_page": 10
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        repos = [
            {
                "name": repo["name"],
                "stars": repo["stargazers_count"],
                "language": repo["language"],
                "url": repo["html_url"]
            }
            for repo in data.get("items", [])
        ]
        return json.dumps(repos)
    except Exception as e:
        return json.dumps({"error": str(e)})
# Create cached resource
github_resource = CachedResource(
    "api://github/trending",
    fetch_github_trends,
    cache_ttl=3600  # Cache for 1 hour
)

@server.resource(github_resource.resource)
def get_github_trends() -> TextContent:
    """Get trending GitHub repositories."""
    return TextContent(text=github_resource.get_data())
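The same wrapper works for any fetch function. A sketch that registers a second cached endpoint on the server above; the URL, resource URI, and handler name here are hypothetical:

def fetch_service_status():
    """Hypothetical status endpoint, cached for 5 minutes."""
    response = requests.get("https://status.example.com/api/summary", timeout=10)
    response.raise_for_status()
    return json.dumps(response.json())

status_resource = CachedResource(
    "api://example/status",  # hypothetical URI
    fetch_service_status,
    cache_ttl=300
)

@server.resource(status_resource.resource)
def get_service_status() -> TextContent:
    return TextContent(text=status_resource.get_data())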
Vector Database Resource
Python - Vector Store Access via MCP
from mcp.server import Server, Resource
from mcp.types import TextContent
import json
import numpy as np
# Simulated vector database
class VectorDB:
    def __init__(self):
        self.vectors = {
            "doc1": {"text": "Python tutorial", "embedding": [0.1, 0.2, 0.3]},
            "doc2": {"text": "JavaScript guide", "embedding": [0.15, 0.25, 0.35]},
            "doc3": {"text": "Python best practices", "embedding": [0.12, 0.22, 0.32]},
        }

    def search(self, query_embedding: list, top_k: int = 5):
        """Vector similarity search."""
        query = np.array(query_embedding)
        results = []
        for doc_id, doc in self.vectors.items():
            embedding = np.array(doc["embedding"])
            # Cosine similarity between the query and each stored embedding
            similarity = np.dot(query, embedding) / (np.linalg.norm(query) * np.linalg.norm(embedding))
            results.append({
                "doc_id": doc_id,
                "text": doc["text"],
                "similarity": float(similarity)
            })
        return sorted(results, key=lambda x: x["similarity"], reverse=True)[:top_k]
server = Server("vector-db-server")
vector_db = VectorDB()
@server.resource(Resource(
uri="vectordb://search",
name="Vector Search",
description="Search documents by semantic similarity"
))
def vector_search(query_text: str, top_k: int = 5) -> str:
"""Search vector database"""
try:
# In real scenario, would embed query_text using an embedding model
query_embedding = [0.1, 0.2, 0.3] # Mock embedding
results = vector_db.search(query_embedding, top_k)
return TextContent(text=json.dumps(results))
except Exception as e:
return TextContent(text=f"Search error: {str(e)}")
Resource Discovery and Querying
Python - Client Resource Access
from mcp.client import Client
import asyncio
import json

async def query_resources():
    client = Client("data-server", transport="stdio")

    # List all available resources
    resources = await client.list_resources()
    print("Available data sources:")
    for resource in resources:
        print(f"  - {resource.uri}: {resource.description}")

    # Read from a specific resource
    products = await client.read_resource(
        uri="postgres://products",
        arguments={"limit": 5}
    )
    print("\nProducts:")
    data = json.loads(products.contents[0].text)
    for product in data:
        print(f"  {product['name']}: ${product['price']}")

    # Access files
    file_contents = await client.read_resource(
        uri="file://config.json",
        arguments={"format": "json"}
    )
    config = json.loads(file_contents.contents[0].text)
    print(f"\nConfig: {json.dumps(config, indent=2)}")

asyncio.run(query_resources())
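Resource reads return content items whose text payload is usually JSON. A small helper, built on the same hypothetical client API as above, that reads any resource and decodes the payload when possible:

async def read_json_resource(client, uri: str, arguments=None):
    """Read a resource and decode its text payload as JSON when possible."""
    result = await client.read_resource(uri=uri, arguments=arguments or {})
    text = result.contents[0].text
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text  # not JSON (e.g. plain file content); return as-is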
Multi-Source Data Integration
Python - Unified Data Access
from mcp.server import Server, Resource
from mcp.types import TextContent
import json

server = Server("unified-data-server")

@server.resource(Resource(
    uri="data://user_profile/complete",
    name="Complete User Profile",
    description="Combines data from multiple sources"
))
def get_complete_user_profile(user_id: int) -> TextContent:
    """Aggregate data from multiple sources."""
    try:
        # In practice, these would come from separate backends
        # (e.g. a users table, an orders service, a preferences store)
        user_data = {
            "id": user_id,
            "name": "John Doe",
            "email": "john@example.com"
        }
        orders_data = {
            "total_orders": 5,
            "total_spent": 249.99,
            "recent_orders": [
                {"id": 1001, "date": "2025-01-15", "amount": 50.00},
                {"id": 1002, "date": "2025-01-10", "amount": 75.50}
            ]
        }
        preferences_data = {
            "language": "en",
            "notifications": True,
            "timezone": "UTC"
        }
        profile = {
            **user_data,
            "orders": orders_data,
            "preferences": preferences_data
        }
        return TextContent(text=json.dumps(profile))
    except Exception as e:
        return TextContent(text=f"Error: {str(e)}")
Best Practices
- Define clear resource URIs following a consistent pattern
- Implement query result pagination for large datasets (see the sketch after this list)
- Cache frequently accessed data to reduce latency
- Enforce authentication and authorization for sensitive data
- Document data schema and available fields
- Set appropriate timeouts for long-running queries
- Validate and sanitize all query parameters
- Provide aggregated views of related data
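The pagination and validation points above can be made concrete with a small helper. A sketch that clamps paging parameters and whitelists sortable columns before they reach SQL; the names and limits are illustrative:

MAX_PAGE_SIZE = 100
ALLOWED_SORT_COLUMNS = {"id", "name", "price"}  # whitelist; never interpolate raw input

def validate_paging(limit: int, offset: int, sort_by: str = "id"):
    """Clamp paging parameters and reject unknown sort columns."""
    limit = max(1, min(int(limit), MAX_PAGE_SIZE))
    offset = max(0, int(offset))
    if sort_by not in ALLOWED_SORT_COLUMNS:
        raise ValueError(f"Unsupported sort column: {sort_by}")
    return limit, offset, sort_by

# Usage inside a resource handler (values are still bound with %s placeholders;
# only the whitelisted column name is interpolated):
# limit, offset, sort_by = validate_paging(limit, offset, sort_by)
# cursor.execute(
#     f"SELECT id, name, price FROM products ORDER BY {sort_by} LIMIT %s OFFSET %s",
#     (limit, offset)
# )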