Practical, repeatable cleaning of incoming orders with pandas: validating columns, rows, and tables; dropping and logging invalid rows; and saving the cleaned data and audit artifacts.
Last month you successfully ingested July’s orders. This month’s August orders arrived from multiple humans and systems, and of course some rows look a little off: missing customer IDs, non-existent customers, strange product IDs, negative quantities, and malformed dates. Manual line-by-line fixes aren’t feasible at scale.

In this lesson we cover practical, repeatable data cleaning using pandas. We’ll focus on common dirty-data patterns (missing values, invalid types, duplicates, mismatched foreign keys), the three levels of validation (column, row, table), and a pragmatic strategy: drop rows that fail validation while logging everything dropped so it can be reviewed and corrected later.
Table-level checks: foreign keys (customer_id, product_id) must exist in lookup tables.
Log and save dropped rows for auditing.
Save cleaned dataset and update ingestion logs.
Before you start, activate your environment and ensure pandas is installed. This process is repeatable and should be run as part of your ETL pipeline. Use the raw copy of the incoming file for traceability and audits.
Initial setup — prepare folders and find the orders file
Create required folders, locate the orders CSV (any filename containing orders), and load an ingest log if present.
```python
import os
import pandas as pd
import shutil
from datetime import datetime

# Define folders and paths
data_folder = "data"
archive_folder = os.path.join(data_folder, "archive")
insights_folder = "insights"
logs_folder = "logs"
log_path = os.path.join(logs_folder, "ingest_log.csv")

# Create folders if they don't exist
for folder in [data_folder, archive_folder, insights_folder, logs_folder]:
    os.makedirs(folder, exist_ok=True)
print("✓ Folder structure and paths set up.")

# Find the orders file in data_folder (any filename containing 'orders')
files = os.listdir(data_folder)
file_name = next((f for f in files if "orders" in f and f.lower().endswith(".csv")), None)
if not file_name:
    raise FileNotFoundError("No orders file found in the data folder.")
else:
    file_path = os.path.join(data_folder, file_name)
    file_id = os.path.splitext(file_name)[0]
    print(f"✅ Found file: {file_name}")

# Load existing ingest log if it exists
if os.path.exists(log_path):
    log = pd.read_csv(log_path)
else:
    log = pd.DataFrame()
```
For this lesson the pipeline policy is to drop rows that fail validation and log them. Dropping is acceptable when only a small fraction of rows are bad and when you retain the dropped rows for later review or repair.
Dropping rows can bias downstream analytics if many rows are removed. Always log discarded rows and their reasons so the data owner can correct the source or you can implement targeted fixes later.
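One way to keep the reason alongside each discarded row is a small helper that filters and records in one step. The sketch below is illustrative only: the `record_dropped` function, the `drop_reason` column, and the sample frame are hypothetical names and data, not part of the lesson's pipeline.

```python
import pandas as pd

def record_dropped(dropped_log, df, mask, reason):
    """Store rows selected by `mask` in `dropped_log` with a reason, return the rest."""
    if mask.any():
        dropped_log.append(df.loc[mask].assign(drop_reason=reason))
    return df.loc[~mask]

# Tiny made-up frame for illustration only
sample = pd.DataFrame({
    "order_id": [1, 2, 3],
    "customer_id": [10, None, 12],
    "quantity": [5, 3, -1],
})

dropped_log = []
sample = record_dropped(dropped_log, sample, sample["customer_id"].isna(), "missing customer_id")
sample = record_dropped(dropped_log, sample, sample["quantity"] <= 0, "non-positive quantity")

# One frame holding every discarded row plus the reason it was discarded
dropped_with_reasons = pd.concat(dropped_log, ignore_index=True)
print(dropped_with_reasons[["order_id", "drop_reason"]])
```

Writing `dropped_with_reasons` to CSV would give the data owner both the rejected rows and the rule each one failed.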
```python
# Load data from the located file and lookup tables
orders_path = file_path  # the file we found above
products_path = os.path.join("data", "products.csv")
customers_path = os.path.join("data", "customers.csv")

orders = pd.read_csv(orders_path)
products = pd.read_csv(products_path)
customers = pd.read_csv(customers_path)

# Keep a raw copy for logging or later re-ingestion
orders_raw = orders.copy()
```
Decide which columns are mandatory. If a required column is missing from the file entirely, you should either raise an error or log and skip processing (depending on your pipeline policy). Here we raise an error if any required column is absent, then drop rows with nulls in required fields.
```python
required_columns = ["order_id", "customer_id", "product_id", "quantity", "order_date"]

# Validate required columns exist
missing_columns = [c for c in required_columns if c not in orders.columns]
if missing_columns:
    raise KeyError(f"Missing required columns: {missing_columns}")

# Identify rows with missing required fields
missing_mask = orders[required_columns].isnull().any(axis=1)
dropped_missing_ids = orders.loc[missing_mask, "order_id"].tolist()
if dropped_missing_ids:
    print(f"🗑️ Removed {len(dropped_missing_ids)} rows with missing required fields: {dropped_missing_ids}")

# Drop them
orders = orders[~missing_mask]
```
Use pandas to parse dates. errors='coerce' converts unparsable values to NaT, which you can then drop. If you require strict formats, pass a format= argument.
```python
# Convert order_date to datetime; invalid parsing will be NaT.
# assign() returns a new frame, which avoids SettingWithCopyWarning
# when orders is already a filtered slice.
orders = orders.assign(
    order_date_parsed=pd.to_datetime(orders["order_date"], errors="coerce")
)

invalid_dates_mask = orders["order_date_parsed"].isna()
dropped_date_ids = orders.loc[invalid_dates_mask, "order_id"].tolist()
if dropped_date_ids:
    print(f"🗑️ Removed {len(dropped_date_ids)} rows with invalid order_date: {dropped_date_ids}")

orders = orders[~invalid_dates_mask]
```
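If the source is expected to use a single date layout, a strict `format=` makes anything else fail to parse. The following is a minimal sketch assuming ISO-style `YYYY-MM-DD` dates; the format string and the sample values are assumptions for illustration, not taken from the actual orders file.

```python
import pandas as pd

# Made-up sample values: one valid ISO date, one in another layout,
# one time-only string, and one missing value
dates = pd.Series(["2024-08-01", "08/02/2024", "10:45", None])

# With an explicit format, values that do not match become NaT
strict = pd.to_datetime(dates, format="%Y-%m-%d", errors="coerce")
invalid = strict.isna()

print(invalid.tolist())
```

Without `format=`, pandas infers the layout and may accept more variants than you intend, so the strict form is worth considering when the upstream format is contractually fixed.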
Coerce numeric fields and drop rows that fail numeric validation. For quantity, enforce strictly positive values (> 0). For IDs, require non-negative integers. Use temporary checked columns during validation and remove them afterwards.
```python
numeric_fields = ["customer_id", "product_id", "quantity"]

# Work on an explicit copy so the helper-column assignments below
# do not trigger SettingWithCopyWarning on a filtered frame
orders = orders.copy()
invalid_numeric_mask = pd.Series(False, index=orders.index)

for field in numeric_fields:
    checked_col = f"{field}_checked"

    # Coerce to numeric; invalid parsing becomes NaN
    orders[checked_col] = pd.to_numeric(orders[field], errors="coerce")

    # Invalid if NaN or out of range (quantity must be > 0; IDs must be >= 0)
    if field == "quantity":
        invalids = orders[checked_col].isna() | (orders[checked_col] <= 0)
    else:
        invalids = orders[checked_col].isna() | (orders[checked_col] < 0)

    if invalids.any():
        dropped = orders.loc[invalids, "order_id"].tolist()
        print(f"🗑️ Removed {len(dropped)} rows with invalid {field}: {dropped}")
    invalid_numeric_mask |= invalids

# Drop all rows that failed any numeric check
orders = orders[~invalid_numeric_mask].copy()

# Convert checked numeric columns to integers (safe now)
orders["customer_id"] = orders["customer_id_checked"].astype(int)
orders["product_id"] = orders["product_id_checked"].astype(int)
orders["quantity"] = orders["quantity_checked"].astype(int)

# Drop helper checked columns
orders = orders.drop(columns=[f"{field}_checked" for field in numeric_fields])
```
Note: If you must disallow fractional IDs, check that the checked numeric values equal their integer cast before accepting them; astype(int) will silently truncate floats.
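A whole-number check along the lines of this note might look like the sketch below. The sample ID strings and variable names are made up for illustration; the idea is simply to reject any coerced value that has a fractional part before casting to `int`.

```python
import pandas as pd

# Made-up raw ID values as they might arrive in a CSV
ids = pd.Series(["101", "102.0", "103.5", "abc", "-4"])

# Coerce to numeric; unparsable values become NaN
checked = pd.to_numeric(ids, errors="coerce")

# Whole numbers have no remainder when divided by 1 (NaN compares False)
is_whole = checked.notna() & (checked % 1 == 0)
is_valid_id = is_whole & (checked >= 0)

# Casting is safe only for the rows that passed the check
clean_ids = checked[is_valid_id].astype(int)
print(clean_ids.tolist())
```

Note that `"102.0"` passes because its value is a whole number; only values such as `103.5` that would be truncated by `astype(int)` are rejected.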
Remove exact duplicate rows (or duplicates by order_id if that’s your unique key). Make sure any helper columns that could affect duplicate detection are dropped before running this check.
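The duplicate check can be sketched as follows, assuming `order_id` is the unique key. The demo frame and variable names are illustrative only; in the pipeline the same calls would run on `orders`.

```python
import pandas as pd

# Made-up demo data: order 1072 appears twice
orders_demo = pd.DataFrame({
    "order_id": [1071, 1072, 1072, 1073],
    "customer_id": [1, 2, 2, 3],
    "quantity": [1, 4, 4, 2],
})

# Flag every repeat of an order_id after its first occurrence
dup_mask = orders_demo.duplicated(subset=["order_id"], keep="first")
duplicate_ids = orders_demo.loc[dup_mask, "order_id"].tolist()
if duplicate_ids:
    print(f"Removed {len(duplicate_ids)} duplicate rows: {duplicate_ids}")

# Keep the first occurrence of each order
deduped = orders_demo[~dup_mask]
```

Calling `duplicated()` without `subset` flags only rows that match on every column, which is the stricter "exact duplicate" rule.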
Verify that customer_id and product_id exist in their respective lookup tables. Make sure lookup key dtypes match (both int or both string) to avoid false negatives.
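A foreign-key check of this kind could be written with `Series.isin`, as in the sketch below. It assumes the lookup tables expose key columns named `customer_id` and `product_id`; the demo frames are made up, and the customer keys are deliberately stored as strings to show the dtype alignment step.

```python
import pandas as pd

# Made-up demo data; 999 is not present in either lookup table
orders_demo = pd.DataFrame({
    "order_id": [1, 2, 3],
    "customer_id": [10, 999, 11],
    "product_id": [100, 101, 999],
})
customers_demo = pd.DataFrame({"customer_id": ["10", "11"]})  # string keys
products_demo = pd.DataFrame({"product_id": [100, 101]})

# Align dtypes first; comparing int keys to string keys matches nothing
customers_demo["customer_id"] = customers_demo["customer_id"].astype(int)

valid_customer = orders_demo["customer_id"].isin(customers_demo["customer_id"])
valid_product = orders_demo["product_id"].isin(products_demo["product_id"])
fk_mask = valid_customer & valid_product

orphaned_ids = orders_demo.loc[~fk_mask, "order_id"].tolist()
if orphaned_ids:
    print(f"Removed {len(orphaned_ids)} rows with unknown foreign keys: {orphaned_ids}")

orders_fk_clean = orders_demo[fk_mask]
```

Without the `astype(int)` line, every order in this demo would fail the customer check, which is the false-negative case the text warns about.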
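Compare the raw copy to the cleaned dataframe to extract and save the rows that were removed during cleaning. This creates an auditable CSV that the data owner can inspect and use to fix source issues. Because the comparison is by order_id, a duplicate row whose twin survives cleaning is not captured in this file.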
```python
dropped_order_ids = set(orders_raw["order_id"].tolist()) - set(orders["order_id"].tolist())

if dropped_order_ids:
    dropped_rows = orders_raw[orders_raw["order_id"].isin(dropped_order_ids)]
    dropped_path = os.path.join(insights_folder, "ordersDropped.csv")
    dropped_rows.to_csv(dropped_path, index=False)
    print(f"Saved dropped rows ({len(dropped_rows)}) to {dropped_path}")
else:
    print("No rows were dropped during cleaning.")
```
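An alternative, sketched here under the assumption that the cleaned frame still carries the raw frame's row index (that is, before any `reset_index`), is to compare row indexes rather than `order_id` values. This also picks up a removed duplicate whose `order_id` remains in the cleaned data. The demo frames and names are illustrative only.

```python
import pandas as pd

# Made-up demo data: one exact duplicate and one negative quantity
raw_demo = pd.DataFrame({
    "order_id": [1071, 1072, 1072, 1073],
    "quantity": [1, 4, 4, -2],
})

# Boolean filtering keeps the original row labels
cleaned_demo = raw_demo[raw_demo["quantity"] > 0]
cleaned_demo = cleaned_demo[~cleaned_demo.duplicated()]

# Row labels present in the raw data but absent after cleaning
dropped_index = raw_demo.index.difference(cleaned_demo.index)
dropped_demo = raw_demo.loc[dropped_index]
print(dropped_demo)
```

Here the second copy of order 1072 appears in `dropped_demo` even though order 1072 itself survives cleaning.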
Final tidy-up, save cleaned data, archive raw file, update log
Remove any temporary helper columns, reset the index, save the cleaned file to insights/, archive the original raw file, and append an entry to the ingest log.
```python
# Remove helper columns and reset index
if "order_date_parsed" in orders.columns:
    orders.drop(columns=["order_date_parsed"], inplace=True)
orders = orders.reset_index(drop=True)

# Save cleaned orders
cleaned_path = os.path.join(insights_folder, f"{file_id}_cleaned.csv")
orders.to_csv(cleaned_path, index=False)
print(f"Saved cleaned data to {cleaned_path}")

# Archive the raw source file
os.makedirs(archive_folder, exist_ok=True)
shutil.move(file_path, os.path.join(archive_folder, file_name))
print(f"Moved raw file to {archive_folder}/{file_name}")

# Update ingest log
status = "cleaned"
row_count = len(orders)
log_entry = pd.DataFrame([{
    "file_name": file_name,
    "status": status,
    "rows": row_count,
    "timestamp": datetime.now().replace(microsecond=0).isoformat()
}])

if os.path.exists(log_path):
    log = pd.read_csv(log_path)
    log = pd.concat([log, log_entry], ignore_index=True)
else:
    log = log_entry

os.makedirs(logs_folder, exist_ok=True)
log.to_csv(log_path, index=False)
print(f"Logged ingestion to {log_path}")
```
Missing data: rows with IDs 1035 and 1050 were dropped.
Invalid dates: entries like “10:45” (time-only) were dropped.
Invalid numbers: negative quantity or fractional IDs were removed.
Duplicate rows: order ID 1072 appeared twice; one duplicate was removed.
Missing foreign keys: orders referencing customer_id = 999 or product_id = 999 were dropped because those IDs don’t exist in the lookup tables.
When you open the cleaned file you should see the dirty rows removed. The cleaned dataset is now ready for the next step in your pipeline: enrichment, aggregation, and analytics.
Dirty data comes in many shapes: missing values, invalid formats (dates), negative or non-integer numbers, duplicates, or mismatched foreign keys. Apply validation at these three levels:
Column-level: Are required columns present, and are their dtypes sensible?
Row-level: Are the values in each row complete and valid?
Table-level: Do foreign keys match values in lookup tables?
Cleaning often means dropping bad rows, but always log what you discard so errors can be traced back and fixed.