重学SpringBoot3-Spring WebFlux之Reactor事件感知 API

重学SpringBoot3-Spring WebFlux之Reactor事件感知 API

CoderJia 22 2024-10-27

在现代应用中,响应式编程逐渐成为处理高并发、异步非阻塞任务的首选编程范式。在 Spring Boot 3 中,响应式编程通过 Reactor 库得到了广泛应用,提供了强大的流式数据处理能力。为了增强对流式数据流的调试和处理能力,Reactor 提供了一组非常重要的事件感知(side-effect)API,也就是我们常听到的 doOnXxx 系列方法。

这篇博客将详细介绍 doOnXxx 系列 API 的功能和用法,帮助大家更好地理解它们在响应式流中的作用,并展示其在实际开发中的一些应用场景。

1. 什么是 doOnXxx 系列 API?

doOnXxx 系列方法是 Reactor 提供的一组用于在流操作过程中执行副作用的 API。它们不会改变流的内容或数据流本身,而是允许我们在特定的生命周期事件发生时进行操作(如日志记录、调试、监控等)。

doOnXxx

这些 API 名称中的 Xxx 代表不同的事件类型,比如:

  • doOnNext(): 当下一个元素被发出时执行操作。
  • doOnError(): 当流中出现错误时执行操作。
  • doOnComplete(): 当流完成时执行操作。
  • doOnSubscribe(): 当订阅发生时执行操作。

这些方法非常适合用于监控、调试或者记录流的行为。

2. doOnXxx API 的常用方法

下面我们依次介绍常见的 doOnXxx API,并通过简单的示例进行演示。

2.1 doOnNext()

doOnNext() 方法允许你在每个元素被发布时执行操作,通常用于对每个数据元素进行日志记录、调试或者进行某种副作用操作。

示例:

        Flux<String> flux = Flux.just("Spring", "Boot", "3","Reactor")
                .doOnNext(value -> System.out.println("Processing value: " + value))
                .map(String::toUpperCase);

        flux.subscribe(System.out::println);

输出:

doOnNext()

在这个例子中,doOnNext() 被用于每个元素发出时打印日志。这对于调试非常有用,可以清楚看到每个数据元素何时被处理。

2.2 doOnError()

doOnError() 方法允许你在流中出现异常时执行操作,通常用于记录异常信息、执行错误处理逻辑等。

示例:

        Flux<Integer> fluxWithError = Flux.just(1, 2, 0)
                .map(i -> 10 / i)  // 这里会抛出 ArithmeticException: / by zero
                .doOnError(e -> System.err.println("Error occurred: " + e.getMessage()));

        fluxWithError.subscribe(
                System.out::println,
                error -> System.err.println("Subscriber received error: " + error)
        );

输出:

doOnError()

在这个例子中,Flux 被用来创建一个数据流,并且在这个数据流中执行了一些操作,包括可能抛出异常的操作。下面是对消费者和生产者异常捕获的区别:
生产者异常捕获:

  • 在生产者端,可以使用 doOnError 方法来捕获并处理异常,这个方法会在数据流中发生错误时被调用。
  • doOnError 可以用于记录日志或执行一些清理操作,它不会改变数据流的行为,但数据流会被终止。

消费者异常捕获:

  • 在消费者端,可以通过 subscribe 方法的第二个参数(错误处理回调)来捕获并处理异常。
  • 这个错误处理回调会在数据流中发生错误时被调用,可以用于记录日志或执行其他错误处理逻辑。

2.3 doOnComplete()

doOnComplete() 方法在流完成时(即没有更多元素发出)执行操作。你可以利用它在流结束时执行一些收尾工作,比如关闭资源、统计处理结果等。

示例:

        Flux<String> flux = Flux.just("Spring", "Boot", "3","Reactor")
                .doOnComplete(() -> System.out.println("Stream completed"));

        flux.subscribe(System.out::println);

输出:

doOnComplete()

这里,doOnComplete() 用于在数据流结束时打印一条日志,通知处理完成。

2.4 doOnSubscribe()

doOnSubscribe() 允许你在流被订阅时执行操作。它通常用于监控订阅事件,适合用于统计订阅数或进行相关的初始化操作。

示例:

        Flux<String> flux = Flux.just("A", "B", "C")
                .doOnSubscribe(subscription -> System.out.println("Subscription started"));

        flux.subscribe(System.out::println);

输出:

doOnSubscribe()

在这个例子中,当流被订阅时,doOnSubscribe() 被调用,打印订阅开始的日志。

2.5 doOnCancel()

doOnCancel() 方法在取消订阅时执行操作。取消订阅通常是在消费者不再需要流数据时发生的(例如手动取消订阅或者发生超时等情况),可以用于处理一些资源释放的操作。

示例:

Flux<String> flux = Flux.just("A", "B", "C")
    .doOnCancel(() -> System.out.println("Subscription canceled"))
    .take(2);  // 只取前两个元素,第三个元素将被跳过(取消)

flux.subscribe(System.out::println);

输出:

doOnCancel()

这里 doOnCancel() 在流被取消时执行了取消订阅的操作。

2.6 doFinally()

doFinally() 是一个非常有用的方法,它在流结束时始终会被调用(无论是正常完成、错误还是取消订阅)。它类似于 try-finally 语句中的 finally,适合做一些无论流如何结束都需要执行的操作,如清理资源等。

示例:

        Flux<String> flux = Flux.just("A", "B", "C")
                .doFinally(signalType -> System.out.println("Stream ended with signal: " + signalType));

        flux.subscribe(System.out::println);

输出:

doFinally()

doFinally() 可以捕捉到不同类型的信号,包括 onComplete, onErroronCancel

2.7 doOnTerminate()

doOnTerminate() 在流完成或出错时执行操作。它是 doOnComplete()doOnError() 的组合,但不区分流是正常完成还是出现错误,只要流结束了,它就会被调用。

示例:

        Flux<String> flux = Flux.just("A", "B", "C")
                .doOnTerminate(() -> System.out.println("Stream terminated"));

        flux.subscribe(System.out::println);

输出:

doOnTerminate()

它在流结束时总会执行,不管是否出现错误。

2.8 doOnEach()

doOnEach() 是一个非常通用的事件感知 API,它允许对流中的每一个信号(包括 onNextonErroronCompleteonSubscribe)进行统一处理。这个方法会接收一个 Signal 对象,表示当前发生的事件类型,从而可以处理不同的信号类型。

示例:

        Flux<String> flux = Flux.just("Spring", "Boot", "3", "Reactor")
                .doOnEach(signal -> {
                    if (signal.isOnNext()) {
                        System.out.println("Element received: " + signal.get());
                    } else if (signal.isOnError()) {
                        System.err.println("Error occurred: " + signal.getThrowable().getMessage());
                    } else if (signal.isOnComplete()) {
                        System.out.println("Stream completed");
                    }
                });

        flux.subscribe(System.out::println);

输出:

doOnEach()

2.9 doOnDiscard()

doOnDiscard() 方法用于处理被 丢弃的元素。当某些元素由于某种原因(例如 filter() 操作或上游取消)没有被使用时,可以通过 doOnDiscard() 来感知这些元素的丢弃,并执行相关的操作(如清理资源、记录日志等)。

可能使用 doOnDiscard 钩子的例子包括以下情况:

  • filter: 不符合过滤器的项被视为 “丢弃”。
  • skip:跳过的项将被丢弃。
  • buffer(maxSize, skip)maxSize < skip:“丢弃的缓冲区” — 缓冲区之间的元素被丢弃。

示例:

        Flux<String> flux = Flux.just("AA", "BB", "C", "D", "E")
                .filter(s -> s.length() > 1)
                .doOnDiscard(String.class, discardedValue ->
                        System.out.println("Discarded: " + discardedValue));

        flux.subscribe(System.out::println);

输出:

doOnDiscard()

2.10 doOnRequest()

doOnRequest() 是一个用于处理 背压请求(request signals) 的 API,它允许你在下游请求元素时执行操作。响应式流中上游发送元素的数量通常由下游通过请求背压机制控制,因此 doOnRequest() 可以帮助我们监控下游对元素的需求。

示例:

        Flux<Integer> flux = Flux.range(1, 5)
                .doOnRequest(request ->
                        System.out.println("Request for: " + request + " elements"));

        flux.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(3);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("Received: " + integer);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });  // 请求 3 个元素

输出:

doOnRequest()

3. doOnXxx 的应用场景

  1. 日志记录与调试:在流的不同阶段插入 doOnXxx,帮助我们记录每个阶段的状态变化或异常情况,从而更好地调试响应式流。
  2. 监控和统计:我们可以使用 doOnSubscribe()doOnComplete() 结合监控系统来统计订阅的数量、完成的流数量,分析流的性能。
  3. 资源管理:使用 doFinally() 进行资源释放和清理,确保无论流如何结束都能进行相应的收尾工作。
  4. 错误处理:使用 doOnError() 可以在发生错误时记录日志、发送通知或者做出其他相应的处理。

4. 总结

Reactor 的 doOnXxx 系列 API 是在响应式流中进行事件感知和副作用处理的强大工具。它们的主要作用是让开发者能够在不干扰流式数据处理的情况下,插入额外的操作,如调试、监控、资源清理等。通过合理使用 doOnNext()doOnError()doFinally() 等方法,我们可以更好地理解和控制响应式流的执行过程,从而构建更加健壮和高效的应用程序。

希望这篇文章能帮助你更好地掌握 doOnXxx 系列方法。如果你有任何问题或建议,欢迎讨论!