Microservices Architecture Fundamentals: Service Mesh, API Gateway, Circuit Breaker & Distributed Tracing
This article is a comprehensive introduction to the fundamentals of microservices architecture, including service mesh, API gateway, circuit breaker and distributed tracing, with practical examples.
In a Nutshell
Microservices architecture is an approach in which an application is built as a collection of small, independent services, each responsible for one specific business capability.
Concise Technical Description
Microservices architecture is an architectural style that splits an application into loosely coupled, independently deployable services that communicate over network protocols such as HTTP or gRPC.
Core components:
Service Mesh
- Sidecar pattern: a proxy runs next to every service instance
- Traffic management: routing, load balancing
- Security: mTLS, access control
- Observability: metrics, logging, tracing
- Implementations: Istio, Linkerd, Consul Connect
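Traffic management in a mesh is typically declared as weighted routes, for example a 90/10 canary split between two versions of a service, and the sidecar proxy then picks a destination for every request. The minimal Python sketch below models only that per-request choice; the `WeightedRouter` class, the subset names `v1`/`v2` and the weights are illustrative assumptions, not the configuration API of Istio, Linkerd or Consul Connect.

```python
import random
from collections import Counter
from typing import Dict, Optional


class WeightedRouter:
    """Chooses a destination subset per request according to integer weights."""

    def __init__(self, weights: Dict[str, int], rng: Optional[random.Random] = None):
        if not weights or any(w < 0 for w in weights.values()) or sum(weights.values()) == 0:
            raise ValueError("weights must be non-negative and sum to more than 0")
        self._subsets = list(weights)
        self._weights = [weights[s] for s in self._subsets]
        self._rng = rng or random.Random()

    def route(self) -> str:
        # Weighted random draw: weight 90 is picked about 9x as often as weight 10
        return self._rng.choices(self._subsets, weights=self._weights, k=1)[0]


# Canary split: 90 % of requests to v1, 10 % to the new version v2
router = WeightedRouter({"v1": 90, "v2": 10}, rng=random.Random(42))
counts = Counter(router.route() for _ in range(10_000))
print(counts)  # approximately 9000 for v1 and 1000 for v2
```

Injecting the random generator keeps the split reproducible in tests. A real mesh also load-balances across the instances inside each subset and can change the weights at runtime without redeploying the service.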
API Gateway
- Single entry point: central access to all APIs
- Request routing: forwards requests to the responsible services
- Authentication: authentication and authorization
- Rate limiting: enforces request limits
- Protocol translation: converts between protocols (e.g. REST to gRPC)
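Rate limiting at the gateway is commonly implemented as a token bucket: tokens are refilled at a fixed rate up to a burst capacity, and each request consumes one token or is rejected. Spring Cloud Gateway's `RedisRateLimiter`, used in the gateway example below, is documented as a token bucket with `replenishRate` and `burstCapacity`. The following is a single-process Python sketch that assumes one gateway instance; the `TokenBucket` class and its injectable clock are illustrative, and a horizontally scaled gateway would keep the bucket state in shared storage such as Redis.

```python
import time
from typing import Callable


class TokenBucket:
    """In-memory token bucket: refills `rate` tokens per second, stores at most `capacity`."""

    def __init__(self, rate: float, capacity: int, clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = float(capacity)  # start full, so an initial burst up to `capacity` passes
        self._last = clock()

    def allow(self, cost: int = 1) -> bool:
        """Consume `cost` tokens if available; False means reject (e.g. HTTP 429)."""
        now = self._clock()
        # Refill in proportion to the elapsed time, capped at the bucket capacity
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


# Fake clock for a deterministic example; rate 10/s and burst 20 match RedisRateLimiter(10, 20, 1) below
fake_now = [0.0]
bucket = TokenBucket(rate=10, capacity=20, clock=lambda: fake_now[0])
print(sum(bucket.allow() for _ in range(25)))  # 20: the burst capacity, the other 5 are rejected
fake_now[0] += 0.5  # half a second later, 5 tokens have been refilled
print(sum(bucket.allow() for _ in range(10)))  # 5
```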
Resilience Patterns
- Circuit breaker: prevents cascading failures
- Retry pattern: retries failed calls
- Timeout pattern: time limits for requests
- Bulkhead pattern: isolates resources
- Fallback pattern: substitute responses when a service fails
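Retry and fallback are usually combined: a failing call is retried a bounded number of times with exponentially growing waits, and if every attempt fails, a fallback value (for example a cached response) is returned instead of propagating the error. The sketch below treats `ConnectionError` and `TimeoutError` (the outcome of the timeout pattern) as retryable and lets all other exceptions through immediately; the function name, the default delays and the injectable `sleep` are assumptions for illustration.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    fallback: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `func` up to `attempts` times with exponential backoff; return `fallback()` if all fail."""
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == attempts:
                break  # retries exhausted: use the fallback instead of raising
            sleep(delay)
            delay = min(delay * 2, max_delay)  # double the wait, but never beyond max_delay
    return fallback()


# A dependency that fails twice, then recovers
calls = {"count": 0}


def flaky_lookup() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("user-service unreachable")
    return "user-42"


waits = []
print(call_with_retry(flaky_lookup, fallback=lambda: "cached-user", sleep=waits.append))  # user-42
print(waits)  # [0.1, 0.2]


def always_down() -> str:
    raise TimeoutError("no response")


print(call_with_retry(always_down, fallback=lambda: "cached-user", sleep=lambda s: None))  # cached-user
```

Capping the delay keeps worst-case latency predictable; in production, random jitter is often added so that many clients do not retry in lockstep.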
Distributed Tracing
- Request tracing: follows a request across services
- Span trees: hierarchical trace structure
- Context propagation: passes the trace context along with each call
- Sampling: strategies for deciding which traces are recorded
- Visualization: displays trace data
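Context propagation means that every outgoing call carries the trace identity of the request being processed. Under the W3C Trace Context standard, which OpenTelemetry supports out of the box, this is the `traceparent` header with the layout `version-traceid-parentid-traceflags`. The sketch below parses an incoming header and builds the header for a child call: the trace ID is kept and a new span ID is generated, which is what links the spans into one tree. The helper names are hypothetical; in practice an OpenTelemetry propagator handles this.

```python
import re
import secrets
from dataclasses import dataclass
from typing import Optional

# version "00" - 32 hex trace-id - 16 hex parent-id - 2 hex trace-flags (lowercase hex only)
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


@dataclass(frozen=True)
class SpanContext:
    trace_id: str  # shared by every span of one request
    span_id: str   # identifies a single span (one hop or operation)
    sampled: bool  # bit 0 of trace-flags


def parse_traceparent(header: str) -> Optional[SpanContext]:
    """Parse a version-00 traceparent header; return None if it is malformed."""
    match = _TRACEPARENT.match(header.strip())
    if not match:
        return None
    trace_id, span_id, flags = match.groups()
    if trace_id == "0" * 32 or span_id == "0" * 16:
        return None  # all-zero IDs are invalid
    return SpanContext(trace_id, span_id, sampled=bool(int(flags, 16) & 0x01))


def child_traceparent(parent: Optional[SpanContext]) -> str:
    """Header for an outgoing call: same trace ID, fresh span ID, inherited sampling decision."""
    trace_id = parent.trace_id if parent else secrets.token_hex(16)  # no parent: start a new trace
    sampled = parent.sampled if parent else True
    span_id = secrets.token_hex(8)
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"


incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
ctx = parse_traceparent(incoming)
outgoing = child_traceparent(ctx)
print(outgoing.split("-")[1] == ctx.trace_id)  # True: the downstream span joins the same trace
```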
Exam-Relevant Key Points
- Microservices: small, autonomous services, each with its own responsibility
- Service mesh: infrastructure layer for service-to-service communication
- API gateway: central entry point for client requests
- Circuit breaker: protection mechanism against cascading failures
- Distributed tracing: following requests across service boundaries
- Service discovery: automatic detection of service instances
- Load balancing: distributing requests across multiple instances
- Resilience: ability to withstand failures
- IHK relevance: modern distributed system architectures
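Service discovery and load balancing work hand in hand: instances register themselves and renew a lease with periodic heartbeats, and clients ask the registry for the currently healthy instances and spread calls across them. The in-memory Python sketch below uses a TTL-based lease and round-robin selection; `ServiceRegistry`, `RoundRobinBalancer`, the TTL and the addresses are hypothetical and only mirror the idea behind registries such as Consul or Eureka, not their APIs.

```python
import itertools
import time
from typing import Callable, Dict, List


class ServiceRegistry:
    """In-memory registry: an instance is healthy while its last heartbeat is younger than `ttl`."""

    def __init__(self, ttl: float = 10.0, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self._clock = clock
        self._leases: Dict[str, Dict[str, float]] = {}  # service -> {address: last heartbeat}

    def register(self, service: str, address: str) -> None:
        self._leases.setdefault(service, {})[address] = self._clock()

    # Renewing a lease is the same operation as registering: it refreshes the timestamp
    heartbeat = register

    def deregister(self, service: str, address: str) -> None:
        self._leases.get(service, {}).pop(address, None)

    def healthy(self, service: str) -> List[str]:
        now = self._clock()
        leases = self._leases.get(service, {})
        # Sorted so that the round-robin order is deterministic
        return sorted(addr for addr, seen in leases.items() if now - seen < self.ttl)


class RoundRobinBalancer:
    """Client-side load balancer cycling over the healthy instances of one service."""

    def __init__(self, registry: ServiceRegistry, service: str):
        self._registry = registry
        self._service = service
        self._counter = itertools.count()

    def next_instance(self) -> str:
        instances = self._registry.healthy(self._service)
        if not instances:
            raise LookupError(f"no healthy instance of {self._service}")
        return instances[next(self._counter) % len(instances)]


now = [0.0]  # fake clock for a deterministic example
registry = ServiceRegistry(ttl=10, clock=lambda: now[0])
registry.register("user-service", "10.0.0.1:8080")
registry.register("user-service", "10.0.0.2:8080")
lb = RoundRobinBalancer(registry, "user-service")
print([lb.next_instance() for _ in range(4)])
# ['10.0.0.1:8080', '10.0.0.2:8080', '10.0.0.1:8080', '10.0.0.2:8080']
now[0] = 12.0
registry.heartbeat("user-service", "10.0.0.2:8080")  # only 10.0.0.2 renews its lease
print(lb.next_instance())  # 10.0.0.2:8080, because the lease of 10.0.0.1 has expired
```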
Core Components at a Glance
- Service discovery: dynamic registration and discovery of services
- API gateway: central API management and routing
- Service mesh: control of communication between services
- Circuit breaker: error handling and fault isolation
- Distributed tracing: request tracking and performance analysis
- Configuration management: central management of configuration
- Monitoring: observation and alerting
- Deployment: continuous delivery and orchestration
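Central configuration management usually layers several sources: built-in defaults, a shared configuration document (a file or a config server), and per-environment overrides such as environment variables, with later layers winning. The minimal Python sketch below merges these three layers; its fields mirror the Go `ServiceConfig` in the first practical example, while the `USER_SERVICE_` environment-variable prefix and the `load_config` helper are hypothetical.

```python
import json
import os
from dataclasses import dataclass, fields
from typing import Any, Dict, Mapping, Optional


@dataclass
class ServiceConfig:
    name: str = "user-service"
    version: str = "1.0.0"
    port: int = 8080
    database_url: str = ""
    registry_url: str = ""
    jaeger_url: str = ""


def load_config(file_text: Optional[str] = None,
                env: Mapping[str, str] = os.environ,
                prefix: str = "USER_SERVICE_") -> ServiceConfig:
    """Merge defaults < JSON document < environment variables (highest precedence)."""
    known = {f.name: f for f in fields(ServiceConfig)}
    values: Dict[str, Any] = {}
    if file_text:
        values.update(json.loads(file_text))
    for name in known:
        raw = env.get(prefix + name.upper())  # e.g. USER_SERVICE_PORT
        if raw is not None:
            values[name] = raw
    unknown = set(values) - set(known)
    if unknown:
        raise KeyError(f"unknown config keys: {sorted(unknown)}")
    # Environment values are strings: coerce them to the declared field type (only int is needed here)
    coerced = {k: int(v) if known[k].type in (int, "int") else v for k, v in values.items()}
    return ServiceConfig(**coerced)


cfg = load_config('{"port": 9000, "database_url": "postgres://db/users"}',
                  env={"USER_SERVICE_PORT": "9100"})
print(cfg.port, cfg.database_url)  # 9100 postgres://db/users
```

Rejecting unknown keys catches typos in configuration files early instead of silently falling back to defaults.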
Practical Examples
1. Microservice with Go and Service Discovery
// main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
"go.opentelemetry.io/otel"
// Note: the Jaeger exporter is deprecated in current OpenTelemetry Go releases in favour of OTLP
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
// Internal packages of this example project (not shown here)
"github.com/company/microservices/internal/circuitbreaker"
"github.com/company/microservices/internal/config"
"github.com/company/microservices/internal/database"
"github.com/company/microservices/internal/models"
"github.com/company/microservices/internal/registry"
)
// Service configuration
type ServiceConfig struct {
Name string `json:"name"`
Version string `json:"version"`
Port int `json:"port"`
DatabaseURL string `json:"database_url"`
RegistryURL string `json:"registry_url"`
JaegerURL string `json:"jaeger_url"`
}
// Metrics
var (
httpRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total number of HTTP requests",
},
[]string{"method", "endpoint", "status"},
)
httpRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request duration in seconds",
},
[]string{"method", "endpoint"},
)
activeConnections = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "active_connections",
Help: "Number of active connections",
},
)
)
func init() {
prometheus.MustRegister(httpRequestsTotal)
prometheus.MustRegister(httpRequestDuration)
prometheus.MustRegister(activeConnections)
}
// UserService handles user-related operations
type UserService struct {
config *ServiceConfig
db *database.Database
registry *registry.ServiceRegistry
tracer trace.Tracer
breaker *circuitbreaker.CircuitBreaker
}
// NewUserService creates a new user service
func NewUserService(cfg *ServiceConfig) (*UserService, error) {
// Initialize database
db, err := database.NewDatabase(cfg.DatabaseURL)
if err != nil {
return nil, fmt.Errorf("failed to initialize database: %w", err)
}
// Initialize service registry
reg, err := registry.NewServiceRegistry(cfg.RegistryURL)
if err != nil {
return nil, fmt.Errorf("failed to initialize service registry: %w", err)
}
// Initialize circuit breaker
breakerConfig := circuitbreaker.Config{
MaxRequests: 100,
Interval: 60 * time.Second,
Timeout: 30 * time.Second,
ReadyToTrip: circuitbreaker.DefaultReadyToTrip,
OnStateChange: circuitbreaker.DefaultOnStateChange,
FallbackFunc: nil,
}
breaker := circuitbreaker.NewCircuitBreaker(breakerConfig)
// Initialize tracer
tracer := otel.Tracer("user-service")
return &UserService{
config: cfg,
db: db,
registry: reg,
tracer: tracer,
breaker: breaker,
}, nil
}
// RegisterService registers the service with the service registry
func (s *UserService) RegisterService(ctx context.Context) error {
serviceInfo := registry.ServiceInfo{
Name: s.config.Name,
Version: s.config.Version,
Address: fmt.Sprintf("localhost:%d", s.config.Port),
Tags: []string{"user", "service", "v1"},
Metadata: map[string]string{
"port": strconv.Itoa(s.config.Port),
"version": s.config.Version,
"health_check": "/health",
},
}
return s.registry.Register(ctx, serviceInfo)
}
// DeregisterService removes the service from the registry
func (s *UserService) DeregisterService(ctx context.Context) error {
return s.registry.Deregister(ctx, s.config.Name)
}
// HealthCheck returns the health status of the service
func (s *UserService) HealthCheck(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "health-check")
defer span.End()
// Check database connectivity
if err := s.db.Ping(ctx); err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"status": "unhealthy",
"error": err.Error(),
})
return
}
// Check circuit breaker state
breakerState := s.breaker.State()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "healthy",
"service": s.config.Name,
"version": s.config.Version,
"timestamp": time.Now().Unix(),
"breaker_state": breakerState.String(),
})
}
// CreateUser creates a new user
func (s *UserService) CreateUser(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "create-user")
defer span.End()
start := time.Now()
defer func() {
httpRequestDuration.WithLabelValues("POST", "/users").Observe(time.Since(start).Seconds())
}()
var user models.User
if err := json.NewDecoder(r.Body).Decode(&user); err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
// Validate user
if err := user.Validate(); err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
// Create user with circuit breaker
result, err := s.breaker.Execute(func() (interface{}, error) {
return s.db.CreateUser(ctx, &user)
})
if err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
if s.breaker.State() == circuitbreaker.StateOpen {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "Service temporarily unavailable",
"code": "CIRCUIT_BREAKER_OPEN",
})
return
}
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
createdUser := result.(*models.User)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(createdUser)
}
// GetUser retrieves a user by ID
func (s *UserService) GetUser(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "get-user")
defer span.End()
start := time.Now()
defer func() {
httpRequestDuration.WithLabelValues("GET", "/users/{id}").Observe(time.Since(start).Seconds())
}()
vars := mux.Vars(r)
userID := vars["id"]
if userID == "" {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "User ID is required"})
return
}
// Get user with circuit breaker
result, err := s.breaker.Execute(func() (interface{}, error) {
return s.db.GetUser(ctx, userID)
})
if err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
if s.breaker.State() == circuitbreaker.StateOpen {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "Service temporarily unavailable",
"code": "CIRCUIT_BREAKER_OPEN",
})
return
}
if err.Error() == "user not found" {
w.WriteHeader(http.StatusNotFound)
json.NewEncoder(w).Encode(map[string]string{"error": "User not found"})
return
}
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
user := result.(*models.User)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(user)
}
// UpdateUser updates an existing user
func (s *UserService) UpdateUser(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "update-user")
defer span.End()
start := time.Now()
defer func() {
httpRequestDuration.WithLabelValues("PUT", "/users/{id}").Observe(time.Since(start).Seconds())
}()
vars := mux.Vars(r)
userID := vars["id"]
if userID == "" {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "User ID is required"})
return
}
var user models.User
if err := json.NewDecoder(r.Body).Decode(&user); err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
user.ID = userID
// Update user with circuit breaker
result, err := s.breaker.Execute(func() (interface{}, error) {
return s.db.UpdateUser(ctx, &user)
})
if err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
if s.breaker.State() == circuitbreaker.StateOpen {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "Service temporarily unavailable",
"code": "CIRCUIT_BREAKER_OPEN",
})
return
}
if err.Error() == "user not found" {
w.WriteHeader(http.StatusNotFound)
json.NewEncoder(w).Encode(map[string]string{"error": "User not found"})
return
}
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
updatedUser := result.(*models.User)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(updatedUser)
}
// DeleteUser deletes a user by ID
func (s *UserService) DeleteUser(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "delete-user")
defer span.End()
start := time.Now()
defer func() {
httpRequestDuration.WithLabelValues("DELETE", "/users/{id}").Observe(time.Since(start).Seconds())
}()
vars := mux.Vars(r)
userID := vars["id"]
if userID == "" {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "User ID is required"})
return
}
// Delete user with circuit breaker
_, err := s.breaker.Execute(func() (interface{}, error) {
return nil, s.db.DeleteUser(ctx, userID)
})
if err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
if s.breaker.State() == circuitbreaker.StateOpen {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "Service temporarily unavailable",
"code": "CIRCUIT_BREAKER_OPEN",
})
return
}
if err.Error() == "user not found" {
w.WriteHeader(http.StatusNotFound)
json.NewEncoder(w).Encode(map[string]string{"error": "User not found"})
return
}
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
w.WriteHeader(http.StatusNoContent)
}
// ListUsers retrieves a list of users with pagination
func (s *UserService) ListUsers(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "list-users")
defer span.End()
start := time.Now()
defer func() {
httpRequestDuration.WithLabelValues("GET", "/users").Observe(time.Since(start).Seconds())
}()
// Parse query parameters
page := 1
limit := 10
if p := r.URL.Query().Get("page"); p != "" {
if parsed, err := strconv.Atoi(p); err == nil && parsed > 0 {
page = parsed
}
}
if l := r.URL.Query().Get("limit"); l != "" {
if parsed, err := strconv.Atoi(l); err == nil && parsed > 0 && parsed <= 100 {
limit = parsed
}
}
// Get users with circuit breaker
result, err := s.breaker.Execute(func() (interface{}, error) {
return s.db.ListUsers(ctx, page, limit)
})
if err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
if s.breaker.State() == circuitbreaker.StateOpen {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "Service temporarily unavailable",
"code": "CIRCUIT_BREAKER_OPEN",
})
return
}
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
users := result.([]*models.User)
response := map[string]interface{}{
"users": users,
"pagination": map[string]interface{}{
"page": page,
"limit": limit,
"total": len(users),
},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// GetServiceInfo returns service information
func (s *UserService) GetServiceInfo(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"name": s.config.Name,
"version": s.config.Version,
"port": s.config.Port,
"timestamp": time.Now().Unix(),
"breaker_state": s.breaker.State().String(),
})
}
// MetricsHandler exposes Prometheus metrics
func (s *UserService) MetricsHandler(w http.ResponseWriter, r *http.Request) {
promhttp.Handler().ServeHTTP(w, r)
}
// SetupRoutes configures the HTTP routes
func (s *UserService) SetupRoutes() *mux.Router {
r := mux.NewRouter()
// Add OpenTelemetry middleware
r.Use(otelmux.Middleware("user-service"))
// Add middleware for metrics and logging
r.Use(s.metricsMiddleware)
r.Use(s.loggingMiddleware)
// API routes
api := r.PathPrefix("/api/v1").Subrouter()
api.HandleFunc("/users", s.ListUsers).Methods("GET")
api.HandleFunc("/users", s.CreateUser).Methods("POST")
api.HandleFunc("/users/{id}", s.GetUser).Methods("GET")
api.HandleFunc("/users/{id}", s.UpdateUser).Methods("PUT")
api.HandleFunc("/users/{id}", s.DeleteUser).Methods("DELETE")
// Health and info endpoints
r.HandleFunc("/health", s.HealthCheck).Methods("GET")
r.HandleFunc("/info", s.GetServiceInfo).Methods("GET")
// Metrics endpoint
r.Handle("/metrics", promhttp.Handler()).Methods("GET")
return r
}
// metricsMiddleware tracks HTTP request metrics
func (s *UserService) metricsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
activeConnections.Inc()
defer activeConnections.Dec()
// Wrap response writer to capture status code
wrapped := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}
next.ServeHTTP(wrapped, r)
// Record metrics
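// Label by route template (e.g. /api/v1/users/{id}) instead of the raw path,
// which would create a separate time series for every user ID
path := r.URL.Path
if route := mux.CurrentRoute(r); route != nil {
if tmpl, err := route.GetPathTemplate(); err == nil {
path = tmpl
}
}
httpRequestsTotal.WithLabelValues(r.Method, path, strconv.Itoa(wrapped.statusCode)).Inc()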
})
}
// loggingMiddleware logs HTTP requests
func (s *UserService) loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// Wrap response writer
wrapped := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}
next.ServeHTTP(wrapped, r)
duration := time.Since(start)
log.Printf(
"%s %s %d %v %s",
r.Method,
r.URL.Path,
wrapped.statusCode,
duration,
r.UserAgent(),
)
})
}
// responseWriter wraps http.ResponseWriter to capture status code
type responseWriter struct {
http.ResponseWriter
statusCode int
}
func (rw *responseWriter) WriteHeader(code int) {
rw.statusCode = code
rw.ResponseWriter.WriteHeader(code)
}
// initTracer initializes OpenTelemetry tracing
func initTracer(serviceName, jaegerURL string) (*sdktrace.TracerProvider, error) {
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerURL)))
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
semconv.ServiceVersionKey.String("1.0.0"),
)),
)
otel.SetTracerProvider(tp)
return tp, nil
}
func main() {
// Load configuration (config.Load is a helper from the internal config package)
cfg, err := config.Load[ServiceConfig]("config.json")
if err != nil {
log.Fatalf("Failed to load configuration: %v", err)
}
// ctx is cancelled on SIGINT/SIGTERM, which triggers the graceful shutdown below
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
// Initialize tracing
tp, err := initTracer(cfg.Name, cfg.JaegerURL)
if err != nil {
log.Fatalf("Failed to initialize tracer: %v", err)
}
// Create user service
userService, err := NewUserService(cfg)
if err != nil {
log.Fatalf("Failed to create user service: %v", err)
}
// Register service
if err := userService.RegisterService(ctx); err != nil {
log.Fatalf("Failed to register service: %v", err)
}
// Use an explicit http.Server so that in-flight requests can be drained on shutdown
srv := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: userService.SetupRoutes(),
ReadHeaderTimeout: 5 * time.Second,
}
go func() {
log.Printf("Starting %s on port %d", cfg.Name, cfg.Port)
log.Printf("Health check: http://localhost:%d/health", cfg.Port)
log.Printf("Metrics: http://localhost:%d/metrics", cfg.Port)
// ErrServerClosed is the expected result of srv.Shutdown, not a failure
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Failed to start server: %v", err)
}
}()
// Block until a shutdown signal arrives
<-ctx.Done()
log.Println("Shutting down service...")
// ctx is already cancelled at this point, so use a fresh context with a deadline for cleanup
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Deregister first so that clients stop discovering this instance
if err := userService.DeregisterService(shutdownCtx); err != nil {
log.Printf("Failed to deregister service: %v", err)
}
// Stop accepting new connections and wait for in-flight requests
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Printf("Error shutting down HTTP server: %v", err)
}
// Flush pending spans
if err := tp.Shutdown(shutdownCtx); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}
2. API Gateway with Java Spring Cloud
// GatewayApplication.java
package com.company.gateway;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.oauth2.jwt.NimbusReactiveJwtDecoder;
import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.reactive.CorsWebFilter;
import org.springframework.web.cors.reactive.UrlBasedCorsConfigurationSource;
import reactor.core.publisher.Mono;
import java.time.Duration;
@SpringBootApplication
@EnableWebFluxSecurity
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
return builder.routes()
// User Service Routes
.route("user-service", r -> r.path("/api/v1/users/**")
.filters(f -> f
// No stripPrefix here: the Go user service above already serves its routes under /api/v1
.addRequestHeader("X-Request-User-Service", "Gateway")
.addResponseHeader("X-Response-User-Service", "Gateway")
.circuitBreaker(config -> config
.setName("user-service")
.setFallbackUri("forward:/fallback/user-service"))
.retry(retryConfig -> retryConfig
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(500), 2, false))
.requestRateLimiter(config -> config
.setRateLimiter(redisRateLimiter())
.setKeyResolver(userKeyResolver()))
)
.uri("lb://user-service"))
// Order Service Routes
.route("order-service", r -> r.path("/api/v1/orders/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Request-Order-Service", "Gateway")
.addResponseHeader("X-Response-Order-Service", "Gateway")
.circuitBreaker(config -> config
.setName("order-service")
.setFallbackUri("forward:/fallback/order-service"))
.retry(retryConfig -> retryConfig
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(500), 2, false))
.requestRateLimiter(config -> config
.setRateLimiter(redisRateLimiter())
.setKeyResolver(userKeyResolver()))
)
.uri("lb://order-service"))
// Product Service Routes
.route("product-service", r -> r.path("/api/v1/products/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Request-Product-Service", "Gateway")
.addResponseHeader("X-Response-Product-Service", "Gateway")
.circuitBreaker(config -> config
.setName("product-service")
.setFallbackUri("forward:/fallback/product-service"))
.retry(retryConfig -> retryConfig
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(500), 2, false))
.requestRateLimiter(config -> config
.setRateLimiter(redisRateLimiter())
.setKeyResolver(userKeyResolver()))
)
.uri("lb://product-service"))
// Notification Service Routes
.route("notification-service", r -> r.path("/api/v1/notifications/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Request-Notification-Service", "Gateway")
.addResponseHeader("X-Response-Notification-Service", "Gateway")
.circuitBreaker(config -> config
.setName("notification-service")
.setFallbackUri("forward:/fallback/notification-service"))
.retry(retryConfig -> retryConfig
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(500), 2, false))
.requestRateLimiter(config -> config
.setRateLimiter(redisRateLimiter())
.setKeyResolver(userKeyResolver()))
)
.uri("lb://notification-service"))
// Authentication Service Routes
.route("auth-service", r -> r.path("/api/v1/auth/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Request-Auth-Service", "Gateway")
.addResponseHeader("X-Response-Auth-Service", "Gateway")
.circuitBreaker(config -> config
.setName("auth-service")
.setFallbackUri("forward:/fallback/auth-service"))
.retry(retryConfig -> retryConfig
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(500), 2, false))
)
.uri("lb://auth-service"))
// Health Check Routes (no rate limiting)
.route("health-check", r -> r.path("/health")
// Forwarded unchanged: stripPrefix(1) would turn /health into "/", which the user service does not serve
.uri("lb://user-service"))
.build();
}
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
return http
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.authorizeExchange(exchanges -> exchanges
.pathMatchers("/api/v1/auth/**").permitAll()
.pathMatchers("/health/**").permitAll()
.pathMatchers("/actuator/**").permitAll()
.anyExchange().authenticated()
)
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt.jwtDecoder(jwtDecoder()))
)
.build();
}
@Bean
public CorsWebFilter corsWebFilter() {
CorsConfiguration corsConfig = new CorsConfiguration();
corsConfig.setAllowCredentials(true);
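// "*" combined with credentials would let any website send authenticated requests;
// restrict this to your own front-end origins (the pattern below is a placeholder)
corsConfig.addAllowedOriginPattern("https://*.example.com");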
corsConfig.addAllowedHeader("*");
corsConfig.addAllowedMethod("*");
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", corsConfig);
return new CorsWebFilter(source);
}
@Bean
public RedisRateLimiter redisRateLimiter() {
return new RedisRateLimiter(10, 20, 1); // replenishRate, burstCapacity, requestedTokens
}
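@Bean
public KeyResolver userKeyResolver() {
// getFirst() returns a plain String (or null), so wrap it in a Mono before applying the default.
// Note: X-User-ID is client-supplied; in production derive the key from the authenticated principal.
return exchange -> Mono.justOrEmpty(exchange.getRequest().getHeaders().getFirst("X-User-ID"))
.defaultIfEmpty("anonymous");
}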
@Bean
public ReactiveJwtDecoder jwtDecoder() {
// A WebFlux resource server needs a ReactiveJwtDecoder; point it at the JWK Set of your authentication service
return NimbusReactiveJwtDecoder.withJwkSetUri("http://auth-service/.well-known/jwks.json").build();
}
}
// GatewayController.java
package com.company.gateway.controller;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/fallback")
public class GatewayController {
// Display names of the services that have a circuit-breaker fallback route
private static final Map<String, String> SERVICE_NAMES = Map.of(
"user-service", "User",
"order-service", "Order",
"product-service", "Product",
"notification-service", "Notification",
"auth-service", "Authentication");
// @RequestMapping instead of @GetMapping: the gateway forwards the original HTTP method,
// so a failed POST/PUT/DELETE must reach the fallback as well (a GET-only mapping would answer 405)
@RequestMapping("/{service}")
public Mono<ResponseEntity<Map<String, Object>>> serviceFallback(@PathVariable("service") String service) {
String displayName = SERVICE_NAMES.get(service);
if (displayName == null) {
ResponseEntity<Map<String, Object>> notFound = ResponseEntity.notFound().build();
return Mono.just(notFound);
}
Map<String, Object> response = new HashMap<>();
response.put("service", service);
response.put("status", "fallback");
response.put("message", displayName + " service is temporarily unavailable");
response.put("timestamp", System.currentTimeMillis());
return Mono.just(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(response));
}
}
// GatewayFilter.java
package com.company.gateway.filter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.UUID;
@Component
public class GatewayFilter implements GlobalFilter, Ordered {
private static final Logger logger = LoggerFactory.getLogger(GatewayFilter.class);
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// Generate unique request ID
String requestId = UUID.randomUUID().toString();
// mutate() only returns a builder: the resulting exchange must be passed down the chain,
// otherwise the added request headers never reach the downstream service
ServerWebExchange mutatedExchange = exchange.mutate()
.request(builder -> builder
.header("X-Request-ID", requestId)
.header("X-Gateway-Timestamp", String.valueOf(System.currentTimeMillis())))
.build();
// Response headers become read-only once the response is committed,
// so register them via beforeCommit instead of adding them after the chain has completed
mutatedExchange.getResponse().beforeCommit(() -> {
mutatedExchange.getResponse().getHeaders().add("X-Request-ID", requestId);
mutatedExchange.getResponse().getHeaders().add("X-Gateway-Response-Time", String.valueOf(System.currentTimeMillis()));
return Mono.empty();
});
// Log request
logger.info("Request: {} {} - ID: {} - User: {}",
exchange.getRequest().getMethod(),
exchange.getRequest().getURI(),
requestId,
exchange.getRequest().getHeaders().getFirst("X-User-ID"));
// Process request and log response
return chain.filter(mutatedExchange).then(Mono.fromRunnable(() ->
logger.info("Response: {} {} - ID: {} - Status: {}",
exchange.getRequest().getMethod(),
exchange.getRequest().getURI(),
requestId,
mutatedExchange.getResponse().getStatusCode())));
}
@Override
public int getOrder() {
return -1; // High precedence
}
}
// application.yml
server:
port: 8080
spring:
application:
name: api-gateway
cloud:
gateway:
discovery:
locator:
enabled: true
lower-case-service-id: true
routes:
# Additional configuration can be added here
redis:
    host: localhost
    port: 6379
    timeout: 2000ms
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
  security:
    oauth2:
      resourceserver:
        jwt:
          issuer-uri: http://auth-service
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus,gateway
  endpoint:
    gateway:
      enabled: true
logging:
  level:
    org.springframework.cloud.gateway: DEBUG
    org.springframework.web.reactive: DEBUG
    com.company.gateway: INFO
# Resilience4j configuration
resilience4j:
  circuitbreaker:
    instances:
      user-service:
        registerHealthIndicator: true
        slidingWindowSize: 10
        minimumNumberOfCalls: 5
        permittedNumberOfCallsInHalfOpenState: 3
        automaticTransitionFromOpenToHalfOpenEnabled: true
        waitDurationInOpenState: 5s
        failureRateThreshold: 50
        eventConsumerBufferSize: 10
        recordExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
      order-service:
        registerHealthIndicator: true
        slidingWindowSize: 10
        minimumNumberOfCalls: 5
        permittedNumberOfCallsInHalfOpenState: 3
        automaticTransitionFromOpenToHalfOpenEnabled: true
        waitDurationInOpenState: 5s
        failureRateThreshold: 50
        eventConsumerBufferSize: 10
        recordExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
      product-service:
        registerHealthIndicator: true
        slidingWindowSize: 10
        minimumNumberOfCalls: 5
        permittedNumberOfCallsInHalfOpenState: 3
        automaticTransitionFromOpenToHalfOpenEnabled: true
        waitDurationInOpenState: 5s
        failureRateThreshold: 50
        eventConsumerBufferSize: 10
        recordExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
  retry:
    instances:
      user-service:
        maxAttempts: 3
        waitDuration: 1s
        retryExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
      order-service:
        maxAttempts: 3
        waitDuration: 1s
        retryExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
      product-service:
        maxAttempts: 3
        waitDuration: 1s
        retryExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
  ratelimiter:
    instances:
      user-service:
        limitForPeriod: 10
        limitRefreshPeriod: 1s
        timeoutDuration: 0
        registerHealthIndicator: true
      order-service:
        limitForPeriod: 20
        limitRefreshPeriod: 1s
        timeoutDuration: 0
        registerHealthIndicator: true
      product-service:
        limitForPeriod: 15
        limitRefreshPeriod: 1s
        timeoutDuration: 0
        registerHealthIndicator: true
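The `ratelimiter` instances above allow `limitForPeriod` calls per `limitRefreshPeriod`, and `timeoutDuration: 0` means a caller that finds no permission left is rejected immediately instead of waiting. To make these numbers concrete, here is a minimal Python sketch of that fixed-window behaviour. It is an illustration under these assumptions, not Resilience4j's actual implementation. The class `FixedWindowRateLimiter` and its injectable clock are hypothetical, and the class is not thread-safe.

```python
import time
from typing import Callable


class FixedWindowRateLimiter:
    """Hypothetical helper: at most `limit_for_period` permits per `refresh_period` seconds."""

    def __init__(self, limit_for_period: int, refresh_period: float,
                 clock: Callable[[], float] = time.monotonic):
        self.limit_for_period = limit_for_period
        self.refresh_period = refresh_period
        self.clock = clock
        self.window_start = clock()
        self.available = limit_for_period

    def try_acquire(self) -> bool:
        """Non-blocking acquire, analogous to timeoutDuration: 0."""
        now = self.clock()
        elapsed_windows = int((now - self.window_start) // self.refresh_period)
        if elapsed_windows > 0:
            # A new refresh period has started: refill the permits
            self.window_start += elapsed_windows * self.refresh_period
            self.available = self.limit_for_period
        if self.available > 0:
            self.available -= 1
            return True
        return False


# Usage with a fake clock, mirroring the user-service settings (10 calls per 1 s)
fake_now = [0.0]
limiter = FixedWindowRateLimiter(limit_for_period=10, refresh_period=1.0,
                                 clock=lambda: fake_now[0])
first_window = [limiter.try_acquire() for _ in range(12)]
print(first_window.count(True))  # 10 permitted, the last 2 rejected
fake_now[0] = 1.0  # the next refresh period begins
print(limiter.try_acquire())  # True again
```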
3. Circuit Breaker Implementation in Python
```python
# circuit_breaker.py
import asyncio
import inspect
import logging
import random
import time
from dataclasses import dataclass
from enum import Enum
from functools import wraps
from threading import Lock
from typing import Any, Callable, Optional


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreakerOpenException(Exception):
    """Raised when a call is rejected because the circuit is OPEN."""


@dataclass
class CircuitBreakerConfig:
    # Consecutive failures that open the circuit (CLOSED) and
    # successful trial calls that close it again (HALF_OPEN)
    max_requests: int = 100
    interval: float = 60.0  # seconds; not used by this minimal implementation
    timeout: float = 30.0  # seconds; not used by this minimal implementation
    # Only exceptions of this type count as failures; others pass through
    expected_exception: type = Exception
    reset_timeout: float = 60.0  # seconds in OPEN before a trial call is allowed


class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig):
        self.config = config
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.success_count = 0
        self.request_count = 0
        self.lock = Lock()
        self.logger = logging.getLogger(__name__)

    def __call__(self, func: Callable) -> Callable:
        # Lets the breaker be used as a decorator for synchronous functions
        @wraps(func)
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)
        return wrapper

    def _before_call(self) -> None:
        # Fail fast while OPEN; after reset_timeout, let trial calls through (HALF_OPEN)
        with self.lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                    self.logger.info("Circuit breaker transitioning to HALF_OPEN")
                else:
                    raise CircuitBreakerOpenException("Circuit breaker is OPEN")
            self.request_count += 1

    def call(self, func: Callable, *args, **kwargs) -> Any:
        self._before_call()
        try:
            result = func(*args, **kwargs)
        except self.config.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    async def call_async(self, func: Callable, *args, **kwargs) -> Any:
        self._before_call()
        try:
            if inspect.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
        except self.config.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        if self.last_failure_time is None:
            return True
        return (time.time() - self.last_failure_time) >= self.config.reset_timeout

    def _on_success(self) -> None:
        with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.max_requests:
                    self._reset()
            elif self.state == CircuitState.CLOSED:
                # In CLOSED, a success resets the consecutive-failure counter
                self.failure_count = 0

    def _on_failure(self) -> None:
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                # Any failure during the trial phase re-opens the circuit
                self.state = CircuitState.OPEN
                self.logger.warning("Circuit breaker transitioning to OPEN")
            elif (self.state == CircuitState.CLOSED
                  and self.failure_count >= self.config.max_requests):
                self.state = CircuitState.OPEN
                self.logger.warning("Circuit breaker transitioning to OPEN")

    def _reset(self) -> None:
        # Caller must already hold self.lock
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.request_count = 0
        self.logger.info("Circuit breaker reset to CLOSED")

    def get_state(self) -> CircuitState:
        return self.state

    def get_stats(self) -> dict:
        with self.lock:
            return {
                "state": self.state.value,
                "failure_count": self.failure_count,
                "success_count": self.success_count,
                "request_count": self.request_count,
                "last_failure_time": self.last_failure_time,
            }


# Usage examples
def example_service():
    """Example service that can fail"""
    if random.random() < 0.3:  # 30% chance of failure
        raise ValueError("Service temporarily unavailable")
    return "Service response"


# Create circuit breaker: opens after 5 consecutive ValueErrors, tries again after 30 s
config = CircuitBreakerConfig(
    max_requests=5,
    interval=60.0,
    timeout=30.0,
    expected_exception=ValueError,
    reset_timeout=30.0,
)
circuit_breaker = CircuitBreaker(config)
# Apply circuit breaker to service
protected_service = circuit_breaker(example_service)


# Test the circuit breaker
def test_circuit_breaker():
    for i in range(20):
        try:
            result = protected_service()
            print(f"Request {i+1}: SUCCESS - {result}")
        except CircuitBreakerOpenException as e:
            print(f"Request {i+1}: CIRCUIT BREAKER OPEN - {e}")
        except ValueError as e:
            print(f"Request {i+1}: SERVICE FAILURE - {e}")
        except Exception as e:
            print(f"Request {i+1}: UNEXPECTED ERROR - {e}")
        time.sleep(0.1)


# Async circuit breaker decorator (one breaker instance per decorated function)
def circuit_breaker_async(config: CircuitBreakerConfig):
    def decorator(func):
        cb = CircuitBreaker(config)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await cb.call_async(func, *args, **kwargs)
        return wrapper
    return decorator


# Usage with async functions
@circuit_breaker_async(config)
async def async_service():
    await asyncio.sleep(0.1)  # Simulate async operation
    if random.random() < 0.3:
        raise ValueError("Async service temporarily unavailable")
    return "Async service response"


# Test async circuit breaker
async def test_async_circuit_breaker():
    for i in range(20):
        try:
            result = await async_service()
            print(f"Async Request {i+1}: SUCCESS - {result}")
        except CircuitBreakerOpenException as e:
            print(f"Async Request {i+1}: CIRCUIT BREAKER OPEN - {e}")
        except ValueError as e:
            print(f"Async Request {i+1}: SERVICE FAILURE - {e}")
        except Exception as e:
            print(f"Async Request {i+1}: UNEXPECTED ERROR - {e}")
        await asyncio.sleep(0.1)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print("Testing synchronous circuit breaker:")
    test_circuit_breaker()
    print("\nTesting asynchronous circuit breaker:")
    asyncio.run(test_async_circuit_breaker())
```
Microservices Architecture
Service Communication Patterns
```mermaid
graph TD
    A[Client] --> B[API Gateway]
    B --> C[Service Discovery]
    B --> D[User Service]
    B --> E[Order Service]
    B --> F[Product Service]
    C --> G[Service Registry]
    D --> H[Database]
    E --> I[Database]
    F --> J[Database]
    D --> K[Message Queue]
    E --> K
    F --> K
    K --> L[Notification Service]
    M[Service Mesh] --> D
    M --> E
    M --> F
    N[Monitoring] --> M
    O[Logging] --> M
    P[Tracing] --> M
```
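In the diagram, the API Gateway consults Service Discovery, backed by a Service Registry, to find out where an instance of a service is running. The following minimal in-memory sketch shows that idea. It assumes instances register under a service name and that the gateway picks among them round-robin. `ServiceRegistry`, its methods, and the addresses are hypothetical. Real registries also perform health checks and expire stale registrations.

```python
import itertools
from collections import defaultdict
from typing import Dict, Iterator, List


class ServiceRegistry:
    """Hypothetical in-memory registry mapping a service name to instance addresses."""

    def __init__(self) -> None:
        self._instances: Dict[str, List[str]] = defaultdict(list)
        self._cursors: Dict[str, Iterator[str]] = {}

    def _rebuild_cursor(self, service: str) -> None:
        # Restart the round-robin over the current instance list
        instances = list(self._instances.get(service, []))
        if instances:
            self._cursors[service] = itertools.cycle(instances)
        else:
            self._cursors.pop(service, None)

    def register(self, service: str, address: str) -> None:
        if address not in self._instances[service]:
            self._instances[service].append(address)
            self._rebuild_cursor(service)

    def deregister(self, service: str, address: str) -> None:
        if address in self._instances.get(service, []):
            self._instances[service].remove(address)
            self._rebuild_cursor(service)

    def resolve(self, service: str) -> str:
        """Return the next instance of `service` in round-robin order."""
        cursor = self._cursors.get(service)
        if cursor is None:
            raise LookupError(f"no instances registered for {service!r}")
        return next(cursor)


# Usage: two instances of order-service, resolved in turn by the gateway
registry = ServiceRegistry()
registry.register("order-service", "10.0.0.11:8080")
registry.register("order-service", "10.0.0.12:8080")
picks = [registry.resolve("order-service") for _ in range(3)]
print(picks)  # ['10.0.0.11:8080', '10.0.0.12:8080', '10.0.0.11:8080']
```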
Service Mesh Implementations
Istio vs. Linkerd vs. Consul Connect
| Feature | Istio | Linkerd | Consul Connect |
|---|---|---|---|
| Complexity | High | Low | Medium |
| Performance | Good | Very good | Good |
| Integration | Kubernetes | Kubernetes | Multi-Cloud |
| Security | mTLS, RBAC | mTLS | mTLS, Intentions |
| Observability | Jaeger, Prometheus | Grafana, Prometheus | Prometheus |
Service Mesh Features
- Traffic Management: Routing, Load Balancing, Traffic Splitting
- Security: mTLS, Access Control, Certificate Management
- Observability: Metrics, Logging, Tracing
- Reliability: Retries, Timeouts, Circuit Breaking
- Policy Enforcement: Access Policies, Rate Limiting
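Traffic splitting, listed under traffic management, means the mesh routes a configurable share of requests to each version of a service. A typical split is 90 % to `v1` and 10 % to a canary `v2`. In a real mesh this is declared in configuration (for example, route weights in an Istio `VirtualService`) and carried out by the sidecar proxies. The Python sketch below only illustrates the weighted choice itself. The function name `pick_version` is hypothetical.

```python
import random
from typing import Dict, Optional


def pick_version(weights: Dict[str, int], rng: Optional[random.Random] = None) -> str:
    """Choose a service version with probability proportional to its weight."""
    if not weights or any(w < 0 for w in weights.values()) or sum(weights.values()) == 0:
        raise ValueError("weights must be non-negative and sum to more than 0")
    rng = rng or random.Random()
    versions = list(weights)
    return rng.choices(versions, weights=[weights[v] for v in versions], k=1)[0]


# Usage: simulate 10,000 requests with a 90/10 canary split
rng = random.Random(42)
counts = {"v1": 0, "v2": 0}
for _ in range(10_000):
    counts[pick_version({"v1": 90, "v2": 10}, rng)] += 1
print(counts)  # about 90 % of requests go to v1 and 10 % to v2
```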
API Gateway Patterns
Gateway Routing Strategies
| Pattern | Description | Use case |
|---|---|---|
| Routing | Forward requests to services | Load Balancing |
| Composition | Combine multiple services | API Composition |
| Protocol Translation | Protocol conversion | Legacy Integration |
| Aggregation | Collect data from multiple services | Data Aggregation |
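At its simplest, the routing pattern maps a request path to an upstream service. The following sketch assumes routes are defined as path prefixes and that the longest prefix matching on a path-segment boundary wins. This is a common convention, but real gateways such as Spring Cloud Gateway use configurable predicates instead. The route table, including `reporting-service` and `frontend`, is hypothetical.

```python
from typing import Dict, Optional


def route(path: str, routes: Dict[str, str]) -> Optional[str]:
    """Return the upstream for the longest prefix that matches `path` on a segment boundary."""
    best_prefix = None
    for prefix in routes:
        normalized = prefix.rstrip("/")
        if path == normalized or path.startswith(normalized + "/"):
            if best_prefix is None or len(normalized) > len(best_prefix.rstrip("/")):
                best_prefix = prefix
    return routes[best_prefix] if best_prefix is not None else None


# Usage with a hypothetical route table ("/" acts as the catch-all)
ROUTES = {
    "/api/users": "http://user-service",
    "/api/orders": "http://order-service",
    "/api/orders/reports": "http://reporting-service",
    "/": "http://frontend",
}
print(route("/api/orders/42", ROUTES))            # http://order-service
print(route("/api/orders/reports/2024", ROUTES))  # http://reporting-service
print(route("/api/usersettings", ROUTES))         # http://frontend (no segment match)
```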
Gateway Features
- Authentication: JWT, OAuth2, API Keys
- Authorization: RBAC, Policy Enforcement
- Rate Limiting: User-based, Service-based
- Caching: Response Caching, Edge Caching
- Monitoring: Metrics, Logging, Tracing
Resilience Patterns
Circuit Breaker States
```mermaid
stateDiagram-v2
    [*] --> CLOSED
    CLOSED --> OPEN: Failure Threshold Reached
    OPEN --> HALF_OPEN: Reset Timeout
    HALF_OPEN --> CLOSED: Success Threshold Reached
    HALF_OPEN --> OPEN: Failure Occurs
    CLOSED --> CLOSED: Success
    OPEN --> OPEN: Reset Timeout Not Reached
```
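The state diagram can also be expressed as a transition table. The sketch below encodes exactly the arrows shown, using hypothetical event names such as `failure_threshold_reached` and `reset_timeout_elapsed`. Treating unlisted (state, event) pairs as "stay in the current state" is an assumption of this sketch, not something the diagram specifies.

```python
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


# (current state, event) -> next state, one entry per arrow in the diagram
TRANSITIONS = {
    (CircuitState.CLOSED, "success"): CircuitState.CLOSED,
    (CircuitState.CLOSED, "failure_threshold_reached"): CircuitState.OPEN,
    (CircuitState.OPEN, "reset_timeout_not_reached"): CircuitState.OPEN,
    (CircuitState.OPEN, "reset_timeout_elapsed"): CircuitState.HALF_OPEN,
    (CircuitState.HALF_OPEN, "success_threshold_reached"): CircuitState.CLOSED,
    (CircuitState.HALF_OPEN, "failure"): CircuitState.OPEN,
}


def next_state(state: CircuitState, event: str) -> CircuitState:
    """Apply one event; unlisted (state, event) pairs keep the current state."""
    return TRANSITIONS.get((state, event), state)


# Walk through one full cycle of the diagram, including a failed trial call
s = CircuitState.CLOSED
for event in ["success", "failure_threshold_reached", "reset_timeout_elapsed",
              "failure", "reset_timeout_elapsed", "success_threshold_reached"]:
    s = next_state(s, event)
    print(f"{event:28} -> {s.name}")
```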
Resilience Patterns Overview
| Pattern | Purpose | Implementation |
|---|---|---|
| Circuit Breaker | Prevent cascading failures | State Machine |
| Retry | Overcome transient failures | Exponential Backoff |
| Timeout | Avoid waiting indefinitely on slow or hung calls | Time Limits |
| Bulkhead | Isolate resources | Thread Pools |
| Fallback | Provide substitute results | Alternative Services |
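The table lists exponential backoff as the usual way to implement retries. The wait before retry n grows as `base_delay * 2**(n-1)` up to a cap, so a struggling service is not hammered by immediate repeat calls. The following minimal sketch assumes "full jitter", meaning a random wait between 0 and that computed delay, to spread out clients that failed at the same moment. `retry_with_backoff` and its parameters are hypothetical. `max_attempts` counts the first call too, which is how Resilience4j's `maxAttempts` in the YAML above is usually read.

```python
import random
import time
from typing import Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
    rng: Optional[random.Random] = None,
) -> T:
    """Call `func`, retrying transient errors with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    rng = rng or random.Random()
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(rng.uniform(0, cap))  # full jitter
    raise AssertionError("unreachable")


# Usage: a hypothetical flaky call that succeeds on the third attempt
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary network error")
    return "ok"


delays = []
result = retry_with_backoff(flaky, max_attempts=3, sleep=delays.append)
print(result, len(delays))  # ok 2  (two waits before the successful third attempt)
```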
Distributed Tracing
Trace Context Propagation
```python
# Trace context example
import json
import uuid
from typing import Dict, Optional


class TraceContext:
    def __init__(self, trace_id: str, span_id: str,
                 parent_span_id: Optional[str] = None,
                 baggage: Optional[Dict[str, str]] = None):
        self.trace_id = trace_id
        self.span_id = span_id
        self.parent_span_id = parent_span_id
        self.baggage = baggage or {}

    def to_headers(self) -> dict:
        # Serialize the context into custom HTTP headers for an outgoing request
        return {
            "X-Trace-ID": self.trace_id,
            "X-Span-ID": self.span_id,
            "X-Parent-Span-ID": self.parent_span_id or "",
            "X-Baggage": json.dumps(self.baggage),
        }

    @classmethod
    def from_headers(cls, headers: dict) -> "TraceContext":
        # Rebuild the context from incoming headers; start a new trace if none is present
        return cls(
            trace_id=headers.get("X-Trace-ID") or str(uuid.uuid4()),
            span_id=headers.get("X-Span-ID") or str(uuid.uuid4()),
            parent_span_id=headers.get("X-Parent-Span-ID") or None,
            baggage=json.loads(headers.get("X-Baggage") or "{}"),
        )
```
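The `TraceContext` class above only copies IDs from headers. When a service receives a request, it normally keeps the trace ID, generates a fresh span ID, and records the caller's span as its parent; repeating this at every hop builds the span tree of a trace. The sketch below shows that step. As an alternative to the custom `X-Trace-ID` headers, it serializes the context in the W3C Trace Context `traceparent` format (`version-traceid-parentid-flags`, with a 32-hex-digit trace ID and a 16-hex-digit span ID) that OpenTelemetry uses. `SpanContext` and the helper functions are hypothetical. For brevity, the parser accepts only version `00`.

```python
import os
import re
from dataclasses import dataclass
from typing import Optional

TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


@dataclass(frozen=True)
class SpanContext:
    trace_id: str                 # 32 hex digits, shared by all spans of a trace
    span_id: str                  # 16 hex digits, unique per span
    parent_span_id: Optional[str] = None
    sampled: bool = True


def new_trace() -> SpanContext:
    """Start a new trace at the edge (e.g. in the API gateway)."""
    return SpanContext(trace_id=os.urandom(16).hex(), span_id=os.urandom(8).hex())


def child_of(parent: SpanContext) -> SpanContext:
    """Create a child span: same trace, new span ID, parent link to the caller's span."""
    return SpanContext(parent.trace_id, os.urandom(8).hex(), parent.span_id, parent.sampled)


def to_traceparent(ctx: SpanContext) -> str:
    flags = "01" if ctx.sampled else "00"
    return f"00-{ctx.trace_id}-{ctx.span_id}-{flags}"


def from_traceparent(header: str) -> Optional[SpanContext]:
    """Parse an incoming header and return this service's span, a child of the sender's."""
    match = TRACEPARENT_RE.match(header.strip().lower())
    if not match:
        return None
    trace_id, parent_id, flags = match.groups()
    if trace_id == "0" * 32 or parent_id == "0" * 16:
        return None  # all-zero IDs are invalid
    remote = SpanContext(trace_id, parent_id, sampled=bool(int(flags, 16) & 0x01))
    return child_of(remote)


# Usage: the gateway starts a trace, the downstream service continues it
root = new_trace()
outgoing = {"traceparent": to_traceparent(root)}
server_span = from_traceparent(outgoing["traceparent"])
print(server_span.trace_id == root.trace_id, server_span.parent_span_id == root.span_id)
```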
Tracing Implementations
| Tool | Integration | Features | Use case |
|---|---|---|---|
| Jaeger | OpenTelemetry | Distributed Tracing | Microservices |
| Zipkin | Spring Cloud | Request Tracing | Legacy Systems |
| AWS X-Ray | AWS Services | Cloud Tracing | AWS Environment |
| Google Cloud Trace | GCP Services | Performance Analysis | GCP Environment |
Advantages and Disadvantages
Advantages of Microservices
- Scalability: Individual services can be scaled independently
- Flexibility: Different technologies can be used per service
- Resilience: A failure in one service does not have to bring down the entire system
- Team autonomy: Small teams can work independently
- Faster deployments: Small, frequent deployments
Disadvantages
- Complexity: High infrastructure complexity
- Communication overhead: Network latency
- Data consistency: Distributed transactions
- Monitoring: Complex monitoring required
- Testing: End-to-end tests are complex
Common Exam Questions
- What is the difference between a service mesh and an API gateway? A service mesh manages communication between services, while an API gateway handles incoming client requests and routes them to the services.
- Explain the circuit breaker pattern. A circuit breaker prevents cascading failures by blocking calls to a failing service once a failure threshold is reached; after a reset timeout it lets trial calls through (half-open) and closes again if they succeed.
- When is distributed tracing used? Distributed tracing is used to follow requests across multiple microservices, identify performance bottlenecks, and debug errors.
- What are the main advantages of microservices? Independent scalability, technological flexibility, improved resilience, and faster deployment cycles.
Key Sources
- https://istio.io/
- https://linkerd.io/
- https://spring.io/projects/spring-cloud-gateway
- https://opentelemetry.io/