Wyko Rijnsburger     About     Feed

Combining Streams using Reactor 3 + Kotlin

I spent a day migrating a service that does a lot of aggregation from RxJava 1 to Reactor 3. The migration went pretty smooth, with the Reactor API being more elegant compared to RxJava in most cases.

However, I struggled migrating the large zip() operations, where we merge async data from various sources into one big Observable. RxJava has zip that works like this:

public static <T1,T2,T3,T4,T5,R> Single<R> zip(Single<? extends T1> o1,
                                   Single<? extends T2> o2,
                                   Single<? extends T3> o3,
                                   Single<? extends T4> o4,
                                   Single<? extends T5> o5,
                                   Func5<? super T1,? super T2,? super T3,? super T4,? super T5,? extends R> zipFunction)

While I think from a maintenance perspective, this is far from ideal, as a user, the resulting code becomes quite readable:

Single.zip(singleA, singleB, singleC, singleD, singleE, 
           (a, b, c, d, e) -> new Aggregate(a, b, c, d, e))

In Reactor, the API unfortunately isn’t as user-friendly. In the combinator function, you lose all type information which results in a long, ugly Function where we’re forced to do manual casting.

Mono.zip(array -> {
    A a = (A) array[0];
    B b = (B) array[1];
    C c = (C) array[2];
    D d = (D) array[3];
    E e = (E) array[4];
    
    return new Aggregate(a, b, c, d, e)
}, monoA, monoB, monoC, monoD, monoE)

There is also the when approach, which returns a Tuple that you can then transform. This approach is slightly more maintainable as it does not require manual casting, but is not that readable:

Mono.when(monoA, monoB, monoC, monoD, monoE)
    .map(tuple -> new Aggregate(tuple.t1(), tuple.t2(), tuple.t3(), tuple.t4(), tuple.t5()))

In Kotlin, we can improve this by using the Tuple Extension Functions for Kotlin provided by Reactor. These Kotlin extension functions provide component() methods to the Reactor Tuples which allow us to use Kotlin’s Destructuring Declarations syntax. We need to manually import these extension functions, in my experience IDEA does not do it for you.

import reactor.util.function.component1
import reactor.util.function.component2
import reactor.util.function.component3
import reactor.util.function.component4
import reactor.util.function.component5

Mono.when(monoA, monoB, monoC, monoD, monoE)
    .map {(a, b, c, d, e) -> Aggregate(a, b, c, d, e)}

Rate-limiting API-calls with Reactor 3

I’ve been working with Reactor 3 recently. Reactor is a Reactive library similar to RxJava, but developed for Java 8 by the guys at Spring.

I ran into a situation where I had to call an external API for all items in a stream. That external API only allowed four requests a second, so I had to rate-limit the outgoing calls. Luckily, using Reactor, this is easily implemented.

We can use the delayElements method on the stream to limit the amount of emitted items per second.

Flux.just("ID1", "ID2")
    .delayElements(Duration.ofMillis(250))
    .map(id -> apiClient.doSomething(id));

Update 12th April 2017

This post has been updated to use the new delayElements method instead of the now deprecated delayMillis.