Stand: 2018-10-08

Ereignisgesteuerte Zeitfenster mit Reactor

Wie kann man in Reactor Zeitfenster in Abhängigkeit von Ereignissen und nicht nach einem starren Takt bilden?

Mit den Operatoren buffer und window kann eine Folge von Signalen in Zeitfenster “portioniert” werden. Dazu wird bei der Subscription im Hintergrund ein Timer gestartet, der das Zeitfenster nach der angegebenen Zeit schließt. Was macht man aber, wenn das Zeitfenster nicht bei der Subscription sondern erst bei einem Signal beginnen soll?

Mit einem kleinen Beispiel wird der Unterschied verständlicher. Der Kommentar über den Werten im nächsten Code-Ausschnitt zeigt die Zeitfenster, die durch buffer(Duration.ofSeconds(3)) gebildet werden:

fun main(args: Array<String>) {
    // Fenster:          |-------|--------|--------|-----
    val source = sourceOf(0, 0, 0, 0, 1, 2, 0, 0, 3, 4, 5)
    source.buffer(Duration.ofSeconds(3)).subscribe(::println)

    Thread.sleep(12000)
}

Die Methode sourceOf erzeugt eine Folge von Signalen im Sekundentakt, wobei für Eingabewerte von 0 keine Signale erzeugt werden, sondern nur die Zeit verstreichen gelassen wird.

fun sourceOf(vararg signals: Int): Flux<Int> =
        Flux.just(*signals.toTypedArray())
                .delayElements(Duration.ofMillis(990))
                .filter { it != 0 }

Die Signale werden übrigens mit einer Verzögerung von 990 Millisekunden gesendet, damit nicht ein Signal genau auf das Ende eines Zeitfensters fällt, was in der Realität auch ganz kurz danach sein könnte. Außerdem braucht die Ausführung auch noch etwas Zeit, was zu weiteren Abweichungen vom Ideal führt. Bei einem Test mit virtueller Zeit könnte man auf diesen kleinen Trick verzichten.

Die Zeitfenster werden genau wie erwartet beginnend mit der Subscription alle 3 Sekunden gebildet. Ausgabe:

[1, 2]
[3]
[4, 5]

Es gibt allerdings eine Variante der buffer-Methode, mit der man die Zeitsteuerung selbst in die Hand nehmen kann:

public final Flux<List<T>> buffer(Publisher<?> other)

Immer wenn der Publisher other ein Signal (egal welches) sendet, wird der Puffer, in diesem Fall also das Zeitfenster, geschlossen. Als Timer bietet sich ein Mono mit verzögerter Ausgabe an:

fun <T> bufferOnSignal(source: Flux<T>, timespan: Duration) : Flux<List<T>> {
    var shouldOpenWindowOnNext = true
    val delimiter = source.flatMap {
        if (shouldOpenWindowOnNext) {
            shouldOpenWindowOnNext = false
            Mono.just(Unit).delayElement(timespan)
        } else {
            Mono.empty()
        }
    }
    return source.buffer(delimiter).doOnNext { shouldOpenWindowOnNext = true }
}

Zu Beginn eines Zeitfensters wird als delimiter jeweils eine Mono-Instanz erzeugt und mit delayElement die Ausgabe dessen Signals verzögert. delimiter und buffer müssen auf derselben source basieren. Der delimiter gibt das nichtssagende Unit (vergleichbar mit Void in Java) aus, weil es ja nur auf das Signal als solches ankommt.

Natürlich darf aber nicht bei jedem Eingangssignal ein neues Mono-Objekt als Timer erzeugt werden, weil die Zeitfenster sonst jeweils nur ein Element enthalten würden. Deshalb wird shouldOpenWindowOnNext auf false gesetzt, solange ein Zeitfenster geöffnet ist. Nach dem Schließen eines Zeitfensters wird der Wert dann wieder auf true gesetzt.

In das Ausgangsbeispiel kann die Funktion bufferOnSignal mit transform eingebaut werden:

fun main(args: Array<String>) {
    // Fenster:                      |-------|   |-------|
    val source = sourceOf(0, 0, 0, 0, 1, 2, 0, 0, 3, 4, 5)
    source.transform { s -> bufferOnSignal(s, Duration.ofSeconds(3)) }
            .subscribe(::println)

    Thread.sleep(12000)
}

Die Ausgabe zeigt, dass die Zeitfenster wie gewünscht bei einem Signal und nicht nach einem starren Takt gebildet werden:

[1, 2]
[3, 4, 5]

In Kotlin kann statt transform auch eine Erweiterungsfunktion verwendet werden, was den Code noch etwas lesbarer macht:

source.bufferOnSignal(Duration.ofSeconds(3)).subscribe(::println)

Als Erweiterungsfunktion wird bufferOnSignal so implementiert:

fun <T> Flux<T>.bufferOnSignal(timespan: Duration): Flux<List<T>> {
    var shouldOpenWindowOnNext = true
    val delimiter = this.flatMap {
        if (shouldOpenWindowOnNext) {
            shouldOpenWindowOnNext = false
            Mono.just(Unit).delayElement(timespan)
        } else {
            Mono.empty()
        }
    }
    return this.buffer(delimiter).doOnNext { shouldOpenWindowOnNext = true }
}