Example of Using Python's 'multiprocessing' Library for Parallel File Processing





Our task:

Let’s suppose we have a set of 100,000 files placed in 100,000 paths. We need to know the size of each and then build a list of the ones larger than n megabytes, with full paths, without spending ages on it. Simple methods like bash’s find and grep are too slow, so in this article we will talk about how we can use Python’s multiprocessing library for our files. The same approach can also be used, for example, to check md5sums of files, access rules, etc.

Program logic:

We will give the program a number of threads, the maximum size of a file, and the path that we need to check. As a result, we get a list of files, with full paths, which are bigger than the maximum size. The program will search all sub-paths recursively in parallel, and we will check the size of each file found along the way.

Used libraries:

* argparse - for parsing arguments
* multiprocessing - to make parallel processes
* os - to work with files
* pathlib - we use its Path class to make our path a full path

Coding:

Create a main function of the program

def main():

if __name__ == "__main__":
    main()

Parsing arguments:

In order to work, our program needs to know certain things:
* the path where we look for the files and the subdirs with filenames
* the maximum file size; the program will show all files bigger than it
* how many threads we will use (a good number is less than the count of CPU cores)
Let’s parse the arguments with the ‘argparse’ library. Add to our main():

# create parser variable
parser = argparse.ArgumentParser(description="""This is a program for parallel searching files bigger than the maximum size. The program uses threads to quickly find dirs and subdirs in the path (EXCEPT symlinks)""")
# argument path - for the path; it uses a custom type where pathlib creates a full, correct path.
parser.add_argument('--path', help='path, where we find subdirs and files for check',
                    required=True, type=lambda p: Path(p).absolute())
# max size of file in Mb
parser.add_argument('--max_size', help='maximum file size for searching in Mb', required=True, type=int)
# number of threads with default 2
parser.add_argument('--threads', help='number of threads for use, default 2', default=2, type=int)

The code type=lambda p: Path(p).absolute() will make an absolute path from the path given to our program.
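To see what this conversion does in isolation, here is a minimal sketch (the to_absolute name is invented for the example):

```python
from pathlib import Path

# Hypothetical helper mirroring the argparse type=lambda p: Path(p).absolute()
def to_absolute(p):
    return Path(p).absolute()

# A relative path becomes anchored at the current working directory
print(to_absolute("logs/app"))
```

Note that absolute() does not resolve symlinks or '..' segments; Path.resolve() does that, if you ever need it.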

creating a queue, dictionary and list for the threads

Because we can’t pass arguments to our threads in the usual way (except for the elements of the one list given to map), and a worker can’t return anything to main() directly, we need another way to give it, for example, the path, the file size and other variables. The multiprocessing library provides a method for this - we can create a managed dictionary, list or queue, and multiple workers can read variables from it or append something to it. Let’s create a few for our program.

dict for environment variables

To pass information to the workers (e.g. the path and the size) we will create a special dictionary which can be read and written by all of them. To the if __name__ == "__main__": block we add something like this: envi = Manager().dict() and then, in main(), add some args to the dictionary:

 envi.update([
        ('path', str(p.path)),
        ('max_size', str(p.max_size))
    ])

We created this special dictionary in this block because it needs to be a global variable.

list to store paths of files bigger than max_size

Our parallel workers need somewhere to put the paths of oversized files, so let’s create a list. To the if __name__ == "__main__": block we add something like this: oversized_files = Manager().list()

Queue with directories for work.

We have planned threads for processing paths and, in our case, each thread will work with a different path. Also, because we will find new subdirectories while the threads are working, we use a queue for the list of paths. This is how the queue works with the threads: each thread takes an element from the queue and processes it, and each element is given to exactly one thread. Let’s create a queue for the workers. To the if __name__ == "__main__": block we add something like this: unsearched_dirs = Manager().Queue()

create a function to start the parallel threads:

This function starts the parallel workers, each one running as a separate process.

def search_big_files_start(threads):
    #get path from envi dict
    path = envi.get('path')
    # create a pool for parallel workers
    pool = Pool(threads)
    #add to queue our base path
    unsearched_dirs.put(path)
    # start the parallel workers: the 1st parameter is the worker function; the second one is how many workers will be started
    pool.map_async(serch_big_files_worker, range(threads))
    # when all is done, close the pool and join the queue (join returns when the queue is fully processed)
    pool.close()
    unsearched_dirs.join()

create a worker function

How it will work in our case: the parallel worker gets a path from the queue and does two things with it:
* checks the size of all the files in the current directory
* finds all subdirectories and puts them into the queue

def serch_big_files_worker(task_num):
    # get max size and convert it to bytes
    max_size = int(envi.get('max_size'))*1024*1024
    while True:
        # get path from the queue
        working_path = unsearched_dirs.get()
        # ### find big files block
        # get all filenames from current dir
        files = [file.name for file in os.scandir(working_path) if file.is_file(follow_symlinks=False)]
        # for each file
        for file in files:
            # make a full path for each file
            file_path = os.path.join(working_path, file)
            # check the size and if it is bigger than the max size then append it to the oversized files list.
            if os.path.getsize(file_path) > max_size:
                oversized_files.append(str(file_path))

        # ### add subdirs to the queue block
        # find dir names in current directory
        sub_dirs = [dir.name for dir in os.scandir(working_path) if dir.is_dir(follow_symlinks=False)]
        # for each dir
        for dirname in sub_dirs:
            # make full path of dir
            dir_path = os.path.join(working_path, dirname)
            # put dir to queue
            unsearched_dirs.put(dir_path)
        # when the directory is fully processed, mark the queue item as done
        unsearched_dirs.task_done()

finishing touches

We now need to add to main() a call to the start function with the number of threads argument: search_big_files_start(p.threads) Also, when it is all done, we need to print the result to the screen:

    print("list of files bigger than %d Mb" % p.max_size)
    print(*oversized_files)

And that is all.

why we use os.scandir(working_path)

It’s because this way is faster. We can also use the parallel processes themselves to find subdirectories. And yes, if we only have one directory the program degenerates to a single worker, but in our case, with many paths and not too many files in each, it parallelizes well. Moreover, os.scandir is faster than the alternatives for listing a directory, because each entry carries cached metadata from the scan, so we don’t need an extra stat call per name to tell files from directories.

Usually, when you are searching for files, you don’t want to follow symlinks. But if you need to, you can enable it by replacing follow_symlinks=False with follow_symlinks=True.
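The split the worker performs - files vs. directories in a single scan - looks roughly like this on its own (list_entries is an invented name, and the throwaway temp tree is just for the demo):

```python
import os
import tempfile

# Hypothetical helper: one scandir pass, separating files from subdirectories
def list_entries(path, follow_symlinks=False):
    files, dirs = [], []
    for entry in os.scandir(path):
        # entry carries cached metadata, so these checks avoid extra stat calls
        if entry.is_file(follow_symlinks=follow_symlinks):
            files.append(entry.name)
        elif entry.is_dir(follow_symlinks=follow_symlinks):
            dirs.append(entry.name)
    return sorted(files), sorted(dirs)

if __name__ == "__main__":
    # build a tiny throwaway tree to scan
    with tempfile.TemporaryDirectory() as root:
        open(os.path.join(root, "a.txt"), "w").close()
        os.mkdir(os.path.join(root, "sub"))
        print(list_entries(root))  # (['a.txt'], ['sub'])
```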

possible improvements

Good ideas are:
* to make the follow_symlinks option configurable via arguments
* to validate the threads number and max size variables (reject negative values, for example)
* to make the size argument understand suffixes like M (megabytes) and K (kilobytes)
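The last idea could start out as something like this sketch (parse_size and its unit table are assumptions, not part of the program above):

```python
# Hypothetical size parser for a future --max_size that accepts K/M/G suffixes
def parse_size(text):
    units = {'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3}
    text = text.strip().upper()
    if text and text[-1] in units:
        return int(text[:-1]) * units[text[-1]]
    # no suffix: keep the old behaviour and treat the value as megabytes
    return int(text) * units['M']

print(parse_size('5M'))    # 5242880
print(parse_size('512K'))  # 524288
```

Plugging it into argparse would just mean passing type=parse_size instead of type=int for the --max_size argument.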

Results

As of now, we have a program which can find all files bigger than n Mb in a path and its subpaths using parallel threads. Hopefully, it will be useful as a base for something more complex, because multiple threads make everything better, like Bluetooth =)

#!/usr/bin/env python3

import argparse
from multiprocessing import Pool, Manager
from pathlib import Path
import os


def serch_big_files_worker(task_num):
    # get max size and convert it to bytes
    max_size = int(envi.get('max_size'))*1024*1024
    while True:
        # get path from queue
        working_path = unsearched_dirs.get()
        # ###  find big files block
        # get all filenames from current dir
        files = [file.name for file in os.scandir(working_path) if file.is_file(follow_symlinks=False)]
        # for each file
        for file in files:
            # make a full path for each file
            file_path = os.path.join(working_path, file)
            # check the size and if it’s bigger than the max size then append it to the oversized files list.
            if os.path.getsize(file_path) > max_size:
                oversized_files.append(str(file_path))

        # ### add subdirs to queue block
        # find dir names in current directory
        sub_dirs = [dir.name for dir in os.scandir(working_path) if dir.is_dir(follow_symlinks=False)]
        # for each dir
        for dirname in sub_dirs:
            # make full path of dir
            dir_path = os.path.join(working_path, dirname)
            # put dir to queue
            unsearched_dirs.put(dir_path)
        # when the directory is fully processed, mark the queue item as done
        unsearched_dirs.task_done()

def search_big_files_start(threads):
    # get the path from envi dict
    path = envi.get('path')
    # create a pool for parallel workers
    pool = Pool(threads)
    # add our base path to the queue
    unsearched_dirs.put(path)
    # start parallel workers: 1st parameter is the worker function, the second one is how many threads will be used
    pool.map_async(serch_big_files_worker, range(threads))
    # when all is done, close the pool and join the queue (join returns when the queue is fully processed)
    pool.close()
    unsearched_dirs.join()


def main():
    parser = argparse.ArgumentParser(description="""This is a program for parallel searching files bigger than the
        maximum size. The program uses threads to quickly find dirs and subdirs in the path (EXCEPT symlinks)""")
    parser.add_argument('--path', help='path, where we find subdirs and files for check',
                        required=True, type=lambda p: Path(p).absolute())
    parser.add_argument('--max_size', help='maximum file size for searching, in Mb', required=True, type=int)
    parser.add_argument('--threads', help='number of threads for use, default 2', default=2, type=int)
    p = parser.parse_args()
    # let’s add to envi dict some envs for store and put them to our parallel threads
    envi.update([
        ('path', str(p.path)),
        ('max_size', str(p.max_size))
    ])
    # use start function to start parallel workers
    search_big_files_start(p.threads)
    # print the result
    print("list of files bigger than %d Mb" % p.max_size)
    print(*oversized_files)


if __name__ == "__main__":
    # create envi dict to store the parameters for parallel threads
    envi = Manager().dict()
    # queue of paths to check
    unsearched_dirs = Manager().Queue()
    # list for the files which have a size bigger than max_size
    oversized_files = Manager().list()
    main()

find_big_files_parallel.py

Ivan


DevOps Engineer
