Predictive Analytics with Vertex AI
Building forecasting and prediction models for business metrics, customer behavior, and time-series data with AutoML or custom training.
Overview
Vertex AI handles regression, classification, and time-series forecasting at scale. Use AutoML for structured (tabular and time-series) data, or custom training when you need full control over the framework and model code.
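All of the snippets below assume the Vertex AI SDK is installed and initialized once per session. A minimal setup sketch (project ID, region, and bucket are placeholders):
# Shell: pip install google-cloud-aiplatform
from google.cloud import aiplatform
# Authenticate first, e.g. with `gcloud auth application-default login`
aiplatform.init(
    project="your-project-id",          # placeholder project
    location="us-central1",             # region that hosts your datasets and jobs
    staging_bucket="gs://your-bucket",  # used by custom training jobs
)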
Tabular AutoML for Sales Forecasting
Python - Quick Forecasting Model
from google.cloud import aiplatform
# Initialize
aiplatform.init(project="your-project-id", location="us-central1")
# Create tabular dataset from BigQuery
dataset = aiplatform.TabularDataset.create(
    display_name="sales-forecast-data",
    bq_source="bq://your-project.sales_data.monthly_sales",
)
# AutoML regression job for predicting a numeric target
job = aiplatform.AutoMLTabularTrainingJob(
    display_name="sales-forecaster",
    optimization_prediction_type="regression",
    optimization_objective="minimize-rmse",
)
model = job.run(
    dataset=dataset,
    target_column="sales_amount",
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    budget_milli_node_hours=1000,
    disable_early_stopping=False,
)
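Before deploying anything, check the evaluation AutoML computed on the held-out test split. A minimal sketch, assuming the job above completed; the metric keys shown follow Vertex AI's regression evaluation schema (e.g. rootMeanSquaredError):
# Inspect test-split evaluation metrics for the trained model
for evaluation in model.list_model_evaluations():
    metrics = dict(evaluation.metrics)
    print("RMSE:", metrics.get("rootMeanSquaredError"))
    print("MAE:", metrics.get("meanAbsoluteError"))
    print("R^2:", metrics.get("rSquared"))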
Time-Series Forecasting
Python - AutoML Time-Series Forecasting
from google.cloud import aiplatform
# Time-series specific dataset
dataset = aiplatform.TimeSeriesDataset.create(
    display_name="traffic-forecast",
    gcs_source="gs://your-bucket/time_series_data.csv",
)
# Time-series forecasting job
job = aiplatform.AutoMLForecastingTrainingJob(
    display_name="traffic-forecaster",
    optimization_objective="minimize-rmse",
)
model = job.run(
    dataset=dataset,
    target_column="traffic_volume",
    time_column="timestamp",
    time_series_identifier_column="location",
    # Columns whose future values are unknown at forecast time (the target itself)
    unavailable_at_forecast_columns=["traffic_volume"],
    # Columns whose future values are known at forecast time
    available_at_forecast_columns=["timestamp"],
    data_granularity_unit="hour",  # one row per hour in the source data
    data_granularity_count=1,
    forecast_horizon=12,           # predict 12 periods ahead
    context_window=24,             # use 24 periods of history as context
    budget_milli_node_hours=2000,
)
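Forecast models are typically consumed through batch prediction: the input must contain the recent history plus one row per future period with the target column left empty, and Vertex AI fills in the predicted values. A minimal sketch, assuming a prepared BigQuery table your-project.traffic.forecast_input (hypothetical):
batch_job = model.batch_predict(
    job_display_name="traffic-forecast-batch",
    bigquery_source="bq://your-project.traffic.forecast_input",  # hypothetical input table
    instances_format="bigquery",
    bigquery_destination_prefix="bq://your-project.traffic",     # predictions land in a new table here
    predictions_format="bigquery",
)
# batch_predict blocks until the job finishes by default (sync=True)
print(batch_job.state)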
Customer Churn Prediction
Python - Classification Model
from google.cloud import aiplatform
from google.cloud import bigquery
# Prepare churn data in BigQuery and materialize it into a training table
bq_client = bigquery.Client()
query = """
SELECT
  customer_id,
  customer_tenure_months,
  monthly_charges,
  total_charges,
  internet_service_type,
  contract_type,
  churn AS churn_label
FROM
  `your-project.telecom_data.customers`
WHERE
  DATE(signup_date) < DATE_SUB(CURRENT_DATE(), INTERVAL 12 MONTH)
"""
# Write the query results to a table the Vertex AI dataset can reference
# (assumes the `temp` dataset already exists)
destination_table = f"{bq_client.project}.temp.churn_dataset"
job_config = bigquery.QueryJobConfig(
    destination=destination_table,
    write_disposition="WRITE_TRUNCATE",
)
bq_client.query(query, job_config=job_config).result()
# Create dataset from the materialized table
dataset = aiplatform.TabularDataset.create(
    display_name="churn-data",
    bq_source=f"bq://{destination_table}",
)
# Train classification model
job = aiplatform.AutoMLTabularTrainingJob(
    display_name="churn-predictor",
    optimization_prediction_type="classification",
    optimization_objective="maximize-au-roc",
)
model = job.run(
    dataset=dataset,
    target_column="churn_label",
    budget_milli_node_hours=1000,
)
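Scoring the whole customer base is usually done with a batch prediction job that reads from and writes back to BigQuery, rather than calling an endpoint row by row. A minimal sketch, assuming a your-project.telecom_data.current_customers table (hypothetical) with the same feature columns:
batch_job = model.batch_predict(
    job_display_name="churn-batch-scoring",
    bigquery_source="bq://your-project.telecom_data.current_customers",  # hypothetical table
    instances_format="bigquery",
    bigquery_destination_prefix="bq://your-project.telecom_data",
    predictions_format="bigquery",
    machine_type="n1-standard-4",
)
print(batch_job.state)  # the call blocks until the job completes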
Custom XGBoost Training
Python - Custom Training
from google.cloud import aiplatform
import xgboost as xgb
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import joblib
# Training script (save as train.py together with the imports above, minus aiplatform)
def train_xgboost():
    # Load data from BigQuery
    from google.cloud import bigquery
    client = bigquery.Client()
    df = client.query("""
        SELECT * FROM `your-project.prediction_data.features`
    """).to_dataframe()
    # Split data
    X = df.drop(['target'], axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2
    )
    # Scale features
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    # Train XGBoost
    model = xgb.XGBRegressor(
        objective='reg:squarederror',
        max_depth=6,
        learning_rate=0.1,
        n_estimators=100,
    )
    model.fit(X_train, y_train, eval_set=[(X_test, y_test)])
    # Save model and scaler locally
    model.save_model('model.bst')
    joblib.dump(scaler, 'scaler.pkl')
    # Upload artifacts to GCS
    from google.cloud import storage
    bucket = storage.Client().bucket('your-bucket')
    for file in ['model.bst', 'scaler.pkl']:
        blob = bucket.blob(f'models/xgb/{file}')
        blob.upload_from_filename(file)

if __name__ == "__main__":
    train_xgboost()
# Create custom training job (a staging bucket is required for custom training)
aiplatform.init(project="your-project-id", staging_bucket="gs://your-bucket")
job = aiplatform.CustomTrainingJob(
    display_name="xgboost-forecaster",
    script_path="train.py",
    container_uri="us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.0-23:latest",
    requirements=["xgboost", "scikit-learn", "google-cloud-bigquery", "db-dtypes"],
)
job.run(
    replica_count=1,
    machine_type="n1-standard-4",
)
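Because the script writes its artifacts to GCS itself, job.run() does not produce a managed model. To serve the XGBoost model through Vertex AI, register the artifacts in the Model Registry afterwards. A minimal sketch, assuming the artifact path used above; the prebuilt XGBoost prediction container tag is an assumption, so check the currently supported list for your XGBoost version:
model = aiplatform.Model.upload(
    display_name="xgboost-forecaster",
    artifact_uri="gs://your-bucket/models/xgb/",  # folder containing model.bst
    serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest",  # assumed tag
)
print(model.resource_name)
Note that the prebuilt container only loads the model file (model.bst); the saved scaler is not applied at serving time, so callers must scale features themselves or you need a custom serving container.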
Feature Engineering Pipeline
Python - Automated Features
from google.cloud import aiplatform
import pandas as pd
# Feature engineering function (expects a DataFrame indexed by date)
def create_features(df):
    # Lag features
    df['revenue_lag_1'] = df['revenue'].shift(1)
    df['revenue_lag_7'] = df['revenue'].shift(7)
    # Rolling averages
    df['revenue_ma_7'] = df['revenue'].rolling(7).mean()
    df['revenue_ma_30'] = df['revenue'].rolling(30).mean()
    # Growth rates
    df['revenue_growth'] = df['revenue'].pct_change()
    # Seasonality (if applicable)
    df['month'] = df.index.month
    df['quarter'] = df.index.quarter
    df['day_of_week'] = df.index.dayofweek
    return df.dropna()
# Load data with a DatetimeIndex (assumes the CSV has a 'date' column)
df = pd.read_csv('gs://your-bucket/sales_data.csv', parse_dates=['date'], index_col='date')
df_with_features = create_features(df)
# Upload to BigQuery, keeping the date as a regular column
from google.cloud import bigquery
client = bigquery.Client()
client.load_table_from_dataframe(
    df_with_features.reset_index(),
    "your-project.analytics.sales_with_features",
).result()
# Now use this table for training
dataset = aiplatform.TabularDataset.create(
    display_name="sales-data-engineered",
    bq_source="bq://your-project.analytics.sales_with_features",
)
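For lag and rolling features like these, a random split leaks future information into training, so prefer a chronological split. A minimal sketch, assuming your SDK version supports timestamp_split_column_name on tabular training jobs and that the date column was preserved in the table (as done above via reset_index):
job = aiplatform.AutoMLTabularTrainingJob(
    display_name="sales-predictor-engineered",
    optimization_prediction_type="regression",
    optimization_objective="minimize-rmse",
)
model = job.run(
    dataset=dataset,
    target_column="revenue",
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    timestamp_split_column_name="date",  # assumed column name; rows are split chronologically
    budget_milli_node_hours=1000,
)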
Real-Time Predictions
Python - Online Prediction
from google.cloud import aiplatform
# Deploy model to an endpoint for online predictions
endpoint = model.deploy(
    machine_type="n1-standard-2",
    traffic_percentage=100,
)
# Make a prediction (AutoML tabular endpoints expect feature values as strings)
instances = [{
    "customer_tenure_months": "24",
    "monthly_charges": "65.5",
    "total_charges": "1570.20",
    "internet_service_type": "Fiber optic",
    "contract_type": "Month-to-month",
}]
predictions = endpoint.predict(instances=instances)
for prediction in predictions.predictions:
    # AutoML classification returns parallel lists of class labels and scores
    for label, score in zip(prediction["classes"], prediction["scores"]):
        print(f"P(churn_label={label}) = {score:.2%}")
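When you retrain, the new version can be rolled out gradually by deploying it to the same endpoint and splitting traffic, rather than swapping endpoints. A minimal sketch, assuming a retrained model object new_model (hypothetical):
# Route 20% of traffic to the retrained model; the rest stays on the current deployment
new_model.deploy(
    endpoint=endpoint,
    machine_type="n1-standard-2",
    traffic_percentage=20,
)
print(endpoint.traffic_split)  # inspect the resulting split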
Model Explainability
Python - Feature Importance
from google.cloud import aiplatform
# Get model
model = aiplatform.Model("your-model-resource-id")  # model ID or full resource name
# Get explanations from the deployed endpoint
response = endpoint.explain(instances=instances)
# Feature importance
for explanation in response.explanations:
    print("Feature Attributions:")
    for attr in explanation.attributions:
        print(f"{attr.baseline_output_value} -> {attr.instance_output_value}")
        for feature_name, attribution_value in attr.feature_attributions.items():
            print(f"  {feature_name}: {attribution_value}")
# Evaluation metrics
model_eval = model.get_model_evaluation()
metrics = dict(model_eval.metrics)
print(f"AUC-ROC: {metrics.get('auRoc', 'N/A')}")
print(f"AUC-PR: {metrics.get('auPrc', 'N/A')}")
print(f"Log loss: {metrics.get('logLoss', 'N/A')}")
# Threshold-dependent precision/recall are listed under the 'confidenceMetrics' key
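Deployed endpoints bill for the provisioned machines even when idle, so tear them down once you are finished experimenting. A minimal cleanup sketch:
# Undeploy every model from the endpoint, then delete the endpoint itself
endpoint.undeploy_all()
endpoint.delete()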
Best Practices
- Start with data exploration and quality checks
- Handle missing values and outliers appropriately
- Use proper train/validation/test splits
- Scale features consistently
- Monitor model drift in production
- Retrain regularly with new data
- Use proper evaluation metrics for your problem