zippedcsvtoparqprocessor

Module for implementation of ZippedCsvToParqProcessor

Classes

ZippedCsvToParqProcessor

ZippedCsvToParqProcessor(
    input_location,
    output_location,
    lockfile_location,
    lock_file_name="lock.yaml",
    debug=False,
    process_count=8,
    batch_size=16,
    csv_seperator=";",
    clear=False,
    recursive=False,
)

Bases: FileChecksumProcessor

Implementation of a FileChecksumProcessor to convert zip archives with CSV files into Parquet files.

FileChecksumProcessor that can be used as part of a pipeline to process files based on their checksums.

Parameters:

  • input_location (Union[dict, LocationConfig]) –

    Input location of the Processor

  • output_location (Union[dict, LocationConfig]) –

    Output location of the Processor

  • lockfile_location (Union[dict, LocationConfig]) –

    Location of the checksum lockfile

  • lock_file_name (str, default: 'lock.yaml' ) –

    Name of the checksum lockfile

  • debug (bool, default: False ) –

    Flag to activate the debug mode

  • process_count (int, default: 8 ) –

    Number of processes for parallel execution

  • batch_size (int, default: 16 ) –

    Size of a batch

  • csv_seperator (str, default: ';' ) –

    Separator character for the CSV files

  • clear (bool, default: False ) –

    Flag to clear the output location when initializing the Processor

  • recursive (bool, default: False ) –

    Flag indicating whether the input location should be searched for files recursively

Source code in niceml/filechecksumprocessors/zippedcsvtoparqprocessor.py
def __init__(
    self,
    input_location: Union[dict, LocationConfig],
    output_location: Union[dict, LocationConfig],
    lockfile_location: Union[dict, LocationConfig],
    lock_file_name: str = "lock.yaml",
    debug: bool = False,
    process_count: int = 8,
    batch_size: int = 16,
    csv_seperator: str = ";",
    clear: bool = False,
    recursive: bool = False,
):
    """
    FileChecksumProcessor that can be used as part of a pipeline to
    process files based on the checksum.
    Args:
        input_location: Input location of the Processor
        output_location: Output location of the Processor
        lockfile_location: Location of the checksum lockfile
        debug: Flag to activate the debug mode
        process_count: Amount of processes for parallel execution
        batch_size: Size of a batch
        csv_seperator: Seperator character for the csv files.
        clear: Flag to clear the output location when initialize the Processor
        recursive: Flag indicating whether the input location should be
                    searched for files recursively
    """
    super().__init__(
        input_location=input_location,
        output_location=output_location,
        lockfile_location=lockfile_location,
        debug=debug,
        process_count=process_count,
        batch_size=batch_size,
        lock_file_name=lock_file_name,
    )
    self.recursive = recursive
    self.clear = clear
    self.csv_seperator = csv_seperator

    if self.clear:
        clear_folder(self.output_location)
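
A minimal usage sketch. The location dicts below are hypothetical; the exact schema accepted in place of a LocationConfig may differ in your niceml version:

from niceml.filechecksumprocessors.zippedcsvtoparqprocessor import (
    ZippedCsvToParqProcessor,
)

# Hypothetical location dicts; adapt the keys to your LocationConfig schema.
processor = ZippedCsvToParqProcessor(
    input_location={"uri": "data/raw_zips"},
    output_location={"uri": "data/parquet"},
    lockfile_location={"uri": "data/locks"},
    csv_seperator=";",  # separator used inside the zipped CSV files
    batch_size=16,  # number of zip files per batch
    recursive=True,  # also search subdirectories for .zip files
)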
Functions
generate_batches
generate_batches(
    input_file_list,
    changed_files_dict,
    output_file_list=None,
    force=False,
)

The generate_batches function is responsible for generating a list of batches, where each batch is a dictionary with inputs as its key and a list of file paths as its value.

Parameters:

  • input_file_list (List[str]) –

    A list of input file names

  • changed_files_dict (Dict[str, Dict[str, bool]]) –

    Dictionary with the information on which files have changed

  • output_file_list (Optional[List[str]], default: None ) –

    An optional list of output file names

  • force (bool, default: False ) –

    Force the generation of batches even if no files have changed

Returns:

  • List[Dict[str, List[str]]]

    A list of batches; each batch is a dictionary with one key inputs and the value is a list of file paths

Source code in niceml/filechecksumprocessors/zippedcsvtoparqprocessor.py
def generate_batches(
    self,
    input_file_list: List[str],
    changed_files_dict: Dict[str, Dict[str, bool]],
    output_file_list: Optional[List[str]] = None,
    force: bool = False,
) -> List[Dict[str, List[str]]]:
    """
    The generate_batches function is responsible for generating a list of batches,
    where each batch is a dictionary with `inputs` as a key, followed by a list of file paths

    Args:
        input_file_list: List[str]: A list of input file names
        changed_files_dict: Dict[str: Dict[str:bool]]: Dictionary with the information
        which files have changed
        output_file_list: List[str]: A optional list of output file names
        force: bool: Force the generation of batches even if no files have changed

    Returns:
        A list of batches, each batch is a dictionary with one key `inputs`
        and the value is a list of file paths
    """
    if not force:
        input_file_list = [
            file_name
            for file_name, changed in changed_files_dict["inputs"].items()
            if changed
        ]
    batches = []
    for batch_pos in range(0, len(input_file_list), self.batch_size):
        batches.append(
            {"inputs": input_file_list[batch_pos : batch_pos + self.batch_size]}
        )
    return batches
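
A short illustration of the chunking, assuming a processor configured with batch_size=2 and hypothetical file names; force=True bypasses the changed-files filter:

# Hypothetical input files; with force=True the changed_files_dict is ignored.
batches = processor.generate_batches(
    input_file_list=["a.zip", "b.zip", "c.zip"],
    changed_files_dict={"inputs": {}},
    force=True,
)
# With batch_size=2 this yields:
# [{"inputs": ["a.zip", "b.zip"]}, {"inputs": ["c.zip"]}]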
list_files
list_files()
Returns a tuple of two lists:
  1. A list of all files in the input location
  2. A list of all files in the output location

Returns:

  • Tuple[List[str], List[str]]

    A tuple of two lists

Source code in niceml/filechecksumprocessors/zippedcsvtoparqprocessor.py
def list_files(self) -> Tuple[List[str], List[str]]:
    """
    Returns a tuple of two lists:
        1. A list of all files in the input location
        2. A list of all files in the output location

    Returns:
        A tuple of two lists
    """
    with open_location(self.input_location) as (input_fs, input_path):
        input_files = list_dir(
            path=input_path,
            recursive=self.recursive,
            file_system=input_fs,
            filter_ext=[".zip"],
        )
        input_files = [
            join_fs_path(input_fs, input_path, input_file)
            for input_file in input_files
        ]
    with open_location(self.output_location) as (output_fs, output_path):
        output_fs.makedirs(output_path, exist_ok=True)
        output_files = list_dir(
            path=output_path,
            recursive=self.recursive,
            file_system=output_fs,
            filter_ext=[".parq"],
        )
        output_files = [
            join_fs_path(output_fs, output_path, output_file)
            for output_file in output_files
        ]

    return input_files, output_files
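
In use, the two lists feed the checksum comparison of the base class; a minimal call sketch with the processor instance from above:

# Lists .zip files under the input location and .parq files under the
# output location (the output directory is created if it does not exist).
input_files, output_files = processor.list_files()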
process
process(batch)

The process function takes a batch of files and converts them from CSV to Parquet.

Parameters:

  • batch (Dict[str, List[str]]) –

    The batch of files to be processed: a dictionary mapping inputs to a list of file paths

Returns:

  • Dict[str, Dict[str, str]]

    A dictionary of checksums for each file in self.output_location (key = outputs) and self.input_location (key = inputs)

Source code in niceml/filechecksumprocessors/zippedcsvtoparqprocessor.py
def process(self, batch: Dict[str, List[str]]) -> Dict[str, Dict[str, str]]:
    """
    The process function takes a batch of files and converts them from CSV to Parquet.

    Args:
        self: Access the class attributes
        batch: Dict[str: Pass in the batch of files to be processed
        List[str]]: Pass in the list of files that are to be processed

    Returns:
        A dictionary of checksums for each file in `self.output_location`
        (key = `outputs`) and `self.input_location` (key = `inputs`)
    """
    checksums = defaultdict(dict)

    with open_location(self.input_location) as (input_file_system, input_root):
        for zip_file in tqdm(
            batch["inputs"], desc="Extract zip files of current batch"
        ):
            with input_file_system.open(zip_file) as opened_zip_file:
                checksums["inputs"][zip_file] = md5_from_file(
                    file_path=zip_file, file_system=input_file_system
                )

                zf = zipfile.ZipFile(opened_zip_file)
                csv_files = zf.namelist()
                for csv_file in csv_files:
                    with zf.open(csv_file, mode="r") as opened_csv_file:
                        df = pd.read_csv(
                            opened_csv_file,
                            sep=self.csv_seperator,
                            low_memory=False,
                        )
                    parq_name = basename(splitext(csv_file)[0]) + ".parq"
                    output_df_location = join_location_w_path(
                        self.output_location, parq_name
                    )
                    with open_location(output_df_location) as (
                        output_file_system,
                        output_df_path,
                    ):
                        write_parquet(
                            df, output_df_path, file_system=output_file_system
                        )
                        checksums["outputs"][output_df_path] = md5_from_file(
                            file_path=output_df_path, file_system=output_file_system
                        )
    return checksums
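
Putting it together, a hedged end-to-end sketch using only the methods documented above; the changed-files dictionary is normally maintained by the FileChecksumProcessor base class, so force=True is used here to sidestep it:

input_files, output_files = processor.list_files()

# force=True converts every input regardless of checksum state.
batches = processor.generate_batches(
    input_file_list=input_files,
    changed_files_dict={"inputs": {}},
    force=True,
)

for batch in batches:
    checksums = processor.process(batch)
    # checksums["inputs"] and checksums["outputs"] map file paths
    # to their MD5 digests.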
