Stand: 2019-02-06

Fortsetzung folgt!

Effizient und einfach asynchron programmieren mit Kotlin Koroutinen.

Asynchrone Programmierung dient dazu, die Prozessorzeit optimal zu nutzen, indem Wartezeiten für andere Aufgaben genutzt werden. Im normalen Leben tun wir das ständig: Während wir zum Beispiel darauf warten, dass das Wasser für die Nudeln kocht, kümmern wir uns schon mal um die Soße. Übertragen auf ein Computerprogramm bedeutet das, dass ein Thread in Wartezeiten andere Aufgaben ausführt, statt zu blockieren. Deshalb wird dieses Programmiermodell auch als non-blocking bezeichnet.

Threads nicht zu blockieren, sondern sie mit anderen Aufgaben zu beschäftigen, führt vor allem unter hoher Last zu einem höheren Durchsatz, weil die Anzahl der tatsächlich nutzbaren Threads begrenzt ist. Man kann zwar ohne weiteres mehr Threads in einem Programm verwenden, als der Computer CPU-Kerne hat, aber für jeden Wechsel eines nativen Threads ist auch ein relativ teurer Kontextwechsel des Prozessors fällig. Falls eine sehr große Zahl paralleler Threads benötigt werden sollte, wäre zudem auch irgendwann der Hauptspeicher an seiner Grenze. Praktisch kostenlos ist es hingegen, eine Koroutine zu pausieren und zu einer anderen zu wechseln.

Routiniert asynchron programmieren

Kotlin bietet mit Koroutinen ein besonders gut lesbares und einfach nutzbares Mittel zur asynchronen Programmierung. Bei Koroutinen handelt es sich um Funktionen, die bei der Ausführung unterbrochen und später wieder fortgesetzt werden können, ohne in der Zwischenzeit einen Thread zu blockieren. Aus Sicht des Programmierers ähnelt eine Koroutine einem Thread, nur dass dieser sehr viel leichtgewichtiger ist und problemlos zig tausend davon parallel existieren können. Der Zustand bei der Unterbrechung wird bis zur Fortsetzung gespeichert.

Dass der Code ähnlich einfach und gut nachvollziehbar wie bei herkömmlicher (blockierender) Programmierung aussieht, ist der große Vorteil von Koroutinen gegenüber anderen Ansätzen wie Futures. Man muss keine verschachtelten Callbacks mehr schreiben – und später auch noch verstehen (Stichwort: Callback Hell). Technischer Ballast wie thenApply, thenAccecpt oder thenCompose, wie bei der Programmierung mit CompletableFutures entfällt weitgehend. Der komplizierte Teil der asynchronen Programmierung ist in Bibliotheken verborgen. Eine Bibliothek verpackt die betreffenden Teile des Codes in Callbacks, lauscht auf Ereignisse und steuert die Ausführung auf verschiedenen Threads.

Der Ansatz, die eigentlichen Mittel zur asynchronen Programmierung in Bibliotheken bereitzustellen, ist auch ein wesentlicher Unterschied zu anderen Sprachen mit ähnlichen Konzepten wie C# oder Python, die dafür jeweils mehrere Schlüsselwörter hinzubekommen haben. In Kotlin gibt es für diesen Zweck lediglich das Schlüsselwort suspend, um eine Funktion als aussetzbar zu markieren – alles andere findet in Bibliotheken statt. Dadurch wird eine große Flexibilität erreicht, die die Implementierung von async/await wie in C#, Channels und select wie in Go, Generatoren und yield wie in C# oder Python oder auch Aktoren ermöglicht.

Zutaten

Um die folgenden Beispiele nachvollziehen zu können, installierst du dir am besten IntelliJ IDEA (die kostenlose Community Edition reicht). Alles nötige für die Kotlin-Programmierung ist darin schon enthalten und auch Koroutinen werden unterstützt. Lege als Rahmen für die Beispiele dann ein neues Projekt an. In den Beispielen wird Gradle als Build Tool verwendet, die mit IntelliJ ausgelieferte Gradle-Version ist ausreichend.

Prüfung von Koroutinen in IntelliJ
Koroutinen werden von IntelliJ IDE so gut unterstützt wie altbekannte Sprachmerkmale.

Wie schon erwähnt, sind Koroutinen zum großen Teil in einer separaten Bibliothek implementiert, die du in der Datei “build.gradle” als Abhängigkeit angibst:

dependencies {
    ...
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1'
}

Eine komplette Gradle-Datei für ein Kotlin-Projekt mit Koroutinen sieht folgendermaßen aus:

buildscript {
    ext.kotlin_version = '1.3.20'

    repositories {
        jcenter()
    }
    dependencies {
        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:1.3.20"
    }
}

apply plugin: 'kotlin'
apply plugin: 'application'

mainClassName = "blocking.RunblockingKt"

repositories {
    jcenter()
}

dependencies {
    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1'
}

compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}

Das Tor zur asynchronen Welt

Im Gegensatz zu Threads erfolgt die Unterbrechung der Ausführung nicht an einem beliebigen Punkt, sondern nur an sogenannten “suspension points”. Das sind in Kotlin Aufrufe von Funktionen, die mit dem Schlüsselwort suspend markiert sind:

suspend fun greet(name: String) {
    delay(1000) // 1 Sekunde nicht-blockierend warten
    println("$name!")
}
Kennzeichnung von suspending Function in IntelliJ
Aufrufe von suspending Functions kennzeichnet IntelliJ IDEA durch ein Symbol.

Die Anweisung delay dient hier und in weiteren Beispielen übrigens nur stellvertretend für eine reale Verzögerung, die zum Beispiel durch das Warten auf eine Antwort eines externen Systems wie einer Datenbank zustande käme.

Suspending Functions können nicht von überall aufgerufen werden, stattdessen muss man die asynchrone Welt explizit betreten. Das geschieht durch einen Coroutine Builder. Ein Coroutine Builder ist eine reguläre Funktion, die eine suspending Function als Parameter entgegen nimmt und den nötigen Kontext zur asynchronen Ausführung bereitstellt. Ein Beispiel dafür ist die Funktion launch, der ein Block zur asynchronen Ausführung übergeben wird und die eine Referenz auf die erzeugte Koroutine als Job zurückgibt. Der Job selbst enthält keinen Ergebniswert, weshalb sich launch zur Ausführung von Anweisungen mit Seiteneffekten eignet (“fire and forget”).

Die Funktion greet von oben wird im folgenden Beispiel innerhalb einer von launch erzeugten Koroutine aufgerufen:

fun main() {
    GlobalScope.launch {
        greet("Pia")
        print("Hi ")
    }

    // JVM am Leben halten
    Thread.sleep(2000)

    println("Ende.")
}

Zuerst wird “Hi " ausgegeben, während parallel greet in der Koroutine ausgeführt wird. Da greet zunächst eine Sekunde verstreichen lässt, folgt die Ausgabe von “Pia!” verzögert. Nach rund einer Sekunde ist die Ausgabe “Hi Pia!” dann komplett. Thread.sleep ist nötig, damit der Haupt-Thread des Programms nicht schon vor dem Abschluss der Koroutine beendet wird.

launch ist eine reguläre Funktion, die einen mit suspend markierten Block zur asynchronen Ausführung entgegen nimmt. Stark vereinfacht sieht die Signatur so aus:

fun launch(block: suspend CoroutineScope.() -> Unit): Job

Als Parameter nimmt sie den Block mit dem Inhalt der Koroutine entgegen und gibt eine Referenz auf den erzeugen Job zurück. Über diese kann man den Job zum Beispiel abbrechen.

Wenn nicht nur eine oder mehrere Anweisungen asynchron ausgeführt werden sollen, sondern der auszuführende Block ein Ergebnis zurückliefern soll, wird der Coroutine Builder async verwendet. async gibt ein Ergebnis vom Typ Deferred<T> zurück. Objekte dieses Typs sind ein Platzhalter für ein Ergebnis, das erst später verfügbar ist. Im Prinzip ist das das Gleiche wie ein “Future”. Deferred besitzt eine Methode await mit der, ohne einen Thread zu blockieren, auf das Ergebnis gewartet werden kann.

Im folgenden Beispiel wird die suspending Function calculate in zwei async-Blöcken aufgerufen. Die aufgeschobenen Ergebnisse dieser Blöcke werden durch deferred1 und deferred2 referenziert. In einem launch-Block wird schließlich mit await() auf die Ergebnisse gewartet. Wenn beide verfügbar sind, wird die Summe ausgegeben:

fun main() {

    val deferred1 = GlobalScope.async {
        calculate(1)
    }

    val deferred2 = GlobalScope.async {
        calculate(2)
    }

    GlobalScope.launch {
        val sum = deferred1.await() + deferred2.await()
        println(sum)
    }

    Thread.sleep(2000)
}

suspend fun calculate(x: Int): Int {
    delay(1000)
    return x * 2
}

launch wird hier verwendet, weil await() ebenfalls eine suspending Function ist, die in einem CoroutineScope aufgerufen werden muss.

Um die Brücke zwischen blockierendem und nicht blockierendem Code zu schlagen, gibt es den Coroutine Builder runBlocking. Diese Funktion führt asynchronen Code aus, blockiert aber den aktuellen Thread, bis der übergebene Block abgearbeitet ist. runBlocking eignet sich zum Beispiel , um in der main-Funktion asynchronen Code zu nutzen. Damit kann auch das etwas unbeholfene Thread.sleep aus dem vorigen Beispiel entfallen, weil der Haupt-Thread bis zum Ende am Leben gehalten wird.

Potenziell unendliche Folgen können mit dem Coroutine Builder buildSequence erzeugt werden. Im folgenden Beispiel wird beim Betreten der Sequenz “Start” ausgegeben. Mit yield wird ein weiterer Wert geliefert. Danach wird “weiter” ausgegeben. Am Ende der Sequenz wird schließlich “Ende.” ausgegeben.

val seq: Sequence<Int> = sequence {
    println("Start")
    for (i in 1..10) {
        yield(i)
        println("weiter")
    }
    println("Ende.")
}

seq.take(12).forEach(::println)

Wenn mit seq.take(1) nur ein Element der Sequenz abgerufen wird, wird nur

Start
1

ausgegeben. Die Ausführung kommt also nicht mal bis “weiter”. Erst wenn ein weiteres Element abgerufen wird, wird die Ausführung an dieser Stelle fortgesetzt. seq.take(3) führt zu dieser Ausgabe:

Start
1
weiter
2
weiter
3

Bis zum Ende würde man im Beispiel nur mit einer Zahl größer oder gleich 11 kommen: seq.take(11). Dieses Beispiel zeigt, dass die Sequenz “lazy” ist und immer nur so weit wie nötig ausgeführt wird. Die so erzeugte Sequenz ist übrigens eine ganz normale Sequenz vom Typ Sequence<Int>, die ohne async, launch oder ähnliches genutzt werden kann; lediglich der interne Ablauf ist asynchron.

Plug and Play

Es gibt noch eine ganze Reihe weiterer Coroutine Builder, die über zusätzliche Bibliotheken bereitgestellt werden. Damit können zum Beispiel Android, JavaFX, Reactor, RxJava, NIO oder CompletableFutures mit Koroutinen genutzt werden. Eine Übersicht findet sich unter https://kotlin.github.io/kotlinx.coroutines/

Federleicht

Wie leichtgewichtig Koroutinen sind, lässt sich leicht herausfinden, wenn man sehr viele von ihn parallel startet:

fun main() = runBlocking {
    val jobs = (1..1_000_000).map {
        launch {
            delay(1000)
            print(".")
        }
    }
    jobs.forEach { it.join() }
}

Eine Millionen parallele Koroutinen sind überhaupt kein Problem. Innerhalb von ein paar Sekunden sind sie abgearbeitet. Das System verschwendet keine kostbaren Ressourcen mit schlafenden Threads.

Thread-Ansicht in VisualVM
Mit Koroutinen zeigt VisualVM nur wenige Threads. Keiner davon schläft.

Wenn man in dem Beispiel Koroutinen durch gewöhnliche Threads ersetzt, indem man launch durch thread und delay durch sleep ersetzt, läuft das Programm für eine gefühlte Ewigkeit.

fun main() = runBlocking {
    val threads = (1..1_000_000).map {
        thread {
            sleep(1000)
            print(".")
        }
    }
    threads.forEach { it.join() }
}

Auf schwächeren Rechnern könnte es auch mit einem OutOfMemoryError abstürzen. Wenn man sich das laufende Programm mit einem Profiler ansieht, entdeckt man tausende schlafende Threads. Wer es selbst ausprobieren will, findet mit VisualVM im “bin”-Verzeichnis des JDK einen Profiler (jvisualvm.exe).

Thread-Ansicht in VisualVM
VisualVM zeigt tausende parallele Threads, die nichts anderes tun als zu schlafen und Ressourcen zu verschwenden.

Immer schön der Reihe nach

Die Anweisungen innerhalb einer Koroutine werden standardmäßig der Reihe nach ausgeführt. Dieses Verhalten kann in der Praxis zum Beispiel dazu genutzt werden, abhängig vom Ergebnis der ersten Methode die zweite Methode aufzurufen oder eben nicht.

Ein einfaches Beispiel veranschaulicht die sequenzielle Abarbeitung:

fun main() {
    GlobalScope.launch {
        greet("Pia")
        print("Hi ")
    }

    // JVM am Leben halten
    Thread.sleep(2000)
}

Die Anweisung Anweisung print("Hi ") aus dem ersten ersten Beispiel steht nun innerhalb der Koroutine. Dadurch wird mit einer Sekunde Verzögerung “Pia” (sofort) gefolgt von “Hi " ausgegeben. print wird also erst aufgerufen nachdem greet zurückgekehrt ist. Dass greet mit dem Schlüsselwort suspend markiert ist, ändert nichts an der seriellen Ausführung innerhalb der Koroutine.

Serielle Ausführung innerhalb einer Koroutine ist ein sinnvolles Standardverhalten, da es leicht nachvollziehbar ist und parallele Verarbeitung Herausforderungen mit sich bringen kann, die man sich nicht versehentlich einhandeln sollte. Der Code innerhalb einer Koroutine ist gewöhnlicher Code ohne irgendwelche Magie.

Parallelwelt

Wenn die Aufrufe der Funktionen foo und bar nicht voneinander abhängen und deshalb parallelisiert werden können, kann jeder Aufruf in einer eigenen Koroutine erfolgen. Dazu wird der Funktionsaufruf in einen async-Block verpackt und am Ende mit await gewartet, bis beide fertig sind:

fun main() = runBlocking {
    val time = measureTimeMillis {
        val a = async { foo() }
        val b = async { bar() }
        println("Ergebnis: ${a.await() + b.await()}")
    }
    println("Zeit: $time ms")
}

suspend fun foo(): Int {
    delay(1000)
    return 1
}

suspend fun bar(): Int {
    delay(1000)
    return 2
}

public inline fun measureTimeMillis(block: () -> Unit): Long {
    val start = System.currentTimeMillis()
    block()
    return System.currentTimeMillis() - start
}

Ausgabe:

Ergebnis: 3
Zeit: 1031 ms

Die Ausgabe zeigt, dass die gesamte Ausführungszeit nur etwas mehr als eine Sekunde gedauert hat. Wären foo und bar nacheinander ausgeführt worden, hätten es mindestens zwei Sekunden sein müssen; beide Methoden wurden also parallel ausgeführt.

Den Job kündigen

Wenn es mal wieder länger dauert, braucht man eine Möglichkeit, um die Ausführung einer Koroutine zu beenden. Zu diesem Zweck gibt die Funktion launch einen Job zurück, der die Methode cancel anbietet.

Im folgenden Beispiel wird mit launch ein Job erzeugt, der für eine Minute schläft. So lange wartet das Programm aber nicht, sondern beendet nach 10 Sekunden genervt den Job:

fun main() = runBlocking {
    val job = launch {
        println("Tiefschlaf ...")
        delay(60_000)
        println("ausgeschlafen")
    }
    delay(10_000)
    println("Jetzt reicht's!")
    job.cancelAndJoin()
    println("Ende.")
}

cancel() veranlasst den Abbruch und join() wartet, bis er abgeschlossen ist. Da das eine gängige Kombination ist, können beide Schritte mit dem Aufruf cancelAndJoin() auch gemeinsam erledigt werden.

Die Ausgabe lautet:

Tiefschlaf ...
ausgeschlafen
Ende.

Zeit zum Ausschlafen hat die Koroutine also nicht!

Wenn eine Koroutine wiederum Koroutinen enthält, beendet cancel() auch diese.

Unter der Haube

Koroutinen werden in Kotlin ausschließlich durch den Compiler und Bibliotheken realisiert; eine Unterstützung durch die JVM oder das Betriebssystem ist nicht erforderlich. Der Compiler übersetzt Koroutinen in einen Zustandsautomaten, bei dem der Aufruf einer suspending Function einem Zustand entspricht. Unmittelbar vor dem Aussetzen wird der Zustand zusammen mit relevanten lokalen Variablen in einer vom Compiler erzeugen Klasse gespeichert. Wenn eine Koroutine fortgesetzt wird, wird der Zustand wiederhergestellt.

Als Beispiel für einen Coroutine Builder soll hier noch mal die Funktion launch genauer betrachtet werden:

fun CoroutineScope.launch(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

Der context ist die Ausführungsumgebung der Koroutine; er enthält nicht nur den zur Ausführung nötigen Zustand sondern auch noch einen Dispatcher, der für die Zuweisung eines Threads zuständig ist. Der Standardwert des Parameters start, CoroutineStart.DEFAULT, besagt, dass die Koroutine zur sofortigen Ausführung eingeplant werden soll (eine andere Option wäre etwa LAZY). Der dritte Parameter ist schließlich der Code-Block, der asynchron ausgeführt werden soll. Entscheidend ist hier das Schlüsselwort suspend, denn das bedeutet, dass es sich um eine Funktion handelt, deren Ausführung ausgesetzt werden kann. Die Syntax CoroutineScope.() -> Unit ist ein Funktionsliteral mit CoroutineScope als Empfänger, was heißt, dass sich nicht weiter qualifizierte Funktionsaufrufe auf den CoroutineScope beziehen (implizites this). Der Rückgabewert vom Typ Job ist eine Referenz auf die Koroutine und dient zum Beispiel zum Abbrechen.

Eine ausgesetzte Koroutine kann wie jedes andere Objekt gespeichert und herumgereicht werden. Sie ist vom Typ Continuation. Eine Continuation ist der Zustand einer ausgesetzten Koroutine am suspension point und repräsentiert den verbleibenden Teil der Ausführung, eben die Fortsetzung (engl. continuation).

Die Schnittstelle für Continuations stellt eine Rückruffunktion (Callback) dar:

interface Continuation<in T> {
    val context: CoroutineContext
    fun resume(value: T)
    fun resumeWithException(exception: Throwable)
}

Der CoroutineContext ist eine Datenstruktur, die einer Map ähnlich ist, und beliebige Daten aufnehmen kann. In dem Kontext können zum Beispiel Informationen zu Berechtigungen oder Transaktionen gespeichert werden, die bei synchron ausgeführten Anwendungen häufig an einen Thread gebunden sind und über ein ThreadLocal-Object bereitgestellt werden. Die Funktionen resume und resumeWithException dienen dazu, die Ausführung einer Koroutine fortzusetzen, wobei erstere im Erfolgsfall und letztere im Fehlerfall aufgerufen wird.

Wenn innerhalb einer Koroutine suspendCoroutine aufgerufen wird, wird der Ausführungszustand in einer Continuation-Instanz gespeichert. Dieser Zustand wird dann bei der Fortsetzung dem Block übergeben, die die Ausführung fortsetzt.

suspend fun <T> suspendCoroutine(block: (Continuation<T>) -> Unit): T

Ausgeführt werden Koroutinen normalerweise durch einen Thread Pool. Wenn die Ausführung fortgesetzt werden soll, bekommt eine Koroutine einen Thread aus dem Pool zugewiesen, der wieder zurückgegeben wird, wenn die Ausführung unterbrochen oder beendet wird. Wie die Abbildung von Koroutinen auf Threads erfolgt, kann durch verschiedene Implementierungen beeinflusst werden.

Fazit

Koroutinen vereinfachen die asynchrone Programmierung enorm. Zudem können ohne großen Aufwand alle möglichen synchronen und asynchronen Frameworks mit Koroutinen verbunden werden. Dadurch das Kotlin selbst nur die grundlegendsten Elemente zur Umsetzung von Koroutinen bereitstellt und alles andere in Bibliotheken realisiert ist, können leicht neue Ansätze implementiert werden. Noch stehen die Koroutinen in Kotlin ganz am Anfang, aber sie haben beste Chancen zu einem Standardwerkzeug der asynchronen Programmierung zu werden.

Glossar

Eine Koroutine ist eine Instanz einer aussetzbaren Berechnung. Prinzipiell ähnelt sie einem Thread, der zur Ausführung eines Code-Blocks erzeugt und gestartet wird, aber eine Koroutine ist nicht an einen bestimmten Thread gebunden. Die Ausführung kann unterbrochen und später auf einem anderen Thread fortgesetzt werden.

Eine suspending Function ist mit dem Modifikator suspend markierte Funktion deren Ausführung ausgesetzt werden kann, ohne den aktuellen Thread zu blockieren. So eine Funktion kann nicht von regulärem Code, sondern nur von anderen suspending Functions oder Koroutinen aufgerufen werden. Die Standardbibliothek stellt primitive suspending Functions bereit, die die Grundlage für alle anderen suspending Functions sind. Der Modifikator suspend kann mit fast allen Arten von Funktionen verwendet werden: Funktionen auf höchster Ebene, Erweiterungsfunktionen, Funktionen von Klassen (Methoden) oder Operatorfunktionen verwenden. Ausgenommen sind lokale Funktionen, Properties und Konstruktoren; diese Einschränkungen sollen in der Zukunft wegfallen.

Für die Nutzung von Bibliotheken mit Koroutinen sind suspending Lambdas von besonderer Bedeutung. Dabei handelt es sich um anonyme Funktionen, die mit dem suspend-Modifikator markiert sind. Sie werden zum Beispiel als Code-Blöcke hinter die Funktionen launch und async geschrieben.

Ein suspending function type ist der Typ einer aussetzbaren Funktion, unabhängig davon ob sie anonym ist oder nicht. suspend (Int) -> Boolean beschreibt eine aussetzbare Funktion, die einen Integer annimmt und einen Boolean zurückgibt.

Ein Coroutine Builder ist eine Funktion, die ein suspending Lambda als Parameter entgegen nimmt und eine Koroutine erzeugt. Beispiele sind launch oder buildSequence.

Ein suspension point ist ein Punkt, beim die Ausführung einer Koroutine ausgesetzt werden kann. In Kotlin ist das der Aufruf einer suspending Function.

Eine Continuation ist der Zustand einer ausgesetzten Koroutine am suspension point und repräsentiert den verbleibenden Teil der Ausführung.

Weitere Information