---
name: cloud-native
description: Generic Cloud-Native Deployment and Infrastructure as Code patterns for 2025. Provides comprehensive implementation strategies for multi-cloud deployments, GitOps workflows, progressive delivery, and platform engineering. Framework-agnostic approach supporting any cloud provider, deployment tool, and orchestration platform.
license: MIT
---

# Cloud-Native Deployment & Infrastructure Patterns

This skill provides comprehensive patterns for implementing cloud-native deployments using modern Infrastructure as Code, GitOps workflows, and progressive delivery strategies. The patterns are framework-agnostic and applicable across any cloud provider or orchestration platform.

## When to Use This Skill

Use this skill when you need to:

- Implement Infrastructure as Code with any provider (Terraform, Pulumi, CDK)
- Build GitOps workflows with Argo CD, Flux, or similar tools
- Create multi-cloud or hybrid deployment strategies
- Implement progressive delivery (canary, blue-green)
- Build internal developer platforms
- Set up policy as code and compliance automation
- Implement zero-trust security architectures
- Create disaster recovery and failover strategies
- Optimize costs across multiple providers
- Build observability into deployments
## 1. Infrastructure as Code Patterns

### Generic IaC Abstraction Layer

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import json
import yaml
import asyncio
from datetime import datetime


class IaCProvider(str, Enum):
    """Supported IaC providers"""
    TERRAFORM = "terraform"
    PULUMI = "pulumi"
    CDK = "cdk"
    CROSSPLANE = "crossplane"


class CloudProvider(str, Enum):
    """Cloud providers"""
    AWS = "aws"
    AZURE = "azure"
    GCP = "gcp"
    DIGITALOCEAN = "digitalocean"
    ON_PREM = "on-prem"


@dataclass
class Resource:
    """Generic resource definition"""
    type: str
    name: str
    properties: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
    provider: Optional[CloudProvider] = None
    tags: Dict[str, str] = field(default_factory=dict)


@dataclass
class InfrastructureConfig:
    """Infrastructure configuration"""
    name: str
    environment: str
    provider: CloudProvider
    region: str
    resources: List[Resource] = field(default_factory=list)
    variables: Dict[str, Any] = field(default_factory=dict)
    outputs: Dict[str, str] = field(default_factory=dict)


class IaCBackend(ABC):
    """Abstract IaC backend interface"""

    @abstractmethod
    async def initialize(self, config: InfrastructureConfig) -> None:
        """Initialize IaC backend"""
        pass

    @abstractmethod
    async def plan(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Generate execution plan"""
        pass

    @abstractmethod
    async def apply(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Apply infrastructure changes"""
        pass

    @abstractmethod
    async def destroy(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Destroy infrastructure"""
        pass

    @abstractmethod
    async def output(self, config: InfrastructureConfig, key: str) -> Any:
        """Get output value"""
        pass

    @abstractmethod
    async def validate(self, config: InfrastructureConfig) -> List[str]:
        """Validate configuration"""
        pass


class TerraformBackend(IaCBackend):
    """Terraform backend implementation"""

    def __init__(self, working_dir: Path):
        self.working_dir = working_dir
        self.state_backend: Optional[str] = None

    async def initialize(self, config: InfrastructureConfig) -> None:
        """Initialize Terraform"""
        # Render the configuration files before running the CLI
        await self._generate_terraform_files(config)

        cmd = ["terraform", "init"]
        result = await self._run_command(cmd, cwd=self.working_dir)
        if result["returncode"] != 0:
            raise RuntimeError(f"Terraform init failed: {result['stderr']}")

    async def plan(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Generate Terraform plan"""
        cmd = ["terraform", "plan", "-out=tfplan", "-json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "plan_file": self.working_dir / "tfplan",
            "output": result["stdout"],
            "changes": json.loads(result["stdout"]) if result["stdout"] else {}
        }

    async def apply(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Apply Terraform changes"""
        cmd = ["terraform", "apply", "-auto-approve", "-json", "tfplan"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "output": result["stdout"],
            "applied_at": datetime.utcnow().isoformat()
        }

    async def destroy(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Destroy Terraform resources"""
        cmd = ["terraform", "destroy", "-auto-approve", "-json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "output": result["stdout"],
            "destroyed_at": datetime.utcnow().isoformat()
        }

    async def output(self, config: InfrastructureConfig, key: str) -> Any:
        """Get Terraform output"""
        cmd = ["terraform", "output", "-json", key]
        result = await self._run_command(cmd, cwd=self.working_dir)

        if result["returncode"] == 0:
            return json.loads(result["stdout"])
        return None

    async def validate(self, config: InfrastructureConfig) -> List[str]:
        """Validate Terraform configuration"""
        errors = []

        cmd = ["terraform", "validate"]
        result = await self._run_command(cmd, cwd=self.working_dir)
        if result["returncode"] != 0:
            errors.append(f"Validation failed: {result['stderr']}")

        cmd = ["terraform", "fmt", "-check", "-diff"]
        result = await self._run_command(cmd, cwd=self.working_dir)
        if result["returncode"] != 0:
            errors.append("Formatting check failed - run terraform fmt")

        return errors

    async def _generate_terraform_files(self, config: InfrastructureConfig) -> None:
        """Generate Terraform configuration files"""
        # Terraform accepts JSON configuration in *.tf.json files, so the
        # generated documents are written as JSON rather than HCL.
        main_tf = {
            "terraform": {
                "required_providers": self._get_required_providers(config.provider),
                "backend": self._get_backend_config()
            },
            "provider": {config.provider.value: self._get_provider_config(config)},
            "resource": self._convert_resources(config.resources)
        }
        with open(self.working_dir / "main.tf.json", "w") as f:
            json.dump(main_tf, f, indent=2)

        if config.variables:
            variables_tf = {
                "variable": {
                    k: {"type": self._infer_type(v), "default": v}
                    for k, v in config.variables.items()
                }
            }
            with open(self.working_dir / "variables.tf.json", "w") as f:
                json.dump(variables_tf, f, indent=2)

        if config.outputs:
            outputs_tf = {
                "output": {
                    k: {"value": v} for k, v in config.outputs.items()
                }
            }
            with open(self.working_dir / "outputs.tf.json", "w") as f:
                json.dump(outputs_tf, f, indent=2)

    async def _run_command(self, cmd: List[str], cwd: Path) -> Dict[str, Any]:
        """Run command and return result"""
        process = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()

        return {
            "returncode": process.returncode,
            "stdout": stdout.decode(),
            "stderr": stderr.decode()
        }
    def _get_required_providers(self, provider: CloudProvider) -> Dict[str, Any]:
        """Get required providers configuration"""
        providers = {}

        if provider == CloudProvider.AWS:
            providers["aws"] = {
                "source": "hashicorp/aws",
                "version": "~> 5.0"
            }
        elif provider == CloudProvider.AZURE:
            providers["azurerm"] = {
                "source": "hashicorp/azurerm",
                "version": "~> 3.0"
            }
        elif provider == CloudProvider.GCP:
            providers["google"] = {
                "source": "hashicorp/google",
                "version": "~> 5.0"
            }

        return providers

    def _get_backend_config(self) -> Dict[str, Any]:
        """Get Terraform backend configuration"""
        if self.state_backend:
            return {"s3": self.state_backend}
        return {}

    def _get_provider_config(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Get provider-specific configuration"""
        base_config = {"region": config.region}

        if config.provider == CloudProvider.AWS:
            base_config["default_tags"] = {
                "tags": {
                    "Environment": config.environment,
                    "ManagedBy": "terraform",
                    "Project": config.name
                }
            }

        return base_config

    def _infer_type(self, value: Any) -> str:
        """Infer the Terraform variable type from a Python value"""
        if isinstance(value, bool):
            return "bool"
        if isinstance(value, (int, float)):
            return "number"
        if isinstance(value, list):
            return "list(any)"
        if isinstance(value, dict):
            return "map(any)"
        return "string"

    def _convert_resources(self, resources: List[Resource]) -> Dict[str, Any]:
        """Convert generic resources to Terraform format"""
        terraform_resources = {}

        for resource in resources:
            resource_key = f"{resource.type}_{resource.name}"
            resource_config = {**resource.properties}

            if resource.provider:
                resource_config["provider"] = resource.provider.value
            if resource.tags:
                resource_config["tags"] = resource.tags

            terraform_resources[resource_key] = resource_config

        return terraform_resources
"python" ): self .working_dir = working_dir self .language = language async def initialize ( self, config: InfrastructureConfig ) -> None : """Initialize Pulumi"""
        # Render the Pulumi program before creating the stack
        await self._generate_pulumi_program(config)

        cmd = ["pulumi", "stack", "init", f"{config.name}-{config.environment}"]
        result = await self._run_command(cmd, cwd=self.working_dir)
        if result["returncode"] != 0:
            raise RuntimeError(f"Pulumi stack init failed: {result['stderr']}")

    async def plan(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Preview Pulumi changes"""
        cmd = ["pulumi", "preview", "--json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "output": result["stdout"],
            "changes": json.loads(result["stdout"]) if result["stdout"] else {}
        }

    async def apply(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Apply Pulumi changes"""
        cmd = ["pulumi", "up", "--yes", "--json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "output": result["stdout"],
            "applied_at": datetime.utcnow().isoformat()
        }

    async def destroy(self, config: InfrastructureConfig) -> Dict[str, Any]:
        """Destroy Pulumi resources"""
        cmd = ["pulumi", "destroy", "--yes", "--json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        return {
            "success": result["returncode"] == 0,
            "output": result["stdout"],
            "destroyed_at": datetime.utcnow().isoformat()
        }

    async def output(self, config: InfrastructureConfig, key: str) -> Any:
        """Get Pulumi stack output"""
        cmd = ["pulumi", "stack", "output", "--json"]
        result = await self._run_command(cmd, cwd=self.working_dir)

        if result["returncode"] == 0:
            outputs = json.loads(result["stdout"])
            return outputs.get(key)
        return None

    async def validate(self, config: InfrastructureConfig) -> List[str]:
        """Validate Pulumi configuration"""
        errors = []
        return errors

    async def _generate_pulumi_program(self, config: InfrastructureConfig) -> None:
        """Generate the Pulumi program"""
        program = f'''"""Infrastructure for {config.name} - {config.environment}"""

import pulumi
import pulumi_{config.provider.value} as provider

config = pulumi.Config()

'''

        for resource in config.resources:
            provider_module = f"provider.{resource.type.replace('_', '.')}"
            # Derive the Pulumi resource class name from the last segment of
            # the generic resource type (e.g. "aws_s3_bucket" -> "Bucket").
            resource_class = resource.type.split("_")[-1].title()
            program += f'''
{resource.name}_resource = {provider_module}.{resource_class}(
    "{resource.name}",
'''
            for key, value in resource.properties.items():
                program += f'    {key}={value},\n'

            if resource.tags:
                program += f'    tags={resource.tags},\n'
            program += ')\n\n'

        for key, value in config.outputs.items():
            program += f'pulumi.export("{key}", {value})\n'

        with open(self.working_dir / "main.py", "w") as f:
            f.write(program)

        with open(self.working_dir / "requirements.txt", "w") as f:
            f.write("pulumi>=3.0\n")
            f.write(f"pulumi-{config.provider.value}\n")

    async def _run_command(self, cmd: List[str], cwd: Path) -> Dict[str, Any]:
        """Run command and return result"""
        process = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()

        return {
            "returncode": process.returncode,
            "stdout": stdout.decode(),
            "stderr": stderr.decode()
        }
class IaCFactory:
    """Factory for creating IaC backends"""

    @staticmethod
    def create(provider: IaCProvider, working_dir: Path, **kwargs) -> IaCBackend:
        """Create IaC backend instance"""
        working_dir.mkdir(parents=True, exist_ok=True)

        if provider == IaCProvider.TERRAFORM:
            return TerraformBackend(working_dir)
        elif provider == IaCProvider.PULUMI:
            return PulumiBackend(
                working_dir,
                language=kwargs.get("language", "python")
            )
        else:
            raise ValueError(f"Unsupported IaC provider: {provider}")
```
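
A minimal usage sketch of the abstraction layer above. The `aws_s3_bucket` resource, working directory, and output expression are illustrative placeholders, not part of the skill's API:

```python
# Hypothetical usage of the abstraction layer above; resource names,
# the bucket type, and the output expression are illustrative.
import asyncio
from pathlib import Path

async def provision_dev_bucket() -> None:
    config = InfrastructureConfig(
        name="demo-app",
        environment="dev",
        provider=CloudProvider.AWS,
        region="us-east-1",
        resources=[
            Resource(
                type="aws_s3_bucket",
                name="artifacts",
                properties={"bucket": "demo-app-dev-artifacts"},
                tags={"team": "platform"},
            )
        ],
        outputs={"artifacts_bucket": "aws_s3_bucket.artifacts.id"},
    )

    backend = IaCFactory.create(IaCProvider.TERRAFORM, Path("iac/demo-app-dev"))
    await backend.initialize(config)

    errors = await backend.validate(config)
    if errors:
        raise RuntimeError(f"Invalid configuration: {errors}")

    plan = await backend.plan(config)
    if plan["success"]:
        await backend.apply(config)

asyncio.run(provision_dev_bucket())
```

Because the factory hides the backend choice, switching this workflow to Pulumi is a one-line change to the `IaCFactory.create` call.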
### Multi-Cloud Resource Manager

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List

# The core types are assumed to live in the abstraction module shown above.
from ..core.abstraction import (
    CloudProvider, IaCBackend, InfrastructureConfig, Resource
)


@dataclass
class CloudDeployment:
    """A single deployment target in one cloud provider"""
    provider: CloudProvider
    region: str
    environment: str
    resources: List[Resource] = field(default_factory=list)
    priority: int = 0
    enabled: bool = True


class MultiCloudManager:
    """Manages deployments across multiple cloud providers"""

    def __init__(self):
        self.deployments: Dict[str, CloudDeployment] = {}
        self.backends: Dict[str, IaCBackend] = {}
        self.global_resources: List[Resource] = []
        self.shared_state: Dict[str, Any] = {}

    def add_deployment(self, name: str, deployment: CloudDeployment) -> None:
        """Add a cloud deployment"""
        self.deployments[name] = deployment

    def add_global_resource(self, resource: Resource) -> None:
        """Add a global resource (DNS, CDN, etc.)"""
        self.global_resources.append(resource)

    async def deploy_all(self) -> Dict[str, Any]:
        """Deploy to all configured clouds"""
        results = {}
        # Deploy in priority order (highest priority first)
        sorted_deployments = sorted(
            self.deployments.items(),
            key=lambda x: x[1].priority,
            reverse=True
        )

        for name, deployment in sorted_deployments:
            if not deployment.enabled:
                continue

            print(f"Deploying to {name} ({deployment.provider.value})...")

            try:
                result = await self._deploy_single(name, deployment)
                results[name] = result

                # Share outputs so later deployments can reference them
                if result["success"]:
                    self.shared_state[name] = result.get("outputs", {})

            except Exception as e:
                results[name] = {
                    "success": False,
                    "error": str(e)
                }
                print(f"Failed to deploy to {name}: {e}")

        return results

    async def _deploy_single(
        self,
        name: str,
        deployment: CloudDeployment
    ) -> Dict[str, Any]:
        """Deploy to a single cloud provider"""
        # Build the provider-agnostic configuration
        config = InfrastructureConfig(
            name=name,
            environment=deployment.environment,
            provider=deployment.provider,
            region=deployment.region,
            resources=self._merge_resources(deployment),
            variables=self._get_variables(deployment)
        )

        # Create (and cache) an IaC backend for this deployment
        if name not in self.backends:
            from ..core.abstraction import IaCFactory, IaCProvider
            backend = IaCFactory.create(
                IaCProvider.TERRAFORM,
                Path(f"iac/{name}")
            )
            self.backends[name] = backend

        backend = self.backends[name]

        await backend.initialize(config)

        plan = await backend.plan(config)
        if not plan["success"]:
            return {"success": False, "error": "Plan failed"}

        apply_result = await backend.apply(config)

        # Collect outputs for each named resource
        outputs = {}
        for resource in deployment.resources:
            if hasattr(resource, 'name'):
                output_value = await backend.output(config, resource.name)
                if output_value:
                    outputs[resource.name] = output_value

        return {
            "success": apply_result["success"],
            "outputs": outputs,
            "deployment_info": {
                "provider": deployment.provider.value,
                "region": deployment.region,
                "environment": deployment.environment
            }
        }

    def _merge_resources(self, deployment: CloudDeployment) -> List[Resource]:
        """Merge global and deployment-specific resources"""
        resources = []
        resources.extend(self.global_resources)
        resources.extend(deployment.resources)
        return resources

    def _get_variables(self, deployment: CloudDeployment) -> Dict[str, Any]:
        """Get variables for deployment"""
        variables = {
            "environment": deployment.environment,
            "region": deployment.region
        }

        # Expose other deployments' outputs as cross-cloud variables
        for name, state in self.shared_state.items():
            if name != deployment.provider.value:
                variables.update({
                    f"{name}_endpoint": state.get("endpoint"),
                    f"{name}_credentials": state.get("credentials")
                })

        return variables

    async def failover(self, primary: str, backup: str) -> Dict[str, Any]:
        """Perform failover from primary to backup deployment"""
        print(f"Initiating failover from {primary} to {backup}")
        backup_deployment = self.deployments.get(backup)
        if not backup_deployment:
            return {
                "success": False,
                "error": f"Backup deployment {backup} not found"
            }

        # Point DNS at the backup deployment's endpoint
        dns_resource = Resource(
            type="route53_record",
            name="failover",
            properties={
                "name": "api.example.com",
                "type": "A",
                "ttl": 60,
                "records": [self.shared_state[backup]["endpoint"]]
            }
        )
        backup_deployment.resources.append(dns_resource)

        result = await self._deploy_single(backup, backup_deployment)

        if result["success"]:
            # Disable the failed primary so future deploy_all runs skip it
            if primary in self.deployments:
                self.deployments[primary].enabled = False
            print(f"Failover to {backup} completed successfully")

        return result

    async def sync_state(self) -> Dict[str, Any]:
        """Synchronize state across deployments"""
        sync_results = {}

        for name, deployment in self.deployments.items():
            if not deployment.enabled:
                continue

            try:
                backend = self.backends.get(name)
                if not backend:
                    continue

                config = InfrastructureConfig(
                    name=name,
                    environment=deployment.environment,
                    provider=deployment.provider,
                    region=deployment.region,
                    resources=[]
                )

                outputs = {}
                for resource in deployment.resources:
                    if hasattr(resource, 'name'):
                        output = await backend.output(config, resource.name)
                        if output:
                            outputs[resource.name] = output

                sync_results[name] = {
                    "success": True,
                    "outputs": outputs
                }

            except Exception as e:
                sync_results[name] = {
                    "success": False,
                    "error": str(e)
                }

        return sync_results
```
## 2. GitOps Patterns

### Generic GitOps Engine

```python
import asyncio
import aiohttp
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import yaml
import json
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


class GitOpsTool(str, Enum):
    """Supported GitOps tools"""
    ARGOCD = "argocd"
    FLUX = "flux"


class DeploymentStrategy(str, Enum):
    """Deployment strategies"""
    ROLLING = "rolling"
    CANARY = "canary"
    BLUE_GREEN = "blue-green"


# Field defaults below are illustrative; adjust them to your environment.
@dataclass
class Application:
    """GitOps application definition"""
    name: str
    repo_url: str
    path: str
    target_revision: str = "HEAD"
    namespace: str = "default"
    destination_server: str = "https://kubernetes.default.svc"
    sync_policy: Dict[str, Any] = field(default_factory=dict)
    hooks: Dict[str, List[str]] = field(default_factory=dict)
    health_checks: List[Dict[str, Any]] = field(default_factory=list)


@dataclass
class GitOpsConfig:
    """GitOps engine configuration"""
    tool: GitOpsTool = GitOpsTool.ARGOCD
    namespace: str = "gitops-system"
    applications: List[Application] = field(default_factory=list)
    repositories: Dict[str, str] = field(default_factory=dict)
    policies: List[Dict[str, Any]] = field(default_factory=list)
    notifications: Dict[str, Any] = field(default_factory=dict)


class GitOpsEngine:
    """Generic GitOps engine"""

    def __init__(self, config: GitOpsConfig):
        self.config = config
        self.kubeconfig_path: Optional[Path] = None
        self.tool_client = None

    async def initialize(self) -> None:
        """Initialize GitOps tool"""
        if self.config.tool == GitOpsTool.ARGOCD:
            await self._init_argocd()
        elif self.config.tool == GitOpsTool.FLUX:
            await self._init_flux()
        else:
            raise ValueError(f"Unsupported GitOps tool: {self.config.tool}")

    async def deploy_application(
        self,
        app: Application,
        strategy: DeploymentStrategy = DeploymentStrategy.ROLLING
    ) -> Dict[str, Any]:
        """Deploy application using GitOps"""
        logger.info(f"Deploying application {app.name} with {strategy.value} strategy")
        # Pre-deployment hooks
        await self._execute_hooks(app, "pre-deploy")

        # Deploy with the selected strategy
        if strategy == DeploymentStrategy.CANARY:
            result = await self._deploy_canary(app)
        elif strategy == DeploymentStrategy.BLUE_GREEN:
            result = await self._deploy_blue_green(app)
        else:
            result = await self._deploy_rolling(app)

        # Post-deployment hooks
        if result["success"]:
            await self._execute_hooks(app, "post-deploy")

        # Health checks
        health_status = await self._check_health(app)
        result["health"] = health_status

        return result

    async def _init_argocd(self) -> None:
        """Initialize ArgoCD"""
install_cmd = [ "kubectl" , "apply" , "-n" , self .config.namespace, "-f" , "https://raw.githubusercontent.com/argoproj/argo-cd/stable/manifests/install.yaml" ]
result =
await self ._run_kubectl_command(install_cmd) if result[ "returncode" ] != 0 and "AlreadyExists" not in result[ "stderr" ]: raise RuntimeError( f"Failed to install ArgoCD: {result[ 'stderr' ]} " )
await self ._wait_for_deployment( "argocd-server" , self .config.namespace) async def _init_flux ( self ) -> None : """Initialize Flux"""
install_cmd = [ "curl" , "-s" , "https://fluxcd.io/install.sh | bash" ]
result =
await self ._run_command(install_cmd, shell= True ) if result[ "returncode" ] != 0 : raise RuntimeError( f"Failed to install Flux: {result[ 'stderr' ]} " )
bootstrap_cmd = [ "flux" , "bootstrap" , "git" , "--url" , self .config.repositories.get( "main" , "" ), "--branch" , "main" , "--path" , "clusters/my-cluster" ]
result =
await self ._run_command(bootstrap_cmd) if result[ "returncode" ] != 0 : raise RuntimeError( f"Failed to bootstrap Flux: {result[ 'stderr' ]} " ) async def _deploy_rolling ( self, app: Application ) -> Dict [ str , Any ]: """Deploy using rolling update strategy"""
        # Generate and apply the ArgoCD Application manifest
        app_manifest = self._generate_argocd_app(app)
        result = await self._apply_kubernetes_yaml(app_manifest)

        if result["success"]:
            sync_result = await self._wait_for_sync(app.name)
            result.update(sync_result)

        return result

    async def _deploy_canary(self, app: Application) -> Dict[str, Any]:
        """Deploy using canary strategy"""
        # Canary copy: automation disabled so traffic can be shifted gradually
        canary_app = Application(
            name=f"{app.name}-canary",
            repo_url=app.repo_url,
            path=app.path,
            target_revision=app.target_revision,
            namespace=app.namespace,
            sync_policy={
                "automated": {
                    "prune": False,
                    "self_heal": False
                }
            }
        )

        stable_app = Application(
            name=f"{app.name}-stable",
            repo_url=app.repo_url,
            path=app.path,
            target_revision=app.target_revision,
            namespace=app.namespace,
            sync_policy=app.sync_policy
        )

        results = {}

        # Deploy the stable version first, then the canary
        results["stable"] = await self._deploy_rolling(stable_app)

        if results["stable"]["success"]:
            results["canary"] = await self._deploy_rolling(canary_app)

            # Start with a small traffic share for the canary
            if results["canary"]["success"]:
                traffic_result = await self._setup_traffic_split(app, canary_weight=10)
                results["traffic"] = traffic_result

        return {
            "success": all(r.get("success", False) for r in results.values()),
            "phases": results
        }

    async def _deploy_blue_green(self, app: Application) -> Dict[str, Any]:
        """Deploy using blue-green strategy"""
        # Determine which color is live and deploy the other one
        active_color = await self._get_active_color(app.name)
        new_color = "green" if active_color == "blue" else "blue"
        old_color = active_color

        new_app = Application(
            name=f"{app.name}-{new_color}",
            repo_url=app.repo_url,
            path=app.path,
            target_revision=app.target_revision,
            namespace=app.namespace,
            sync_policy=app.sync_policy
        )

        deploy_result = await self._deploy_rolling(new_app)

        if deploy_result["success"]:
            # Verify the new color is healthy before switching traffic
            health_result = await self._check_deployment_health(
                f"{app.name}-{new_color}",
                app.namespace
            )

            if health_result["healthy"]:
                switch_result = await self._switch_traffic(app.name, new_color)
                deploy_result["traffic_switch"] = switch_result

                if switch_result["success"]:
                    # Remove the old color once traffic has moved over
                    cleanup_result = await self._cleanup_deployment(
                        f"{app.name}-{old_color}",
                        app.namespace
                    )
                    deploy_result["cleanup"] = cleanup_result
            else:
                deploy_result["health_check"] = health_result
                deploy_result["success"] = False

        return deploy_result

    def _generate_argocd_app(self, app: Application) -> str:
        """Generate ArgoCD application manifest"""
        manifest = {
            "apiVersion": "argoproj.io/v1alpha1",
            "kind": "Application",
            "metadata": {
                "name": app.name,
                "namespace": self.config.namespace
            },
            "spec": {
                "project": "default",
                "source": {
                    "repoURL": app.repo_url,
                    "targetRevision": app.target_revision,
                    "path": app.path
                },
                "destination": {
                    "server": app.destination_server,
                    "namespace": app.namespace
                },
                "syncPolicy": {
                    "automated": {
                        "prune": app.sync_policy.get("prune", True),
                        "selfHeal": app.sync_policy.get("self_heal", True)
                    },
                    "syncOptions": [
                        "CreateNamespace=true"
                    ]
                }
            }
        }

        return yaml.dump(manifest, sort_keys=False)

    async def _apply_kubernetes_yaml(self, yaml_content: str) -> Dict[str, Any]:
        """Apply Kubernetes YAML manifest"""
        cmd = ["kubectl", "apply", "-f", "-"]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env={"KUBECONFIG": str(self.kubeconfig_path)} if self.kubeconfig_path else None
        )
        stdout, stderr = await process.communicate(input=yaml_content.encode())

        return {
            "success": process.returncode == 0,
            "output": stdout.decode(),
            "error": stderr.decode()
        }

    async def _wait_for_sync(self, app_name: str, timeout: int = 300) -> Dict[str, Any]:
        """Wait for ArgoCD application to sync"""
        start_time = datetime.utcnow()

        while (datetime.utcnow() - start_time).seconds < timeout:
            cmd = ["argocd", "app", "get", app_name, "-o", "json"]
            result = await self._run_command(cmd)

            if result["returncode"] == 0:
                app_data = json.loads(result["stdout"])
                status = app_data.get("status", {})

                sync_status = status.get("sync", {}).get("status", "")
                health_status = status.get("health", {}).get("status", "")

                if sync_status == "Synced" and health_status == "Healthy":
                    return {
                        "success": True,
                        "status": "Synced and Healthy",
                        "duration": (datetime.utcnow() - start_time).seconds
                    }

            await asyncio.sleep(5)

        return {
            "success": False,
            "error": f"Sync timeout after {timeout} seconds"
        }

    async def _execute_hooks(self, app: Application, hook_type: str) -> None:
        """Execute deployment hooks"""
        hooks = app.hooks.get(hook_type, [])

        for hook in hooks:
            logger.info(f"Executing {hook_type} hook: {hook}")
            try:
                result = await self._run_command(hook, shell=True)
                if result["returncode"] != 0:
                    logger.error(f"Hook failed: {result['stderr']}")
            except Exception as e:
                logger.error(f"Error executing hook: {e}")

    async def _check_health(self, app: Application) -> Dict[str, Any]:
        """Check application health"""
        health_results = {}

        for check in app.health_checks:
            if check["type"] == "http":
                result = await self._check_http_health(check)
            elif check["type"] == "kubernetes":
                result = await self._check_kubernetes_health(check)
            else:
                result = {
                    "healthy": False,
                    "error": f"Unknown health check type: {check['type']}"
                }

            health_results[check["name"]] = result

        return health_results

    async def _run_command(
        self,
        cmd: List[str],
        shell: bool = False,
        env: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Run command"""
        if shell:
            cmd = ["bash", "-c", " ".join(cmd) if isinstance(cmd, list) else cmd]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )
        stdout, stderr = await process.communicate()

        return {
            "returncode": process.returncode,
            "stdout": stdout.decode(),
            "stderr": stderr.decode()
        }

    async def _run_kubectl_command(self, cmd: List[str]) -> Dict[str, Any]:
        """Run a kubectl command against the configured cluster"""
        env = {"KUBECONFIG": str(self.kubeconfig_path)} if self.kubeconfig_path else None
        return await self._run_command(cmd, env=env)

    async def _wait_for_deployment(
        self,
        deployment_name: str,
        namespace: str,
        timeout: int = 300
    ) -> None:
        """Wait for deployment to be ready"""
        cmd = [
            "kubectl", "wait",
            f"deployment/{deployment_name}",
            "--for", "condition=available",
            f"--namespace={namespace}",
            f"--timeout={timeout}s"
        ]
        result = await self._run_kubectl_command(cmd)
        if result["returncode"] != 0:
            raise RuntimeError(f"Deployment {deployment_name} not ready: {result['stderr']}")
```
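
A minimal rollout sketch using the engine above. The repository URL, manifest path, and namespace are placeholders; the dataclass fields follow the definitions shown earlier, and a rolling strategy is used because its helpers are fully defined above:

```python
# Hypothetical GitOps rollout; the repository URL and manifest path are placeholders.
import asyncio

config = GitOpsConfig(
    tool=GitOpsTool.ARGOCD,
    repositories={"main": "https://github.com/example/platform-config.git"},
)
engine = GitOpsEngine(config)

app = Application(
    name="payments-api",
    repo_url="https://github.com/example/platform-config.git",
    path="apps/payments-api/overlays/production",
    target_revision="main",
    namespace="payments",
)

async def rollout() -> None:
    await engine.initialize()
    result = await engine.deploy_application(app, strategy=DeploymentStrategy.ROLLING)
    if not result["success"]:
        raise RuntimeError(f"Deployment failed: {result}")

asyncio.run(rollout())
```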
## 3. Progressive Delivery Patterns

### Canary Analysis Engine

```python
import asyncio
import aiohttp
import yaml
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import numpy as np
import logging

logger = logging.getLogger(__name__)


@dataclass
class CanaryMetric:
    """A metric compared between baseline and canary"""
    name: str
    query: str                      # Prometheus query with a {label} placeholder
    threshold: Optional[float] = None
    comparison: str = "less_than"   # "less_than", "greater_than", or "equals"
    weight: float = 1.0


@dataclass
class CanaryConfig:
    """Canary deployment configuration (defaults are illustrative)"""
    application: str
    namespace: str
    metrics: List[CanaryMetric] = field(default_factory=list)
    baseline_label: str = "baseline"
    canary_label: str = "canary"
    traffic_step: int = 10
    max_traffic: int = 50
    analysis_interval: int = 60
    auto_promote: bool = True  # field name assumed; only the default was recoverable


class CanaryAnalyzer:
    """Analyzes canary deployment metrics"""

    def __init__(self, prometheus_url: str):
        self.prometheus_url = prometheus_url
        self.session = None

    async def analyze(
        self,
        config: CanaryConfig,
        current_traffic: int
    ) -> Dict[str, Any]:
        """Analyze canary vs baseline metrics"""
        analysis = {
            "timestamp": datetime.utcnow().isoformat(),
            "traffic_percentage": current_traffic,
            "metrics": {},
            "overall_score": 0.0,
            "recommendation": "proceed"
        }
        # Query both variants
        baseline_metrics = await self._query_metrics(config, "baseline")
        canary_metrics = await self._query_metrics(config, "canary")

        # Score each metric
        metric_scores = []
        for metric in config.metrics:
            baseline_value = baseline_metrics.get(metric.name, 0)
            canary_value = canary_metrics.get(metric.name, 0)

            # Relative difference between canary and baseline
            if baseline_value != 0:
                diff_percentage = ((canary_value - baseline_value) / baseline_value) * 100
            else:
                diff_percentage = 0

            # Threshold check
            passes = self._evaluate_metric(metric, canary_value, baseline_value)

            # Weighted score
            score = self._calculate_score(metric, diff_percentage, passes)
            metric_scores.append(score * metric.weight)

            analysis["metrics"][metric.name] = {
                "baseline": baseline_value,
                "canary": canary_value,
                "difference_percentage": diff_percentage,
                "passes": passes,
                "score": score
            }

        if metric_scores:
            analysis["overall_score"] = np.mean(metric_scores)

        analysis["recommendation"] = self._get_recommendation(
            analysis["overall_score"],
            config,
            current_traffic
        )

        return analysis

    async def _query_metrics(
        self,
        config: CanaryConfig,
        variant: str
    ) -> Dict[str, float]:
        """Query metrics from Prometheus"""
        label = config.canary_label if variant == "canary" else config.baseline_label
        metrics = {}

        for metric in config.metrics:
            query = metric.query.format(label=label)
            try:
                value = await self._query_prometheus(query)
                metrics[metric.name] = value
            except Exception as e:
                logger.error(f"Failed to query metric {metric.name}: {e}")
                metrics[metric.name] = 0

        return metrics

    async def _query_prometheus(self, query: str) -> float:
        """Query single metric from Prometheus"""
        if not self.session:
            self.session = aiohttp.ClientSession()

        url = f"{self.prometheus_url}/api/v1/query"
        params = {"query": query}

        async with self.session.get(url, params=params) as response:
            data = await response.json()

            if data["status"] == "success" and data["data"]["result"]:
                return float(data["data"]["result"][0]["value"][1])

        return 0.0

    def _evaluate_metric(
        self,
        metric: CanaryMetric,
        canary_value: float,
        baseline_value: float
    ) -> bool:
        """Evaluate if metric passes threshold"""
        if metric.threshold is None:
            return True

        if metric.comparison == "less_than":
            return canary_value <= metric.threshold
        elif metric.comparison == "greater_than":
            return canary_value >= metric.threshold
        elif metric.comparison == "equals":
            return abs(canary_value - metric.threshold) < 0.01

        return True

    def _calculate_score(
        self,
        metric: CanaryMetric,
        diff_percentage: float,
        passes: bool
    ) -> float:
        """Calculate metric score"""
        if not passes:
            return 0.0
        # Illustrative scoring curve: the score degrades as the canary
        # diverges from the baseline.
        return max(0.0, 1.0 - abs(diff_percentage) / 100)

    def _get_recommendation(
        self,
        overall_score: float,
        config: CanaryConfig,
        current_traffic: int
    ) -> str:
        """Map the overall score to a rollout recommendation (illustrative thresholds)"""
        if overall_score < 0.5:
            return "rollback"
        if overall_score < 0.8:
            return "hold"
        return "promote"


class CanaryController:
    """Orchestrates canary rollouts using the analyzer above"""

    def __init__(self, prometheus_url: str):
        self.analyzer = CanaryAnalyzer(prometheus_url)
        self.active_canaries: Dict[str, Dict[str, Any]] = {}

    async def start_canary(self, config: CanaryConfig) -> str:
        """Start a canary deployment and its analysis loop"""
        canary_id = f"{config.application}-{int(datetime.utcnow().timestamp())}"

        # Create the canary workload and give it an initial traffic share
        await self._create_canary_deployment(config)
        await self._set_traffic_split(config, config.traffic_step)

        # Run analysis in the background
        asyncio.create_task(self._analysis_loop(canary_id, config))

        self.active_canaries[canary_id] = {
            "config": config,
            "current_traffic": config.traffic_step,
            "started_at": datetime.utcnow(),
            "status": "running"
        }

        logger.info(f"Started canary deployment {canary_id}")
        return canary_id

    async def _analysis_loop(self, canary_id: str, config: CanaryConfig) -> None:
        """Run continuous analysis"""
        while canary_id in self.active_canaries:
            canary_info = self.active_canaries[canary_id]
            if canary_info["status"] != "running":
                break
            # Analyze current metrics
            analysis = await self.analyzer.analyze(
                config,
                canary_info["current_traffic"]
            )

            # Act on the recommendation
            await self._handle_recommendation(canary_id, analysis)

            await asyncio.sleep(config.analysis_interval)

    async def _handle_recommendation(
        self,
        canary_id: str,
        analysis: Dict[str, Any]
    ) -> None:
        """Handle analysis recommendation"""
        canary_info = self.active_canaries.get(canary_id)
        if not canary_info:
            return

        recommendation = analysis["recommendation"]
        config = canary_info["config"]

        if recommendation == "rollback":
            logger.warning(f"Rolling back canary {canary_id}")
            await self._rollback_canary(canary_id)

        elif recommendation == "hold":
            logger.info(
                f"Holding canary {canary_id} at {canary_info['current_traffic']}% traffic"
            )

        elif recommendation == "promote":
            if canary_info["current_traffic"] >= config.max_traffic:
                # Canary has earned full promotion
                logger.info(f"Promoting canary {canary_id} to production")
                await self._promote_canary(canary_id)
            else:
                # Gradually shift more traffic to the canary
                new_traffic = min(
                    canary_info["current_traffic"] + config.traffic_step,
                    config.max_traffic
                )
                logger.info(f"Increasing canary {canary_id} traffic to {new_traffic}%")
                await self._set_traffic_split(config, new_traffic)
                canary_info["current_traffic"] = new_traffic

    async def _set_traffic_split(
        self,
        config: CanaryConfig,
        canary_percentage: int
    ) -> None:
        """Set traffic split between baseline and canary"""
        # Pick the traffic-splitting mechanism for the target platform
        if config.namespace == "istio-system":
            await self._set_istio_traffic_split(config, canary_percentage)
        elif config.namespace == "aws":
            await self._set_alb_traffic_split(config, canary_percentage)
        else:
            await self._set_service_mesh_traffic_split(config, canary_percentage)

    async def _set_istio_traffic_split(
        self,
        config: CanaryConfig,
        canary_percentage: int
    ) -> None:
        """Set traffic split using Istio"""
        virtual_service = {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "VirtualService",
            "metadata": {
                "name": config.application,
                "namespace": config.namespace
            },
            "spec": {
                "http": [
                    {
                        "name": "primary",
                        "route": [
                            {
                                "destination": {
                                    "host": f"{config.application}",
                                    "subset": "canary"
                                },
                                "weight": canary_percentage
                            },
                            {
                                "destination": {
                                    "host": f"{config.application}",
                                    "subset": "baseline"
                                },
                                "weight": 100 - canary_percentage
                            }
                        ]
                    }
                ]
            }
        }
cmd = [ "kubectl" , "apply" , "-f" , "-" , "-n" , config.namespace ]
This comprehensive cloud-native deployment skill provides production-ready patterns for implementing modern Infrastructure as Code, GitOps workflows, and progressive delivery strategies across any cloud provider or platform.

★ Insight ─────────────────────────────────────
The cloud-native patterns shown here emphasize several key 2025 best practices:

- **Provider Abstraction**: Generic interfaces allow switching between Terraform, Pulumi, and other IaC tools
- **Multi-Cloud Strategy**: Built-in support for managing deployments across multiple providers, with failover capabilities
- **GitOps-First**: Declarative deployment through Git with support for ArgoCD, Flux, and custom tools
- **Progressive Delivery**: Safe deployments with canary analysis, blue-green strategies, and automated rollback
- **Policy as Code**: Integration with Open Policy Agent for automated compliance and security

These patterns ensure your cloud deployments are automated, secure, and resilient across any infrastructure.
─────────────────────────────────────────────────