Stand: 2018-06-02
Verzögertes Retry mit Reactor
Wie kann man fehlgeschlagenen Aktionen mit Reactor verzögert wiederholen?
Wenn Reactor unzuverlässige Operationen wie Aufrufe externer Systeme ausführt, kann ein fehlgeschlagener Aufruf wiederholt werden. Mehrere Varianten der retry
-Funktion sind für diesen Zweck schon in reactor-core
enthalten. Allerdings bietet keine davon eine zeitliche Verzögerung und würde so im Wiederholungsfall zu extrem schnellen Wiederholungen führen.
Die retry
-Funktionen aus reactor-core
:
retry()
retry(long numRetries)
retry(Predicate<? super Throwable> retryMatcher)
retry(long numRetries, Predicate<? super Throwable> retryMatcher)
retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)
In einem verteilten System, in dem mehrere Dienste zum Beispiel auf eine Datenbank zugreifen, könnte die Datenbank für einen Moment ausfallen, sodass die anderen Dienste die Anfrage wiederholen. Wenn dazu die mitgelieferte retry
-Funktion verwendet würde, könnte der Datenbankserver mit massenhaften Anfragen gleich wieder lahmgelegt werden!
Um dieses Problem zu lösen, gibt es als Teil des Projekts reactor-addons die Bibliothek reactor-extra
mit einer verbesserten retry
-Funktion mit Verzögerung.
Beispiel
Im folgenden Beispiel (in Kotlin) wird mit der unreliableSource
eine unzuverlässige Operation simuliert. In den ersten 3 Sekunden werden im Sekundentakt Exceptions geworfen. Danach wird eine jeweiles um 1 erhöhte Zahl zurückgegeben.
Der eigentlich interessante Teil folgt nach Retry
:
import reactor.core.publisher.Flux
import reactor.retry.Retry
import java.time.Duration
fun main(args: Array<String>) {
var counter = 0
val unreliableSource = Flux.interval(Duration.ofSeconds(1))
.map {
counter++
if (counter < 4) throw RuntimeException("test") else counter
}
val resilientSource = Retry.anyOf<Any>(RuntimeException::class.java)
.doOnRetry { println("retrying ... ${it.exception()}") }
.retryMax(5)
.fixedBackoff(Duration.ofSeconds(3))
.apply(unreliableSource)
resilientSource.subscribe({ println("success: $it") }, { println("ERROR") })
Thread.sleep(20000)
}
anyOf
beschränkt die Wiederholung auf bestimmte Exceptions, hierRuntimeException
.doOnRetry
führt bei einem erneuten Versuch beliebigen Code aus, in diesem Fall die Ausgabe auf die Konsole.retryMax
begrenzt die Anzahl der Wiederholungsversuche.fixedBackoff
legt eine konstante Verzögerungszeit zwischen Wiederholungen fest.apply
nimmt die Quelle entgegen, die mit dem Retry-Verhalten dekoriert werden soll.
Schließlich gibt apply
den dekorierten Flux
zurück, auf dem dann alle üblichen Reactor-Funktionen aufgerufen werden können; hier bleibt es bei dem abschließenden subscribe
.
Das Beispiel führt zu dieser Ausgabe:
[DEBUG] (main) Using Console logging
[DEBUG] (parallel-1) Scheduling retry attempt, retry context: iteration=1 exception=java.lang.RuntimeException: test backoff={3000ms}
retrying ... java.lang.RuntimeException: test
[DEBUG] (parallel-3) Scheduling retry attempt, retry context: iteration=2 exception=java.lang.RuntimeException: test backoff={3000ms}
retrying ... java.lang.RuntimeException: test
[DEBUG] (parallel-5) Scheduling retry attempt, retry context: iteration=3 exception=java.lang.RuntimeException: test backoff={3000ms}
retrying ... java.lang.RuntimeException: test
success: 4
success: 5
success: 6
success: 7
success: 8
success: 9
success: 10
Die Ausgabe ERROR
erscheint nicht, weil in diesem Beispiel kein Fehler bis zum Ende der Verarbeitungskette durchdringt.
Die Wiederholung in einem festen Intervall mit fixedBackoff
ist der einfachste Fall, aber es gibt noch ausgefeiltere Strategien wie eine exponentenielle Verlängerung der Zeit mit exponentialBackoff(Duration firstBackoff, Duration maxBackoff)
oder zufällige Verzögerungen mit randomBackoff
. Letzteres ist besonders dann von Vorteil, wenn mehrere Dienste versuchen, auf einen ausgefallenen Dienst zuzugfreifen, denn wenn alle Dienste im gleichen Takt erneut auf den ausgefallenen Dienst zugreifen, bestünde die Gefahr einer Überlastung.
Eingebunden wird Retry
durch folgende Abhängigkeit (Konfiguration mit Gradle):
dependencies {
implementation('io.projectreactor.addons:reactor-extra')
}
Weitere Information
- Grundsätzliche Erklärung des
retry
-Mechanismus in der Reactor-Dokumentation