Tutorial Data Engineering

Extracting data from anywhere — APIs, databases, and files

SkyDeLake Jun 28, 2026


Data engineering starts at the source. Before you can transform, aggregate, or serve data, you have to extract it from where it lives — and that turns out to be one of the most varied, frustrating, and practically important parts of the job. Source systems were not built to be extracted from. They were built to do something else.

Full vs incremental extraction

Every ingestion pipeline makes a fundamental choice: extract everything every time, or extract only what changed.

Full extraction is simpler. Read the entire source table, write it to the destination. No need to track state, no risk of missing changes. The cost is proportional to the table size — fine at small scale, prohibitive when the table has billions of rows.
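
As a minimal sketch of full extraction, the function below dumps an entire table to a CSV snapshot. It assumes a table named orders with the columns used elsewhere in this post, and it uses the standard-library sqlite3 module only so the example runs without a database server; the function name extract_full and the CSV destination are illustrative choices, not a prescribed design.

```python
import csv
import os
import sqlite3
import tempfile

def extract_full(conn: sqlite3.Connection, dest_path: str) -> int:
    """Dump every row of `orders` to a CSV file, replacing any previous snapshot."""
    cursor = conn.execute(
        "SELECT id, customer_id, amount, status, updated_at FROM orders ORDER BY id"
    )
    header = [col[0] for col in cursor.description]

    # Write to a temp file in the destination directory, then rename it into
    # place, so a crash mid-run never leaves a half-written snapshot behind.
    dest_dir = os.path.dirname(os.path.abspath(dest_path))
    fd, tmp_path = tempfile.mkstemp(dir=dest_dir, suffix=".tmp")
    row_count = 0
    with os.fdopen(fd, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for row in cursor:  # iterate instead of fetchall() to keep memory flat
            writer.writerow(row)
            row_count += 1
    os.replace(tmp_path, dest_path)
    return row_count
```

Because each run replaces the whole snapshot, rerunning after a failure is safe: there is no state to repair, only a file to overwrite.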

Incremental extraction reads only the records that changed since the last run. It is faster, cheaper, and necessary at scale. The challenge is that you need a reliable way to identify what changed.

The most common incremental pattern is the high-water mark: store the maximum value of a timestamp or sequence column from the last successful run, and on the next run, extract only rows where that column is greater than the stored value.

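import psycopg2  # conn is expected to be a psycopg2 connection
from datetime import datetime

def extract_incremental(conn, last_watermark: datetime):
    """Return rows changed since last_watermark, plus the new watermark."""
    # The context manager closes the cursor even if the query fails
    with conn.cursor() as cursor:
        cursor.execute("""
            SELECT id, customer_id, amount, status, updated_at
            FROM orders
            WHERE updated_at > %s
            ORDER BY updated_at ASC
        """, (last_watermark,))
        rows = cursor.fetchall()

    if rows:
        # Rows are sorted by updated_at, so the last row holds the maximum
        new_watermark = rows[-1][-1]
        return rows, new_watermark

    # Nothing changed: keep the previous watermark
    return [], last_watermark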

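The catch: this pattern requires the source table to have a reliable updated_at column that is set on every change, and it misses hard deletes (rows that are physically removed). The strict > comparison can also skip a row that shares the watermark's timestamp but is committed after the run finishes, so it is common to re-read a small overlap window and deduplicate on load. For sources where deletes matter, you need Change Data Capture, covered in the next post.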
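
The high-water mark only works if it survives between runs and advances only after a successful load. The sketch below is one way to do that, under stated assumptions: the watermark lives in a local JSON file, and the names load_watermark, save_watermark, run_once, and the five-minute lookback are all hypothetical. The extract and load arguments stand in for your own functions; extract is assumed to return (rows, new_watermark) like extract_incremental above.

```python
import json
import os
from datetime import datetime, timedelta, timezone

def load_watermark(path: str, default: datetime) -> datetime:
    """Return the stored watermark, or `default` on the very first run."""
    if not os.path.exists(path):
        return default
    with open(path) as f:
        return datetime.fromisoformat(json.load(f)["watermark"])

def save_watermark(path: str, watermark: datetime) -> None:
    """Persist the watermark atomically: write a temp file, then rename it."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump({"watermark": watermark.isoformat()}, f)
    os.replace(tmp_path, path)

def run_once(path, extract, load, default, lookback=timedelta(minutes=5)):
    """One incremental run: extract from (watermark - lookback), load, commit."""
    watermark = load_watermark(path, default)
    rows, new_watermark = extract(watermark - lookback)
    load(rows)  # must be idempotent, because the lookback re-reads some rows
    # Reached only if load() did not raise; never move the watermark backwards
    save_watermark(path, max(watermark, new_watermark))
    return len(rows)
```

If load raises, the stored watermark is untouched and the next run simply retries the same window. In production the state would more likely live in a database table or the orchestrator's metadata store than in a local file.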

Extracting from databases

Relational databases are the most common data source. A few patterns and gotchas:

Use a read replica. Never extract directly from the primary database at scale. Long-running analytical queries can degrade performance for application users. Most cloud databases support read replicas — route your extraction there.

Avoid SELECT * in production. Specify columns explicitly. This protects you from breaking changes (a new column in the source doesn't automatically break your pipeline) and reduces the data transferred.

Handle large tables with keyset pagination. Fetching 100 million rows in one query can exhaust client memory. Instead, chunk the extraction by filtering on an indexed, ordered key such as the primary key:

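from psycopg2 import sql

def extract_in_chunks(conn, table: str, chunk_size: int = 100_000):
    # Table names cannot be bound as query parameters, so quote the
    # identifier with psycopg2.sql instead of interpolating it with an f-string
    query = sql.SQL("""
        SELECT id, customer_id, amount, updated_at
        FROM {table}
        WHERE id > %s
        ORDER BY id ASC
        LIMIT %s
    """).format(table=sql.Identifier(table))

    last_id = 0  # assumes ids are positive integers
    while True:
        with conn.cursor() as cursor:
            cursor.execute(query, (last_id, chunk_size))
            rows = cursor.fetchall()

        if not rows:
            break

        yield rows
        last_id = rows[-1][0]  # resume after the last id in this chunk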
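
To see the chunking behaviour end to end without a PostgreSQL server, here is a self-contained variant of the same keyset loop using the standard-library sqlite3 module. The function name iter_chunks_sqlite, the two-column orders table, and the sample ids are made up for illustration.

```python
import sqlite3
from typing import Iterator

def iter_chunks_sqlite(conn: sqlite3.Connection, chunk_size: int = 2) -> Iterator[list]:
    """Yield lists of rows, each at most chunk_size long, ordered by id."""
    last_id = 0  # assumes ids are positive, unique, and indexed
    while True:
        rows = conn.execute(
            "SELECT id, amount FROM orders WHERE id > ? ORDER BY id ASC LIMIT ?",
            (last_id, chunk_size),
        ).fetchall()
        if not rows:
            break
        yield rows
        last_id = rows[-1][0]  # gaps in the id sequence do not matter

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(i, i * 1.5) for i in (1, 2, 5, 8, 13)],
)

chunk_sizes = [len(chunk) for chunk in iter_chunks_sqlite(conn, chunk_size=2)]
print(chunk_sizes)  # [2, 2, 1]
```

Filtering on id > last_id lets the database seek straight to the next chunk through the index. OFFSET-based paging, by contrast, generally has to read and discard every skipped row, so later pages get progressively slower.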

Lock awareness. Long-running reads can block writes in some databases (depending on isolation level). Use READ COMMITTED isolation or snapshot-based reads where available.

Extracting from REST APIs

SaaS tools (Salesforce, HubSpot, Stripe, Shopify) expose data via REST APIs. Three things make API extraction harder than database extraction:

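Pagination. APIs return data in pages, usually 100–1000 records at a time, and you must keep requesting pages until you have retrieved every record. Two common patterns are offset (page-number) pagination and cursor pagination. A cursor-based loop looks like this: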

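import requests

def extract_api_with_cursor(base_url: str, api_key: str):
    cursor = None
    all_records = []

    while True:
        params = {'limit': 100}
        if cursor:
            params['cursor'] = cursor

        response = requests.get(
            base_url,
            headers={'Authorization': f'Bearer {api_key}'},
            params=params,
            timeout=30,  # never wait forever on a stalled connection
        )
        response.raise_for_status()  # fail on 4xx/5xx instead of parsing an error body
        payload = response.json()

        all_records.extend(payload['data'])

        # The API signals the last page by omitting next_cursor
        cursor = payload.get('next_cursor')
        if not cursor:
            break

    return all_records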
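
Another widely used style is offset pagination, where the client asks for a slice by position rather than following a cursor. The sketch below assumes the API accepts an offset and a limit and returns a plain list of records. The fetch_page callable is a hypothetical stand-in for the actual HTTP request, which keeps the paging logic separate from transport details and easy to test.

```python
from typing import Callable, Iterator

def extract_api_with_offset(
    fetch_page: Callable[[int, int], list],
    page_size: int = 100,
) -> Iterator[dict]:
    """Yield records page by page until a short or empty page signals the end."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        yield from page
        if len(page) < page_size:
            break  # a short page means there is nothing after it
        offset += page_size

# Usage with an in-memory stand-in for the API
fake_api = [{"id": i} for i in range(250)]
records = list(extract_api_with_offset(lambda off, lim: fake_api[off:off + lim]))
print(len(records))  # 250
```

Offset pagination is simple, but records inserted or deleted while you page can shift positions, producing duplicates or gaps. Cursor pagination usually avoids that, so prefer it when the API offers both.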

Rate limits. APIs restrict how many requests you can make per minute or hour. Always implement exponential backoff with jitter on 429 (Too Many Requests) responses:

import time
import random
import requests

def request_with_backoff(url, headers, max_retries=5):
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers, timeout=30)

        if response.status_code == 429:
            # Exponential backoff (1s, 2s, 4s, ...) plus up to 1s of random jitter
            wait = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait)
            continue

        response.raise_for_status()
        return response.json()

    raise RuntimeError(f"Still rate limited after {max_retries} attempts")
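
Many APIs also send a Retry-After header with a 429 response, telling the client how long to wait. The helper below is a sketch of how the wait calculation could honor that header and fall back to capped exponential backoff otherwise. The name backoff_delay and the base and cap defaults are assumptions, and it handles only the seconds form of Retry-After, not the HTTP-date form.

```python
import random
from typing import Optional

def backoff_delay(
    attempt: int,
    retry_after: Optional[str] = None,
    base: float = 1.0,
    cap: float = 60.0,
) -> float:
    """Seconds to sleep before retrying; `attempt` is 0-based."""
    if retry_after is not None:
        try:
            # Trust the server's hint when it is a plain number of seconds
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # probably an HTTP date; fall through to computed backoff
    # "Full jitter": pick uniformly between 0 and the capped exponential bound,
    # which spreads out clients that were all throttled at the same moment
    return random.uniform(0, min(cap, base * 2 ** attempt))
```

Inside a retry loop this would replace the fixed formula, for example time.sleep(backoff_delay(attempt, response.headers.get("Retry-After"))).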

Incremental extraction from APIs. Most APIs support filtering by a timestamp: ?updated_since=2024-01-15T00:00:00Z. Use the same high-water mark pattern as with databases. Some APIs (notably Stripe) use event streams instead — you poll for events after a given event ID.
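
A small sketch of turning a stored watermark into such a request URL follows. The updated_since parameter name comes from the example above; the limit parameter, the function name build_incremental_url, and the api.example.com URL are hypothetical, and real APIs differ in both parameter names and timestamp formats.

```python
from datetime import datetime, timezone
from urllib.parse import urlencode

def build_incremental_url(base_url: str, watermark: datetime, limit: int = 100) -> str:
    """Build a URL that asks only for records updated after `watermark`."""
    if watermark.tzinfo is None:
        # A naive datetime is ambiguous; refuse it rather than guess the zone
        raise ValueError("watermark must be timezone-aware")
    # Normalize to UTC and format as ISO 8601 with a trailing Z
    stamp = watermark.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    query = urlencode({"updated_since": stamp, "limit": limit})
    return f"{base_url}?{query}"

url = build_incremental_url(
    "https://api.example.com/orders",
    datetime(2024, 1, 15, tzinfo=timezone.utc),
)
```

Normalizing to UTC before formatting avoids a subtle class of bugs where the pipeline host's local timezone shifts the window and silently skips or repeats a few hours of changes.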

Extracting from files

CSV/JSON/Excel files dropped into an SFTP server or cloud storage bucket are common in B2B data exchange. They're often the most fragile source type.

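A key pattern is to validate the schema before loading, so a renamed or missing column fails loudly instead of silently corrupting downstream tables: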

import pandas as pd

# A list, not a set, so the output column order is stable across runs
EXPECTED_COLUMNS = ['order_id', 'customer_id', 'amount', 'currency', 'date']

def validate_and_load(filepath: str) -> pd.DataFrame:
    df = pd.read_csv(filepath)

    missing = set(EXPECTED_COLUMNS) - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {sorted(missing)}")

    return df[EXPECTED_COLUMNS]  # select only expected columns, in a fixed order

The extraction layer's contract

Whatever the source, the extraction layer has one job: deliver raw data to the landing zone faithfully, completely, and without disrupting the source. It should not transform, clean, or interpret the data — that's the transformation layer's job. Landing raw, unmodified source data is what lets you reprocess cleanly when (not if) your downstream logic has a bug.
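
For file sources, one way to honor this contract is to copy the delivered file byte for byte into the landing zone and record a checksum next to it, before any parsing happens. The sketch below assumes a local directory as the landing zone. The function name land_raw_file, the manifest fields, and the checksum-prefixed naming scheme are illustrative choices, not an established convention.

```python
import hashlib
import json
import os
import shutil
from datetime import datetime, timezone

def land_raw_file(src_path: str, landing_dir: str) -> dict:
    """Copy a source file unmodified into the landing zone and write a manifest."""
    with open(src_path, "rb") as f:
        # Reads the whole file at once; stream in blocks for very large files
        digest = hashlib.sha256(f.read()).hexdigest()

    os.makedirs(landing_dir, exist_ok=True)
    # Prefix the name with part of the checksum so a re-delivered file with
    # different content never overwrites an earlier delivery
    dest_name = f"{digest[:12]}_{os.path.basename(src_path)}"
    dest_path = os.path.join(landing_dir, dest_name)
    if not os.path.exists(dest_path):
        shutil.copyfile(src_path, dest_path)  # binary copy, no decoding or parsing

    manifest = {
        "source": src_path,
        "landed_as": dest_path,
        "sha256": digest,
        "bytes": os.path.getsize(dest_path),
        "landed_at": datetime.now(timezone.utc).isoformat(),
    }
    with open(dest_path + ".manifest.json", "w") as f:
        json.dump(manifest, f, indent=2)
    return manifest
```

Landing the same file twice resolves to the same destination path, so reruns are harmless, and the stored checksum lets downstream steps verify they are reprocessing exactly what the source delivered.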