ML Ops & Scaling with Vertex AI
Managing production ML systems with automated pipelines, model monitoring, continuous retraining, and enterprise-scale infrastructure.
Overview
Vertex AI Pipelines orchestrates ML workflows, the Model Registry manages model versions, Model Monitoring tracks production performance and drift, and Feature Store centralizes feature definitions. Together they enable MLOps at scale.
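All of the examples below use the google-cloud-aiplatform Python SDK. As a minimal orientation sketch (project and region values are placeholders), each building block is reached through its own class after a single SDK initialization:
Python - SDK Entry Points (sketch)
from google.cloud import aiplatform

# One-time SDK setup; project and location are placeholders
aiplatform.init(project="your-project-id", location="us-central1")

# Each MLOps building block has a corresponding client class
aiplatform.PipelineJob                    # workflow orchestration (Vertex AI Pipelines)
aiplatform.Model                          # Model Registry entries and versions
aiplatform.Endpoint                       # online serving and traffic splits
aiplatform.ModelDeploymentMonitoringJob   # drift and skew monitoring
aiplatform.Featurestore                   # centralized feature management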
Vertex Pipelines for Workflow Orchestration
Python - Automated ML Pipeline
from google.cloud import aiplatform
from kfp import compiler
from kfp.dsl import Dataset, Input, Output, component, pipeline

# Define pipeline components
@component(packages_to_install=["pandas", "gcsfs"])
def load_data(raw_data: Output[Dataset]):
    """Load training data"""
    import pandas as pd

    df = pd.read_csv("gs://bucket/data.csv")
    df.to_csv(raw_data.path, index=False)

@component(packages_to_install=["pandas"])
def preprocess_data(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
):
    """Preprocess data"""
    import pandas as pd

    df = pd.read_csv(input_data.path)
    # Preprocessing logic
    df.to_csv(output_data.path, index=False)

@component
def train_model(
    training_data: Input[Dataset],
    model_path: str,
) -> str:
    """Train model and return the GCS path of the saved artifact"""
    # Training code (fit the model and save it to model_path)
    return model_path

@component
def evaluate_model(
    model_path: str,
    test_data: Input[Dataset],
) -> float:
    """Evaluate model and return its score"""
    # Evaluation code (placeholder metric)
    return 0.95

# Define pipeline
@pipeline(
    name="ml-training-pipeline",
    description="Complete ML pipeline",
    pipeline_root="gs://bucket/pipeline_root",
)
def ml_pipeline():
    load_task = load_data()
    preprocess_task = preprocess_data(input_data=load_task.output)
    train_task = train_model(
        training_data=preprocess_task.output,
        model_path="gs://bucket/model",
    )
    evaluate_model(
        model_path=train_task.output,
        test_data=preprocess_task.output,
    )

# Compile the pipeline
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

# Submit to Vertex AI
aiplatform.init(project="your-project-id", location="us-central1")
job = aiplatform.PipelineJob(
    display_name="ml-pipeline-run",
    template_path="pipeline.yaml",
    pipeline_root="gs://bucket/pipeline_root",
)
job.run()
Model Registry & Versioning
Python - Model Management
from google.cloud import aiplatform
aiplatform.init(project="your-project-id")
# Register model in Model Registry
model = aiplatform.Model.upload(
    display_name="sales-forecaster",
    artifact_uri="gs://bucket/models/forecaster",
    serving_container_image_uri="gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-12",
)
print(f"Model resource name: {model.resource_name}")

# Upload a second version under the same Model Registry entry
model_v2 = aiplatform.Model.upload(
    display_name="sales-forecaster",
    artifact_uri="gs://bucket/models/forecaster_v2",
    serving_container_image_uri="gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-12",
    parent_model=model.resource_name,
)

# List all versions of the model
for version in model.versioning_registry.list_versions():
    print(version.version_id)

# Deploy a specific version to an endpoint
endpoint = aiplatform.Endpoint.create(
    display_name="sales-forecast-endpoint"
)
endpoint.deploy(
    model=model_v2,
    machine_type="n1-standard-4",
    traffic_percentage=100,
)
Continuous Model Retraining
Python - Automated Retraining
from google.cloud import aiplatform, scheduler_v1

def schedule_retraining():
    """Set up scheduled model retraining with Cloud Scheduler"""
    client = scheduler_v1.CloudSchedulerClient()
    parent = client.common_location_path("your-project-id", "us-central1")

    # Schedule weekly retraining by triggering a Cloud Run job
    job = {
        "name": client.job_path("your-project-id", "us-central1", "weekly-retrain"),
        "description": "Weekly model retraining",
        "schedule": "0 2 * * 1",  # Every Monday at 2am
        "time_zone": "America/New_York",
        "http_target": {
            # Cloud Run Admin API call that executes the job named retrain-job
            "uri": "https://run.googleapis.com/v2/projects/your-project-id/locations/us-central1/jobs/retrain-job:run",
            "http_method": "POST",
            "headers": {"Content-Type": "application/json"},
            # Service account that is allowed to run the Cloud Run job
            "oauth_token": {"service_account_email": "scheduler-sa@your-project-id.iam.gserviceaccount.com"},
        },
    }
    response = client.create_job(request={"parent": parent, "job": job})
    return response

def retrain_model_job():
    """Retraining job function"""
    from google.cloud import bigquery

    # 1. Get the latest data
    client = bigquery.Client()
    df = client.query("""
        SELECT * FROM `your-project.sales_data.daily_sales`
        WHERE DATE(sales_date) > DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
    """).to_dataframe()

    # 2. Check data quality: abort if there is no data or more than 10% of values are null
    if df.empty or df.isnull().sum().sum() > df.size * 0.1:
        print("Data quality check failed")
        return False

    # 3. Retrain the model
    aiplatform.init(project="your-project-id")
    job = aiplatform.AutoMLTabularTrainingJob(
        display_name="sales-forecaster-retrain",
        optimization_prediction_type="regression",
    )
    dataset = aiplatform.TabularDataset.create(
        display_name="sales-data-latest",
        bq_source="bq://your-project.sales_data.daily_sales",
    )
    model = job.run(
        dataset=dataset,
        target_column="sales_amount",
        budget_milli_node_hours=1000,
    )

    # 4. Evaluate the new model
    eval_metrics = model.get_model_evaluation()

    # 5. Compare with the baseline model
    baseline_model = aiplatform.Model("baseline-model-id")
    baseline_metrics = baseline_model.get_model_evaluation()

    metric = "rootMeanSquaredError"  # RMSE key in the AutoML regression evaluation metrics
    if eval_metrics.metrics[metric] < baseline_metrics.metrics[metric]:
        print("New model is better, deploying...")
        # 6. Deploy to the endpoint as a canary
        endpoint = aiplatform.Endpoint("endpoint-id")
        endpoint.deploy(
            model=model,
            machine_type="n1-standard-2",
            traffic_percentage=10,  # Start with 10% of traffic
        )
        return True
    else:
        print("New model is not better than the baseline")
        return False

# Schedule retraining
schedule_retraining()
Model Monitoring & Drift Detection
Python - Production Monitoring
from google.cloud import aiplatform
from google.cloud.aiplatform import model_monitoring

aiplatform.init(project="your-project-id", location="us-central1")

# Endpoint whose deployed model should be monitored
endpoint = aiplatform.Endpoint("endpoint-id")

# Create model monitoring job
monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create(
    display_name="sales-forecaster-monitoring",
    project="your-project-id",
    location="us-central1",
    endpoint=endpoint,
    logging_sampling_strategy=model_monitoring.RandomSampleConfig(sample_rate=0.8),
    schedule_config=model_monitoring.ScheduleConfig(monitor_interval=1),  # hours
    objective_configs=model_monitoring.ObjectiveConfig(
        # Training/serving skew: compare serving data against the training data
        skew_detection_config=model_monitoring.SkewDetectionConfig(
            data_source="bq://your-project.sales_data.daily_sales",
            target_field="sales_amount",
            skew_thresholds={"your_feature": 0.1},  # per-feature thresholds; use real feature names
            attribute_skew_thresholds={"your_feature": 0.05},
        ),
        # Prediction drift: compare recent serving data against earlier serving data
        drift_detection_config=model_monitoring.DriftDetectionConfig(
            drift_thresholds={"your_feature": 0.15},
        ),
    ),
    alert_config=model_monitoring.EmailAlertConfig(
        user_emails=["ml-team@example.com"],
        enable_logging=True,
    ),
)
# Get monitoring results
def check_model_drift(endpoint_id: str) -> bool:
    """Check Cloud Logging for data drift alerts on an endpoint"""
    from google.cloud import logging

    client = logging.Client()

    # Query monitoring alert log entries
    query = f"""
        resource.type="aiplatform.googleapis.com/Endpoint"
        resource.labels.endpoint_id="{endpoint_id}"
        jsonPayload.alert_type="data_drift"
    """
    entries = client.list_entries(filter_=query)

    for entry in entries:
        alert = entry.payload
        if alert["drift_score"] > 0.1:
            print(f"Alert: data drift detected - {alert['feature']}: {alert['drift_score']}")
            return True
    return False
Feature Store for Centralized Features
Python - Feature Management
from google.cloud import aiplatform

aiplatform.init(project="your-project-id", location="us-central1")

# Create a featurestore with a small online-serving cluster
feature_store = aiplatform.Featurestore.create(
    featurestore_id="sales_features",
    online_store_fixed_node_count=1,
)

# Create entity type
customers = feature_store.create_entity_type(
    entity_type_id="customer",
    description="Customer-level features",
)

# Create features
features_config = [
    {
        "id": "customer_age",
        "value_type": "INT64",
        "description": "Customer age",
    },
    {
        "id": "total_purchase_amount",
        "value_type": "DOUBLE",
        "description": "Total purchase amount",
    },
    {
        "id": "purchase_frequency",
        "value_type": "INT64",
        "description": "Number of purchases",
    },
]
for feature in features_config:
    customers.create_feature(
        feature_id=feature["id"],
        value_type=feature["value_type"],
        description=feature["description"],
    )
# Ingest feature data
import datetime

from google.cloud import bigquery

def ingest_features():
    """Load features from BigQuery and write them to the featurestore"""
    bq_client = bigquery.Client()
    query = """
        SELECT
            customer_id,
            age AS customer_age,
            total_purchases AS total_purchase_amount,
            COUNT(*) AS purchase_frequency
        FROM `your-project.customer_data.customers`
        GROUP BY customer_id, customer_age, total_purchase_amount
    """
    df = bq_client.query(query).to_dataframe()

    # Write the feature values to the feature store
    customers.ingest_from_df(
        feature_ids=["customer_age", "total_purchase_amount", "purchase_frequency"],
        feature_time=datetime.datetime.now(datetime.timezone.utc),
        df_source=df,
        entity_id_field="customer_id",
    )

# Use features in training
def get_training_features(entity_ids: list):
    """Retrieve feature values for the given customers as a DataFrame"""
    # Online read; for large training sets use Featurestore.batch_serve_to_df / batch_serve_to_bq
    return customers.read(
        entity_ids=entity_ids,
        feature_ids=[
            "customer_age",
            "total_purchase_amount",
            "purchase_frequency",
        ],
    )
Model Serving with Auto-Scaling
Python - Production Deployment
from google.cloud import aiplatform

aiplatform.init(project="your-project-id")

# Create endpoint (attached to a VPC for private serving)
endpoint = aiplatform.Endpoint.create(
    display_name="high-volume-endpoint",
    network="projects/your-project/global/networks/vpc-name",
)

# Deploy model with auto-scaling
model = aiplatform.Model("model-resource-id")
endpoint.deploy(
    model=model,
    deployed_model_display_name="forecaster-v1",
    machine_type="n1-standard-4",
    accelerator_type="NVIDIA_TESLA_T4",  # K80s are retired on Vertex AI; T4 is widely available
    accelerator_count=1,
    min_replica_count=2,
    max_replica_count=10,
    traffic_percentage=100,
    enable_access_logging=True,
)

# Route 100% of traffic to the new deployment (traffic splits are keyed by deployed-model ID)
deployed_model_id = endpoint.list_models()[0].id
endpoint.update(traffic_split={deployed_model_id: 100})

# Inspect the endpoint
monitoring_job_name = endpoint.gca_resource.model_deployment_monitoring_job
print(f"Monitoring job: {monitoring_job_name}")
print(f"Deployed models: {len(endpoint.list_models())}")
print(f"Traffic split: {endpoint.traffic_split}")
Best Practices
- Use Vertex Pipelines for reproducible workflows
- Maintain complete audit trails of model versions
- Automate retraining when drift is detected
- Monitor both data and prediction drift continuously
- Use Feature Store for consistency across teams
- Implement gradual rollouts (canary deployments); see the sketch after this list
- Set up proper alerts for production issues
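As a minimal sketch of a canary rollout (the endpoint ID, model ID, and deployed-model display names are placeholders, and the fixed sleep stands in for real health checks), a candidate model is deployed next to the current one with a small traffic share and then ramped up by updating the endpoint's traffic split:
Python - Canary Traffic Rollout (sketch)
import time

from google.cloud import aiplatform

aiplatform.init(project="your-project-id", location="us-central1")

# Placeholders: an endpoint already serving "forecaster-v1" and a candidate model
endpoint = aiplatform.Endpoint("your-endpoint-id")
candidate = aiplatform.Model("your-candidate-model-id")

# Deploy the candidate alongside the baseline with 10% of traffic
endpoint.deploy(
    model=candidate,
    deployed_model_display_name="forecaster-canary",
    machine_type="n1-standard-2",
    min_replica_count=1,
    traffic_percentage=10,
)

# Traffic splits are keyed by deployed-model ID, so map display names to IDs
deployed = {m.display_name: m.id for m in endpoint.list_models()}

# Ramp the canary up in steps, pausing between steps to watch monitoring alerts
for share in (25, 50, 100):
    time.sleep(3600)  # stand-in for real checks on latency, errors, and drift
    endpoint.update(
        traffic_split={
            deployed["forecaster-canary"]: share,
            deployed["forecaster-v1"]: 100 - share,
        }
    )

# Once the canary serves all traffic, the old deployment can be removed
endpoint.undeploy(deployed_model_id=deployed["forecaster-v1"])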