ETL (Extract Transform Load) tidbits

Loading csv files into dataframe

You might think loading a csv file into a dataframe is as simple as running one line: pd.read_csv(filename).
In reality you will struggle with different encodings. By default pandas decodes the file as UTF-8, so if the file was saved in a different encoding (for example Shift_JIS), that simple read_csv call can fail with a UnicodeDecodeError.

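In my project I tried something like the code below to iterate over encodings and load the file with whichever one works.

import logging
import pandas as pd

# ISO-8859-1 accepts any byte sequence, so it must come last
for enc in ['utf-8', 'sjis', 'ISO-8859-1']:
    try:
        df = pd.read_csv(filename, encoding=enc)
        break  # stop at the first encoding that decodes the file
    except UnicodeDecodeError as e:
        logging.info(f"Got UnicodeDecodeError with {enc} : {e}")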
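
The order of the candidate encodings matters: ISO-8859-1 maps every possible byte to a character, so decoding with it never raises UnicodeDecodeError, even when the result is garbage. A minimal sketch using only the standard library illustrates this (the helper name first_working_encoding and the sample text are made up for this example):

```python
def first_working_encoding(raw: bytes, encodings):
    """Return (encoding, decoded_text) for the first encoding that decodes raw."""
    for enc in encodings:
        try:
            return enc, raw.decode(enc)
        except UnicodeDecodeError:
            continue  # try the next candidate
    raise ValueError("none of the candidate encodings could decode the data")


# Sample bytes: Japanese text stored as Shift_JIS
raw = "あいう".encode("shift_jis")

# Strict encodings first, the catch-all ISO-8859-1 last
good_enc, good_text = first_working_encoding(raw, ["utf-8", "shift_jis", "iso-8859-1"])

# ISO-8859-1 first: decoding "succeeds" but the text is not what was written
bad_enc, bad_text = first_working_encoding(raw, ["iso-8859-1", "shift_jis", "utf-8"])

print(good_enc, good_text == "あいう")
print(bad_enc, bad_text == "あいう")
```

So a catch-all encoding like ISO-8859-1 is best kept as the last resort in the list.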

Loading records from file into database table

I had a project to load certain fields from a csv file into a database table. The task sounds simple, right? All you need to do is prepare a dataframe that matches the sql table and call to_sql. To get the dataframe ready for insertion, I had to solve the problems below.

  • Map file columns to sql table columns, and make sure each column's data type matches the data type of the sql table column.
  • Convert between the key field defined in the file and the key column of the sql table. Let's say we are inserting records into a mortgage_attributes table whose key column is cusip, while the file has a key column called security_id. We need to convert each security_id in the file into the corresponding cusip value. The mapping between security_id and cusip could be stored in a security_mappings table.
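
The key conversion in the second point can be sketched with a pandas merge. This is only an illustration: the sample values, the coupon column, and the in-memory security_mappings dataframe (standing in for rows queried from the security_mappings table) are all assumptions.

```python
import pandas as pd

# Records read from the file, keyed by security_id (sample values are made up)
file_df = pd.DataFrame({
    "security_id": ["S1", "S2", "S3"],
    "coupon": [3.5, 4.0, 4.25],
})

# Stand-in for rows queried from the security_mappings table
security_mappings = pd.DataFrame({
    "security_id": ["S1", "S2"],
    "cusip": ["AAA111AA1", "BBB222BB2"],
})

# Left join keeps every file record; ids without a mapping get NaN in cusip
merged = file_df.merge(security_mappings, on="security_id", how="left")

# Records without a cusip cannot be inserted, so set them aside for review
unmapped = merged[merged["cusip"].isna()]
ready = merged.dropna(subset=["cusip"]).drop(columns=["security_id"])

print(ready)
print(unmapped["security_id"].tolist())
```

Using a left join rather than an inner join makes the unmapped records visible instead of silently dropping them.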

File field to sql table columns mappings

I used json to map file fields to sql table columns in the manner below.

[
  {
    "source_field_name": "FILE_FIELD_ONE",
    "target_db_column_name": "sql_column_one",
    "type": "numeric"
  },
  {
    "source_field_name": "FILE_FIELD_TWO",
    "target_db_column_name": "sql_column_two",
    "type": "string"
  }
]

As you can see, I defined the file field to sql table column mappings as a list, along with the data type of each field.
Now when I run my script, I can read this json file and construct a dataframe ready to insert into the sql table.

In the script, I load the json file into a list of dicts.

import json
import pathlib


def read_mapping_file(json_filepath: pathlib.Path):
    """
    Read json file with a list of mappings defined in a file
    :param json_filepath: path to the json mapping file
    :return: list of mapping dicts
    """
    with open(json_filepath, "r") as fh:
        return json.load(fh)

In the mappings json, you may have noticed that I specify the data type as text: "numeric" for numeric values such as int and float, and "string" for text. When loading the json, however, I convert the data type to an enum. This guards against typos in the json mapping. Suppose someone wrote "numerici" instead of "numeric", or "text" instead of "string". Converting to an enum ensures such mistakes are detected as soon as the script runs.

from enum import Enum

class ColumnTypes(Enum):
    STRING = "string"
    NUMERIC = "numeric"

Then, using the function below, I convert the data types to the above Enum.


from typing import List


def replace_type_with_column_types_enum(mappings: List[dict]):
    """
    This function simply replaces string representation of types with the ColumnTypes enum
    For instance numeric is converted to ColumnTypes("numeric") which is equivalent to ColumnTypes.NUMERIC
    This will ensure that in json config we are using the right "types", i.e. no accidental typos
    :param mappings:
    :return: file field to table column mappings but "type" as ColumnTypes enum
    """
    new_mappings = []
    for mapping in mappings:
        mapping["type"] = ColumnTypes(mapping["type"])
        new_mappings.append(mapping)
    return new_mappings

This way, if mapping["type"] has a value not defined in the ColumnTypes enum, the script raises an exception such as ValueError: 'xyz' is not a valid ColumnTypes
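
A short, self-contained check of this behaviour (the ColumnTypes class is repeated here only so the snippet runs on its own):

```python
from enum import Enum


class ColumnTypes(Enum):
    STRING = "string"
    NUMERIC = "numeric"


# A valid value resolves to the matching enum member
resolved = ColumnTypes("numeric")

# A typo is rejected at lookup time with a ValueError
try:
    ColumnTypes("numerici")
    error_message = None
except ValueError as e:
    error_message = str(e)

print(resolved)
print(error_message)
```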

Now we can load our file and prepare it for insertion. The entry script looks something like below.

import logging
import pathlib

import click
import pandas as pd

def validate_path_exists(ctx, param, value):
    file_or_directory_path = value
    if not pathlib.Path(file_or_directory_path).exists():
        raise click.BadParameter(f"{param.name} {file_or_directory_path} doesn't exist, please check")
    return value


@click.command()
@click.option('--config_directory', default="./configs",
              help="Specify the directory where config files for field mappings and table names are located",
              callback=validate_path_exists)
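@click.option('--mappings_filename', default="field_to_db_column_mapping.json",
              help="SM field to db table columns mappings")
# assumed option: db_config_filename is used below, so it has to be passed in somehow
@click.option('--db_config_filename', required=True,
              help="Name of the json file with table name, key column and unique columns")
@click.argument('source_filepath', callback=validate_path_exists)
def main(config_directory, mappings_filename, db_config_filename, source_filepath):
    sm_filepath = pathlib.Path(source_filepath)
    mapping_filepath = pathlib.Path(config_directory) / mappings_filename
    db_configs_filepath = pathlib.Path(config_directory) / db_config_filename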

    # read SM file into dataframe and convert CLIENT_ID to str
    smfdf = pd.read_csv(sm_filepath)
    smfdf["CLIENT_ID"] = smfdf["CLIENT_ID"].apply(str)

    # read mappings between SM fields and db table columns
    mappings = read_mapping_file(mapping_filepath)
    mappings = detect_diff_between_file_fields_and_mapping_source_fields(smfdf, mappings)
    mappings = replace_type_with_column_types_enum(mappings)
    column_to_types = {mapping["source_field_name"]: mapping["type"] for mapping in mappings}
    column_renames = {mapping["source_field_name"]: mapping["target_db_column_name"] for mapping in mappings}

    # convert SM field columns to appropriate types such as str, int, float or date
    smfdf = convert_columns_to_appropriate_types(smfdf, column_to_types)

    # copy smfdf to new dataframe and rename columns to table columns
    newsmdf = smfdf.rename(columns=column_renames)
    logging.info(f"original df : {smfdf.head().to_string()}\nnewsmdf : {newsmdf.head().to_string()}")
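
The script calls two helpers that are not shown: detect_diff_between_file_fields_and_mapping_source_fields and convert_columns_to_appropriate_types. Their real implementations are unknown, so the following is only a guess at what they might do, based on their names and on how they are called: the first drops mappings whose source field is missing from the file, the second casts each column according to its ColumnTypes value.

```python
import logging
from enum import Enum
from typing import Dict, List

import pandas as pd


class ColumnTypes(Enum):
    STRING = "string"
    NUMERIC = "numeric"


def detect_diff_between_file_fields_and_mapping_source_fields(df: pd.DataFrame,
                                                              mappings: List[dict]) -> List[dict]:
    """Assumed behaviour: keep only mappings whose source field exists in the file."""
    kept = []
    for mapping in mappings:
        if mapping["source_field_name"] in df.columns:
            kept.append(mapping)
        else:
            logging.warning("%s is in the mapping but not in the file", mapping["source_field_name"])
    return kept


def convert_columns_to_appropriate_types(df: pd.DataFrame,
                                         column_to_types: Dict[str, ColumnTypes]) -> pd.DataFrame:
    """Assumed behaviour: cast each mapped column according to its ColumnTypes value."""
    df = df.copy()
    for column, column_type in column_to_types.items():
        if column_type is ColumnTypes.NUMERIC:
            # errors="coerce" turns unparseable values into NaN instead of raising
            df[column] = pd.to_numeric(df[column], errors="coerce")
        elif column_type is ColumnTypes.STRING:
            df[column] = df[column].astype(str)
    return df


# Sample data; FILE_FIELD_THREE is deliberately missing from the file
sample = pd.DataFrame({"FILE_FIELD_ONE": ["1", "2.5", "n/a"], "FILE_FIELD_TWO": [10, 20, 30]})
mappings = [
    {"source_field_name": "FILE_FIELD_ONE", "type": ColumnTypes.NUMERIC},
    {"source_field_name": "FILE_FIELD_TWO", "type": ColumnTypes.STRING},
    {"source_field_name": "FILE_FIELD_THREE", "type": ColumnTypes.STRING},
]
mappings = detect_diff_between_file_fields_and_mapping_source_fields(sample, mappings)
column_to_types = {m["source_field_name"]: m["type"] for m in mappings}
converted = convert_columns_to_appropriate_types(sample, column_to_types)
print(converted)
```

Whether unparseable numeric values should become NaN or stop the load is a design choice; coercing is used here only to keep the sketch short.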

Querying sql table

To make my script extensible, I also introduced a json file where the user can define the sql table name, key column and unique columns as below.

{
  "tablename": "my_precious_table",
  "key_column": "my_key_column",
  "unique_columns": [
    "uniq_col_one",
    "uniq_col_two"
  ],
  "non_nullable_columns": [
    {
      "name": "my_nonnullable_col",
      "type": "numeric",
      "default_value": 999
    }
  ],
  "additional_columns": [
    {
      "name": "my_col_extra",
      "type": "date"
    }
  ]
}
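
How the script consumes these keys is not shown. One plausible reading of non_nullable_columns, sketched here purely as an assumption, is that missing values in those columns are filled with default_value before insertion:

```python
import json

import pandas as pd

# Trimmed copy of the config above, inlined so the snippet runs on its own
db_config = json.loads("""
{
  "tablename": "my_precious_table",
  "key_column": "my_key_column",
  "non_nullable_columns": [
    {"name": "my_nonnullable_col", "type": "numeric", "default_value": 999}
  ]
}
""")

# Sample dataframe with one missing value in the non-nullable column
df = pd.DataFrame({
    "my_key_column": ["K1", "K2"],
    "my_nonnullable_col": [1.0, None],
})

# Assumed semantics: replace missing values with the configured default
for column in db_config["non_nullable_columns"]:
    df[column["name"]] = df[column["name"]].fillna(column["default_value"])

print(df)
```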

Next I defined a function like the one below, which constructs a SQL query given the table I want to query, its key column, and the unique columns I want to select from it.

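import datetime
from typing import List, Union

# DataObject and DBType are assumed to come from an in-house module (not shown)


def get_query_given_table_and_columns(key_col_values: List[Union[str, int, float]], tablename: str, key_col_name: str,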
                                      uniq_col_names: List[str],
                                      additional_col_key_values: dict[str, Union[str, int, float, datetime.datetime]],
                                      db_type: DBType = DBType.DSREAD):
    """
    This function constructs a simple SQL query from tablename, uniq_col_names, key_col_name, key_col_values
    :param key_col_values:
    :param tablename:
    :param key_col_name:
    :param uniq_col_names:
    :param additional_col_key_values:
    :param db_type:
    :return:
    """
    with DataObject(db_type.value, autocommit=True) as dobj:
        full_tablename = dobj.get_table(tablename)
        markers = dobj.generate_markers(len(key_col_values))
        query = f"SELECT {generate_comma_separated_text(uniq_col_names, False)} FROM {full_tablename} WHERE {key_col_name} IN ({markers})"
        for col in additional_col_key_values:
            query += f" AND {col}=?"
        return query, tuple(key_col_values) + tuple(additional_col_key_values.values())

This function makes use of DataObject, an in-house module. It returns an object that can be used to run sql queries on the database. The object also provides extra utilities, such as getting the fully qualified table name or generating markers, which are simply a series of question marks used as placeholders for the actual values. This function is truly a utility, because I can use it to construct queries for any number of sql tables.
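
Since DataObject is in-house, the same idea can be sketched with the standard library's sqlite3 module, which also uses question marks as placeholders. Everything below is a stand-in: generate_markers and generate_comma_separated_text are hypothetical re-implementations (the real generate_comma_separated_text takes a second argument whose meaning is not shown), and the table contents, as_of_date and coupon columns are made up.

```python
import sqlite3
from typing import List


def generate_markers(count: int) -> str:
    """Hypothetical stand-in: one question mark per value, e.g. '?, ?, ?'."""
    return ", ".join("?" for _ in range(count))


def generate_comma_separated_text(names: List[str]) -> str:
    """Hypothetical stand-in: join column names for the SELECT list."""
    return ", ".join(names)


def build_query(key_col_values, tablename, key_col_name, uniq_col_names, additional_col_key_values):
    markers = generate_markers(len(key_col_values))
    query = (f"SELECT {generate_comma_separated_text(uniq_col_names)} FROM {tablename} "
             f"WHERE {key_col_name} IN ({markers})")
    for col in additional_col_key_values:
        query += f" AND {col}=?"
    # Values are never interpolated into the SQL text; they are bound separately
    return query, tuple(key_col_values) + tuple(additional_col_key_values.values())


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mortgage_attributes (cusip TEXT, as_of_date TEXT, coupon REAL)")
conn.executemany("INSERT INTO mortgage_attributes VALUES (?, ?, ?)", [
    ("AAA111AA1", "2024-01-31", 3.5),
    ("BBB222BB2", "2024-01-31", 4.0),
    ("CCC333CC3", "2023-12-31", 4.25),
])

query, params = build_query(["AAA111AA1", "CCC333CC3"], "mortgage_attributes", "cusip",
                            ["cusip", "coupon"], {"as_of_date": "2024-01-31"})
rows = conn.execute(query, params).fetchall()
print(query)
print(rows)
```

Note that only the values go through placeholders. Table and column names are interpolated into the query text, so they should come from trusted configuration rather than from user input.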

To actually run the query, I use the function below in conjunction with the function above.

def get_adhoc_table_records(key_col_values: List[Union[str, int, float]], tablename: str, key_col_name: str,
                            uniq_col_names: List[str],
                            additional_col_key_values: dict[str, Union[str, int, float, datetime.datetime]],
                            chunk_size: int = 500, db_type: DBType = DBType.DSREAD):
    """
    Query records from a "tablename" using "key_col_values". This function will run a query like below by iterating over chunks of "key_col_values"
    SELECT uniq_col_names FROM tablename WHERE key_col_name IN (chunk of key_col_values)
    :param key_col_values:
    :param tablename:
    :param key_col_name:
    :param uniq_col_names:
    :param additional_col_key_values:
    :param chunk_size:
    :param db_type:
    :return:
    """
    return get_db_records_in_chunks(key_col_values, get_query_given_table_and_columns, chunk_size,
                                    tablename, key_col_name,
                                    uniq_col_names, additional_col_key_values, db_type=db_type)
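
The helper get_db_records_in_chunks is not shown either. A rough sketch of how such a helper could work, again using sqlite3 in place of the in-house DataObject: split the key values into chunks, build one query per chunk, and concatenate the results. The signature here (an explicit conn argument, positional pass-through arguments) and the simplified build_query are assumptions made for this example.

```python
import sqlite3
from typing import Sequence


def chunked(values: Sequence, chunk_size: int):
    """Yield consecutive slices of at most chunk_size items."""
    for start in range(0, len(values), chunk_size):
        yield values[start:start + chunk_size]


def get_db_records_in_chunks(conn, key_col_values, query_builder, chunk_size, *args):
    """Hypothetical sketch: run one query per chunk and concatenate the rows."""
    records = []
    for chunk in chunked(key_col_values, chunk_size):
        query, params = query_builder(chunk, *args)
        records.extend(conn.execute(query, params).fetchall())
    return records


def build_query(key_col_values, tablename, key_col_name, uniq_col_names):
    """Simplified query builder: SELECT ... WHERE key IN (?, ?, ...)."""
    markers = ", ".join("?" for _ in key_col_values)
    query = f"SELECT {', '.join(uniq_col_names)} FROM {tablename} WHERE {key_col_name} IN ({markers})"
    return query, tuple(key_col_values)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_precious_table (my_key_column INTEGER, uniq_col_one TEXT)")
conn.executemany("INSERT INTO my_precious_table VALUES (?, ?)",
                 [(i, f"row-{i}") for i in range(1, 8)])

wanted_keys = [1, 2, 3, 4, 5]
chunks = list(chunked(wanted_keys, 2))  # three chunks: two full ones and a remainder
records = get_db_records_in_chunks(conn, wanted_keys, build_query, 2,
                                   "my_precious_table", "my_key_column",
                                   ["my_key_column", "uniq_col_one"])
print(len(chunks), sorted(records))
```

Chunking keeps each IN (...) list short, which helps when the database limits the number of bound parameters per statement.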