Data engineering starts at the source. Before you can transform, aggregate, or serve data, you have to extract it from where it lives — and that turns out to be one of the most varied, frustrating, and practically important parts of the job. Source systems were not built to be extracted from. They were built to do something else.
Full vs incremental extraction
Every ingestion pipeline makes a fundamental choice: extract everything every time, or extract only what changed.
Full extraction is simpler. Read the entire source table, write it to the destination. No need to track state, no risk of missing changes. The cost is proportional to the table size — fine at small scale, prohibitive when the table has billions of rows.
Incremental extraction pulls only the records that changed since the last run. It is faster, cheaper, and necessary at scale. The challenge: you need a reliable way to identify what changed.
The most common incremental pattern is the high-water mark: store the maximum value of a timestamp or sequence column from the last successful run, and on the next run, extract only rows where that column is greater than the stored value.
import psycopg2
from datetime import datetime


def extract_incremental(conn, last_watermark: datetime):
    """Return rows changed since last_watermark plus the new watermark."""
    with conn.cursor() as cursor:
        cursor.execute("""
            SELECT id, customer_id, amount, status, updated_at
            FROM orders
            WHERE updated_at > %s
            ORDER BY updated_at ASC
        """, (last_watermark,))
        rows = cursor.fetchall()
    if rows:
        # Rows are sorted by updated_at, so the last row carries the maximum value.
        new_watermark = rows[-1][-1]
        return rows, new_watermark
    # Nothing changed: keep the previous watermark.
    return [], last_watermark
The catch: this pattern requires the source table to have a reliable updated_at column that is always set when a row changes, and it misses hard deletes (rows that are physically removed). For sources where deletes matter, you need Change Data Capture — covered in the next post.
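The pattern also depends on where the watermark lives between runs. Below is a minimal sketch of persisting it, assuming a local JSON state file; the `EPOCH` default and the function names `load_watermark` and `save_watermark` are hypothetical, and a real pipeline might keep this state in a database table or an orchestrator's metadata store instead.

```python
import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path

# Hypothetical default for the very first run, when no state file exists yet.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def load_watermark(state_path: Path) -> datetime:
    """Return the stored watermark, or EPOCH if no run has succeeded yet."""
    if not state_path.exists():
        return EPOCH
    state = json.loads(state_path.read_text())
    return datetime.fromisoformat(state["watermark"])


def save_watermark(state_path: Path, watermark: datetime) -> None:
    """Write to a temp file, then rename it over the state file in one step."""
    fd, tmp_name = tempfile.mkstemp(dir=state_path.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as tmp:
        json.dump({"watermark": watermark.isoformat()}, tmp)
    # os.replace is atomic on the same filesystem, so readers never see a partial file.
    os.replace(tmp_name, state_path)
```

The ordering matters more than the storage: call `save_watermark` only after the extracted rows have been written to the destination. If the job crashes before that point, the next run starts from the old watermark and re-extracts the same rows rather than skipping them.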
Extracting from databases
Relational databases are the most common data source. A few patterns and gotchas:
Use a read replica. Never extract directly from the primary database at scale. Long-running analytical queries can degrade performance for application users. Most cloud databases support read replicas — route your extraction there.
Avoid SELECT * in production. Specify columns explicitly. This protects you from breaking changes (a new column in the source doesn't automatically break your pipeline) and reduces the data transferred.
Handle large tables with keyset pagination. Fetching 100 million rows in one query and holding them all in memory can exhaust the extraction host. Chunk the extraction by filtering on an indexed, increasing key and resuming after the last value seen:
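from psycopg2 import sql


def extract_in_chunks(conn, table: str, chunk_size: int = 100_000):
    """Yield chunks of rows ordered by id, resuming after the last id seen."""
    # Table names cannot be bound as query parameters, so quote the identifier
    # with psycopg2.sql instead of interpolating it with an f-string.
    query = sql.SQL("""
        SELECT id, customer_id, amount, updated_at
        FROM {table}
        WHERE id > %s
        ORDER BY id ASC
        LIMIT %s
    """).format(table=sql.Identifier(table))
    last_id = 0  # assumes positive integer ids
    while True:
        with conn.cursor() as cursor:
            cursor.execute(query, (last_id, chunk_size))
            rows = cursor.fetchall()
        if not rows:
            break
        yield rows
        last_id = rows[-1][0]  # last id in this chunk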
Lock awareness. Long-running reads can block writes in some databases (depending on isolation level). Use READ COMMITTED isolation or snapshot-based reads where available.
Extracting from REST APIs
SaaS tools (Salesforce, HubSpot, Stripe, Shopify) expose data via REST APIs. Three things make API extraction harder than database extraction:
Pagination. APIs return data in pages, usually 100–1000 records at a time. You must handle pagination until you've retrieved all records. Two common patterns:
- Offset pagination: ?page=1&limit=100, ?page=2&limit=100. Simple but breaks if new records are inserted while you're paginating.
- Cursor pagination: each response includes a next_cursor token. More reliable: the cursor is stable even if data changes.
import requests


def extract_api_with_cursor(base_url: str, api_key: str):
    """Follow next_cursor tokens until the API reports no further pages."""
    cursor = None
    all_records = []
    while True:
        params = {'limit': 100}
        if cursor:
            params['cursor'] = cursor
        response = requests.get(
            base_url,
            headers={'Authorization': f'Bearer {api_key}'},
            params=params,
            timeout=30,
        )
        response.raise_for_status()  # fail loudly instead of parsing an error body
        body = response.json()
        all_records.extend(body['data'])
        cursor = body.get('next_cursor')
        if not cursor:
            break
    return all_records
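To make the offset pagination failure concrete, here is a small in-memory simulation. The record names and the `fetch_offset_page` helper are made up for illustration; no real API is involved. It assumes the endpoint returns records newest first, which is when an insert shifts every later offset.

```python
def fetch_offset_page(records: list, page: int, limit: int) -> list:
    """Simulate an offset-paginated endpoint over a newest-first list (page is 1-based)."""
    start = (page - 1) * limit
    return records[start:start + limit]


# Hypothetical source data, newest first.
records = ["r5", "r4", "r3", "r2", "r1"]

page_1 = fetch_offset_page(records, page=1, limit=2)  # reads r5, r4

# A new record arrives between the two requests and shifts every offset by one.
records.insert(0, "r6")

page_2 = fetch_offset_page(records, page=2, limit=2)  # reads r4, r3

seen = page_1 + page_2
duplicates = {r for r in seen if seen.count(r) > 1}
print(duplicates)  # r4 was returned on both pages
```

Here the insert causes a duplicate, which is recoverable if you deduplicate on a key. A delete between requests shifts offsets the other way and silently skips a record, which is the more dangerous case.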
Rate limits. APIs restrict how many requests you can make per minute or hour. Always implement exponential backoff with jitter on 429 (Too Many Requests) responses:
import time
import random

import requests


def request_with_backoff(url, headers, max_retries=5):
    """GET url, sleeping 2**attempt seconds plus jitter after each 429."""
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 429:
            wait = (2 ** attempt) + random.uniform(0, 1)  # exponential delay + jitter
            time.sleep(wait)
            continue
        response.raise_for_status()  # surface other 4xx/5xx errors immediately
        return response.json()
    raise RuntimeError(f"Failed after {max_retries} retries")
Incremental extraction from APIs. Many APIs support filtering by a timestamp, for example ?updated_since=2024-01-15T00:00:00Z. Use the same high-water mark pattern as with databases. Some APIs (notably Stripe) use event streams instead: you poll for events after a given event ID.
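Combining the timestamp filter with cursor pagination could look like the sketch below. It assumes the same hypothetical response shape as the cursor example (`data` and `next_cursor`), an `updated_since` query parameter, and an `updated_at` field on each record; real APIs name these differently. The HTTP call is passed in as `fetch_page` so the logic can be exercised without a network.

```python
from typing import Callable, List, Optional, Tuple


def extract_updated_since(
    fetch_page: Callable[[dict], dict],
    last_watermark: str,
) -> Tuple[List[dict], str]:
    """Pull every record updated after last_watermark and return the new watermark.

    fetch_page is a hypothetical callable that takes query parameters and returns
    the decoded JSON body, shaped like {'data': [...], 'next_cursor': ...}.
    """
    records: List[dict] = []
    cursor: Optional[str] = None
    while True:
        params = {"updated_since": last_watermark, "limit": 100}
        if cursor:
            params["cursor"] = cursor
        body = fetch_page(params)
        records.extend(body["data"])
        cursor = body.get("next_cursor")
        if not cursor:
            break

    # ISO 8601 UTC timestamps in one fixed format sort correctly as plain strings.
    # With no records, the watermark stays where it was.
    new_watermark = max(
        (r["updated_at"] for r in records), default=last_watermark
    )
    return records, new_watermark
```

Taking the maximum over all records, rather than the last record's value, avoids assuming the API returns results in `updated_at` order.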
Extracting from files
CSV/JSON/Excel files dropped into an SFTP server or cloud storage bucket are common in B2B data exchange. They're often the most fragile source type.
Key patterns:
- Never process files in place. Move or copy them to a processing bucket first. If your job fails mid-read, you know which file was being processed.
- Track processed files (by filename, checksum, or S3 object version) to avoid reprocessing. S3 event notifications can trigger ingestion automatically when a file lands.
- Validate schema before loading. File sources are where schema surprises are most common — a partner sends a CSV with a column renamed or missing. Validate column names and types against an expected schema and fail loudly if they don't match.
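import pandas as pd

# A list (not a set) so the output column order is the same on every run.
EXPECTED_COLUMNS = ['order_id', 'customer_id', 'amount', 'currency', 'date']


def validate_and_load(filepath: str) -> pd.DataFrame:
    """Load a CSV and fail loudly if any expected column is missing."""
    df = pd.read_csv(filepath)
    # This checks column names only; dtype checks would be added here.
    missing = set(EXPECTED_COLUMNS) - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {sorted(missing)}")
    return df[EXPECTED_COLUMNS]  # select only expected columns, in a fixed order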
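Tracking processed files by checksum, as suggested in the list above, might be sketched as follows. This assumes a local JSON manifest; the function names and manifest layout are hypothetical, and in production the manifest would more likely be a database table so concurrent jobs can share it.

```python
import hashlib
import json
from pathlib import Path


def file_checksum(path: Path) -> str:
    """SHA-256 of the file contents, read in 1 MiB blocks to keep memory flat."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for block in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(block)
    return digest.hexdigest()


def already_processed(path: Path, manifest_path: Path) -> bool:
    """True if a file with identical contents was recorded in the manifest."""
    if not manifest_path.exists():
        return False
    return file_checksum(path) in json.loads(manifest_path.read_text())


def mark_processed(path: Path, manifest_path: Path) -> None:
    """Record the file's checksum (mapped to its name) after a successful load."""
    manifest = json.loads(manifest_path.read_text()) if manifest_path.exists() else {}
    manifest[file_checksum(path)] = path.name
    manifest_path.write_text(json.dumps(manifest, indent=2))
```

Keying on the checksum rather than the filename catches a partner re-sending the same contents under a new name, and lets a corrected file with the old name through.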
The extraction layer's contract
Whatever the source, the extraction layer has one job: deliver raw data to the landing zone faithfully, completely, and without disrupting the source. It should not transform, clean, or interpret the data — that's the transformation layer's job. Landing raw, unmodified source data is what lets you reprocess cleanly when (not if) your downstream logic has a bug.
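As one possible reading of that contract, the sketch below writes a payload to a landing zone byte for byte and records metadata in a sidecar file instead of touching the data. The directory layout, the `.raw` and `.meta.json` suffixes, and the `land_raw` name are all assumptions made for illustration; a cloud bucket would replace the local filesystem in practice.

```python
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path


def land_raw(payload: bytes, source: str, landing_root: Path) -> Path:
    """Write payload unmodified under landing_root/source/YYYY-MM-DD/ with a metadata sidecar."""
    now = datetime.now(timezone.utc)
    digest = hashlib.sha256(payload).hexdigest()

    target_dir = landing_root / source / now.strftime("%Y-%m-%d")
    target_dir.mkdir(parents=True, exist_ok=True)

    # Time plus a checksum prefix keeps names unique and traceable to their contents.
    stamp = now.strftime("%H%M%S")
    target = target_dir / f"{stamp}_{digest[:12]}.raw"
    target.write_bytes(payload)  # no parsing, no cleaning, no re-encoding

    # Extraction metadata lives beside the data, never inside it.
    sidecar = target.with_suffix(".meta.json")
    sidecar.write_text(json.dumps({
        "source": source,
        "extracted_at": now.isoformat(),
        "sha256": digest,
        "size_bytes": len(payload),
    }))
    return target
```

Because the landed bytes match what the source returned, a downstream bug can be fixed and the transformation replayed from the landing zone without going back to the source.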