My environment requires a lot of database work in SQL Server to access data. The data I work with (i.e., text) isn’t stored particularly efficiently so I will sometimes need to pull down data, perform some manipulations, re-upload, do some joins, and download again. Sure, there are a number of shiny ‘solutions’ that would make for superior infrastructure in daily data science efforts, but no one’s ever asked me (nor, necessarily, should they).
That said, in all my years of work within this environment, surely I should have been aware of how to efficiently insert data using pandas. And yet, a typical approach for me might look like the following (conceptually):
import pandas as pd
import sqlalchemy as sa

eng = sa.create_engine('mssql+pyodbc://...')
query = '''select ... from ...'''

# get the data in chunks
for i, df in enumerate(pd.read_sql_query(query, eng, chunksize=100_000)):
    # do cleaning, other manipulations
    df['flag'] = df.apply(some_function, axis=1)
    df = df[df.flag == 1]
    # upload result ids
    if i == 0:
        df.to_sql(name, eng, index=False, if_exists='fail')
    else:
        df.to_sql(name, eng, index=False, if_exists='append')
The inserts (df.to_sql), however, will take a while, even if I only upload a single id using something like df[['id']]. This is generally okay for my work — run it overnight…or over the weekend, and it’s all ready when I come back. But there is a better way.
The problem with this approach is that df.to_sql will, by default, execute a separate INSERT for each row rather than performing a batch/bulk insert. The df.to_sql function has a couple of parameters which allow us to optimize the insertions, and we can even add improvements on the SQLAlchemy side.
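Conceptually, the difference looks something like this (illustrative only; the table and column names are hypothetical, not output generated by pandas):

# method=None (default): a parameterized INSERT, executed once per row
#   INSERT INTO results (id, flag) VALUES (?, ?)
# method='multi': multiple rows packed into a single INSERT per chunk
#   INSERT INTO results (id, flag) VALUES (?, ?), (?, ?), (?, ?), ...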
pandas to_sql parameters
The to_sql method provides two parameters which we can make use of (a short sketch of both in use follows this list):
- method:
  - None (the default): results in a ‘standard SQL INSERT clause (one per row)’
  - 'multi': pass multiple values in a single INSERT clause
- chunksize:
  - By default, insert the entire table in one go
  - Customize the number of rows written per batch (here, I’ve set it to 2000)
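A minimal sketch of both parameters in use, timing them against the default on a small sample (this assumes the df and eng objects from above; the table name insert_test is hypothetical, and timings will vary by environment):

import time

sample = df.head(1_000)

start = time.perf_counter()
sample.to_sql('insert_test', eng, index=False, if_exists='replace')  # one INSERT per row
print('default:', time.perf_counter() - start)

start = time.perf_counter()
sample.to_sql('insert_test', eng, index=False, if_exists='replace',
              method='multi', chunksize=100)  # chunksize kept small; see the limits below
print('multi  :', time.perf_counter() - start)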
Both of these parameters are important. SQL Server limits the number of rows per INSERT to 1000 and the number of parameters per statement to about 2100. So, setting just df.to_sql(..., method='multi') will run up against these limitations since we’re operating in 100,000-row chunks. If we’re just inserting a single column (and dropping the index), we might try df.to_sql(..., method='multi', chunksize=1000). We can’t set chunksize above 1000 since SQL Server only allows insertions of 1000 rows at a time.
In practice, we’ll want to optimize the chunksize for the number of columns. Let’s set a limit of 2000 parameters:
upload_index = False
max_chunksize = 2000  # parameter budget (SQL Server allows ~2100 per statement)
n_cols = df.shape[1]

if upload_index:
    # the index counts as an extra column
    chunksize = int(max_chunksize / (n_cols + 1))
else:
    # excluding the index
    chunksize = int(max_chunksize / n_cols)

if chunksize > 1000:  # SQL Server caps multi-row inserts at 1000 rows
    chunksize = 1000

df.to_sql(name, eng, index=upload_index, chunksize=chunksize, method='multi')
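If this calculation comes up often, it can be wrapped in a small helper. This is just a sketch (compute_insert_chunksize is my own name, not a pandas or SQLAlchemy API), assuming the SQL Server limits discussed above:

def compute_insert_chunksize(df, include_index=False, max_params=2000, max_rows=1000):
    """Rows per INSERT so that rows * columns stays within the parameter budget."""
    n_cols = df.shape[1] + (1 if include_index else 0)
    return min(max_rows, max(1, max_params // n_cols))

chunksize = compute_insert_chunksize(df)
df.to_sql(name, eng, index=False, chunksize=chunksize, method='multi')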
sqlalchemy.create_engine parameters
We could alternatively make improvements on the SQLAlchemy side by adding the fast_executemany=True parameter to create_engine. When using mssql+pyodbc, this approach results in optimal performance. Adding parameters to both to_sql and create_engine will result in inferior performance (but still better than using neither).
eng = sa.create_engine('mssql+pyodbc://...', fast_executemany=True)
In this case, we’re asking sqlalchemy to batch together the individual insertion calls. However, when calling df.to_sql, we will likely need to specify the dtype parameter using sqlalchemy types:
df.to_sql(name, eng, index=False, dtype={'id': sa.types.String(20)})
While this resulted in a 3x speedup over the method='multi'/chunksize approach, we’ll need to explicitly define the column types. We could create a function to perform the conversion, maybe something like:
def pd2sa_types(df):
    """Map pandas dtypes to sqlalchemy column types."""
    mapping = {}
    for col, dtype in df.dtypes.items():
        if dtype == 'object':
            # size VARCHAR to the longest value (plus some slack); default to 255 if all null
            max_length = int(df[col].str.len().max()) + 5 if not df[col].isnull().all() else 255
            mapping[col] = sa.types.VARCHAR(max_length)
        elif dtype == 'float64':
            mapping[col] = sa.types.Float()
        elif dtype == 'int64':
            mapping[col] = sa.types.Integer()
        elif dtype == 'bool':
            mapping[col] = sa.types.Boolean()
        elif dtype == 'datetime64[ns]':
            mapping[col] = sa.types.DateTime()
        elif dtype == 'timedelta64[ns]':
            # sqlalchemy doesn't handle timedelta, so store as string
            mapping[col] = sa.types.VARCHAR(50)
        else:
            mapping[col] = sa.types.VARCHAR(255)
    return mapping
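We can then pass the inferred mapping straight through to to_sql on the initial upload (a sketch, reusing the df, eng, and name objects from above):

dtypes = pd2sa_types(df)
df.to_sql(name, eng, index=False, if_exists='fail', dtype=dtypes)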
Final Result
# 3 seconds / 100,000 rows
import pandas as pd
import sqlalchemy as sa

eng = sa.create_engine('mssql+pyodbc://...', fast_executemany=True)
query = '''select ... from ...'''

dtypes = {  # specify if known
    'id': sa.types.String(20)
}

# get the data in chunks
for i, df in enumerate(pd.read_sql_query(query, eng, chunksize=100_000)):
    # do cleaning, other manipulations
    df['flag'] = df.apply(some_function, axis=1)
    df = df[df.flag == 1]
    if not dtypes:
        # infer sqlalchemy types from the dataframe
        dtypes = pd2sa_types(df)
    # upload result ids
    if i == 0:
        df.to_sql(name, eng, index=False, if_exists='fail', dtype=dtypes)
    else:
        df.to_sql(name, eng, index=False, if_exists='append', dtype=dtypes)
Alternatively, we’ll still get significant improvements by optimizing the pandas method='multi' and chunksize parameters (and this may be more efficient in other scenarios):
# 9 seconds / 100,000 rows
import pandas as pd
import sqlalchemy as sa

eng = sa.create_engine('mssql+pyodbc://...', fast_executemany=True)
query = '''select ... from ...'''

insert_chunksize = None
max_chunksize = 2000  # parameter budget
max_rows = 1000       # SQL Server's per-INSERT row limit

# get the data in chunks
for i, df in enumerate(pd.read_sql_query(query, eng, chunksize=100_000)):
    # do cleaning, other manipulations
    df['flag'] = df.apply(some_function, axis=1)
    df = df[df.flag == 1]
    # compute the insert chunksize once, from the first cleaned chunk
    if insert_chunksize is None:
        n_cols = df.shape[1]
        insert_chunksize = int(max_chunksize / n_cols)
        if insert_chunksize > max_rows:
            insert_chunksize = max_rows
    # upload result ids
    if i == 0:
        df.to_sql(name, eng, index=False, if_exists='fail', method='multi', chunksize=insert_chunksize)
    else:
        df.to_sql(name, eng, index=False, if_exists='append', method='multi', chunksize=insert_chunksize)