Cloud Native Development Fundamentals: Kubernetes, Docker, Containers, Serverless & Cloud Architecture
This article introduces the fundamentals of cloud native development: Kubernetes, Docker, containers, serverless computing, and cloud architecture, each illustrated with practical examples.
In a Nutshell
Cloud Native development is an approach to building and operating applications that fully leverages cloud computing models to create scalable, resilient, and manageable systems.
Concise Technical Description
Cloud Native is a methodology for developing applications based on containers, microservices, dynamic orchestration, and continuous delivery.
Core Components:
Containerization
- Docker: De facto standard for containers
- Container Images: Immutable, portable application packages
- Container Registries: Docker Hub, AWS ECR, Google GCR
- Multi-Stage Builds: Optimized image creation
Kubernetes Orchestration
- Pods: Smallest deployable unit
- Services: Network abstraction for pods
- Deployments: Management of pod replicas
- ConfigMaps & Secrets: Configuration management
Serverless Computing
- Function as a Service (FaaS): Event-driven functions
- Backend as a Service (BaaS): Managed backend services
- Event-Driven Architecture: Asynchronous communication
- Pay-per-Use: Usage-based billing
Cloud Native Patterns
- Sidecar Pattern: Additional containers in a pod
- Ambassador Pattern: Proxy container that brokers the application's connections to external services
- Adapter Pattern: Interface standardization
- Init Containers: Pre-initialization
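The "Pay-per-Use" item under Serverless Computing above comes down to simple arithmetic: a FaaS bill is typically a per-request charge plus a charge for memory-seconds consumed. The following is a minimal sketch of that calculation. The function name `estimate_faas_cost` and the prices passed in are illustrative assumptions, not any provider's actual price list.

```python
def estimate_faas_cost(invocations, avg_duration_ms, memory_mb,
                       price_per_million_requests, price_per_gb_second):
    """Estimate a monthly FaaS bill from usage figures.

    All prices are caller-supplied assumptions; check the provider's
    current price list before relying on the result.
    """
    # Compute consumption in GB-seconds: calls x seconds per call x GB of memory
    gb_seconds = invocations * (avg_duration_ms / 1000.0) * (memory_mb / 1024.0)
    request_cost = invocations / 1_000_000 * price_per_million_requests
    compute_cost = gb_seconds * price_per_gb_second
    return round(request_cost + compute_cost, 2)


if __name__ == "__main__":
    # 2 million calls, 500 ms each, 512 MB memory, hypothetical prices
    print(estimate_faas_cost(2_000_000, 500, 512, 0.20, 0.00001))
```

With these made-up prices, the request charge is 0.40 and the compute charge is 5.00, so an idle function costs nothing and the bill grows linearly with traffic.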
Exam-Relevant Key Points
- Cloud Native: Cloud-optimized application development
- Containers: Isolated application environments
- Kubernetes: Container orchestration platform
- Serverless: Event-driven, pay-per-use architecture
- Microservices: Small, autonomous services
- CI/CD: Continuous integration and deployment
- Immutable Infrastructure: Servers and images are replaced rather than modified in place
- Observability: Monitoring, logging, tracing
- IHK-relevant: Modern cloud development and architecture
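For the observability point above, logs become far easier to aggregate and search when each line is a structured record rather than free text. The sketch below shows one way to emit JSON log lines with Python's standard `logging` module. The class name `JsonFormatter`, the helper `build_logger`, and the `trace_id` field are assumptions chosen for illustration.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Optional correlation ID, passed via logger.info(..., extra={"trace_id": ...})
        trace_id = getattr(record, "trace_id", None)
        if trace_id is not None:
            payload["trace_id"] = trace_id
        return json.dumps(payload)


def build_logger(name="app"):
    """Return a logger that writes JSON lines to stderr."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    return logger


if __name__ == "__main__":
    log = build_logger()
    log.info("order created", extra={"trace_id": "abc123"})
```

Carrying the same `trace_id` through logs and traces is what lets a request be followed across services.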
Core Components
- Container: Docker, Container Images, Registries
- Orchestration: Kubernetes, Services, Deployments
- Serverless: FaaS, Event-Driven Architecture
- CI/CD: Automated builds and deployments
- Monitoring: Prometheus, Grafana, OpenTelemetry
- Security: mTLS, RBAC, Secrets Management
- Networking: Service Mesh, Ingress, Load Balancing
- Storage: Persistent Volumes, Storage Classes
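A detail behind "Secrets Management" that is easy to miss: the values in the `data` field of a Kubernetes Secret are base64-encoded, not encrypted. The short sketch below shows the encoding step that produces such values. The helper names are assumptions for illustration.

```python
import base64


def encode_secret_value(plaintext: str) -> str:
    """Encode a value the way the `data` field of a Secret manifest expects it."""
    return base64.b64encode(plaintext.encode("utf-8")).decode("ascii")


def decode_secret_value(encoded: str) -> str:
    """Reverse the encoding; anyone who can read the Secret can do the same."""
    return base64.b64decode(encoded).decode("utf-8")


if __name__ == "__main__":
    encoded = encode_secret_value("s3cr3t")
    print(encoded)                       # czNjcjN0
    print(decode_secret_value(encoded))  # s3cr3t
```

Because decoding is trivial, access to Secrets still has to be restricted through RBAC, and manifests containing real values should not be committed to version control.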
Practical Examples
1. Cloud Native Application with Kubernetes
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cloud-native-app
  labels:
    app: cloud-native-app
    version: v1
spec:
  replicas: 3
  selector:
    matchLabels:
      app: cloud-native-app
  template:
    metadata:
      labels:
        app: cloud-native-app
        version: v1
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9090"
        prometheus.io/path: "/metrics"
    spec:
      containers:
        - name: app
          image: cloud-native-app:1.0.0
          ports:
            - containerPort: 8080
              name: http
            - containerPort: 9090
              name: metrics
          env:
            - name: PORT
              value: "8080"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: database-url
            - name: LOG_LEVEL
              valueFrom:
                configMapKeyRef:
                  name: app-config
                  key: log-level
          resources:
            requests:
              memory: "64Mi"
              cpu: "250m"
            limits:
              memory: "128Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 3
          startupProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 30
          volumeMounts:
            - name: config-volume
              mountPath: /app/config
              readOnly: true
            - name: cache-volume
              mountPath: /app/cache
            # Shared with the sidecar so it can read the logs the app writes
            - name: log-volume
              mountPath: /app/logs
        - name: sidecar
          image: logging-sidecar:1.0.0
          env:
            - name: LOG_PATH
              value: "/app/logs"
          volumeMounts:
            - name: log-volume
              mountPath: /app/logs
      initContainers:
        - name: init-db
          image: db-migrator:1.0.0
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: database-url
          command: ["/app/migrate"]
      volumes:
        - name: config-volume
          configMap:
            name: app-config
        - name: cache-volume
          emptyDir:
            sizeLimit: 100Mi
        - name: log-volume
          emptyDir:
            sizeLimit: 50Mi
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
      nodeSelector:
        cloud.google.com/gke-nodepool: application-pool
      tolerations:
        - key: "workload"
          operator: "Equal"
          value: "application"
          effect: "NoSchedule"
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - cloud-native-app
                topologyKey: kubernetes.io/hostname
---
# service.yaml
apiVersion: v1
kind: Service
metadata:
  name: cloud-native-app-service
  labels:
    app: cloud-native-app
spec:
  type: ClusterIP
  selector:
    app: cloud-native-app
  ports:
    - name: http
      port: 80
      targetPort: 8080
      protocol: TCP
    - name: metrics
      port: 9090
      targetPort: 9090
      protocol: TCP
---
# ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: cloud-native-app-ingress
  annotations:
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    # ingress-nginx rate limiting: at most 100 requests per minute per client IP
    nginx.ingress.kubernetes.io/limit-rpm: "100"
    nginx.ingress.kubernetes.io/enable-cors: "true"
    # "*" allows every origin; restrict this in production
    nginx.ingress.kubernetes.io/cors-allow-origin: "*"
    nginx.ingress.kubernetes.io/cors-allow-methods: "GET, POST, PUT, DELETE, OPTIONS"
    nginx.ingress.kubernetes.io/cors-allow-headers: "DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,Authorization"
spec:
  # Replaces the deprecated kubernetes.io/ingress.class annotation
  ingressClassName: nginx
  tls:
    - hosts:
        - app.example.com
      secretName: app-tls
  rules:
    - host: app.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: cloud-native-app-service
                port:
                  number: 80
---
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: cloud-native-app-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: cloud-native-app
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
---
# configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: app-config
data:
  log-level: "info"
  feature-flags: |
    {
      "new-ui": true,
      "beta-features": false,
      "debug-mode": false
    }
  app.properties: |
    server.port=8080
    server.servlet.context-path=/api
    spring.datasource.hikari.maximum-pool-size=10
    spring.datasource.hikari.minimum-idle=2
---
# secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: app-secrets
type: Opaque
data:
  database-url: <base64-encoded-database-url>
  api-key: <base64-encoded-api-key>
  jwt-secret: <base64-encoded-jwt-secret>
---
# serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: app-service-account
  namespace: default
---
# rbac.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: app-role
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["configmaps", "secrets"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: app-role-binding
  namespace: default
subjects:
  - kind: ServiceAccount
    name: app-service-account
    namespace: default
roleRef:
  kind: Role
  name: app-role
  apiGroup: rbac.authorization.k8s.io
---
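# networkpolicy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: app-network-policy
spec:
  podSelector:
    matchLabels:
      app: cloud-native-app
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - namespaceSelector:
            matchLabels:
              # Label that Kubernetes sets automatically on every namespace
              kubernetes.io/metadata.name: ingress-nginx
      ports:
        - protocol: TCP
          port: 8080
  egress:
    - to:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: database
      ports:
        - protocol: TCP
          port: 5432
    # No "to" selector: DNS lookups and outbound HTTPS are allowed to any destination.
    # Each port needs its own list entry; DNS uses UDP as well as TCP.
    - ports:
        - protocol: UDP
          port: 53
        - protocol: TCP
          port: 53
        - protocol: TCP
          port: 443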
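The HorizontalPodAutoscaler manifest above sets utilization targets, but it does not show how a replica count follows from them. The Kubernetes documentation describes the core rule as desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue), with no change when the ratio is within a tolerance (0.1 by default). Below is a minimal sketch of that rule for a single metric. The function name is an assumption, and the sketch deliberately ignores the `behavior` policies and stabilization windows.

```python
import math


def desired_replicas(current_replicas, current_utilization, target_utilization,
                     min_replicas, max_replicas, tolerance=0.1):
    """Approximate the HPA replica calculation for one metric."""
    ratio = current_utilization / target_utilization
    # Inside the tolerance band the autoscaler leaves the replica count alone
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    desired = math.ceil(current_replicas * ratio)
    # Clamp to the minReplicas / maxReplicas bounds from the manifest
    return max(min_replicas, min(max_replicas, desired))


if __name__ == "__main__":
    # 3 pods at 105% average CPU against the 70% target from hpa.yaml
    print(desired_replicas(3, 105, 70, min_replicas=3, max_replicas=10))
```

When several metrics are configured, as with CPU and memory above, the autoscaler evaluates each one and uses the largest resulting replica count.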
2. Serverless Function with AWS Lambda
# lambda_function.py
import json
import os
import logging
import boto3
from datetime import datetime
from typing import Dict, Any
from functools import wraps
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Initialize AWS clients
dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
sns = boto3.client('sns')
stepfunctions = boto3.client('stepfunctions')
# Environment variables
TABLE_NAME = os.environ.get('TABLE_NAME', 'serverless-app')
BUCKET_NAME = os.environ.get('BUCKET_NAME', 'serverless-bucket')
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')
STEP_FUNCTION_ARN = os.environ.get('STEP_FUNCTION_ARN')
# Initialize DynamoDB table
table = dynamodb.Table(TABLE_NAME)
class LambdaContext:
    """Lambda context for better type hints"""
    def __init__(self, aws_request_id: str, function_name: str, function_version: str):
        self.aws_request_id = aws_request_id
        self.function_name = function_name
        self.function_version = function_version
def error_handler(func):
    """Decorator for error handling and logging"""
    @wraps(func)
    # *args/**kwargs let the decorator wrap handlers that take extra arguments such as item_id
    def wrapper(event, context, *args, **kwargs):
        try:
            logger.info(f"Function {context.function_name} invoked with request ID: {context.aws_request_id}")
            logger.info(f"Event: {json.dumps(event)}")
            result = func(event, context, *args, **kwargs)
            logger.info("Function completed successfully")
            return result
        except Exception as e:
            logger.error(f"Error in function {context.function_name}: {str(e)}")
            logger.error(f"Event: {json.dumps(event)}")
            # Send error notification
            if SNS_TOPIC_ARN:
                send_error_notification(str(e), context.aws_request_id)
            # Return error response
            return {
                'statusCode': 500,
                'headers': {
                    'Content-Type': 'application/json',
                    'Access-Control-Allow-Origin': '*'
                },
                'body': json.dumps({
                    'error': 'Internal server error',
                    'message': str(e),
                    'requestId': context.aws_request_id
                })
            }
    return wrapper
def cors_headers(func):
    """Decorator for CORS headers"""
    @wraps(func)
    def wrapper(event, context, *args, **kwargs):
        result = func(event, context, *args, **kwargs)
        if isinstance(result, dict) and 'statusCode' in result:
            headers = result.get('headers', {})
            headers.update({
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Headers': 'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token',
                'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS'
            })
            result['headers'] = headers
        return result
    return wrapper
def validate_request(required_fields):
    """Decorator for request validation"""
    def decorator(func):
        @wraps(func)
        def wrapper(event, context, *args, **kwargs):
            if 'body' in event and event['body']:
                try:
                    body = json.loads(event['body'])
                except json.JSONDecodeError:
                    return {
                        'statusCode': 400,
                        'headers': {'Content-Type': 'application/json'},
                        'body': json.dumps({'error': 'Invalid JSON in request body'})
                    }
            else:
                body = {}
            # Validate required fields
            missing_fields = [field for field in required_fields if field not in body]
            if missing_fields:
                return {
                    'statusCode': 400,
                    'headers': {'Content-Type': 'application/json'},
                    'body': json.dumps({
                        'error': 'Missing required fields',
                        'missing_fields': missing_fields
                    })
                }
            event['parsed_body'] = body
            return func(event, context, *args, **kwargs)
        return wrapper
    return decorator
@cors_headers
@error_handler
def lambda_handler(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
    """Main Lambda handler"""
    http_method = event.get('httpMethod')
    # Fall back to an empty string so startswith() cannot fail on a missing path
    path = event.get('path') or ''
    # Route based on HTTP method and path
    if http_method == 'GET' and path == '/health':
        return health_check(event, context)
    elif http_method == 'GET' and path == '/items':
        return list_items(event, context)
    elif http_method == 'GET' and path.startswith('/items/'):
        item_id = path.split('/')[-1]
        return get_item(event, context, item_id)
    elif http_method == 'POST' and path == '/items':
        return create_item(event, context)
    elif http_method == 'PUT' and path.startswith('/items/'):
        item_id = path.split('/')[-1]
        return update_item(event, context, item_id)
    elif http_method == 'DELETE' and path.startswith('/items/'):
        item_id = path.split('/')[-1]
        return delete_item(event, context, item_id)
    else:
        return {
            'statusCode': 404,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({'error': 'Not found'})
        }
def health_check(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
    """Health check endpoint"""
    try:
        # Check DynamoDB connectivity
        table.load()
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({
                'status': 'healthy',
                'timestamp': datetime.utcnow().isoformat(),
                'function': context.function_name,
                'version': context.function_version,
                'services': {
                    'dynamodb': 'connected',
                    's3': 'connected'
                }
            })
        }
    except Exception as e:
        return {
            'statusCode': 503,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({
                'status': 'unhealthy',
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            })
        }
@cors_headers
@error_handler
def list_items(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
    """List items, one page per call"""
    try:
        # Parse query parameters
        query_params = event.get('queryStringParameters') or {}
        limit = int(query_params.get('limit', 10))
        # DynamoDB has no numeric offsets; paging continues from the last key of the previous page
        start_key = query_params.get('start_key')
        scan_kwargs = {'Limit': limit}
        # boto3 rejects ExclusiveStartKey=None, so the argument is only passed when a cursor is given
        if start_key:
            scan_kwargs['ExclusiveStartKey'] = {'id': start_key}
        # Scan DynamoDB table
        response = table.scan(**scan_kwargs)
        items = response.get('Items', [])
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            # default=str keeps serialization from failing on DynamoDB Decimal values
            'body': json.dumps({
                'items': items,
                'count': len(items),
                'last_evaluated_key': response.get('LastEvaluatedKey')
            }, default=str)
        }
    except Exception as e:
        logger.error(f"Error listing items: {str(e)}")
        raise
@cors_headers
@error_handler
@validate_request(['name', 'description'])
def create_item(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
    """Create a new item"""
    try:
        body = event['parsed_body']
        # Generate item ID
        item_id = str(int(datetime.utcnow().timestamp() * 1000))
        # Create item object
        item = {
            'id': item_id,
            'name': body['name'],
            'description': body['description'],
            'created_at': datetime.utcnow().isoformat(),
            'updated_at': datetime.utcnow().isoformat(),
            'status': 'active'
        }
        # Save to DynamoDB
        table.put_item(Item=item)
        # Trigger step function if needed
        if STEP_FUNCTION_ARN:
            trigger_step_function(item_id, 'created')
        # Send SNS notification
        if SNS_TOPIC_ARN:
            send_notification(f"Item created: {item_id}", item)
        return {
            'statusCode': 201,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps(item)
        }
    except Exception as e:
        logger.error(f"Error creating item: {str(e)}")
        raise
@cors_headers
@error_handler
def get_item(event: Dict[str, Any], context: LambdaContext, item_id: str) -> Dict[str, Any]:
    """Get item by ID"""
    try:
        response = table.get_item(
            Key={'id': item_id}
        )
        item = response.get('Item')
        if not item:
            return {
                'statusCode': 404,
                'headers': {'Content-Type': 'application/json'},
                'body': json.dumps({'error': 'Item not found'})
            }
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps(item)
        }
    except Exception as e:
        logger.error(f"Error getting item: {str(e)}")
        raise
@cors_headers
@error_handler
@validate_request(['name', 'description'])
def update_item(event: Dict[str, Any], context: LambdaContext, item_id: str) -> Dict[str, Any]:
    """Update item by ID"""
    try:
        body = event['parsed_body']
        # Update expression
        update_expression = "SET #n = :name, description = :description, updated_at = :updated_at"
        expression_attribute_names = {'#n': 'name'}
        expression_attribute_values = {
            ':name': body['name'],
            ':description': body['description'],
            ':updated_at': datetime.utcnow().isoformat()
        }
        try:
            response = table.update_item(
                Key={'id': item_id},
                UpdateExpression=update_expression,
                # Without this condition update_item would silently create a missing item
                ConditionExpression="attribute_exists(id)",
                ExpressionAttributeNames=expression_attribute_names,
                ExpressionAttributeValues=expression_attribute_values,
                ReturnValues="ALL_NEW"
            )
        except table.meta.client.exceptions.ConditionalCheckFailedException:
            return {
                'statusCode': 404,
                'headers': {'Content-Type': 'application/json'},
                'body': json.dumps({'error': 'Item not found'})
            }
        item = response.get('Attributes')
        # Trigger step function if needed
        if STEP_FUNCTION_ARN:
            trigger_step_function(item_id, 'updated')
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps(item)
        }
    except Exception as e:
        logger.error(f"Error updating item: {str(e)}")
        raise
@cors_headers
@error_handler
def delete_item(event: Dict[str, Any], context: LambdaContext, item_id: str) -> Dict[str, Any]:
    """Delete item by ID"""
    try:
        response = table.delete_item(
            Key={'id': item_id},
            ReturnValues="ALL_OLD"
        )
        item = response.get('Attributes')
        if not item:
            return {
                'statusCode': 404,
                'headers': {'Content-Type': 'application/json'},
                'body': json.dumps({'error': 'Item not found'})
            }
        # Trigger step function if needed
        if STEP_FUNCTION_ARN:
            trigger_step_function(item_id, 'deleted')
        # Send SNS notification
        if SNS_TOPIC_ARN:
            send_notification(f"Item deleted: {item_id}", item)
        return {
            'statusCode': 204,
            'headers': {'Content-Type': 'application/json'},
            'body': ''
        }
    except Exception as e:
        logger.error(f"Error deleting item: {str(e)}")
        raise
def send_notification(message: str, item: Dict[str, Any]) -> None:
    """Send SNS notification"""
    try:
        sns.publish(
            TopicArn=SNS_TOPIC_ARN,
            Message=json.dumps({
                'message': message,
                'item': item,
                'timestamp': datetime.utcnow().isoformat()
            }),
            Subject='Serverless App Notification'
        )
    except Exception as e:
        logger.error(f"Error sending notification: {str(e)}")
def send_error_notification(error: str, request_id: str) -> None:
    """Send error notification"""
    try:
        sns.publish(
            TopicArn=SNS_TOPIC_ARN,
            Message=json.dumps({
                'error': error,
                'request_id': request_id,
                'timestamp': datetime.utcnow().isoformat()
            }),
            Subject='Serverless App Error'
        )
    except Exception as e:
        logger.error(f"Error sending error notification: {str(e)}")
def trigger_step_function(item_id: str, action: str) -> None:
    """Trigger Step Function"""
    try:
        stepfunctions.start_execution(
            stateMachineArn=STEP_FUNCTION_ARN,
            name=f"{item_id}-{action}-{int(datetime.utcnow().timestamp())}",
            input=json.dumps({
                'item_id': item_id,
                'action': action,
                'timestamp': datetime.utcnow().isoformat()
            })
        )
    except Exception as e:
        logger.error(f"Error triggering step function: {str(e)}")
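# Additional Lambda functions for different purposes
def process_image_upload(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
    """Process uploaded image (triggered by an S3 event)"""
    # Local import so this function does not depend on the module's import list
    from urllib.parse import unquote_plus
    try:
        record = event['Records'][0]
        bucket = record['s3']['bucket']['name']
        # Object keys arrive URL-encoded in S3 event notifications
        key = unquote_plus(record['s3']['object']['key'])
        # Skip objects this function wrote itself; otherwise it would retrigger
        # endlessly when the source bucket and BUCKET_NAME are the same bucket
        if key.startswith('processed/'):
            return {
                'statusCode': 200,
                'body': json.dumps({'message': 'Already processed, skipping'})
            }
        # Download image
        s3_object = s3.get_object(Bucket=bucket, Key=key)
        image_data = s3_object['Body'].read()
        # Process image (resize, compress, etc.)
        # This would typically use libraries like Pillow
        # Upload processed image
        processed_key = f"processed/{key}"
        s3.put_object(
            Bucket=BUCKET_NAME,
            Key=processed_key,
            Body=image_data
        )
        # Update database record (the item ID is taken from the file name without extension)
        table.update_item(
            Key={'id': key.split('/')[-1].split('.')[0]},
            UpdateExpression="SET processed_url = :processed_url, processed_at = :processed_at",
            ExpressionAttributeValues={
                ':processed_url': f"s3://{BUCKET_NAME}/{processed_key}",
                ':processed_at': datetime.utcnow().isoformat()
            }
        )
        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Image processed successfully'})
        }
    except Exception as e:
        logger.error(f"Error processing image: {str(e)}")
        raise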
def cleanup_old_items(event: Dict[str, Any], context: LambdaContext) -> Dict[str, Any]:
    """Cleanup old items (scheduled Lambda)"""
    # Local import so this function does not depend on the module's import list
    from datetime import timedelta
    try:
        # Get items older than 30 days
        cutoff_date = datetime.utcnow() - timedelta(days=30)
        scan_kwargs = {
            'FilterExpression': "created_at < :cutoff_date",
            'ExpressionAttributeValues': {':cutoff_date': cutoff_date.isoformat()}
        }
        old_items = []
        # A single scan call returns at most 1 MB of data, so follow LastEvaluatedKey
        while True:
            response = table.scan(**scan_kwargs)
            old_items.extend(response.get('Items', []))
            if 'LastEvaluatedKey' not in response:
                break
            scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
        # Delete old items
        for item in old_items:
            table.delete_item(Key={'id': item['id']})
            # Delete associated files from S3
            if 'image_url' in item:
                s3_key = item['image_url'].split('/')[-1]
                s3.delete_object(Bucket=BUCKET_NAME, Key=s3_key)
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Cleaned up {len(old_items)} old items'
            })
        }
    except Exception as e:
        logger.error(f"Error cleaning up old items: {str(e)}")
        raise
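A handler like the one above can be exercised locally before deployment by calling it with a hand-built event and a stand-in context object. The sketch below illustrates the idea with a deliberately tiny handler so that it runs without AWS credentials. The names `handler`, `make_event`, and `make_context` are hypothetical helpers, and the event contains only the API Gateway proxy fields that the routing logic reads.

```python
import json
from types import SimpleNamespace


def handler(event, context):
    """Tiny stand-in handler that routes GET /health only."""
    method = event.get("httpMethod")
    path = event.get("path") or ""
    if method == "GET" and path == "/health":
        body = {"status": "healthy", "requestId": context.aws_request_id}
        return {"statusCode": 200, "body": json.dumps(body)}
    return {"statusCode": 404, "body": json.dumps({"error": "Not found"})}


def make_event(method, path, body=None):
    """Build a minimal API Gateway proxy-style event."""
    return {
        "httpMethod": method,
        "path": path,
        "queryStringParameters": None,
        "body": json.dumps(body) if body is not None else None,
    }


def make_context(request_id="local-test"):
    """Stand-in for the Lambda context, with the attributes the article's code uses."""
    return SimpleNamespace(
        aws_request_id=request_id,
        function_name="local",
        function_version="$LATEST",
    )


if __name__ == "__main__":
    response = handler(make_event("GET", "/health"), make_context())
    print(response["statusCode"], response["body"])
```

The same two helpers can feed events to the real `lambda_handler` once the boto3 clients are stubbed or pointed at a local emulator.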
3. Cloud Native Dockerfile Best Practices
# Multi-stage Dockerfile for cloud-native applications
# Stage 1: Build stage
FROM golang:1.21-alpine AS builder
# Install build dependencies
RUN apk add --no-cache git ca-certificates tzdata
# Set working directory
WORKDIR /app
# Copy go mod files
COPY go.mod go.sum ./
# Download dependencies
RUN go mod download
# Copy source code
COPY . .
# Build the application
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main .
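# Stage 2: Runtime stage
# Named so it can be selected explicitly: docker build --target runtime .
# Without --target, Docker builds the last stage in the file, not this one.
FROM alpine:3.18 AS runtime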
# Install runtime dependencies
RUN apk --no-cache add ca-certificates tzdata curl
# Create non-root user
RUN addgroup -g 1001 -S appgroup && \
adduser -u 1001 -S appuser -G appgroup
# Set working directory
WORKDIR /app
# Copy binary from builder stage
COPY --from=builder /app/main .
# Copy configuration files
COPY --chown=appuser:appgroup config/ ./config/
# Create necessary directories
RUN mkdir -p /app/logs /app/data && \
chown -R appuser:appgroup /app
# Switch to non-root user
USER appuser
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
# Expose port
EXPOSE 8080
# Set environment variables
ENV GIN_MODE=release
ENV PORT=8080
# Default command
CMD ["./main"]
# Labels for metadata
LABEL maintainer="devops@company.com" \
version="1.0.0" \
description="Cloud Native Application" \
org.opencontainers.image.source="https://github.com/company/cloud-native-app" \
org.opencontainers.image.licenses="MIT"
# Stage 3: Development stage
FROM builder AS development
# Install development tools
RUN go install github.com/cosmtrek/air@latest
# Set development environment
ENV GIN_MODE=debug
# Default command for development
CMD ["air", "-c", ".air.toml"]
# Stage 4: Testing stage
FROM builder AS testing
# Install test dependencies
RUN go install github.com/golang/mock/mockgen@latest && \
go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
# Run tests
RUN go test -v ./... && \
golangci-lint run
# Stage 5: Security scanning stage
FROM builder AS security
# Install a vulnerability scanner (govulncheck, the Go project's scanner, is used here as a stand-in)
RUN go install golang.org/x/vuln/cmd/govulncheck@latest
# Run security scans
RUN govulncheck ./...
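Both the Dockerfile HEALTHCHECK and the Kubernetes probes shown earlier assume the application answers on `/health`, and the readiness probe on `/ready`. The article's image is a Go service; the following Python sketch only illustrates the endpoint contract using the standard library. The names `ProbeHandler`, `READY`, and `serve` are assumptions for illustration.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

# Set once start-up work (migrations, cache warm-up, ...) has finished
READY = threading.Event()


class ProbeHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            # Liveness: the process is up and able to answer
            self._reply(200, {"status": "alive"})
        elif self.path == "/ready":
            # Readiness: only accept traffic after initialization
            if READY.is_set():
                self._reply(200, {"status": "ready"})
            else:
                self._reply(503, {"status": "starting"})
        else:
            self._reply(404, {"error": "not found"})

    def _reply(self, code, payload):
        body = json.dumps(payload).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        pass  # keep probe traffic out of the log output


def serve(port=8080):
    """Start the probe server in a background thread and return it."""
    server = HTTPServer(("0.0.0.0", port), ProbeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Calling `serve()` at start-up and `READY.set()` after initialization gives the orchestrator distinct signals: a failing liveness check restarts the container, while a failing readiness check only removes it from load balancing.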
4. Kubernetes Operator with Go
// main.go
package main
import (
	"context"
	"flag"
	"fmt"
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	// cache and client are referenced below, so they must be imported
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/healthz"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
	"sigs.k8s.io/controller-runtime/pkg/webhook"

	customv1 "github.com/company/cloud-native-operator/api/v1"
	"github.com/company/cloud-native-operator/controllers"
	//+kubebuilder:scaffold:imports
)
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(customv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
func main() {
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
var qps float32
var burst int
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&secureMetrics, "metrics-secure", false,
"If set the metrics endpoint is served securely via HTTPS. "+
"If not set the metrics endpoint is served over HTTP.")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.Float32Var(&qps, "kube-api-qps", 50, "QPS to use while talking with the kubernetes api")
flag.IntVar(&burst, "kube-api-burst", 100, "Burst to use while talking with the kubernetes api")
opts := zap.Options{
Development: true,
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
	// Set up the manager
	restConfig := ctrl.GetConfigOrDie()
	restConfig.QPS = qps
	restConfig.Burst = burst
	// Option names follow the controller-runtime v0.16+ API, which the
	// metricsserver import above implies.
	mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
		Scheme: scheme,
		// The metrics server is configured here; no separate registration call is needed
		Metrics: metricsserver.Options{
			BindAddress:   metricsAddr,
			SecureServing: secureMetrics,
		},
		HealthProbeBindAddress: probeAddr,
		LeaderElection:         enableLeaderElection,
		LeaderElectionID:       "cloud-native-operator.leader.election",
		// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
		// when the Manager ends. This requires the binary to immediately end when the
		// Manager is stopped, otherwise, this setting is unsafe. Setting this variable
		// to true when the binary is not being immediately ended might cause pending
		// requests to be dropped and can lead to inconsistent state.
		LeaderElectionReleaseOnCancel: true,
		// WebhookServer configures the webhook server in the manager.
		WebhookServer: webhook.NewServer(webhook.Options{
			Host:    "0.0.0.0",
			Port:    9443,
			CertDir: "/tmp/k8s-webhook-server/serving-certs",
		}),
		Cache: cache.Options{
			ByObject: map[client.Object]cache.ByObject{
				&customv1.CloudNativeApp{}: {
					Transform: func(obj interface{}) (interface{}, error) {
						// Transform the object before caching
						return obj, nil
					},
				},
			},
		},
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}
// Add health probes
if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err = mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}
// Set up field indexers
if err = setupFieldIndexes(mgr.GetCache()); err != nil {
setupLog.Error(err, "unable to set up field indexes")
os.Exit(1)
}
// Set up controllers
if err = setupControllers(mgr); err != nil {
setupLog.Error(err, "unable to set up controllers")
os.Exit(1)
}
// Set up webhooks
if err = setupWebhooks(mgr); err != nil {
setupLog.Error(err, "unable to set up webhooks")
os.Exit(1)
}
// Start the manager
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
func setupFieldIndexes(cache cache.Cache) error {
// Set up custom field indexes
if err := cache.IndexField(context.Background(), &customv1.CloudNativeApp{}, ".spec.status", func(obj client.Object) []string {
app := obj.(*customv1.CloudNativeApp)
return []string{string(app.Spec.Status)}
}); err != nil {
return fmt.Errorf("failed to set up index for CloudNativeApp.spec.status: %w", err)
}
return nil
}
func setupControllers(mgr ctrl.Manager) error {
// Set up CloudNativeApp controller
if err := (&controllers.CloudNativeAppReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor("cloud-native-app-controller"),
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to create CloudNativeApp controller: %w", err)
}
// Set up additional controllers here
//+kubebuilder:scaffold:builder
return nil
}
func setupWebhooks(mgr ctrl.Manager) error {
// Set up CloudNativeApp webhook
if err := (&customv1.CloudNativeApp{}).SetupWebhookWithManager(mgr); err != nil {
return fmt.Errorf("unable to set up CloudNativeApp webhook: %w", err)
}
return nil
}
// controllers/cloudnativeapp_controller.go
package controllers
import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	// record provides the EventRecorder type used by the reconciler struct
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"

	customv1 "github.com/company/cloud-native-operator/api/v1"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// CloudNativeAppReconciler reconciles a CloudNativeApp object
type CloudNativeAppReconciler struct {
client.Client
Scheme *runtime.Scheme
EventRecorder record.EventRecorder
}
//+kubebuilder:rbac:groups=custom.company.com,resources=cloudnativeapps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=custom.company.com,resources=cloudnativeapps/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=custom.company.com,resources=cloudnativeapps/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *CloudNativeAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// Fetch the CloudNativeApp instance
var cloudNativeApp customv1.CloudNativeApp
if err := r.Get(ctx, req.NamespacedName, &cloudNativeApp); err != nil {
if errors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
return ctrl.Result{}, nil
}
logger.Error(err, "unable to fetch CloudNativeApp")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// Check if the deployment exists
deployment := &appsv1.Deployment{}
deploymentName := fmt.Sprintf("%s-deployment", cloudNativeApp.Name)
err := r.Get(ctx, types.NamespacedName{Name: deploymentName, Namespace: cloudNativeApp.Namespace}, deployment)
if err != nil && errors.IsNotFound(err) {
// Create deployment
deployment, err = r.createDeployment(ctx, &cloudNativeApp)
if err != nil {
logger.Error(err, "unable to create deployment")
return ctrl.Result{}, err
}
r.EventRecorder.Event(&cloudNativeApp, corev1.EventTypeNormal, "Created", "Deployment created")
} else if err != nil {
logger.Error(err, "unable to get deployment")
return ctrl.Result{}, err
} else {
// Update deployment
if err := r.updateDeployment(ctx, &cloudNativeApp, deployment); err != nil {
logger.Error(err, "unable to update deployment")
return ctrl.Result{}, err
}
}
// Check if the service exists
service := &corev1.Service{}
serviceName := fmt.Sprintf("%s-service", cloudNativeApp.Name)
err = r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: cloudNativeApp.Namespace}, service)
if err != nil && errors.IsNotFound(err) {
// Create service
service, err = r.createService(ctx, &cloudNativeApp)
if err != nil {
logger.Error(err, "unable to create service")
return ctrl.Result{}, err
}
r.EventRecorder.Event(&cloudNativeApp, corev1.EventTypeNormal, "Created", "Service created")
} else if err != nil {
logger.Error(err, "unable to get service")
return ctrl.Result{}, err
} else {
// Update service
if err := r.updateService(ctx, &cloudNativeApp, service); err != nil {
logger.Error(err, "unable to update service")
return ctrl.Result{}, err
}
}
// Update status
if err := r.updateStatus(ctx, &cloudNativeApp, deployment); err != nil {
logger.Error(err, "unable to update status")
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}
func (r *CloudNativeAppReconciler) createDeployment(ctx context.Context, app *customv1.CloudNativeApp) (*appsv1.Deployment, error) {
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-deployment", app.Name),
Namespace: app.Namespace,
Labels: map[string]string{
"app": app.Name,
"managed": "cloud-native-operator",
},
},
Spec: appsv1.DeploymentSpec{
Replicas: app.Spec.Replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": app.Name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": app.Name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: app.Name,
Image: app.Spec.Image,
Ports: []corev1.ContainerPort{
{
ContainerPort: app.Spec.Port,
},
},
Env: app.Spec.Env,
Resources: app.Spec.Resources,
LivenessProbe: app.Spec.LivenessProbe,
ReadinessProbe: app.Spec.ReadinessProbe,
},
},
},
},
},
}
// Set controller reference
if err := controllerutil.SetControllerReference(app, deployment, r.Scheme); err != nil {
return nil, err
}
if err := r.Create(ctx, deployment); err != nil {
return nil, err
}
return deployment, nil
}
func (r *CloudNativeAppReconciler) updateDeployment(ctx context.Context, app *customv1.CloudNativeApp, deployment *appsv1.Deployment) error {
// Update deployment spec
deployment.Spec.Replicas = app.Spec.Replicas
deployment.Spec.Template.Spec.Containers[0].Image = app.Spec.Image
deployment.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort = app.Spec.Port
deployment.Spec.Template.Spec.Containers[0].Env = app.Spec.Env
deployment.Spec.Template.Spec.Containers[0].Resources = app.Spec.Resources
deployment.Spec.Template.Spec.Containers[0].LivenessProbe = app.Spec.LivenessProbe
deployment.Spec.Template.Spec.Containers[0].ReadinessProbe = app.Spec.ReadinessProbe
return r.Update(ctx, deployment)
}
func (r *CloudNativeAppReconciler) createService(ctx context.Context, app *customv1.CloudNativeApp) (*corev1.Service, error) {
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-service", app.Name),
Namespace: app.Namespace,
Labels: map[string]string{
"app": app.Name,
"managed": "cloud-native-operator",
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": app.Name,
},
Ports: []corev1.ServicePort{
{
Port: app.Spec.ServicePort,
TargetPort: intstr.FromInt(int(app.Spec.Port)), // TargetPort is an IntOrString, so the int32 port must be wrapped
Protocol: corev1.ProtocolTCP,
},
},
Type: corev1.ServiceTypeClusterIP,
},
}
// Set controller reference
if err := controllerutil.SetControllerReference(app, service, r.Scheme); err != nil {
return nil, err
}
if err := r.Create(ctx, service); err != nil {
return nil, err
}
return service, nil
}
func (r *CloudNativeAppReconciler) updateService(ctx context.Context, app *customv1.CloudNativeApp, service *corev1.Service) error {
// Update service spec
service.Spec.Ports[0].Port = app.Spec.ServicePort
service.Spec.Ports[0].TargetPort = intstr.FromInt(int(app.Spec.Port)) // FromInt expects an int; Port is used as an int32 elsewhere
return r.Update(ctx, service)
}
func (r *CloudNativeAppReconciler) updateStatus(ctx context.Context, app *customv1.CloudNativeApp, deployment *appsv1.Deployment) error {
app.Status.Replicas = deployment.Status.Replicas
app.Status.ReadyReplicas = deployment.Status.ReadyReplicas
app.Status.UpdatedReplicas = deployment.Status.UpdatedReplicas
app.Status.AvailableReplicas = deployment.Status.AvailableReplicas
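// Set conditions: report Ready only when every replica of the deployment is ready
readyStatus := corev1.ConditionFalse
if deployment.Status.Replicas > 0 && deployment.Status.ReadyReplicas == deployment.Status.Replicas {
readyStatus = corev1.ConditionTrue
}
conditions := []customv1.CloudNativeAppCondition{
{
Type: customv1.ConditionReady,
Status: readyStatus,
},
}
app.Status.Conditions = conditions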
return r.Status().Update(ctx, app)
}
// SetupWithManager sets up the controller with the Manager.
func (r *CloudNativeAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&customv1.CloudNativeApp{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Complete(r)
}
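The Reconcile function above follows a create-or-update flow: look up the owned object, create it if it is missing, otherwise bring it in line with the custom resource. The following is a minimal sketch of that idea using only the standard library. The types `desiredState` and `cluster` and the function `reconcile` are hypothetical stand-ins, not part of controller-runtime, and unlike the operator code the sketch skips the update when nothing has changed.

```go
package main

import "fmt"

// desiredState is a hypothetical, simplified stand-in for the CloudNativeApp spec.
type desiredState struct {
	Image    string
	Replicas int32
}

// cluster is an in-memory stand-in for the API server, keyed by object name.
type cluster map[string]desiredState

// reconcile drives the stored object towards the desired state and reports
// which action it took: "created", "updated", or "unchanged".
func reconcile(c cluster, name string, want desiredState) string {
	have, found := c[name]
	switch {
	case !found:
		c[name] = want
		return "created"
	case have != want:
		c[name] = want
		return "updated"
	default:
		return "unchanged"
	}
}

func main() {
	c := cluster{}
	want := desiredState{Image: "myapp:1.0", Replicas: 2}

	fmt.Println(reconcile(c, "demo-deployment", want)) // created
	fmt.Println(reconcile(c, "demo-deployment", want)) // unchanged
	want.Replicas = 3
	fmt.Println(reconcile(c, "demo-deployment", want)) // updated
}
```

Calling `reconcile` repeatedly with the same input leaves the state untouched, which is the property that makes periodic requeueing safe.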
Cloud Native Architecture
Cloud Native Stack
graph TD
A[Developer] --> B[CI/CD Pipeline]
B --> C[Container Registry]
C --> D[Kubernetes Cluster]
D --> E[Pods & Containers]
D --> F[Services & Ingress]
D --> G[ConfigMaps & Secrets]
E --> H[Application Code]
F --> I[Load Balancer]
G --> J[Configuration]
K[Service Mesh] --> E
L[Monitoring] --> D
M[Logging] --> D
N[Security] --> D
O[Serverless] --> P[Functions]
P --> Q[Event Sources]
R[GitOps] --> D
S[IaC Tools] --> D
Containerization vs. Serverless
Comparison Table
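| Aspect | Containers | Serverless |
|---|---|---|
| Control | Full control over the runtime | Infrastructure managed by the provider |
| Costs | Ongoing costs for provisioned capacity | Pay-per-use |
| Scaling | Manual or configured auto-scaling | Automatic |
| Cold Starts | None for running containers | Yes |
| State Management | Persistent state possible (e.g. via volumes) | Ephemeral; state lives in external services |
| Portability | High | Cloud-specific |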
Cloud Native Patterns
Sidecar Pattern
# Sidecar container example: both containers share the log directory via an emptyDir volume
apiVersion: v1
kind: Pod
metadata:
  name: app-with-sidecar
spec:
  containers:
    - name: main-app
      image: myapp:1.0
      ports:
        - containerPort: 8080
      volumeMounts:
        - name: shared-logs
          mountPath: /var/log/app
    - name: logging-sidecar
      image: logging-sidecar:1.0
      volumeMounts:
        - name: shared-logs
          mountPath: /var/log/app
  volumes:
    - name: shared-logs
      emptyDir: {}
Ambassador Pattern
# API Gateway as ambassador
apiVersion: v1
kind: Service
metadata:
  name: api-gateway
spec:
  selector:
    app: api-gateway
  ports:
    - port: 80
      targetPort: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-gateway
spec:
  replicas: 2
  selector:
    matchLabels:
      app: api-gateway
  template:
    metadata:
      labels:
        app: api-gateway
    spec:
      containers:
        - name: gateway
          image: api-gateway:1.0
          ports:
            - containerPort: 8080
          env:
            - name: BACKEND_URL
              value: "http://backend-service:8080"
Advantages and Disadvantages
Advantages of Cloud Native
- Scalability: Automatic horizontal scaling
- Resilience: Self-healing systems
- Portability: Cloud-independent applications
- Efficiency: Resource optimization
- Speed: Fast deployment cycles
Disadvantages
- Complexity: Many interacting components to build and operate
- Learning Curve: Steep for teams new to containers and orchestration
- Costs: Unpredictable costs with Serverless
- Vendor Lock-in: Cloud-specific features
- Debugging: Complex debugging in distributed systems
Common Exam Questions
- What is the difference between containers and serverless? Containers offer full control over the runtime, whereas with Serverless the provider fully manages the infrastructure and bills on a pay-per-use basis.
- Explain the Sidecar Pattern! The Sidecar Pattern places additional containers alongside the main container in the same pod; they provide supporting functions such as logging, monitoring, or security.
- When do you use Kubernetes vs. Serverless? Kubernetes suits complex, stateful applications that need full control; Serverless suits event-driven, stateless functions with low complexity.
- What makes an application “Cloud Native”? Cloud Native applications use containers, microservices, dynamic orchestration, and continuous delivery to take full advantage of the cloud.