parallel.py
3.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""Convenient parallelization of higher order functions.
This module provides two helper functions, with appropriate fallbacks on
Python 2 and on systems lacking support for synchronization mechanisms:
- map_multiprocess
- map_multithread
These helpers work like Python 3's map, with two differences:
- They don't guarantee the order of processing of
the elements of the iterable.
- The underlying process/thread pools chop the iterable into
a number of chunks, so that for very long iterables using
a large value for chunksize can make the job complete much faster
than using the default value of 1.
"""
__all__ = ['map_multiprocess', 'map_multithread']
from contextlib import contextmanager
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool
from pip._vendor.requests.adapters import DEFAULT_POOLSIZE
from pip._vendor.six import PY2
from pip._vendor.six.moves import map
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from multiprocessing import pool
from typing import Callable, Iterable, Iterator, TypeVar, Union
Pool = Union[pool.Pool, pool.ThreadPool]
S = TypeVar('S')
T = TypeVar('T')
# On platforms without sem_open, multiprocessing[.dummy] Pool
# cannot be created.
try:
import multiprocessing.synchronize # noqa
except ImportError:
LACK_SEM_OPEN = True
else:
LACK_SEM_OPEN = False
# Incredibly large timeout to work around bpo-8296 on Python 2.
TIMEOUT = 2000000
@contextmanager
def closing(pool):
# type: (Pool) -> Iterator[Pool]
"""Return a context manager making sure the pool closes properly."""
try:
yield pool
finally:
# For Pool.imap*, close and join are needed
# for the returned iterator to begin yielding.
pool.close()
pool.join()
pool.terminate()
def _map_fallback(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Make an iterator applying func to each element in iterable.
This function is the sequential fallback either on Python 2
where Pool.imap* doesn't react to KeyboardInterrupt
or when sem_open is unavailable.
"""
return map(func, iterable)
def _map_multiprocess(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Chop iterable into chunks and submit them to a process pool.
For very long iterables using a large value for chunksize can make
the job complete much faster than using the default value of 1.
Return an unordered iterator of the results.
"""
with closing(ProcessPool()) as pool:
return pool.imap_unordered(func, iterable, chunksize)
def _map_multithread(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Chop iterable into chunks and submit them to a thread pool.
For very long iterables using a large value for chunksize can make
the job complete much faster than using the default value of 1.
Return an unordered iterator of the results.
"""
with closing(ThreadPool(DEFAULT_POOLSIZE)) as pool:
return pool.imap_unordered(func, iterable, chunksize)
if LACK_SEM_OPEN or PY2:
map_multiprocess = map_multithread = _map_fallback
else:
map_multiprocess = _map_multiprocess
map_multithread = _map_multithread