Skip to content

dfnormalization

dfnormalization

Module for dataframe normalization op

Classes

Functions

df_normalization

df_normalization(context, input_location)

The df_normalization function reads the parquet files found at input_location and normalizes the features specified in scalar_feature_keys, categorical_feature_keys and binary_feature_keys. Each of these parameters can be either a list of feature keys or a function that returns the feature keys as a list. The function writes one normalized parquet file per input file, in which all columns named in the feature keys are normalized, as well as an output yaml file containing information about how each feature was normalized. The input_location is where the input parquet files are located, while output_parq_location is where you want to save your new dataframes and norm info yaml.

Parameters:

  • context (OpExecutionContext) –

    Dagster op execution context; its op_config supplies the configuration of this op

  • input_location (dict) –

    Location of the input parquet files

Returns:

  • dict

    The output_parq_location, which is the location of the normalized parquet files and the norm info yaml

Source code in niceml/dagster/ops/dfnormalization.py
@op(
    config_schema={
        "scalar_feature_keys": Field(
            Any,
            description="Column names to be normalized with scalar values (list or function)",
            default_value=[],
        ),
        "binary_feature_keys": Field(
            Any,
            description="Column names to be normalized with binary values (list or function)",
            default_value=[],
        ),
        "categorical_feature_keys": Field(
            Any,
            description="Column names to be normalized with categorical values (list or function)",
            default_value=[],
        ),
        "output_parq_location": Field(
            dict, description="Target location for the normalized parq files"
        ),
        "output_norm_feature_info_file_name": Field(
            str,
            description="File name for the file containing the normalization "
            "information of the features ",
            default_value="normalization_info.yaml",
        ),
        # Forwarded unchanged to `list_dir(..., recursive=...)` when listing the inputs.
        "recursive": Field(bool, description="", default_value=False),
    }
)
def df_normalization(context: OpExecutionContext, input_location: dict) -> dict:
    """
    Normalize the configured feature columns of all parquet files in `input_location`.

    All parquet files found at `input_location` are loaded and concatenated, then the
    columns listed in `scalar_feature_keys`, `categorical_feature_keys` and
    `binary_feature_keys` are normalized (in that order). Each of these config entries
    can be a list of feature keys or a function that returns the feature keys as a
    list. The normalized data is written back to `output_parq_location`, one parquet
    file per input file and under the same file name, together with a yaml file
    (`output_norm_feature_info_file_name`) describing how each feature was normalized.

    Args:
        context: Dagster op execution context; its `op_config` supplies the feature
            keys, the output location, the norm info file name and the `recursive` flag
        input_location: Location of the input parquet files

    Returns:
        The `output_parq_location`, i.e. the location of the normalized parquet
            files and the norm info yaml

    Raises:
        ValueError: If no input files are found at `input_location`
    """
    # Round-trip through JSON to get plain dicts/lists before instantiation
    # (presumably because the raw op_config is not a plain mutable container -
    # TODO confirm).
    op_config = json.loads(json.dumps(context.op_config))
    # `instantiate` resolves configured callables, which is what allows the
    # feature keys to be given as a function instead of a literal list.
    instantiated_op_config = instantiate(op_config, _convert_=ConvertMode.ALL)
    output_parq_location: dict = instantiated_op_config["output_parq_location"]
    scalar_feature_keys: List[str] = instantiated_op_config["scalar_feature_keys"]
    binary_feature_keys: List[str] = instantiated_op_config["binary_feature_keys"]
    categorical_feature_keys: List[str] = instantiated_op_config[
        "categorical_feature_keys"
    ]
    output_norm_feature_info_file_name: str = instantiated_op_config[
        "output_norm_feature_info_file_name"
    ]

    with open_location(input_location) as (input_fs, input_root):
        input_files = list_dir(
            join_fs_path(input_fs, input_root),
            file_system=input_fs,
            recursive=instantiated_op_config["recursive"],
        )
        if not input_files:
            # Without this guard `pd.concat([])` fails with the opaque
            # "No objects to concatenate"; keep the exception type, improve the message.
            raise ValueError(
                f"No input files found at '{input_root}'; nothing to normalize."
            )

        loaded_data_list: List[pd.DataFrame] = []
        for input_file in input_files:
            cur_df = read_parquet(
                file_system=input_fs,
                filepath=join_fs_path(input_fs, input_root, input_file),
            )
            # Remember the source file of every row so the concatenated frame can
            # be split back into one output file per input file after normalization.
            cur_df["orig_file_name"] = input_file
            loaded_data_list.append(cur_df)

        # Normalize over the data of all files at once.
        data = pd.concat(loaded_data_list)

        # (feature keys, normalization function, progress bar label) - processed
        # in this order: scalar, categorical, binary.
        normalization_steps = (
            (scalar_feature_keys, normalize_scalar_column, "Normalize scalar features"),
            (
                categorical_feature_keys,
                normalize_categorical_column,
                "Normalize categorical features",
            ),
            (binary_feature_keys, normalize_binary_column, "Normalize binary features"),
        )
        info_list: List[NormalizationInfo] = []
        for feature_keys, normalize_column, progress_desc in normalization_steps:
            for feature_key in tqdm(feature_keys, desc=progress_desc):
                data, norm_info = normalize_column(data, feature_key)
                info_list.append(norm_info)

        with open_location(output_parq_location) as (output_fs, output_root):
            info_dict_list: List[dict] = [asdict(info) for info in info_list]

            write_yaml(
                data={"norm_infos": info_dict_list},
                file_system=output_fs,
                filepath=join_fs_path(
                    output_fs, output_root, output_norm_feature_info_file_name
                ),
            )
            # Iterate the original file list (not the groups present in `data`) so
            # that input files without rows still get an (empty) output file.
            for file in input_files:
                write_parquet(
                    dataframe=data[data["orig_file_name"] == file].drop(
                        columns="orig_file_name"
                    ),
                    filepath=join_fs_path(output_fs, output_root, file),
                    file_system=output_fs,
                )

    return output_parq_location