The Python Oracle

Python Read huge file line per line and send it to multiprocessing or thread

--------------------------------------------------
Rise to the top 3% as a developer or hire one of them at Toptal: https://topt.al/25cXVn
--------------------------------------------------

Music by Eric Matyas
https://www.soundimage.org
Track title: Lost Civilization

--

Chapters
00:00 Python Read Huge File Line Per Line And Send It To Multiprocessing Or Thread
01:24 Accepted Answer Score 4
02:43 Answer 2 Score 1
04:47 Thank you

--

Full question
https://stackoverflow.com/questions/7182...

--

Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...

--

Tags
#python #multiprocessing #readfile

#avk47



ACCEPTED ANSWER

Score 4


The problem seems unrealistic, though: shooting 737,022,387 requests! Calculate how many months it'll take from a single computer!

Still, a better way to do this task is to read the file line by line in a separate thread and insert each line into a queue, and then process the queue with multiple processes.

Solution 1:

from multiprocessing import Queue, Process
from threading import Thread
from time import sleep

# Number of worker processes that consume URLs from the queue.
max_process = 4

# Bounded on purpose: once the queue is full, put() blocks the reader thread
# until a worker takes an item, so a huge input file is never held in memory
# all at once.
urls_queue = Queue(maxsize=max_process * 100)

def read_urls():
    """Feed every line of ``urls_file.txt`` into ``urls_queue``, then stop the workers.

    Each line is stripped of surrounding whitespace before it is queued and
    echoed. After the last line, one ``"DONE"`` marker per worker process is
    queued.
    """
    with open('urls_file.txt', 'r') as url_file:
        for raw_line in url_file:
            url = raw_line.strip()
            urls_queue.put(url)
            print('put url: {}'.format(url))

    # One "DONE" marker per worker so that every send_request_processor
    # eventually reads one and exits.
    for _ in range(max_process):
        urls_queue.put("DONE")


def send_request(url):
    """Simulate one request for *url*: log it, wait a second, log the reply."""
    started_msg = 'send request: {}'.format(url)
    print(started_msg)
    # Stand-in for the time a real request would take.
    sleep(1)
    finished_msg = 'recv response: {}'.format(url)
    print(finished_msg)


def send_request_processor():
    """Worker loop: take URLs off ``urls_queue`` and send each one.

    Returns as soon as the ``"DONE"`` marker is read from the queue.
    """
    print('start send request processor')
    # iter(callable, sentinel) keeps calling urls_queue.get() and stops the
    # loop when a call returns a value equal to "DONE".
    for url in iter(urls_queue.get, "DONE"):
        send_request(url)


def main():
    """Start the file-reader thread and the worker processes, then wait for them."""
    reader = Thread(target=read_urls)
    reader.start()

    workers = [Process(target=send_request_processor) for _ in range(max_process)]
    for worker in workers:
        worker.start()

    # Each worker returns once it has read its "DONE" marker.
    for worker in workers:
        worker.join()

    print('all done')
    # Finally wait for the reader thread itself to finish.
    reader.join()


# Entry-point guard: run main() only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()

Demo: https://onlinegdb.com/Elfo5bGFz

Solution 2:

You can use the Tornado asynchronous networking library:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

# Queue shared by producer() and consumer(); maxsize=2 caps how many items
# may be waiting in it at any one time.
q = Queue(maxsize=2)

async def consumer():
    """Take items from ``q`` one after another and simulate a little work on each.

    This coroutine never returns on its own; it keeps waiting for the next item.
    """
    while True:
        item = await q.get()
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            # Mark the item as finished even if the work above failed, so that
            # q.join() is not left waiting for it.
            q.task_done()

async def producer():
    """Read ``urls_file.txt`` line by line and put each URL on ``q``.

    ``await q.put(...)`` waits while the queue is full, so the file is read
    only as fast as items are taken off the queue. Each line is stripped of
    surrounding whitespace (including the trailing newline) before it is
    queued and echoed.
    """
    with open('urls_file.txt', 'r') as f:
        for line in f:
            url = line.strip()
            await q.put(url)
            # Report the URL that was just queued.
            print('Put %s' % url)

async def main():
    """Run the producer to completion while a consumer drains the queue."""
    loop = IOLoop.current()
    # The consumer never finishes, so schedule it instead of awaiting it.
    loop.spawn_callback(consumer)
    # Returns once every line of the file has been put on the queue; the
    # consumer runs in between, whenever the producer is waiting.
    await producer()
    # Returns once every queued item has been marked done by the consumer.
    await q.join()
    print('Done')

# Run the main() coroutine on the current IOLoop via run_sync.
IOLoop.current().run_sync(main)



ANSWER 2

Score 1


Using method multiprocessing.pool.Pool.imap is a step in the right direction, but the problem is that with so much input you will be feeding the input task queue faster than the processing pool can take the tasks off the queue and return results. Consequently, the task queue will continue to grow and you will exhaust memory. What is needed is a way to "throttle" method imap so that it blocks once the task queue has N tasks on it. I think a reasonable default value for N is twice the pool size, to ensure that when a pool process completes work on a task there will be no delay for it to find another task to work on. Hence we create classes BoundedQueueProcessPool (multiprocessing) and BoundedQueueThreadPool (multithreading):

import multiprocessing.pool
import multiprocessing
import threading


class ImapResult:
    """Iterator wrapper that releases a semaphore for every result it hands out.

    Wraps the iterable returned by a pool's ``imap``/``imap_unordered`` call.
    Each item fetched from it, and each exception other than
    ``StopIteration`` raised while fetching, releases the semaphore once.
    """

    def __init__(self, semaphore, result):
        self._semaphore = semaphore
        self.it = iter(result)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            item = next(self.it)
            self._semaphore.release()
            return item
        except StopIteration:
            # Normal exhaustion: no result was produced, so nothing to release.
            raise
        except BaseException:
            # Fetching the result failed; still release once, then re-raise.
            self._semaphore.release()
            raise

class BoundedQueuePool:
    """Mixin that throttles task submission to a pool with a semaphore.

    Every submitted task first acquires the semaphore, so submission blocks
    once ``limit`` tasks are outstanding. The slot is released when the task's
    result (or error) is delivered.

    The class must be combined with a pool class that provides
    ``apply_async``, ``imap`` and ``imap_unordered``; those are reached
    through ``super()``.
    """

    def __init__(self, limit, semaphore):
        """Store the task limit and the semaphore that enforces it."""
        self._limit = limit
        self._semaphore = semaphore

    def release(self, result, callback=None):
        """Release one slot, then pass *result* on to *callback* if one is given."""
        self._semaphore.release()
        if callback:
            callback(result)

    # kwds={} mirrors the signature of Pool.apply_async; it is never mutated here.
    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        """Submit one task, blocking while the limit of outstanding tasks is reached.

        *callback* is called with the task's result and *error_callback* with
        the exception it raised; in both cases the slot is released first.

        Returns whatever the underlying pool's ``apply_async`` returns.
        """
        self._semaphore.acquire()

        def on_success(result):
            self.release(result, callback=callback)

        def on_error(exc):
            # Forward failures to the caller's error callback, not the success one.
            self.release(exc, callback=error_callback)

        try:
            return super().apply_async(
                func, args, kwds, callback=on_success, error_callback=on_error
            )
        except BaseException:
            # The task was never submitted, so no callback will free the slot.
            self._semaphore.release()
            raise

    def _validate_chunksize(self, chunksize):
        """Raise ValueError if *chunksize* is larger than the task limit."""
        if chunksize > self._limit:
            raise ValueError(f'chunksize argument exceeds {self._limit}')

    def _acquire_per_item(self, iterable):
        """Yield the items of *iterable*, acquiring one slot before each."""
        for elem in iterable:
            self._semaphore.acquire()
            yield elem

    def imap(self, func, iterable, chunksize=1):
        """Throttled ``imap``: input is consumed only while slots are available.

        Returns an ``ImapResult`` wrapping the underlying pool's result iterator.

        Raises:
            ValueError: if *chunksize* exceeds the task limit.
        """
        self._validate_chunksize(chunksize)
        result = super().imap(func, self._acquire_per_item(iterable), chunksize)
        return ImapResult(self._semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        """Throttled ``imap_unordered``; see ``imap`` for the contract."""
        self._validate_chunksize(chunksize)
        result = super().imap_unordered(func, self._acquire_per_item(iterable), chunksize)
        return ImapResult(self._semaphore, result)

class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    """Process pool whose number of outstanding tasks is bounded.

    Accepts the same arguments as ``multiprocessing.pool.Pool`` plus the
    keyword-only *max_waiting_tasks*: how many tasks may wait in addition to
    one per worker process. It defaults to the number of worker processes,
    giving a total limit of twice the pool size.

    Raises:
        ValueError: if *max_waiting_tasks* is negative.
    """

    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        # Validate first, so a bad argument is rejected before the base class
        # creates the pool.
        if max_waiting_tasks is not None and max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, multiprocessing.BoundedSemaphore(limit))

class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
    """Thread pool whose number of outstanding tasks is bounded.

    Accepts the same arguments as ``multiprocessing.pool.ThreadPool`` plus the
    keyword-only *max_waiting_tasks*: how many tasks may wait in addition to
    one per worker thread. It defaults to the number of worker threads, giving
    a total limit of twice the pool size.

    Raises:
        ValueError: if *max_waiting_tasks* is negative.
    """

    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        # Validate first, so a bad argument is rejected before the base class
        # creates the pool.
        if max_waiting_tasks is not None and max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        limit = self._processes + max_waiting_tasks
        BoundedQueuePool.__init__(self, limit, threading.BoundedSemaphore(limit))


#######################################################################

from time import sleep


def process_line(line):
    """Pretend to work on *line* for three seconds, echo it, and return True."""
    sleep(3)
    # Every line read from the file still carries its own line ending, so
    # print must not add another one.
    print(line, end='')
    return True

if __name__ == "__main__":
    # Two workers; with the default max_waiting_tasks at most four lines are
    # outstanding at any time, however large the file is.
    pool = BoundedQueueProcessPool(2)
    try:
        with open("test.txt") as file:
            # Results must be consumed: fetching each one is what frees a slot
            # for the next line to be submitted.
            for _ in pool.imap(process_line, file):
                pass
    except BaseException:
        # Something failed (missing file, a task raised, Ctrl-C): stop the
        # workers right away instead of leaving the pool running.
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        # join() is valid after either close() or terminate().
        pool.join()