Use asyncio.Semaphore to limit concurrent PriceFetch API calls, handle errors gracefully, and process hundreds of URLs in minutes instead of hours.
Checking 500 product URLs one at a time takes forever — each PriceFetch API call takes a few seconds for the live scrape. But firing 500 requests simultaneously will hit rate limits and probably crash your script.
The sweet spot is controlled concurrency: send 5-10 requests in parallel, handle errors individually, and retry transient failures. This section shows you the pattern that works reliably at scale.
We use `httpx.AsyncClient` with a semaphore to control concurrency. The semaphore limits how many requests are in-flight at once; set it to match PriceFetch's per-second rate limit (default 5 for most plans).
The client is created once and reused for all requests, which keeps TCP connections alive and reduces overhead.
```python
import asyncio
import csv
import os
from dataclasses import dataclass, asdict

import httpx

API_KEY = os.environ["PRICEFETCH_API_KEY"]
BASE_URL = "https://api.pricefetch.dev/v1/price"
MAX_CONCURRENT = 5  # Match your plan's per-second rate limit

@dataclass
class BulkResult:
    url: str
    price: float | None
    currency: str | None
    retailer: str | None
    in_stock: bool | None
    error: str | None

semaphore = asyncio.Semaphore(MAX_CONCURRENT)

async def fetch_one(client: httpx.AsyncClient, url: str) -> BulkResult:
    """Fetch price for one URL with concurrency control."""
    async with semaphore:
        try:
            resp = await client.get(
                BASE_URL,
                params={"url": url},
                headers={"X-API-Key": API_KEY},
                timeout=20.0,
            )
            if resp.status_code == 429:
                return BulkResult(url=url, price=None, currency=None, retailer=None,
                                  in_stock=None, error="RATE_LIMITED")
            data = resp.json()
            if data["success"]:
                d = data["data"]
                return BulkResult(url=url, price=d["price"], currency=d["currency"],
                                  retailer=d["retailer"], in_stock=d["in_stock"], error=None)
            return BulkResult(url=url, price=None, currency=None, retailer=None,
                              in_stock=None, error=data["error"]["code"])
        except httpx.TimeoutException:
            return BulkResult(url=url, price=None, currency=None, retailer=None,
                              in_stock=None, error="TIMEOUT")
        except Exception as e:
            return BulkResult(url=url, price=None, currency=None, retailer=None,
                              in_stock=None, error=str(e))
```

Some failures are transient: timeouts, rate limit hits, temporary server issues. Retrying these with exponential backoff usually works. But don't retry client errors like invalid URLs or unsupported retailers; those will fail every time.
The retry wrapper distinguishes between retryable and permanent errors. Rate limit responses (429) get a longer backoff because the API is explicitly telling you to slow down.
```python
RETRYABLE_ERRORS = {"TIMEOUT", "SCRAPE_FAILED", "PAGE_LOAD_FAILED", "RATE_LIMITED"}
MAX_RETRIES = 3

async def fetch_with_retry(client: httpx.AsyncClient, url: str) -> BulkResult:
    """Fetch with exponential backoff for transient errors."""
    for attempt in range(MAX_RETRIES):
        result = await fetch_one(client, url)
        if result.error is None:
            return result
        if result.error not in RETRYABLE_ERRORS:
            return result  # Permanent error, don't retry
        if attempt < MAX_RETRIES - 1:
            wait = 2 ** attempt  # 1s, then 2s; no sleep after the final attempt
            if result.error == "RATE_LIMITED":
                wait *= 4  # The API explicitly asked us to slow down, so back off harder
            await asyncio.sleep(wait)
    return result  # Return the last failed result
```

Load URLs from a CSV file, process them all concurrently (respecting the semaphore limit), and write results to an output CSV. Progress logging helps you monitor long-running batches.
For 500 URLs with MAX_CONCURRENT=5, expect roughly 15-20 minutes end to end: each URL takes ~5-8 seconds for the live scrape, only five run at a time, and retries add overhead on top.
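As a back-of-envelope check (the 5-8 second latency range comes from the paragraph above; retry and backoff time is extra):

```python
urls = 500
max_concurrent = 5
per_request_s = (5, 8)  # Approximate live-scrape latency range, in seconds

# With the semaphore kept full, total time is roughly
# (urls / max_concurrent) * per-request latency, before any retries.
low = urls / max_concurrent * per_request_s[0] / 60
high = urls / max_concurrent * per_request_s[1] / 60
print(f"~{low:.0f}-{high:.0f} minutes before retry overhead")
```

That baseline of roughly 8-13 minutes, plus backoff sleeps and re-fetches for failed URLs, is where the 15-20 minute estimate comes from.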
```python
async def bulk_check(input_file: str, output_file: str) -> None:
    """Check prices for all URLs in a CSV file."""
    # Load URLs
    with open(input_file) as f:
        urls = [row[0] for row in csv.reader(f) if row]
    print(f"Processing {len(urls)} URLs...")

    # Fetch all prices
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_retry(client, url) for url in urls]
        results: list[BulkResult] = []
        for i, coro in enumerate(asyncio.as_completed(tasks)):
            result = await coro
            results.append(result)
            if (i + 1) % 50 == 0:
                print(f"  Progress: {i + 1}/{len(urls)}")

    # Write results
    success = sum(1 for r in results if r.error is None)
    failed = len(results) - success
    print(f"Done: {success} succeeded, {failed} failed")
    with open(output_file, "w", newline="") as f:
        writer = csv.DictWriter(
            f, fieldnames=["url", "price", "currency", "retailer", "in_stock", "error"]
        )
        writer.writeheader()
        writer.writerows(asdict(r) for r in results)
    print(f"Results written to {output_file}")

if __name__ == "__main__":
    asyncio.run(bulk_check("urls.csv", "prices_output.csv"))
```

Each successful price fetch costs one credit. Failed requests due to server-side errors don't cost credits, but invalid URLs and unsupported retailers do consume a validation check.
To minimize costs: validate URLs before sending them to the API (check they match supported retailer domains), deduplicate your URL list, and cache results if you're checking the same products frequently. For recurring bulk checks, consider checking only products whose prices you expect to change.
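A pre-flight filter along those lines might look like this. The `SUPPORTED_DOMAINS` set here is a made-up example; substitute the actual retailer list for your PriceFetch plan:

```python
from urllib.parse import urlparse

# Hypothetical supported-retailer domains; replace with your plan's actual list.
SUPPORTED_DOMAINS = {"amazon.com", "bestbuy.com", "walmart.com"}

def prefilter(urls: list[str]) -> list[str]:
    """Deduplicate and drop URLs that would fail retailer validation."""
    seen: set[str] = set()
    kept: list[str] = []
    for url in urls:
        host = urlparse(url).netloc.lower().removeprefix("www.")
        if host not in SUPPORTED_DOMAINS:
            continue  # Unsupported retailer: skip rather than burn a credit
        if url in seen:
            continue  # Exact duplicate
        seen.add(url)
        kept.append(url)
    return kept

urls = [
    "https://www.amazon.com/dp/B0EXAMPLE",
    "https://www.amazon.com/dp/B0EXAMPLE",  # duplicate
    "https://shop.example.org/item/42",     # unsupported retailer
    "https://bestbuy.com/site/p/123",
]
print(prefilter(urls))
```

Note this only catches exact duplicate strings; if the same product appears with different query parameters or tracking codes, you'd need to normalize URLs before deduplicating.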