Use Case · 2 min read · Updated Mar 22, 2026

Competitor Price Tracking with PriceFetch

TL;DR

Build a competitor tracker: store URLs in a database, run a scheduled fetcher against PriceFetch, diff prices against previous checks, and alert on changes.

The Price Tracking Pipeline

Competitor price tracking is a data pipeline with three stages: collect, compare, and act.

**Collect** — call PriceFetch for each competitor URL on a schedule. Store every result with a timestamp. This is your raw data layer.

**Compare** — for each new price point, compare against the previous value. Calculate the delta as both absolute and percentage change. Flag significant moves.

**Act** — route significant price changes to the right destination. That could be a Slack channel, a pricing dashboard, an automated repricing system, or all three.

The key insight: separate data collection from decision-making. Collect everything, decide later what matters. You'll want the historical data even if you don't act on every change today.

Building the Tracker

Here's a complete tracker that checks competitor URLs, detects price changes, and sends alerts. This runs as a scheduled job — trigger it with cron, a cloud scheduler, or a task queue.

python
import requests
import json
from datetime import datetime

API_KEY = "pf_live_abc123"
SLACK_WEBHOOK = "https://hooks.slack.com/services/T.../B.../xxx"

# In production, load from your database
competitors = [
    {"sku": "WIDGET-100", "name": "Blue Widget", "url": "https://www.amazon.com/dp/B0EXAMPLE1", "last_price": 29.99},
    {"sku": "WIDGET-200", "name": "Red Widget", "url": "https://www.walmart.com/ip/123456", "last_price": 24.99},
]

def fetch_price(url: str) -> dict | None:
    resp = requests.get(
        "https://api.pricefetch.dev/v1/price",
        params={"url": url},
        headers={"X-API-Key": API_KEY},
        timeout=30,
    )
    if resp.status_code != 200:
        return None
    data = resp.json()
    return data["data"] if data.get("success") else None

def check_competitors():
    changes = []
    for comp in competitors:
        result = fetch_price(comp["url"])
        if not result:
            continue

        new_price = result["price"]
        old_price = comp["last_price"]
        if new_price != old_price:
            pct_change = ((new_price - old_price) / old_price) * 100
            changes.append({
                "sku": comp["sku"],
                "name": comp["name"],
                "old_price": old_price,
                "new_price": new_price,
                "change_pct": round(pct_change, 1),
                "url": comp["url"],
            })
            # Update last_price in your database here

    if changes:
        send_slack_alert(changes)

def send_slack_alert(changes: list[dict]):
    lines = ["*Competitor Price Changes Detected*\n"]
    for c in changes:
        direction = "dropped" if c["change_pct"] < 0 else "increased"
        lines.append(
            f"• *{c['name']}* ({c['sku']}): ${c['old_price']:.2f} → ${c['new_price']:.2f} "
            f"({direction} {abs(c['change_pct'])}%) — <{c['url']}|View>"
        )
    requests.post(SLACK_WEBHOOK, json={"text": "\n".join(lines)})

if __name__ == "__main__":
    check_competitors()

Scaling to Thousands of Products

The simple loop above works for dozens of products. For thousands, you need concurrency and error resilience.

**Async fetching** — use `asyncio` and `httpx` to check multiple URLs concurrently. Cap concurrency with a semaphore sized to your rate limit (default 5 requests/second); a semaphore bounds in-flight requests rather than requests per second, but it's a reasonable first approximation. This turns a 30-minute sequential run into a 3-minute concurrent one.

**Retry with backoff** — if a request fails with a 429 (rate limited) or 5xx (server error), retry with exponential backoff. Don't retry on 4xx client errors — those indicate a problem with your input.

**Chunked processing** — break your product list into chunks of 100-200. Process each chunk, commit results to the database, then move to the next. This way, if the job crashes halfway through, you don't lose all progress.

**Dead letter queue** — products that fail 3 times in a row should be flagged for manual review. The URL might have changed, the product might be discontinued, or the retailer might have restructured their pages.

python
import asyncio
import httpx

API_KEY = "pf_live_abc123"
MAX_CONCURRENT = 5  # Match your rate limit

async def fetch_prices(urls: list[str]) -> list[dict | None]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    async with httpx.AsyncClient(timeout=30) as client:
        async def fetch_one(url: str) -> dict | None:
            # The semaphore caps in-flight requests at MAX_CONCURRENT
            async with semaphore:
                resp = await client.get(
                    "https://api.pricefetch.dev/v1/price",
                    params={"url": url},
                    headers={"X-API-Key": API_KEY},
                )
                if resp.status_code != 200:
                    return None
                data = resp.json()
                return data["data"] if data.get("success") else None

        # Results come back in the same order as the input URLs
        tasks = [fetch_one(url) for url in urls]
        return await asyncio.gather(*tasks)

Beyond Price: Stock Status and Availability

Price is the headline number, but stock status is equally valuable for competitive intelligence. PriceFetch returns `in_stock` for every request.

When a competitor runs out of stock on a popular product, that's your opportunity to capture their demand — potentially at a higher margin since there's less competition. Conversely, when a competitor restocks, you might see a price drop as they clear inventory.

Track both data points together. A useful pattern: alert when a competitor's product goes from out-of-stock to in-stock (they just restocked, possibly at a new price), or from in-stock to out-of-stock (opportunity window for you).

Some teams also track whether a product is Prime-eligible on Amazon, as that affects conversion rates and effective price comparison. While PriceFetch focuses on price and stock status, you can combine this with other signals from your own research.
