Guaranteed Cleanup: Resource Management with Python Context Managers

Dima, Jun 30, 2026

Introduction

Every program that opens a file, acquires a lock, or establishes a database connection has the same problem: the cleanup must happen, even when something goes wrong. The naive solution is a try/finally block: connect, do the work in the try, disconnect in the finally. This works, but it puts the cleanup logic at the call site. Every developer who uses the resource must remember to write the same boilerplate, in the correct order, every time. When they forget, connections leak and files stay open.

Python's context manager protocol solves this by moving the cleanup into the resource itself. The with statement guarantees that setup and teardown run as a matched pair, with the teardown happening unconditionally regardless of exceptions. More importantly, the teardown logic lives where it belongs — inside the class that understands the resource — rather than scattered across every call site.

This tutorial builds a trading data export pipeline. Each section introduces one piece of the context manager machinery: the protocol, exception handling, generator-based shortcuts, targeted exception suppression, and dynamic resource stacks.


Background

The with statement works with any object that implements two methods:

  • __enter__ — called when the with block is entered. Its return value is bound to the as target.
  • __exit__(exc_type, exc_val, exc_tb) — called when the with block exits, whether normally or due to an exception. The three arguments are the exception class, instance, and traceback; all three are None if no exception occurred. Returning True suppresses the exception; returning False or None lets it propagate.
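To make the two calls concrete, here is a rough sketch of what a with statement does on the caller's behalf. Resource and manual_with are hypothetical names invented for this illustration, and the expansion is simplified (the interpreter looks the methods up on the type, for instance), so treat it as an approximation rather than the exact semantics.

```python
import sys


class Resource:
    """Hypothetical resource that records the protocol calls it receives."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.events.append(("exit", exc_type))
        return False  # let any exception propagate


def manual_with(manager, body):
    """Simplified stand-in for `with manager as value: body(value)`."""
    value = manager.__enter__()
    try:
        body(value)
    except BaseException:
        # Failure path: hand the exception details to __exit__ and
        # re-raise unless it returns a true value.
        if not manager.__exit__(*sys.exc_info()):
            raise
    else:
        # Success path: all three arguments are None.
        manager.__exit__(None, None, None)


def failing_body(resource):
    raise ValueError("boom")


real, manual = Resource(), Resource()

try:
    with real as r:
        failing_body(r)
except ValueError:
    pass

try:
    manual_with(manual, failing_body)
except ValueError:
    pass

# Both resources saw the same sequence of protocol calls.
same_calls = real.events == manual.events
```

Both objects record one "enter" followed by one "exit" carrying ValueError, which is the matched pair the rest of this tutorial relies on.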

The standard library's contextlib module provides three tools that cover the remaining patterns:

  • @contextmanager — turns a generator function into a context manager. Code before yield runs on enter; code after runs on exit.
  • suppress(*exceptions) — a context manager that silently discards any of the listed exception types.
  • ExitStack — a context manager that holds other context managers and ensures all of them are cleaned up, even when the set is built at runtime.

Practical Scenario

A trading platform's end-of-day reporting service queries a database for daily quote data, writes the results to CSV reports, and logs timing information for each pipeline stage. Multiple report jobs can run concurrently, so each job writes a lock file on start and removes it on finish to prevent duplicate exports.

The service also supports per-symbol exports: when a client requests breakdowns by ticker, the pipeline opens one output file per symbol, writes to all of them simultaneously, and closes them all cleanly — even if the symbol list is not known until the query returns.

Every cleanup — disconnecting from the database, removing the lock file, closing output files — must happen whether the pipeline succeeds or raises. A connection that leaks on exception means the next job may find the database pool exhausted. A lock file left behind after a crash blocks all future exports until someone notices and removes it manually.


The Problem with Manual Resource Management

A first version of the pipeline manages the database connection by hand.

Create a new file:

touch trading_pipeline.py

Add the code below to the file, then run it with:

python3 trading_pipeline.py

import csv

class DatabaseConnection:
    def __init__(self, host):
        self.host = host
        self.connected = False

    def connect(self):
        print(f"[DB] Connecting to {self.host}")
        self.connected = True

    def disconnect(self):
        print(f"[DB] Disconnecting from {self.host}")
        self.connected = False

    def query(self, sql):
        if not self.connected:
            raise RuntimeError("Query attempted on closed connection")
        return [
            {"symbol": "AAPL", "price": 185.20, "volume": 12400000},
            {"symbol": "MSFT", "price": 378.85, "volume":  8900000},
            {"symbol": "GOOGL", "price": 141.50, "volume":  5200000},
        ]

OUTPUT = "/home/coder/learning/report.csv"

def export_report():
    db = DatabaseConnection("db.trading.internal")
    db.connect()
    try:
        rows = db.query("SELECT symbol, price, volume FROM quotes")
        with open(OUTPUT, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["symbol", "price", "volume"])
            writer.writeheader()
            writer.writerows(rows)
        print(f"Exported {len(rows)} rows to {OUTPUT}")
    finally:
        db.disconnect()

export_report()


[DB] Connecting to db.trading.internal
Exported 3 rows to /home/coder/learning/report.csv
[DB] Disconnecting from db.trading.internal


The try/finally around db.connect() and db.disconnect() lives inside export_report, and every other function that uses DatabaseConnection needs its own copy. A developer who adds a second database call elsewhere in the codebase must replicate this pattern correctly, or connections leak silently. The cleanup logic belongs inside DatabaseConnection, which already knows how to manage itself, not duplicated at every call site.
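The leak is easy to reproduce. The sketch below uses a trimmed-down stand-in for DatabaseConnection (printing removed) and a hypothetical export_without_finally function that represents a call site where the try/finally was forgotten.

```python
class DatabaseConnection:
    """Trimmed-down stand-in for the tutorial's class (printing removed)."""

    def __init__(self, host):
        self.host = host
        self.connected = False

    def connect(self):
        self.connected = True

    def disconnect(self):
        self.connected = False


def export_without_finally(db, rows):
    # Hypothetical call site that forgot the try/finally.
    db.connect()
    if not rows:
        raise ValueError("no rows to export")
    db.disconnect()  # skipped whenever the line above raises


db = DatabaseConnection("db.trading.internal")
try:
    export_without_finally(db, rows=[])
except ValueError:
    pass

# The connection was never released.
leaked = db.connected
```

After the exception, db.connected is still True. Nothing in the output signals the problem, which is why this kind of leak tends to surface only once the pool is exhausted.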


__enter__ and __exit__: The Context Manager Protocol

Adding __enter__ and __exit__ to DatabaseConnection moves the connect/disconnect pair inside the class. The with statement then guarantees cleanup without any try/finally at the call site.

Replace the DatabaseConnection class and export_report function with the following:

class DatabaseConnection:
    def __init__(self, host):
        self.host = host
        self.connected = False

    def connect(self):
        print(f"[DB] Connecting to {self.host}")
        self.connected = True

    def disconnect(self):
        print(f"[DB] Disconnecting from {self.host}")
        self.connected = False

    def query(self, sql):
        if not self.connected:
            raise RuntimeError("Query attempted on closed connection")
        return [
            {"symbol": "AAPL", "price": 185.20, "volume": 12400000},
            {"symbol": "MSFT", "price": 378.85, "volume":  8900000},
            {"symbol": "GOOGL", "price": 141.50, "volume":  5200000},
        ]

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()
        return False

def export_report():
    with DatabaseConnection("db.trading.internal") as db:
        rows = db.query("SELECT symbol, price, volume FROM quotes")
        with open(OUTPUT, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=["symbol", "price", "volume"])
            writer.writeheader()
            writer.writerows(rows)
        print(f"Exported {len(rows)} rows to {OUTPUT}")

export_report()


The output is unchanged.

__enter__ calls connect() and returns self — the value bound to the as db target. __exit__ calls disconnect() unconditionally and returns False, which tells Python to let any exception continue propagating. The with block handles both success and failure without any code at the call site.

The cleanup logic lives inside DatabaseConnection exactly once. Any with DatabaseConnection(...) block anywhere in the codebase gets the same guaranteed teardown, regardless of what happens inside it. Developers using the class cannot forget the cleanup — the protocol enforces it.
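One detail worth seeing in code: the name bound by as stays in scope after the block, but the resource behind it has been released. The sketch below uses a trimmed-down stand-in for the class above (no printing, a single sample row), so the details are assumptions made for brevity.

```python
class DatabaseConnection:
    """Trimmed-down stand-in for the tutorial's class."""

    def __init__(self, host):
        self.host = host
        self.connected = False

    def query(self, sql):
        if not self.connected:
            raise RuntimeError("Query attempted on closed connection")
        return [{"symbol": "AAPL", "price": 185.20, "volume": 12400000}]

    def __enter__(self):
        self.connected = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.connected = False
        return False


with DatabaseConnection("db.trading.internal") as db:
    rows = db.query("SELECT symbol, price, volume FROM quotes")

# db is still bound here, but __exit__ has already closed the connection.
try:
    db.query("SELECT symbol FROM quotes")
except RuntimeError as exc:
    late_query_error = str(exc)
```

Data that must outlive the connection, such as rows here, should be fetched inside the block and used afterwards; the connection object itself should not be.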


Exception Handling in __exit__

__exit__ receives three arguments when an exception escapes the with block: the exception class (exc_type), the exception instance (exc_val), and the traceback (exc_tb). When the block exits normally, all three are None. This lets the teardown logic react differently depending on whether the resource is being released cleanly or after a failure.

Replace __exit__ in DatabaseConnection and add an error scenario:

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            print(f"[DB] Session ended with error: {exc_val}")
        self.disconnect()
        return False

def export_with_error():
    with DatabaseConnection("db.trading.internal") as db:
        rows = db.query("SELECT symbol, price, volume FROM quotes")
        print(f"Queried {len(rows)} rows")
        raise ValueError("Validation failed: stale prices in feed")

try:
    export_with_error()
except ValueError as e:
    print(f"Export aborted: {e}")


[DB] Connecting to db.trading.internal
Queried 3 rows
[DB] Session ended with error: Validation failed: stale prices in feed
[DB] Disconnecting from db.trading.internal
Export aborted: Validation failed: stale prices in feed


disconnect() runs even though the with block raised. The __exit__ method saw the exception via exc_type and logged it before cleaning up. Because __exit__ returns False, the ValueError continues propagating to the outer try/except.

The teardown now carries context about how the session ended — useful for connection-pool accounting, audit logs, or deciding whether to retry. Returning True from __exit__ would suppress the exception entirely; returning False or None lets it propagate. A transaction manager, for example, would commit on None and roll back on any exception type.
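The transaction case can be sketched in a few lines. Transaction is a hypothetical class, and the log list stands in for a real database session; neither is part of the pipeline built in this tutorial.

```python
class Transaction:
    """Hypothetical transaction wrapper; `log` stands in for a real database."""

    def __init__(self, log):
        self.log = log

    def __enter__(self):
        self.log.append("BEGIN")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Commit on a clean exit, roll back on any exception.
        if exc_type is None:
            self.log.append("COMMIT")
        else:
            self.log.append("ROLLBACK")
        return False  # the caller still sees the exception


log = []

with Transaction(log):
    log.append("INSERT 1")

try:
    with Transaction(log):
        log.append("INSERT 2")
        raise ValueError("bad row")
except ValueError:
    pass
```

The first block ends with COMMIT and the second with ROLLBACK, while the ValueError still reaches the caller because __exit__ returns False.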

Note: An exception raised inside __exit__ propagates in place of the original one; the original survives only as the new exception's __context__. If the cleanup itself might fail, wrap it in its own try/except so the original error still reaches the caller.
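A minimal sketch of that guard, assuming a hypothetical FlakyConnection whose disconnect() always fails with OSError. Recording the cleanup error in a list is an illustrative choice; a real class would more likely log it.

```python
class FlakyConnection:
    """Hypothetical connection whose disconnect() can itself fail."""

    def __init__(self):
        self.cleanup_errors = []

    def disconnect(self):
        raise OSError("socket already closed")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            self.disconnect()
        except OSError as cleanup_error:
            if exc_type is None:
                raise  # no earlier error to protect, so report this one
            # Keep the cleanup failure for later inspection and let the
            # original exception continue to the caller.
            self.cleanup_errors.append(cleanup_error)
        return False


conn = FlakyConnection()
try:
    with conn:
        raise ValueError("Validation failed: stale prices in feed")
except ValueError as exc:
    seen_by_caller = exc
```

The caller receives the ValueError from the block rather than the OSError from the failed disconnect, and the cleanup failure is still available in conn.cleanup_errors.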


The @contextmanager Decorator

Writing a class for every context manager is verbose when all you need is a short setup/teardown pair. The contextlib.contextmanager decorator turns a generator function into a context manager: code before yield runs on enter, code after runs on exit. Wrapping the yield in try/finally guarantees the teardown even when the with block raises.

Add the following import and timed_section function, then update export_report:

from contextlib import contextmanager
import time

@contextmanager
def timed_section(label):
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"[TIMER] {label}: {elapsed:.3f}s")

def export_report():
    with DatabaseConnection("db.trading.internal") as db:
        with timed_section("query"):
            rows = db.query("SELECT symbol, price, volume FROM quotes")

        with timed_section("write"):
            with open(OUTPUT, "w", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=["symbol", "price", "volume"])
                writer.writeheader()
                writer.writerows(rows)

        print(f"Exported {len(rows)} rows to {OUTPUT}")

export_report()


[DB] Connecting to db.trading.internal
[TIMER] query: 0.000s
[TIMER] write: 0.001s
Exported 3 rows to /home/coder/learning/report.csv
[DB] Disconnecting from db.trading.internal


timed_section has no state beyond the start timestamp, so a generator function is far more concise than a class with __init__, __enter__, and __exit__. The value passed to yield becomes the as target; a bare yield, as here, produces None, so there is nothing useful to bind and the with statement omits the as clause. The finally around the yield ensures the elapsed time prints whether or not the timed block raises.

A generator-based context manager requires no class boilerplate for stateless setup/teardown pairs. Any function that has a natural "before" and "after" phase — acquiring a lock, changing a working directory, patching configuration for a test — becomes a one-screen context manager with @contextmanager. The try/finally around yield is not optional: without it, an exception in the with block skips everything after yield.
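The effect of omitting try/finally can be checked directly. In this sketch, unguarded and guarded are hypothetical context managers that differ only in that one detail, and log records which phases actually ran.

```python
from contextlib import contextmanager

log = []


@contextmanager
def unguarded(name):
    log.append(f"{name}: setup")
    yield name
    log.append(f"{name}: teardown")  # skipped if the with block raises


@contextmanager
def guarded(name):
    log.append(f"{name}: setup")
    try:
        yield name
    finally:
        log.append(f"{name}: teardown")  # runs on success and on failure


for factory, name in [(unguarded, "unguarded"), (guarded, "guarded")]:
    try:
        with factory(name) as label:  # label is the value passed to yield
            raise RuntimeError(f"failure inside {label}")
    except RuntimeError:
        pass
```

After the loop, log contains a setup entry for both managers but a teardown entry only for guarded. The exception is re-raised inside the generator at the yield, so only code protected by finally gets a chance to run.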


contextlib.suppress: Silencing Expected Exceptions

Some cleanup operations are best-effort: the operation should be attempted, but a specific failure should be ignored rather than handled. contextlib.suppress provides a context manager that silences the listed exception types without requiring an explicit try/except with a pass body.

Add a lock file that signals an export is in progress. Replace export_report:

import os
from contextlib import suppress

LOCK_PATH = "/home/coder/learning/export.lock"

def export_report():
    open(LOCK_PATH, "w").close()
    print(f"Lock acquired: {LOCK_PATH}")

    try:
        with DatabaseConnection("db.trading.internal") as db:
            with timed_section("query"):
                rows = db.query("SELECT symbol, price, volume FROM quotes")

            with timed_section("write"):
                with open(OUTPUT, "w", newline="") as f:
                    writer = csv.DictWriter(f, fieldnames=["symbol", "price", "volume"])
                    writer.writeheader()
                    writer.writerows(rows)

            print(f"Exported {len(rows)} rows to {OUTPUT}")
    finally:
        with suppress(FileNotFoundError):
            os.remove(LOCK_PATH)
        print("Lock released")

export_report()


Lock acquired: /home/coder/learning/export.lock
[DB] Connecting to db.trading.internal
[TIMER] query: 0.000s
[TIMER] write: 0.001s
Exported 3 rows to /home/coder/learning/report.csv
[DB] Disconnecting from db.trading.internal
Lock released


If the lock file is already gone by the time cleanup runs (for example, because an operator cleared it by hand), os.remove raises FileNotFoundError. Because the call sits inside a finally block, that error would replace any exception already propagating from the pipeline. with suppress(FileNotFoundError) covers the case without an exception handler that has nothing useful to do.
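Two properties of suppress are easy to misread: the rest of the with block is skipped once the exception fires, and exception types that are not listed still propagate. The sketch below shows both, using a temporary directory so the missing path is an assumption-free stand-in for the lock file.

```python
import os
import tempfile
from contextlib import suppress

steps = []

with tempfile.TemporaryDirectory() as workdir:
    missing = os.path.join(workdir, "export.lock")  # never created

    with suppress(FileNotFoundError):
        steps.append("before remove")
        os.remove(missing)            # raises FileNotFoundError
        steps.append("after remove")  # skipped: the block has already ended
    steps.append("after with")

    # An exception type that is not listed still propagates.
    try:
        with suppress(FileNotFoundError):
            raise PermissionError("read-only volume")
    except PermissionError:
        steps.append("PermissionError propagated")
```

Execution resumes at the first statement after the with block, so it is safest to keep only the single best-effort call inside suppress.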

The alternative is try: os.remove(LOCK_PATH) except FileNotFoundError: pass, which takes four lines as conventionally formatted to communicate "delete this, ignore missing". with suppress(FileNotFoundError): os.remove(LOCK_PATH) says the same thing in two lines that read like a statement of intent. suppress accepts multiple exception types: suppress(FileNotFoundError, PermissionError) handles both in the same form.
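The lock handling can also be packaged as its own context manager by combining @contextmanager with suppress. The lock_file helper below is a hypothetical sketch, not part of the pipeline above. It swaps in open mode "x" (exclusive creation), which is an assumption about how duplicate exports should be rejected; the version in export_report uses mode "w" and does not check for an existing lock.

```python
import os
import tempfile
from contextlib import contextmanager, suppress


@contextmanager
def lock_file(path):
    """Hypothetical helper: hold a lock file for the duration of the block."""
    # Mode "x" raises FileExistsError if the file already exists, so a second
    # job cannot silently take over a lock that is already held.
    with open(path, "x"):
        pass
    try:
        yield path
    finally:
        with suppress(FileNotFoundError):
            os.remove(path)


with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "export.lock")
    with lock_file(path):
        held_during_block = os.path.exists(path)
        try:
            with lock_file(path):
                second_job_ran = True
        except FileExistsError:
            second_job_ran = False
    released_after_block = not os.path.exists(path)
```

With this shape, export_report would no longer need its own try/finally for the lock. A lock left behind by a hard crash would still have to be cleared by hand, as described in the scenario.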


contextlib.ExitStack: Managing Dynamic Resources

When the set of resources to manage is not known until runtime, nested with statements are not an option. contextlib.ExitStack collects context managers at runtime and ensures all of them are torn down in reverse order when the stack exits — whether normally or due to an exception.

Replace export_report with a per-symbol export:

from contextlib import ExitStack

SYMBOL_PATHS = {
    "AAPL":  "/home/coder/learning/AAPL_report.csv",
    "MSFT":  "/home/coder/learning/MSFT_report.csv",
    "GOOGL": "/home/coder/learning/GOOGL_report.csv",
}

def export_by_symbol():
    with DatabaseConnection("db.trading.internal") as db:
        rows = db.query("SELECT symbol, price, volume FROM quotes")

        with ExitStack() as stack:
            writers = {}
            for symbol, path in SYMBOL_PATHS.items():
                f = stack.enter_context(open(path, "w", newline=""))
                writer = csv.DictWriter(f, fieldnames=["symbol", "price", "volume"])
                writer.writeheader()
                writers[symbol] = writer

            for row in rows:
                writers[row["symbol"]].writerow(row)

            print(f"Wrote {len(rows)} rows across {len(writers)} files")

export_by_symbol()


[DB] Connecting to db.trading.internal
Wrote 3 rows across 3 files
[DB] Disconnecting from db.trading.internal


stack.enter_context(resource) registers a context manager and returns its __enter__ value — the same value you would get from the as clause of a with statement. When the ExitStack block exits, every registered resource is cleaned up in reverse registration order. If opening the third file raises, the two already-opened files are still closed before the exception propagates.
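The partial-failure case can be reproduced without touching the filesystem. In this sketch, tracked is a hypothetical context manager that stands in for open() and records what happens; the third "file" is made to fail on purpose.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def tracked(name, fail=False):
    """Hypothetical stand-in for open(): records open/close, can fail to open."""
    if fail:
        raise OSError(f"cannot open {name}")
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


try:
    with ExitStack() as stack:
        for name, fail in [("AAPL", False), ("MSFT", False), ("GOOGL", True)]:
            stack.enter_context(tracked(name, fail=fail))
except OSError as exc:
    events.append(f"error: {exc}")
```

The recorded events show AAPL and MSFT opening, then closing in reverse order (MSFT first), and only then the OSError reaching the caller.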

Three output files would need three nested with statements (or one with statement listing three items) in the static case, and the statement grows with each additional symbol. When the symbol list comes from a database query, you cannot write it out in advance at all. ExitStack handles both cases with the same pattern: add each resource with enter_context, write the processing logic, let the stack exit. Adding a resource is one line regardless of how many already exist.

Note: ExitStack can also register plain cleanup callbacks with stack.callback(fn) — useful for teardown functions that do not implement the context manager protocol.
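A short sketch of the callback form, where release is a hypothetical teardown function that takes one argument. Extra positional arguments given to stack.callback are passed through to the function when the stack exits.

```python
from contextlib import ExitStack

events = []


def release(name):
    """Hypothetical teardown function with no context manager of its own."""
    events.append(f"released {name}")


with ExitStack() as stack:
    stack.callback(release, "lock")      # registered first, runs last
    stack.callback(release, "temp dir")  # registered last, runs first
    events.append("working")
```

Callbacks follow the same reverse-order rule as context managers registered with enter_context, so "temp dir" is released before "lock".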


Summary

Python's context manager protocol gives every resource class the ability to manage its own lifecycle, moving cleanup out of call sites and into the class that understands the resource. This tutorial built a trading data export pipeline across five patterns:

  • __enter__ runs setup and returns the managed object; __exit__ runs teardown whenever the block exits, normally or through an exception, so setup and teardown form a matched pair around any with block
  • __exit__ receives exc_type, exc_val, and exc_tb when the block raises; returning False lets the exception propagate, returning True suppresses it — teardown can react to failure (log, rollback) without deciding whether the caller should see the error
  • @contextmanager converts a generator function into a context manager: code before yield is the enter phase, code after is the exit phase, and try/finally around the yield is required to guarantee the exit phase runs on exceptions
  • contextlib.suppress(*exceptions) silences specific exception types in a single line, replacing try/except: pass blocks for cleanup operations where a particular failure is expected and harmless
  • contextlib.ExitStack accumulates context managers at runtime via enter_context and tears them all down in reverse order on exit, which makes it the standard-library tool for cases where the number of resources is not known until the code runs
