multiprocessing Pool.imap broken?
First notice that this works:
import multiprocessing as mp
import multiprocessing.util as util
pool = mp.Pool(1)
print(list(pool.imap(abs, range(3))))
The difference is that pool does not get finalized when the call to pool.imap() ends, because the name pool still refers to it.
In contrast,
print(list(mp.Pool(1).imap(abs, range(3))))
causes the Pool instance to be finalized soon after the imap call ends.
The lack of a reference causes the finalizer (stored as self._terminate in the Pool class) to be called. This sets in motion a sequence of commands that tears down the task handler thread, the result handler thread, the worker subprocesses, etc.
This all happens so quickly that, at least on a majority of runs, the task sent to the task handler does not complete.
Here are the relevant bits of code:
From /usr/lib/python2.6/multiprocessing/pool.py:
class Pool(object):
    def __init__(self, processes=None, initializer=None, initargs=()):
        ...
        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._task_handler, self._result_handler, self._cache),
            exitpriority=15
            )
From /usr/lib/python2.6/multiprocessing/util.py:
class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        ...
        if obj is not None:
            self._weakref = weakref.ref(obj, self)
The call weakref.ref(obj, self) causes self() (that is, the Finalize object's __call__ method) to be invoked when obj is about to be finalized.
I used the debugging call util.log_to_stderr(util.SUBDEBUG) to learn the sequence of events. For example:
import multiprocessing as mp
import multiprocessing.util as util
util.log_to_stderr(util.SUBDEBUG)
print(list(mp.Pool(1).imap(abs, range(3))))
yields
[DEBUG/MainProcess] created semlock with handle 3077013504
[DEBUG/MainProcess] created semlock with handle 3077009408
[DEBUG/MainProcess] created semlock with handle 3077005312
[DEBUG/MainProcess] created semlock with handle 3077001216
[INFO/PoolWorker-1] child process calling self.run()
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x9d6e62c>, <multiprocessing.queues.SimpleQueue object at 0x9cf04cc>, <multiprocessing.queues.SimpleQueue object at 0x9d6e40c>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1217967248)>, <Thread(Thread-2, started daemon -1226359952)>, {0: <multiprocessing.pool.IMapIterator object at 0x9d6eaec>}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
...
Compare that with
import multiprocessing as mp
import multiprocessing.util as util
util.log_to_stderr(util.SUBDEBUG)
pool = mp.Pool(1)
print(list(pool.imap(abs, range(3))))
which yields
[DEBUG/MainProcess] created semlock with handle 3078684672
[DEBUG/MainProcess] created semlock with handle 3078680576
[DEBUG/MainProcess] created semlock with handle 3078676480
[DEBUG/MainProcess] created semlock with handle 3078672384
[INFO/PoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[0, 1, 2]
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[SUBDEBUG/MainProcess] calling <Finalize object, callback=_terminate_pool, args=(<Queue.Queue instance at 0xb763e60c>, <multiprocessing.queues.SimpleQueue object at 0xb76c94ac>, <multiprocessing.queues.SimpleQueue object at 0xb763e3ec>, [<Process(PoolWorker-1, started daemon)>], <Thread(Thread-1, started daemon -1218274448)>, <Thread(Thread-2, started daemon -1226667152)>, {}), exitprority=15>
...
[DEBUG/MainProcess] finalizing pool
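A minimal sketch of a pattern that avoids the problem, assuming Python 3: keep a name bound to the pool while its imap() iterator is being consumed, drain the iterator, and only then shut the pool down explicitly with close() and join(). The helper consume_imap is a hypothetical name used for illustration.

```python
import multiprocessing as mp


def consume_imap(pool, func, iterable):
    """Hypothetical helper: drain pool.imap() while the caller still holds `pool`.

    Because the caller's variable keeps the pool referenced, its finalizer
    cannot run until every result has been pulled from the iterator.
    """
    return list(pool.imap(func, iterable))


if __name__ == "__main__":
    pool = mp.Pool(1)   # this name keeps the Pool (and its finalizer) alive
    try:
        print(consume_imap(pool, abs, range(-2, 3)))   # [2, 1, 0, 1, 2]
    finally:
        pool.close()    # no more tasks will be submitted
        pool.join()     # wait for the worker processes to exit
```

Calling close() and then join() lets the workers finish their queued tasks, whereas terminate() (which the finalizer ends up calling) stops them immediately.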
In my case, I was calling pool.imap() without needing its return value, and it did not work. However, when I tried pool.map() instead, it worked fine. The issue was closely related to the one described in the previous answer: nothing consumed the iterator returned by pool.imap(), so the pool was shut down before the tasks had actually run.
The solution was to consume the iterator, for example by wrapping it in list(). This works because list() has to pull every result out of the iterator, so it blocks until all of the tasks have executed. Here is a simplified example (just pretend it does something useful):
from multiprocessing import Pool
from shutil import copy
from tqdm import tqdm

filedict = { r"C:\src\file1.txt": r"C:\trg\file1_fixed.txt",
             r"C:\src\file2.txt": r"C:\trg\file2_fixed.txt",
             r"C:\src\file3.txt": r"C:\trg\file3_fixed.txt",
             r"C:\src\file4.txt": r"C:\trg\file4_fixed.txt" }

# target function run by each worker: copy one (source, target) pair
def copyfile(srctrg):
    copy(srctrg[0], srctrg[1])
    return True

# the guard is required on Windows, where worker processes re-import this module
if __name__ == "__main__":
    # a couple of trial runs for illustration
    with Pool(2) as pool:
        # works fine with map, but tqdm() cannot show live progress,
        # since map() returns a finished list rather than an iterator
        pool.map(copyfile, list(filedict.items()))

        # on its own this will not work reliably: imap() returns a lazy iterator
        # that nothing consumes, so the with block can exit (calling
        # pool.terminate()) before the copies finish
        tqdm(pool.imap(copyfile, list(filedict.items())))  # NOT WORKING

        # this works: list() consumes the iterator, blocking until every task has returned
        list(tqdm(pool.imap(copyfile, list(filedict.items()))))
In my case, the simple solution was to wrap the entire tqdm(pool.imap(...)) call in list() in order to force the execution.
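If tqdm is not available, a similar effect can be sketched with the standard library alone: iterate over imap() inside the with block so that every result is pulled before the block's __exit__ calls terminate(). The helper run_with_progress, its report parameter and the square function (standing in for copyfile) are hypothetical names chosen for illustration.

```python
from multiprocessing import Pool


def run_with_progress(pool, func, items, report=print):
    """Hypothetical helper: consume pool.imap() and report progress as results arrive.

    Iterating here (instead of merely wrapping the iterator) is what forces
    the tasks to finish while the pool is still open.
    """
    results = []
    total = len(items)
    for done, result in enumerate(pool.imap(func, items), start=1):
        results.append(result)
        report("%d/%d done" % (done, total))
    return results


def square(x):
    # Stand-in for copyfile(); defined at module level so workers can unpickle it.
    return x * x


if __name__ == "__main__":
    with Pool(2) as pool:
        # consumed inside the with block, so __exit__ (terminate) only runs afterwards
        print(run_with_progress(pool, square, [1, 2, 3, 4]))
```

imap() yields results in input order, so the final list here is [1, 4, 9, 16]; imap_unordered() would report progress as soon as any task finishes, at the cost of ordering.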