Lernreise 4 — Den Loop nicht blockieren

Was du nach dieser Reise kannst

Du verstehst, warum eine TUI / GUI / Server-App auf einem einzigen Thread läuft, was ein „Event-Loop" ist, und welche Auswirkung eine einzige blockierende Funktion hat. Du erkennst blockierende Aufrufe in deinem Code, triagest sie nach Intention und wählst den richtigen Pattern für jeden Fall.

Konkret nach Tier 3: Du hast eine eigene Subprocess-Triage-Matrix und einen Plan, wie du existierenden Code daraufhin sweepst.

Anker — der UI-Freeze, der nicht aussah wie ein Subprocess-Problem

In iMenu öffnete der Benutzer das Sessions-Overlay. Im Hintergrund wurde alle 2 Sekunden tmux list-sessions aufgerufen. Eines Tages hing ein tmux-Server. Der subprocess.run("tmux list-sessions") blockierte. Das Overlay fror ein. Die ganze App war tot — Tastendrücke verpufften.

Drei subtile Aspekte dieses Bugs:

  1. Es war Code, der schon hunderte Mal funktioniert hatte.
  2. Es gab keinen Fehler, keinen Stack-Trace — nur Stille.
  3. Die schuldige Zeile war subprocess.run(["tmux", "list-sessions"]) — ohne timeout=.

Ein Audit der SessionBridge fand 18 weitere Stellen mit derselben Eigenschaft.


Tier 1 · Erkennen — Was ist ein Event-Loop?

Eine TUI hat einen einzigen Hauptthread, der in einer Schleife Folgendes tut:

TXT
while running:
    event = wait_for_next_event()       # Tastendruck, Timer, IO
    handle(event)                       # Widgets aktualisieren, etc.
    redraw_if_needed()

wait_for_next_event ist die einzige Stelle, an der der Thread „pausieren" darf — überall sonst muss er schnell wieder zurückkehren. Wenn handle(event) 30 Sekunden braucht, ist die App für 30 Sekunden tot.

Das gleiche Muster gilt für:

  • Browser-JavaScript (eine Event-Loop pro Tab)
  • Node.js-Server (eine Event-Loop pro Worker)
  • GUI-Frameworks (Qt, GTK, Tkinter)
  • Game-Engines (eine Frame-Schleife)

Wenn du das Pattern in einem dieser Kontexte gelernt hast, gilt es in allen.

Was „blockierend" wirklich heißt

PYTHON
subprocess.run(["tmux", "list-sessions"])
# blockiert, bis das Subkommando endet — ohne Begrenzung

time.sleep(5)
# blockiert für 5 Sekunden

requests.get("https://api.example.com/...")
# blockiert, bis die HTTP-Antwort da ist (kann unendlich sein)

with open("/proc/whatever") as f:
    data = f.read()
# blockiert, bis das Kernel die Datei liefert

Alle vier sind im Event-Loop-Kontext gefährlich. Die Frage ist nicht „darf ich blockieren?" — sondern „wie begrenze ich die Blockade?"


Tier 2 · Üben — die Subprocess-Triage-Matrix

Wenn dein Code ein Subprocess aufruft, frag erst: Wofür? Drei Hauptfälle, drei Patterns.

Fall A — synchron: muss zu Ende laufen, Ergebnis nutzen

Beispiel: tmux list-sessions — du brauchst die Ausgabe, um eine Liste anzuzeigen.

Lösung: subprocess.run mit timeout=.

PYTHON
import subprocess

def list_tmux_sessions() -> list[str]:
    try:
        result = subprocess.run(
            ["tmux", "list-sessions", "-F", "#{session_name}"],
            capture_output=True,
            text=True,
            timeout=3,            # ← die entscheidende Zeile
        )
    except subprocess.TimeoutExpired:
        return []
    if result.returncode != 0:
        return []
    return result.stdout.splitlines()

Drei Werte gleichzeitig erzwungen:

  • Begrenzung der Blockade durch timeout=
  • Sauberer Fallback durch TimeoutExpired-Behandlung
  • Keine Ausnahmeflut in den UI-Thread

Was ist ein guter timeout-Wert? Pro Aufruf abwägen: „Wenn das nicht in X Sekunden kommt, ist es kaputt." Für tmux-Befehle: 3 Sekunden. Für git status: 5–10. Für API-Aufrufe: je nach Dienst.

Fall B — Fire-and-forget: starten und vergessen

Beispiel: tmux switch-client — der Benutzer wechselt die Session, die TUI wird im selben Moment beendet, niemand wartet auf das Ergebnis.

Lösung: subprocess.Popen ohne .wait().

PYTHON
def switch_to_session(name: str) -> None:
    subprocess.Popen(
        ["tmux", "switch-client", "-t", name],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # Funktion kehrt sofort zurück — der Kindprozess läuft im Hintergrund

Achtung: Popen ist nicht „macht alles asynchron". Es startet einen Kindprozess. Scheitert dieser, erfährst du es nicht. Wähle den Pattern nur, wenn du das Ergebnis wirklich nicht brauchst.

Fall C — interaktiver Launch: Editor / Browser / Pager

Beispiel: Der Benutzer drückt evim soll im selben Terminal aufgehen → die TUI muss sich erst zurückziehen, sonst kollidieren beide um das Terminal.

Lösung in Textual: with app.suspend():.

PYTHON
def edit_in_external_editor(self, path: str) -> None:
    with self.app.suspend():
        subprocess.run(["vim", path])
        # Die TUI ist während dieses Blocks pausiert
        # vim hat das volle Terminal
    # Nach dem Block: TUI nimmt das Terminal zurück, redraw automatisch

app.suspend() macht drei Dinge gleichzeitig:

  • TUI-Rendering pausiert
  • Terminal-Steuerung wird ans Kindkommando übergeben
  • Beim Verlassen des with-Blocks: Terminal wird neu initialisiert, App rendert neu

Die Matrix zusammengefasst

Frage Pattern
Ich brauche die Ausgabe und kann begrenzt warten subprocess.run(..., timeout=N)
Ich brauche das Ergebnis nicht subprocess.Popen(...)
Der Kindprozess soll mit dem Terminal interagieren with app.suspend():

Vor jedem subprocess.run ohne timeout=: hast du absichtlich Fall A mit unbegrenzter Wartezeit gewählt? Wenn nein — füg timeout= hinzu.


Tier 3 · Übertragen — Sweep + reaktiv

Sweep deines Codes

BASH
# subprocess.run ohne timeout=
grep -rn "subprocess\.run(" --include="*.py" . | grep -v "timeout="

# synchrone HTTP-Aufrufe in UI-Code
grep -rn "requests\.\(get\|post\|put\|delete\)" --include="*.py" .

# time.sleep
grep -rn "time\.sleep(" --include="*.py" .

Pro Treffer: nach der Matrix triagieren. Pro Fall A ohne timeout=: entweder hinzufügen — oder bewusst dokumentieren, warum unbegrenzt OK ist.

Das Gegenstück — reaktive Zustände

Bisher ging es um Code, der zu lange wartet. Es gibt aber ein zweites Anti-Pattern: Code, der zu eifrig reagiert.

Textual hat reactive Felder und watch_<name>-Hooks:

PYTHON
class MyOverlay(ModalScreen):
    count: reactive[int] = reactive(0)

    def watch_count(self, old: int, new: int) -> None:
        self.refresh_display()

Wenn count 100 Mal pro Sekunde sich ändert, läuft refresh_display 100 Mal pro Sekunde. Auch das ist eine Blockade — nicht durch Warten, sondern durch zu viel Arbeit.

*Frage an jeden `watch_`-Hook:** Ist die Arbeit hier billig genug, dass sie jedem Wert-Wechsel folgen darf? Oder muss ich entprellen?

Deine Übung

Wähl ein Stück deines Codes mit Subprocess-Aufrufen oder einer Event-Loop. Erstelle einen kleinen Bericht:

TXT
DATEI: SessionBridge.py
SUBPROCESS-AUFRUFE: 18
- 4 × Fall A  (mit timeout=3 angepasst)
- 11 × Fall B (Popen, fire-and-forget)
-  3 × Fall C (suspend für tmux attach)
RESTRISIKO: keiner ohne explizite Wahl

Wenn dir auch nur ein Aufruf bleibt, der „nur 99% der Zeit schnell ist" — das ist die Zeitbombe.


Was du jetzt weißt

  • Event-Loops sind Single-Threaded. Eine blockierende Funktion = die ganze App friert ein.
  • „Wie begrenze ich die Blockade?" ist die richtige Frage — nicht „darf ich blockieren?"
  • Drei Patterns, eine Wahl: run+timeout, Popen, suspend. Pro Aufruf bewusst.
  • Reaktive Zustände sind das Gegenstück. Zu eifrig reagieren ist auch eine Blockade.

Weiterführende Reisen

  • Code als Daten — AST — bau einen Wächter gegen „subprocess.run ohne timeout=" und hänge ihn als Pre-Commit-Hook ein.
  • Refactoring ohne Zeitbomben — wenn du einen Subprocess-Aufruf umbenennst oder verschiebst: jede Aufrufstelle mitziehen.
  • Testpyramide für TUIs — der Pilot-Test, der diesen Bug nicht finden kann. Manchmal braucht es eine echte tmux-Instanz, die hängt — und das gehört in eine eigene Test-Stufe.
  • Erst verifizieren, dann erweitern — wenn dein Fix „immer noch schnell" ist, hast du nicht verifiziert.

Quellen & Anker