Loading csv files into dataframe
You might think loading csv files into a dataframe is as simple as running one line of pd.read_csv(filename).
But in reality you will struggle with different encodings. By default pandas attempts to decode your file with its default encoding (UTF-8 in recent versions, the OS default in older ones), and if the file is encoded differently, that simple read_csv call fails with a UnicodeDecodeError.
In my project I tried something like the code below to iterate over a few candidate encodings and load the file with whichever one works.
import logging

import pandas as pd

df = None
for enc in ['ISO-8859-1', 'sjis', 'utf-8']:
    try:
        df = pd.read_csv(filename, encoding=enc)
        break  # stop at the first encoding that works
    except UnicodeDecodeError as e:
        logging.info(f"Got UnicodeDecodeError with encoding {enc} : {e}")
Loading records from file into database table
I had a project to load certain fields from a csv file into a particular database table. The task sounds simple, right? All you need to do is prepare a dataframe with the data you want to insert and call to_sql. But to get that dataframe ready for inserting, I had to solve the problems below.
- Map file columns to sql table columns, and ensure each field's data type matches the corresponding sql column's data type
- Do conversions between the key field defined in the file and the key column in the sql table. Let's say we are trying to insert records into a mortgage_attributes table whose key column is cusip, while the file has a key column called security_id. We need to convert each security_id in the file into its corresponding cusip value. This mapping between security_id and cusip could have been stored in a security_mappings table (see the sketch right after this list)
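Concretely, that lookup can be as simple as a merge between the file dataframe and the mapping table. This is only a minimal sketch: it assumes the mapping table really is called security_mappings with security_id and cusip columns, that the file dataframe has a security_id column, and it uses pandas.read_sql with a SQLAlchemy engine purely for illustration (my project uses an in-house db module instead, shown later).

import pandas as pd

def map_security_id_to_cusip(file_df: pd.DataFrame, engine) -> pd.DataFrame:
    """Replace security_id in the file dataframe with its corresponding cusip value."""
    # assumed schema: security_mappings(security_id, cusip)
    mapping_df = pd.read_sql("SELECT security_id, cusip FROM security_mappings", engine)
    merged = file_df.merge(mapping_df, on="security_id", how="left")
    return merged.drop(columns=["security_id"])

# usage (connection string is a placeholder):
# from sqlalchemy import create_engine
# engine = create_engine("mssql+pyodbc://...")
# file_df = map_security_id_to_cusip(file_df, engine)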
File field to sql table column mappings
I leveraged a json file to map file fields to sql table columns in the following manner
[
  {
    "source_field_name": "FILE_FIELD_ONE",
    "target_db_column_name": "sql_column_one",
    "type": "numeric"
  },
  {
    "source_field_name": "FILE_FIELD_TWO",
    "target_db_column_name": "sql_column_two",
    "type": "string"
  }
]
As you can see, I defined the file field to sql table column mapping as a list, and I also specified the data type of each field.
Now when I run my script, I can read this json file and construct a dataframe ready to insert into the sql table. I load the json file into a list of dicts when using it in the script.
import json
import pathlib

def read_mapping_file(json_filepath: pathlib.Path):
    """
    Read a json file containing the list of field-to-column mappings
    :param json_filepath: path to the mappings json file
    :return: the parsed json content (a list of mapping dicts)
    """
    with open(json_filepath, "r") as fh:
        return json.load(fh)
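For instance, with the defaults used later in the entry script, loading the mappings boils down to:

mappings = read_mapping_file(pathlib.Path("./configs") / "field_to_db_column_mapping.json")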
In the mappings json, you noticed that I specify the data type as text: "numeric" for numeric values like int and float, and "string" for text data. But after loading the json, I coerce that data type to an enum. I find this useful for preventing typos in the json mapping. Suppose someone used "numerici" instead of "numeric", or "text" instead of "string". Enforcing the enum ensures that such issues are detected as soon as the script runs.
from enum import Enum
class ColumnTypes(Enum):
STRING = "string"
NUMERIC = "numeric"
Then, using the function below, I coerce the types to the ColumnTypes enum defined above.
from typing import List

def replace_type_with_column_types_enum(mappings: List[dict]):
    """
    Replace the string representation of each type with the ColumnTypes enum.
    For instance "numeric" is converted to ColumnTypes("numeric"), which is equivalent to ColumnTypes.NUMERIC.
    This ensures that the json config only uses the allowed "types", i.e. no accidental typos.
    :param mappings: file field to table column mappings
    :return: the same mappings but with "type" as a ColumnTypes enum
    """
    new_mappings = []
    for mapping in mappings:
        mapping["type"] = ColumnTypes(mapping["type"])
        new_mappings.append(mapping)
    return new_mappings
This way if mapping["type"]
had values not defined in ColumnTypes
enum the script will throw an exception ValueError: 'xyz' is not a valid ColumnTypes
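For example, constructing the enum from a misspelled type fails immediately:

ColumnTypes("numeric")     # -> ColumnTypes.NUMERIC
ColumnTypes("numerici")    # -> ValueError: 'numerici' is not a valid ColumnTypes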
Now we can load our file and prepare it for insertion. The entry script looks something like below.
import logging
import pathlib

import click
import pandas as pd


def validate_path_exists(ctx, param, value):
    file_or_directory_path = value
    if not pathlib.Path(file_or_directory_path).exists():
        raise click.BadParameter(f"{param.name} {file_or_directory_path} doesn't exist, please check")
    return value


@click.command()
@click.option('--config_directory', default="./configs",
              help="Specify the directory where config files for field mappings and table names are located",
              callback=validate_path_exists)
@click.option('--mappings_filename', default="field_to_db_column_mapping.json",
              help="SM field to db table columns mappings")
# db table config json described in the next section (the default filename here is just a placeholder)
@click.option('--db_config_filename', default="db_table_config.json",
              help="Table name, key column and unique columns of the target table")
@click.argument('source_filepath', callback=validate_path_exists)
def main(config_directory, mappings_filename, db_config_filename, source_filepath):
    sm_filepath = pathlib.Path(source_filepath)
    mapping_filepath = pathlib.Path(config_directory) / mappings_filename
    db_configs_filepath = pathlib.Path(config_directory) / db_config_filename
    # read SM file into dataframe and convert CLIENT_ID to str
    smfdf = pd.read_csv(sm_filepath)
    smfdf["CLIENT_ID"] = smfdf["CLIENT_ID"].apply(str)
    # read mappings between SM fields and db table columns
    mappings = read_mapping_file(mapping_filepath)
    mappings = detect_diff_between_file_fields_and_mapping_source_fields(smfdf, mappings)
    mappings = replace_type_with_column_types_enum(mappings)
    column_to_types = {mapping["source_field_name"]: mapping["type"] for mapping in mappings}
    column_renames = {mapping["source_field_name"]: mapping["target_db_column_name"] for mapping in mappings}
    # convert SM field columns to appropriate types such as str, int, float or date
    smfdf = convert_columns_to_appropriate_types(smfdf, column_to_types)
    # copy smfdf to a new dataframe and rename columns to table columns
    newsmdf = smfdf.rename(columns=column_renames)
    logging.info(f"original df : {smfdf.head().to_string()}\nnewsmdf : {newsmdf.head().to_string()}")


if __name__ == "__main__":
    main()
Querying sql table
To make my script extensible, I also introduced a json file where the user can define the sql table name, key column and unique columns, as below.
{
"tablename": "my_precious_table",
"key_column": "my_key_column",
"unique_columns": [
"uniq_col_one",
"uniq_col_two"
],
"non_nullable_columns": [
{
"name": "my_nonnullable_col",
"type": "numeric",
"default_value": 999
}
],
"additional_columns": [
{
"name": "my_col_extra",
"type": "date"
}
]
}
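To show how this config feeds into the actual insert, below is a rough sketch that fills defaults for the non_nullable_columns and then hands the renamed dataframe to to_sql. The helper name is mine, and the SQLAlchemy engine is again a stand-in for the in-house database module:

def insert_dataframe_using_table_config(df: pd.DataFrame, table_config: dict, engine) -> None:
    """Fill defaults for non-nullable columns, then append the dataframe to the configured table."""
    for col in table_config.get("non_nullable_columns", []):
        name, default = col["name"], col.get("default_value")
        # create the column if the file didn't provide it, otherwise fill the gaps
        df[name] = df[name].fillna(default) if name in df.columns else default
    df.to_sql(table_config["tablename"], engine, if_exists="append", index=False)

# table_config = read_mapping_file(db_configs_filepath)  # the same loader works for this json too
# insert_dataframe_using_table_config(newsmdf, table_config, engine)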
Next I defined the function below, which allows me to construct a SQL query given the table I want to query, its key column, and the unique columns I want to select from that table.
import datetime
from typing import List, Union

def get_query_given_table_and_columns(key_col_values: List[Union[str, int, float]], tablename: str, key_col_name: str,
                                      uniq_col_names: List[str],
                                      additional_col_key_values: dict[str, Union[str, int, float, datetime.datetime]],
                                      db_type: DBType = DBType.DSREAD):
"""
This function constructs a simple SQL query from tablename, uniq_col_names, key_col_name, key_col_values
:param key_col_values:
:param tablename:
:param key_col_name:
:param uniq_col_names:
:param additional_col_key_values:
:param db_type:
:return:
"""
with DataObject(db_type.value, autocommit=True) as dobj:
full_tablename = dobj.get_table(tablename)
markers = dobj.generate_markers(len(key_col_values))
query = f"SELECT {generate_comma_separated_text(uniq_col_names, False)} FROM {full_tablename} WHERE {key_col_name} IN ({markers})"
for col in additional_col_key_values:
query += f" AND {col}=?"
return query, tuple(key_col_values) + tuple(additional_col_key_values.values())
This function makes use of DataObject, which is an in-house module. It returns an object that can be used to run SQL queries against the database. This object also provides extra utilities like getting the fully qualified table name or generating markers, which are simply a series of question marks used as placeholders for the actual values. This function is a true utility because I can construct a query for any number of sql tables with it.
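To illustrate, assuming generate_comma_separated_text simply joins the column names with commas and the markers render as question marks, a call like the one below would return roughly the following query and parameters (the fully qualified table name depends on what DataObject.get_table resolves it to):

query, params = get_query_given_table_and_columns(
    key_col_values=["ABC123", "DEF456"],
    tablename="my_precious_table",
    key_col_name="my_key_column",
    uniq_col_names=["uniq_col_one", "uniq_col_two"],
    additional_col_key_values={"my_col_extra": datetime.datetime(2023, 1, 31)},
)
# query  -> "SELECT uniq_col_one, uniq_col_two FROM <full_tablename>
#            WHERE my_key_column IN (?, ?) AND my_col_extra=?"
# params -> ("ABC123", "DEF456", datetime.datetime(2023, 1, 31))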
And to actually run the query, I can use the function below in conjunction with the function above.
def get_adhoc_table_records(key_col_values: List[Union[str, int, float]], tablename: str, key_col_name: str,
uniq_col_names: List[str],
additional_col_key_values: dict[str, Union[str, int, float, datetime.datetime]],
chunk_size: int = 500, db_type: DBType = DBType.DSREAD):
"""
Query records from a "tablename" using "key_col_values". This function will run a query like below by iterating over chunks of "key_col_values"
SELECT * FROM tablename WHERE key_col_name in (chunk of key_col_values)
:param key_col_values:
:param tablename:
:param key_col_name:
:param uniq_col_names:
:param additional_col_key_values:
:param chunk_size:
:param db_type:
:return:
"""
return get_db_records_in_chunks(key_col_values, get_query_given_table_and_columns, chunk_size,
tablename, key_col_name,
uniq_col_names, additional_col_key_values, db_type=db_type)
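get_db_records_in_chunks is another in-house helper that I haven't shown. Conceptually, it splits key_col_values into chunks, builds a query per chunk with the function passed in, runs it, and collects the results. Below is a simplified sketch of that idea, where dobj.execute is only a placeholder for however the in-house DataObject actually runs a query:

def get_db_records_in_chunks(key_col_values, query_builder, chunk_size, *builder_args,
                             db_type: DBType = DBType.DSREAD):
    """Run the query built by query_builder over chunks of key_col_values and collect all rows."""
    all_records = []
    for start in range(0, len(key_col_values), chunk_size):
        chunk = key_col_values[start:start + chunk_size]
        query, params = query_builder(chunk, *builder_args, db_type=db_type)
        with DataObject(db_type.value, autocommit=True) as dobj:
            # placeholder: replace with the real DataObject query-execution call
            all_records.extend(dobj.execute(query, params))
    return all_records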