Tutorial · 2 min read · Updated Mar 22, 2026

Bulk Price Checking with PriceFetch API

TL;DR

Use asyncio.Semaphore to limit concurrent PriceFetch API calls, handle errors gracefully, and process hundreds of URLs in minutes instead of hours.

Why Bulk Checking Needs a Pattern

Checking 500 product URLs one at a time is painfully slow: each PriceFetch API call takes a few seconds for the live scrape, so a sequential run stretches into hours. Firing all 500 requests at once is no better; you will blow past the rate limit and most of them will fail.

The sweet spot is controlled concurrency: send 5-10 requests in parallel, handle errors individually, and retry transient failures. This section shows you the pattern that works reliably at scale.

Try it yourself — 500 free API credits, no credit card required.

Async HTTP Client Setup

We use `httpx.AsyncClient` with a semaphore to control concurrency. The semaphore limits how many requests are in-flight at once — set it to match PriceFetch's per-second rate limit (default 5 for most plans).

The client is created once and reused for all requests, which keeps TCP connections alive and reduces overhead.

```python
import httpx
import asyncio
import csv
import os
from dataclasses import dataclass, asdict

API_KEY = os.environ["PRICEFETCH_API_KEY"]
BASE_URL = "https://api.pricefetch.dev/v1/price"
MAX_CONCURRENT = 5  # Match your plan's rate limit

@dataclass
class BulkResult:
    url: str
    price: float | None
    currency: str | None
    retailer: str | None
    in_stock: bool | None
    error: str | None

semaphore = asyncio.Semaphore(MAX_CONCURRENT)

async def fetch_one(client: httpx.AsyncClient, url: str) -> BulkResult:
    """Fetch price for one URL with concurrency control."""
    async with semaphore:
        try:
            resp = await client.get(
                BASE_URL,
                params={"url": url},
                headers={"X-API-Key": API_KEY},
                timeout=20.0,
            )
            if resp.status_code == 429:
                # Local sentinel so the retry layer can back off harder
                return BulkResult(url=url, price=None, currency=None, retailer=None,
                                  in_stock=None, error="RATE_LIMITED")
            data = resp.json()
            if data["success"]:
                d = data["data"]
                return BulkResult(url=url, price=d["price"], currency=d["currency"],
                                  retailer=d["retailer"], in_stock=d["in_stock"], error=None)
            return BulkResult(url=url, price=None, currency=None, retailer=None,
                              in_stock=None, error=data["error"]["code"])
        except httpx.TimeoutException:
            return BulkResult(url=url, price=None, currency=None, retailer=None,
                              in_stock=None, error="TIMEOUT")
        except Exception as e:
            return BulkResult(url=url, price=None, currency=None, retailer=None,
                              in_stock=None, error=str(e))
```

Retry Logic for Failed Requests

Some failures are transient — timeouts, rate limit hits, temporary server issues. Retrying these with exponential backoff usually works. But don't retry client errors like invalid URLs or unsupported retailers — those will fail every time.

The retry wrapper distinguishes between retryable and permanent errors. Rate limit responses (429) get a longer backoff because the API is explicitly telling you to slow down.

```python
RETRYABLE_ERRORS = {"TIMEOUT", "RATE_LIMITED", "SCRAPE_FAILED", "PAGE_LOAD_FAILED"}
MAX_RETRIES = 3

async def fetch_with_retry(client: httpx.AsyncClient, url: str) -> BulkResult:
    """Fetch with exponential backoff for transient errors."""
    for attempt in range(MAX_RETRIES):
        result = await fetch_one(client, url)
        if result.error is None:
            return result
        if result.error not in RETRYABLE_ERRORS:
            return result  # Permanent error, don't retry
        if attempt < MAX_RETRIES - 1:
            if result.error == "RATE_LIMITED":
                wait = 10 * (attempt + 1)  # 429 means slow down: 10s, then 20s
            else:
                wait = 2 ** attempt  # 1s, then 2s
            await asyncio.sleep(wait)
    return result  # All retries exhausted; return the last failure
```

Processing the Full List

Load URLs from a CSV file, process them all concurrently (respecting the semaphore limit), and write results to an output CSV. Progress logging helps you monitor long-running batches.

For 500 URLs with MAX_CONCURRENT=5, expect roughly 10-15 minutes total: each URL takes ~5-8 seconds for the live scrape, five run at a time, and retries add overhead on top.
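The estimate is simple arithmetic. As a back-of-envelope sketch (assuming ~6.5 s per scrape, the midpoint of the 5-8 s range, and ignoring retries):

```python
urls = 500
concurrency = 5
avg_seconds = 6.5                    # Assumed average per live scrape
waves = urls / concurrency           # 100 "waves" of concurrent requests
total_minutes = waves * avg_seconds / 60
print(round(total_minutes))          # ~11 minutes before any retry overhead
```

Retries and slow outliers push the real wall-clock time above this floor, which is why the practical estimate is higher.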

```python
async def bulk_check(input_file: str, output_file: str) -> None:
    """Check prices for all URLs in a CSV file."""
    # Load URLs, skipping blank rows and cells
    with open(input_file, newline="") as f:
        urls = [row[0].strip() for row in csv.reader(f) if row and row[0].strip()]
    print(f"Processing {len(urls)} URLs...")

    # Fetch all prices, logging progress as results complete
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_retry(client, url) for url in urls]
        results: list[BulkResult] = []
        for i, coro in enumerate(asyncio.as_completed(tasks)):
            results.append(await coro)
            if (i + 1) % 50 == 0:
                print(f"  Progress: {i + 1}/{len(urls)}")

    # Summarize and write results
    success = sum(1 for r in results if r.error is None)
    print(f"Done: {success} succeeded, {len(results) - success} failed")

    with open(output_file, "w", newline="") as f:
        writer = csv.DictWriter(
            f, fieldnames=["url", "price", "currency", "retailer", "in_stock", "error"])
        writer.writeheader()
        writer.writerows(asdict(r) for r in results)
    print(f"Results written to {output_file}")

if __name__ == "__main__":
    asyncio.run(bulk_check("urls.csv", "prices_output.csv"))
```

Optimizing Credit Usage

Each successful price fetch costs one credit. Failed requests due to server-side errors don't cost credits, but invalid URLs and unsupported retailers do consume a validation check.

To minimize costs: validate URLs before sending them to the API (check they match supported retailer domains), deduplicate your URL list, and cache results if you're checking the same products frequently. For recurring bulk checks, consider checking only products whose prices you expect to change.
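A pre-filter along these lines drops duplicate and unsupported URLs before they cost anything. This is a sketch; `SUPPORTED_DOMAINS` here is an illustrative placeholder, not PriceFetch's actual retailer list, so substitute the domains your plan supports:

```python
from urllib.parse import urlparse

# Hypothetical allow-list; replace with your plan's supported retailers
SUPPORTED_DOMAINS = {"amazon.com", "bestbuy.com", "walmart.com"}

def prefilter(urls: list[str]) -> list[str]:
    """Deduplicate and drop URLs whose host isn't a supported retailer."""
    seen: set[str] = set()
    kept: list[str] = []
    for url in urls:
        host = urlparse(url).netloc.lower().removeprefix("www.")
        if host in SUPPORTED_DOMAINS and url not in seen:
            seen.add(url)
            kept.append(url)
    return kept
```

Running the filter before `bulk_check` means every credit you spend goes to a URL that at least has a chance of succeeding.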
