{"id":105989,"date":"2025-03-30T04:19:58","date_gmt":"2025-03-30T04:19:58","guid":{"rendered":"https:\/\/www.red-gate.com\/simple-talk\/?p=105989"},"modified":"2025-03-07T04:23:19","modified_gmt":"2025-03-07T04:23:19","slug":"load-data-into-snowflake-using-python-with-pandas","status":"publish","type":"post","link":"https:\/\/www.red-gate.com\/simple-talk\/databases\/snowflake\/load-data-into-snowflake-using-python-with-pandas\/","title":{"rendered":"Loading Data into Snowflake with Python and Pandas: write_pandas, DataFrame to Table, and File-Based Loading"},"content":{"rendered":"\n<p>Loading data into Snowflake is a common need. Using Python and pandas is a common go-to solution for data professionals. Whether you\u2019re pulling data from a relational database, wrangling a CSV file, or prototyping a new pipeline, this combination leverages pandas\u2019 intuitive data manipulation and Snowflake\u2019s cloud-native scalability. But let\u2019s be real\u2014data loading isn\u2019t always a simple task.<\/p>\n\n\n\n<p>Files go missing, connections drop, and type mismatches pop up when you least expect them. That\u2019s why robust error handling isn\u2019t just nice-to-have; it\u2019s essential for anything you\u2019d trust in production. In this guide, we\u2019ll walk through the fundamentals of getting data into Snowflake, explore practical examples with pandas and <a href=\"https:\/\/www.sqlalchemy.org\/\">SQLAlchemy<\/a>, and equip you with the tools to build a dependable, real-world-ready pipeline. Let\u2019s dive in and make your data loading process as smooth as possible!<\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"h-understanding-how-snowflake-handles-data\"><a id=\"post-105989-_heading=h.1fob9te\"><\/a>Understanding How Snowflake Handles Data<\/h2>\n\n\n\n<p>Before we dive into the code and examples, let&#8217;s chat about how Snowflake approaches data loading. Think of Snowflake like a really efficient postal service. 
Just as you might have different ways to send a package (regular mail, express delivery, or instant messenger), Snowflake offers various methods to load your data.<\/p>\n\n\n\n<p>The &#8220;post office&#8221; in this case is what we call a staging area. You&#8217;ve got three types: Internal stages (like having a PO box at the post office) managed by Snowflake for simplified management; External stages that leverage cloud storage services like AWS S3, Azure Blob Storage, or GCP Cloud Storage; and User stages which provide personal file storage areas for individual users.<\/p>\n\n\n\n<p><strong>For File Processing:<\/strong> Snowflake automatically handles:<\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li>Format detection and validation<\/li>\n\n\n\n<li>Compression\/decompression (supported formats: GZIP, BZIP2, ZSTD)<\/li>\n\n\n\n<li>Data parsing and type conversion<\/li>\n\n\n\n<li>Micro-partitioning for optimal query performance<\/li>\n<\/ul>\n<\/div>\n\n\n<p>Snowflake&#8217;s automated type inference is powerful but needs careful attention. Initially, Snowflake analyzes a sample of rows to determine column data types. 
For example, for string columns, it considers the maximum length in the sample, not the entire dataset.<\/p>\n\n\n\n<p>This is often good enough, but if the sample does not represent the longest values in a set (e.g., one row with 200 characters among millions with 10), Snowflake might underestimate the required datatype length, leading to errors.<\/p>\n\n\n\n<p>The best practice, particularly when the process will be repeated or automated (like a production load), is to explicitly define column types whenever possible to avoid type inference issues.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"h-getting-your-hands-dirty-different-ways-to-load-data\"><a id=\"post-105989-_heading=h.3znysh7\"><\/a>Getting Your Hands Dirty: Different Ways to Load Data<\/h2>\n\n\n\n<p>Before we get into the weeds of setting up Python code, let&#8217;s explore different ways to load data into Snowflake based on common scenarios:<\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li><strong>Relational Database Sources<\/strong><div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li>Using database connectors (Oracle, PostgreSQL, MySQL, etc.)<\/li>\n\n\n\n<li>Setting up incremental loads<\/li>\n\n\n\n<li>Handling schema changes<\/li>\n<\/ul>\n<\/div><\/li>\n\n\n\n<li><strong>File-Based Sources<\/strong><div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li>CSV files (with various delimiters)<\/li>\n\n\n\n<li>JSON documents<\/li>\n\n\n\n<li>Parquet and other columnar formats<\/li>\n\n\n\n<li>XML files<\/li>\n<\/ul>\n<\/div><\/li>\n\n\n\n<li><strong>API and Streaming Sources<\/strong><div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li>Real-time data ingestion<\/li>\n\n\n\n<li>Message queue integration<\/li>\n\n\n\n<li>Change data capture (CDC) feeds<\/li>\n<\/ul>\n<\/div><\/li>\n<\/ul>\n<\/div>\n\n\n<p>In this article, I will demonstrate something I do almost every day: working with Python and pandas to load data into Snowflake using 
a few typical methods.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"h-using-python-with-pandas-to-load-relational-database-data-into-snowflake\"><a id=\"post-105989-_heading=h.2et92p0\"><\/a>Using Python with pandas to load relational database data into Snowflake<\/h2>\n\n\n\n<p>First, let&#8217;s look at a scenario where you&#8217;re loading customer transaction data from your e-commerce relational database. This example also covers how to write a pandas DataFrame to Snowflake using <a href=\"https:\/\/www.sqlalchemy.org\/\">SQLAlchemy<\/a>, a Python SQL toolkit and Object Relational Mapper.<\/p>\n\n\n\n<p>This is probably the most common scenario for data engineers, data scientists, and analysts. This code snippet will feature:<\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li><strong>Error Handling<\/strong>: Captures exceptions, logs them, saves failed records, and supports retries for transient failures (e.g., network timeouts).<\/li>\n\n\n\n<li><strong>Chunking<\/strong>: The <code>chunksize=10000<\/code> parameter processes data in batches, reducing memory usage\u2014crucial for larger datasets.<\/li>\n\n\n\n<li><strong>Logging<\/strong>: Tracks success and failure for auditing and debugging.<\/li>\n\n\n\n<li><strong>Flexibility<\/strong>: <code>if_exists='append'<\/code> adds new data without overwriting existing records.<\/li>\n<\/ul>\n<\/div>\n\n\n<p>In this example, we&#8217;ll extract yesterday&#8217;s transaction data from an e-commerce database (like MySQL or PostgreSQL) and load it into a Snowflake table called &#8216;daily_transactions&#8217;. The workflow includes handling potential errors, saving problematic records for further analysis, and implementing retry logic for temporary failures such as network timeouts.<\/p>\n\n\n\n<p>The example assumes the source database has a &#8216;transactions&#8217; table with a &#8216;date&#8217; column (a date without a time-of-day component) to filter by. 
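If you'd rather pin the column types explicitly (the best practice noted earlier) instead of relying on inference, `to_sql` accepts a `dtype` mapping. A minimal sketch; the column names and SQLAlchemy types are assumptions, and an in-memory SQLite engine stands in for Snowflake so the snippet runs anywhere:

```python
import pandas as pd
from sqlalchemy import create_engine, types

# Hypothetical frame standing in for the extracted transactions
transactions_df = pd.DataFrame(
    {"id": [1, 2], "customer_name": ["Ada", "Grace"], "amount": [19.99, 5.00]}
)

# Explicit column types instead of relying on sampled-row inference
dtype_map = {
    "id": types.Integer(),
    "customer_name": types.VARCHAR(200),
    "amount": types.Numeric(12, 2),
}

# In-memory SQLite engine so the sketch is self-contained and runnable
engine = create_engine("sqlite://")
transactions_df.to_sql(
    "daily_transactions", engine, if_exists="append", index=False, dtype=dtype_map
)

loaded = pd.read_sql("SELECT * FROM daily_transactions", engine)
print(len(loaded))
```

For a real load you would pass the Snowflake SQLAlchemy engine (set up below) instead of the SQLite one; the `dtype` mapping works the same way.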
The target Snowflake table structure will match the source data schema, as pandas will automatically create matching columns.<\/p>\n\n\n\n<p>Let&#8217;s walk through this process step by step. First, we&#8217;ll connect to our source database and extract the previous day&#8217;s transaction data (this code can be downloaded <a href=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2025\/03\/Code-Snippet-for-loading-data-from-a-relational-db-into-Snowflake.zip\">from here<\/a>):<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">import pandas as pd\nfrom sqlalchemy import create_engine\nimport logging<br>from datetime import datetime\n\n# Connect to the source database and extract yesterday's transactions\n# In a production environment, you would configure this connection properly\n# and potentially include more sophisticated filtering logic\n\ntransactions_df = pd.read_sql(\n    \"SELECT * FROM transactions WHERE date = CURRENT_DATE - 1\",\n    source_database_connection\n    # Note: 'date' here is a date field in the transactions table\n    # This assumes a standard date column, but you might need to use\n    # transaction_timestamp or another date\/time field in your database\n)<\/pre>\n\n\n\n<p>Next, we&#8217;ll set up our connection to Snowflake using SQLAlchemy. 
This provides a clean, standardized way to interact with Snowflake:<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\"># Create Snowflake connection using SQLAlchemy engine\n# For production environments, use environment variables or a secure credential store\nengine = create_engine(\n'snowflake:\/\/{user}:{password}@{account}\/{database}\/{schema}?warehouse={warehouse}&amp;role={role}'.format(\n        user='your_username',\n        password='your_password',\n        account='your_account',\n        database='your_database',\n        schema='your_schema',\n        warehouse='your_warehouse',\n        role='your_role'\n    )\n)<\/pre>\n\n\n\n<p>Finally, we&#8217;ll load the data into Snowflake with comprehensive error handling.<\/p>\n\n\n\n<p>Python will first attempt to execute the code in the <code>try<\/code> block to load the data; if an error occurs, control passes to the <code>except<\/code> block.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\"># Write to Snowflake (includes error handling for retries)\ntry:\n    # The to_sql method will create or append to a table named 'daily_transactions'\n    # The target table's schema will match our pandas DataFrame structure\n    # Processing in chunks helps manage memory and enables better error recovery\n    transactions_df.to_sql(\n        'daily_transactions',\n        engine,\n        if_exists='append', # Adds new records without overwriting existing data\n        index=False, # Don't include DataFrame index as a column\n        chunksize=10000 # Process in smaller batches for better memory management\n    )\nexcept Exception as e:\n    # Log the error with details\n    error_message = f\"Failed to load transactions: {str(e)}\"\n    logging.error(error_message)\n    \n    # Identify rows that aren't completely null (have at least one non-null value)\n    # This helps isolate potentially problematic records\n    failed_records = transactions_df[transactions_df.apply(lambda x: not x.isnull().all(), axis=1)]\n    # Save failed 
records for review and later processing\n    failed_records.to_csv(f'failed_loads\/transactions_{datetime.now().strftime(\"%Y%m%d_%H%M%S\")}.csv')\n    # Notify the team (send_alert, is_retryable_error and retry_load are\n    # user-defined helpers, not shown here)\n    send_alert(error_message)\n    # If the error is transient (like a network timeout or connection issue),\n    # we can attempt to reload the data. The retry_load function would implement\n    # exponential backoff to avoid overwhelming the server.\n    if is_retryable_error(e):\n        retry_load(transactions_df, max_retries=3)<\/pre>\n\n\n\n<p>This snippet pulls yesterday&#8217;s transactions, connects to Snowflake, and loads the data in batches. If something goes wrong&#8212;like a timeout or a schema mismatch&#8212;it logs the error, saves the problematic records, and even tries again if the issue is transient. It&#8217;s a solid foundation for daily ETL jobs or ad-hoc analysis.<\/p>\n\n\n\n<p><strong>For optimal performance and security, consider these additional tips:<\/strong><\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li><strong>Credentials Security<\/strong>: Store user, password, etc., in environment variables or a secure vault (e.g., AWS Secrets Manager).<\/li>\n\n\n\n<li><strong>Performance<\/strong>: For larger datasets, consider adjusting the chunksize parameter based on your data volume and available memory. 
For very large datasets, the native Snowflake Connector (explained in my next article) might be more efficient.<\/li>\n\n\n\n<li><strong>Validation<\/strong>: Pre-check <code>transactions_df<\/code> for nulls or duplicates before loading.<\/li>\n<\/ul>\n<\/div>\n\n\n<p>The most practical applications of this technique are:<\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li><strong>Ad-hoc Analysis<\/strong>: Load small to medium datasets for quick exploration and analysis by data scientists.<\/li>\n\n\n\n<li><strong>ETL Pipelines<\/strong>: Extract from relational sources, transform with pandas, and load into Snowflake as part of a scheduled workflow.<\/li>\n\n\n\n<li><strong>Prototyping<\/strong>: Test data workflows before scaling to higher-volume batch or streaming solutions.<\/li>\n<\/ul>\n<\/div>\n\n\n<h2 class=\"wp-block-heading\" id=\"h-load-file-based-sources-into-snowflake-using-pandas\"><a id=\"post-105989-_heading=h.tyjcwt\"><\/a>Load File-Based Sources into Snowflake using Pandas<\/h2>\n\n\n\n<p>Now, let\u2019s shift gears to file-based data\u2014like a CSV with customer info. This scenario demands more: validating the data, ensuring consistency, and tracking what\u2019s loaded.<\/p>\n\n\n\n<p>Let\u2019s walk through a robust, production-minded approach to loading CSV files into Snowflake. 
The following class implements a comprehensive solution that:<\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li>Validates data before attempting to load it<\/li>\n\n\n\n<li>Uses transactions to ensure data consistency<\/li>\n\n\n\n<li>Implements proper error handling and logging<\/li>\n\n\n\n<li>Verifies that the data was loaded correctly<\/li>\n\n\n\n<li>Maintains audit records of each load operation<\/li>\n<\/ul>\n<\/div>\n\n\n<p>This pattern is particularly useful for production ETL processes where reliability and traceability are critical. The code is structured as a reusable class that can be integrated into larger data pipelines (the complete code module can be downloaded <a href=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2025\/03\/Code-snippet-to-Load-File-Based-Sources-into-Snowflake-using-Pandas.zip\">from here<\/a>).<\/p>\n\n\n\n<p>Let\u2019s begin by importing the required Python modules:<\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li>pandas &#8211; A popular library for working with data tables<\/li>\n\n\n\n<li>snowflake.connector &#8211; The tool that lets Python talk to Snowflake<\/li>\n\n\n\n<li>logging &#8211; For keeping track of what happens during execution<\/li>\n\n\n\n<li>Other utilities for handling files, dates, and data validation<\/li>\n<\/ul>\n<\/div>\n\n\n<pre class=\"wp-block-preformatted\">import pandas as pd\nfrom snowflake.connector.pandas_tools import write_pandas\nimport snowflake.connector\nimport logging\nfrom datetime import datetime\nimport os\nfrom typing import Optional, Tuple\nimport hashlib<\/pre>\n\n\n\n<p><a id=\"post-105989-_heading=h.imsmaidpc8tx\"><\/a> Let\u2019s now define a class, initialize it, and set up logging. This object will handle the entire process of loading CSV data into Snowflake. Think of this as designing a machine that has specific parts (methods) working together. 
When you create a new instance of this &#8220;machine,&#8221; you need to provide configuration settings for connecting to Snowflake (like username, password, etc.). The <code>__init__<\/code> method saves those settings and sets up logging so you can track what happens. <code>setup_logging<\/code> creates a &#8216;logs&#8217; folder if it doesn&#8217;t exist, then configures the logging system to write logs to both a file and the console, with timestamps and severity levels, using the current date in the log filename.<\/p>\n\n\n\n<p>The following code defines the class. It is broken up into sections for discussion here:<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">class CSVtoSnowflakeLoader:\n    def __init__(self, snowflake_config: dict):\n        \"\"\"\n        Initialize the loader with Snowflake connection configuration.\n        \n        Args:\n            snowflake_config (dict): Configuration with connection details\n        \"\"\"\n        self.config = snowflake_config\n        self.setup_logging()\n        \n    def setup_logging(self):\n        \"\"\"Set up logging configuration\"\"\"\n        log_dir = 'logs'\n        if not os.path.exists(log_dir):\n            os.makedirs(log_dir)\n            \n        logging.basicConfig(\n            level=logging.INFO,\n            format='%(asctime)s - %(levelname)s - %(message)s',\n            handlers=[\n                logging.FileHandler(f'{log_dir}\/csv_load_{datetime.now().strftime(\"%Y%m%d\")}.log'),\n                logging.StreamHandler()\n            ]\n        )<\/pre>\n\n\n\n<p>This next code block establishes a connection to Snowflake using the config details you provided. If it can&#8217;t connect (wrong password, network issue, etc.), it logs the error and raises an exception. 
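A quick aside on the config dict itself: rather than hard-coding secrets, you can assemble it from environment variables. A hedged sketch; the `SNOWFLAKE_*` variable names and the helper are assumptions, and a secrets manager would slot in the same way:

```python
import os

def load_snowflake_config(env=os.environ) -> dict:
    """Build the loader's config dict from environment variables.

    The SNOWFLAKE_* names are assumptions; adapt them to your setup
    or replace the lookup with a call to your secret store.
    """
    keys = ["user", "password", "account", "warehouse", "database", "schema"]
    config = {k: env.get(f"SNOWFLAKE_{k.upper()}") for k in keys}

    # Fail fast if anything is missing, rather than at connect time
    missing = [k for k, v in config.items() if not v]
    if missing:
        raise RuntimeError(f"Missing Snowflake settings: {missing}")
    return config

# Hypothetical usage: loader = CSVtoSnowflakeLoader(load_snowflake_config())
```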
Think of this as dialing a phone number to talk to Snowflake.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">    def get_connection(self) -&gt; snowflake.connector.SnowflakeConnection:\n        \"\"\"Create and return a Snowflake connection\"\"\"\n        try:\n            return snowflake.connector.connect(\n                user=self.config['user'],\n                password=self.config['password'],\n                account=self.config['account'],\n                warehouse=self.config['warehouse'],\n                database=self.config['database'],\n                schema=self.config['schema']\n            )\n        except Exception as e:\n            logging.error(f\"Failed to connect to Snowflake: {str(e)}\")\n            raise<\/pre>\n\n\n\n<p>Before attempting to load data, let\u2019s look at a method to check if it meets basic requirements such as:<\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li>Make sure the data file isn&#8217;t empty<\/li>\n\n\n\n<li>Confirm required columns like &#8216;id&#8217; and &#8216;name&#8217; exist<\/li>\n\n\n\n<li>Verify data types match expectations (e.g., &#8216;id&#8217; should be an integer)<\/li>\n<\/ul>\n<\/div>\n\n\n<p>This is like checking ingredients before cooking to avoid problems later.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">   def validate_csv(self, df: pd.DataFrame) -&gt; Tuple[bool, Optional[str]]:\n        \"\"\"\n        Validate the CSV data before loading.\n        \n        Args:\n            df (pd.DataFrame): DataFrame to validate\n            \n        Returns:\n            Tuple[bool, Optional[str]]: (is_valid, error_message)\n        \"\"\"\n        try:\n            # Check for empty DataFrame\n            if df.empty:\n                return False, \"CSV file is empty\"\n                \n            # Check for missing values in required columns\n            required_columns = ['id', 'name']  # Add your required columns\n            missing_columns = [col for 
col in required_columns if col not in df.columns]\n            if missing_columns:\n                return False, f\"Missing required columns: {missing_columns}\"\n                \n            # Check data types\n            expected_types = {\n                'id': 'int64',\n                'amount': 'float64',\n                'date': 'datetime64[ns]'\n            }\n            for col, expected_type in expected_types.items():\n                if col in df.columns and str(df[col].dtype) != expected_type:\n                    return False, f\"Column {col} has incorrect type: {df[col].dtype}, expected: {expected_type}\"\n                    \n            return True, None\n            \n        except Exception as e:<br>            return False, f\"Validation error: {str(e)}\"<\/pre>\n\n\n\n<p>Let\u2019s make sure that the data&#8217;s integrity is intact. This creates a unique &#8220;fingerprint&#8221; of the data using a mathematical function. Later, you can use this fingerprint to verify if the data was changed or corrupted during transfer. Read more about <a href=\"https:\/\/www.techtarget.com\/searchsecurity\/definition\/cryptographic-checksum#:~:text=Generated%20by%20a%20cryptographic%20algorithm,the%20foundation%20of%20cryptography%20today.\">checksums here<\/a> if you want to understand the basics.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">    def calculate_checksum(self, df: pd.DataFrame) -&gt; str:\n        \"\"\"Calculate checksum of DataFrame for validation\"\"\"\n        return hashlib.md5(pd.util.hash_pandas_object(df).values).hexdigest()<\/pre>\n\n\n\n<p>Now, let\u2019s start the main process: loading the data. This involves multiple steps: creating an orchestration method for the entire load process, reading the CSV file, validating the data, loading it, and verifying the load, with robust error handling throughout. 
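As a quick standalone sanity check of the fingerprint idea, the same logic can be exercised with plain pandas and hashlib, no Snowflake connection needed (the sample frame is made up):

```python
import hashlib
import pandas as pd

def calculate_checksum(df: pd.DataFrame) -> str:
    """Same fingerprint logic as the class method above."""
    return hashlib.md5(pd.util.hash_pandas_object(df).values).hexdigest()

df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Identical data yields an identical fingerprint...
assert calculate_checksum(df) == calculate_checksum(df.copy())
# ...while any change to the values produces a different one
assert calculate_checksum(df) != calculate_checksum(df.assign(name=["a", "b", "x"]))
```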
Follow the comments in the code to understand each step.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">    def load_csv(self, csv_path: str, table_name: str) -&gt; bool:\n        \"\"\"\n        Load CSV file into Snowflake table with error handling.\n        \n        Args:\n            csv_path (str): Path to CSV file\n            table_name (str): Target Snowflake table name\n            \n        Returns:\n            bool: True if successful, False otherwise\n        \"\"\"\n        conn = None\n        error_file = None\n        success = False\n        \n        try:\n            logging.info(f\"Starting to load CSV file: {csv_path}\")<\/pre>\n\n\n\n<p>Opens and reads the CSV file into a pandas DataFrame (a table-like structure). The <code>parse_dates<\/code> parameter tells pandas to interpret the &#8216;date&#8217; column as actual dates rather than text.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">            # Read CSV file\n            try:\n                # Adjust parse_dates as needed\n                df = pd.read_csv(csv_path, parse_dates=['date'])  \n            except Exception as e:\n                logging.error(f\"Failed to read CSV file: {str(e)}\")\n                return False<\/pre>\n\n\n\n<p>Uses the validation method to check if the data meets requirements.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">            # Validate data\n            is_valid, error_msg = self.validate_csv(df)\n            if not is_valid:\n                logging.error(f\"CSV validation failed: {error_msg}\")\n                return False<\/pre>\n\n\n\n<p>Creates the unique fingerprint of the data for integrity checking.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">            # Calculate checksum for validation\n            checksum = self.calculate_checksum(df)<\/pre>\n\n\n\n<p>Establishes a connection to Snowflake.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">            # Get Snowflake connection\n            
conn = self.get_connection()<\/pre>\n\n\n\n<p>Starts a database transaction. This is like telling Snowflake &#8220;I&#8217;m about to do several things, but treat them as one action.&#8221;<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">            # Begin transaction\n            conn.cursor().execute(\"BEGIN\")<\/pre>\n\n\n\n<p>Loads the data into Snowflake in chunks of 10,000 rows at a time. Chunking is almost always more efficient for large datasets. It is worth testing different sizes, especially for frequently repeated load processes.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">            try:<br>                # Write to Snowflake<br>                success, nchunks, nrows, _ = write_pandas(<br>                    conn=conn,<br>                    df=df,<br>                    table_name=table_name,<br>                    chunk_size=10000  # Adjust based on your needs<br>                )<br>                <br>                if not success:<br>                    raise Exception(\"Failed to write data to Snowflake\")<\/pre>\n\n\n\n<p>Double-checks that all records were loaded by comparing the count in Snowflake to the original DataFrame size. (Note: this simple check assumes the target table was empty before the load; for appends to an existing table, compare against the number of newly loaded rows instead.)<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">                # Verify row count\n                cursor = conn.cursor()\n                cursor.execute(f\"SELECT COUNT(*) FROM {table_name}\")\n                snowflake_count = cursor.fetchone()[0]\n                \n                if snowflake_count != len(df):\n                    raise Exception(\n                        f\"Row count mismatch: {len(df)} in CSV vs {snowflake_count} in Snowflake\"\n                    )<\/pre>\n\n\n\n<p>Adds an entry to a tracking table that records what was loaded, when, and with what checksum. 
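The tracking table must exist before that insert can succeed. One possible shape for it, sketched against SQLite so it runs standalone; the column list is an assumption inferred from the insert statement, and in Snowflake you would use types like VARCHAR, TIMESTAMP_NTZ, and NUMBER:

```python
import sqlite3

# Possible audit-table shape; column names mirror the insert used by the loader
DDL = """
CREATE TABLE IF NOT EXISTS load_history (
    table_name     TEXT,
    file_name      TEXT,
    load_timestamp TIMESTAMP,
    record_count   INTEGER,
    checksum       TEXT
)
"""

conn = sqlite3.connect(":memory:")  # stand-in for a Snowflake cursor
conn.execute(DDL)
conn.execute(
    "INSERT INTO load_history VALUES (?, ?, CURRENT_TIMESTAMP, ?, ?)",
    ("daily_transactions", "data.csv", 42, "abc123"),
)
rows = conn.execute(
    "SELECT table_name, record_count FROM load_history"
).fetchall()
print(rows)
```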
This is like keeping a shipping receipt.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">                # Log success metadata\n                cursor.execute(\"\"\"\n                    INSERT INTO load_history (\n                        table_name, \n                        file_name, \n                        load_timestamp, \n                        record_count,\n                        checksum\n                    ) VALUES (%s, %s, CURRENT_TIMESTAMP, %s, %s)\n                \"\"\", (table_name, os.path.basename(csv_path), len(df), checksum))<\/pre>\n\n\n\n<p>Finalizes all changes in Snowflake if everything went well.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">                # Commit transaction\n                conn.cursor().execute(\"COMMIT\")\n                success = True\n                logging.info(f\"Successfully loaded {nrows} rows into {table_name}\")<\/pre>\n\n\n\n<p>If anything goes wrong during the process:<\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li><code>ROLLBACK<\/code> cancels all changes (like undoing a transaction)<\/li>\n\n\n\n<li>The error is logged<\/li>\n\n\n\n<li>The data that failed to load is saved to a CSV file in an &#8216;errors&#8217; folder for later analysis<\/li>\n<\/ul>\n<\/div>\n\n\n<pre class=\"wp-block-preformatted\">            except Exception as e:\n                # Rollback transaction\n                conn.cursor().execute(\"ROLLBACK\")\n                logging.error(f\"Failed to load data: {str(e)}\")\n                \n                # Save failed records\n                error_file = f\"errors\/{table_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv\"\n                os.makedirs('errors', exist_ok=True)\n                df.to_csv(error_file, index=False)\n                logging.info(f\"Failed records saved to {error_file}\")\n                \n                success = False\n                \n        except Exception as e:\n            logging.error(f\"Unexpected 
error: {str(e)}\")\n            success = False<\/pre>\n\n\n\n<p>Always closes the database connection (regardless of success or failure) and returns whether the operation succeeded.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\">        finally:\n            if conn:\n                conn.close()\n                \n        return success<\/pre>\n\n\n\n<p>Let\u2019s see an example of using the class created above to load CSV data into Snowflake.<\/p>\n\n\n\n<pre class=\"wp-block-preformatted\"># Example usage\nif __name__ == \"__main__\":\n    # Snowflake connection configuration\n    config = {\n        'user': 'your_username',\n        'password': 'your_password',\n        'account': 'your_account',\n        'warehouse': 'your_warehouse',\n        'database': 'your_database',\n        'schema': 'your_schema'\n    }\n    \n    # Initialize loader\n    loader = CSVtoSnowflakeLoader(config)\n    \n    # Load CSV file\n    success = loader.load_csv(\n        csv_path='path\/to\/your\/data.csv',\n        table_name='your_target_table'\n    )<br>    \n    if success:\n        print(\"CSV loaded successfully!\")\n    else:\n        print(\"Failed to load CSV. 
Check logs for details.\")<\/pre>\n\n\n\n<p>This shows how to use the class we created:<\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li>Define connection settings<\/li>\n\n\n\n<li>Create an instance of our loader<\/li>\n\n\n\n<li>Call the <code>load_csv<\/code> method to load a specific file into a specific table<\/li>\n\n\n\n<li>Check if it worked and print an appropriate message<\/li>\n<\/ul>\n<\/div>\n\n\n<p><strong>The class also demonstrates several best practices<\/strong>:<\/p>\n\n\n<div class=\"block-core-list\">\n<ul class=\"wp-block-list\">\n<li>Chunk-based loading for large files<\/li>\n\n\n\n<li>Proper resource cleanup<\/li>\n\n\n\n<li>Configurable logging<\/li>\n\n\n\n<li>Type hints for better code maintainability<\/li>\n<\/ul>\n<\/div>\n\n\n<h2 class=\"wp-block-heading\" id=\"h-wrapping-up\">Wrapping Up<\/h2>\n\n\n\n<p>By now, you\u2019ve seen how pandas and Python can make loading data into Snowflake both approachable and reliable. Whether you\u2019re pulling from a database with SQLAlchemy or tackling CSV files with the Snowflake Connector, these methods balance ease of use with production-grade robustness. The error handling keeps things running smoothly when hiccups occur, while chunking and validation ensure scalability and accuracy.<\/p>\n\n\n\n<p>Start with these examples for your next project\u2014tweak them, watch the logs, and refine as you go. Remember that successful data loading is not just about getting data from point A to point B\u2014it&#8217;s about ensuring data quality, maintaining system performance, and implementing proper error handling throughout the process. Adapt these patterns to your specific use cases and requirements.<\/p>\n\n\n\n<p>As your datasets grow or your needs evolve, you\u2019ll be ready to scale up to batch processing or streaming with confidence. 
Happy loading, and may your pipelines always run clean!<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Loading data into Snowflake using Python and pandas &#8211; connecting via the Snowflake Python Connector, writing DataFrames to tables with write_pandas, bulk-loading CSV files, and patterns for relational-database-to-Snowflake pipelines with customer transaction and file-based examples.&hellip;<\/p>\n","protected":false},"author":345259,"featured_media":105992,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[159001],"tags":[],"coauthors":[159232],"class_list":["post-105989","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-snowflake"],"acf":[],"_links":{"self":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/105989","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/users\/345259"}],"replies":[{"embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/comments?post=105989"}],"version-history":[{"count":9,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/105989\/revisions"}],"predecessor-version":[{"id":106003,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/105989\/revisions\/106003"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/media\/105992"}],"wp:attachment":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/media?parent=105989"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/categories?post=105989"},{"taxono
my":"post_tag","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/tags?post=105989"},{"taxonomy":"author","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/coauthors?post=105989"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}