How to Upsert DataFrames into Postgres safely.

Thomas Dickson

3 minute read



I spend a bit of time moving data between applications and Postgres using psycopg2 and pandas DataFrames. Unsurprisingly this is something that a lot of other people do too. I really liked this blog post which compared the speed of different methods of inserting pandas DataFrames to Postgres databases. I adapted one of the snippets to up-sert a pandas dataframe to a Postgres table given a specific constraint.

Up-serts in Postgres occur when rows are updated if they already exist and are inserted if they do not. Conflicts can occur if two rows occupy identical positions on indexes, in this event it’s possible to handle behaviour using the ON CONFLICT keywords to handle UPDATE behaviour. As Pandas does not come with the ability to upsert rows we need to manage upserts ourselves.

The snippet below upserts a DataFrame to a table given a connection and the name of the constraint. Two strings of SQL are created, one to describe the insert statement and the other to describe the update statement which is run if there is a conflict on the specified constraint. Note that although I create the text for the statements using string interpolation 1 the values which are actually inserted in the query are passed in the execute_batch function.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import logging
import psycopg2
import psycopg2.extras
import pandas as pd

def upsert_dataframe(conn, df, table, constraint_name, page_size=100):
    """
    Upserting a pandas dataframe into a Postgres database.

    Parameters
    ----------
    conn: psycopg2.connection
        Connection to postgres database.
    df: pd.DataFrame
        Pandas dataframe for upsert
    table: str
        Name of table that dataframe is to be upserted into
    contraint_name: str
        Name of the contraint used to determine insert or update
    page_size: int
        Number of insert statements per command.

    Returns
    -------
    None

    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x)*2 for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # SQL query to execute
    insert_stmt  = """
    INSERT INTO {0} ({1}) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    ON CONFLICT ON CONSTRAINT {2}
    DO UPDATE SET
    """.format(table, cols, constraint_name)
    update_stmt = ",".join(["{0}=%s".format(col) for col in list(df.columns)])
    query = insert_stmt + update_stmt
    cursor = conn.cursor()
    try:
        psycopg2.extras.execute_batch(cursor, query, tuples, page_size)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    logging.info("Upsert to table {0} using constraint {1} complete".format(table, constraint))
    cursor.close()
  1. String interpolation can lead to SQL injection attacks. It’s unlikely that this particular weakness will be exploited for the applications I use this code for but that doesn’t mean I should be sloppy. This is the most useful part of the Pyscopg2 documentation for this problem.