ZippedCsvToParqProcessor(
input_location,
output_location,
lockfile_location,
lock_file_name="lock.yaml",
debug=False,
process_count=8,
batch_size=16,
csv_seperator=";",
clear=False,
recursive=False,
)
Bases: FileChecksumProcessor
Implementation of a FileChecksumProcessor that converts zip archives
containing CSV files into Parquet files. It can be used as part of a
pipeline to process files based on their checksums.
Args:
input_location: Input location of the Processor
output_location: Output location of the Processor
lockfile_location: Location of the checksum lockfile
lock_file_name: Name of the checksum lockfile
debug: Flag to activate debug mode
process_count: Number of processes for parallel execution
batch_size: Size of a batch
csv_seperator: Separator character for the CSV files
clear: Flag to clear the output location when initializing the Processor
recursive: Flag indicating whether the input location should be
searched for files recursively
Source code in niceml/filechecksumprocessors/zippedcsvtoparqprocessor.py

def __init__(
    self,
    input_location: Union[dict, LocationConfig],
    output_location: Union[dict, LocationConfig],
    lockfile_location: Union[dict, LocationConfig],
    lock_file_name: str = "lock.yaml",
    debug: bool = False,
    process_count: int = 8,
    batch_size: int = 16,
    csv_seperator: str = ";",
    clear: bool = False,
    recursive: bool = False,
):
    """
    FileChecksumProcessor that can be used as part of a pipeline to
    process files based on their checksums.

    Args:
        input_location: Input location of the Processor
        output_location: Output location of the Processor
        lockfile_location: Location of the checksum lockfile
        lock_file_name: Name of the checksum lockfile
        debug: Flag to activate debug mode
        process_count: Number of processes for parallel execution
        batch_size: Size of a batch
        csv_seperator: Separator character for the CSV files
        clear: Flag to clear the output location when initializing the Processor
        recursive: Flag indicating whether the input location should be
            searched for files recursively
    """
    super().__init__(
        input_location=input_location,
        output_location=output_location,
        lockfile_location=lockfile_location,
        debug=debug,
        process_count=process_count,
        batch_size=batch_size,
        lock_file_name=lock_file_name,
    )
    self.recursive = recursive
    self.clear = clear
    self.csv_seperator = csv_seperator
    if self.clear:
        clear_folder(self.output_location)
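A minimal instantiation sketch. The dict form of the locations is an assumption here (shown with a "uri" key); consult niceml's LocationConfig for the exact schema, and adjust the paths to your project:

# Hypothetical setup: location dicts and paths are illustrative only.
processor = ZippedCsvToParqProcessor(
    input_location={"uri": "data/raw_zips"},
    output_location={"uri": "data/parquet"},
    lockfile_location={"uri": "data/locks"},
    csv_seperator=";",
    recursive=True,
)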
Functions
generate_batches
generate_batches(
input_file_list,
changed_files_dict,
output_file_list=None,
force=False,
)
The generate_batches function generates a list of batches, where each
batch is a dictionary with a single key `inputs` mapped to a list of
file paths.
Parameters:
- input_file_list (List[str]) – A list of input file names
- changed_files_dict (Dict[str, Dict[str, bool]]) – Dictionary with the
  information which files have changed
- output_file_list (Optional[List[str]], default: None) – An optional
  list of output file names
- force (bool, default: False) – Force the generation of batches even if
  no files have changed
Returns:
- List[Dict[str, List[str]]] – A list of batches; each batch is a
  dictionary with one key `inputs` and the value is a list of file paths
Source code in niceml/filechecksumprocessors/zippedcsvtoparqprocessor.py

def generate_batches(
    self,
    input_file_list: List[str],
    changed_files_dict: Dict[str, Dict[str, bool]],
    output_file_list: Optional[List[str]] = None,
    force: bool = False,
) -> List[Dict[str, List[str]]]:
    """
    The generate_batches function generates a list of batches, where each
    batch is a dictionary with `inputs` as a key mapped to a list of file paths.

    Args:
        input_file_list: A list of input file names
        changed_files_dict: Dictionary with the information
            which files have changed
        output_file_list: An optional list of output file names
        force: Force the generation of batches even if no files have changed

    Returns:
        A list of batches, each batch is a dictionary with one key `inputs`
        and the value is a list of file paths
    """
    if not force:
        # Keep only the input files flagged as changed in the lockfile diff.
        input_file_list = [
            file_name
            for file_name, changed in changed_files_dict["inputs"].items()
            if changed
        ]
    batches = []
    for batch_pos in range(0, len(input_file_list), self.batch_size):
        batches.append(
            {"inputs": input_file_list[batch_pos : batch_pos + self.batch_size]}
        )
    return batches
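A small illustration of the batching behavior, using hypothetical file names and assuming a processor constructed with batch_size=2. With force left at False, only files flagged as changed are batched:

# File names are illustrative; assumes batch_size=2.
changed = {
    "inputs": {"a.zip": True, "b.zip": False, "c.zip": True, "d.zip": True}
}
batches = processor.generate_batches(
    input_file_list=["a.zip", "b.zip", "c.zip", "d.zip"],
    changed_files_dict=changed,
)
# Unchanged files are dropped, the rest are chunked by batch_size:
# [{"inputs": ["a.zip", "c.zip"]}, {"inputs": ["d.zip"]}]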
list_files
Returns a tuple of two lists
- A list of all files in the input location
- A list of all files in the output location
Returns:
- Tuple[List[str], List[str]] – A tuple of two lists
Source code in niceml/filechecksumprocessors/zippedcsvtoparqprocessor.py

def list_files(self) -> Tuple[List[str], List[str]]:
    """
    Returns a tuple of two lists:
        1. A list of all files in the input location
        2. A list of all files in the output location

    Returns:
        A tuple of two lists
    """
    with open_location(self.input_location) as (input_fs, input_path):
        input_files = list_dir(
            path=input_path,
            recursive=self.recursive,
            file_system=input_fs,
            filter_ext=[".zip"],
        )
        input_files = [
            join_fs_path(input_fs, input_path, input_file)
            for input_file in input_files
        ]
    with open_location(self.output_location) as (output_fs, output_path):
        output_fs.makedirs(output_path, exist_ok=True)
        output_files = list_dir(
            path=output_path,
            recursive=self.recursive,
            file_system=output_fs,
            filter_ext=[".parq"],
        )
        output_files = [
            join_fs_path(output_fs, output_path, output_file)
            for output_file in output_files
        ]
    return input_files, output_files
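With the hypothetical locations from the instantiation sketch above, a call might return lists shaped like this (paths depend entirely on the configured locations):

input_files, output_files = processor.list_files()
# input_files  -> e.g. ["data/raw_zips/a.zip", "data/raw_zips/b.zip"]
# output_files -> e.g. ["data/parquet/a.parq"]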
process
The process function takes a batch of files and converts them from CSV to Parquet.
Parameters:
- batch (Dict[str, List[str]]) – The batch of files to be processed
Returns:
- Dict[str, Dict[str, str]] – A dictionary of checksums for each file in
  self.output_location (key = `outputs`) and self.input_location
  (key = `inputs`)
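The returned mapping nests file paths under the `inputs` and `outputs` keys; a hypothetical result could look like this (paths and hash values are illustrative):

{
    "inputs": {"data/raw_zips/a.zip": "9e107d9d372bb6826bd81d3542a419d6"},
    "outputs": {"data/parquet/a.parq": "e4d909c290d0fb1ca068ffaddf22cbd0"},
}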
Source code in niceml/filechecksumprocessors/zippedcsvtoparqprocessor.py

def process(self, batch: Dict[str, List[str]]) -> Dict[str, Dict[str, str]]:
    """
    The process function takes a batch of files and converts them from CSV to Parquet.

    Args:
        batch: The batch of files to be processed

    Returns:
        A dictionary of checksums for each file in `self.output_location`
        (key = `outputs`) and `self.input_location` (key = `inputs`)
    """
    checksums = defaultdict(dict)
    with open_location(self.input_location) as (input_file_system, input_root):
        for zip_file in tqdm(
            batch["inputs"], desc="Extract zip files of current batch"
        ):
            with input_file_system.open(zip_file) as opened_zip_file:
                # Record the checksum of the input archive.
                checksums["inputs"][zip_file] = md5_from_file(
                    file_path=zip_file, file_system=input_file_system
                )
                zf = zipfile.ZipFile(opened_zip_file)
                csv_files = zf.namelist()
                for csv_file in csv_files:
                    with zf.open(csv_file, mode="r") as opened_csv_file:
                        df = pd.read_csv(
                            opened_csv_file,
                            sep=self.csv_seperator,
                            low_memory=False,
                        )
                        # Write each CSV as a Parquet file in the output
                        # location and record its checksum.
                        parq_name = basename(splitext(csv_file)[0]) + ".parq"
                        output_df_location = join_location_w_path(
                            self.output_location, parq_name
                        )
                        with open_location(output_df_location) as (
                            output_file_system,
                            output_df_path,
                        ):
                            write_parquet(
                                df, output_df_path, file_system=output_file_system
                            )
                            checksums["outputs"][output_df_path] = md5_from_file(
                                file_path=output_df_path,
                                file_system=output_file_system,
                            )
    return checksums
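A hypothetical manual run using only the methods documented on this page; in a real pipeline the FileChecksumProcessor base class typically drives this loop and compares checksums against the lockfile:

# Sketch only: here every file is treated as changed, bypassing the
# lockfile comparison that the base class would normally perform.
input_files, _ = processor.list_files()
changed = {"inputs": {path: True for path in input_files}}
for batch in processor.generate_batches(input_files, changed):
    checksums = processor.process(batch)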