Loading data into Snowflake is a common need. Using Python and pandas is a common go-to solution for data professionals. Whether you’re pulling data from a relational database, wrangling a CSV file, or prototyping a new pipeline, this combination leverages pandas’ intuitive data manipulation and Snowflake’s cloud-native scalability. But let’s be real—data loading isn’t always a simple task.
Files go missing, connections drop, and type mismatches pop up when you least expect them. That’s why robust error handling isn’t just nice-to-have; it’s essential for anything you’d trust in production. In this guide, we’ll walk through the fundamentals of getting data into Snowflake, explore practical examples with pandas and SQLAlchemy, and equip you with the tools to build a dependable, real-world-ready pipeline. Let’s dive in and make your data loading process as smooth as possible!
Understanding How Snowflake Handles Data
Before we dive into the code and examples, let’s chat about how Snowflake approaches data loading. Think of Snowflake like a really efficient postal service. Just as you might have different ways to send a package (regular mail, express delivery, or instant messenger), Snowflake offers various methods to load your data.
The “post office” in this case is what we call a staging area. You’ve got three types: Internal stages (like having a PO box at the post office) managed by Snowflake for simplified management; External stages that leverage cloud storage services like AWS S3, Azure Blob Storage, or GCP Cloud Storage; and User stages which provide personal file storage areas for individual users.
For File Processing: Snowflake automatically handles:
- Format detection and validation
- Compression/decompression (e.g., GZIP, BZIP2, ZSTD)
- Data parsing and type conversion
- Micro-partitioning for optimal query performance
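To make the staging idea concrete, here is a minimal sketch of the classic pattern: upload a file to a table's internal stage with PUT, then load it with COPY INTO using the Snowflake Python connector. The table, file path, and credentials are placeholders, and the sketch assumes the target table already exists.

```python
import snowflake.connector

# Placeholder credentials; in practice pull these from a secrets manager
conn = snowflake.connector.connect(
    user='your_username', password='your_password', account='your_account',
    warehouse='your_warehouse', database='your_database', schema='your_schema'
)
cur = conn.cursor()

# 1. Upload the local file to the table's internal stage
#    (PUT auto-compresses the file with gzip by default)
cur.execute("PUT file:///tmp/daily_transactions.csv @%daily_transactions")

# 2. Load the staged file into the table; Snowflake parses the file,
#    converts types, and micro-partitions the data as it loads
cur.execute("""
    COPY INTO daily_transactions
    FROM @%daily_transactions
    FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1)
""")
conn.close()
```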
Snowflake’s automated type inference is powerful but needs careful attention. Initially, Snowflake analyzes a sample of rows to determine column data types. For example, for string columns, it considers the maximum length in the sample, not the entire dataset.
This is often good enough, but if the sample does not represent the longest values in the dataset (e.g., one row with 200 characters among millions with 10), Snowflake might underestimate the required column length, leading to load errors.
Whenever possible, particularly when the process will be repeated or automated (like a production load), the best practice is to explicitly define column types to avoid type inference issues.
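For instance, when loading with pandas and SQLAlchemy you can pass an explicit dtype mapping so the target columns are created with the types and sizes you intend, rather than whatever a sample suggests. A minimal sketch, assuming a Snowflake SQLAlchemy connection string like the one shown later in this article; the table and column names are illustrative:

```python
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.types import VARCHAR, NUMERIC, DATE

# Placeholder connection string; see the SQLAlchemy example later in this article
engine = create_engine('snowflake://<user>:<password>@<account>/<db>/<schema>?warehouse=<wh>')

df = pd.DataFrame({
    'customer_id': [1, 2],
    'comment': ['short', 'a much longer free-text comment...'],
    'order_date': pd.to_datetime(['2024-01-01', '2024-01-02']),
})

# Explicit types prevent the target columns from being sized off a sample
df.to_sql(
    'customer_comments',
    engine,
    if_exists='replace',
    index=False,
    dtype={
        'customer_id': NUMERIC(38, 0),
        'comment': VARCHAR(2000),  # sized for the longest value you expect, not the sample
        'order_date': DATE(),
    },
)
```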
Getting Your Hands Dirty: Different Ways to Load Data
Before we get into the weeds of setting up Python code, let’s explore different ways to load data into Snowflake based on common scenarios:
- Relational Database Sources
- Using database connectors (Oracle, PostgreSQL, MySQL, etc.)
- Setting up incremental loads
- Handling schema changes
- File-Based Sources
- CSV files (with various delimiters)
- JSON documents
- Parquet and other columnar formats
- XML files
- API and Streaming Sources
- Real-time data ingestion
- Message queue integration
- Change data capture (CDC) feeds
In this article, I will demonstrate something I do almost every day: working with Python and pandas to load data into Snowflake using a few typical methods.
Using Python with pandas to load relational database data into Snowflake
First, let’s look at a scenario where you’re loading customer transaction data from your e-commerce relational database. This example also covers how to write a pandas DataFrame to Snowflake using SQLAlchemy, a Python SQL toolkit and Object Relational Mapper.
This is probably the most common scenario for data engineers, data scientists and analysts. This code snippet will feature:
- Error Handling: Captures exceptions, logs them, saves failed records, and supports retries for transient failures (e.g., network timeouts).
- Chunking: The chunksize=10000 parameter processes data in batches, reducing memory usage—crucial for larger datasets.
- Logging: Tracks success and failure for auditing and debugging.
- Flexibility: if_exists='append' adds new data without overwriting existing records.
In this example, we’ll extract yesterday’s transaction data from an e-commerce database (like MySQL or PostgreSQL) and load it into a Snowflake table called ‘daily_transactions’. The workflow includes handling potential errors, saving problematic records for further analysis, and implementing retry logic for temporary failures such as network timeouts.
The example assumes the source database has a ‘transactions’ table with a ‘date’ column (a date without a time-of-day component) to filter by. The target Snowflake table structure will match the source data schema, as pandas will automatically create matching columns.
Let’s walk through this process step by step. First, we’ll connect to our source database and extract the previous day’s transaction data (this code can be downloaded from here):
```python
import pandas as pd
from sqlalchemy import create_engine
import logging
from datetime import datetime

# Connect to the source database and extract yesterday's transactions
# In a production environment, you would configure this connection properly
# and potentially include more sophisticated filtering logic
transactions_df = pd.read_sql(
    "SELECT * FROM transactions WHERE date = CURRENT_DATE - 1",
    source_database_connection
    # Note: 'date' here is a date field in the transactions table
    # This assumes a standard date column, but you might need to use
    # transaction_timestamp or another date/time field in your database
)
```
Next, we’ll set up our connection to Snowflake using SQLAlchemy. This provides a clean, standardized way to interact with Snowflake:
```python
# Create Snowflake connection using SQLAlchemy engine
# For production environments, use environment variables or a secure credential store
engine = create_engine(
    'snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}&role={role}'.format(
        user='your_username',
        password='your_password',
        account='your_account',
        database='your_database',
        schema='your_schema',
        warehouse='your_warehouse',
        role='your_role'
    )
)
```
Finally, we’ll load the data into Snowflake with comprehensive error handling:
The code first attempts to load the data into Snowflake inside the try block and falls through to the except block if an error occurs.
```python
# Write to Snowflake (includes error handling for retries)
try:
    # The to_sql method will create or append to a table named 'daily_transactions'
    # The target table's schema will match our pandas DataFrame structure
    # Processing in chunks helps manage memory and enables better error recovery
    transactions_df.to_sql(
        'daily_transactions',
        engine,
        if_exists='append',  # Adds new records without overwriting existing data
        index=False,         # Don't include DataFrame index as a column
        chunksize=10000      # Process in smaller batches for better memory management
    )
except Exception as e:
    # Log the error with details
    error_message = f"Failed to load transactions: {str(e)}"
    logging.error(error_message)

    # Identify rows that aren't completely null (have at least one non-null value)
    # This helps isolate potentially problematic records
    failed_records = transactions_df[transactions_df.apply(lambda x: not x.isnull().all(), axis=1)]

    # Save failed records for review and later processing
    failed_records.to_csv(f'failed_loads/transactions_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv')

    # Notify the team
    send_alert(error_message)

    # If the error is transient (like a network timeout or connection issue),
    # we can attempt to reload the data. The retry_load function would implement
    # exponential backoff to avoid overwhelming the server.
    if is_retryable_error(e):
        retry_load(transactions_df, max_retries=3)
```
This snippet pulls yesterday’s transactions, connects to Snowflake, and loads the data in batches. If something goes wrong—like a timeout or a schema mismatch—it logs the error, saves the problematic records, and even tries again if the issue is transient. It’s a solid foundation for daily ETL jobs or ad-hoc analysis.
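The snippet relies on three helper functions (send_alert, is_retryable_error, and retry_load) that aren’t defined above. Here is a minimal sketch of what they might look like, reusing the engine created earlier; the exception types treated as transient, the alerting channel, and the backoff settings are assumptions you would adapt to your environment:

```python
import time
import logging
from sqlalchemy.exc import OperationalError, TimeoutError as SATimeoutError

def send_alert(message: str) -> None:
    """Notify the team; swap in email, Slack, PagerDuty, etc. as appropriate."""
    logging.warning(f"ALERT: {message}")

def is_retryable_error(error: Exception) -> bool:
    """Treat connection/timeout problems as transient; schema or data errors are not."""
    return isinstance(error, (OperationalError, SATimeoutError, ConnectionError))

def retry_load(df, max_retries: int = 3) -> bool:
    """Re-attempt the load with exponential backoff between tries."""
    for attempt in range(1, max_retries + 1):
        try:
            df.to_sql('daily_transactions', engine,
                      if_exists='append', index=False, chunksize=10000)
            logging.info(f"Retry {attempt} succeeded")
            return True
        except Exception as e:
            logging.error(f"Retry {attempt} failed: {e}")
            time.sleep(2 ** attempt)  # wait 2, 4, 8 ... seconds between attempts
    return False
```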
For optimal performance and security, consider these additional tips:
- Credentials Security: Store user, password, etc., in environment variables or a secure vault (e.g., AWS Secrets Manager) rather than hard-coding them; see the sketch after this list.
- Performance: For larger datasets, consider adjusting the chunksize parameter based on your data volume and available memory. For very large datasets, the native Snowflake Connector (explained in my next article) might be more efficient.
- Validation: Pre-check transactions_df for nulls or duplicates before loading.
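As an illustration of the credentials and validation tips, here is a minimal sketch that builds the connection string from environment variables and pre-checks the DataFrame before loading. The environment variable names and the 'transaction_id' key column are assumptions:

```python
import os
import pandas as pd
from sqlalchemy import create_engine

# Read credentials from environment variables instead of hard-coding them
engine = create_engine(
    'snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}'.format(
        user=os.environ['SNOWFLAKE_USER'],
        password=os.environ['SNOWFLAKE_PASSWORD'],
        account=os.environ['SNOWFLAKE_ACCOUNT'],
        database=os.environ['SNOWFLAKE_DATABASE'],
        schema=os.environ['SNOWFLAKE_SCHEMA'],
        warehouse=os.environ['SNOWFLAKE_WAREHOUSE'],
    )
)

def precheck(df: pd.DataFrame) -> pd.DataFrame:
    """Drop exact duplicates and fail fast if the assumed key column contains nulls."""
    df = df.drop_duplicates()
    if df['transaction_id'].isnull().any():  # 'transaction_id' is an assumed key column
        raise ValueError("Null transaction_id values found; aborting load")
    return df
```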
The most practical applications of this technique are:
- Ad-hoc Analysis: Load small to medium datasets for quick exploration and analysis by data scientists.
- ETL Pipelines: Extract from relational sources, transform with pandas, and load into Snowflake as part of a scheduled workflow.
- Prototyping: Test data workflows before scaling to higher-volume batch or streaming solutions.
Load File-Based Sources into Snowflake using Pandas
Now, let’s shift gears to file-based data—like a CSV with customer info. This scenario demands more: validating the data, ensuring consistency, and tracking what’s loaded.
Let’s explore an approach to loading CSV files into Snowflake. The following class implements a comprehensive solution that:
- Validates data before attempting to load it
- Uses transactions to ensure data consistency
- Implements proper error handling and logging
- Verifies that the data was loaded correctly
- Maintains audit records of each load operation
This pattern is particularly useful for production ETL processes where reliability and traceability are critical. The code is structured as a reusable class that can be integrated into larger data pipelines (the complete code module can be downloaded from here):
Let’s begin by importing the Python modules we need:
- pandas – A popular library for working with data tables
- snowflake.connector – The tool that lets Python talk to Snowflake
- logging – For keeping track of what happens during execution
- Other utilities for handling files, dates, and data validation
```python
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas
import snowflake.connector
import logging
from datetime import datetime
import os
from typing import Optional, Tuple
import hashlib
```
Let’s now define a class, initialize it, and set up logging. This object will handle the entire process of loading CSV data into Snowflake. Think of it as a machine with specific parts (methods) working together. When you create a new instance of this “machine,” you provide configuration settings for connecting to Snowflake (username, password, etc.). The __init__ method saves those settings and sets up logging so you can track what happens. The setup_logging method creates a ‘logs’ folder if it doesn’t exist, then configures the logging system to write to both a file and the console, including timestamps and severity levels, and names the log file using the current date.
The following code sets up the class. It is broken up into sections for discussion here:
```python
class CSVtoSnowflakeLoader:
    def __init__(self, snowflake_config: dict):
        """
        Initialize the loader with Snowflake connection configuration.

        Args:
            snowflake_config (dict): Configuration with connection details
        """
        self.config = snowflake_config
        self.setup_logging()

    def setup_logging(self):
        """Set up logging configuration"""
        log_dir = 'logs'
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'{log_dir}/csv_load_{datetime.now().strftime("%Y%m%d")}.log'),
                logging.StreamHandler()
            ]
        )
```
This next code block establishes a connection to Snowflake using the config details you provided. If it can’t connect (wrong password, network issue, etc.), it logs the error and raises an exception. Think of this as dialing a phone number to talk to Snowflake.
```python
def get_connection(self) -> snowflake.connector.SnowflakeConnection:
    """Create and return a Snowflake connection"""
    try:
        return snowflake.connector.connect(
            user=self.config['user'],
            password=self.config['password'],
            account=self.config['account'],
            warehouse=self.config['warehouse'],
            database=self.config['database'],
            schema=self.config['schema']
        )
    except Exception as e:
        logging.error(f"Failed to connect to Snowflake: {str(e)}")
        raise
```
Before attempting to load data, let’s look at a method to check if it meets basic requirements such as:
- Make sure the data file isn’t empty
- Confirm required columns like ‘id’ and ‘name’ exist
- Verify data types match expectations (e.g., ‘id’ should be an integer)
This is like checking ingredients before cooking to avoid problems later.
```python
def validate_csv(self, df: pd.DataFrame) -> Tuple[bool, Optional[str]]:
    """
    Validate the CSV data before loading.

    Args:
        df (pd.DataFrame): DataFrame to validate

    Returns:
        Tuple[bool, Optional[str]]: (is_valid, error_message)
    """
    try:
        # Check for empty DataFrame
        if df.empty:
            return False, "CSV file is empty"

        # Check for missing values in required columns
        required_columns = ['id', 'name']  # Add your required columns
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            return False, f"Missing required columns: {missing_columns}"

        # Check data types
        expected_types = {
            'id': 'int64',
            'amount': 'float64',
            'date': 'datetime64[ns]'
        }
        for col, expected_type in expected_types.items():
            if col in df.columns and str(df[col].dtype) != expected_type:
                return False, f"Column {col} has incorrect type: {df[col].dtype}, expected: {expected_type}"

        return True, None

    except Exception as e:
        return False, f"Validation error: {str(e)}"
```
Let’s make sure data integrity stays intact. The next method creates a unique “fingerprint” of the data using a hash function. Later, you can use this fingerprint to verify whether the data was changed or corrupted during transfer. Read more on checksums here if you want to understand the basics.
```python
def calculate_checksum(self, df: pd.DataFrame) -> str:
    """Calculate checksum of DataFrame for validation"""
    return hashlib.md5(pd.util.hash_pandas_object(df).values).hexdigest()
```
Now, let’s start the main process: loading the data. This involves multiple steps: an orchestration method for the entire load, reading the CSV file, validating the data, loading it, and verifying the load, with robust error handling throughout. Follow the comments in the code to understand this process.
```python
def load_csv(self, csv_path: str, table_name: str) -> bool:
    """
    Load CSV file into Snowflake table with error handling.

    Args:
        csv_path (str): Path to CSV file
        table_name (str): Target Snowflake table name

    Returns:
        bool: True if successful, False otherwise
    """
    conn = None
    error_file = None
    success = False

    try:
        logging.info(f"Starting to load CSV file: {csv_path}")
```
Opens and reads the CSV file into a pandas DataFrame (a table-like structure). The parse_dates parameter tells pandas to interpret the ‘date’ column as actual dates rather than text.
```python
        # Read CSV file
        try:
            # Adjust parse_dates as needed
            df = pd.read_csv(csv_path, parse_dates=['date'])
        except Exception as e:
            logging.error(f"Failed to read CSV file: {str(e)}")
            return False
```
Uses the validation method to check if the data meets requirements.
```python
        # Validate data
        is_valid, error_msg = self.validate_csv(df)
        if not is_valid:
            logging.error(f"CSV validation failed: {error_msg}")
            return False
```
Creates the unique fingerprint of the data for integrity checking.
```python
        # Calculate checksum for validation
        checksum = self.calculate_checksum(df)
```
Establishes a connection to Snowflake.
```python
        # Get Snowflake connection
        conn = self.get_connection()
```
Starts a database transaction. This is like telling Snowflake “I’m about to do several things, but treat them as one action.”
```python
        # Begin transaction
        conn.cursor().execute("BEGIN")
```
Loads the data into Snowflake in chunks of 10,000 rows at a time. Chunking is almost always more efficient for large datasets, and it’s worth testing different chunk sizes, especially for frequently repeated load processes.
```python
        try:
            # Write to Snowflake
            success, nchunks, nrows, _ = write_pandas(
                conn=conn,
                df=df,
                table_name=table_name,
                chunk_size=10000  # Adjust based on your needs
            )

            if not success:
                raise Exception("Failed to write data to Snowflake")
```
Double-checks that all records were loaded by comparing the count in Snowflake to the original DataFrame size.
```python
            # Verify row count
            cursor = conn.cursor()
            cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
            snowflake_count = cursor.fetchone()[0]

            if snowflake_count != len(df):
                raise Exception(
                    f"Row count mismatch: {len(df)} in CSV vs {snowflake_count} in Snowflake"
                )
```
Adds an entry to a tracking table that records what was loaded, when, and with what checksum. This is like keeping a shipping receipt.
```python
            # Log success metadata
            cursor.execute("""
                INSERT INTO load_history (
                    table_name, file_name, load_timestamp,
                    record_count, checksum
                ) VALUES (%s, %s, CURRENT_TIMESTAMP, %s, %s)
            """, (table_name, os.path.basename(csv_path), len(df), checksum))
```
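Note that this step assumes a load_history audit table already exists in the target schema. Exactly how you define it is up to you; a one-time setup along these lines, run against the same Snowflake connection, would work (the column types are assumptions):

```python
# One-time setup of the audit table used by the loader (run once per schema)
conn.cursor().execute("""
    CREATE TABLE IF NOT EXISTS load_history (
        table_name     VARCHAR,
        file_name      VARCHAR,
        load_timestamp TIMESTAMP_NTZ,
        record_count   NUMBER,
        checksum       VARCHAR
    )
""")
```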
Finalizes all changes in Snowflake if everything went well.
```python
            # Commit transaction
            conn.cursor().execute("COMMIT")
            success = True
            logging.info(f"Successfully loaded {nrows} rows into {table_name}")
```
If anything goes wrong during the process:
- ROLLBACK cancels all changes (like undoing the transaction)
- The error is logged
- The data that failed to load is saved to a CSV file in an ‘errors’ folder for later analysis
```python
        except Exception as e:
            # Rollback transaction
            conn.cursor().execute("ROLLBACK")
            logging.error(f"Failed to load data: {str(e)}")

            # Save failed records
            error_file = f"errors/{table_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            os.makedirs('errors', exist_ok=True)
            df.to_csv(error_file, index=False)
            logging.info(f"Failed records saved to {error_file}")

            success = False

    except Exception as e:
        logging.error(f"Unexpected error: {str(e)}")
        success = False
```
Always closes the database connection (regardless of success or failure) and returns whether the operation succeeded.
```python
    finally:
        if conn:
            conn.close()

    return success
```
Let’s see an example of using the class we created to load CSV data into Snowflake.
```python
# Example usage
if __name__ == "__main__":
    # Snowflake connection configuration
    config = {
        'user': 'your_username',
        'password': 'your_password',
        'account': 'your_account',
        'warehouse': 'your_warehouse',
        'database': 'your_database',
        'schema': 'your_schema'
    }

    # Initialize loader
    loader = CSVtoSnowflakeLoader(config)

    # Load CSV file
    success = loader.load_csv(
        csv_path='path/to/your/data.csv',
        table_name='your_target_table'
    )

    if success:
        print("CSV loaded successfully!")
    else:
        print("Failed to load CSV. Check logs for details.")
```
This shows how to use the class we created:
- Define connection settings
- Create an instance of our loader
- Call the load_csv method to load a specific file into a specific table
- Check if it worked and print an appropriate message
Beyond the basics, this implementation incorporates several best practices worth carrying into your own pipelines:
- Chunk-based loading for large files
- Proper resource cleanup
- Configurable logging
- Type hints for better code maintainability
Wrapping Up
By now, you’ve seen how pandas and Python can make loading data into Snowflake both approachable and reliable. Whether you’re pulling from a database with SQLAlchemy or tackling CSV files with the Snowflake Connector, these methods balance ease of use with production-grade robustness. The error handling keeps things running smoothly when hiccups occur, while chunking and validation ensure scalability and accuracy.
Start with these examples for your next project—tweak them, watch the logs, and refine as you go. Remember that successful data loading is not just about getting data from point A to point B—it’s about ensuring data quality, maintaining system performance, and implementing proper error handling throughout the process. Start with these patterns and adjust them based on your specific use cases and requirements.
As your datasets grow or your needs evolve, you’ll be ready to scale up to batch processing or streaming with confidence. Happy loading, and may your pipelines always run clean!