Reactive Programming: Asynchronous Data Streams, Observables & Subjects
This article is a comprehensive introduction to Reactive Programming – including asynchronous data streams, Observables, Subjects and practical examples.
In a Nutshell
Reactive Programming revolves around asynchronous data streams. Instead of waiting, you subscribe to data streams and react to events as they arrive.
Compact Technical Description
Reactive Programming is a paradigm for processing asynchronous data streams with non-blocking, event-driven programs.
Core Principles:
- Asynchronicity: Tasks are executed in the background, main program is not blocked
- Data Streams (Streams): Everything is considered as a sequence of events
- Observer Pattern: Data streams are subscribed to, not polled
- Reactive Streams: Standard for asynchronous stream processing with backpressure
Important Concepts:
- Observable: Data stream that can emit 0..n values
- Observer: Recipient of data from the Observable
- Subscription: Connection between Observable and Observer
- Operators: Transformations and filtering of data streams
- Scheduler: Control over thread execution
- Backpressure: Protection against overload from fast producers
Exam-Relevant Key Points
- Reactive Programming: Asynchronous, non-blocking processing
- Observable: Data stream with 0..n values
- Observer: Recipient for data events
- Subscription: Connection management between Observable and Observer
- Operators: map, filter, flatMap for stream transformation
- Backpressure: Protection against overload, flow control
- Scheduler: Thread control for asynchronous operations
- IHK-relevant: Modern architecture paradigm for scalable systems
Core Components
- Observable: Source of data events
- Observer: Recipient of data
- Subscription: Connection and resource management
- Operators: Data stream transformations
- Subjects: Both Observable and Observer
- Scheduler: Thread execution control
- Backpressure: Flow control mechanisms
- Reactive Streams: Standard specification
Practical Examples
1. Reactive Programming with RxJava
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ReactiveProgrammingDemo {
public static void main(String[] args) throws InterruptedException {
// Simple Observable
Observable<String> observable = Observable.just("Hello", "Reactive", "World");
// Subscribe Observer
observable.subscribe(
value -> System.out.println("Next: " + value),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed!")
);
// Asynchronous operations
asyncOperations();
// Operators demonstration
operatorsDemo();
// Backpressure with Flowable
backpressureDemo();
// Subjects for multicasting
subjectsDemo();
}
private static void asyncOperations() {
System.out.println("\n=== Asynchronous Operations ===");
// Observable with Scheduler
Observable.fromArray("Task 1", "Task 2", "Task 3")
.subscribeOn(Schedulers.io()) // Execution on IO-Thread
.observeOn(Schedulers.single()) // Observation on single-thread
.subscribe(
task -> System.out.println("Processing: " + task + " on " + Thread.currentThread().getName()),
error -> System.err.println("Error: " + error),
() -> System.out.println("All tasks completed")
);
// Time-based Observable
Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.map(tick -> "Tick " + (tick + 1))
.subscribe(
tick -> System.out.println(tick + " on " + Thread.currentThread().getName())
);
// Simulate asynchronous API calls
Observable<String> apiCall = Observable.fromCallable(() -> {
Thread.sleep(1000); // Simulated API call
return "API-Data";
}).subscribeOn(Schedulers.io());
apiCall
.observeOn(Schedulers.single())
.subscribe(
data -> System.out.println("API result: " + data),
error -> System.err.println("API error: " + error)
);
}
private static void operatorsDemo() {
System.out.println("\n=== Operators Demonstration ===");
Observable.range(1, 10)
.filter(n -> n % 2 == 0) // Filter even numbers
.map(n -> n * n) // Square
.take(3) // Only first 3 elements
.subscribe(
result -> System.out.println("Result: " + result),
error -> System.err.println("Error: " + error),
() -> System.out.println("Operators demo completed")
);
// flatMap for asynchronous transformation
Observable.just("user1", "user2", "user3")
.flatMap(userId -> getUserData(userId)
.subscribeOn(Schedulers.io()) // Each call on own thread
)
.subscribe(
userData -> System.out.println("User Data: " + userData),
error -> System.err.println("User Error: " + error)
);
}
private static void backpressureDemo() {
System.out.println("\n=== Backpressure Demo ===");
// Fast producer with Flowable
Flowable.range(1, 1000)
.onBackpressureBuffer(100) // Buffer with size 100
.observeOn(Schedulers.io())
.subscribe(
item -> {
try {
Thread.sleep(10); // Slow consumer
System.out.println("Processed: " + item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
},
error -> System.err.println("Backpressure Error: " + error)
);
}
private static void subjectsDemo() {
System.out.println("\n=== Subjects Demo ===");
// PublishSubject for multicasting
PublishSubject<String> publishSubject = PublishSubject.create();
// Multiple observers subscribe
Observer<String> observer1 = new Observer<String>() {
@Override
public void onNext(String value) {
System.out.println("Observer 1: " + value);
}
@Override
public void onError(Throwable e) {
System.err.println("Observer 1 Error: " + e);
}
@Override
public void onComplete() {
System.out.println("Observer 1 Completed");
}
};
Observer<String> observer2 = new Observer<String>() {
@Override
public void onNext(String value) {
System.out.println("Observer 2: " + value);
}
@Override
public void onError(Throwable e) {
System.err.println("Observer 2 Error: " + e);
}
@Override
public void onComplete() {
System.out.println("Observer 2 Completed");
}
};
publishSubject.subscribe(observer1);
publishSubject.subscribe(observer2);
// Emit data
publishSubject.onNext("Message 1");
publishSubject.onNext("Message 2");
publishSubject.onComplete();
// BehaviorSubject for last value
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.createDefault(0);
behaviorSubject.subscribe(
value -> System.out.println("Behavior Subject: " + value)
);
behaviorSubject.onNext(10);
behaviorSubject.onNext(20);
// Late subscriber receives last value
behaviorSubject.subscribe(
value -> System.out.println("Late Subscriber: " + value)
);
}
// Simulated API call
private static Observable<String> getUserData(String userId) {
return Observable.fromCallable(() -> {
Thread.sleep(500);
return "UserData for " + userId;
});
}
}
2. Reactive Programming with Project Reactor (Spring)
import reactor.core.publisher.*;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.List;
public class ReactorDemo {
public static void main(String[] args) throws InterruptedException {
// Mono for 0..1 values
Mono<String> mono = Mono.just("Hello Reactor");
mono.subscribe(
value -> System.out.println("Mono: " + value),
error -> System.err.println("Error: " + error)
);
// Flux for 0..n values
Flux<String> flux = Flux.just("A", "B", "C", "D", "E");
flux
.filter(letter -> !"C".equals(letter))
.map(String::toUpperCase)
.subscribe(
letter -> System.out.println("Flux: " + letter),
error -> System.err.println("Flux Error: " + error),
() -> System.out.println("Flux Completed")
);
// Asynchronous Web-API simulation
webApiSimulation();
// Error Handling
errorHandling();
// Backpressure
backpressureHandling();
}
private static void webApiSimulation() {
System.out.println("\n=== Web API Simulation ===");
// Simulate web request
Mono<String> webResponse = Mono.fromCallable(() -> {
Thread.sleep(1000); // Network latency
return "Response Data";
}).subscribeOn(Schedulers.boundedElastic());
webResponse
.map(response -> "Processed: " + response)
.timeout(Duration.ofSeconds(2))
.onErrorResume(error -> Mono.just("Fallback Response"))
.subscribe(
result -> System.out.println("Web Result: " + result),
error -> System.err.println("Web Error: " + error)
);
}
private static void errorHandling() {
System.out.println("\n=== Error Handling ===");
Flux<Integer> numbers = Flux.range(1, 10)
.map(n -> {
if (n == 5) {
throw new RuntimeException("Error at " + n);
}
return n * n;
});
numbers
.onErrorContinue((error, item) ->
System.err.println("Error processing " + item + ": " + error))
.subscribe(
result -> System.out.println("Result: " + result),
error -> System.err.println("Final Error: " + error),
() -> System.out.println("Error Handling Completed")
);
}
private static void backpressureHandling() {
System.out.println("\n=== Backpressure Handling ===");
// Fast Producer
Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(10))
.take(100);
// Slow Consumer with backpressure
fastProducer
.onBackpressureBuffer(50) // Buffer with drop strategy
.publishOn(Schedulers.boundedElastic())
.delayElements(Duration.ofMillis(50)) // Slow consumer
.subscribe(
item -> System.out.println("Processed: " + item),
error -> System.err.println("Backpressure Error: " + error)
);
}
}
// Reactive Service example
@Service
public class ReactiveUserService {
public Flux<User> getAllUsers() {
return Flux.fromIterable(getUserList())
.subscribeOn(Schedulers.parallel());
}
public Mono<User> getUserById(String id) {
return Mono.fromCallable(() -> findUserById(id))
.subscribeOn(Schedulers.boundedElastic())
.filter(user -> user != null)
.switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)));
}
public Flux<User> searchUsers(String query) {
return getAllUsers()
.filter(user -> user.getName().toLowerCase().contains(query.toLowerCase()))
.take(10); // Limit results
}
// Simulated data source
private List<User> getUserList() {
return List.of(
new User("1", "Alice", "alice@example.com"),
new User("2", "Bob", "bob@example.com"),
new User("3", "Charlie", "charlie@example.com")
);
}
private User findUserById(String id) {
return getUserList().stream()
.filter(user -> user.getId().equals(id))
.findFirst()
.orElse(null);
}
}
class User {
private String id;
private String name;
private String email;
public User(String id, String name, String email) {
this.id = id;
this.name = name;
this.email = email;
}
// Getter
public String getId() { return id; }
public String getName() { return name; }
public String getEmail() { return email; }
}
3. Reactive Programming with JavaScript (RxJS)
// RxJS Reactive Programming
const { Observable, Subject, BehaviorSubject, fromEvent, interval, of } = require('rxjs');
const { map, filter, switchMap, take, debounceTime, distinctUntilChanged } = require('rxjs/operators');
// Simple Observable
const helloObservable = of('Hello', 'Reactive', 'JavaScript');
helloObservable.subscribe({
next: value => console.log('Next:', value),
error: error => console.error('Error:', error),
complete: () => console.log('Complete!')
});
// DOM Events as Observable
const button = document.createElement('button');
button.textContent = 'Click me!';
document.body.appendChild(button);
const clickObservable = fromEvent(button, 'click');
clickObservable
.pipe(
map(event => ({ type: 'click', timestamp: Date.now() })),
debounceTime(300),
distinctUntilChanged()
)
.subscribe(event => {
console.log('Button clicked:', event);
// Simulate API call
simulateApiCall();
});
// API calls with reactive pattern
function simulateApiCall() {
return new Observable(subscriber => {
console.log('API Call started...');
setTimeout(() => {
const success = Math.random() > 0.3;
if (success) {
subscriber.next({ data: 'API Response', status: 'success' });
subscriber.complete();
} else {
subscriber.error(new Error('API Error'));
}
}, 1000);
});
}
// Search with reactive pattern
const searchInput = document.createElement('input');
searchInput.placeholder = 'Search...';
document.body.appendChild(searchInput);
const searchObservable = fromEvent(searchInput, 'input')
.pipe(
map(event => event.target.value),
filter(query => query.length >= 3),
debounceTime(500),
distinctUntilChanged(),
switchMap(query => searchApi(query))
);
searchObservable.subscribe({
next: results => console.log('Search results:', results),
error: error => console.error('Search error:', error)
});
function searchApi(query) {
return new Observable(subscriber => {
console.log('Searching for:', query);
setTimeout(() => {
const results = [`Result 1 for ${query}`, `Result 2 for ${query}`];
subscriber.next(results);
subscriber.complete();
}, 300);
});
}
// Subject for multicasting
const subject = new Subject();
// Multiple subscribers
const subscriber1 = {
next: value => console.log('Subscriber 1:', value),
error: error => console.error('Subscriber 1 Error:', error),
complete: () => console.log('Subscriber 1 Complete')
};
const subscriber2 = {
next: value => console.log('Subscriber 2:', value),
error: error => console.error('Subscriber 2 Error:', error),
complete: () => console.log('Subscriber 2 Complete')
};
subject.subscribe(subscriber1);
subject.subscribe(subscriber2);
// Emit data
subject.next('Message 1');
subject.next('Message 2');
subject.complete();
// BehaviorSubject for last value
const behaviorSubject = new BehaviorSubject(0);
behaviorSubject.subscribe(value => console.log('Initial:', value));
behaviorSubject.next(10);
behaviorSubject.next(20);
// Late subscriber receives last value
behaviorSubject.subscribe(value => console.log('Late Subscriber:', value));
// WebSocket with reactive pattern
class ReactiveWebSocket {
constructor(url) {
this.url = url;
this.messageSubject = new Subject();
this.connectionStatus = new BehaviorSubject('disconnected');
}
connect() {
this.connectionStatus.next('connecting');
// Simulate WebSocket connection
this.socket = {
send: (data) => console.log('Sending:', data),
close: () => console.log('WebSocket closed')
};
// Simulate incoming messages
setInterval(() => {
const message = { type: 'data', payload: Math.random() };
this.messageSubject.next(message);
}, 2000);
this.connectionStatus.next('connected');
return this.connectionStatus.asObservable();
}
disconnect() {
if (this.socket) {
this.socket.close();
this.connectionStatus.next('disconnected');
}
}
sendMessage(message) {
if (this.socket) {
this.socket.send(JSON.stringify(message));
}
}
getMessages() {
return this.messageSubject.asObservable();
}
getConnectionStatus() {
return this.connectionStatus.asObservable();
}
}
// Use WebSocket
const webSocket = new ReactiveWebSocket('ws://localhost:8080');
webSocket.connect().subscribe(status => {
console.log('Connection Status:', status);
});
webSocket.getMessages().subscribe(message => {
console.log('Received Message:', message);
});
// Send message
setTimeout(() => {
webSocket.sendMessage({ type: 'ping', data: 'Hello Server' });
}, 5000);
4. Reactive Programming with Python (RxPY)
import rx
from rx import operators as ops
import time
import threading
from datetime import datetime
# Simple Observable
def simple_observable():
source = rx.of("Python", "Reactive", "Programming")
source.subscribe(
on_next=lambda value: print(f"Next: {value}"),
on_error=lambda error: print(f"Error: {error}"),
on_completed=lambda: print("Completed!")
)
# Asynchronous Operations
def async_operations():
print("\n=== Asynchronous Operations ===")
# Time-based Observable
rx.interval(1.0).pipe(
ops.take(5),
ops.map(lambda i: f"Tick {i + 1}")
).subscribe(
on_next=lambda value: print(f"{value} at {datetime.now().second}s"),
on_completed=lambda: print("Timer completed!")
)
# Simulate asynchronous API calls
def simulate_api_call(item):
def observer(observer):
def worker():
time.sleep(1) # Simulate network delay
if item % 3 == 0:
observer.on_next(f"Data for {item}")
observer.on_completed()
else:
observer.on_error(Exception(f"Error for {item}"))
thread = threading.Thread(target=worker)
thread.start()
return rx.create(observer)
rx.range(1, 6).pipe(
ops.map(simulate_api_call),
ops.merge_all()
).subscribe(
on_next=lambda value: print(f"API Result: {value}"),
on_error=lambda error: print(f"API Error: {error}")
)
# Operators Demonstration
def operators_demo():
print("\n=== Operators Demo ===")
rx.range(1, 11).pipe(
ops.filter(lambda x: x % 2 == 0),
ops.map(lambda x: x * x),
ops.take(3)
).subscribe(
on_next=lambda value: print(f"Result: {value}"),
on_completed=lambda: print("Operators completed!")
)
# flatMap for asynchronous transformation
def get_user_data(user_id):
return rx.of(f"UserData for {user_id}").pipe(
ops.delay(0.5) # Simulate delay
)
rx.of("user1", "user2", "user3").pipe(
ops.flat_map(get_user_data)
).subscribe(
on_next=lambda data: print(f"User: {data}")
)
# Subject for Multicasting
def subjects_demo():
print("\n=== Subjects Demo ===")
# PublishSubject
subject = rx.Subject()
# Multiple Subscribers
def observer1(value):
print(f"Observer 1: {value}")
def observer2(value):
print(f"Observer 2: {value}")
subject.subscribe(observer1)
subject.subscribe(observer2)
# Emit data
subject.on_next("Message 1")
subject.on_next("Message 2")
subject.on_completed()
# BehaviorSubject
behavior_subject = rx.BehaviorSubject(0)
behavior_subject.subscribe(
on_next=lambda value: print(f"Behavior Subject: {value}")
)
behavior_subject.on_next(10)
behavior_subject.on_next(20)
# Late Subscriber
behavior_subject.subscribe(
on_next=lambda value: print(f"Late Subscriber: {value}")
)
# Hot vs Cold Observable
def hot_cold_demo():
print("\n=== Hot vs Cold Observable ===")
# Cold Observable (each subscriber gets all values)
cold = rx.of("A", "B", "C")
print("Cold Observable:")
cold.subscribe(on_next=lambda x: print(f"Subscriber 1: {x}"))
time.sleep(1)
cold.subscribe(on_next=lambda x: print(f"Subscriber 2: {x}"))
# Hot Observable (shared values)
hot = rx.Subject()
print("\nHot Observable:")
hot.subscribe(on_next=lambda x: print(f"Subscriber 1: {x}"))
hot.on_next("X")
hot.on_next("Y")
hot.subscribe(on_next=lambda x: print(f"Subscriber 2: {x}"))
hot.on_next("Z")
if __name__ == "__main__":
simple_observable()
async_operations()
operators_demo()
subjects_demo()
hot_cold_demo()
# Keep program running for asynchronous operations
time.sleep(10)
Reactive Streams Backpressure
Backpressure Strategies
// RxJava Backpressure
Flowable.range(1, 1000)
.onBackpressureBuffer() // Buffer (Standard)
.onBackpressureDrop() // Drop excess elements
.onBackpressureLatest() // Keep only latest element
.onBackpressureError() // Error on overload
.subscribe(item -> processItem(item));
// Project Reactor Backpressure
Flux.range(1, 1000)
.onBackpressureBuffer(100) // Buffer with size
.onBackpressureDrop() // Drop elements
.onBackpressureLatest() // Only last element
.limitRate(100) // Rate Limiting
.subscribe(item -> processItem(item));
Scheduler for Thread Control
RxJava Scheduler
Observable.just("data")
.subscribeOn(Schedulers.io()) // Execution on IO thread
.observeOn(Schedulers.single()) // Observation on single thread
.observeOn(Schedulers.computation()) // Computations on CPU thread
.subscribe(result -> handleResult(result));
Project Reactor Scheduler
Mono.just("data")
.subscribeOn(Schedulers.boundedElastic()) // IO operations
.publishOn(Schedulers.parallel()) // Parallel processing
.publishOn(Schedulers.single()) // Single thread
.subscribe(result -> handleResult(result));
Advantages and Disadvantages
Advantages of Reactive Programming
- Asynchronicity: Non-blocking processing
- Scalability: Better utilization of system resources
- Responsiveness: Faster response times
- Flexibility: Easy composition of operations
- Error Handling: Centralized error handling
Disadvantages
- Complexity: Steep learning curve
- Debugging: More difficult error tracing
- Overhead: Additional abstraction layer
- Resource Management: More complex lifecycle management
Common Exam Questions
-
What is the difference between Observable and Subject? Observable is only a source, Subject can function both as a source and as a receiver.
-
Explain Backpressure! Mechanism to protect against overload when fast producers and slow consumers interact.
-
When do you use Reactive Programming? For asynchronous operations, real-time applications, and systems with high scalability requirements.
-
What is the purpose of Schedulers? Control over thread execution and parallel processing in reactive systems.