The catalog sync pipeline bridges supplier APIs and the API-HUB database. When you trigger an import (manually via the API, or automatically through the scheduler or an n8n workflow), a SyncJob record is created immediately so you can poll its status, and all the actual work runs in the background. Products that fail individually do not abort the job; they are recorded in the errors array and the job continues.
Discovery modes
The DiscoveryMode enum controls which products are fetched from the supplier during a sync. Each mode maps to a different query strategy inside the adapter.
- full_sellable
- delta
- first_n
- explicit_list
- closeouts
full_sellable fetches the supplier’s full active catalog, that is, every product that is currently available for sale. Use this for the initial load and periodic full refreshes.
Recommended schedule: weekly
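To make the mode names concrete, here is a minimal sketch of how such an enum could look in Python. The member names, the `str` base class, and the helpers `parse_mode` and `job_type_for` are assumptions for illustration; only the five string values and the `"import:{mode}"` job type format come from this page.

```python
from enum import Enum


class DiscoveryMode(str, Enum):
    """Illustrative enum; the string values match the modes listed above."""

    FULL_SELLABLE = "full_sellable"
    DELTA = "delta"
    FIRST_N = "first_n"
    EXPLICIT_LIST = "explicit_list"
    CLOSEOUTS = "closeouts"


def parse_mode(raw: str) -> DiscoveryMode:
    """Convert a raw string into a DiscoveryMode, rejecting unknown values."""
    try:
        return DiscoveryMode(raw)
    except ValueError:
        allowed = ", ".join(m.value for m in DiscoveryMode)
        raise ValueError(
            f"unknown discovery mode {raw!r}; expected one of: {allowed}"
        ) from None


def job_type_for(mode: DiscoveryMode) -> str:
    """Build the job_type string in the "import:{mode}" form."""
    return f"import:{mode.value}"
```

Validating the mode up front means a typo fails before any job row is created, rather than surfacing later as a failed job.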
Not all adapters implement every mode. If the adapter does not support closeouts, calling it raises NotImplementedError and the job is marked failed. Check the adapter’s source or supplier documentation before scheduling a closeout sync.
Triggering an import
POST /api/suppliers/{supplier_id}/import
Starts a new import job for the specified supplier. Returns 202 Accepted immediately; work runs in a BackgroundTask.
Request
Use the returned sync_job_id to poll the job’s status via GET /api/suppliers/{supplier_id}/sync-jobs.
Conflict protection: If an import of the same mode is already pending or running for the supplier, the endpoint returns 409 Conflict rather than starting a duplicate job.
Adapter precondition: If the supplier has no adapter_class configured, the endpoint returns 409 before creating a job.
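The two 409 rules above can be sketched as a single pre-check. This is an illustration under assumptions, not the endpoint's actual code: `JobSummary` and `import_precheck` are hypothetical names, and the real endpoint presumably queries the sync_jobs table instead of receiving a list.

```python
from dataclasses import dataclass
from typing import Iterable, Optional

# A job in either of these states blocks a new import of the same mode.
ACTIVE_STATUSES = {"pending", "running"}


@dataclass
class JobSummary:
    """Hypothetical minimal view of an existing sync job."""

    discovery_mode: str
    status: str


def import_precheck(
    adapter_class: Optional[str],
    mode: str,
    existing_jobs: Iterable[JobSummary],
) -> int:
    """Return the HTTP status the endpoint would answer with: 202 or 409."""
    # Adapter precondition: no adapter configured means no job is created.
    if not adapter_class:
        return 409
    # Conflict protection: same mode already pending or running.
    for job in existing_jobs:
        if job.discovery_mode == mode and job.status in ACTIVE_STATUSES:
            return 409
    return 202
```

Note that a running job of a different mode does not block the request in this sketch, which matches the "same mode" wording above.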
Sync job lifecycle
Job created (pending)
create_pending_import_job inserts a SyncJob row with status = "pending", total_products = 0, and started_at set to now. The job ID is returned to the caller immediately.
Adapter resolved
run_existing_import_job updates the job to status = "running" and calls get_adapter(supplier, db) to instantiate the correct protocol adapter (SOAP, REST, etc.) from the adapter registry. An AdapterNotConfiguredError or AdapterNotRegisteredError at this point marks the job failed.
Discovery
The adapter’s discover(mode, limit, explicit_list) method returns a list of ProductRef objects: lightweight references containing supplier_sku and optionally part_id. The count is written to job.total_products.
An AuthError during discovery aborts immediately and marks the job failed. Other adapter errors (SupplierError, TransientError) also abort and mark the job failed.
Hydrate and persist (per-product loop)
For each ProductRef, the adapter calls hydrate_product(ref) to fetch full product details and normalize them to the ProductIngest schema. The result is then upserted to the database via persist_product.
- TransientError (network, timeout, 5xx): retried up to 2 times with exponential backoff (1s, 2s). After all retries are exhausted, the product is counted as failed and the loop continues.
- AuthError mid-loop: aborts the entire job immediately.
- SupplierError, PersistError, AdapterError: counted as failed, loop continues.
- Unexpected exceptions: logged and counted as failed, loop continues.
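The error-handling rules above can be sketched as follows. This is a simplified illustration, not the pipeline's source: the exception classes are stand-ins defined locally, `run_product_loop` and `hydrate_with_retry` are hypothetical names, and the sketch assumes the retry wraps only the hydrate call (the text does not say whether persistence is retried too).

```python
import time
from typing import Any, Callable, Dict, Iterable, List, Tuple


# Local stand-ins for the error types named above (hierarchy is an assumption).
class TransientError(Exception): ...
class AuthError(Exception): ...
class SupplierError(Exception): ...
class PersistError(Exception): ...


BACKOFF_SECONDS = (1, 2)  # two retries: wait 1s, then 2s


def hydrate_with_retry(hydrate: Callable[[str], Any], ref: str,
                       sleep: Callable[[float], None] = time.sleep) -> Any:
    """Call hydrate(ref); retry on TransientError once per backoff delay."""
    for delay in BACKOFF_SECONDS:
        try:
            return hydrate(ref)
        except TransientError:
            sleep(delay)
    # Final attempt: a TransientError raised here propagates to the caller.
    return hydrate(ref)


def run_product_loop(refs: Iterable[str], hydrate: Callable[[str], Any],
                     persist: Callable[[Any], None],
                     sleep: Callable[[float], None] = time.sleep
                     ) -> Tuple[int, List[Dict[str, str]]]:
    """Return (success_count, errors); AuthError aborts the whole loop."""
    success, errors = 0, []
    for ref in refs:
        try:
            persist(hydrate_with_retry(hydrate, ref, sleep))
            success += 1
        except AuthError:
            raise  # credentials are bad for every product, so stop now
        except Exception as exc:  # exhausted retries, supplier, persist, other
            errors.append({"phase": "hydrate", "ref": ref, "msg": str(exc)})
    return success, errors
```

Passing `sleep` as a parameter keeps the backoff testable without real delays. One product therefore gets at most three attempts: the initial call plus two retries.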
Job finalized
After the loop, _finalize_job computes the terminal status and writes completed_at:
- "success": zero errors
- "partial_success": some succeeded, some failed
- "failed": zero products succeeded
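The status rules above reduce to a small function. The sketch below is an assumption about the logic, not a copy of _finalize_job; `terminal_status` is a hypothetical name. The list does not say what happens when discovery returns no products at all (zero successes and zero errors), so this sketch arbitrarily treats that case as success.

```python
def terminal_status(success_count: int, failed_count: int) -> str:
    """Map the per-product counters to a terminal job status."""
    if failed_count == 0:
        # Zero errors. Also covers the empty run (0 succeeded, 0 failed),
        # which is an assumption of this sketch.
        return "success"
    if success_count == 0:
        return "failed"
    return "partial_success"
```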
If the terminal status is success or partial_success, the supplier’s last_full_sync or last_delta_sync timestamp is updated, and stale detection runs (see below).
SyncJob schema
Jobs are stored in the sync_jobs table and exposed through SyncJobRead.
| Field | Type | Description |
|---|---|---|
| id | UUID | Job identifier |
| supplier_id | UUID | FK → suppliers.id |
| supplier_name | VARCHAR | Denormalized for display |
| job_type | VARCHAR | "import:{mode}", e.g. "import:delta" |
| status | VARCHAR | pending → running → success / partial_success / failed |
| discovery_mode | VARCHAR | Raw enum value of the mode used |
| total_products | INTEGER | Count of ProductRef objects returned by discover |
| success_count | INTEGER | Products successfully hydrated and stored |
| failed_count | INTEGER | Products that errored during hydration or persistence |
| records_processed | INTEGER | Alias for success_count (used by OPS-side reporting) |
| errors | JSONB | Array of error objects: { phase, ref?, code?, msg } |
| started_at | timestamptz | When the job entered running state |
| completed_at | timestamptz | When _finalize_job ran |
Example error entry
phase is one of "registry", "discover", or "hydrate". For hydrate errors, ref contains the supplier_sku of the failing product.
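As an illustration of the { phase, ref?, code?, msg } shape, the sketch below builds one entry and prints it as JSON. The helper `make_error` is hypothetical, and the SKU, code, and message values are invented sample data, not output from a real job.

```python
import json
from typing import Dict, Optional

PHASES = {"registry", "discover", "hydrate"}


def make_error(phase: str, msg: str, ref: Optional[str] = None,
               code: Optional[str] = None) -> Dict[str, str]:
    """Build one error object; the optional keys are omitted when unset."""
    if phase not in PHASES:
        raise ValueError(f"unknown phase: {phase!r}")
    entry = {"phase": phase}
    if ref is not None:
        entry["ref"] = ref  # supplier_sku of the failing product
    if code is not None:
        entry["code"] = code
    entry["msg"] = msg
    return entry


# Sample values below are made up for illustration.
print(json.dumps(
    make_error("hydrate", "request timed out", ref="SKU-123", code="TransientError"),
    indent=2,
))
```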
Stale detection
When a product is successfully re-synced, any customer that previously received that product (via OPS push) needs to know that the catalog data has changed. After a successful or partially successful sync, API-HUB identifies all customer_product_selections rows where:
- The product belongs to the synced supplier
- product.last_synced >= job.started_at (the product was actually updated in this run)
- status = "pushed" (the product has been delivered to a storefront)
- pushed_at < now (the push happened before this sync completed)
Matching rows have their status flipped to "stale". Operators can query for stale selections to identify products that need to be re-pushed to reflect updated pricing, images, or attributes.
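The four conditions can be expressed as one in-memory filter. The real implementation presumably runs as a database update; the `Selection` dataclass and `mark_stale` function here are hypothetical and only mirror the fields named above.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable


@dataclass
class Selection:
    """Hypothetical flattened view of a customer_product_selections row."""

    supplier_id: str               # supplier that owns the product
    product_last_synced: datetime  # product.last_synced
    status: str
    pushed_at: datetime


def mark_stale(selections: Iterable[Selection], supplier_id: str,
               job_started_at: datetime, now: datetime) -> int:
    """Flip matching selections to "stale" and return how many changed."""
    flipped = 0
    for sel in selections:
        if (
            sel.supplier_id == supplier_id
            and sel.product_last_synced >= job_started_at  # updated in this run
            and sel.status == "pushed"                     # already delivered
            and sel.pushed_at < now                        # pushed before now
        ):
            sel.status = "stale"
            flipped += 1
    return flipped
```

Selections whose product was not touched in this run keep their "pushed" status, so a sync only flags what actually changed.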
Background scheduler
API-HUB includes a built-in Python scheduler (start_scheduler) that triggers a delta sync for all active, adapter-configured suppliers on a fixed interval.
Disabling the scheduler
When the scheduler is disabled, start_scheduler logs a message and returns immediately without entering the loop.
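A fixed-interval loop with a disable switch might look like the sketch below. It is not the actual start_scheduler: the asyncio design, the function name `run_scheduler_sketch`, and the `enabled`, `interval_seconds`, and `max_ticks` parameters are all assumptions, and the real configuration mechanism is not shown on this page.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

logger = logging.getLogger("scheduler_sketch")


async def run_scheduler_sketch(
    trigger_delta_syncs: Callable[[], Awaitable[None]],
    interval_seconds: float,
    enabled: bool = True,
    max_ticks: Optional[int] = None,  # hypothetical: lets tests stop the loop
) -> int:
    """Run trigger_delta_syncs on a fixed interval; return the tick count."""
    if not enabled:
        # Disabled: log and return without entering the loop.
        logger.info("scheduler disabled; not starting")
        return 0
    ticks = 0
    while max_ticks is None or ticks < max_ticks:
        try:
            await trigger_delta_syncs()
        except Exception:
            # One failed round should not kill the scheduler.
            logger.exception("scheduled delta sync failed")
        ticks += 1
        await asyncio.sleep(interval_seconds)
    return ticks
```

In this sketch `trigger_delta_syncs` stands for whatever starts a delta import for each active, adapter-configured supplier.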
Listing sync jobs
GET /api/suppliers/{supplier_id}/sync-jobs
Returns the 50 most recent sync jobs for a supplier, ordered by started_at descending.
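Because the import endpoint returns before the work is done, a client typically polls this listing until its job reaches a terminal status. The sketch below shows one way to do that. `wait_for_job` is a hypothetical helper, `fetch_jobs` is a caller-supplied function that performs the GET request, and the response is assumed to be a list of objects with id and status fields as in the SyncJob schema.

```python
import time
from typing import Any, Callable, Dict, List

TERMINAL_STATUSES = {"success", "partial_success", "failed"}


def wait_for_job(
    fetch_jobs: Callable[[], List[Dict[str, Any]]],
    sync_job_id: str,
    poll_seconds: float = 2.0,
    max_polls: int = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the job is terminal; raise TimeoutError if it never is."""
    for _ in range(max_polls):
        for job in fetch_jobs():
            if job["id"] == sync_job_id and job["status"] in TERMINAL_STATUSES:
                return job
        sleep(poll_seconds)
    raise TimeoutError(f"job {sync_job_id} not finished after {max_polls} polls")
```

Since the listing is capped at the 50 most recent jobs, a very old job can drop out of the response; in this sketch that case ends in the TimeoutError.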