G2[Azure Policy
Compliance]
G3[Audit Logs
Activity Tracking]
end
A1 --> B1
A2 --> B1
A3 --> B1
B1 --> C1
B2 --> C1
B3 --> C1
C1 --> D1
C2 --> D1
C3 --> D1
C4 --> D1
D1 --> E1
D2 --> E1
D3 --> E1
E1 --> F1
E2 --> F2
E3 --> F3
F1 --> C1
F2 --> C1
F3 --> C1
G1 --> C1
G2 --> D1
G3 --> F1
Architecture Overview: Architecture Layers:
```python
import pandas as pd
import numpy as np
from typing import Any, Dict, List


def assess_data_quality(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Comprehensive data quality assessment
    """
    report = {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'memory_usage_mb': df.memory_usage(deep=True).sum() / 1024**2,
        'missing_values': {},
        'duplicates': int(df.duplicated().sum()),
        'duplicate_percentage': (df.duplicated().sum() / len(df)) * 100,
        'numeric_columns': df.select_dtypes(include=[np.number]).columns.tolist(),
        'categorical_columns': df.select_dtypes(include=['object', 'category']).columns.tolist(),
        'datetime_columns': df.select_dtypes(include=['datetime64']).columns.tolist(),
    }

    # Missing value analysis
    for col in df.columns:
        missing_count = df[col].isnull().sum()
        if missing_count > 0:
            report['missing_values'][col] = {
                'count': int(missing_count),
                'percentage': round((missing_count / len(df)) * 100, 2)
            }

    # Numeric column statistics
    report['numeric_stats'] = {}
    for col in report['numeric_columns']:
        report['numeric_stats'][col] = {
            'mean': float(df[col].mean()),
            'std': float(df[col].std()),
            'min': float(df[col].min()),
            'max': float(df[col].max()),
            # Values outside the 1st-99th percentile range
            'outliers': int(((df[col] < df[col].quantile(0.01)) |
                             (df[col] > df[col].quantile(0.99))).sum())
        }

    # Categorical column statistics
    report['categorical_stats'] = {}
    for col in report['categorical_columns']:
        value_counts = df[col].value_counts()
        report['categorical_stats'][col] = {
            'unique_values': int(df[col].nunique()),
            'most_common': str(value_counts.index[0]) if len(value_counts) > 0 else None,
            'most_common_count': int(value_counts.iloc[0]) if len(value_counts) > 0 else 0,
            'cardinality_ratio': round(df[col].nunique() / len(df), 3)
        }

    return report


# Example usage
df = pd.read_csv('customer_data.csv')
quality_report = assess_data_quality(df)
print(f"Dataset: {quality_report['total_rows']:,} rows, {quality_report['total_columns']} columns")
print(f"Missing values: {len(quality_report['missing_values'])} columns affected")
print(f"Duplicates: {quality_report['duplicates']:,} ({quality_report['duplicate_percentage']:.2f}%)")
```
## Handling Missing Values
Different imputation strategies for different scenarios:
```python
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer  # noqa: F401 (must precede the IterativeImputer import)
from sklearn.impute import IterativeImputer


def handle_missing_values(df: pd.DataFrame, strategy: str = 'auto') -> pd.DataFrame:
    """
    Handle missing values with multiple strategies

    Parameters:
    - strategy: 'mean', 'median', 'mode', 'knn', 'iterative', 'auto'
    """
    df_imputed = df.copy()
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    categorical_cols = df.select_dtypes(include=['object', 'category']).columns

    if strategy == 'auto':
        # Numeric: median for strongly skewed columns, mean otherwise
        for col in numeric_cols:
            if abs(df[col].skew()) > 1:  # Strongly skewed in either direction
                imputer = SimpleImputer(strategy='median')
            else:  # Roughly symmetric distribution
                imputer = SimpleImputer(strategy='mean')
            df_imputed[col] = imputer.fit_transform(df[[col]]).ravel()
        # Categorical: use most frequent
        for col in categorical_cols:
            imputer = SimpleImputer(strategy='most_frequent')
            df_imputed[col] = imputer.fit_transform(df[[col]]).ravel()
    elif strategy == 'knn':
        # KNN imputation (considers feature relationships; numeric columns only)
        imputer = KNNImputer(n_neighbors=5, weights='distance')
        df_imputed[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    elif strategy == 'iterative':
        # Iterative imputation (MICE-style; numeric columns only)
        imputer = IterativeImputer(max_iter=10, random_state=42)
        df_imputed[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    else:
        # Simple strategy: 'mean' or 'median' for numeric columns;
        # 'mode' (or any other value) falls back to the median
        numeric_imputer = SimpleImputer(strategy=strategy if strategy in ['mean', 'median'] else 'median')
        df_imputed[numeric_cols] = numeric_imputer.fit_transform(df[numeric_cols])
        categorical_imputer = SimpleImputer(strategy='most_frequent')
        for col in categorical_cols:
            df_imputed[col] = categorical_imputer.fit_transform(df[[col]]).ravel()

    return df_imputed


# Example usage
df_clean = handle_missing_values(df, strategy='auto')
```
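To see why the `auto` strategy prefers the median for skewed columns, consider a small made-up sample with one extreme value. The numbers are purely illustrative, and the sketch uses the standard library rather than scikit-learn.

```python
import statistics

# Hypothetical observed values for a skewed column (one extreme outlier).
# A missing entry in this column would be filled with one of these statistics.
observed = [1.0, 2.0, 3.0, 4.0, 100.0]

mean_fill = statistics.mean(observed)      # pulled upward by the outlier
median_fill = statistics.median(observed)  # stays near the bulk of the data

print(f"Mean fill value:   {mean_fill}")
print(f"Median fill value: {median_fill}")
```

With the mean, every imputed row would receive a value larger than four of the five observed values, which is why a robust statistic is usually the safer default for skewed data.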
## Feature Engineering Patterns
Transform raw data into predictive features:
```python
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, LabelEncoder, OneHotEncoder
from sklearn.preprocessing import PolynomialFeatures, PowerTransformer
import category_encoders as ce  # pip install category-encoders


class FeatureEngineer:
    """
    Comprehensive feature engineering pipeline
    """

    def __init__(self):
        self.scalers = {}
        self.encoders = {}
        self.feature_names = []

    def create_date_features(self, df: pd.DataFrame, date_column: str) -> pd.DataFrame:
        """Extract temporal features from datetime"""
        df = df.copy()
        df[date_column] = pd.to_datetime(df[date_column])
        df[f'{date_column}_year'] = df[date_column].dt.year
        df[f'{date_column}_month'] = df[date_column].dt.month
        df[f'{date_column}_day'] = df[date_column].dt.day
        df[f'{date_column}_dayofweek'] = df[date_column].dt.dayofweek
        df[f'{date_column}_quarter'] = df[date_column].dt.quarter
        df[f'{date_column}_is_weekend'] = df[date_column].dt.dayofweek.isin([5, 6]).astype(int)
        df[f'{date_column}_is_month_start'] = df[date_column].dt.is_month_start.astype(int)
        df[f'{date_column}_is_month_end'] = df[date_column].dt.is_month_end.astype(int)
        return df

    def create_interaction_features(self, df: pd.DataFrame,
                                    feature_pairs: List[tuple]) -> pd.DataFrame:
        """Create feature interactions (multiplication, division, etc.)"""
        df = df.copy()
        for feat1, feat2 in feature_pairs:
            # Multiplicative interaction
            df[f'{feat1}_x_{feat2}'] = df[feat1] * df[feat2]
            # Ratio (small epsilon avoids division by zero)
            df[f'{feat1}_div_{feat2}'] = df[feat1] / (df[feat2] + 1e-8)
            # Difference
            df[f'{feat1}_minus_{feat2}'] = df[feat1] - df[feat2]
        return df

    def create_aggregation_features(self, df: pd.DataFrame,
                                    group_cols: List[str],
                                    agg_cols: List[str]) -> pd.DataFrame:
        """Create aggregation features (group-by statistics)"""
        df = df.copy()
        for agg_col in agg_cols:
            for group_col in group_cols:
                grouped = df.groupby(group_col)[agg_col]
                # Mean
                df[f'{agg_col}_mean_by_{group_col}'] = grouped.transform('mean')
                # Std
                df[f'{agg_col}_std_by_{group_col}'] = grouped.transform('std')
                # Max/Min
                df[f'{agg_col}_max_by_{group_col}'] = grouped.transform('max')
                df[f'{agg_col}_min_by_{group_col}'] = grouped.transform('min')
                # Rank
                df[f'{agg_col}_rank_by_{group_col}'] = grouped.rank(pct=True)
        return df

    def encode_categorical(self, df: pd.DataFrame,
                           categorical_cols: List[str],
                           method: str = 'target') -> pd.DataFrame:
        """
        Encode categorical variables

        Methods:
        - 'onehot': One-hot encoding (for low cardinality < 10)
        - 'label': Label encoding (for ordinal features)
        - 'target': Target encoding (for high cardinality)
        - 'frequency': Frequency encoding
        """
        df = df.copy()
        for col in categorical_cols:
            if method == 'onehot':
                # scikit-learn >= 1.2 uses `sparse_output`; older releases used `sparse`
                encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
                encoded = encoder.fit_transform(df[[col]])
                encoded_df = pd.DataFrame(
                    encoded,
                    columns=[f'{col}_{cat}' for cat in encoder.categories_[0]],
                    index=df.index  # keep rows aligned when concatenating
                )
                df = pd.concat([df.drop(col, axis=1), encoded_df], axis=1)
                self.encoders[col] = encoder
            elif method == 'label':
                encoder = LabelEncoder()
                df[f'{col}_encoded'] = encoder.fit_transform(df[col])
                self.encoders[col] = encoder
            elif method == 'target':
                # Target encoding: expects a column named 'target' in df.
                # Fit on training data only to avoid leaking the target.
                encoder = ce.TargetEncoder(cols=[col])
                df[f'{col}_encoded'] = encoder.fit_transform(df[col], df['target'])
                self.encoders[col] = encoder
            elif method == 'frequency':
                freq = df[col].value_counts(normalize=True).to_dict()
                df[f'{col}_freq'] = df[col].map(freq)
        return df

    def scale_features(self, df: pd.DataFrame,
                       numeric_cols: List[str],
                       method: str = 'standard') -> pd.DataFrame:
        """
        Scale numeric features

        Methods:
        - 'standard': StandardScaler (mean=0, std=1)
        - 'minmax': MinMaxScaler (range 0-1)
        - 'robust': RobustScaler (median=0, IQR=1, handles outliers)
        - 'power': PowerTransformer (Yeo-Johnson, makes data more Gaussian)
        """
        df = df.copy()
        if method == 'standard':
            scaler = StandardScaler()
        elif method == 'minmax':
            scaler = MinMaxScaler()
        elif method == 'robust':
            scaler = RobustScaler()
        elif method == 'power':
            scaler = PowerTransformer(method='yeo-johnson')
        else:
            raise ValueError(f"Unknown scaling method: {method}")
        df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
        self.scalers['numeric'] = scaler
        return df

    def create_polynomial_features(self, df: pd.DataFrame,
                                   numeric_cols: List[str],
                                   degree: int = 2) -> pd.DataFrame:
        """Create polynomial and interaction features"""
        df = df.copy()
        poly = PolynomialFeatures(degree=degree, include_bias=False)
        poly_features = poly.fit_transform(df[numeric_cols])
        poly_df = pd.DataFrame(
            poly_features,
            columns=poly.get_feature_names_out(numeric_cols),
            index=df.index  # keep rows aligned when concatenating
        )
        df = pd.concat([df.drop(numeric_cols, axis=1), poly_df], axis=1)
        self.feature_names = poly_df.columns.tolist()
        return df

# Example: comprehensive feature engineering
engineer = FeatureEngineer()

# Load data
df = pd.read_csv('transactions.csv')

# Handle missing values
df = handle_missing_values(df, strategy='auto')

# Date features
df = engineer.create_date_features(df, 'transaction_date')

# Interaction features
df = engineer.create_interaction_features(df, [
    ('amount', 'quantity'),
    ('price', 'discount')
])

# Aggregation features (customer-level statistics)
df = engineer.create_aggregation_features(
    df,
    group_cols=['customer_id', 'product_category'],
    agg_cols=['amount', 'quantity']
)

# Encode categorical
df = engineer.encode_categorical(
    df,
    categorical_cols=['product_category', 'region'],
    method='target'
)

# Scale numeric features (exclude the target so the labels are not rescaled)
numeric_cols = [c for c in df.select_dtypes(include=[np.number]).columns if c != 'target']
df = engineer.scale_features(df, numeric_cols, method='standard')

print(f"Final feature count: {len(df.columns)}")
```
## Feature Selection
Remove irrelevant or redundant features to improve model performance and reduce overfitting:
```python
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.feature_selection import RFE
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import seaborn as sns


def select_features_statistical(X, y, k=20, method='f_classif'):
    """Statistical feature selection"""
    if method == 'f_classif':
        selector = SelectKBest(score_func=f_classif, k=k)
    else:  # mutual_info
        selector = SelectKBest(score_func=mutual_info_classif, k=k)
    X_selected = selector.fit_transform(X, y)
    selected_features = X.columns[selector.get_support()].tolist()
    feature_scores = pd.DataFrame({
        'feature': X.columns,
        'score': selector.scores_
    }).sort_values('score', ascending=False)
    return X_selected, selected_features, feature_scores


def select_features_model_based(X, y, n_features=20):
    """Model-based feature selection using Random Forest"""
    rf = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    rf.fit(X, y)
    feature_importance = pd.DataFrame({
        'feature': X.columns,
        'importance': rf.feature_importances_
    }).sort_values('importance', ascending=False)
    selected_features = feature_importance.head(n_features)['feature'].tolist()
    X_selected = X[selected_features]
    return X_selected, selected_features, feature_importance


def select_features_rfe(X, y, n_features=20):
    """Recursive Feature Elimination"""
    estimator = RandomForestClassifier(n_estimators=50, random_state=42)
    rfe = RFE(estimator, n_features_to_select=n_features, step=5)
    rfe.fit(X, y)
    selected_features = X.columns[rfe.support_].tolist()
    X_selected = X[selected_features]
    feature_ranking = pd.DataFrame({
        'feature': X.columns,
        'ranking': rfe.ranking_,
        'selected': rfe.support_
    }).sort_values('ranking')
    return X_selected, selected_features, feature_ranking


# Example: Feature selection workflow
X = df.drop('target', axis=1)
y = df['target']

# Method 1: Statistical (fast, univariate)
X_stat, features_stat, scores_stat = select_features_statistical(X, y, k=30)
print(f"Statistical selection: {len(features_stat)} features")

# Method 2: Model-based (considers feature interactions)
X_model, features_model, importance_model = select_features_model_based(X, y, n_features=30)
print(f"Model-based selection: {len(features_model)} features")

# Method 3: RFE (expensive but comprehensive)
X_rfe, features_rfe, ranking_rfe = select_features_rfe(X, y, n_features=30)
print(f"RFE selection: {len(features_rfe)} features")

# Intersection of all three methods (most robust features)
final_features = list(set(features_stat) & set(features_model) & set(features_rfe))
print(f"Consensus features: {len(final_features)}")
```
## Model Training with Scikit-Learn
### Train-Test Split & Cross-Validation
Proper data splitting prevents overfitting and provides reliable performance estimates:
```python
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold, cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import numpy as np

# Prepare data
X = df.drop('target', axis=1)
y = df['target']

# Method 1: Simple train-test split (70/30 or 80/20)
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,    # 80% train, 20% test
    stratify=y,       # Maintain class distribution
    random_state=42
)
print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")
print(f"Class distribution - Train: {y_train.value_counts().to_dict()}")
print(f"Class distribution - Test: {y_test.value_counts().to_dict()}")

# Method 2: Train-validation-test split (60/20/20)
X_train_full, X_test, y_train_full, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_full, y_train_full, test_size=0.25, stratify=y_train_full, random_state=42
)
print(f"Training: {X_train.shape[0]} samples")
print(f"Validation: {X_val.shape[0]} samples")
print(f"Test: {X_test.shape[0]} samples")

# Method 3: K-Fold Cross-Validation (more robust performance estimate)
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
cv_scores = cross_val_score(model, X, y, cv=kfold, scoring='accuracy', n_jobs=-1)
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
```
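The 60/20/20 split relies on the second `test_size` being a fraction of the remaining data, not of the full dataset. The arithmetic can be checked with a small sketch; the helper name and the sample count of 1,000 are illustrative assumptions, and scikit-learn's own rounding may differ by one sample when the sizes do not divide evenly.

```python
def three_way_split_sizes(n_samples: int, test_size: float, val_size_of_rest: float):
    """Return (train, val, test) sizes for two successive hold-out splits."""
    n_test = round(n_samples * test_size)      # first split: hold out the test set
    n_rest = n_samples - n_test                # what remains for train + validation
    n_val = round(n_rest * val_size_of_rest)   # second split: fraction of the remainder
    n_train = n_rest - n_val
    return n_train, n_val, n_test


n_train, n_val, n_test = three_way_split_sizes(1000, test_size=0.2, val_size_of_rest=0.25)
print(f"train={n_train}, val={n_val}, test={n_test}")
```

Taking 25% of the remaining 80% yields 20% of the original data, which is why `test_size=0.25` appears in the second call.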
### Training Multiple Algorithms
Compare multiple algorithms to identify the best performer:
```python
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from xgboost import XGBClassifier
import time


def train_and_evaluate_models(X_train, X_test, y_train, y_test):
    """
    Train multiple models and compare performance
    """
    models = {
        'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
        'Decision Tree': DecisionTreeClassifier(max_depth=10, random_state=42),
        'Random Forest': RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42),
        'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, random_state=42),
        # `use_label_encoder` is deprecated in recent XGBoost releases and omitted here
        'XGBoost': XGBClassifier(n_estimators=100, learning_rate=0.1, random_state=42),
        'SVM': SVC(kernel='rbf', random_state=42),
        'Naive Bayes': GaussianNB(),
        'KNN': KNeighborsClassifier(n_neighbors=5)
    }
    results = []
    for name, model in models.items():
        print(f"Training {name}...")

        # Train
        start_time = time.time()
        model.fit(X_train, y_train)
        train_time = time.time() - start_time

        # Predict
        start_time = time.time()
        y_pred = model.predict(X_test)
        inference_time = (time.time() - start_time) / len(X_test) * 1000  # ms per sample

        # Evaluate
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred, average='weighted')
        recall = recall_score(y_test, y_pred, average='weighted')
        f1 = f1_score(y_test, y_pred, average='weighted')

        results.append({
            'Model': name,
            'Accuracy': round(accuracy, 4),
            'Precision': round(precision, 4),
            'Recall': round(recall, 4),
            'F1-Score': round(f1, 4),
            'Train Time (s)': round(train_time, 2),
            'Inference (ms)': round(inference_time, 3)
        })

    results_df = pd.DataFrame(results).sort_values('F1-Score', ascending=False)
    return results_df


# Train and compare
results = train_and_evaluate_models(X_train, X_test, y_train, y_test)
print("\n=== Model Comparison ===")
print(results.to_string(index=False))

# Select best model
best_model_name = results.iloc[0]['Model']
print(f"\nBest model: {best_model_name}")
```
### Advanced Model Training with Class Imbalance
Handle imbalanced datasets (common in fraud detection, rare disease prediction):
```python
from sklearn.utils import class_weight
from sklearn.metrics import classification_report
from imblearn.over_sampling import SMOTE, ADASYN
from imblearn.under_sampling import RandomUnderSampler
from imblearn.combine import SMOTETomek
from collections import Counter

# Check class distribution
print(f"Original class distribution: {Counter(y_train)}")

# Method 1: Class weights (built into most sklearn models)
class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weight_dict = dict(zip(np.unique(y_train), class_weights))
print(f"Class weights: {class_weight_dict}")

model_weighted = RandomForestClassifier(
    n_estimators=100,
    class_weight=class_weight_dict,
    random_state=42
)
model_weighted.fit(X_train, y_train)

# Method 2: SMOTE (Synthetic Minority Over-sampling)
# Resample the training set only; the test set keeps its original distribution
smote = SMOTE(sampling_strategy='auto', random_state=42)
X_train_smote, y_train_smote = smote.fit_resample(X_train, y_train)
print(f"After SMOTE: {Counter(y_train_smote)}")

model_smote = RandomForestClassifier(n_estimators=100, random_state=42)
model_smote.fit(X_train_smote, y_train_smote)

# Method 3: Combined SMOTE + Tomek Links (removes noisy samples)
smote_tomek = SMOTETomek(random_state=42)
X_train_combined, y_train_combined = smote_tomek.fit_resample(X_train, y_train)
print(f"After SMOTE+Tomek: {Counter(y_train_combined)}")

model_combined = RandomForestClassifier(n_estimators=100, random_state=42)
model_combined.fit(X_train_combined, y_train_combined)

# Compare approaches on imbalanced metrics
print("\n=== Model with Class Weights ===")
y_pred_weighted = model_weighted.predict(X_test)
print(classification_report(y_test, y_pred_weighted))

print("\n=== Model with SMOTE ===")
y_pred_smote = model_smote.predict(X_test)
print(classification_report(y_test, y_pred_smote))

print("\n=== Model with SMOTE+Tomek ===")
y_pred_combined = model_combined.predict(X_test)
print(classification_report(y_test, y_pred_combined))
```
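The `'balanced'` option used in Method 1 weights each class by `n_samples / (n_classes * class_count)`. The sketch below reproduces that formula with the standard library so the effect is easy to inspect; the function name and the 90/10 label split are illustrative assumptions, not part of the workflow above.

```python
from collections import Counter


def balanced_class_weights(labels):
    """Weight each class by n_samples / (n_classes * count), the 'balanced' heuristic."""
    counts = Counter(labels)
    n_samples = len(labels)
    n_classes = len(counts)
    return {cls: n_samples / (n_classes * count) for cls, count in counts.items()}


# Hypothetical imbalanced labels: 90 negatives, 10 positives
labels = [0] * 90 + [1] * 10
weights = balanced_class_weights(labels)
print(weights)
```

The minority class receives a weight nine times larger than the majority class, so each class contributes equally to the loss in aggregate.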
## Hyperparameter Tuning
Systematic hyperparameter optimization often yields a measurable improvement over default settings, although the size of the gain depends on the model and the dataset:
### Grid Search (Exhaustive)
```python
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, ParameterGrid
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import numpy as np

# Define parameter grid
param_grid = {
    'n_estimators': [50, 100, 200, 300],
    'max_depth': [5, 10, 15, 20, None],
    'min_samples_split': [2, 5, 10, 20],
    'min_samples_leaf': [1, 2, 4, 8],
    'max_features': ['sqrt', 'log2', None],
    'bootstrap': [True, False]
}

# Grid search with cross-validation
grid_search = GridSearchCV(
    # Parallelism is handled by GridSearchCV's n_jobs to avoid oversubscribing cores
    estimator=RandomForestClassifier(random_state=42),
    param_grid=param_grid,
    cv=5,
    scoring='f1_weighted',
    n_jobs=-1,
    verbose=2,
    return_train_score=True
)
print(f"Testing {len(ParameterGrid(param_grid))} combinations...")
grid_search.fit(X_train, y_train)

print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")

# Train final model with best parameters
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)
print(f"Test accuracy: {accuracy_score(y_test, y_pred):.4f}")
```
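Before launching an exhaustive search it helps to estimate its cost. The sketch below counts the combinations in the grid used above and multiplies by the number of cross-validation folds; it uses only the standard library and ignores the single final refit on the full training set.

```python
import math

# Same grid as in the grid search example above
param_grid = {
    'n_estimators': [50, 100, 200, 300],
    'max_depth': [5, 10, 15, 20, None],
    'min_samples_split': [2, 5, 10, 20],
    'min_samples_leaf': [1, 2, 4, 8],
    'max_features': ['sqrt', 'log2', None],
    'bootstrap': [True, False],
}
cv_folds = 5

# Exhaustive search evaluates the Cartesian product of all value lists
n_combinations = math.prod(len(values) for values in param_grid.values())
n_fits = n_combinations * cv_folds

print(f"{n_combinations} combinations x {cv_folds} folds = {n_fits} model fits")
```

Adding one more value to any list multiplies the total, which is the main reason exhaustive search becomes impractical for larger spaces.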
### Randomized Search (Faster)
For large parameter spaces, randomized search is more efficient:
```python
from scipy.stats import randint, uniform

# Define parameter distributions
param_distributions = {
    'n_estimators': randint(50, 500),
    'max_depth': randint(5, 50),
    'min_samples_split': randint(2, 20),
    'min_samples_leaf': randint(1, 10),
    'max_features': ['sqrt', 'log2', None],
    'bootstrap': [True, False]
}

# Randomized search
random_search = RandomizedSearchCV(
    # Parallelism is handled by RandomizedSearchCV's n_jobs
    estimator=RandomForestClassifier(random_state=42),
    param_distributions=param_distributions,
    n_iter=100,  # Number of random combinations to try
    cv=5,
    scoring='f1_weighted',
    n_jobs=-1,
    verbose=2,
    random_state=42,
    return_train_score=True
)
random_search.fit(X_train, y_train)

print(f"\nBest parameters: {random_search.best_params_}")
print(f"Best CV score: {random_search.best_score_:.4f}")

# Evaluate
y_pred = random_search.best_estimator_.predict(X_test)
print(f"Test accuracy: {accuracy_score(y_test, y_pred):.4f}")
```
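One way to reason about why a fixed budget of random trials is often enough: if each trial is an independent draw, the chance that at least one of `n_iter` trials lands in the best 5% of the search space is `1 - 0.95 ** n_iter`. The sketch below evaluates that expression; the independence assumption and the 5% threshold are simplifications chosen for illustration.

```python
def prob_hit_top_fraction(n_iter: int, top_fraction: float = 0.05) -> float:
    """Probability that at least one of n_iter independent draws falls in the top fraction."""
    return 1.0 - (1.0 - top_fraction) ** n_iter


for n in (10, 60, 100):
    print(f"n_iter={n:>3}: P(at least one top-5% configuration) = {prob_hit_top_fraction(n):.3f}")
```

Under these assumptions the probability depends only on the number of trials, not on how many hyperparameters are being searched.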
### Bayesian Optimization (Most Efficient)
```python
from sklearn.ensemble import GradientBoostingClassifier
from skopt import BayesSearchCV
from skopt.space import Real, Integer

# Define search space
search_spaces = {
    'n_estimators': Integer(50, 500),
    'max_depth': Integer(5, 50),
    'min_samples_split': Integer(2, 20),
    'min_samples_leaf': Integer(1, 10),
    'max_features': ['sqrt', 'log2'],
    'learning_rate': Real(0.01, 0.3, prior='log-uniform')  # For gradient boosting
}

# Bayesian optimization
bayes_search = BayesSearchCV(
    estimator=GradientBoostingClassifier(random_state=42),
    search_spaces=search_spaces,
    n_iter=50,
    cv=5,
    scoring='f1_weighted',
    n_jobs=-1,
    verbose=2,
    random_state=42
)
bayes_search.fit(X_train, y_train)

print(f"\nBest parameters: {bayes_search.best_params_}")
print(f"Best CV score: {bayes_search.best_score_:.4f}")
```
## Azure Machine Learning Training
Azure ML provides enterprise-grade infrastructure for distributed training, experiment tracking, and model management:
### Azure ML Workspace Setup
```bash
# Create Azure ML workspace using Azure CLI
az ml workspace create \
  --name ml-workspace \
  --resource-group ml-rg \
  --location eastus

# Create compute cluster for training
az ml compute create \
  --name cpu-cluster \
  --type AmlCompute \
  --min-instances 0 \
  --max-instances 4 \
  --size Standard_DS3_v2 \
  --resource-group ml-rg \
  --workspace-name ml-workspace

# Create GPU cluster for deep learning
az ml compute create \
  --name gpu-cluster \
  --type AmlCompute \
  --min-instances 0 \
  --max-instances 2 \
  --size Standard_NC6 \
  --resource-group ml-rg \
  --workspace-name ml-workspace
```
### Azure ML Python SDK V2 Training
```python
from azure.ai.ml import MLClient, command, Input
from azure.ai.ml.entities import Environment, AmlCompute
from azure.identity import DefaultAzureCredential
from azure.ai.ml.constants import AssetTypes
import os

# Connect to workspace
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="blog-subscription-id",  # Replace with your subscription ID
    resource_group_name="ml-rg",
    workspace_name="ml-workspace"
)

# Define training job
job = command(
    code="./src",  # Local folder containing training script
    command="python train.py --data-path ${{inputs.training_data}} --epochs ${{inputs.epochs}} --lr ${{inputs.learning_rate}}",
    inputs={
        "training_data": Input(type=AssetTypes.URI_FOLDER, path="azureml://datastores/workspaceblobstore/paths/training_data/"),
        "epochs": 50,
        "learning_rate": 0.001
    },
    environment="AzureML-sklearn-1.0@latest",  # Curated environment
    compute="cpu-cluster",
    display_name="rf-training-run",
    description="Random Forest training with hyperparameter tuning",
    experiment_name="customer-churn-prediction",
    tags={"model_type": "random_forest", "version": "1.0"}
)

# Submit job
returned_job = ml_client.jobs.create_or_update(job)
print(f"Job submitted: {returned_job.name}")
print(f"Studio URL: {returned_job.studio_url}")

# Wait for completion
ml_client.jobs.stream(returned_job.name)
```
### Training Script with MLflow Tracking
```python
# src/train.py - Training script with Azure ML integration
import argparse
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd
import joblib
import os


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-path", type=str, required=True, help="Path to training data")
    # For Random Forest, --epochs is used as the number of trees (n_estimators)
    parser.add_argument("--epochs", type=int, default=100, help="Number of estimators")
    parser.add_argument("--lr", type=float, default=0.1, help="Learning rate (not used for RF)")
    parser.add_argument("--max-depth", type=int, default=10, help="Max tree depth")
    parser.add_argument("--output-model", type=str, default="./outputs/model.pkl", help="Output model path")
    return parser.parse_args()


def main():
    args = parse_args()

    # Enable autologging
    mlflow.sklearn.autolog()

    # Load data
    print(f"Loading data from {args.data_path}")
    df = pd.read_csv(os.path.join(args.data_path, "train.csv"))
    X = df.drop('target', axis=1)
    y = df['target']

    # Split data
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
    print(f"Training samples: {len(X_train)}, Validation samples: {len(X_val)}")

    # Train model
    print("Training Random Forest model...")
    model = RandomForestClassifier(
        n_estimators=args.epochs,
        max_depth=args.max_depth,
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_val)
    accuracy = accuracy_score(y_val, y_pred)
    precision = precision_score(y_val, y_pred, average='weighted')
    recall = recall_score(y_val, y_pred, average='weighted')
    f1 = f1_score(y_val, y_pred, average='weighted')

    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)
    mlflow.log_metric("f1_score", f1)

    # Log parameters
    mlflow.log_param("n_estimators", args.epochs)
    mlflow.log_param("max_depth", args.max_depth)
    mlflow.log_param("train_samples", len(X_train))

    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1-Score: {f1:.4f}")

    # Save model
    os.makedirs(os.path.dirname(args.output_model), exist_ok=True)
    joblib.dump(model, args.output_model)
    print(f"Model saved to {args.output_model}")

    # Register model
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        registered_model_name="customer-churn-rf"
    )


if __name__ == "__main__":
    main()
```
### Hyperparameter Tuning with Azure ML Sweep
```python
from azure.ai.ml import command, Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.sweep import Choice, Uniform, RandomSamplingAlgorithm, BanditPolicy

# Define sweep job for hyperparameter tuning
sweep_job = command(
    code="./src",
    command="python train.py --data-path ${{inputs.training_data}} --epochs ${{inputs.epochs}} --max-depth ${{inputs.max_depth}}",
    inputs={
        "training_data": Input(type=AssetTypes.URI_FOLDER, path="azureml://datastores/workspaceblobstore/paths/training_data/"),
        "epochs": Choice([50, 100, 200, 300]),
        "max_depth": Choice([5, 10, 15, 20, 25])
    },
    environment="AzureML-sklearn-1.0@latest",
    compute="cpu-cluster",
    experiment_name="customer-churn-sweep"
)

# Configure sweep
sweep_job = sweep_job.sweep(
    sampling_algorithm=RandomSamplingAlgorithm(),
    primary_metric="f1_score",  # Must match the metric name logged by train.py
    goal="maximize",
    max_total_trials=20,
    max_concurrent_trials=4,
    early_termination_policy=BanditPolicy(
        evaluation_interval=2,
        slack_factor=0.1,
        delay_evaluation=5
    )
)

# Submit sweep
sweep_run = ml_client.jobs.create_or_update(sweep_job)
print(f"Sweep job submitted: {sweep_run.name}")

# Wait for the sweep to finish, then get the best trial
ml_client.jobs.stream(sweep_run.name)
best_trial = ml_client.jobs.get(sweep_run.name)
print(f"Best trial: {best_trial.properties.get('best_child_run_id')}")
```
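The `slack_factor` in the bandit policy can be read as a tolerance relative to the best trial so far. For a maximized metric, the documented rule is that a trial is stopped when its metric falls below `best / (1 + slack_factor)`. The function below is a simplified local sketch of that rule under this reading, not the service's implementation, and the metric values are made up.

```python
def bandit_should_terminate(trial_metric: float, best_metric: float, slack_factor: float) -> bool:
    """For a maximized metric: stop trials scoring below best / (1 + slack_factor)."""
    threshold = best_metric / (1.0 + slack_factor)
    return trial_metric < threshold


# Hypothetical F1 scores at one evaluation interval, with slack_factor=0.1
best_so_far = 0.88  # threshold is roughly 0.80
for trial_f1 in (0.85, 0.75):
    stop = bandit_should_terminate(trial_f1, best_so_far, slack_factor=0.1)
    print(f"trial F1={trial_f1:.2f} -> {'terminate' if stop else 'continue'}")
```

The policy is evaluated when a trial reports the primary metric, so it only has an effect on training scripts that log that metric repeatedly during training rather than once at the end.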
### AutoML for Automated Model Selection
Azure AutoML automatically tries multiple algorithms and hyperparameters:
```python
from azure.ai.ml import automl, Input
from azure.ai.ml.constants import AssetTypes

# Configure AutoML classification job
automl_job = automl.classification(
    compute="cpu-cluster",
    experiment_name="customer-churn-automl",
    training_data=Input(type=AssetTypes.MLTABLE, path="azureml://datastores/workspaceblobstore/paths/training_data/"),
    target_column_name="target",
    primary_metric="accuracy",
    n_cross_validations=5,
    enable_model_explainability=True,
    enable_onnx_compatible_models=True,
    tags={"project": "customer-churn", "approach": "automl"}
)

# Set limits
automl_job.set_limits(
    timeout_minutes=120,
    trial_timeout_minutes=20,
    max_trials=20,
    max_concurrent_trials=4,
    enable_early_termination=True
)

# Set training
automl_job.set_training(
    blocked_training_algorithms=["LogisticRegression"],  # Exclude specific algorithms
    enable_dnn_training=False,
    enable_stack_ensemble=True,
    enable_vote_ensemble=True
)

# Set featurization
automl_job.set_featurization(
    mode="auto",
    enable_dnn_featurization=False
)

# Submit AutoML job
automl_run = ml_client.jobs.create_or_update(automl_job)
print(f"AutoML job submitted: {automl_run.name}")
print(f"Studio URL: {automl_run.studio_url}")

# Wait for completion and get best model
ml_client.jobs.stream(automl_run.name)
best_run = ml_client.jobs.get(automl_run.name)
print(f"Best model accuracy: {best_run.properties.get('best_primary_metric')}")
```
## Model Evaluation Metrics
Selecting appropriate evaluation metrics is crucial for measuring model performance correctly:
### Classification Metrics
```python
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, classification_report, roc_auc_score, roc_curve,
    precision_recall_curve, average_precision_score
)
import matplotlib.pyplot as plt
import seaborn as sns


def evaluate_classification_model(y_true, y_pred, y_pred_proba=None):
    """
    Comprehensive classification evaluation

    y_pred_proba: optional 1-D array of positive-class probabilities
    (the ROC section assumes binary classification)
    """
    # Basic metrics
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')
    f1 = f1_score(y_true, y_pred, average='weighted')

    print("=== Classification Metrics ===")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")

    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.savefig('confusion_matrix.png')
    plt.close()
    print("\nConfusion Matrix saved to confusion_matrix.png")

    # Classification report
    print("\n=== Classification Report ===")
    print(classification_report(y_true, y_pred))

    # ROC-AUC (if probabilities available)
    if y_pred_proba is not None:
        roc_auc = roc_auc_score(y_true, y_pred_proba)
        print(f"\nROC-AUC Score: {roc_auc:.4f}")

        # Plot ROC curve
        fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {roc_auc:.4f})')
        plt.plot([0, 1], [0, 1], 'k--', label='Random Classifier')
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('ROC Curve')
        plt.legend()
        plt.savefig('roc_curve.png')
        plt.close()
        print("ROC Curve saved to roc_curve.png")

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'confusion_matrix': cm
    }


# Example usage (assumes `model` is a fitted binary classifier and y_pred = model.predict(X_test))
metrics = evaluate_classification_model(y_test, y_pred, model.predict_proba(X_test)[:, 1])
```

**Metric Selection Guide:**
| Metric | Formula | Use When | Interpretation |
|--------|---------|----------|----------------|
| **Accuracy** | (TP+TN) / Total | Balanced classes, all errors equally costly | % of correct predictions |
| **Precision** | TP / (TP+FP) | False positives costly (spam filter) | Of predicted positives, % actually positive |
| **Recall** | TP / (TP+FN) | False negatives costly (cancer detection) | Of actual positives, % correctly identified |
| **F1-Score** | 2 × (Prec × Rec) / (Prec + Rec) | Balance precision/recall, imbalanced classes | Harmonic mean of precision/recall |
| **ROC-AUC** | Area under ROC curve | Compare models independently of the decision threshold | Ranking ability: 0.5 is random, 1.0 is perfect (does not measure calibration) |
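
The formulas in the table can be checked by hand from confusion-matrix counts. The sketch below is illustrative only: `classification_metrics` is a hypothetical helper, and the fraud counts are made-up numbers chosen to show how accuracy can hide a useless model on imbalanced data.

```python
def classification_metrics(tp: int, fp: int, fn: int, tn: int) -> dict:
    """Compute accuracy, precision, recall and F1 from confusion-matrix counts."""
    total = tp + fp + fn + tn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Hypothetical data: 1,000 transactions, 10 of them fraudulent (1%).
# Model A predicts "not fraud" for every transaction.
always_negative = classification_metrics(tp=0, fp=0, fn=10, tn=990)

# Model B catches 8 of the 10 frauds at the price of 12 false alarms.
useful_model = classification_metrics(tp=8, fp=12, fn=2, tn=978)

for name, m in [("always negative", always_negative), ("useful model", useful_model)]:
    print(f"{name}: " + ", ".join(f"{k}={v:.3f}" for k, v in m.items()))
```

Model A has the higher accuracy (0.990 versus 0.986) yet zero recall, which is why precision, recall, and F1 belong alongside accuracy whenever classes are imbalanced.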
### Regression Metrics
```python
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_error
import matplotlib.pyplot as plt
import numpy as np


def evaluate_regression_model(y_true, y_pred):
    """
    Comprehensive regression evaluation
    """
    mae = mean_absolute_error(y_true, y_pred)
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_true, y_pred)
    mape = mean_absolute_percentage_error(y_true, y_pred) * 100

    # The "$" prefix assumes a monetary target; drop it for other units
    print("=== Regression Metrics ===")
    print(f"MAE (Mean Absolute Error): ${mae:,.2f}")
    print(f"MSE (Mean Squared Error): ${mse:,.2f}")
    print(f"RMSE (Root Mean Squared Error): ${rmse:,.2f}")
    print(f"R² Score: {r2:.4f}")
    print(f"MAPE (Mean Absolute % Error): {mape:.2f}%")

    # Residual plot (left) and predictions vs. actual (right)
    residuals = y_true - y_pred
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.scatter(y_pred, residuals, alpha=0.5)
    plt.axhline(y=0, color='r', linestyle='--')
    plt.xlabel('Predicted Values')
    plt.ylabel('Residuals')
    plt.title('Residual Plot')

    plt.subplot(1, 2, 2)
    plt.scatter(y_true, y_pred, alpha=0.5)
    plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--', lw=2)
    plt.xlabel('True Values')
    plt.ylabel('Predicted Values')
    plt.title('Predictions vs Actual')

    plt.tight_layout()
    plt.savefig('regression_evaluation.png')
    plt.close()
    print("\nPlots saved to regression_evaluation.png")

    return {
        'mae': mae,
        'mse': mse,
        'rmse': rmse,
        'r2': r2,
        'mape': mape
    }


# Example usage
reg_metrics = evaluate_regression_model(y_test, y_pred)
```
**Regression Metric Selection:**
| Metric | Formula | Use When | Interpretation |
|--------|---------|----------|----------------|
| **MAE** | Σ\|y_true - y_pred\| / n | Outliers shouldn't dominate | Average absolute error in original units |
| **MSE** | Σ(y_true - y_pred)² / n | Penalize large errors more | Squared error (same units as target²) |
| **RMSE** | √MSE | Want interpretable error in original units | Square root of MSE (original units) |
| **R²** | 1 - (SS_res / SS_tot) | Model comparison, variance explained | Share of variance explained (1 is perfect, 0 matches predicting the mean, negative is worse than the mean) |
| **MAPE** | 100 × Σ(\|y_true - y_pred\| / \|y_true\|) / n | Relative error matters | Average % error (scale-independent; undefined when y_true is 0) |
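
To make the formulas concrete, here is a small worked example in plain Python. `regression_metrics` is a hypothetical helper and the four data points are invented for illustration; in practice the scikit-learn functions used earlier compute the same quantities.

```python
import math


def regression_metrics(y_true, y_pred):
    """Compute MAE, MSE, RMSE, R² and MAPE directly from their definitions."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]

    mae = sum(abs(e) for e in errors) / n
    mse = sum(e * e for e in errors) / n
    rmse = math.sqrt(mse)

    mean_true = sum(y_true) / n
    ss_res = sum(e * e for e in errors)                 # residual sum of squares
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1 - ss_res / ss_tot

    # MAPE is undefined if any true value is zero
    mape = 100 * sum(abs(e) / abs(t) for e, t in zip(errors, y_true)) / n
    return {"mae": mae, "mse": mse, "rmse": rmse, "r2": r2, "mape": mape}


# Made-up values: errors are -10, +10, -30, +40
result = regression_metrics([100, 200, 300, 400], [110, 190, 330, 360])
print(result)
```

The single 40-unit miss contributes 1,600 of the 2,700 total squared error, so RMSE (about 26) sits above MAE (22.5). That gap is the "penalize large errors more" behaviour described in the table.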
## Model Deployment Patterns
### Azure ML Managed Online Endpoints
Real-time inference with automatic scaling and load balancing:
```python
from pathlib import Path

from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    Model,
    Environment,
    CodeConfiguration,
    OnlineRequestSettings,
    ProbeSettings,
)
from azure.ai.ml.constants import AssetTypes

# Register model
model = Model(
    path="./outputs/model.pkl",
    type=AssetTypes.CUSTOM_MODEL,
    name="customer-churn-rf",
    description="Random Forest for customer churn prediction",
    tags={"framework": "sklearn", "version": "1.0"}
)
registered_model = ml_client.models.create_or_update(model)

# Create endpoint
endpoint = ManagedOnlineEndpoint(
    name="churn-prediction-endpoint",
    description="Customer churn prediction service",
    auth_mode="key",  # or "aml_token" for token-based authentication
    tags={"project": "customer-churn", "env": "production"}
)
endpoint_result = ml_client.online_endpoints.begin_create_or_update(endpoint).result()
print(f"Endpoint created: {endpoint_result.name}")

# Create scoring script (score.py)
scoring_script = """
import os
import json

import joblib
import numpy as np


def init():
    global model
    model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), 'model.pkl')
    model = joblib.load(model_path)
    print("Model loaded successfully")


def run(raw_data):
    try:
        data = json.loads(raw_data)['data']
        data_array = np.array(data)
        predictions = model.predict(data_array)
        probabilities = model.predict_proba(data_array)
        return {
            'predictions': predictions.tolist(),
            'probabilities': probabilities.tolist()
        }
    except Exception as e:
        return {"error": str(e)}
"""

# Write the script into the folder referenced by code_configuration below
Path("./deployment").mkdir(exist_ok=True)
Path("./deployment/score.py").write_text(scoring_script)

# Create deployment
deployment = ManagedOnlineDeployment(
    name="blue",
    endpoint_name="churn-prediction-endpoint",
    model=registered_model.id,
    instance_type="Standard_DS2_v2",  # 2 vCPU, 7GB RAM
    instance_count=2,  # Minimum 2 instances for HA
    code_configuration=CodeConfiguration(
        code="./deployment",
        scoring_script="score.py"
    ),
    environment="AzureML-sklearn-1.0@latest",
    request_settings=OnlineRequestSettings(
        request_timeout_ms=5000,
        max_concurrent_requests_per_instance=1
    ),
    liveness_probe=ProbeSettings(
        initial_delay=10,
        period=10,
        timeout=2,
        success_threshold=1,
        failure_threshold=3
    ),
    readiness_probe=ProbeSettings(
        initial_delay=10,
        period=10,
        timeout=2,
        success_threshold=1,
        failure_threshold=3
    )
)
deployment_result = ml_client.online_deployments.begin_create_or_update(deployment).result()
print(f"Deployment created: {deployment_result.name}")

# Allocate 100% traffic to blue deployment
endpoint.traffic = {"blue": 100}
ml_client.online_endpoints.begin_create_or_update(endpoint).result()

# Get endpoint credentials (do not print or log the key itself)
keys = ml_client.online_endpoints.get_keys(name="churn-prediction-endpoint")
print(f"Endpoint URL: {endpoint_result.scoring_uri}")
```
### Testing Deployment
```python
import requests
import json

# Test endpoint
scoring_uri = endpoint_result.scoring_uri
api_key = keys.primary_key

headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {api_key}'
}

test_data = {
    'data': [
        [35, 50000, 3, 12, 0.8],  # Sample customer features
        [42, 75000, 5, 24, 0.6]
    ]
}

response = requests.post(scoring_uri, json=test_data, headers=headers)
print(f"Status: {response.status_code}")
print(f"Response: {response.json()}")
```
### Blue-Green Deployment (Zero Downtime)
```python
# Create green deployment with new model version
# (new_model is assumed to be a newer model already registered via ml_client.models)
green_deployment = ManagedOnlineDeployment(
    name="green",
    endpoint_name="churn-prediction-endpoint",
    model=new_model.id,  # Updated model
    instance_type="Standard_DS2_v2",
    instance_count=2,
    code_configuration=CodeConfiguration(
        code="./deployment",
        scoring_script="score.py"
    ),
    environment="AzureML-sklearn-1.0@latest"
)
ml_client.online_deployments.begin_create_or_update(green_deployment).result()

# Canary release: 10% traffic to green, 90% to blue
endpoint.traffic = {"blue": 90, "green": 10}
ml_client.online_endpoints.begin_create_or_update(endpoint).result()

# Monitor green deployment metrics...

# Full cutover to green
endpoint.traffic = {"green": 100}
ml_client.online_endpoints.begin_create_or_update(endpoint).result()

# Delete blue deployment (after verification)
ml_client.online_deployments.begin_delete(
    name="blue",
    endpoint_name="churn-prediction-endpoint"
).result()
```
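
The canary step above jumps from 10% straight to 100%. A more gradual ramp can be planned up front. The sketch below is only an illustration: `traffic_ramp` is a hypothetical helper, not part of the Azure ML SDK, and the step percentages are arbitrary. Each returned dictionary has the shape expected by `endpoint.traffic`.

```python
def traffic_ramp(steps, old="blue", new="green"):
    """Return one traffic split per step, shifting load from `old` to `new`.

    `steps` lists the percentage of traffic sent to the new deployment at
    each stage, e.g. [10, 50, 100].
    """
    plan = []
    for pct in steps:
        if not 0 <= pct <= 100:
            raise ValueError(f"traffic percentage out of range: {pct}")
        split = {new: pct}
        if pct < 100:
            # Omit the old deployment once it receives no traffic
            split[old] = 100 - pct
        plan.append(split)
    return plan


plan = traffic_ramp([10, 50, 100])
for split in plan:
    print(split)
```

In a real rollout, each split would be assigned to `endpoint.traffic` and applied with `begin_create_or_update`, with a pause between stages to review error rate and latency for the green deployment.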
### Batch Endpoints (Scheduled Scoring)
For large-scale batch predictions:
```python
from azure.ai.ml import Input
from azure.ai.ml.entities import (
    BatchEndpoint,
    BatchDeployment,
    BatchRetrySettings,
    CodeConfiguration,
)
from azure.ai.ml.constants import AssetTypes, BatchDeploymentOutputAction

# Create batch endpoint
batch_endpoint = BatchEndpoint(
    name="churn-batch-endpoint",
    description="Batch scoring for customer churn"
)
ml_client.batch_endpoints.begin_create_or_update(batch_endpoint).result()

# Create batch deployment
batch_deployment = BatchDeployment(
    name="production",
    endpoint_name="churn-batch-endpoint",
    model=registered_model.id,
    compute="cpu-cluster",
    instance_count=4,
    max_concurrency_per_instance=2,
    mini_batch_size=10,
    output_action=BatchDeploymentOutputAction.APPEND_ROW,
    output_file_name="predictions.csv",
    retry_settings=BatchRetrySettings(max_retries=3, timeout=300),
    logging_level="info",
    code_configuration=CodeConfiguration(
        code="./batch_deployment",
        scoring_script="batch_score.py"
    ),
    environment="AzureML-sklearn-1.0@latest"
)
ml_client.batch_deployments.begin_create_or_update(batch_deployment).result()

# Invoke batch job
job = ml_client.batch_endpoints.invoke(
    endpoint_name="churn-batch-endpoint",
    deployment_name="production",
    input=Input(type=AssetTypes.URI_FOLDER, path="azureml://datastores/workspaceblobstore/paths/batch_data/")
)
print(f"Batch job submitted: {job.name}")
```
## Monitoring & Operations
### Key Performance Indicators (KPIs)
| KPI | Target | Measurement | Alert Threshold |
|-----|--------|-------------|-----------------|
| **Model Accuracy** | > 85% | Weekly evaluation on holdout set | < 80% |
| **Prediction Latency (P95)** | < 200ms | Application Insights metrics | > 500ms |
| **Throughput** | > 100 req/sec | Endpoint metrics | < 50 req/sec |
| **Error Rate** | < 1% | Failed requests / total requests | > 2% |
| **Data Drift** | PSI < 0.10 | PSI (Population Stability Index) per feature | PSI > 0.15 |
| **Model Drift** | < 5% accuracy drop | Compare vs baseline | > 10% drop |
| **Cost per 1K Predictions** | < $0.50 | Azure Cost Management | > $1.00 |
| **Deployment Success Rate** | > 99% | Deployment pipeline metrics | < 95% |
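
The data-drift KPI relies on the Population Stability Index. A minimal sketch of one common way to compute it follows, using equal-width bins derived from the baseline sample. The function name, bin count, and sample data are assumptions for illustration, and the 0.15 alert level is the table's threshold read as a PSI value.

```python
import math


def population_stability_index(expected, actual, bins=10, eps=1e-6):
    """PSI between a baseline sample (`expected`) and a recent sample (`actual`).

    Bins are equal-width over the baseline range; values outside that range
    are clamped into the first or last bin. `eps` avoids log(0) for empty bins.
    """
    lo, hi = min(expected), max(expected)
    width = (hi - lo) / bins or 1.0  # guard against a constant baseline

    def bin_fractions(values):
        counts = [0] * bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), bins - 1)] += 1
        return [max(c / len(values), eps) for c in counts]

    e = bin_fractions(expected)
    a = bin_fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


# Made-up feature values: a uniform baseline and a shifted production sample
baseline = [i / 100 for i in range(100)]
shifted = [x + 0.3 for x in baseline]

psi_same = population_stability_index(baseline, baseline)
psi_shifted = population_stability_index(baseline, shifted)
print(f"PSI (no change): {psi_same:.4f}")
print(f"PSI (shifted):   {psi_shifted:.4f}")
print("ALERT" if psi_shifted > 0.15 else "OK")
```

PSI is computed per feature, so a monitoring job would typically run this for each input column and alert on the worst value.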
### Application Insights Monitoring
```python
# Add Application Insights to deployment
from azure.ai.ml.entities import ManagedOnlineDeployment

deployment = ManagedOnlineDeployment(
    name="blue",
    endpoint_name="churn-prediction-endpoint",
    model=registered_model.id,
    instance_type="Standard_DS2_v2",
    instance_count=2,
    app_insights_enabled=True,  # Enable Application Insights
    environment_variables={
        # Placeholder value; load the real connection string from secure configuration
        "APPLICATIONINSIGHTS_CONNECTION_STRING": "InstrumentationKey=xxx"
    }
)
```
### KQL Queries for Monitoring
```kql
// Prediction latency (P50, P95, P99)
requests
| where cloud_RoleName == "churn-prediction-endpoint"
| summarize
    P50 = percentile(duration, 50),
    P95 = percentile(duration, 95),
    P99 = percentile(duration, 99),
    Count = count()
    by bin(timestamp, 5m)
| render timechart

// Error rate over time
requests
| where cloud_RoleName == "churn-prediction-endpoint"
| summarize
    Total = count(),
    Errors = countif(success == false),
    ErrorRate = todouble(countif(success == false)) / count() * 100
    by bin(timestamp, 1h)
| render timechart

// Prediction distribution (detect data drift)
traces
| where message contains "prediction"
| extend prediction = toint(customDimensions.prediction)
| summarize count() by prediction, bin(timestamp, 1d)
| render columnchart
```
## ML Maturity Model
- Intended use, limitations, performance by subgroup
- Training data characteristics (time period, sample size, class distribution)
- Known biases and fairness considerations
**DON'T ❌**

- **Use Accuracy as Sole Metric**
  - Accuracy misleads with imbalanced data (a model that predicts "not fraud" for every transaction scores 99% accuracy when 1% of transactions are fraudulent)
  - Always report precision, recall, F1-score, ROC-AUC for classification
  - Use business metrics (cost of false positive vs false negative)
- **Skip Data Quality Checks**
  - Never train on data without profiling (missing values, outliers, duplicates)
  - Avoid assuming data distributions are stable over time
  - Don't ignore temporal dependencies in sequential data
- **Overfit to Test Set**
  - Never tune hyperparameters based on test set performance
  - Avoid repeatedly evaluating on test set during development
  - Don't select features based on test set correlations
- **Ignore Feature Engineering**
  - Raw features rarely perform best (engineer interactions, aggregations, temporal features)
  - Don't skip domain expertise (consult business stakeholders for feature ideas)
  - Avoid high-cardinality categorical encoding without proper techniques
- **Deploy Without Monitoring**
  - Never deploy "fire-and-forget" models without drift detection
  - Don't ignore production logs and error rates
  - Avoid assuming model performance remains constant
- **Use Default Hyperparameters**
  - Default parameters are rarely optimal (tune at least learning rate and regularization)
  - Don't skip hyperparameter search entirely
  - Avoid manual tuning without systematic search (Grid/Random/Bayesian)
- **Train on All Available Data**
  - Always hold out 15-20% for final test set (never used during development)
  - Don't use future data for historical predictions (temporal leakage)
  - Avoid contaminating validation set with training data
- **Neglect Model Explainability**
  - Black-box models create compliance risks (GDPR "right to explanation")
  - Don't deploy models you can't debug when errors occur
  - Avoid ignoring stakeholder concerns about transparency
- **Forget About Inference Cost**
  - Large models (neural networks) can cost 10-100× more per prediction than simpler models
  - Don't optimize only for accuracy without considering latency/cost
  - Avoid complex feature engineering that slows inference
- **Skip Staging Environments**
  - Never deploy directly to production without staging validation
  - Don't test only with synthetic data (use production-like data)
  - Avoid assuming local testing is sufficient
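
Two of the points above, holding out a final test set and avoiding temporal leakage, can be combined in a chronological split. The following is a minimal sketch under stated assumptions: `chronological_split` is a hypothetical helper, the records are synthetic, and the 15% fractions are one choice within the 15-20% range mentioned above.

```python
def chronological_split(records, time_key, val_frac=0.15, test_frac=0.15):
    """Split records into train/validation/test sets in time order.

    The newest records form the test set, so no split ever sees data
    from later than the split that follows it.
    """
    ordered = sorted(records, key=lambda r: r[time_key])
    n = len(ordered)
    n_test = round(n * test_frac)
    n_val = round(n * val_frac)
    train_end = n - n_val - n_test
    return (
        ordered[:train_end],
        ordered[train_end:train_end + n_val],
        ordered[train_end + n_val:],
    )


# Synthetic records, deliberately supplied newest-first
records = [{"day": d, "churned": d % 2} for d in range(20, 0, -1)]
train, val, test = chronological_split(records, time_key="day")

print(len(train), len(val), len(test))
print("train ends day", train[-1]["day"], "| test starts day", test[0]["day"])
```

Hyperparameters would be tuned against `val` only, and `test` would be evaluated once at the end of development.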
## Architecture Decision and Tradeoffs
When designing AI/ML solutions with Azure AI Services, consider these key architectural trade-offs:
| Approach | Best For | Tradeoff |
|----------|----------|----------|
| Managed / platform service | Rapid delivery, reduced ops burden | Less customisation, potential vendor lock-in |
| Custom / self-hosted | Full control, advanced tuning | Higher operational overhead and cost |
> **Recommendation:** Start with the managed approach for most workloads and move to custom only when specific requirements demand it.
## Security and Governance Considerations
- **Least Privilege:** Grant only the permissions required for each role
- **Secret Management:** Store credentials in Azure Key Vault or equivalent; never hard-code secrets
- **Audit Logging:** Enable diagnostic and activity logs for compliance and forensic analysis
- **Data Protection:** Encrypt data at rest and in transit; classify data with sensitivity labels where applicable
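
As a small illustration of the secret-management point, the sketch below reads an endpoint key from an environment variable instead of embedding it in source. The variable name `CHURN_ENDPOINT_KEY` and both helper functions are hypothetical; in an Azure deployment the variable would typically be populated from Key Vault by the hosting platform.

```python
import os


def load_endpoint_key(env_var: str = "CHURN_ENDPOINT_KEY") -> str:
    """Return the endpoint key from the environment, failing loudly if absent."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(
            f"Environment variable {env_var} is not set; "
            "configure it from your secret store instead of hard-coding the key."
        )
    return key


def auth_headers(key: str) -> dict:
    """Build request headers without ever logging the key."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {key}",
    }
```

A caller would build its headers with `auth_headers(load_endpoint_key())`, so a missing secret surfaces as an explicit error at startup rather than as a failed request later.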
## Cost and Performance Notes
- **Primary Cost Drivers:** Compute tier, storage volume, and network egress
- **Optimization Levers:** Right-size resources, use reserved instances or savings plans, and review Azure Advisor recommendations regularly
- **Performance Baseline:** Define SLAs, latency targets, and throughput thresholds before going live
- **Scaling Strategy:** Use auto-scale rules and monitor utilisation to balance cost and responsiveness
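
For online endpoints, compute is billed per instance-hour regardless of load, so cost per prediction depends heavily on utilisation. The arithmetic can be sketched as below. The function is a hypothetical helper and the hourly rate is a placeholder, not a quoted Azure price.

```python
def cost_per_1k_predictions(hourly_rate_usd: float,
                            instance_count: int,
                            requests_per_second: float) -> float:
    """Estimate compute cost per 1,000 predictions at a sustained request rate."""
    if requests_per_second <= 0:
        raise ValueError("requests_per_second must be positive")
    hourly_cost = hourly_rate_usd * instance_count
    predictions_per_hour = requests_per_second * 3600
    return hourly_cost / predictions_per_hour * 1000


# Placeholder rate of $0.20 per instance-hour, two instances
busy = cost_per_1k_predictions(0.20, instance_count=2, requests_per_second=100)
idle = cost_per_1k_predictions(0.20, instance_count=2, requests_per_second=1)
print(f"At 100 req/sec: ${busy:.4f} per 1K predictions")
print(f"At 1 req/sec:   ${idle:.4f} per 1K predictions")
```

The same two instances are 100 times more expensive per prediction at 1 req/sec than at 100 req/sec, which is why right-sizing and auto-scale rules are the main optimization levers.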
## Validation and Versioning
- **Last Validated:** April 2026
- **Tested With:** Current generally-available Azure AI Services APIs and SDKs
- **Known Constraints:** Check regional availability and service limits before production deployment
## Official Microsoft References
- [Microsoft Learn – Azure AI Services](https://learn.microsoft.com)
- [Azure AI Services Documentation](https://learn.microsoft.com)
- [Azure Architecture Center](https://learn.microsoft.com/azure/architecture/)
## Public Examples from Official Sources
- [Microsoft official samples on GitHub](https://github.com/Azure-Samples)
- [Microsoft Learn training modules](https://learn.microsoft.com/training/)