Instrumenting a Financial Transaction Pipeline with Python's logging Module
Introduction
Every production Python service eventually needs more than print(). Print statements cannot be disabled selectively, cannot write to multiple destinations simultaneously, and carry no metadata — no timestamp, no severity level, no module name. When an exception surfaces in a pipeline that has been running overnight, the first thing an engineer needs is a log that tells them not just what went wrong but when, where in the call stack it happened, and with what transaction data. Without structured logging, that information either does not exist or must be reconstructed from fragments of console output.
Python's logging module has been part of the standard library since Python 2.3. It provides the full infrastructure: loggers organized in a dotted hierarchy, handlers that route messages to files or streams, formatters that control the output shape, and severity levels that let you silence development noise without touching production configuration. The difference between a pipeline that fails mysteriously and one that fails traceably is almost entirely in how logging is set up.
The failure mode this tutorial addresses is not "I didn't log anything." It is "I logged everything to stdout with print() and now I have fifty thousand lines of mixed messages with no way to filter by severity, isolate a single transaction ID across pipeline stages, or archive warnings separately from routine status lines." This tutorial builds a financial transaction processing pipeline from scratch, adding one logging capability at a time until the output is production-ready.
Background
The logging module is organized around four objects:
- Logger: The object your code calls. Created by name with logging.getLogger("name"). Loggers exist in a dotted-name hierarchy: pipeline.validator is a child of pipeline, which is a child of the root logger. Log records propagate up the hierarchy by default.
- Handler: The destination for records. StreamHandler writes to stderr or stdout. FileHandler writes to a file. A single logger can have multiple handlers.
- Formatter: Controls the string representation of a record. Configured on the handler. Can include timestamp, level name, logger name, message, and any custom fields.
- Level: A numeric threshold: DEBUG (10), INFO (20), WARNING (30), ERROR (40), CRITICAL (50). A record must meet or exceed both the logger's level and the handler's level to appear in output.
LogRecord is the object that flows between loggers and handlers. Beyond the message string, it carries %(name)s, %(levelname)s, %(asctime)s, %(filename)s, %(lineno)d, and any custom fields attached via extra=.
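The two-threshold rule can be checked in isolation. The sketch below is illustrative only: the logger name demo.levels and the in-memory buffer are assumptions made for the demonstration, not part of the pipeline.

```python
import io
import logging

# Hypothetical logger name, used only for this illustration.
demo = logging.getLogger("demo.levels")
demo.setLevel(logging.INFO)        # logger threshold: drops DEBUG
demo.propagate = False             # keep the demo away from root handlers

buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setLevel(logging.WARNING)  # handler threshold: drops INFO as well
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))
demo.addHandler(handler)

demo.debug("dropped by the logger")
demo.info("passes the logger, dropped by the handler")
demo.warning("passes both thresholds")

captured = buffer.getvalue()
print(captured, end="")
```

Only the WARNING record reaches the buffer, because it is the only one at or above both levels.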
Practical Scenario
A fintech startup runs a nightly batch that processes thousands of payment transactions: validates each record's fields, applies currency conversion rates, and writes approved records to a ledger database. The pipeline reads from a CSV export and runs three sequential stages — validation, enrichment, ledger write — producing a summary report.
The current implementation uses print() for all status output. In development this is fine. In production, the batch runs overnight and its output is captured to a log file that the operations team reviews each morning. The team cannot distinguish validation warnings from database errors, cannot filter the file to show only failures, and cannot trace a specific transaction ID through the three pipeline stages because each stage uses a different print format.
When the overnight batch fails partway through, the team's first task is manually grepping a fifty-thousand-line file to find which transaction caused the failure and which stage it failed in. The team needs logging that emits structured lines with timestamps and transaction IDs, routes warnings and above to a separate alerts file, and can be silenced to INFO level in production without changing any pipeline code.
The Problem
Create the initial pipeline file:
touch pipeline.py
Run it using:
python3 pipeline.py
import csv
import io

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-003,200.00,GBP,pending
TXN-004,abc,USD,pending
TXN-005,750.00,JPY,pending
"""


def validate(txn):
    # float() raises ValueError on a non-numeric amount; process() catches it.
    if float(txn["amount"]) <= 0:
        print(f"WARN: negative amount in {txn['id']}")
        return False
    return True


def enrich(txn):
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.26, "JPY": 0.0067}
    rate = rates.get(txn["currency"])
    if rate is None:
        print(f"ERROR: unknown currency {txn['currency']}")
        return None
    txn["amount_usd"] = round(float(txn["amount"]) * rate, 2)
    return txn


def process():
    reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
    for row in reader:
        try:
            if not validate(row):
                continue
            enriched = enrich(row)
            if enriched:
                print(f"OK: processed {enriched['id']} — ${enriched['amount_usd']}")
        except (ValueError, TypeError) as e:
            print(f"FAILED: {row.get('id', 'unknown')} — {e}")


process()
OK: processed TXN-001 — $1500.0
WARN: negative amount in TXN-002
OK: processed TXN-003 — $252.0
FAILED: TXN-004 — could not convert string to float: 'abc'
OK: processed TXN-005 — $5.03
There is no timestamp, no severity level in a parseable column, and no module or function name. The WARN and ERROR prefixes are hand-typed strings that cannot be filtered programmatically. The exception for TXN-004 shows the message but no stack trace. Adding a file destination means wrapping every print with duplicate logic or redirecting stdout — neither of which lets warnings and errors go to a different file than INFO messages.
basicConfig and the Root Logger
Replace the entire content of pipeline.py with the following:
import csv
import io
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-003,200.00,GBP,pending
TXN-004,abc,USD,pending
TXN-005,750.00,JPY,pending
"""


def validate(txn):
    try:
        amount = float(txn["amount"])
    except ValueError:
        logging.error("invalid amount %r in %s", txn["amount"], txn["id"])
        return False
    if amount <= 0:
        logging.warning("non-positive amount in %s", txn["id"])
        return False
    return True


def enrich(txn):
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.26, "JPY": 0.0067}
    txn["amount_usd"] = round(float(txn["amount"]) * rates[txn["currency"]], 2)
    return txn


def process():
    reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
    for row in reader:
        try:
            if not validate(row):
                continue
            enriched = enrich(row)
            logging.info("accepted %s — $%s", enriched["id"], enriched["amount_usd"])
        except Exception:
            # Logs at ERROR and appends the traceback of the active exception.
            logging.exception("unhandled error on %s", row.get("id", "?"))


process()
2024-03-15T14:22:01 INFO     accepted TXN-001 — $1500.0
2024-03-15T14:22:01 WARNING  non-positive amount in TXN-002
2024-03-15T14:22:01 INFO     accepted TXN-003 — $252.0
2024-03-15T14:22:01 ERROR    invalid amount 'abc' in TXN-004
2024-03-15T14:22:01 INFO     accepted TXN-005 — $5.03
logging.exception logs at ERROR level and automatically appends the current exception's traceback — no extra code needed. The %s format arguments are passed separately from the message string so logging can delay string interpolation until the record is actually emitted, avoiding formatting cost when the level is suppressed.
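The deferred interpolation can be observed directly. In this sketch the Expensive class and the logger name demo.lazy are hypothetical stand-ins; the class simply counts how often it is rendered to a string.

```python
import io
import logging


class Expensive:
    """Hypothetical stand-in for a value that is costly to render."""

    def __init__(self):
        self.renders = 0

    def __str__(self):
        self.renders += 1
        return "rendered"


log = logging.getLogger("demo.lazy")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(logging.StreamHandler(io.StringIO()))

value = Expensive()

log.debug("value=%s", value)      # DEBUG is suppressed: __str__ never runs
after_suppressed = value.renders

log.info("value=%s", value)       # emitted: the handler renders it once
after_emitted = value.renders

log.debug(f"value={value}")       # f-string renders before logging can skip it
after_fstring = value.renders

print(after_suppressed, after_emitted, after_fstring)
```

The f-string call pays the rendering cost even though the record is discarded, which is the reason to pass arguments separately.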
Every line now has a parseable timestamp and a standardized severity field. Grepping for WARNING or ERROR works reliably because the level is in a fixed-width column. One change to the format string updates every log line simultaneously — not every print call.
Note: logging.basicConfig only takes effect if the root logger has no handlers yet. If any import configures logging before your code does, basicConfig's settings are silently ignored unless you pass force=True (Python 3.8+), which removes the existing root handlers first. In production code, configure handlers explicitly rather than relying on basicConfig.
Named Loggers and the Logger Hierarchy
The root logger is global — anything that calls logging.warning() goes through it. Named loggers, created with logging.getLogger("name"), scope log output to a component and participate in the hierarchy: a logger named pipeline.validator propagates records upward to pipeline, then to the root logger. This hierarchy enables fine-grained level control per subsystem without touching any handler code.
Replace the entire content of pipeline.py with the following:
import csv
import io
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)-22s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)

logger = logging.getLogger("pipeline")
validator_log = logging.getLogger("pipeline.validator")
enricher_log = logging.getLogger("pipeline.enricher")

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-003,200.00,GBP,pending
TXN-004,abc,USD,pending
"""


def validate(txn):
    try:
        amount = float(txn["amount"])
    except ValueError:
        validator_log.error("invalid amount %r in %s", txn["amount"], txn["id"])
        return False
    if amount <= 0:
        validator_log.warning("non-positive amount %.2f in %s", amount, txn["id"])
        return False
    validator_log.debug("valid: %s amount=%.2f", txn["id"], amount)
    return True


def enrich(txn):
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.26}
    rate = rates.get(txn["currency"], 1.0)
    txn["amount_usd"] = round(float(txn["amount"]) * rate, 2)
    enricher_log.debug("enriched %s: %s %.2f -> USD %.2f",
                       txn["id"], txn["currency"], float(txn["amount"]), txn["amount_usd"])
    return txn


def process():
    logger.info("batch started")
    reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
    accepted = 0
    for row in reader:
        if validate(row):
            # Enrich once and reuse the result; calling enrich() twice would
            # repeat the conversion and duplicate its DEBUG line.
            enriched = enrich(row)
            logger.info("accepted %s — $%s", enriched["id"], enriched["amount_usd"])
            accepted += 1
    logger.info("batch complete: %d accepted", accepted)


process()
2024-03-15T14:22:01 pipeline               INFO     batch started
2024-03-15T14:22:01 pipeline.validator     DEBUG    valid: TXN-001 amount=1500.00
2024-03-15T14:22:01 pipeline.enricher      DEBUG    enriched TXN-001: USD 1500.00 -> USD 1500.00
2024-03-15T14:22:01 pipeline               INFO     accepted TXN-001 — $1500.0
2024-03-15T14:22:01 pipeline.validator     WARNING  non-positive amount -50.00 in TXN-002
2024-03-15T14:22:01 pipeline.validator     DEBUG    valid: TXN-003 amount=200.00
2024-03-15T14:22:01 pipeline.enricher      DEBUG    enriched TXN-003: GBP 200.00 -> USD 252.00
2024-03-15T14:22:01 pipeline               INFO     accepted TXN-003 — $252.0
2024-03-15T14:22:01 pipeline.validator     ERROR    invalid amount 'abc' in TXN-004
2024-03-15T14:22:01 pipeline               INFO     batch complete: 2 accepted
The logger name in every line (pipeline.validator, pipeline.enricher) identifies the component without any extra text in the message. Setting logging.getLogger("pipeline.validator").setLevel(logging.WARNING) in production silences DEBUG and INFO output from the validator while leaving the enricher and pipeline loggers untouched. No if DEBUG: conditions, no per-function flags: the hierarchy handles it.
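Per-subsystem level control can be inspected without emitting any records. The following sketch reuses the logger names from this section and relies only on getEffectiveLevel and isEnabledFor; setting the parent to DEBUG is an assumption made so that the inheritance is visible.

```python
import logging

pipeline = logging.getLogger("pipeline")
pipeline.setLevel(logging.DEBUG)
validator = logging.getLogger("pipeline.validator")
enricher = logging.getLogger("pipeline.enricher")

# With no level of their own, both children inherit DEBUG from "pipeline".
inherited = (validator.getEffectiveLevel(), enricher.getEffectiveLevel())

# Raise the threshold for one subsystem only.
validator.setLevel(logging.WARNING)

validator_debug = validator.isEnabledFor(logging.DEBUG)
enricher_debug = enricher.isEnabledFor(logging.DEBUG)

print(inherited, validator_debug, enricher_debug)
```

The enricher keeps inheriting DEBUG from its parent, while the validator now drops everything below WARNING.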
Handlers: StreamHandler and FileHandler
A handler decides where a log record goes. Adding a FileHandler sends records to a file simultaneously with the console. Adding a second FileHandler configured for WARNING and above creates a separate alerts file that the operations team can monitor independently.
Replace the entire content of pipeline.py with the following:
import csv
import io
import logging


def setup_logging():
    fmt = logging.Formatter(
        "%(asctime)s %(name)-22s %(levelname)-8s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )

    # Console: INFO and above.
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    console.setFormatter(fmt)

    # Full audit log: everything, including DEBUG.
    full_log = logging.FileHandler("/home/coder/learning/pipeline.log", mode="w")
    full_log.setLevel(logging.DEBUG)
    full_log.setFormatter(fmt)

    # Alerts file: WARNING and above only.
    alerts_log = logging.FileHandler("/home/coder/learning/pipeline_alerts.log", mode="w")
    alerts_log.setLevel(logging.WARNING)
    alerts_log.setFormatter(fmt)

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(console)
    root.addHandler(full_log)
    root.addHandler(alerts_log)


setup_logging()

logger = logging.getLogger("pipeline")
validator_log = logging.getLogger("pipeline.validator")

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-004,abc,USD,pending
"""


def validate(txn):
    try:
        amount = float(txn["amount"])
    except ValueError:
        validator_log.error("invalid amount %r in %s", txn["amount"], txn["id"])
        return False
    if amount <= 0:
        validator_log.warning("non-positive amount in %s", txn["id"])
        return False
    validator_log.debug("valid: %s", txn["id"])
    return True


def process():
    logger.info("batch started")
    reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
    for row in reader:
        if validate(row):
            logger.info("accepted %s", row["id"])
    logger.info("batch complete")


process()

print("\n--- pipeline_alerts.log ---")
with open("/home/coder/learning/pipeline_alerts.log") as f:
    print(f.read(), end="")
2024-03-15T14:22:01 pipeline               INFO     batch started
2024-03-15T14:22:01 pipeline               INFO     accepted TXN-001
2024-03-15T14:22:01 pipeline.validator     WARNING  non-positive amount in TXN-002
2024-03-15T14:22:01 pipeline.validator     ERROR    invalid amount 'abc' in TXN-004
2024-03-15T14:22:01 pipeline               INFO     batch complete

--- pipeline_alerts.log ---
2024-03-15T14:22:01 pipeline.validator     WARNING  non-positive amount in TXN-002
2024-03-15T14:22:01 pipeline.validator     ERROR    invalid amount 'abc' in TXN-004
The console shows INFO and above. pipeline.log captures everything including DEBUG. pipeline_alerts.log contains only WARNING and ERROR lines. All three destinations share the same formatter.
Multiple handlers turn logging into a routing system: one pipeline, multiple destinations with independent level thresholds. The operations team monitors the alerts file for actionable failures without drowning in INFO noise. Adding a remote log handler — Datadog, Sentry, CloudWatch — is one addHandler call with no changes to any application code.
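To illustrate how a new destination plugs in, here is a minimal sketch of a custom handler. ListHandler is a hypothetical in-memory stand-in for a remote destination, not the API of any vendor SDK, and pipeline.ledger is an assumed stage name.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical in-memory handler standing in for a remote destination."""

    def __init__(self, level=logging.NOTSET):
        super().__init__(level)
        self.lines = []

    def emit(self, record):
        # A real remote handler would ship the formatted line over the network.
        self.lines.append(self.format(record))


remote = ListHandler(level=logging.ERROR)
remote.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))

pipeline = logging.getLogger("pipeline")
pipeline.setLevel(logging.DEBUG)
pipeline.propagate = False       # keep the demo isolated from root handlers
pipeline.addHandler(remote)      # the one call that adds the destination

# Call sites stay unchanged; records from child loggers propagate to "pipeline".
ledger_log = logging.getLogger("pipeline.ledger")
ledger_log.info("ledger write ok for %s", "TXN-001")
ledger_log.error("ledger write failed for %s", "TXN-004")

print(remote.lines)
```

Only the ERROR record lands in the handler, because its own threshold filters out the INFO line.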
LoggerAdapter for Per-Transaction Context
Each log line currently includes the transaction ID embedded in the message text. This means grepping for TXN-003 works, but the ID is in an unstructured string position that changes between messages. LoggerAdapter wraps a logger and automatically injects contextual key-value pairs into every record it emits, without the calling code repeating the context in every message.
Replace the entire content of pipeline.py with the following:
import csv
import io
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(txn_id)-12s] %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-004,abc,USD,pending
"""


def process_transaction(row: dict) -> None:
    # The adapter attaches txn_id to every record logged through it.
    log = logging.LoggerAdapter(
        logging.getLogger("pipeline"),
        {"txn_id": row["id"]},
    )
    log.debug("started: currency=%s amount=%s", row["currency"], row["amount"])
    try:
        amount = float(row["amount"])
    except ValueError:
        log.error("invalid amount field: %r", row["amount"])
        return
    if amount <= 0:
        log.warning("rejected: non-positive amount %.2f", amount)
        return
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.26}
    amount_usd = round(amount * rates.get(row["currency"], 1.0), 2)
    log.info("accepted: amount_usd=%.2f", amount_usd)


reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
for row in reader:
    process_transaction(row)
2024-03-15T14:22:01 [TXN-001     ] DEBUG    started: currency=USD amount=1500.00
2024-03-15T14:22:01 [TXN-001     ] INFO     accepted: amount_usd=1500.00
2024-03-15T14:22:01 [TXN-002     ] DEBUG    started: currency=EUR amount=-50.00
2024-03-15T14:22:01 [TXN-002     ] WARNING  rejected: non-positive amount -50.00
2024-03-15T14:22:01 [TXN-004     ] DEBUG    started: currency=USD amount=abc
2024-03-15T14:22:01 [TXN-004     ] ERROR    invalid amount field: 'abc'
[TXN-001] appears in every line for that transaction without process_transaction mentioning it in any log call after the adapter is created. The format string %(txn_id)s reads from the extra dict the adapter injects automatically. Every line for TXN-004 — startup, failure — is traceable as a unit.
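One caveat with a format string that names %(txn_id)s: a record logged without the adapter has no such attribute, so the handler fails to format it and logging reports an error on stderr instead of emitting the line. A possible safeguard, sketched here with a hypothetical DefaultTxnId filter and demo logger name, is to fill in a placeholder on the handler.

```python
import io
import logging


class DefaultTxnId(logging.Filter):
    """Hypothetical filter: supplies txn_id when the caller did not."""

    def filter(self, record):
        if not hasattr(record, "txn_id"):
            record.txn_id = "-"
        return True  # never drop the record, only annotate it


buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(
    logging.Formatter("[%(txn_id)-12s] %(levelname)-8s %(message)s")
)
handler.addFilter(DefaultTxnId())

log = logging.getLogger("demo.ctx")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(handler)

log.info("batch started")  # plain logger, no transaction context
logging.LoggerAdapter(log, {"txn_id": "TXN-001"}).info("accepted")

lines = buffer.getvalue().splitlines()
print("\n".join(lines))
```

Batch-level messages then share the same column layout as per-transaction lines, with "-" in the ID position.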
LoggerAdapter separates "what context is active" from "what happened." Each call site logs the event; the adapter ensures the context travels with it. In a concurrent pipeline where multiple transactions are in flight simultaneously, each thread or asyncio task creates its own adapter with its transaction ID, so every line is tagged correctly without any coordination between workers.
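The per-worker adapter pattern might look like the sketch below. The worker function, the thread names, and the three step messages are assumptions made for the illustration; the order of lines across threads is not deterministic, so only the tagging is checked.

```python
import io
import logging
import threading

buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(
    logging.Formatter("[%(txn_id)s] %(threadName)s %(message)s")
)

base = logging.getLogger("demo.concurrent")
base.setLevel(logging.INFO)
base.propagate = False
base.addHandler(handler)


def worker(txn_id):
    # Each thread builds its own adapter; no shared mutable context.
    log = logging.LoggerAdapter(base, {"txn_id": txn_id})
    for step in ("validated", "enriched", "written"):
        log.info(step)


ids = ["TXN-%03d" % n for n in range(1, 5)]
threads = [
    threading.Thread(target=worker, args=(i,), name="worker-" + i) for i in ids
]
for t in threads:
    t.start()
for t in threads:
    t.join()

lines = buffer.getvalue().splitlines()
# Every line carries the ID of the thread that produced it.
all_tagged = all("worker-" + line[1:8] in line for line in lines)
print(len(lines), all_tagged)
```

Handlers serialize their writes with an internal lock, so lines from different threads interleave but never mix within a line.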
Structured Logging with a Custom JSON Formatter
Operations teams increasingly consume logs through aggregators — Elasticsearch, Datadog, Splunk — that parse key-value fields rather than free text. Emitting logs as JSON makes every field directly queryable without fragile regex patterns.
Replace the entire content of pipeline.py with the following:
import csv
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy the custom fields only when the record carries them.
        for key in ("txn_id", "stage", "currency", "amount_usd"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


class ContextAdapter(logging.LoggerAdapter):
    # The stock LoggerAdapter.process replaces a per-call extra= with the
    # adapter's own dict, so merge the two to keep fields such as "stage".
    def process(self, msg, kwargs):
        kwargs["extra"] = {**self.extra, **(kwargs.get("extra") or {})}
        return msg, kwargs


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.DEBUG)

TRANSACTIONS = """id,amount,currency,status
TXN-001,1500.00,USD,pending
TXN-002,-50.00,EUR,pending
TXN-004,abc,USD,pending
"""


def process_transaction(row: dict) -> None:
    log = ContextAdapter(
        logging.getLogger("pipeline"),
        {"txn_id": row["id"]},
    )
    try:
        amount = float(row["amount"])
    except ValueError:
        log.error("invalid amount", extra={"stage": "validation"})
        return
    if amount <= 0:
        log.warning("non-positive amount", extra={"stage": "validation"})
        return
    rates = {"USD": 1.0, "EUR": 1.08}
    amount_usd = round(amount * rates.get(row["currency"], 1.0), 2)
    log.info("accepted", extra={"stage": "enrichment", "currency": row["currency"], "amount_usd": amount_usd})


reader = csv.DictReader(io.StringIO(TRANSACTIONS.strip()))
for row in reader:
    process_transaction(row)
{"ts": "2024-03-15T14:22:01", "level": "INFO", "logger": "pipeline", "message": "accepted", "txn_id": "TXN-001", "stage": "enrichment", "currency": "USD", "amount_usd": 1500.0}
{"ts": "2024-03-15T14:22:01", "level": "WARNING", "logger": "pipeline", "message": "non-positive amount", "txn_id": "TXN-002", "stage": "validation"}
{"ts": "2024-03-15T14:22:01", "level": "ERROR", "logger": "pipeline", "message": "invalid amount", "txn_id": "TXN-004", "stage": "validation"}
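JsonFormatter subclasses logging.Formatter and overrides format to produce a JSON object. The base class provides formatTime and formatException. Custom fields (txn_id, stage, currency, amount_usd) are copied from the record when present. One caveat: the stock LoggerAdapter.process replaces any per-call extra= dict with the adapter's own, so the per-call fields reach the formatter only if the adapter merges the two dicts, either through a process override or, on Python 3.13 and later, by constructing the adapter with merge_extra=True.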
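How an adapter treats a per-call extra= is easy to check by capturing the records themselves. In this sketch CaptureHandler and MergingAdapter are hypothetical helper names; the stock adapter is constructed with its default arguments.

```python
import logging


class CaptureHandler(logging.Handler):
    """Hypothetical handler that keeps the raw LogRecord objects."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


class MergingAdapter(logging.LoggerAdapter):
    """Hypothetical adapter that merges per-call extra into its context."""

    def process(self, msg, kwargs):
        kwargs["extra"] = {**self.extra, **(kwargs.get("extra") or {})}
        return msg, kwargs


capture = CaptureHandler()
base = logging.getLogger("demo.extra")
base.setLevel(logging.INFO)
base.propagate = False
base.addHandler(capture)

stock = logging.LoggerAdapter(base, {"txn_id": "TXN-001"})
stock.info("accepted", extra={"stage": "enrichment"})

merging = MergingAdapter(base, {"txn_id": "TXN-001"})
merging.info("accepted", extra={"stage": "enrichment"})

stock_record, merged_record = capture.records
print(hasattr(stock_record, "stage"), hasattr(merged_record, "stage"))
```

With the default adapter the stage field is dropped while txn_id survives; the merging variant keeps both.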
JSON output makes every field queryable. A query for level:ERROR AND stage:validation surfaces exactly the records an engineer needs without any regex extraction. Switching from text to JSON in an existing pipeline is one line — swapping the formatter on the handler — with no changes to any logging call site.
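The same query can be approximated locally with the standard json module. This sketch parses the three sample lines shown above and applies the level and stage filter in plain Python; it stands in for an aggregator query and is not any aggregator's API.

```python
import json

# The three JSON lines from the sample output above.
raw_lines = [
    '{"ts": "2024-03-15T14:22:01", "level": "INFO", "logger": "pipeline", '
    '"message": "accepted", "txn_id": "TXN-001", "stage": "enrichment", '
    '"currency": "USD", "amount_usd": 1500.0}',
    '{"ts": "2024-03-15T14:22:01", "level": "WARNING", "logger": "pipeline", '
    '"message": "non-positive amount", "txn_id": "TXN-002", "stage": "validation"}',
    '{"ts": "2024-03-15T14:22:01", "level": "ERROR", "logger": "pipeline", '
    '"message": "invalid amount", "txn_id": "TXN-004", "stage": "validation"}',
]

records = [json.loads(line) for line in raw_lines]

# Equivalent of the query: level:ERROR AND stage:validation
hits = [
    r["txn_id"]
    for r in records
    if r["level"] == "ERROR" and r.get("stage") == "validation"
]
print(hits)
```

Because each field is a key rather than a position in a string, the filter keeps working when the message wording changes.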
Summary
The financial transaction pipeline built in this tutorial demonstrates Python's logging module as a production infrastructure layer rather than a debugging convenience:
- logging.basicConfig configures the root logger for quick setup; for production, configure handlers explicitly using StreamHandler, FileHandler, and setFormatter to control destinations and level thresholds independently
- Named loggers created with logging.getLogger("pipeline.component") participate in a dotted hierarchy where records propagate upward; setting a level on a child logger silences its verbose output without affecting any sibling or parent logger
- A handler's level is independent of the logger's level: a record must meet or exceed both thresholds to appear; use a WARNING-level FileHandler as an alerts file alongside a DEBUG-level FileHandler for the full audit log
- logging.exception logs at ERROR and appends the current exception's traceback automatically; pass format arguments as positional arguments rather than pre-formatting the string to avoid interpolation cost when the level is suppressed
- LoggerAdapter injects contextual key-value pairs into every record emitted through it, tagging all log lines for a transaction with a shared identifier without repeating it in every call
- A custom Formatter subclass that overrides format to emit JSON makes every field queryable by log aggregators without changing any application logging call site