
Instrumenting a Financial Transaction Pipeline with Python's logging Module

Dima Jun 30, 2026

Introduction

Every production Python service eventually needs more than print(). Print statements cannot be disabled selectively, cannot write to multiple destinations simultaneously, and carry no metadata — no timestamp, no severity level, no module name. When an exception surfaces in a pipeline that has been running overnight, the first thing an engineer needs is a log that tells them not just what went wrong but when, where in the call stack it happened, and with what transaction data. Without structured logging, that information either does not exist or must be reconstructed from fragments of console output.

Python's logging module has been part of the standard library since Python 2.3. It provides the full infrastructure: loggers organized in a dotted hierarchy, handlers that route messages to files or streams, formatters that control the output shape, and severity levels that let you silence development noise in production without touching application code. The difference between a pipeline that fails mysteriously and one that fails traceably is almost entirely in how logging is set up.

The failure mode this tutorial addresses is not "I didn't log anything." It is "I logged everything to stdout with print() and now I have fifty thousand lines of mixed messages with no way to filter by severity, isolate a single transaction ID across pipeline stages, or archive warnings separately from routine status lines." This tutorial builds a financial transaction processing pipeline from scratch, adding one logging capability at a time until the output is production-ready.


Background

The logging module is organized around four objects:

  • Logger: The object your code calls. Created by name with logging.getLogger("name"). Loggers exist in a dotted-name hierarchy — pipeline.validator is a child of pipeline, which is a child of the root logger. Log records propagate up the hierarchy by default.
  • Handler: The destination for records. StreamHandler writes to stderr or stdout. FileHandler writes to a file. A single logger can have multiple handlers.
  • Formatter: Controls the string representation of a record. Configured on the handler. Can include timestamp, level name, logger name, message, and any custom fields.
  • Level: A numeric threshold, one of DEBUG (10), INFO (20), WARNING (30), ERROR (40), CRITICAL (50). A record must meet or exceed both the logger's level and the handler's level to appear in output.

LogRecord is the object that flows between loggers and handlers. Beyond the message string, it carries %(name)s, %(levelname)s, %(asctime)s, %(filename)s, %(lineno)d, and any custom fields attached via extra=.
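As a minimal sketch of how the four objects fit together, the snippet below wires one logger to one handler and checks which records get through. The logger name demo.background and the in-memory stream are assumptions chosen for illustration; they are not part of the pipeline built later.

```python
import io
import logging

# In-memory destination so the result can be inspected (illustrative only).
stream = io.StringIO()

# Handler: where records go, with its own threshold and formatter.
handler = logging.StreamHandler(stream)
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))

# Logger: what the code calls, with a separate threshold.
log = logging.getLogger("demo.background")  # hypothetical name
log.setLevel(logging.DEBUG)
log.propagate = False  # keep this demo away from the root logger
log.addHandler(handler)

log.debug("passes the logger, blocked by the handler")
log.info("passes both thresholds")

captured = stream.getvalue().splitlines()
print(captured)
```

Only the INFO record reaches the stream: the DEBUG record clears the logger's threshold but not the handler's.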


Practical Scenario

A fintech startup runs a nightly batch that processes thousands of payment transactions: validates each record's fields, applies currency conversion rates, and writes approved records to a ledger database. The pipeline reads from a CSV export and runs three sequential stages — validation, enrichment, ledger write — producing a summary report.

The current implementation uses print() for all status output. In development this is fine. In production, the batch runs overnight and its output is captured to a log file that the operations team reviews each morning. The team cannot distinguish validation warnings from database errors, cannot filter the file to show only failures, and cannot trace a specific transaction ID through the three pipeline stages because each stage uses a different print format.

When the overnight batch fails partway through, the team's first task is manually grepping a fifty-thousand-line file to find which transaction caused the failure and which stage it failed in. The team needs logging that emits structured lines with timestamps and transaction IDs, routes warnings and above to a separate alerts file, and can be silenced to INFO level in production without changing any pipeline code.


The Problem

Create the initial pipeline file:

touch pipeline.py

Run it using:

python3 pipeline.py

Start with the following content in pipeline.py:

import csv
import io

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-003,200.00,GBP,pending
TXN-004,abc,USD,pending
TXN-005,750.00,JPY,pending
"""

def validate(txn):
    if float(txn["amount"]) <= 0:
        print(f"WARN: negative amount in {txn['id']}")
        return False
    return True

def enrich(txn):
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.26, "JPY": 0.0067}
    rate = rates.get(txn["currency"])
    if rate is None:
        print(f"ERROR: unknown currency {txn['currency']}")
        return None
    txn["amount_usd"] = round(float(txn["amount"]) * rate, 2)
    return txn

def process():
    reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
    for row in reader:
        try:
            if not validate(row):
                continue
            enriched = enrich(row)
            if enriched:
                print(f"OK: processed {enriched['id']} - ${enriched['amount_usd']}")
        except (ValueError, TypeError) as e:
            print(f"FAILED: {row.get('id', 'unknown')} - {e}")

process()


OK: processed TXN-001 - $1500.0
WARN: negative amount in TXN-002
OK: processed TXN-003 - $252.0
FAILED: TXN-004 - could not convert string to float: 'abc'
OK: processed TXN-005 - $5.03


There is no timestamp, no severity level in a parseable column, and no module or function name. The WARN and ERROR prefixes are hand-typed strings that cannot be filtered programmatically. The exception for TXN-004 shows the message but no stack trace. Adding a file destination means wrapping every print with duplicate logic or redirecting stdout — neither of which lets warnings and errors go to a different file than INFO messages.


basicConfig and the Root Logger

Replace the entire content of pipeline.py with the following:

import csv
import io
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-003,200.00,GBP,pending
TXN-004,abc,USD,pending
TXN-005,750.00,JPY,pending
"""

def validate(txn):
    try:
        amount = float(txn["amount"])
    except ValueError:
        logging.error("invalid amount %r in %s", txn["amount"], txn["id"])
        return False
    if amount <= 0:
        logging.warning("non-positive amount in %s", txn["id"])
        return False
    return True

def enrich(txn):
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.26, "JPY": 0.0067}
    txn["amount_usd"] = round(float(txn["amount"]) * rates[txn["currency"]], 2)
    return txn

def process():
    reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
    for row in reader:
        try:
            if not validate(row):
                continue
            enriched = enrich(row)
            logging.info("accepted %s - $%s", enriched["id"], enriched["amount_usd"])
        except Exception:
            logging.exception("unhandled error on %s", row.get("id", "?"))

process()


2024-03-15T14:22:01 INFO     accepted TXN-001 - $1500.0
2024-03-15T14:22:01 WARNING  non-positive amount in TXN-002
2024-03-15T14:22:01 INFO     accepted TXN-003 - $252.0
2024-03-15T14:22:01 ERROR    invalid amount 'abc' in TXN-004
2024-03-15T14:22:01 INFO     accepted TXN-005 - $5.03


logging.exception logs at ERROR level and automatically appends the current exception's traceback — no extra code needed. The %s format arguments are passed separately from the message string so logging can delay string interpolation until the record is actually emitted, avoiding formatting cost when the level is suppressed.
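Both behaviours can be checked in isolation. The sketch below is an illustration under assumptions: the logger name demo.exception and the Expensive helper class are hypothetical, and the output is captured in memory rather than printed by the pipeline.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

log = logging.getLogger("demo.exception")  # hypothetical name
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(handler)

class Expensive:
    """Hypothetical helper that counts how often it is rendered as text."""
    calls = 0

    def __str__(self):
        Expensive.calls += 1
        return "expensive"

# DEBUG is below the logger's level, so the argument is never interpolated.
log.debug("value=%s", Expensive())
calls_after_debug = Expensive.calls

try:
    float("abc")
except ValueError:
    # Logs at ERROR and appends the active exception's traceback.
    log.exception("unhandled error on %s", "TXN-004")

output = stream.getvalue()
print(output)
```

Had the debug call used an f-string instead, __str__ would have run even though the record was discarded.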

Every line now has a parseable timestamp and a standardized severity field. Grepping for WARNING or ERROR works reliably because the level is in a fixed-width column. One change to the format string updates every log line simultaneously — not every print call.

Note: logging.basicConfig only takes effect if no handlers have been configured on the root logger yet. If any import configures logging before your code does, basicConfig's settings are silently ignored. In production code, configure handlers explicitly rather than relying on basicConfig.
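The snippet below sketches that pitfall and one way around it. It assumes Python 3.8 or later, where basicConfig accepts force=True to remove existing root handlers before configuring. The two in-memory streams and the logger name demo.force are illustrative, and the root logger's original state is restored at the end.

```python
import io
import logging

root = logging.getLogger()
saved_handlers, saved_level = root.handlers[:], root.level  # restored below

first = io.StringIO()
second = io.StringIO()

# Simulate an imported library that configured the root logger first.
root.handlers = [logging.StreamHandler(first)]

# The root logger already has a handler, so this call changes nothing.
logging.basicConfig(stream=second, level=logging.DEBUG,
                    format="%(levelname)s %(message)s")
ignored = all(getattr(h, "stream", None) is not second for h in root.handlers)

# force=True (Python 3.8+) removes existing root handlers, then configures.
logging.basicConfig(stream=second, level=logging.DEBUG,
                    format="%(levelname)s %(message)s", force=True)
logging.getLogger("demo.force").debug("now configured")
forced_output = second.getvalue()

# Put the root logger back the way it was found.
root.handlers = saved_handlers
root.setLevel(saved_level)

print(ignored, repr(forced_output))
```

force=True is convenient in scripts, but it also discards handlers that other code installed on purpose, which is another reason to prefer explicit handler setup in production.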


Named Loggers and the Logger Hierarchy

The root logger is global — anything that calls logging.warning() goes through it. Named loggers, created with logging.getLogger("name"), scope log output to a component and participate in the hierarchy: a logger named pipeline.validator propagates records upward to pipeline, then to the root logger. This hierarchy enables fine-grained level control per subsystem without touching any handler code.

Replace the entire content of pipeline.py with the following:

import csv
import io
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)-22s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)

logger        = logging.getLogger("pipeline")
validator_log = logging.getLogger("pipeline.validator")
enricher_log  = logging.getLogger("pipeline.enricher")

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-003,200.00,GBP,pending
TXN-004,abc,USD,pending
"""

def validate(txn):
    try:
        amount = float(txn["amount"])
    except ValueError:
        validator_log.error("invalid amount %r in %s", txn["amount"], txn["id"])
        return False
    if amount <= 0:
        validator_log.warning("non-positive amount %.2f in %s", amount, txn["id"])
        return False
    validator_log.debug("valid: %s amount=%.2f", txn["id"], amount)
    return True

def enrich(txn):
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.26}
    rate = rates.get(txn["currency"], 1.0)
    txn["amount_usd"] = round(float(txn["amount"]) * rate, 2)
    enricher_log.debug("enriched %s: %s %.2f -> USD %.2f",
                       txn["id"], txn["currency"], float(txn["amount"]), txn["amount_usd"])
    return txn

def process():
    logger.info("batch started")
    reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
    accepted = 0
    for row in reader:
        if validate(row):
            enriched = enrich(row)  # call once; a second call would log the DEBUG line twice
            logger.info("accepted %s - $%s", enriched["id"], enriched["amount_usd"])
            accepted += 1
    logger.info("batch complete: %d accepted", accepted)

process()


2024-03-15T14:22:01 pipeline               INFO     batch started
2024-03-15T14:22:01 pipeline.validator     DEBUG    valid: TXN-001 amount=1500.00
2024-03-15T14:22:01 pipeline.enricher      DEBUG    enriched TXN-001: USD 1500.00 -> USD 1500.00
2024-03-15T14:22:01 pipeline               INFO     accepted TXN-001 - $1500.0
2024-03-15T14:22:01 pipeline.validator     WARNING  non-positive amount -50.00 in TXN-002
2024-03-15T14:22:01 pipeline.validator     DEBUG    valid: TXN-003 amount=200.00
2024-03-15T14:22:01 pipeline.enricher      DEBUG    enriched TXN-003: GBP 200.00 -> USD 252.00
2024-03-15T14:22:01 pipeline               INFO     accepted TXN-003 - $252.0
2024-03-15T14:22:01 pipeline.validator     ERROR    invalid amount 'abc' in TXN-004
2024-03-15T14:22:01 pipeline               INFO     batch complete: 2 accepted


The logger name in every line (pipeline.validator, pipeline.enricher) identifies the component without any extra text in the message. Calling logging.getLogger("pipeline.validator").setLevel(logging.WARNING) in production silences DEBUG and INFO output from the validator while leaving the enricher and pipeline loggers untouched. No if DEBUG: conditions, no per-function flags: the hierarchy handles it.
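A small sketch of per-subsystem level control, using a separate hierarchy named demo_pipeline and a hypothetical ListHandler that collects records in memory so the effect can be inspected:

```python
import logging

records = []

class ListHandler(logging.Handler):
    """Hypothetical handler that collects (logger name, level name) pairs."""

    def emit(self, record):
        records.append((record.name, record.levelname))

parent = logging.getLogger("demo_pipeline")  # hypothetical hierarchy
parent.setLevel(logging.DEBUG)
parent.propagate = False
parent.addHandler(ListHandler())

validator = logging.getLogger("demo_pipeline.validator")
enricher = logging.getLogger("demo_pipeline.enricher")

# Quiet only the validator; the enricher keeps inheriting DEBUG from its parent.
validator.setLevel(logging.WARNING)

validator.debug("dropped by the validator's own level")
validator.warning("kept")
enricher.debug("kept: effective level is inherited")

print(records)
print(enricher.getEffectiveLevel() == logging.DEBUG)
```

A logger with no level of its own uses the nearest ancestor's level, which getEffectiveLevel reports.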


Handlers: StreamHandler and FileHandler

A handler decides where a log record goes. Adding a FileHandler sends records to a file simultaneously with the console. Adding a second FileHandler configured for WARNING and above creates a separate alerts file that the operations team can monitor independently.

Replace the entire content of pipeline.py with the following:

import csv
import io
import logging

def setup_logging():
    fmt = logging.Formatter(
        "%(asctime)s %(name)-22s %(levelname)-8s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    console.setFormatter(fmt)

    full_log = logging.FileHandler("/home/coder/learning/pipeline.log", mode="w")
    full_log.setLevel(logging.DEBUG)
    full_log.setFormatter(fmt)

    alerts_log = logging.FileHandler("/home/coder/learning/pipeline_alerts.log", mode="w")
    alerts_log.setLevel(logging.WARNING)
    alerts_log.setFormatter(fmt)

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(console)
    root.addHandler(full_log)
    root.addHandler(alerts_log)

setup_logging()
logger        = logging.getLogger("pipeline")
validator_log = logging.getLogger("pipeline.validator")

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-004,abc,USD,pending
"""

def validate(txn):
    try:
        amount = float(txn["amount"])
    except ValueError:
        validator_log.error("invalid amount %r in %s", txn["amount"], txn["id"])
        return False
    if amount <= 0:
        validator_log.warning("non-positive amount in %s", txn["id"])
        return False
    validator_log.debug("valid: %s", txn["id"])
    return True

def process():
    logger.info("batch started")
    reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
    for row in reader:
        if validate(row):
            logger.info("accepted %s", row["id"])
    logger.info("batch complete")

process()

print("\n--- pipeline_alerts.log ---")
with open("/home/coder/learning/pipeline_alerts.log") as f:
    print(f.read(), end="")


2024-03-15T14:22:01 pipeline               INFO     batch started
2024-03-15T14:22:01 pipeline               INFO     accepted TXN-001
2024-03-15T14:22:01 pipeline.validator     WARNING  non-positive amount in TXN-002
2024-03-15T14:22:01 pipeline.validator     ERROR    invalid amount 'abc' in TXN-004
2024-03-15T14:22:01 pipeline               INFO     batch complete

--- pipeline_alerts.log ---
2024-03-15T14:22:01 pipeline.validator     WARNING  non-positive amount in TXN-002
2024-03-15T14:22:01 pipeline.validator     ERROR    invalid amount 'abc' in TXN-004


The console shows INFO and above. pipeline.log captures everything including DEBUG. pipeline_alerts.log contains only WARNING and ERROR lines. All three destinations share the same formatter.

Multiple handlers turn logging into a routing system: one pipeline, multiple destinations with independent level thresholds. The operations team monitors the alerts file for actionable failures without drowning in INFO noise. Adding a remote log handler — Datadog, Sentry, CloudWatch — is one addHandler call with no changes to any application code.
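The listing above writes to a fixed directory from the lab environment. As a portable sketch of the same routing, the snippet below writes both files into a temporary directory and reads them back. The logger name demo.routing and the shortened format are assumptions for illustration.

```python
import logging
import tempfile
from pathlib import Path

fmt = logging.Formatter("%(levelname)s %(message)s")

with tempfile.TemporaryDirectory() as tmp:
    log_dir = Path(tmp)  # stand-in for the real log directory

    full = logging.FileHandler(log_dir / "pipeline.log", mode="w")
    full.setLevel(logging.DEBUG)      # everything
    alerts = logging.FileHandler(log_dir / "pipeline_alerts.log", mode="w")
    alerts.setLevel(logging.WARNING)  # WARNING and above only

    log = logging.getLogger("demo.routing")  # hypothetical name
    log.setLevel(logging.DEBUG)
    log.propagate = False
    for h in (full, alerts):
        h.setFormatter(fmt)
        log.addHandler(h)

    log.debug("valid: TXN-001")
    log.warning("non-positive amount in TXN-002")

    # Detach and close the handlers so the files are flushed and released.
    for h in (full, alerts):
        log.removeHandler(h)
        h.close()

    full_lines = (log_dir / "pipeline.log").read_text().splitlines()
    alert_lines = (log_dir / "pipeline_alerts.log").read_text().splitlines()

print(full_lines)
print(alert_lines)
```

The same record object is offered to every handler; each handler's level decides independently whether to write it.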


LoggerAdapter for Per-Transaction Context

Each log line currently includes the transaction ID embedded in the message text. This means grepping for TXN-003 works, but the ID is in an unstructured string position that changes between messages. LoggerAdapter wraps a logger and automatically injects contextual key-value pairs into every record it emits, without the calling code repeating the context in every message.

Replace the entire content of pipeline.py with the following:

import csv
import io
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(txn_id)-12s] %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-004,abc,USD,pending
"""

def process_transaction(row: dict) -> None:
    log = logging.LoggerAdapter(
        logging.getLogger("pipeline"),
        {"txn_id": row["id"]},
    )
    log.debug("started: currency=%s amount=%s", row["currency"], row["amount"])
    try:
        amount = float(row["amount"])
    except ValueError:
        log.error("invalid amount field: %r", row["amount"])
        return
    if amount <= 0:
        log.warning("rejected: non-positive amount %.2f", amount)
        return
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.26}
    amount_usd = round(amount * rates.get(row["currency"], 1.0), 2)
    log.info("accepted: amount_usd=%.2f", amount_usd)

reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
for row in reader:
    process_transaction(row)


2024-03-15T14:22:01 [TXN-001     ] DEBUG    started: currency=USD amount=1500.00
2024-03-15T14:22:01 [TXN-001     ] INFO     accepted: amount_usd=1500.00
2024-03-15T14:22:01 [TXN-002     ] DEBUG    started: currency=EUR amount=-50.00
2024-03-15T14:22:01 [TXN-002     ] WARNING  rejected: non-positive amount -50.00
2024-03-15T14:22:01 [TXN-004     ] DEBUG    started: currency=USD amount=abc
2024-03-15T14:22:01 [TXN-004     ] ERROR    invalid amount field: 'abc'


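[TXN-001     ] appears in every line for that transaction without process_transaction mentioning it in any log call after the adapter is created. The %(txn_id)-12s field in the format string reads from the extra dict the adapter injects automatically. Every line for TXN-004, from start to failure, is traceable as a unit. One caveat: because the format string requires txn_id, any record that reaches this handler without it (for example, one logged directly through a plain logger) fails to format and is reported as a logging error on stderr instead of being written.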

LoggerAdapter separates "what context is active" from "what happened." Each call site logs the event; the adapter ensures the context travels with it. In a concurrent pipeline where multiple transactions are in flight simultaneously, each thread or asyncio task creates its own adapter with its transaction ID, so every line is tagged correctly without any coordination between workers.
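A rough sketch of the concurrent case with threads, assuming a hypothetical worker function and an in-memory ListHandler in place of the pipeline's real handlers:

```python
import logging
import threading

lines = []

class ListHandler(logging.Handler):
    """Hypothetical handler that stores formatted lines in memory."""

    def emit(self, record):
        # Handler.handle() already holds the handler's lock while emit runs.
        lines.append(self.format(record))

handler = ListHandler()
handler.setFormatter(logging.Formatter("[%(txn_id)s] %(message)s"))

base = logging.getLogger("demo.concurrent")  # hypothetical name
base.setLevel(logging.INFO)
base.propagate = False
base.addHandler(handler)

def worker(txn_id):
    # Each thread builds its own adapter; the shared logger is untouched.
    log = logging.LoggerAdapter(base, {"txn_id": txn_id})
    log.info("started")
    log.info("finished")

threads = [
    threading.Thread(target=worker, args=(f"TXN-{n:03d}",)) for n in (1, 2, 3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

for line in sorted(lines):
    print(line)
```

Lines from different transactions may interleave in arrival order, but each one carries the ID of the adapter that emitted it.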


Structured Logging with a Custom JSON Formatter

Operations teams increasingly consume logs through aggregators — Elasticsearch, Datadog, Splunk — that parse key-value fields rather than free text. Emitting logs as JSON makes every field directly queryable without fragile regex patterns.

Replace the entire content of pipeline.py with the following:

import csv
import io
import json
import logging

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts":      self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "level":   record.levelname,
            "logger":  record.name,
            "message": record.getMessage(),
        }
        for key in ("txn_id", "currency", "amount_usd", "stage"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.DEBUG)

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-004,abc,USD,pending
"""

class ContextAdapter(logging.LoggerAdapter):
    # By default, LoggerAdapter.process() replaces any per-call extra= with the
    # adapter's own dict, which would drop fields such as "stage". Merge them
    # instead. (Python 3.13+ can pass merge_extra=True to LoggerAdapter.)
    def process(self, msg, kwargs):
        kwargs["extra"] = {**self.extra, **(kwargs.get("extra") or {})}
        return msg, kwargs

def process_transaction(row: dict) -> None:
    log = ContextAdapter(
        logging.getLogger("pipeline"),
        {"txn_id": row["id"]},
    )
    try:
        amount = float(row["amount"])
    except ValueError:
        log.error("invalid amount", extra={"stage": "validation"})
        return
    if amount <= 0:
        log.warning("non-positive amount", extra={"stage": "validation"})
        return
    rates = {"USD": 1.0, "EUR": 1.08}
    amount_usd = round(amount * rates.get(row["currency"], 1.0), 2)
    log.info("accepted", extra={"stage": "enrichment", "currency": row["currency"], "amount_usd": amount_usd})

reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
for row in reader:
    process_transaction(row)


{"ts": "2024-03-15T14:22:01", "level": "INFO", "logger": "pipeline", "message": "accepted", "txn_id": "TXN-001", "currency": "USD", "amount_usd": 1500.0, "stage": "enrichment"}
{"ts": "2024-03-15T14:22:01", "level": "WARNING", "logger": "pipeline", "message": "non-positive amount", "txn_id": "TXN-002", "stage": "validation"}
{"ts": "2024-03-15T14:22:01", "level": "ERROR", "logger": "pipeline", "message": "invalid amount", "txn_id": "TXN-004", "stage": "validation"}


JsonFormatter subclasses logging.Formatter and overrides format to produce a JSON object. The base class provides formatTime and formatException. Custom fields — txn_id, currency, amount_usd, stage — are extracted from the record if present, whether injected by the adapter or by individual extra= calls.

JSON output makes every field queryable. A query for level:ERROR AND stage:validation surfaces exactly the records an engineer needs without any regex extraction. Switching from text to JSON in an existing pipeline is one line — swapping the formatter on the handler — with no changes to any logging call site.
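To illustrate what "queryable" means without an aggregator, the sketch below applies the same level:ERROR AND stage:validation filter in plain Python. The two sample lines follow the shape of the output shown earlier; a real aggregator would use its own query language rather than this loop.

```python
import json

# Sample log lines in the shape produced by the JSON formatter.
raw_lines = [
    '{"ts": "2024-03-15T14:22:01", "level": "WARNING", "logger": "pipeline", '
    '"message": "non-positive amount", "txn_id": "TXN-002", "stage": "validation"}',
    '{"ts": "2024-03-15T14:22:01", "level": "ERROR", "logger": "pipeline", '
    '"message": "invalid amount", "txn_id": "TXN-004", "stage": "validation"}',
]

records = [json.loads(line) for line in raw_lines]

# Equivalent of: level:ERROR AND stage:validation
matches = [
    r["txn_id"]
    for r in records
    if r["level"] == "ERROR" and r.get("stage") == "validation"
]
print(matches)
```

Each condition is a dictionary lookup on a named field, so the filter keeps working even if the message wording changes.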


Summary

The financial transaction pipeline built in this tutorial demonstrates Python's logging module as a production infrastructure layer rather than a debugging convenience:

  • logging.basicConfig configures the root logger for quick setup; for production, configure handlers explicitly using StreamHandler, FileHandler, and setFormatter to control destinations and level thresholds independently
  • Named loggers created with logging.getLogger("pipeline.component") participate in a dotted hierarchy where records propagate upward; setting a level on a child logger silences its verbose output without affecting any sibling or parent logger
  • A handler's level is independent of the logger's level: a record must meet or exceed both thresholds to appear; use a WARNING-level FileHandler as an alerts file alongside a DEBUG-level FileHandler for the full audit log
  • logging.exception logs at ERROR and appends the current exception's traceback automatically; pass format arguments as positional arguments rather than pre-formatting the string to avoid interpolation cost when the level is suppressed
  • LoggerAdapter injects contextual key-value pairs into every record emitted through it, tagging all log lines for a transaction with a shared identifier without repeating it in every call
  • A custom Formatter subclass that overrides format to emit JSON makes every field queryable by log aggregators without changing any application logging call site
