在现代应用中,响应式编程逐渐成为处理高并发、异步非阻塞任务的首选编程范式。在 Spring Boot 3 中,响应式编程通过 Reactor 库得到了广泛应用,提供了强大的流式数据处理能力。为了增强对流式数据流的调试和处理能力,Reactor 提供了一组非常重要的事件感知(side-effect)API,也就是我们常听到的 doOnXxx
系列方法。
这篇博客将详细介绍 doOnXxx
系列 API 的功能和用法,帮助大家更好地理解它们在响应式流中的作用,并展示其在实际开发中的一些应用场景。
1. 什么是 doOnXxx
系列 API?
doOnXxx
系列方法是 Reactor 提供的一组用于在流操作过程中执行副作用的 API。它们不会改变流的内容或数据流本身,而是允许我们在特定的生命周期事件发生时进行操作(如日志记录、调试、监控等)。
这些 API 名称中的 Xxx
代表不同的事件类型,比如:
doOnNext()
: 当下一个元素被发出时执行操作。doOnError()
: 当流中出现错误时执行操作。doOnComplete()
: 当流完成时执行操作。doOnSubscribe()
: 当订阅发生时执行操作。
这些方法非常适合用于监控、调试或者记录流的行为。
2. doOnXxx
API 的常用方法
下面我们依次介绍常见的 doOnXxx
API,并通过简单的示例进行演示。
2.1 doOnNext()
doOnNext()
方法允许你在每个元素被发布时执行操作,通常用于对每个数据元素进行日志记录、调试或者进行某种副作用操作。
示例:
Flux<String> flux = Flux.just("Spring", "Boot", "3","Reactor")
.doOnNext(value -> System.out.println("Processing value: " + value))
.map(String::toUpperCase);
flux.subscribe(System.out::println);
输出:
在这个例子中,doOnNext()
被用于每个元素发出时打印日志。这对于调试非常有用,可以清楚看到每个数据元素何时被处理。
2.2 doOnError()
doOnError()
方法允许你在流中出现异常时执行操作,通常用于记录异常信息、执行错误处理逻辑等。
示例:
Flux<Integer> fluxWithError = Flux.just(1, 2, 0)
.map(i -> 10 / i) // 这里会抛出 ArithmeticException: / by zero
.doOnError(e -> System.err.println("Error occurred: " + e.getMessage()));
fluxWithError.subscribe(
System.out::println,
error -> System.err.println("Subscriber received error: " + error)
);
输出:
在这个例子中,Flux 被用来创建一个数据流,并且在这个数据流中执行了一些操作,包括可能抛出异常的操作。下面是对消费者和生产者异常捕获的区别:
生产者异常捕获:
- 在生产者端,可以使用 doOnError 方法来捕获并处理异常,这个方法会在数据流中发生错误时被调用。
- doOnError 可以用于记录日志或执行一些清理操作,它不会改变数据流的行为,但数据流会被终止。
消费者异常捕获:
- 在消费者端,可以通过 subscribe 方法的第二个参数(错误处理回调)来捕获并处理异常。
- 这个错误处理回调会在数据流中发生错误时被调用,可以用于记录日志或执行其他错误处理逻辑。
2.3 doOnComplete()
doOnComplete()
方法在流完成时(即没有更多元素发出)执行操作。你可以利用它在流结束时执行一些收尾工作,比如关闭资源、统计处理结果等。
示例:
Flux<String> flux = Flux.just("Spring", "Boot", "3","Reactor")
.doOnComplete(() -> System.out.println("Stream completed"));
flux.subscribe(System.out::println);
输出:
这里,doOnComplete()
用于在数据流结束时打印一条日志,通知处理完成。
2.4 doOnSubscribe()
doOnSubscribe()
允许你在流被订阅时执行操作。它通常用于监控订阅事件,适合用于统计订阅数或进行相关的初始化操作。
示例:
Flux<String> flux = Flux.just("A", "B", "C")
.doOnSubscribe(subscription -> System.out.println("Subscription started"));
flux.subscribe(System.out::println);
输出:
在这个例子中,当流被订阅时,doOnSubscribe()
被调用,打印订阅开始的日志。
2.5 doOnCancel()
doOnCancel()
方法在取消订阅时执行操作。取消订阅通常是在消费者不再需要流数据时发生的(例如手动取消订阅或者发生超时等情况),可以用于处理一些资源释放的操作。
示例:
Flux<String> flux = Flux.just("A", "B", "C")
.doOnCancel(() -> System.out.println("Subscription canceled"))
.take(2); // 只取前两个元素,第三个元素将被跳过(取消)
flux.subscribe(System.out::println);
输出:
这里 doOnCancel()
在流被取消时执行了取消订阅的操作。
2.6 doFinally()
doFinally()
是一个非常有用的方法,它在流结束时始终会被调用(无论是正常完成、错误还是取消订阅)。它类似于 try-finally
语句中的 finally
,适合做一些无论流如何结束都需要执行的操作,如清理资源等。
示例:
Flux<String> flux = Flux.just("A", "B", "C")
.doFinally(signalType -> System.out.println("Stream ended with signal: " + signalType));
flux.subscribe(System.out::println);
输出:
doFinally()
可以捕捉到不同类型的信号,包括 onComplete
, onError
和 onCancel
。
2.7 doOnTerminate()
doOnTerminate()
在流完成或出错时执行操作。它是 doOnComplete()
和 doOnError()
的组合,但不区分流是正常完成还是出现错误,只要流结束了,它就会被调用。
示例:
Flux<String> flux = Flux.just("A", "B", "C")
.doOnTerminate(() -> System.out.println("Stream terminated"));
flux.subscribe(System.out::println);
输出:
它在流结束时总会执行,不管是否出现错误。
2.8 doOnEach()
doOnEach()
是一个非常通用的事件感知 API,它允许对流中的每一个信号(包括 onNext、onError、onComplete 和 onSubscribe)进行统一处理。这个方法会接收一个 Signal
对象,表示当前发生的事件类型,从而可以处理不同的信号类型。
示例:
Flux<String> flux = Flux.just("Spring", "Boot", "3", "Reactor")
.doOnEach(signal -> {
if (signal.isOnNext()) {
System.out.println("Element received: " + signal.get());
} else if (signal.isOnError()) {
System.err.println("Error occurred: " + signal.getThrowable().getMessage());
} else if (signal.isOnComplete()) {
System.out.println("Stream completed");
}
});
flux.subscribe(System.out::println);
输出:
2.9 doOnDiscard()
doOnDiscard()
方法用于处理被 丢弃的元素。当某些元素由于某种原因(例如 filter()
操作或上游取消)没有被使用时,可以通过 doOnDiscard()
来感知这些元素的丢弃,并执行相关的操作(如清理资源、记录日志等)。
可能使用 doOnDiscard
钩子的例子包括以下情况:
filter
: 不符合过滤器的项被视为 “丢弃”。skip
:跳过的项将被丢弃。buffer(maxSize, skip)
与maxSize < skip
:“丢弃的缓冲区” — 缓冲区之间的元素被丢弃。
示例:
Flux<String> flux = Flux.just("AA", "BB", "C", "D", "E")
.filter(s -> s.length() > 1)
.doOnDiscard(String.class, discardedValue ->
System.out.println("Discarded: " + discardedValue));
flux.subscribe(System.out::println);
输出:
2.10 doOnRequest()
doOnRequest()
是一个用于处理 背压请求(request signals) 的 API,它允许你在下游请求元素时执行操作。响应式流中上游发送元素的数量通常由下游通过请求背压机制控制,因此 doOnRequest()
可以帮助我们监控下游对元素的需求。
示例:
Flux<Integer> flux = Flux.range(1, 5)
.doOnRequest(request ->
System.out.println("Request for: " + request + " elements"));
flux.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(Integer integer) {
System.out.println("Received: " + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
}); // 请求 3 个元素
输出:
3. doOnXxx
的应用场景
- 日志记录与调试:在流的不同阶段插入
doOnXxx
,帮助我们记录每个阶段的状态变化或异常情况,从而更好地调试响应式流。 - 监控和统计:我们可以使用
doOnSubscribe()
和doOnComplete()
结合监控系统来统计订阅的数量、完成的流数量,分析流的性能。 - 资源管理:使用
doFinally()
进行资源释放和清理,确保无论流如何结束都能进行相应的收尾工作。 - 错误处理:使用
doOnError()
可以在发生错误时记录日志、发送通知或者做出其他相应的处理。
4. 总结
Reactor 的 doOnXxx
系列 API 是在响应式流中进行事件感知和副作用处理的强大工具。它们的主要作用是让开发者能够在不干扰流式数据处理的情况下,插入额外的操作,如调试、监控、资源清理等。通过合理使用 doOnNext()
、doOnError()
、doFinally()
等方法,我们可以更好地理解和控制响应式流的执行过程,从而构建更加健壮和高效的应用程序。
希望这篇文章能帮助你更好地掌握 doOnXxx
系列方法。如果你有任何问题或建议,欢迎讨论!