Build a competitor tracker: store URLs in a database, run a scheduled fetcher against PriceFetch, diff prices against previous checks, and alert on changes.
Competitor price tracking is a data pipeline with three stages: collect, compare, and act.
**Collect** — call PriceFetch for each competitor URL on a schedule. Store every result with a timestamp. This is your raw data layer.
**Compare** — for each new price point, compare against the previous value. Calculate the delta as both absolute and percentage change. Flag significant moves.
**Act** — route significant price changes to the right destination. That could be a Slack channel, a pricing dashboard, an automated repricing system, or all three.
The key insight: separate data collection from decision-making. Collect everything, decide later what matters. You'll want the historical data even if you don't act on every change today.
Here's a complete tracker that checks competitor URLs, detects price changes, and sends alerts. This runs as a scheduled job — trigger it with cron, a cloud scheduler, or a task queue.
import requests
import json
from datetime import datetime
API_KEY = "pf_live_abc123"
SLACK_WEBHOOK = "https://hooks.slack.com/services/T.../B.../xxx"
# In production, load from your database
competitors = [
{"sku": "WIDGET-100", "name": "Blue Widget", "url": "https://www.amazon.com/dp/B0EXAMPLE1", "last_price": 29.99},
{"sku": "WIDGET-200", "name": "Red Widget", "url": "https://www.walmart.com/ip/123456", "last_price": 24.99},
]
def fetch_price(url: str) -> dict | None:
resp = requests.get(
"https://api.pricefetch.dev/v1/price",
params={"url": url},
headers={"X-API-Key": API_KEY},
timeout=30,
)
data = resp.json()
return data["data"] if data["success"] else None
def check_competitors():
changes = []
for comp in competitors:
result = fetch_price(comp["url"])
if not result:
continue
new_price = result["price"]
old_price = comp["last_price"]
if new_price != old_price:
pct_change = ((new_price - old_price) / old_price) * 100
changes.append({
"sku": comp["sku"],
"name": comp["name"],
"old_price": old_price,
"new_price": new_price,
"change_pct": round(pct_change, 1),
"url": comp["url"],
})
# Update last_price in your database here
if changes:
send_slack_alert(changes)
def send_slack_alert(changes: list[dict]):
lines = ["*Competitor Price Changes Detected*\n"]
for c in changes:
direction = "dropped" if c["change_pct"] < 0 else "increased"
lines.append(
f"• *{c['name']}* ({c['sku']}): ${c['old_price']} → ${c['new_price']} "
f"({direction} {abs(c['change_pct'])}%) — <{c['url']}|View>"
)
requests.post(SLACK_WEBHOOK, json={"text": "\n".join(lines)})
if __name__ == "__main__":
check_competitors()The simple loop above works for dozens of products. For thousands, you need concurrency and error resilience.
**Async fetching** — use `asyncio` and `httpx` to check multiple URLs concurrently. Respect the rate limit (default 5 requests/second) by using a semaphore. This turns a 30-minute sequential run into a 3-minute concurrent one.
**Retry with backoff** — if a request fails with a 429 (rate limited) or 5xx (server error), retry with exponential backoff. Don't retry on 4xx client errors — those indicate a problem with your input.
**Chunked processing** — break your product list into chunks of 100-200. Process each chunk, commit results to the database, then move to the next. This way, if the job crashes halfway through, you don't lose all progress.
**Dead letter queue** — products that fail 3 times in a row should be flagged for manual review. The URL might have changed, the product might be discontinued, or the retailer might have restructured their pages.
import asyncio
import httpx
API_KEY = "pf_live_abc123"
MAX_CONCURRENT = 5 # Match your rate limit
async def fetch_prices(urls: list[str]) -> list[dict]:
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
async with httpx.AsyncClient(timeout=30) as client:
async def fetch_one(url: str) -> dict | None:
async with semaphore:
resp = await client.get(
"https://api.pricefetch.dev/v1/price",
params={"url": url},
headers={"X-API-Key": API_KEY},
)
data = resp.json()
return data["data"] if data["success"] else None
tasks = [fetch_one(url) for url in urls]
return await asyncio.gather(*tasks)Price is the headline number, but stock status is equally valuable for competitive intelligence. PriceFetch returns `in_stock` for every request.
When a competitor runs out of stock on a popular product, that's your opportunity to capture their demand — potentially at a higher margin since there's less competition. Conversely, when a competitor restocks, you might see a price drop as they clear inventory.
Track both data points together. A useful pattern: alert when a competitor's product goes from out-of-stock to in-stock (they just restocked, possibly at a new price), or from in-stock to out-of-stock (opportunity window for you).
Some teams also track whether a product is Prime-eligible on Amazon, as that affects conversion rates and effective price comparison. While PriceFetch focuses on price and stock status, you can combine this with other signals from your own research.
Sign up in 30 seconds. No credit card required. One credit per successful API call.