reduction.py
9.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
###############################################################################
# Customizable Pickler with some basic reducers
#
# author: Thomas Moreau
#
# adapted from multiprocessing/reduction.py (17/02/2017)
# * Replace the ForkingPickler with a similar _LokyPickler,
# * Add CustomizablePickler to allow customizing the pickling process
#   on the fly.
#
import io
import os
import sys
import functools
from multiprocessing import util
import types
try:
# Python 2 compat
from cPickle import loads as pickle_loads
except ImportError:
from pickle import loads as pickle_loads
import copyreg
from pickle import HIGHEST_PROTOCOL
if sys.platform == "win32":
if sys.version_info[:2] > (3, 3):
from multiprocessing.reduction import duplicate
else:
from multiprocessing.forking import duplicate
###############################################################################
# Enable custom pickling in Loky.
# Customization of the pickling process relies on two classes:
# _ReducerRegistry provides module-level customization, while
# CustomizablePickler allows per-instance custom reducers. Only
# CustomizablePickler should be used.
class _ReducerRegistry(object):
"""Registry for custom reducers.
HIGHEST_PROTOCOL is selected by default as this pickler is used
to pickle ephemeral datastructures for interprocess communication
hence no backward compatibility is required.
"""
# We override the pure Python pickler as its the only way to be able to
# customize the dispatch table without side effects in Python 2.6
# to 3.2. For Python 3.3+ leverage the new dispatch_table
# feature from http://bugs.python.org/issue14166 that makes it possible
# to use the C implementation of the Pickler which is faster.
dispatch_table = {}
@classmethod
def register(cls, type, reduce_func):
"""Attach a reducer function to a given type in the dispatch table."""
if sys.version_info < (3,):
# Python 2 pickler dispatching is not explicitly customizable.
# Let us use a closure to workaround this limitation.
def dispatcher(cls, obj):
reduced = reduce_func(obj)
cls.save_reduce(obj=obj, *reduced)
cls.dispatch_table[type] = dispatcher
else:
cls.dispatch_table[type] = reduce_func
###############################################################################
# Registers extra pickling routines to improve picklization for loky
register = _ReducerRegistry.register
# make methods picklable
def _reduce_method(m):
if m.__self__ is None:
return getattr, (m.__class__, m.__func__.__name__)
else:
return getattr, (m.__self__, m.__func__.__name__)
class _C:
    """Throwaway class used only to obtain the method types to register."""

    def f(self):
        """Instance method: ``_C().f`` is a method bound to an instance."""

    @classmethod
    def h(cls):
        """Class method: ``_C.h`` is a method bound to the class."""


# Methods bound to instances and to classes are both reduced to getattr calls.
register(type(_C().f), _reduce_method)
register(type(_C.h), _reduce_method)
if not hasattr(sys, "pypy_version_info"):
    # CPython only: PyPy exposes these builtins as plain functions rather
    # than method_descriptor / wrapper_descriptor objects.
    def _reduce_method_descriptor(m):
        """Reduce a builtin method descriptor to ``getattr(owner, name)``."""
        owner_type = m.__objclass__
        attr_name = m.__name__
        return getattr, (owner_type, attr_name)

    register(type(list.append), _reduce_method_descriptor)  # method_descriptor
    register(type(int.__add__), _reduce_method_descriptor)  # wrapper_descriptor
# Make partial func pickable
def _reduce_partial(p):
return _rebuild_partial, (p.func, p.args, p.keywords or {})
def _rebuild_partial(func, args, keywords):
return functools.partial(func, *args, **keywords)
# Pickle functools.partial objects through _reduce_partial.
register(functools.partial, reduce_func=_reduce_partial)
if sys.platform != "win32":
from ._posix_reduction import _mk_inheritable # noqa: F401
else:
from . import _win_reduction # noqa: F401
# Default serialization backend: cloudpickle when importable, else pickle.
try:
    from joblib.externals import cloudpickle  # noqa: F401
except ImportError:
    # If cloudpickle is not present, fallback to pickle
    DEFAULT_ENV = "pickle"
else:
    DEFAULT_ENV = "cloudpickle"

# Backend requested through the LOKY_PICKLER environment variable.
ENV_LOKY_PICKLER = os.environ.get("LOKY_PICKLER", DEFAULT_ENV)

# Currently selected pickler class and its name (filled by set_loky_pickler).
_LokyPickler = None
_loky_pickler_name = None
def set_loky_pickler(loky_pickler=None):
    """Select the pickler class loky uses to serialize objects.

    Parameters
    ----------
    loky_pickler : str or None, default=None
        Name of the serialization backend. ``None`` falls back to
        ``ENV_LOKY_PICKLER`` (the ``LOKY_PICKLER`` environment variable, or
        ``DEFAULT_ENV`` when it is unset), and an empty string is treated as
        ``"cloudpickle"``. ``"cloudpickle"`` selects
        ``joblib.externals.cloudpickle.CloudPickler``. Any other value is
        imported with ``importlib.import_module``, and that module's
        ``Pickler`` attribute is used.

    Side effects
    ------------
    Rebinds the module globals ``_LokyPickler`` (to a freshly created
    ``CustomizablePickler`` subclass of the selected pickler class) and
    ``_loky_pickler_name``. Returns early, changing nothing, when the
    requested name is already the selected one.

    Raises
    ------
    ImportError or AttributeError
        When a non-cloudpickle module cannot be imported or has no
        ``Pickler`` attribute. The message is extended to mention
        LOKY_PICKLER / set_loky_pickler. An ImportError raised while
        importing cloudpickle itself propagates unchanged.
    """
    global _LokyPickler, _loky_pickler_name

    if loky_pickler is None:
        loky_pickler = ENV_LOKY_PICKLER

    loky_pickler_cls = None

    # An empty (or missing) name selects cloudpickle.
    if loky_pickler in ["", None]:
        loky_pickler = "cloudpickle"

    # Nothing to do if this backend is already the active one.
    if loky_pickler == _loky_pickler_name:
        return

    if loky_pickler == "cloudpickle":
        from joblib.externals.cloudpickle import CloudPickler as loky_pickler_cls
    else:
        try:
            from importlib import import_module
            module_pickle = import_module(loky_pickler)
            loky_pickler_cls = module_pickle.Pickler
        except (ImportError, AttributeError) as e:
            extra_info = ("\nThis error occurred while setting loky_pickler to"
                          " '{}', as required by the env variable LOKY_PICKLER"
                          " or the function set_loky_pickler."
                          .format(loky_pickler))
            # Append the context to the original message, mirror it in
            # ``msg`` (an ImportError attribute), then re-raise the same
            # exception object.
            e.args = (e.args[0] + extra_info,) + e.args[1:]
            e.msg = e.args[0]
            raise e

    util.debug("Using '{}' for serialization."
               .format(loky_pickler if loky_pickler else "cloudpickle"))

    class CustomizablePickler(loky_pickler_cls):
        """Subclass of the selected pickler with loky's reducers installed.

        Each instance starts from a private copy of the base dispatch table.
        That copy is extended with the reducers registered in
        ``_ReducerRegistry`` and with the optional per-instance ``reducers``
        mapping.
        """
        # Selected base class, used for the MRO walk in _set_dispatch_table.
        _loky_pickler_cls = loky_pickler_cls

        def _set_dispatch_table(self, dispatch_table):
            """Install ``dispatch_table`` as this instance's dispatch table.

            The table is set both through a base class member descriptor
            (when one exists) and as a regular attribute.
            """
            for ancestor_class in self._loky_pickler_cls.mro():
                dt_attribute = getattr(ancestor_class, "dispatch_table", None)
                if isinstance(dt_attribute, types.MemberDescriptorType):
                    # Ancestor class (typically _pickle.Pickler) has a
                    # member_descriptor for its "dispatch_table" attribute. Use
                    # it to set the dispatch_table as a member instead of a
                    # dynamic attribute in the __dict__ of the instance,
                    # otherwise it will not be taken into account by the C
                    # implementation of the dump method if a subclass defines a
                    # class-level dispatch_table attribute as was done in
                    # cloudpickle 1.6.0:
                    # https://github.com/joblib/loky/pull/260
                    dt_attribute.__set__(self, dispatch_table)
                    break

            # On top of member descriptor set, also use setattr such that code
            # that directly access self.dispatch_table gets a consistent view
            # of the same table.
            self.dispatch_table = dispatch_table

        def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
            """Create a pickler writing to ``writer``.

            ``reducers`` is an optional ``{type: reduce_func}`` mapping
            registered on this instance only. ``protocol`` is forwarded to
            the base pickler class.
            """
            loky_pickler_cls.__init__(self, writer, protocol=protocol)
            if reducers is None:
                reducers = {}
            if sys.version_info < (3,):
                # Python 2: copy the class-level ``dispatch`` mapping so that
                # adding loky's save functions does not mutate the base class.
                self.dispatch = loky_pickler_cls.dispatch.copy()
                self.dispatch.update(_ReducerRegistry.dispatch_table)
            else:
                if hasattr(self, "dispatch_table"):
                    # Force a copy that we will update without mutating
                    # any class-level dispatch_table.
                    loky_dt = dict(self.dispatch_table)
                else:
                    # Use standard reducers as bases
                    loky_dt = copyreg.dispatch_table.copy()

                # Register loky specific reducers
                loky_dt.update(_ReducerRegistry.dispatch_table)

                # Set the new dispatch table, taking care of the fact that we
                # need to use the member_descriptor when we inherit from a
                # subclass of the C implementation of the Pickler base class
                # with a class-level dispatch_table attribute.
                self._set_dispatch_table(loky_dt)

            # Register custom reducers (per-instance, on the copied table)
            for type, reduce_func in reducers.items():
                self.register(type, reduce_func)

        def register(self, type, reduce_func):
            """Attach a reducer function to a given type in the dispatch table.
            """
            if sys.version_info < (3,):
                # Python 2 pickler dispatching is not explicitly customizable.
                # Let us use a closure to workaround this limitation.
                def dispatcher(self, obj):
                    reduced = reduce_func(obj)
                    self.save_reduce(obj=obj, *reduced)
                self.dispatch[type] = dispatcher
            else:
                self.dispatch_table[type] = reduce_func

    # Publish the new pickler class and remember which backend it wraps.
    _LokyPickler = CustomizablePickler
    _loky_pickler_name = loky_pickler
def get_loky_pickler_name():
    """Return the backend name recorded by ``set_loky_pickler``.

    This is the module-level ``_loky_pickler_name``, which is ``None`` until
    a pickler has been selected.
    """
    return _loky_pickler_name
def get_loky_pickler():
    """Return the pickler class currently installed by ``set_loky_pickler``.

    This is the module-level ``_LokyPickler``, which is ``None`` until a
    pickler has been selected.
    """
    return _LokyPickler
# Select the initial pickler at import time; with no explicit name the
# backend comes from ENV_LOKY_PICKLER.
set_loky_pickler(loky_pickler=None)
def loads(buf):
    """Unpickle ``buf`` (a bytes-like payload) with ``pickle_loads``.

    On Python < 3.3, an ``io.BytesIO`` is also accepted: its contents are
    extracted with ``getvalue()`` before unpickling.
    """
    needs_unwrap = sys.version_info < (3, 3) and isinstance(buf, io.BytesIO)
    payload = buf.getvalue() if needs_unwrap else buf
    return pickle_loads(payload)
def dump(obj, file, reducers=None, protocol=None):
    '''Replacement for pickle.dump() using _LokyPickler.

    Parameters
    ----------
    obj : object
        Object to serialize.
    file : file-like
        Writable binary stream handed to the pickler.
    reducers : dict or None
        Extra ``{type: reduce_func}`` entries used by this call's pickler only.
    protocol : int or None
        Pickle protocol; ``None`` means ``HIGHEST_PROTOCOL``.
    '''
    if protocol is None:
        # The loky pickler's own default is HIGHEST_PROTOCOL, but forwarding
        # an explicit ``protocol=None`` bypasses it. Some backends, such as
        # the standard library pickler, then use their own, lower default.
        protocol = HIGHEST_PROTOCOL
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
def dumps(obj, reducers=None, protocol=None):
    """Serialize ``obj`` in memory with the current loky pickler.

    ``reducers`` and ``protocol`` are forwarded to ``dump``. Returns a
    ``memoryview`` over the pickled bytes on Python >= 3.3, and ``bytes``
    on older versions.
    """
    sink = io.BytesIO()
    dump(obj, sink, reducers=reducers, protocol=protocol)
    if sys.version_info >= (3, 3):
        # Zero-copy view over the BytesIO buffer.
        return sink.getbuffer()
    return sink.getvalue()
# Public API of this module; ``duplicate`` is only imported on Windows.
__all__ = ["dump", "dumps", "loads", "register", "set_loky_pickler"]
if sys.platform == "win32":
    __all__.append("duplicate")