← Back to GCP Vertex AI

ML Ops & Scaling with Vertex AI

Managing production ML systems with automated pipelines, model monitoring, continuous retraining, and enterprise-scale infrastructure.

Overview

Vertex Pipelines orchestrates ML workflows. Model Registry manages versions. Monitoring tracks performance. Feature Store centralizes features. Together they enable MLOps at scale.

Vertex Pipelines for Workflow Orchestration

Python - Automated ML Pipeline
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import component, pipeline, Input, Output

# Define pipeline components
@component
def load_data(output_path: str):
    """Load training data"""
    import pandas as pd
    df = pd.read_csv('gs://bucket/data.csv')
    df.to_csv(output_path, index=False)

@component
def preprocess_data(
    input_data: Input[dsl.Dataset],
    output_data: Output[dsl.Dataset]
):
    """Preprocess data"""
    import pandas as pd
    df = pd.read_csv(input_data.path)
    # Preprocessing logic
    df.to_csv(output_data.path, index=False)

@component
def train_model(
    training_data: Input[dsl.Dataset],
    model_path: str
) -> str:
    """Train model"""
    from google.cloud import aiplatform
    # Training code
    return f"gs://bucket/model"

@component
def evaluate_model(
    model_path: str,
    test_data: Input[dsl.Dataset]
) -> float:
    """Evaluate model"""
    # Evaluation code
    return 0.95

# Define pipeline
@pipeline(
    name="ml-training-pipeline",
    description="Complete ML pipeline",
    pipeline_root="gs://bucket/pipeline_root",
)
def ml_pipeline():
    load_task = load_data(output_path="gs://bucket/raw_data.csv")
    preprocess_task = preprocess_data(input_data=load_task.output)
    train_task = train_model(
        training_data=preprocess_task.output,
        model_path="gs://bucket/model"
    )
    eval_task = evaluate_model(
        model_path=train_task.output,
        test_data=preprocess_task.output
    )

# Compile and run pipeline
from kfp import compiler
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

# Submit to Vertex
aiplatform.init(project="your-project")
job = aiplatform.PipelineJob(
    display_name="ml-pipeline-run",
    template_path="pipeline.yaml",
    pipeline_root="gs://bucket/pipeline_root",
)
job.run()

Model Registry & Versioning

Python - Model Management
from google.cloud import aiplatform

aiplatform.init(project="your-project-id")

# Register model in Model Registry
model = aiplatform.Model.upload(
    display_name="sales-forecaster-v1",
    artifact_uri="gs://bucket/models/forecaster",
    serving_container_image_uri="gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-12",
)

print(f"Model resource ID: {model.resource_name}")

# Create model versions
version1 = aiplatform.Model.upload(
    display_name="sales-forecaster-v2",
    artifact_uri="gs://bucket/models/forecaster_v2",
    serving_container_image_uri="gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-12",
    parent_model=model.resource_name,
)

# List all versions
versions = aiplatform.Model.list(
    filter=f"parent_model={model.resource_name}"
)

# Deploy specific version to endpoint
endpoint = aiplatform.Endpoint.create(
    display_name="sales-forecast-endpoint"
)

endpoint.deploy(
    model=version1,
    machine_type="n1-standard-4",
    traffic_percentage=100,
)

Continuous Model Retraining

Python - Automated Retraining
from google.cloud import aiplatform, scheduler_v1
import json

def schedule_retraining():
    """Setup scheduled model retraining"""
    
    # Create Cloud Scheduler job
    client = scheduler_v1.CloudSchedulerClient()
    parent = client.common_project_path("your-project-id")
    
    # Schedule weekly retraining
    job = {
        "name": client.job_path("your-project-id", "us-central1", "weekly-retrain"),
        "description": "Weekly model retraining",
        "schedule": "0 2 * * 1",  # Every Monday at 2am
        "timezone": "America/New_York",
        "http_target": {
            "uri": "https://region-run.googleapis.com/apis/run.googleapis.com/v1/projects/your-project-id/locations/us-central1/jobs/retrain-job:run",
            "http_method": "POST",
            "headers": {"Content-Type": "application/json"},
        }
    }
    
    response = client.create_job(request={"parent": parent, "job": job})
    return response

def retrain_model_job():
    """Retraining job function"""
    from google.cloud import bigquery
    import pandas as pd
    
    # 1. Get latest data
    client = bigquery.Client()
    df = client.query("""
        SELECT * FROM `your-project.sales_data.daily_sales`
        WHERE DATE(sales_date) > DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
    """).to_dataframe()
    
    # 2. Check data quality
    if df.empty or df.isnull().sum().sum() > len(df) * 0.1:
        print("Data quality check failed")
        return False
    
    # 3. Retrain model
    aiplatform.init(project="your-project-id")
    
    job = aiplatform.AutoMLTabularTrainingJob(
        display_name="sales-forecaster-retrain",
        optimization_prediction_type="regression",
    )
    
    dataset = aiplatform.TabularDataset.create(
        display_name="sales-data-latest",
        bq_source="bq://your-project.sales_data.daily_sales",
    )
    
    model = job.run(
        dataset=dataset,
        target_column="sales_amount",
        budget_milli_node_hours=1000,
    )
    
    # 4. Evaluate new model
    eval_metrics = model.get_model_evaluation()
    
    # 5. Compare with baseline
    baseline_model = aiplatform.Model("baseline-model-id")
    baseline_metrics = baseline_model.get_model_evaluation()
    
    if eval_metrics.metrics['rmse'] < baseline_metrics.metrics['rmse']:
        print("New model is better, deploying...")
        
        # 6. Deploy to endpoint with canary
        endpoint = aiplatform.Endpoint("endpoint-id")
        endpoint.deploy(
            model=model,
            machine_type="n1-standard-2",
            traffic_percentage=10,  # Start with 10% traffic
        )
        
        return True
    else:
        print("New model is not better than baseline")
        return False

# Schedule retraining
schedule_retraining()

Model Monitoring & Drift Detection

Python - Production Monitoring
from google.cloud import aiplatform
from google.cloud.aiplatform import monitoring

aiplatform.init(project="your-project-id")

# Create model monitoring job
monitoring_job = aiplatform.ModelMonitoringJob.create(
    display_name="sales-forecaster-monitoring",
    project="your-project-id",
    location="us-central1",
    objective_config=monitoring.ModelMonitoringObjectiveConfig(
        skew_detection_config=monitoring.ModelMonitoringAlertConfig(
            data_drift_threshold=0.1,
            attribution_score_skew_threshold=0.05,
        ),
        prediction_drift_config=monitoring.ModelMonitoringAlertConfig(
            data_drift_threshold=0.15,
        ),
    ),
    alert_config=monitoring.ModelMonitoringAlertConfig(
        enable_logging=True,
    ),
)

# Get monitoring results
def check_model_drift(endpoint_id: str):
    """Check for data/prediction drift"""
    
    from google.cloud import logging_v2
    
    client = logging_v2.Client()
    
    # Query monitoring alerts
    query = f"""
    resource.type="aiplatform.googleapis.com/Endpoint"
    resource.labels.endpoint_id="{endpoint_id}"
    jsonPayload.alert_type="data_drift"
    """
    
    entries = client.list_entries(filter_=query)
    
    for entry in entries:
        alert = entry.payload
        if alert['drift_score'] > 0.1:
            print(f"Alert: Data drift detected - {alert['feature']}: {alert['drift_score']}")
            return True
    
    return False

Feature Store for Centralized Features

Python - Feature Management
from google.cloud import aiplatform
from google.cloud.aiplatform_v1beta1.services.feature_store_service import FeatureStoreServiceClient

aiplatform.init(project="your-project-id", location="us-central1")

# Create Feature Store
feature_store = aiplatform.FeatureStore.create(
    name="sales_features",
    online_store_type="OPTIMIZED",
    online_enable_fresh_fresh_transfer=False,
)

# Create entity type
customers = aiplatform.EntityType.create(
    entity_type_id="customer",
    feature_store_name=feature_store.name,
)

# Create features
features_config = [
    {
        "id": "customer_age",
        "value_type": "INT64",
        "description": "Customer age",
    },
    {
        "id": "total_purchase_amount",
        "value_type": "DOUBLE",
        "description": "Total purchase amount",
    },
    {
        "id": "purchase_frequency",
        "value_type": "INT64",
        "description": "Number of purchases",
    },
]

for feature in features_config:
    aiplatform.Feature.create(
        feature_id=feature["id"],
        entity_type_name=customers.name,
        value_type=feature["value_type"],
        description=feature["description"],
    )

# Ingest feature data
from google.cloud import bigquery

def ingest_features():
    """Load features from BigQuery"""
    bq_client = bigquery.Client()
    
    query = """
    SELECT
        customer_id,
        age as customer_age,
        total_purchases as total_purchase_amount,
        COUNT(*) as purchase_frequency
    FROM `your-project.customer_data.customers`
    GROUP BY customer_id, customer_age, total_purchase_amount
    """
    
    df = bq_client.query(query).to_dataframe()
    
    # Write to feature store
    import json
    request_body = {
        "entity_id": "customer",
        "feature_values": df.to_dict('records')
    }
    
    # Ingest via API
    # Implementation details for writing features

# Use features in training
def get_training_features(entity_ids: list):
    """Retrieve features for training"""
    
    feature_list = [
        f"{customers.name}/features/customer_age",
        f"{customers.name}/features/total_purchase_amount",
        f"{customers.name}/features/purchase_frequency",
    ]
    
    # Fetch features
    # Implementation for feature retrieval

Model Serving with Auto-Scaling

Python - Production Deployment
from google.cloud import aiplatform

aiplatform.init(project="your-project-id")

# Create endpoint with auto-scaling
endpoint = aiplatform.Endpoint.create(
    display_name="high-volume-endpoint",
    network="projects/your-project/global/networks/vpc-name",
)

# Deploy model with auto-scaling
model = aiplatform.Model("model-resource-id")

endpoint.deploy(
    model=model,
    deployed_model_display_name="forecaster-v1",
    machine_type="n1-standard-4",
    accelerator_type="NVIDIA_TESLA_K80",
    accelerator_count=1,
    min_replica_count=2,
    max_replica_count=10,
    traffic_percentage=100,
    enable_access_logging=True,
)

# Monitor endpoint
traffic_split = {
    model.resource_name: 100,  # Route 100% to new model
}

endpoint.update_traffic_split(traffic_split)

# Get endpoint metrics
metrics = endpoint.gca_resource.model_deployment_monitoring_job
print(f"Deployed models: {len(endpoint.deployed_models)}")
print(f"Traffic: {endpoint.traffic_split}")

Best Practices