How to Upsert DataFrames into Postgres safely.
Thomas Dickson
3 minute read
I spend a bit of time moving data between applications and Postgres using psycopg2 and pandas DataFrames. Unsurprisingly this is something that a lot of other people do too. I really liked this blog post, which compared the speed of different methods of inserting pandas DataFrames into Postgres databases. I adapted one of its snippets to upsert a pandas DataFrame into a Postgres table given a specific constraint.
Upserts in Postgres update rows that already exist and insert rows that do not. A conflict occurs when two rows would occupy the same position on a unique index or constraint; in that event the ON CONFLICT clause lets you specify the UPDATE behaviour to apply. As pandas does not come with the ability to upsert rows, we need to manage upserts ourselves.
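As a concrete illustration, a single-row upsert through psycopg2 could look like the sketch below. The events table, its events_id_key constraint, and the connection string are assumptions made up for the example. Note that the row's values appear twice in the parameter tuple, once for the INSERT and once for the DO UPDATE SET clause, which is the same pattern the function further down relies on.

import psycopg2

# Hypothetical connection string, table and constraint names, for illustration only.
conn = psycopg2.connect("dbname=example user=example")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        INSERT INTO events (id, name) VALUES (%s, %s)
        ON CONFLICT ON CONSTRAINT events_id_key
        DO UPDATE SET id = %s, name = %s
        """,
        (1, "login", 1, "login"),  # values passed twice: insert, then update
    )
conn.close()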
The snippet below upserts a DataFrame into a table given a connection and the name of a constraint. Two strings of SQL are created: one describing the insert statement and one describing the update statement that runs if there is a conflict on the specified constraint. Note that although I build the text of the statements using string interpolation [1], the values that are actually inserted into the query are passed in the execute_batch function.
import logging

import pandas as pd
import psycopg2
import psycopg2.extras


def upsert_dataframe(conn, df, table, constraint_name, page_size=100):
    """
    Upsert a pandas DataFrame into a Postgres table.

    Parameters
    ----------
    conn: psycopg2.extensions.connection
        Connection to the Postgres database.
    df: pd.DataFrame
        DataFrame to upsert.
    table: str
        Name of the table the DataFrame is upserted into.
    constraint_name: str
        Name of the constraint used to determine insert or update.
    page_size: int
        Number of insert statements per command.

    Returns
    -------
    None
    """
    # Each row's values are needed twice: once for the INSERT values and
    # once for the DO UPDATE SET clause, so the tuple is duplicated.
    tuples = [tuple(x) * 2 for x in df.to_numpy()]
    # Comma-separated dataframe columns and one placeholder per column
    cols = ",".join(df.columns)
    placeholders = ",".join(["%s"] * len(df.columns))
    # SQL query to execute
    insert_stmt = """
        INSERT INTO {0} ({1}) VALUES ({2})
        ON CONFLICT ON CONSTRAINT {3}
        DO UPDATE SET
    """.format(table, cols, placeholders, constraint_name)
    update_stmt = ",".join(["{0}=%s".format(col) for col in df.columns])
    query = insert_stmt + update_stmt
    cursor = conn.cursor()
    try:
        psycopg2.extras.execute_batch(cursor, query, tuples, page_size=page_size)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        logging.error("Error: %s", error)
        conn.rollback()
        cursor.close()
        return
    logging.info("Upsert to table {0} using constraint {1} complete".format(table, constraint_name))
    cursor.close()
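Calling the function might then look like the following sketch; the connection details, the users table, and the users_id_key constraint are again assumptions made up for the example.

import pandas as pd
import psycopg2

# Hypothetical connection details, table, and constraint name.
conn = psycopg2.connect("dbname=example user=example")
df = pd.DataFrame({"id": [1, 2], "name": ["alice", "bob"], "score": [0.9, 0.7]})
# Rows that conflict on the users_id_key constraint are updated; new rows are inserted.
upsert_dataframe(conn, df, table="users", constraint_name="users_id_key")
conn.close()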
[1] String interpolation can lead to SQL injection attacks. It's unlikely that this particular weakness will be exploited in the applications I use this code for, but that doesn't mean I should be sloppy. This is the most useful part of the psycopg2 documentation for this problem.