backports.py
2.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""
Backports of fixes for joblib dependencies
"""
import os
import time
from distutils.version import LooseVersion
from os.path import basename
from multiprocessing import util
try:
import numpy as np
def make_memmap(filename, dtype='uint8', mode='r+', offset=0,
shape=None, order='C', unlink_on_gc_collect=False):
"""Custom memmap constructor compatible with numpy.memmap.
This function:
- is a backport the numpy memmap offset fix (See
https://github.com/numpy/numpy/pull/8443 for more details.
The numpy fix is available starting numpy 1.13)
- adds ``unlink_on_gc_collect``, which specifies explicitly whether
the process re-constructing the memmap owns a reference to the
underlying file. If set to True, it adds a finalizer to the
newly-created memmap that sends a maybe_unlink request for the
memmaped file to resource_tracker.
"""
util.debug(
"[MEMMAP READ] creating a memmap (shape {}, filename {}, "
"pid {})".format(shape, basename(filename), os.getpid())
)
mm = np.memmap(filename, dtype=dtype, mode=mode, offset=offset,
shape=shape, order=order)
if LooseVersion(np.__version__) < '1.13':
mm.offset = offset
if unlink_on_gc_collect:
from ._memmapping_reducer import add_maybe_unlink_finalizer
add_maybe_unlink_finalizer(mm)
return mm
except ImportError:
def make_memmap(filename, dtype='uint8', mode='r+', offset=0,
shape=None, order='C', unlink_on_gc_collect=False):
raise NotImplementedError(
"'joblib.backports.make_memmap' should not be used "
'if numpy is not installed.')
if os.name == 'nt':
# https://github.com/joblib/joblib/issues/540
access_denied_errors = (5, 13)
from os import replace
def concurrency_safe_rename(src, dst):
"""Renames ``src`` into ``dst`` overwriting ``dst`` if it exists.
On Windows os.replace can yield permission errors if executed by two
different processes.
"""
max_sleep_time = 1
total_sleep_time = 0
sleep_time = 0.001
while total_sleep_time < max_sleep_time:
try:
replace(src, dst)
break
except Exception as exc:
if getattr(exc, 'winerror', None) in access_denied_errors:
time.sleep(sleep_time)
total_sleep_time += sleep_time
sleep_time *= 2
else:
raise
else:
raise
else:
from os import replace as concurrency_safe_rename # noqa