Stand: 2018-06-02

Verzögertes Retry mit Reactor

Wie kann man fehlgeschlagenen Aktionen mit Reactor verzögert wiederholen?

Wenn Reactor unzuverlässige Operationen wie Aufrufe externer Systeme ausführt, kann ein fehlgeschlagener Aufruf wiederholt werden. Mehrere Varianten der retry-Funktion sind für diesen Zweck schon in reactor-core enthalten. Allerdings bietet keine davon eine zeitliche Verzögerung und würde so im Wiederholungsfall zu extrem schnellen Wiederholungen führen.

Die retry-Funktionen aus reactor-core:

retry()
retry(long numRetries)
retry(Predicate<? super Throwable> retryMatcher)
retry(long numRetries, Predicate<? super Throwable> retryMatcher)
retryWhen(Function<Flux<Throwable>,? extends Publisher<?>> whenFactory)

In einem verteilten System, in dem mehrere Dienste zum Beispiel auf eine Datenbank zugreifen, könnte die Datenbank für einen Moment ausfallen, sodass die anderen Dienste die Anfrage wiederholen. Wenn dazu die mitgelieferte retry-Funktion verwendet würde, könnte der Datenbankserver mit massenhaften Anfragen gleich wieder lahmgelegt werden!

Um dieses Problem zu lösen, gibt es als Teil des Projekts reactor-addons die Bibliothek reactor-extra mit einer verbesserten retry-Funktion mit Verzögerung.

Beispiel

Im folgenden Beispiel (in Kotlin) wird mit der unreliableSource eine unzuverlässige Operation simuliert. In den ersten 3 Sekunden werden im Sekundentakt Exceptions geworfen. Danach wird eine jeweiles um 1 erhöhte Zahl zurückgegeben.

Der eigentlich interessante Teil folgt nach Retry:

import reactor.core.publisher.Flux
import reactor.retry.Retry
import java.time.Duration

fun main(args: Array<String>) {
    var counter = 0
    val unreliableSource = Flux.interval(Duration.ofSeconds(1))
            .map {
                counter++
                if (counter < 4) throw RuntimeException("test") else counter
            }

    val resilientSource = Retry.anyOf<Any>(RuntimeException::class.java)
            .doOnRetry { println("retrying ... ${it.exception()}") }
            .retryMax(5)
            .fixedBackoff(Duration.ofSeconds(3))
            .apply(unreliableSource)

    resilientSource.subscribe({ println("success: $it") }, { println("ERROR") })

    Thread.sleep(20000)
}

Schließlich gibt apply den dekorierten Flux zurück, auf dem dann alle üblichen Reactor-Funktionen aufgerufen werden können; hier bleibt es bei dem abschließenden subscribe.

Das Beispiel führt zu dieser Ausgabe:

[DEBUG] (main) Using Console logging
[DEBUG] (parallel-1) Scheduling retry attempt, retry context: iteration=1 exception=java.lang.RuntimeException: test backoff={3000ms}
retrying ... java.lang.RuntimeException: test
[DEBUG] (parallel-3) Scheduling retry attempt, retry context: iteration=2 exception=java.lang.RuntimeException: test backoff={3000ms}
retrying ... java.lang.RuntimeException: test
[DEBUG] (parallel-5) Scheduling retry attempt, retry context: iteration=3 exception=java.lang.RuntimeException: test backoff={3000ms}
retrying ... java.lang.RuntimeException: test
success: 4
success: 5
success: 6
success: 7
success: 8
success: 9
success: 10

Die Ausgabe ERROR erscheint nicht, weil in diesem Beispiel kein Fehler bis zum Ende der Verarbeitungskette durchdringt.

Die Wiederholung in einem festen Intervall mit fixedBackoff ist der einfachste Fall, aber es gibt noch ausgefeiltere Strategien wie eine exponentenielle Verlängerung der Zeit mit exponentialBackoff(Duration firstBackoff, Duration maxBackoff) oder zufällige Verzögerungen mit randomBackoff. Letzteres ist besonders dann von Vorteil, wenn mehrere Dienste versuchen, auf einen ausgefallenen Dienst zuzugfreifen, denn wenn alle Dienste im gleichen Takt erneut auf den ausgefallenen Dienst zugreifen, bestünde die Gefahr einer Überlastung.

Eingebunden wird Retry durch folgende Abhängigkeit (Konfiguration mit Gradle):

dependencies {
    implementation('io.projectreactor.addons:reactor-extra')
}

Weitere Information