Concurrent I/O with Python's asyncio and async/await
Introduction
A service that fetches data from three independent REST APIs and waits for each to respond before starting the next is wasting most of its runtime. The APIs don't know about each other and don't need to run in sequence — yet the synchronous version stalls on each network round-trip in turn. For two-second APIs, that's six seconds of latency where the CPU sits idle.
Python's asyncio library solves this without threads or multiprocessing. It runs a single-threaded event loop that suspends a coroutine the moment it would block on I/O and immediately switches to another coroutine that is ready to proceed. From the program's perspective, all three requests are "in flight" simultaneously. From the OS's perspective, a single thread manages them all. The result is latency equal to the slowest single request, not the sum of all requests.
The cost is a new execution model. Calling an async def function does not run its body: it returns a coroutine object that must be awaited or scheduled. Understanding when to await directly, when to launch concurrent tasks, and how to enforce deadlines is the difference between asyncio that works and asyncio that behaves exactly like synchronous code with extra syntax.
This tutorial builds a market data service that fetches prices from three independent APIs, merges the results into a consolidated snapshot, and writes it to disk. Each section introduces one piece of the asyncio machinery: the event loop, async/await, asyncio.gather, asyncio.create_task, deadline enforcement with asyncio.timeout, and file I/O with aiofiles.
Background
The event loop is the scheduler at the core of asyncio. It maintains a queue of ready callbacks, and each scheduled coroutine is driven by a Task that the loop resumes through that queue. When a coroutine reaches an await on an I/O operation, it is suspended, the loop registers interest in that I/O with the operating system, and other ready coroutines run in the meantime. The loop never waits on any single operation: when nothing is ready, it sleeps until the next I/O event or timer fires.
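A minimal sketch of that scheduling, assuming CPython's default event loop (the worker and main names are illustrative, not part of the market data service): two coroutines each suspend with await asyncio.sleep(0), which yields to the loop without waiting on any real I/O, and the loop alternates between them because it runs ready callbacks in first-in, first-out order.

```python
import asyncio

async def worker(name: str, steps: int, log: list[str]) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        # sleep(0) suspends this coroutine and hands control back to the loop
        await asyncio.sleep(0)

async def main() -> list[str]:
    log: list[str] = []
    # gather wraps both coroutines in Tasks and runs them on one thread
    await asyncio.gather(worker("A", 3, log), worker("B", 3, log))
    return log

order = asyncio.run(main())
print(order)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
```

Neither worker ever runs at the same instant as the other; they simply take turns at each await.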
Concurrency vs. parallelism — asyncio provides concurrency, not parallelism. One thread runs one coroutine at a time, but many coroutines can be alive simultaneously because each one voluntarily yields control at await points. CPU-bound work (computation, not I/O) is never suspended and blocks the entire loop for its duration. asyncio is the right tool for I/O-bound services; multiprocessing is the right tool for CPU-bound work.
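The effect of blocking the loop can be measured with a sketch like the one below (heartbeat, blocking_step, cooperative_step, and count_ticks are hypothetical helpers): a heartbeat task records a tick about every 10 ms, and the script counts how many ticks it manages while another coroutine waits 0.2 seconds, once with time.sleep and once with asyncio.sleep. The exact cooperative count depends on timer resolution, so treat it as approximate.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable

async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    # Records a timestamp roughly every 10 ms whenever the loop is free
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)

async def blocking_step() -> None:
    time.sleep(0.2)  # blocks the thread: no other coroutine can run

async def cooperative_step() -> None:
    await asyncio.sleep(0.2)  # suspends: the loop keeps running heartbeat

async def count_ticks(step: Callable[[], Awaitable[None]]) -> int:
    ticks: list[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let heartbeat record its first tick
    before = len(ticks)
    await step()
    stop.set()
    await hb
    return len(ticks) - before

blocked = asyncio.run(count_ticks(blocking_step))
cooperative = asyncio.run(count_ticks(cooperative_step))
print(f"ticks during time.sleep:    {blocked}")
print(f"ticks during asyncio.sleep: {cooperative}")
```

The blocking version records no ticks at all, which is the same stall a CPU-heavy loop or a synchronous HTTP call would cause inside a coroutine.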
Coroutines and awaitables — an async def function is a coroutine function. Calling it returns a coroutine object; it does not execute. Execution begins when the coroutine is awaited or scheduled. An awaitable is anything that can follow await: coroutines, Task objects, and Future objects all qualify.
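A short sketch of that distinction (fetch_price and the calls list are illustrative only): calling the coroutine function creates an object but runs none of its body, and the body executes only when the coroutine is awaited inside a running event loop.

```python
import asyncio
import inspect

calls: list[str] = []

async def fetch_price() -> float:
    calls.append("started")
    await asyncio.sleep(0)
    return 185.20

coro = fetch_price()               # creates a coroutine object; the body has not run
print(inspect.iscoroutine(coro))   # True
print(calls)                       # []  (nothing executed yet)

async def main() -> float:
    return await coro              # awaiting drives the body to completion

price = asyncio.run(main())
print(calls, price)                # ['started'] 185.2
```

A coroutine object that is never awaited or scheduled triggers a "coroutine ... was never awaited" RuntimeWarning when it is garbage-collected, which is often the first sign of a missing await.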
Key building blocks:
- asyncio.run(coro): creates a fresh event loop, runs one coroutine to completion, then closes the loop. The standard entry point for async programs.
- await coro: suspends the current coroutine until coro completes. Coroutines awaited directly, one after another, run sequentially.
- asyncio.gather(*coros): schedules all coroutines concurrently and waits for all of them to complete. Returns results in the same order as the inputs.
- asyncio.create_task(coro): schedules a coroutine as a Task that runs concurrently with the current coroutine. Returns a Task handle immediately.
- asyncio.sleep(seconds): suspends the current coroutine for a duration without blocking the event loop. The standard stand-in for real network I/O in examples.
- asyncio.timeout(seconds): an asynchronous context manager (Python 3.11+), used with async with, that raises TimeoutError if its body takes longer than the deadline.
Practical Scenario
A market data service aggregates current asset prices from three independent REST APIs: one for equities, one for commodities, and one for foreign exchange rates. Each API is operated by a different vendor with its own latency profile. The service polls all three on a fixed schedule and writes a consolidated snapshot to disk for downstream consumers — risk engines, trading algorithms, and dashboards — to read.
When the service fetches APIs sequentially, the snapshot latency equals the sum of all three API response times. If the equity API takes 1.2 seconds, commodities takes 0.9 seconds, and forex takes 1.1 seconds, every snapshot cycle takes 3.2 seconds. Downstream consumers read stale data for most of that window. When any one API slows down — a common occurrence in high-traffic market hours — the entire pipeline degrades.
The service also needs a safety valve: if an API has not responded within a hard deadline, the snapshot should proceed with the data it has and flag the missing source, rather than blocking indefinitely on a vendor that may be down. Without deadline enforcement, a hung API stalls every downstream consumer until the connection finally times out at the OS level.
The Problem
A sequential version fetches each API one at a time. Total latency is the sum of all three response times.
Create a new file:
```bash
touch market_data.py
```
Run it using:
```bash
python3 market_data.py
```
```python
import time

def fetch_equities():
    print(" Fetching equities...")
    time.sleep(1.2)  # stands in for a blocking HTTP request
    return {"AAPL": 185.20, "MSFT": 378.85, "GOOGL": 141.50}

def fetch_commodities():
    print(" Fetching commodities...")
    time.sleep(0.9)
    return {"GOLD": 2345.10, "OIL": 78.42, "SILVER": 27.85}

def fetch_forex():
    print(" Fetching forex...")
    time.sleep(1.1)
    return {"EUR/USD": 1.0873, "GBP/USD": 1.2654, "USD/JPY": 149.82}

def build_snapshot():
    start = time.perf_counter()
    # Each call blocks until it returns, so the three waits add up
    equities = fetch_equities()
    commodities = fetch_commodities()
    forex = fetch_forex()
    snapshot = {"equities": equities, "commodities": commodities, "forex": forex}
    elapsed = time.perf_counter() - start
    print(f"\nSnapshot ready in {elapsed:.2f}s")
    print(f"Assets captured: {sum(len(v) for v in snapshot.values())}")

build_snapshot()
```
```text
 Fetching equities...
 Fetching commodities...
 Fetching forex...

Snapshot ready in 3.20s
Assets captured: 9
```
Total latency is 3.20 seconds, the sum of 1.2 + 0.9 + 1.1. Every cycle, the service blocks on each API in turn. If the equity feed slows to 3 seconds during market open, the snapshot takes 5.0 seconds (3.0 + 0.9 + 1.1), and downstream consumers read data that is already stale before it arrives. There is nothing wrong with the data; the problem is the sequencing.
async def and await: Writing Coroutines
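The first step is converting the fetcher functions to coroutines: declare each one with async def and replace time.sleep(n) with await asyncio.sleep(n). await asyncio.sleep(n) suspends the current coroutine for n seconds while the event loop remains free to run other work, whereas time.sleep(n) would block the whole thread, loop included. In a real service, the blocking HTTP calls would likewise need to be replaced with an asyncio-compatible HTTP client.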
Replace the entire content of market_data.py with the following:
```python
import asyncio
import time

async def fetch_equities():
    print(" Fetching equities...")
    await asyncio.sleep(1.2)  # non-blocking stand-in for a network call
    return {"AAPL": 185.20, "MSFT": 378.85, "GOOGL": 141.50}

async def fetch_commodities():
    print(" Fetching commodities...")
    await asyncio.sleep(0.9)
    return {"GOLD": 2345.10, "OIL": 78.42, "SILVER": 27.85}

async def fetch_forex():
    print(" Fetching forex...")
    await asyncio.sleep(1.1)
    return {"EUR/USD": 1.0873, "GBP/USD": 1.2654, "USD/JPY": 149.82}

async def build_snapshot():
    start = time.perf_counter()
    # Each await finishes before the next fetch starts: still sequential
    equities = await fetch_equities()
    commodities = await fetch_commodities()
    forex = await fetch_forex()
    snapshot = {"equities": equities, "commodities": commodities, "forex": forex}
    elapsed = time.perf_counter() - start
    print(f"\nSnapshot ready in {elapsed:.2f}s")
    print(f"Assets captured: {sum(len(v) for v in snapshot.values())}")

asyncio.run(build_snapshot())
```
```text
 Fetching equities...
 Fetching commodities...
 Fetching forex...

Snapshot ready in 3.20s
Assets captured: 9
```
The output is unchanged — still 3.20 seconds. await fetch_equities() suspends build_snapshot until fetch_equities finishes before moving to fetch_commodities. The fetches run sequentially even though they are coroutines, because each one is awaited directly before the next is started. Coroutines are concurrent only when scheduled concurrently.
asyncio.run is the standard entry point: it creates a new event loop, runs the coroutine, closes the loop, and returns the result. It replaces the older loop.run_until_complete pattern.
The code is now written in the asyncio model, so making the fetches actually concurrent requires changing only the scheduling call, not the fetcher functions themselves. Note that await hands control back to the event loop only when the awaited code reaches a real suspension point, such as asyncio.sleep or a pending socket read; those are the points where the loop can run other coroutines while the current one waits.
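The difference between awaiting and yielding shows up in a small sketch (background, no_suspend, and the events list are hypothetical): a task created with create_task does not run while main awaits a coroutine that never suspends, and only gets its turn at the first real suspension point.

```python
import asyncio

events: list[str] = []

async def background() -> None:
    events.append("background ran")

async def no_suspend() -> str:
    # Awaiting this coroutine never reaches a suspension point
    return "computed"

async def main() -> None:
    task = asyncio.create_task(background())
    await no_suspend()
    events.append("after await no_suspend()")      # background has not run yet
    await asyncio.sleep(0)                         # a real suspension point
    events.append("after await asyncio.sleep(0)")
    await task

asyncio.run(main())
print(events)
# ['after await no_suspend()', 'background ran', 'after await asyncio.sleep(0)']
```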
asyncio.gather: Running Coroutines Concurrently
asyncio.gather schedules all passed coroutines at the same time and waits for all of them to complete. Total latency drops from the sum to the maximum — the slowest single request.
Replace build_snapshot with the following:
```python
async def build_snapshot():
    start = time.perf_counter()
    # gather runs all three fetches concurrently and returns results in input order
    equities, commodities, forex = await asyncio.gather(
        fetch_equities(),
        fetch_commodities(),
        fetch_forex(),
    )
    snapshot = {"equities": equities, "commodities": commodities, "forex": forex}
    elapsed = time.perf_counter() - start
    print(f"\nSnapshot ready in {elapsed:.2f}s")
    print(f"Assets captured: {sum(len(v) for v in snapshot.values())}")

asyncio.run(build_snapshot())
```
```text
 Fetching equities...
 Fetching commodities...
 Fetching forex...

Snapshot ready in 1.20s
Assets captured: 9
```
Total time drops from 3.20 seconds to 1.20 seconds, the duration of the slowest single fetch. All three print statements appear before any sleep completes because gather wraps each coroutine in a Task, and all three Tasks start as soon as build_snapshot suspends on the gather, each reaching its await asyncio.sleep almost immediately. gather returns results in the same order as the inputs, regardless of completion order, so unpacking directly into equities, commodities, forex is safe.
Three independent I/O calls that previously took 3.20 seconds now take 1.20 seconds, and only the scheduling inside build_snapshot changed; the fetchers are untouched. gather is the right tool when all coroutines must complete before the program can continue.
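To make the ordering guarantee concrete, here is a sketch with shortened, hypothetical delays (the fetch helper and finished list are illustrative): the coroutines complete in the order of their delays, yet gather returns their results in the order they were passed in.

```python
import asyncio

async def fetch(name: str, delay: float, finished: list[str]) -> str:
    await asyncio.sleep(delay)
    finished.append(name)  # records completion order
    return name

async def main() -> tuple[list[str], list[str]]:
    finished: list[str] = []
    results = await asyncio.gather(
        fetch("equities", 0.03, finished),
        fetch("commodities", 0.01, finished),
        fetch("forex", 0.02, finished),
    )
    return list(results), finished

results, finished = asyncio.run(main())
print(results)   # ['equities', 'commodities', 'forex']  (input order)
print(finished)  # ['commodities', 'forex', 'equities']  (completion order)
```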
Note: By default (return_exceptions=False), the first exception raised by any coroutine passed to gather propagates immediately to the code awaiting gather, but the other coroutines are not cancelled and keep running. Pass return_exceptions=True to collect exceptions as ordinary entries in the results list instead, allowing partial success.
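Both error-handling modes can be checked with a sketch like this one (ok, failing, and the delays are hypothetical; ConnectionError stands in for a vendor failure). With return_exceptions=True the failure comes back as an item in the results list; with the default, the exception reaches the caller while the sibling coroutine keeps running and finishes on its own.

```python
import asyncio

async def ok(name: str, delay: float, done: list[str]) -> str:
    await asyncio.sleep(delay)
    done.append(name)
    return name

async def failing() -> str:
    await asyncio.sleep(0.01)
    raise ConnectionError("vendor unavailable")

async def main() -> tuple[list, list[str], str]:
    # return_exceptions=True: the exception becomes an item in the results list
    done: list[str] = []
    partial = await asyncio.gather(
        ok("equities", 0.02, done), failing(), return_exceptions=True
    )

    # Default: the first exception propagates to this await...
    survivors: list[str] = []
    message = ""
    try:
        await asyncio.gather(ok("commodities", 0.05, survivors), failing())
    except ConnectionError as exc:
        message = str(exc)
    # ...but the sibling was not cancelled and still completes
    await asyncio.sleep(0.1)
    return partial, survivors, message

partial, survivors, message = asyncio.run(main())
print(partial)    # ['equities', ConnectionError('vendor unavailable')]
print(survivors)  # ['commodities']
print(message)    # vendor unavailable
```

If the remaining coroutines should be cancelled when one fails, asyncio.TaskGroup (Python 3.11+) provides that behavior: when any task in the group fails, the others are cancelled.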
asyncio.create_task: Fire-and-Forget Tasks
asyncio.gather waits for all coroutines before proceeding. Sometimes a coroutine should be launched and allowed to run in the background while the current function continues with other work. asyncio.create_task schedules a coroutine as a Task and returns a handle immediately; the task starts running the next time the current coroutine yields control to the event loop.
Add a record_snapshot_event coroutine that simulates writing an audit event to a logging service, and launch it as a background task while the snapshot is assembled. Replace build_snapshot with the following:
```python
async def record_snapshot_event(asset_count: int):
    await asyncio.sleep(0.3)  # simulates a call to a logging service
    print(f" [audit] Snapshot event recorded: {asset_count} assets")

async def build_snapshot():
    start = time.perf_counter()
    equities, commodities, forex = await asyncio.gather(
        fetch_equities(),
        fetch_commodities(),
        fetch_forex(),
    )
    snapshot = {"equities": equities, "commodities": commodities, "forex": forex}
    asset_count = sum(len(v) for v in snapshot.values())
    # Schedule the audit event in the background; build_snapshot keeps going
    audit_task = asyncio.create_task(record_snapshot_event(asset_count))
    elapsed = time.perf_counter() - start
    print(f"\nSnapshot ready in {elapsed:.2f}s")
    print(f"Assets captured: {asset_count}")
    # Wait for the audit event before returning
    await audit_task

asyncio.run(build_snapshot())
```
```text
 Fetching equities...
 Fetching commodities...
 Fetching forex...

Snapshot ready in 1.20s
Assets captured: 9
 [audit] Snapshot event recorded: 9 assets
```
create_task schedules record_snapshot_event immediately, but the current coroutine continues without waiting for it. The snapshot result is printed before the audit event completes. The final await audit_task ensures the event is recorded before build_snapshot returns — useful when the task must complete before the caller proceeds, but its result is not needed at the point of creation.
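The final await matters because asyncio.run cancels any tasks still pending when the top-level coroutine returns. A sketch of that failure mode, using a hypothetical record_event coroutine and a main that never awaits its task:

```python
import asyncio

recorded: list[str] = []
tasks: list[asyncio.Task] = []

async def record_event(label: str) -> None:
    await asyncio.sleep(0.3)  # simulated audit write
    recorded.append(label)

async def main() -> None:
    # Hypothetical variant of build_snapshot that never awaits its task
    tasks.append(asyncio.create_task(record_event("snapshot")))
    await asyncio.sleep(0.01)  # main returns long before the audit finishes

asyncio.run(main())
print(recorded)              # []
print(tasks[0].cancelled())  # True: asyncio.run cancelled it during shutdown
```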
A background task is scheduled the moment it is created and starts running as soon as the current coroutine next yields to the event loop. Unlike gather, which suspends the caller until all coroutines finish, create_task lets the current function proceed while the task runs. This suits fire-and-forget work such as audit logging, cache warming, and telemetry, which must not delay the critical path.
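Note: If a Task raises an exception that is never retrieved (by awaiting the task or calling task.exception()), asyncio logs a "Task exception was never retrieved" error when the task is garbage-collected. Always await tasks that might fail, or attach a done callback to them. Keep a reference to every task you create as well: the event loop holds only weak references to tasks, so an unreferenced task can disappear before it finishes.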
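The asyncio documentation recommends keeping a strong reference to fire-and-forget tasks, for example in a set, and discarding it in a done callback. A minimal sketch of that pattern applied to the audit event, assuming hypothetical launch_audit and on_done helpers and a shortened, simplified record_snapshot_event:

```python
import asyncio

background_tasks: set[asyncio.Task] = set()
recorded: list[int] = []

async def record_snapshot_event(asset_count: int) -> None:
    await asyncio.sleep(0.01)  # simplified stand-in for the audit call
    recorded.append(asset_count)

def on_done(task: asyncio.Task) -> None:
    background_tasks.discard(task)  # release the strong reference
    # Retrieving the exception here reports failures immediately
    if not task.cancelled() and task.exception() is not None:
        print(f"[audit] failed: {task.exception()!r}")

def launch_audit(asset_count: int) -> None:
    # Must be called while the event loop is running
    task = asyncio.create_task(record_snapshot_event(asset_count))
    background_tasks.add(task)       # strong reference keeps the task alive
    task.add_done_callback(on_done)

async def main() -> None:
    launch_audit(9)
    await asyncio.sleep(0.05)  # other work; the audit finishes meanwhile

asyncio.run(main())
print(recorded, len(background_tasks))  # [9] 0
```

Because on_done retrieves the task's exception, a failed audit write is reported as soon as the task finishes instead of surfacing later as an unretrieved-exception log entry.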
asyncio.timeout: Enforcing Deadlines
A vendor API that stops responding should not block the snapshot indefinitely. asyncio.timeout is an asynchronous context manager, used with async with: if its body has not completed within the specified number of seconds, it cancels the work inside the block and raises TimeoutError when the block exits.
Add a slow forex fetcher to simulate a degraded API, and wrap each fetch in an individual timeout. Replace fetch_forex and build_snapshot with the following:
```python
async def fetch_forex():
    print(" Fetching forex...")
    await asyncio.sleep(3.5)  # simulates a degraded vendor API
    return {"EUR/USD": 1.0873, "GBP/USD": 1.2654, "USD/JPY": 149.82}

async def fetch_with_timeout(coro, name: str, deadline: float):
    try:
        async with asyncio.timeout(deadline):  # Python 3.11+
            return await coro
    except TimeoutError:
        # The fetch was cancelled at the deadline; continue without its data
        print(f" [warning] {name} timed out after {deadline}s — skipping")
        return {}

async def build_snapshot():
    start = time.perf_counter()
    equities, commodities, forex = await asyncio.gather(
        fetch_with_timeout(fetch_equities(), "equities", 2.0),
        fetch_with_timeout(fetch_commodities(), "commodities", 2.0),
        fetch_with_timeout(fetch_forex(), "forex", 2.0),
    )
    snapshot = {"equities": equities, "commodities": commodities, "forex": forex}
    asset_count = sum(len(v) for v in snapshot.values())
    # An empty dict marks a source that missed its deadline
    missing = [name for name, data in snapshot.items() if not data]
    degraded = f" ({', '.join(missing)} degraded)" if missing else ""
    audit_task = asyncio.create_task(record_snapshot_event(asset_count))
    elapsed = time.perf_counter() - start
    print(f"\nSnapshot ready in {elapsed:.2f}s")
    print(f"Assets captured: {asset_count}{degraded}")
    await audit_task

asyncio.run(build_snapshot())
```
```text
 Fetching equities...
 Fetching commodities...
 Fetching forex...
 [warning] forex timed out after 2.0s — skipping

Snapshot ready in 2.00s
Assets captured: 6 (forex degraded)
 [audit] Snapshot event recorded: 6 assets
```
The snapshot completes in 2.00 seconds — the deadline — rather than 3.50 seconds. The forex fetch was cancelled at the timeout boundary. fetch_with_timeout catches TimeoutError and returns an empty dict, so the pipeline continues with partial data. The audit task records the degraded count.
Without deadline enforcement, a slow or hung vendor API keeps gather waiting for as long as the underlying connection stays open, which, with no client-side timeout configured, can be indefinitely. Wrapping each source in its own timeout bounds the pipeline's worst-case latency by the deadline rather than by the slowest vendor. Partial results are far more useful to downstream consumers than a full stall.
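Per-source timeouts are one design; another is a single budget for the whole batch. The sketch below swaps in asyncio.wait (not used elsewhere in this tutorial), whose timeout parameter returns the finished and still-pending tasks instead of raising; fetch, snapshot_with_budget, and the delays are hypothetical.

```python
import asyncio

async def fetch(name: str, delay: float) -> dict[str, float]:
    await asyncio.sleep(delay)  # simulated vendor latency
    return {name: delay}

async def snapshot_with_budget(budget: float) -> tuple[dict, list[str]]:
    names = {
        asyncio.create_task(fetch("equities", 0.05)): "equities",
        asyncio.create_task(fetch("commodities", 0.02)): "commodities",
        asyncio.create_task(fetch("forex", 1.0)): "forex",  # simulated hung vendor
    }
    # One shared deadline for the whole batch; returns (done, pending) sets
    done, pending = await asyncio.wait(set(names), timeout=budget)
    for task in pending:
        task.cancel()  # asyncio.wait does not cancel stragglers itself
    await asyncio.gather(*pending, return_exceptions=True)  # let cancellations finish
    results = {names[t]: t.result() for t in done}
    missing = sorted(names[t] for t in pending)
    return results, missing

results, missing = asyncio.run(snapshot_with_budget(0.2))
print(sorted(results), missing)  # ['commodities', 'equities'] ['forex']
```

The trade-off versus fetch_with_timeout is that every source shares one deadline: this bounds the total cycle time directly but gives no individual source a guaranteed budget of its own.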
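Note: asyncio.timeout was added in Python 3.11. On earlier versions, use asyncio.wait_for(coro, timeout) for the same effect, and catch asyncio.TimeoutError: before 3.11 it is a separate class from the built-in TimeoutError, so except TimeoutError will not catch it. asyncio.timeout is preferred when the deadline applies to a block of code rather than a single coroutine.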
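A sketch of fetch_with_timeout rewritten for Python versions before 3.11, assuming the same contract as the tutorial's version (return an empty dict on timeout); slow_forex and the 0.1-second deadline are illustrative:

```python
import asyncio

async def fetch_with_timeout(coro, name: str, deadline: float) -> dict:
    try:
        # wait_for cancels coro if it has not finished within the deadline
        return await asyncio.wait_for(coro, timeout=deadline)
    except asyncio.TimeoutError:
        # Before 3.11 this is a distinct class; from 3.11 it aliases TimeoutError
        print(f" [warning] {name} timed out after {deadline}s, skipping")
        return {}

async def slow_forex() -> dict:
    await asyncio.sleep(1.0)  # simulated degraded vendor
    return {"EUR/USD": 1.0873}

async def main() -> dict:
    return await fetch_with_timeout(slow_forex(), "forex", 0.1)

result = asyncio.run(main())
print(result)  # {}
```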
aiofiles: Non-Blocking File I/O
Writing the snapshot to disk with the built-in open() is a blocking call: the event loop thread is stuck for the duration of the write, and no other coroutine can run. For small files the pause is negligible, but in a high-frequency pipeline these pauses can add measurable jitter. aiofiles delegates file operations to a thread pool, so awaiting them yields control back to the event loop.
aiofiles is a third-party package; install it with pip install aiofiles. Then add the new imports and a write_snapshot coroutine, and call it after building the snapshot. Replace build_snapshot with the following:
```python
# Add these imports at the top of market_data.py
import json
import aiofiles

SNAPSHOT_PATH = "/home/coder/learning/market_snapshot.json"

async def write_snapshot(snapshot: dict):
    # aiofiles runs open/write/close in a thread pool, keeping the loop free
    async with aiofiles.open(SNAPSHOT_PATH, "w") as f:
        await f.write(json.dumps(snapshot, indent=2))
    print(f" Snapshot written to {SNAPSHOT_PATH}")

async def build_snapshot():
    start = time.perf_counter()
    equities, commodities, forex = await asyncio.gather(
        fetch_with_timeout(fetch_equities(), "equities", 2.0),
        fetch_with_timeout(fetch_commodities(), "commodities", 2.0),
        fetch_with_timeout(fetch_forex(), "forex", 2.0),
    )
    snapshot = {"equities": equities, "commodities": commodities, "forex": forex}
    asset_count = sum(len(v) for v in snapshot.values())
    # An empty dict marks a source that missed its deadline
    missing = [name for name, data in snapshot.items() if not data]
    degraded = f" ({', '.join(missing)} degraded)" if missing else ""
    audit_task = asyncio.create_task(record_snapshot_event(asset_count))
    await write_snapshot(snapshot)
    elapsed = time.perf_counter() - start
    print(f"\nSnapshot ready in {elapsed:.2f}s")
    print(f"Assets captured: {asset_count}{degraded}")
    await audit_task

asyncio.run(build_snapshot())
```
```text
 Fetching equities...
 Fetching commodities...
 Fetching forex...
 [warning] forex timed out after 2.0s — skipping
 Snapshot written to /home/coder/learning/market_snapshot.json

Snapshot ready in 2.00s
Assets captured: 6 (forex degraded)
 [audit] Snapshot event recorded: 6 assets
```
aiofiles.open is used as an async context manager — async with rather than with. Inside the block, await f.write(...) dispatches the write to a thread pool and suspends the current coroutine until it completes, keeping the event loop free. The file is closed automatically when the async with block exits.
Replacing open() with aiofiles.open() keeps file I/O off the event loop thread, preserving the non-blocking guarantee for every other coroutine that may be running. The benefit grows with write frequency and file size: a pipeline that writes many snapshots per second, or writes large files, is where the difference becomes measurable. More importantly, the pattern is consistent: all I/O uses await, and no call ever blocks the loop silently.
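If adding a third-party dependency is not an option, the standard library offers the same thread-pool approach: asyncio.to_thread (Python 3.9+) runs a blocking function in a worker thread and lets the coroutine await it. This sketch swaps in that technique for aiofiles; _write_json and the path-taking write_snapshot signature are hypothetical, and the demo writes to a temporary directory rather than SNAPSHOT_PATH.

```python
import asyncio
import json
import os
import tempfile

def _write_json(path: str, data: dict) -> None:
    # Plain blocking write; runs in a worker thread, not on the event loop
    with open(path, "w") as f:
        json.dump(data, f, indent=2)

async def write_snapshot(path: str, snapshot: dict) -> None:
    # to_thread submits the call to the default thread pool and suspends
    # this coroutine until it returns
    await asyncio.to_thread(_write_json, path, snapshot)

snapshot = {"equities": {"AAPL": 185.20}, "commodities": {}, "forex": {}}
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "market_snapshot.json")
    asyncio.run(write_snapshot(path, snapshot))
    with open(path) as f:
        written = json.load(f)
print(written == snapshot)  # True
```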
Summary
Python's asyncio library gives I/O-bound services concurrency without threads or processes, by running coroutines on a single-threaded event loop that suspends and resumes work at await points. This tutorial built a market data aggregation service using the following building blocks:
- async def marks a function as a coroutine function; calling it returns a coroutine object that does not execute until awaited or scheduled. The event loop runs the coroutine, not the call.
- asyncio.run(coro) is the standard entry point: it creates a fresh event loop, runs one top-level coroutine to completion, and closes the loop.
- await coro runs coroutines sequentially relative to each other; to achieve concurrency, coroutines must be scheduled together rather than awaited one at a time.
- asyncio.gather(*coros) schedules all coroutines concurrently and collects results in input order; total latency equals the slowest single coroutine, not the sum.
- asyncio.create_task(coro) schedules a coroutine as a background task and returns immediately; use it for work that must not block the critical path but must still complete before the program exits.
- asyncio.timeout(seconds) raises TimeoutError if its body exceeds the deadline; wrapping each I/O source independently bounds the pipeline's worst-case latency to the chosen deadline.
- asyncio.sleep(n) is the non-blocking substitute for time.sleep(n) in async code; using time.sleep inside a coroutine blocks the entire event loop for all concurrent tasks.
- aiofiles.open provides async file I/O through an async context manager, keeping disk writes off the event loop thread and consistent with the await-everything convention.