Implementation of a FileChecksumProcessor that converts zip archives
containing CSV files into Parquet files.
FileChecksumProcessor that can be used as part of a pipeline to
process files based on their checksums.
Args:
input_location: Input location of the Processor
output_location: Output location of the Processor
lockfile_location: Location of the checksum lockfile
lock_file_name: Name of the checksum lockfile
debug: Flag to activate debug mode
process_count: Number of processes for parallel execution
batch_size: Size of a batch
csv_seperator: Separator character for the CSV files
clear: Flag to clear the output location when initializing the Processor
recursive: Flag indicating whether the input location should be
searched for files recursively
Source code in niceml/filechecksumprocessors/zippedcsvtoparqprocessor.py
```python
def __init__(
    self,
    input_location: Union[dict, LocationConfig],
    output_location: Union[dict, LocationConfig],
    lockfile_location: Union[dict, LocationConfig],
    lock_file_name: str = "lock.yaml",
    debug: bool = False,
    process_count: int = 8,
    batch_size: int = 16,
    csv_seperator: str = ";",
    clear: bool = False,
    recursive: bool = False,
):
    """
    FileChecksumProcessor that can be used as part of a pipeline to
    process files based on their checksums.

    Args:
        input_location: Input location of the Processor
        output_location: Output location of the Processor
        lockfile_location: Location of the checksum lockfile
        lock_file_name: Name of the checksum lockfile
        debug: Flag to activate debug mode
        process_count: Number of processes for parallel execution
        batch_size: Size of a batch
        csv_seperator: Separator character for the CSV files
        clear: Flag to clear the output location when initializing the Processor
        recursive: Flag indicating whether the input location should be
            searched for files recursively
    """
    super().__init__(
        input_location=input_location,
        output_location=output_location,
        lockfile_location=lockfile_location,
        debug=debug,
        process_count=process_count,
        batch_size=batch_size,
        lock_file_name=lock_file_name,
    )
    self.recursive = recursive
    self.clear = clear
    self.csv_seperator = csv_seperator
    if self.clear:
        # optionally remove any previous outputs before processing starts
        clear_folder(self.output_location)
```
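A minimal usage sketch follows. The class name `ZippedCsvToParqProcessor` is inferred from the module path, and the location dicts with a `uri` key are an assumption about how locations are configured in your niceml version; adjust both to match your setup.

```python
from niceml.filechecksumprocessors.zippedcsvtoparqprocessor import (
    ZippedCsvToParqProcessor,
)

# Hypothetical locations; the "uri" key is an assumption about the
# LocationConfig schema and may differ in your niceml version.
processor = ZippedCsvToParqProcessor(
    input_location={"uri": "data/raw_zips"},
    output_location={"uri": "data/parquet"},
    lockfile_location={"uri": "data/locks"},
    batch_size=8,        # convert up to 8 zip archives per batch
    csv_seperator=";",   # parameter name is spelled this way in the API
    recursive=True,      # search the input location recursively for .zip files
)
```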
The generate_batches function is responsible for generating a list of batches,
where each batch is a dictionary with `inputs` as a key mapped to a list of file paths.
Parameters:
- input_file_list (List[str]) – A list of input file names
- changed_files_dict (Dict[str, Dict[str, bool]]) – Dictionary with the information which files have changed
- output_file_list (Optional[List[str]]) – An optional list of output file names
- force (bool) – Force the generation of batches even if no files have changed
```python
def generate_batches(
    self,
    input_file_list: List[str],
    changed_files_dict: Dict[str, Dict[str, bool]],
    output_file_list: Optional[List[str]] = None,
    force: bool = False,
) -> List[Dict[str, List[str]]]:
    """
    The generate_batches function is responsible for generating a list of
    batches, where each batch is a dictionary with `inputs` as a key,
    followed by a list of file paths.

    Args:
        input_file_list: A list of input file names
        changed_files_dict: Dictionary with the information
            which files have changed
        output_file_list: An optional list of output file names
        force: Force the generation of batches even if no files have changed

    Returns:
        A list of batches; each batch is a dictionary with one key `inputs`
        and a list of file paths as the value
    """
    if not force:
        # keep only the input files whose checksum has changed
        input_file_list = [
            file_name
            for file_name, changed in changed_files_dict["inputs"].items()
            if changed
        ]
    batches = []
    for batch_pos in range(0, len(input_file_list), self.batch_size):
        batches.append(
            {"inputs": input_file_list[batch_pos : batch_pos + self.batch_size]}
        )
    return batches
```
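As an illustration of the batching behavior (the file names and the contents of `changed_files_dict` below are invented, and `processor` is the instance from the sketch above):

```python
changed = {
    "inputs": {
        "data/raw_zips/a.zip": True,   # changed since the last run
        "data/raw_zips/b.zip": False,  # unchanged, skipped unless force=True
        "data/raw_zips/c.zip": True,
    }
}
batches = processor.generate_batches(
    input_file_list=[
        "data/raw_zips/a.zip",
        "data/raw_zips/b.zip",
        "data/raw_zips/c.zip",
    ],
    changed_files_dict=changed,
)
# With batch_size=8, all changed files fit into a single batch:
# [{"inputs": ["data/raw_zips/a.zip", "data/raw_zips/c.zip"]}]
```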
```python
def list_files(self) -> Tuple[List[str], List[str]]:
    """
    Returns a tuple of two lists:
        1. A list of all files in the input location
        2. A list of all files in the output location

    Returns:
        A tuple of two lists
    """
    with open_location(self.input_location) as (input_fs, input_path):
        input_files = list_dir(
            path=input_path,
            recursive=self.recursive,
            file_system=input_fs,
            filter_ext=[".zip"],
        )
        input_files = [
            join_fs_path(input_fs, input_path, input_file)
            for input_file in input_files
        ]
    with open_location(self.output_location) as (output_fs, output_path):
        output_fs.makedirs(output_path, exist_ok=True)
        output_files = list_dir(
            path=output_path,
            recursive=self.recursive,
            file_system=output_fs,
            filter_ext=[".parq"],
        )
        output_files = [
            join_fs_path(output_fs, output_path, output_file)
            for output_file in output_files
        ]
    return input_files, output_files
```
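Only `.zip` files are listed on the input side and `.parq` files on the output side; the output directory is created if it does not yet exist. Continuing the sketch above, a call might return paths like the following (the listings are invented for illustration):

```python
input_files, output_files = processor.list_files()
# input_files  -> e.g. ["data/raw_zips/a.zip", "data/raw_zips/b.zip"]
# output_files -> e.g. ["data/parquet/a_table.parq"]
```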
```python
def process(self, batch: Dict[str, List[str]]) -> Dict[str, Dict[str, str]]:
    """
    The process function takes a batch of files and converts them
    from CSV to Parquet.

    Args:
        batch: Batch of files to be processed; the key `inputs` maps to
            the list of zip files to convert

    Returns:
        A dictionary of checksums for each file in `self.output_location`
        (key = `outputs`) and `self.input_location` (key = `inputs`)
    """
    checksums = defaultdict(dict)
    with open_location(self.input_location) as (input_file_system, input_root):
        for zip_file in tqdm(
            batch["inputs"], desc="Extract zip files of current batch"
        ):
            with input_file_system.open(zip_file) as opened_zip_file:
                checksums["inputs"][zip_file] = md5_from_file(
                    file_path=zip_file, file_system=input_file_system
                )
                zf = zipfile.ZipFile(opened_zip_file)
                csv_files = zf.namelist()
                for csv_file in csv_files:
                    with zf.open(csv_file, mode="r") as opened_csv_file:
                        df = pd.read_csv(
                            opened_csv_file,
                            sep=self.csv_seperator,
                            low_memory=False,
                        )
                    parq_name = basename(splitext(csv_file)[0]) + ".parq"
                    output_df_location = join_location_w_path(
                        self.output_location, parq_name
                    )
                    with open_location(output_df_location) as (
                        output_file_system,
                        output_df_path,
                    ):
                        write_parquet(
                            df, output_df_path, file_system=output_file_system
                        )
                        checksums["outputs"][output_df_path] = md5_from_file(
                            file_path=output_df_path,
                            file_system=output_file_system,
                        )
    return checksums
```
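To make the conversion step concrete without niceml's filesystem abstractions, here is a minimal standalone sketch of the same per-archive logic on the local filesystem; `convert_zip_to_parquet` is a hypothetical helper, not part of the library:

```python
import zipfile
from os.path import basename, join, splitext

import pandas as pd


def convert_zip_to_parquet(zip_path: str, output_dir: str, sep: str = ";") -> None:
    """Convert every CSV inside a zip archive into a Parquet file."""
    with zipfile.ZipFile(zip_path) as zf:
        for csv_name in zf.namelist():
            with zf.open(csv_name) as csv_file:
                df = pd.read_csv(csv_file, sep=sep, low_memory=False)
            # mirror the processor's naming scheme: <csv basename>.parq
            parq_name = basename(splitext(csv_name)[0]) + ".parq"
            df.to_parquet(join(output_dir, parq_name))
```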