Reactive Programming: Asynchrone Datenströme, Observables & Subjects
Dieser Beitrag ist eine umfassende Einführung in das Reactive Programming – inklusive asynchroner Datenströme, Observables, Subjects und praktischen Beispielen.
In a Nutshell
Reactive Programming dreht sich um asynchrone Datenströme. Anstatt zu warten, abonniert man Datenströme und reagiert auf Ereignisse, sobald sie eintreffen.
Kompakte Fachbeschreibung
Reactive Programming ist ein Paradigma für die Verarbeitung asynchroner Datenströme mit nicht-blockierenden, ereignisgesteuerten Programmen.
Kernprinzipien:
- Asynchronität: Aufgaben werden im Hintergrund ausgeführt, Hauptprogramm wird nicht blockiert
- Datenströme (Streams): Alles wird als Sequenz von Ereignissen betrachtet
- Observer Pattern: Datenströme werden abonniert, nicht abgefragt
- Reactive Streams: Standard für asynchrone Stream-Verarbeitung mit Backpressure
Wichtige Konzepte:
- Observable: Datenstrom, der 0..n Werte emitieren kann
- Observer: Empfänger der Daten vom Observable
- Subscription: Verbindung zwischen Observable und Observer
- Operators: Transformationen und Filterung von Datenströmen
- Scheduler: Kontrolle über Thread-Ausführung
- Backpressure: Schutz vor Überlastung bei schnellen Produzenten
Prüfungsrelevante Stichpunkte
- Reactive Programming: Asynchrone, nicht-blockierende Verarbeitung
- Observable: Datenstrom mit 0..n Werten
- Observer: Empfänger für Datenereignisse
- Subscription: Verbindungsmanagement zwischen Observable und Observer
- Operators: map, filter, flatMap für Stream-Transformation
- Backpressure: Schutz vor Überlastung, Flow Control
- Scheduler: Thread-Kontrolle für asynchrone Operationen
- IHK-relevant: Modernes Architekturparadigma für skalierbare Systeme
Kernkomponenten
- Observable: Quelle von Datenereignissen
- Observer: Empfänger der Daten
- Subscription: Verbindung und Ressourcenmanagement
- Operators: Datenstrom-Transformationen
- Subjects: Sowohl Observable als auch Observer
- Scheduler: Thread-Ausführungskontrolle
- Backpressure: Flow Control Mechanismen
- Reactive Streams: Standard-Spezifikation
Praxisbeispiele
1. Reactive Programming mit RxJava
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ReactiveProgrammingDemo {
public static void main(String[] args) throws InterruptedException {
// Einfaches Observable
Observable<String> observable = Observable.just("Hallo", "Reactive", "World");
// Observer abonnieren
observable.subscribe(
value -> System.out.println("Next: " + value),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed!")
);
// Asynchrone Operationen
asyncOperations();
// Operators Demonstration
operatorsDemo();
// Backpressure mit Flowable
backpressureDemo();
// Subjects für Multicasting
subjectsDemo();
}
private static void asyncOperations() {
System.out.println("\n=== Asynchrone Operationen ===");
// Observable mit Scheduler
Observable.fromArray("Task 1", "Task 2", "Task 3")
.subscribeOn(Schedulers.io()) // Ausführung auf IO-Thread
.observeOn(Schedulers.single()) // Beobachtung auf Single-Thread
.subscribe(
task -> System.out.println("Verarbeite: " + task + " auf " + Thread.currentThread().getName()),
error -> System.err.println("Fehler: " + error),
() -> System.out.println("Alle Tasks abgeschlossen")
);
// Zeitbasiertes Observable
Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.map(tick -> "Tick " + (tick + 1))
.subscribe(
tick -> System.out.println(tick + " auf " + Thread.currentThread().getName())
);
// Asynchrone API-Aufrufe simulieren
Observable<String> apiCall = Observable.fromCallable(() -> {
Thread.sleep(1000); // Simulierter API-Aufruf
return "API-Daten";
}).subscribeOn(Schedulers.io());
apiCall
.observeOn(Schedulers.single())
.subscribe(
data -> System.out.println("API-Ergebnis: " + data),
error -> System.err.println("API-Fehler: " + error)
);
}
private static void operatorsDemo() {
System.out.println("\n=== Operators Demonstration ===");
Observable.range(1, 10)
.filter(n -> n % 2 == 0) // Gerade Zahlen filtern
.map(n -> n * n) // Quadrieren
.take(3) // Nur erste 3 Elemente
.subscribe(
result -> System.out.println("Result: " + result),
error -> System.err.println("Error: " + error),
() -> System.out.println("Operators Demo abgeschlossen")
);
// flatMap für asynchrone Transformation
Observable.just("user1", "user2", "user3")
.flatMap(userId -> getUserData(userId)
.subscribeOn(Schedulers.io()) // Jeder Aufruf auf eigenem Thread
)
.subscribe(
userData -> System.out.println("User Data: " + userData),
error -> System.err.println("User Error: " + error)
);
}
private static void backpressureDemo() {
System.out.println("\n=== Backpressure Demo ===");
// Fast Producer mit Flowable
Flowable.range(1, 1000)
.onBackpressureBuffer(100) // Buffer mit Größe 100
.observeOn(Schedulers.io())
.subscribe(
item -> {
try {
Thread.sleep(10); // Langsamer Consumer
System.out.println("Processed: " + item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
},
error -> System.err.println("Backpressure Error: " + error)
);
}
private static void subjectsDemo() {
System.out.println("\n=== Subjects Demo ===");
// PublishSubject für Multicasting
PublishSubject<String> publishSubject = PublishSubject.create();
// Mehrere Observer abonnieren
Observer<String> observer1 = new Observer<String>() {
@Override
public void onNext(String value) {
System.out.println("Observer 1: " + value);
}
@Override
public void onError(Throwable e) {
System.err.println("Observer 1 Error: " + e);
}
@Override
public void onComplete() {
System.out.println("Observer 1 Completed");
}
};
Observer<String> observer2 = new Observer<String>() {
@Override
public void onNext(String value) {
System.out.println("Observer 2: " + value);
}
@Override
public void onError(Throwable e) {
System.err.println("Observer 2 Error: " + e);
}
@Override
public void onComplete() {
System.out.println("Observer 2 Completed");
}
};
publishSubject.subscribe(observer1);
publishSubject.subscribe(observer2);
// Daten emittieren
publishSubject.onNext("Message 1");
publishSubject.onNext("Message 2");
publishSubject.onComplete();
// BehaviorSubject für letzten Wert
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.createDefault(0);
behaviorSubject.subscribe(
value -> System.out.println("Behavior Subject: " + value)
);
behaviorSubject.onNext(10);
behaviorSubject.onNext(20);
// Späterer Subscriber erhält letzten Wert
behaviorSubject.subscribe(
value -> System.out.println("Late Subscriber: " + value)
);
}
// Simulierter API-Aufruf
private static Observable<String> getUserData(String userId) {
return Observable.fromCallable(() -> {
Thread.sleep(500);
return "UserData for " + userId;
});
}
}
2. Reactive Programming mit Project Reactor (Spring)
import reactor.core.publisher.*;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.List;
public class ReactorDemo {
public static void main(String[] args) throws InterruptedException {
// Mono für 0..1 Werte
Mono<String> mono = Mono.just("Hello Reactor");
mono.subscribe(
value -> System.out.println("Mono: " + value),
error -> System.err.println("Error: " + error)
);
// Flux für 0..n Werte
Flux<String> flux = Flux.just("A", "B", "C", "D", "E");
flux
.filter(letter -> !"C".equals(letter))
.map(String::toUpperCase)
.subscribe(
letter -> System.out.println("Flux: " + letter),
error -> System.err.println("Flux Error: " + error),
() -> System.out.println("Flux Completed")
);
// Asynchrone Web-API Simulation
webApiSimulation();
// Error Handling
errorHandling();
// Backpressure
backpressureHandling();
}
private static void webApiSimulation() {
System.out.println("\n=== Web API Simulation ===");
// Simuliere Web-Request
Mono<String> webResponse = Mono.fromCallable(() -> {
Thread.sleep(1000); // Network latency
return "Response Data";
}).subscribeOn(Schedulers.boundedElastic());
webResponse
.map(response -> "Processed: " + response)
.timeout(Duration.ofSeconds(2))
.onErrorResume(error -> Mono.just("Fallback Response"))
.subscribe(
result -> System.out.println("Web Result: " + result),
error -> System.err.println("Web Error: " + error)
);
}
private static void errorHandling() {
System.out.println("\n=== Error Handling ===");
Flux<Integer> numbers = Flux.range(1, 10)
.map(n -> {
if (n == 5) {
throw new RuntimeException("Error at " + n);
}
return n * n;
});
numbers
.onErrorContinue((error, item) ->
System.err.println("Error processing " + item + ": " + error))
.subscribe(
result -> System.out.println("Result: " + result),
error -> System.err.println("Final Error: " + error),
() -> System.out.println("Error Handling Completed")
);
}
private static void backpressureHandling() {
System.out.println("\n=== Backpressure Handling ===");
// Fast Producer
Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(10))
.take(100);
// Slow Consumer with backpressure
fastProducer
.onBackpressureBuffer(50) // Buffer mit Drop-Strategy
.publishOn(Schedulers.boundedElastic())
.delayElements(Duration.ofMillis(50)) // Langsamer Consumer
.subscribe(
item -> System.out.println("Processed: " + item),
error -> System.err.println("Backpressure Error: " + error)
);
}
}
// Reactive Service Beispiel
@Service
public class ReactiveUserService {
public Flux<User> getAllUsers() {
return Flux.fromIterable(getUserList())
.subscribeOn(Schedulers.parallel());
}
public Mono<User> getUserById(String id) {
return Mono.fromCallable(() -> findUserById(id))
.subscribeOn(Schedulers.boundedElastic())
.filter(user -> user != null)
.switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)));
}
public Flux<User> searchUsers(String query) {
return getAllUsers()
.filter(user -> user.getName().toLowerCase().contains(query.toLowerCase()))
.take(10); // Limit results
}
// Simulierte Datenquelle
private List<User> getUserList() {
return List.of(
new User("1", "Alice", "alice@example.com"),
new User("2", "Bob", "bob@example.com"),
new User("3", "Charlie", "charlie@example.com")
);
}
private User findUserById(String id) {
return getUserList().stream()
.filter(user -> user.getId().equals(id))
.findFirst()
.orElse(null);
}
}
class User {
private String id;
private String name;
private String email;
public User(String id, String name, String email) {
this.id = id;
this.name = name;
this.email = email;
}
// Getter
public String getId() { return id; }
public String getName() { return name; }
public String getEmail() { return email; }
}
3. Reactive Programming mit JavaScript (RxJS)
// RxJS Reactive Programming
const { Observable, Subject, BehaviorSubject, fromEvent, interval, of } = require('rxjs');
const { map, filter, switchMap, take, debounceTime, distinctUntilChanged } = require('rxjs/operators');
// Einfaches Observable
const helloObservable = of('Hello', 'Reactive', 'JavaScript');
helloObservable.subscribe({
next: value => console.log('Next:', value),
error: error => console.error('Error:', error),
complete: () => console.log('Complete!')
});
// DOM Events als Observable
const button = document.createElement('button');
button.textContent = 'Click me!';
document.body.appendChild(button);
const clickObservable = fromEvent(button, 'click');
clickObservable
.pipe(
map(event => ({ type: 'click', timestamp: Date.now() })),
debounceTime(300),
distinctUntilChanged()
)
.subscribe(event => {
console.log('Button clicked:', event);
// API-Aufruf simulieren
simulateApiCall();
});
// API-Aufrufe mit Reactive Pattern
function simulateApiCall() {
return new Observable(subscriber => {
console.log('API Call started...');
setTimeout(() => {
const success = Math.random() > 0.3;
if (success) {
subscriber.next({ data: 'API Response', status: 'success' });
subscriber.complete();
} else {
subscriber.error(new Error('API Error'));
}
}, 1000);
});
}
// Search mit Reactive Pattern
const searchInput = document.createElement('input');
searchInput.placeholder = 'Search...';
document.body.appendChild(searchInput);
const searchObservable = fromEvent(searchInput, 'input')
.pipe(
map(event => event.target.value),
filter(query => query.length >= 3),
debounceTime(500),
distinctUntilChanged(),
switchMap(query => searchApi(query))
);
searchObservable.subscribe({
next: results => console.log('Search results:', results),
error: error => console.error('Search error:', error)
});
function searchApi(query) {
return new Observable(subscriber => {
console.log('Searching for:', query);
setTimeout(() => {
const results = [`Result 1 for ${query}`, `Result 2 for ${query}`];
subscriber.next(results);
subscriber.complete();
}, 300);
});
}
// Subject für Multicasting
const subject = new Subject();
// Mehrere Subscriber
const subscriber1 = {
next: value => console.log('Subscriber 1:', value),
error: error => console.error('Subscriber 1 Error:', error),
complete: () => console.log('Subscriber 1 Complete')
};
const subscriber2 = {
next: value => console.log('Subscriber 2:', value),
error: error => console.error('Subscriber 2 Error:', error),
complete: () => console.log('Subscriber 2 Complete')
};
subject.subscribe(subscriber1);
subject.subscribe(subscriber2);
// Daten emittieren
subject.next('Message 1');
subject.next('Message 2');
subject.complete();
// BehaviorSubject für letzten Wert
const behaviorSubject = new BehaviorSubject(0);
behaviorSubject.subscribe(value => console.log('Initial:', value));
behaviorSubject.next(10);
behaviorSubject.next(20);
// Späterer Subscriber erhält letzten Wert
behaviorSubject.subscribe(value => console.log('Late Subscriber:', value));
// WebSocket mit Reactive Pattern
class ReactiveWebSocket {
constructor(url) {
this.url = url;
this.messageSubject = new Subject();
this.connectionStatus = new BehaviorSubject('disconnected');
}
connect() {
this.connectionStatus.next('connecting');
// WebSocket Verbindung simulieren
this.socket = {
send: (data) => console.log('Sending:', data),
close: () => console.log('WebSocket closed')
};
// Simuliere eingehende Nachrichten
setInterval(() => {
const message = { type: 'data', payload: Math.random() };
this.messageSubject.next(message);
}, 2000);
this.connectionStatus.next('connected');
return this.connectionStatus.asObservable();
}
disconnect() {
if (this.socket) {
this.socket.close();
this.connectionStatus.next('disconnected');
}
}
sendMessage(message) {
if (this.socket) {
this.socket.send(JSON.stringify(message));
}
}
getMessages() {
return this.messageSubject.asObservable();
}
getConnectionStatus() {
return this.connectionStatus.asObservable();
}
}
// WebSocket verwenden
const webSocket = new ReactiveWebSocket('ws://localhost:8080');
webSocket.connect().subscribe(status => {
console.log('Connection Status:', status);
});
webSocket.getMessages().subscribe(message => {
console.log('Received Message:', message);
});
// Nachricht senden
setTimeout(() => {
webSocket.sendMessage({ type: 'ping', data: 'Hello Server' });
}, 5000);
4. Reactive Programming mit Python (RxPY)
import rx
from rx import operators as ops
import time
import threading
from datetime import datetime
# Einfaches Observable
def simple_observable():
source = rx.of("Python", "Reactive", "Programming")
source.subscribe(
on_next=lambda value: print(f"Next: {value}"),
on_error=lambda error: print(f"Error: {error}"),
on_completed=lambda: print("Completed!")
)
# Asynchrone Operationen
def async_operations():
print("\n=== Asynchrone Operationen ===")
# Zeitbasiertes Observable
rx.interval(1.0).pipe(
ops.take(5),
ops.map(lambda i: f"Tick {i + 1}")
).subscribe(
on_next=lambda value: print(f"{value} at {datetime.now().second}s"),
on_completed=lambda: print("Timer completed!")
)
# Asynchrone API-Aufrufe simulieren
def simulate_api_call(item):
def observer(observer):
def worker():
time.sleep(1) # Simuliere Netzwerkverzögerung
if item % 3 == 0:
observer.on_next(f"Data for {item}")
observer.on_completed()
else:
observer.on_error(Exception(f"Error for {item}"))
thread = threading.Thread(target=worker)
thread.start()
return rx.create(observer)
rx.range(1, 6).pipe(
ops.map(simulate_api_call),
ops.merge_all()
).subscribe(
on_next=lambda value: print(f"API Result: {value}"),
on_error=lambda error: print(f"API Error: {error}")
)
# Operators Demonstration
def operators_demo():
print("\n=== Operators Demo ===")
rx.range(1, 11).pipe(
ops.filter(lambda x: x % 2 == 0),
ops.map(lambda x: x * x),
ops.take(3)
).subscribe(
on_next=lambda value: print(f"Result: {value}"),
on_completed=lambda: print("Operators completed!")
)
# flatMap für asynchrone Transformation
def get_user_data(user_id):
return rx.of(f"UserData for {user_id}").pipe(
ops.delay(0.5) # Simuliere Verzögerung
)
rx.of("user1", "user2", "user3").pipe(
ops.flat_map(get_user_data)
).subscribe(
on_next=lambda data: print(f"User: {data}")
)
# Subject für Multicasting
def subjects_demo():
print("\n=== Subjects Demo ===")
# PublishSubject
subject = rx.Subject()
# Mehrere Subscriber
def observer1(value):
print(f"Observer 1: {value}")
def observer2(value):
print(f"Observer 2: {value}")
subject.subscribe(observer1)
subject.subscribe(observer2)
# Daten emittieren
subject.on_next("Message 1")
subject.on_next("Message 2")
subject.on_completed()
# BehaviorSubject
behavior_subject = rx.BehaviorSubject(0)
behavior_subject.subscribe(
on_next=lambda value: print(f"Behavior Subject: {value}")
)
behavior_subject.on_next(10)
behavior_subject.on_next(20)
# Späterer Subscriber
behavior_subject.subscribe(
on_next=lambda value: print(f"Late Subscriber: {value}")
)
# Hot vs Cold Observable
def hot_cold_demo():
print("\n=== Hot vs Cold Observable ===")
# Cold Observable (jeder Subscriber bekommt alle Werte)
cold = rx.of("A", "B", "C")
print("Cold Observable:")
cold.subscribe(on_next=lambda x: print(f"Subscriber 1: {x}"))
time.sleep(1)
cold.subscribe(on_next=lambda x: print(f"Subscriber 2: {x}"))
# Hot Observable (geteilte Werte)
hot = rx.Subject()
print("\nHot Observable:")
hot.subscribe(on_next=lambda x: print(f"Subscriber 1: {x}"))
hot.on_next("X")
hot.on_next("Y")
hot.subscribe(on_next=lambda x: print(f"Subscriber 2: {x}"))
hot.on_next("Z")
if __name__ == "__main__":
simple_observable()
async_operations()
operators_demo()
subjects_demo()
hot_cold_demo()
# Programm laufen lassen für asynchrone Operationen
time.sleep(10)
Reactive Streams Backpressure
Backpressure Strategien
// RxJava Backpressure
Flowable.range(1, 1000)
.onBackpressureBuffer() // Buffer (Standard)
.onBackpressureDrop() // Überschüssige Elemente verwerfen
.onBackpressureLatest() // Nur neuestes Element behalten
.onBackpressureError() // Fehler bei Überlastung
.subscribe(item -> processItem(item));
// Project Reactor Backpressure
Flux.range(1, 1000)
.onBackpressureBuffer(100) // Buffer mit Größe
.onBackpressureDrop() // Elemente verwerfen
.onBackpressureLatest() // Nur letztes Element
.limitRate(100) // Rate Limiting
.subscribe(item -> processItem(item));
Scheduler für Thread-Kontrolle
RxJava Scheduler
Observable.just("data")
.subscribeOn(Schedulers.io()) // Ausführung auf IO-Thread
.observeOn(Schedulers.single()) // Beobachtung auf Single-Thread
.observeOn(Schedulers.computation()) // Berechnungen auf CPU-Thread
.subscribe(result -> handleResult(result));
Project Reactor Scheduler
Mono.just("data")
.subscribeOn(Schedulers.boundedElastic()) // IO-Operationen
.publishOn(Schedulers.parallel()) // Parallelverarbeitung
.publishOn(Schedulers.single()) // Single Thread
.subscribe(result -> handleResult(result));
Vorteile und Nachteile
Vorteile von Reactive Programming
- Asynchronität: Nicht-blockierende Verarbeitung
- Skalierbarkeit: Bessere Nutzung von Systemressourcen
- Responsiveness: Schnellere Antwortzeiten
- Flexibilität: Leichte Komposition von Operationen
- Error Handling: Zentralisierte Fehlerbehandlung
Nachteile
- Komplexität: Steile Lernkurve
- Debugging: Schwierigere Fehlersuche
- Overhead: Zusätzliche Abstraktionsschicht
- Ressourcenmanagement: Komplexeres Lifecycle-Management
Häufige Prüfungsfragen
-
Was ist der Unterschied zwischen Observable und Subject? Observable ist nur Quelle, Subject kann sowohl als Quelle als auch als Empfänger fungieren.
-
Erklären Sie Backpressure! Mechanismus zum Schutz vor Überlastung bei schnellen Produzenten und langsamen Konsumenten.
-
Wann verwendet man Reactive Programming? Bei asynchronen Operationen, Echtzeitanwendungen und Systemen mit hohen Anforderungen an Skalierbarkeit.
-
Was ist der Zweck von Schedulern? Kontrolle über Thread-Ausführung und Parallelverarbeitung in reaktiven Systemen.