multiprocessing Pool.imap broken?

Accepted answer
Score: 13

First notice that this works:

import multiprocessing as mp
import multiprocessing.util as util
pool=mp.Pool(1)
print(list(pool.imap(abs, range(3))))

The difference is that pool does not get finalized when the call to pool.imap() ends.

In contrast,

print(list(mp.Pool(1).imap(abs, range(3))))

causes the Pool instance to be finalized soon after the imap call ends. The lack of a reference causes the Finalizer (called self._terminate in the Pool class) to be called. This sets in motion a sequence of commands which tears down the task handler thread, result handler thread, worker subprocesses, etc.

This all happens so quickly that, at least on a majority of runs, the task sent to the task handler does not complete.
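One way to avoid depending on when the pool gets garbage-collected is to hold a reference to it and shut it down explicitly. The following is only a minimal sketch of that idea; the helper name absolute_values is made up for illustration and is not part of multiprocessing:

```python
import multiprocessing as mp

def absolute_values(numbers):
    # Hypothetical helper. Binding the pool to a local name keeps a strong
    # reference to it for as long as the results are being collected.
    pool = mp.Pool(1)
    try:
        return list(pool.imap(abs, numbers))
    finally:
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the worker process to exit

if __name__ == "__main__":
    print(absolute_values([-2, -1, 0]))
```

Calling close() followed by join() lets queued work finish, whereas the finalizer described above terminates the workers straight away.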

Here are the relevant bits of code:

From /usr/lib/python2.6/multiprocessing/pool.py:

class Pool(object):
    def __init__(self, processes=None, initializer=None, initargs=()):
        ...
        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._task_handler, self._result_handler, self._cache),
            exitpriority=15
            )

/usr/lib/python2.6/multiprocessing/util.py:

class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        ...
        if obj is not None:
            self._weakref = weakref.ref(obj, self)   

The weakref.ref(obj,self) causes self() to be called when obj is about to be finalized.
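The callback behaviour of weakref.ref can be seen in isolation with a few lines. This is a small sketch, not code from multiprocessing: Resource and on_finalize are hypothetical names, and the immediate callback relies on CPython's reference counting.

```python
import weakref

class Resource(object):
    """Hypothetical stand-in for an object such as Pool."""

events = []

def on_finalize(dead_ref):
    # weakref passes the (now dead) weak reference as the only argument.
    events.append("finalized")

obj = Resource()
# The weak reference itself must stay alive for the callback to fire.
ref = weakref.ref(obj, on_finalize)

events.append("before del")
del obj  # CPython: the last strong reference is gone, so the callback runs here
events.append("after del")

print(events)
print(ref() is None)
```

Finalize passes itself as the callback, which is why dropping the last reference to the Pool ends up calling the finalizer.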

I used the debug command util.log_to_stderr(util.SUBDEBUG) to learn the sequence of events. For example:

import multiprocessing as mp
import multiprocessing.util as util
util.log_to_stderr(util.SUBDEBUG)

print(list(mp.Pool(1).imap(abs, range(3))))

yields

[DEBUG/MainProcess] created semlock with handle 3077013504
[DEBUG/MainProcess] created semlock with handle 3077009408
[DEBUG/MainProcess] created semlock with handle 3077005312
[DEBUG/MainProcess] created semlock with handle 3077001216
[INFO/PoolWorker-1] child process calling self.run()
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x9d6e62c>, <multiprocessing.queues.SimpleQueue object at 0x9cf04cc>, <multiprocessing.queues.SimpleQueue object at 0x9d6e40c>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1217967248)>, <Thread(Thread-2, started daemon -1226359952)>, {0: <multiprocessing.pool.IMapIterator object at 0x9d6eaec>}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
...

and compare that with

import multiprocessing as mp
import multiprocessing.util as util
util.log_to_stderr(util.SUBDEBUG)
pool=mp.Pool(1)
print(list(pool.imap(abs, range(3))))

which yields

[DEBUG/MainProcess] created semlock with handle 3078684672
[DEBUG/MainProcess] created semlock with handle 3078680576
[DEBUG/MainProcess] created semlock with handle 3078676480
[DEBUG/MainProcess] created semlock with handle 3078672384
[INFO/PoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[0, 1, 2]
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[SUBDEBUG/MainProcess] calling <Finalize object, callback=_terminate_pool, args=(<Queue.Queue instance at 0xb763e60c>, <multiprocessing.queues.SimpleQueue object at 0xb76c94ac>, <multiprocessing.queues.SimpleQueue object at 0xb763e3ec>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1218274448)>, <Thread(Thread-2, started daemon -1226667152)>, {}), exitprority=15>
...
[DEBUG/MainProcess] finalizing pool
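
The "finalizer calling" message can also be triggered without a pool, by attaching a Finalize object to a throwaway object. This is a rough sketch under the assumption of CPython's reference counting; Owner and the calls list are hypothetical stand-ins for Pool and its _terminate_pool callback.

```python
import multiprocessing.util as util

util.log_to_stderr(util.SUBDEBUG)

class Owner(object):
    """Hypothetical stand-in for Pool."""

calls = []
owner = Owner()

# Same shape as the call in Pool.__init__: object, callback, args, exitpriority.
finalizer = util.Finalize(owner, calls.append, args=("cleanup",), exitpriority=15)

del owner  # dropping the last reference runs the callback (logged at SUBDEBUG)

print(calls)
print(finalizer.still_active())
```

The log line is written to stderr at the moment of the del, well before the "process shutting down" message that marks the atexit path.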

Score: 1

In my case, I was calling pool.imap() without expecting a return value and could not get it to work. However, if I tried it with pool.map() it worked fine. The issue was closely related to what the previous answer describes: nothing ever consumed the iterator returned by imap(), so the pool was shut down before the tasks had run.

The solution was to consume the iterator, for example by passing it to list(). This works because list() has to wait for every result, so all the tasks are executed before the pool goes away. In brief, it is explained below (this is, of course, simplified; for now, just pretend it does something useful):

from multiprocessing import Pool
from shutil import copy
from tqdm import tqdm

filedict = { r"C:\src\file1.txt": r"C:\trg\file1_fixed.txt",
             r"C:\src\file2.txt": r"C:\trg\file2_fixed.txt",
             r"C:\src\file3.txt": r"C:\trg\file3_fixed.txt",
             r"C:\src\file4.txt": r"C:\trg\file4_fixed.txt" }

# worker function: copy one (source, target) pair
def copyfile(srctrg):
    copy(srctrg[0], srctrg[1])
    return True

# a couple of trial calls for illustration
# (the __main__ guard is needed on Windows, where worker processes are spawned)
if __name__ == "__main__":
    with Pool(2) as pool:

        # works fine with map, but tqdm() cannot show progress since map returns a finished list
        pool.map(copyfile, list(filedict.items()))

        # NOT WORKING: nothing iterates over the result, so the pool may be terminated before the copies run
        tqdm(pool.imap(copyfile, list(filedict.items())))

        # this works: list() consumes the iterator and waits for every result
        list(tqdm(pool.imap(copyfile, list(filedict.items()))))

In my case, the simple solution was to enclose the entire tqdm(pool.imap(...)) in a list() in order to force the execution.
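
The same effect can be had with a plain for loop, which consumes the iterator just as list() does. Below is a minimal sketch with assumptions: run_with_progress is a made-up helper, abs stands in for copyfile, and a printed counter replaces tqdm so the example has no third-party dependency.

```python
from multiprocessing import Pool

def run_with_progress(values):
    # Hypothetical helper: abs stands in for the real worker function.
    results = []
    with Pool(2) as pool:
        # Looping over imap() consumes the iterator inside the with block,
        # so every result arrives before the pool is terminated on exit.
        for done, result in enumerate(pool.imap(abs, values), start=1):
            print("%d/%d tasks finished" % (done, len(values)))
            results.append(result)
    return results

if __name__ == "__main__":
    print(run_with_progress([-3, -2, -1, 0]))
```

The important detail is that the loop sits inside the with block; iterating after the block has exited would be too late, because leaving the block terminates the pool.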
