Stand: 2018-10-08
Ereignisgesteuerte Zeitfenster mit Reactor
Wie kann man in Reactor Zeitfenster in Abhängigkeit von Ereignissen und nicht nach einem starren Takt bilden?
Mit den Operatoren buffer
und window
kann eine Folge von Signalen in Zeitfenster “portioniert” werden. Dazu wird bei der Subscription im Hintergrund ein Timer gestartet, der das Zeitfenster nach der angegebenen Zeit schließt. Was macht man aber, wenn das Zeitfenster nicht bei der Subscription sondern erst bei einem Signal beginnen soll?
Mit einem kleinen Beispiel wird der Unterschied verständlicher. Der Kommentar über den Werten im nächsten Code-Ausschnitt zeigt die Zeitfenster, die durch buffer(Duration.ofSeconds(3))
gebildet werden:
fun main(args: Array<String>) {
// Fenster: |-------|--------|--------|-----
val source = sourceOf(0, 0, 0, 0, 1, 2, 0, 0, 3, 4, 5)
source.buffer(Duration.ofSeconds(3)).subscribe(::println)
Thread.sleep(12000)
}
Die Methode sourceOf
erzeugt eine Folge von Signalen im Sekundentakt, wobei für Eingabewerte von 0
keine Signale erzeugt werden, sondern nur die Zeit verstreichen gelassen wird.
fun sourceOf(vararg signals: Int): Flux<Int> =
Flux.just(*signals.toTypedArray())
.delayElements(Duration.ofMillis(990))
.filter { it != 0 }
Die Signale werden übrigens mit einer Verzögerung von 990 Millisekunden gesendet, damit nicht ein Signal genau auf das Ende eines Zeitfensters fällt, was in der Realität auch ganz kurz danach sein könnte. Außerdem braucht die Ausführung auch noch etwas Zeit, was zu weiteren Abweichungen vom Ideal führt. Bei einem Test mit virtueller Zeit könnte man auf diesen kleinen Trick verzichten.
Die Zeitfenster werden genau wie erwartet beginnend mit der Subscription alle 3 Sekunden gebildet. Ausgabe:
[1, 2]
[3]
[4, 5]
Es gibt allerdings eine Variante der buffer
-Methode, mit der man die Zeitsteuerung selbst in die Hand nehmen kann:
public final Flux<List<T>> buffer(Publisher<?> other)
Immer wenn der Publisher other
ein Signal (egal welches) sendet, wird der Puffer, in diesem Fall also das Zeitfenster, geschlossen. Als Timer bietet sich ein Mono
mit verzögerter Ausgabe an:
fun <T> bufferOnSignal(source: Flux<T>, timespan: Duration) : Flux<List<T>> {
var shouldOpenWindowOnNext = true
val delimiter = source.flatMap {
if (shouldOpenWindowOnNext) {
shouldOpenWindowOnNext = false
Mono.just(Unit).delayElement(timespan)
} else {
Mono.empty()
}
}
return source.buffer(delimiter).doOnNext { shouldOpenWindowOnNext = true }
}
Zu Beginn eines Zeitfensters wird als delimiter
jeweils eine Mono
-Instanz erzeugt und mit delayElement
die Ausgabe dessen Signals verzögert. delimiter
und buffer
müssen auf derselben source
basieren. Der delimiter
gibt das nichtssagende Unit
(vergleichbar mit Void
in Java) aus, weil es ja nur auf das Signal als solches ankommt.
Natürlich darf aber nicht bei jedem Eingangssignal ein neues Mono
-Objekt als Timer erzeugt werden, weil die Zeitfenster sonst jeweils nur ein Element enthalten würden. Deshalb wird shouldOpenWindowOnNext
auf false
gesetzt, solange ein Zeitfenster geöffnet ist. Nach dem Schließen eines Zeitfensters wird der Wert dann wieder auf true
gesetzt.
In das Ausgangsbeispiel kann die Funktion bufferOnSignal
mit transform
eingebaut werden:
fun main(args: Array<String>) {
// Fenster: |-------| |-------|
val source = sourceOf(0, 0, 0, 0, 1, 2, 0, 0, 3, 4, 5)
source.transform { s -> bufferOnSignal(s, Duration.ofSeconds(3)) }
.subscribe(::println)
Thread.sleep(12000)
}
Die Ausgabe zeigt, dass die Zeitfenster wie gewünscht bei einem Signal und nicht nach einem starren Takt gebildet werden:
[1, 2]
[3, 4, 5]
In Kotlin kann statt transform
auch eine Erweiterungsfunktion verwendet werden, was den Code noch etwas lesbarer macht:
source.bufferOnSignal(Duration.ofSeconds(3)).subscribe(::println)
Als Erweiterungsfunktion wird bufferOnSignal
so implementiert:
fun <T> Flux<T>.bufferOnSignal(timespan: Duration): Flux<List<T>> {
var shouldOpenWindowOnNext = true
val delimiter = this.flatMap {
if (shouldOpenWindowOnNext) {
shouldOpenWindowOnNext = false
Mono.just(Unit).delayElement(timespan)
} else {
Mono.empty()
}
}
return this.buffer(delimiter).doOnNext { shouldOpenWindowOnNext = true }
}