Let’s suppose we have 100,000 files spread across 100,000 paths. We need to know the size of each one and build a list of those larger than n megabytes, with their full paths, without spending ages on it.
Simple approaches like bash’s find and grep can be too slow, so in this article we will look at how to use Python’s multiprocessing library for this task. The same approach can also be applied to other per-file checks, for example verifying md5sums or access rules.
Program logic
The program will take a number of threads, a maximum file size, and a path to check. As a result, we get a list of full paths of files bigger than the maximum size. The program searches all subdirectories recursively in parallel and checks the size of every file it finds.
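Without parallelism, this logic is just a recursive walk over the tree. Here is a minimal sequential sketch of what we are about to speed up (the path and the 100 MB threshold are only illustrative):
import os

max_size = 100 * 1024 * 1024                      # illustrative threshold: 100 MB in bytes
oversized = []
for root, dirs, files in os.walk('/some/path'):   # illustrative path; os.walk does not follow dir symlinks by default
    for name in files:
        file_path = os.path.join(root, name)
        if os.path.getsize(file_path) > max_size:
            oversized.append(file_path)
print(*oversized)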
Used libraries
* argparse – for parsing command-line arguments
* multiprocessing – to run parallel processes
* os – to work with files and directories
* pathlib (its Path class) – to turn the given path into a full absolute path
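For reference, these are the import statements that correspond to this list (they match the full code at the end of the article):
import argparse
from multiprocessing import Pool
from multiprocessing import Manager
from pathlib import Path
import os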
Coding
Create a main function of the program
def main():
if __name__ == "__main__":
    main()
Parsing arguments
In order to work, our program needs to know certain things:
* the path where we look for files and subdirectories
* the maximum file size; the program will show all files bigger than it
* how many threads to use (a good number is no more than the number of CPU cores)
Let’s parse the arguments with the argparse library. Add to our main():
# create the parser
parser = argparse.ArgumentParser(description="""This is a program for parallel searching for files bigger than the maximum size. The program uses threads to quickly scan dirs and subdirs in the path (EXCEPT symlinks)""")
# argument --path: a custom type uses pathlib to build a full, correct path
parser.add_argument('--path', help='path where we look for subdirs and files to check',
                    required=True, type=lambda p: Path(p).absolute())
# maximum size of a file, in MB
parser.add_argument('--max_size', help='maximum file size for searching, in MB', required=True, type=int)
# number of threads, default 2
parser.add_argument('--threads', help='number of threads to use, default 2', default=2, type=int)
The expression type=lambda p: Path(p).absolute() builds an absolute path from the path given to our program.
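To actually read the arguments into the p variable used below, main() also needs a call to parse_args() (it appears in the full script at the end):
p = parser.parse_args()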
Creating the queue, dictionary and list for threads
We can’t pass arbitrary arguments to our workers in the usual way (pool.map only hands each worker one element from an iterable), and a worker in an endless loop can’t return anything to main(), yet we still need to share the path, the maximum file size and other variables with them. The multiprocessing library provides a solution: a Manager can create a shared dictionary, list or queue, and all workers can read these shared objects or append to them. Let’s create a few for our program.
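As a minimal, standalone illustration (separate from our script, with illustrative names) of how Manager objects are shared between worker processes:
# Like the script in this article, this relies on the fork start method
# (the default on Linux), because workers reach the shared objects via globals.
from multiprocessing import Manager, Pool

def worker(n):
    shared_dict['last_task'] = n   # any worker can write to the shared dict
    shared_list.append(n * n)      # and append to the shared list

if __name__ == "__main__":
    shared_dict = Manager().dict()
    shared_list = Manager().list()
    pool = Pool(2)
    pool.map(worker, range(4))
    pool.close()
    print(dict(shared_dict), sorted(shared_list))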
Dict for environment variables
To pass information to the workers (e.g. the path and the maximum file size) we will create a special dictionary which can be read and written by all of them. To the if __name__ == "__main__": block we add something like this:
envi = Manager().dict()
Then, in main(), add some arguments to the dictionary:
envi.update([
    ('path', str(p.path)),
    ('max_size', str(p.max_size))
])
We create this special dictionary in the if __name__ == "__main__": block because it needs to be a global variable that the worker functions can see.
List to store file paths bigger than max_size
Our parallel workers need somewhere to put the paths of oversized files, so let’s create a list. To the if __name__ == "__main__": block we add something like this:
oversized_files = Manager().list()
Queue of directories to process
We planned for the threads to process paths, and in our case each thread works on a different path. Also, because new subdirectories are discovered while the threads are working, we use a queue for the list of paths. This is how the queue drives the workers: each worker takes an element from the queue and processes it, and each element is handed to exactly one worker. Let’s create a queue for the workers. To the if __name__ == "__main__": block we add something like this:
unsearched_dirs = Manager().Queue()
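A minimal, standalone sketch of the queue mechanics we rely on: every put() must eventually be matched by a task_done(), and join() blocks until that happens, which is how the main process will later know that all queued directories have been processed.
from multiprocessing import Manager

if __name__ == "__main__":
    q = Manager().Queue()
    q.put('/some/dir')        # producer side: add a piece of work
    item = q.get()            # consumer side: take a piece of work
    print('processing', item)
    q.task_done()             # report that this piece of work is finished
    q.join()                  # returns only because put() and task_done() balance out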
Create function to start parallel threads
This function will start the parallel workers; each worker runs as a separate process in the pool.
def search_big_files_start(threads):
    # get the path from the envi dict
    path = envi.get('path')
    # create a pool of parallel workers
    pool = Pool(threads)
    # add our base path to the queue
    unsearched_dirs.put(path)
    # start the parallel workers: the 1st parameter is the worker function, the second gives each worker its task number (one per thread)
    pool.map_async(serch_big_files_worker, range(threads))
    # no more work will be submitted, so close the pool; then join the queue, which blocks until every queued directory has been marked done
    pool.close()
    unsearched_dirs.join()
Create a worker function
How it works in our case: a parallel worker takes a path from the queue and needs to do two things with it:
* check the size of all the files in the current directory
* find all subdirectories and put them into the queue
def serch_big_files_worker(task_num):
    # get the max size and convert it to bytes
    max_size = int(envi.get('max_size')) * 1024 * 1024
    while True:
        # get a path from the queue
        working_path = unsearched_dirs.get()
        # ### find big files block
        # get all filenames from the current dir
        files = [file.name for file in os.scandir(working_path) if file.is_file(follow_symlinks=False)]
        # for each file
        for file in files:
            # build the full path of the file
            file_path = os.path.join(working_path, file)
            # check the size, and if it is bigger than the max size, append it to the oversized files list
            if os.path.getsize(file_path) > max_size:
                oversized_files.append(str(file_path))
        # ### add subdirs to the queue block
        # find dir names in the current directory
        sub_dirs = [dir.name for dir in os.scandir(working_path) if dir.is_dir(follow_symlinks=False)]
        # for each dir
        for dirname in sub_dirs:
            # build the full path of the dir
            dir_path = os.path.join(working_path, dirname)
            # put the dir into the queue
            unsearched_dirs.put(dir_path)
        # mark this directory as done so the queue's join() can finish
        unsearched_dirs.task_done()
Finishing touches
Now we need to add to main() a call to the start function, passing the number of threads argument:
search_big_files_start(p.threads)
Also, when everything is done, we need to print the result to the screen:
print("List of files larger than %d MB:" % p.max_size)
print(*oversized_files)
And that is all.
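For example, if the script is saved as find_big_files.py (the filename and path here are just examples), it can be run like this:
python3 find_big_files.py --path /var/log --max_size 100 --threads 4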
Why we use os.scandir(working_path)
Because it is faster. os.scandir() returns DirEntry objects whose is_file()/is_dir() results usually come straight from the directory scan, so in most cases there is no extra stat() call per entry, unlike os.listdir() combined with os.path.isfile()/os.path.isdir(). It also fits the parallel design: every subdirectory we find becomes a separate queue item that another worker can pick up. Of course, if everything sits in one single directory the parallelism gains nothing and the program would need to be changed, but for our case of many paths with not too many files in each, it parallelizes well.
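For comparison, a rough, purely illustrative sketch of the two ways of listing subdirectories:
import os

path = '/tmp'   # illustrative path

# listdir variant: one extra os.path.isdir() call (and stat) per entry; it also follows symlinks
dirs_slow = [name for name in os.listdir(path)
             if os.path.isdir(os.path.join(path, name))]

# scandir variant: DirEntry.is_dir() can usually answer straight from the scan, and we skip symlinks
dirs_fast = [entry.name for entry in os.scandir(path)
             if entry.is_dir(follow_symlinks=False)]

print(sorted(dirs_slow), sorted(dirs_fast))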
Why we disable Symlinks
Because, usually, when you are searching for files you don’t want to follow symlinks. But if you need to, you can enable it by replacing follow_symlinks=False with follow_symlinks=True.
Possible improvements
Some good ideas are:
* make the follow_symlinks option configurable via an argument
* validate the threads and max_size values (reject negative numbers, for example)
* make the size argument understand suffixes like M (megabytes) and K (kilobytes); a sketch of this is shown below
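A minimal sketch of the last idea, assuming we keep argparse and simply swap the type of --max_size for a small parser function (the name parse_size and the supported suffixes are illustrative); the worker would then use the value directly instead of multiplying by 1024*1024 itself:
def parse_size(value):
    """Turn strings like '500K', '100M' or a plain '100' (treated as MB) into bytes."""
    units = {'K': 1024, 'M': 1024 * 1024, 'G': 1024 * 1024 * 1024}
    value = value.strip().upper()
    if value and value[-1] in units:
        return int(value[:-1]) * units[value[-1]]
    # no suffix: keep the current behaviour and treat the number as megabytes
    return int(value) * 1024 * 1024

# usage (illustrative): parser.add_argument('--max_size', required=True, type=parse_size)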
Results
We now have a program that can find all files larger than n MB in a path and its subpaths using parallel threads. Hopefully it will be useful as a base for something more complex, because multiple threads make everything better, like Bluetooth =)
The full code of the script:
#!/usr/bin/env python3
import argparse
from multiprocessing import Pool
from multiprocessing import Manager
from pathlib import Path
import os


def serch_big_files_worker(task_num):
    # get the max size and convert it to bytes
    max_size = int(envi.get('max_size')) * 1024 * 1024
    while True:
        # get a path from the queue
        working_path = unsearched_dirs.get()
        # ### find big files block
        # get all filenames from the current dir
        files = [file.name for file in os.scandir(working_path) if file.is_file(follow_symlinks=False)]
        # for each file
        for file in files:
            # build the full path of the file
            file_path = os.path.join(working_path, file)
            # check the size, and if it is bigger than the max size, append it to the oversized files list
            if os.path.getsize(file_path) > max_size:
                oversized_files.append(str(file_path))
        # ### add subdirs to the queue block
        # find dir names in the current directory
        sub_dirs = [dir.name for dir in os.scandir(working_path) if dir.is_dir(follow_symlinks=False)]
        # for each dir
        for dirname in sub_dirs:
            # build the full path of the dir
            dir_path = os.path.join(working_path, dirname)
            # put the dir into the queue
            unsearched_dirs.put(dir_path)
        # mark this directory as done so the queue's join() can finish
        unsearched_dirs.task_done()


def search_big_files_start(threads):
    # get the path from the envi dict
    path = envi.get('path')
    # create a pool of parallel workers
    pool = Pool(threads)
    # add our base path to the queue
    unsearched_dirs.put(path)
    # start the parallel workers: the 1st parameter is the worker function, the second gives each worker its task number (one per thread)
    pool.map_async(serch_big_files_worker, range(threads))
    # no more work will be submitted, so close the pool; then join the queue, which blocks until every queued directory has been marked done
    pool.close()
    unsearched_dirs.join()


def main():
    parser = argparse.ArgumentParser(description="""This is a program for parallel searching for files bigger than
                                     the maximum size. The program uses threads to quickly scan dirs and subdirs in the path (EXCEPT symlinks)""")
    parser.add_argument('--path', help='path where we look for subdirs and files to check',
                        required=True, type=lambda p: Path(p).absolute())
    parser.add_argument('--max_size', help='maximum file size for searching, in MB', required=True, type=int)
    parser.add_argument('--threads', help='number of threads to use, default 2', default=2, type=int)
    p = parser.parse_args()
    # add some values to the envi dict to store them and pass them to our parallel workers
    envi.update([
        ('path', str(p.path)),
        ('max_size', str(p.max_size))
    ])
    # use the start function to launch the parallel workers
    search_big_files_start(p.threads)
    # print the result
    print("List of files larger than %d MB:" % p.max_size)
    print(*oversized_files)


if __name__ == "__main__":
    # create the envi dict to store parameters for the parallel workers
    envi = Manager().dict()
    # queue of paths to check
    unsearched_dirs = Manager().Queue()
    # list of files whose size is bigger than max_size
    oversized_files = Manager().list()
    main()
The original author: Ivan, DevOps Engineer, cloudinfrastack