semlock.py
8.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
###############################################################################
# Ctypes implementation for posix semaphore.
#
# author: Thomas Moreau and Olivier Grisel
#
# adapted from cpython/Modules/_multiprocessing/semaphore.c (17/02/2017)
# * use ctypes to access pthread semaphores and provide a full python
# semaphore management.
# * For OSX, as no sem_getvalue is not implemented, Semaphore with value > 1
# are not guaranteed to work.
# * Only work with LokyProcess on posix
#
import os
import sys
import time
import errno
import ctypes
import tempfile
import threading
from ctypes.util import find_library
# As we need to use ctypes return types for semlock object, failure value
# needs to be cast to proper python value. Unix failure convention is to
# return 0, whereas OSX returns -1
SEM_FAILURE = ctypes.c_void_p(0).value
if sys.platform == 'darwin':
SEM_FAILURE = ctypes.c_void_p(-1).value
# Semaphore types
RECURSIVE_MUTEX = 0
SEMAPHORE = 1
# Semaphore constants
SEM_OFLAG = ctypes.c_int(os.O_CREAT | os.O_EXCL)
SEM_PERM = ctypes.c_int(384)
class timespec(ctypes.Structure):
_fields_ = [("tv_sec", ctypes.c_long), ("tv_nsec", ctypes.c_long)]
if sys.platform != 'win32':
pthread = ctypes.CDLL(find_library('pthread'), use_errno=True)
pthread.sem_open.restype = ctypes.c_void_p
pthread.sem_close.argtypes = [ctypes.c_void_p]
pthread.sem_wait.argtypes = [ctypes.c_void_p]
pthread.sem_trywait.argtypes = [ctypes.c_void_p]
pthread.sem_post.argtypes = [ctypes.c_void_p]
pthread.sem_getvalue.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
pthread.sem_unlink.argtypes = [ctypes.c_char_p]
if sys.platform != "darwin":
pthread.sem_timedwait.argtypes = [ctypes.c_void_p,
ctypes.POINTER(timespec)]
try:
from threading import get_ident
except ImportError:
def get_ident():
return threading.current_thread().ident
if sys.version_info[:2] < (3, 3):
class FileExistsError(OSError):
pass
class FileNotFoundError(OSError):
pass
def sem_unlink(name):
if pthread.sem_unlink(name.encode('ascii')) < 0:
raiseFromErrno()
def _sem_open(name, value=None):
""" Construct or retrieve a semaphore with the given name
If value is None, try to retrieve an existing named semaphore.
Else create a new semaphore with the given value
"""
if value is None:
handle = pthread.sem_open(ctypes.c_char_p(name), 0)
else:
handle = pthread.sem_open(ctypes.c_char_p(name), SEM_OFLAG, SEM_PERM,
ctypes.c_int(value))
if handle == SEM_FAILURE:
e = ctypes.get_errno()
if e == errno.EEXIST:
raise FileExistsError("a semaphore named %s already exists" % name)
elif e == errno.ENOENT:
raise FileNotFoundError('cannot find semaphore named %s' % name)
elif e == errno.ENOSYS:
raise NotImplementedError('No semaphore implementation on this '
'system')
else:
raiseFromErrno()
return handle
def _sem_timedwait(handle, timeout):
t_start = time.time()
if sys.platform != "darwin":
sec = int(timeout)
tv_sec = int(t_start)
nsec = int(1e9 * (timeout - sec) + .5)
tv_nsec = int(1e9 * (t_start - tv_sec) + .5)
deadline = timespec(sec+tv_sec, nsec+tv_nsec)
deadline.tv_sec += int(deadline.tv_nsec / 1000000000)
deadline.tv_nsec %= 1000000000
return pthread.sem_timedwait(handle, ctypes.pointer(deadline))
# PERFORMANCE WARNING
# No sem_timedwait on OSX so we implement our own method. This method can
# degrade performances has the wait can have a latency up to 20 msecs
deadline = t_start + timeout
delay = 0
now = time.time()
while True:
# Poll the sem file
res = pthread.sem_trywait(handle)
if res == 0:
return 0
else:
e = ctypes.get_errno()
if e != errno.EAGAIN:
raiseFromErrno()
# check for timeout
now = time.time()
if now > deadline:
ctypes.set_errno(errno.ETIMEDOUT)
return -1
# calculate how much time left and check the delay is not too long
# -- maximum is 20 msecs
difference = (deadline - now)
delay = min(delay, 20e-3, difference)
# Sleep and increase delay
time.sleep(delay)
delay += 1e-3
class SemLock(object):
"""ctypes wrapper to the unix semaphore"""
_rand = tempfile._RandomNameSequence()
def __init__(self, kind, value, maxvalue, name=None, unlink_now=False):
self.count = 0
self.ident = 0
self.kind = kind
self.maxvalue = maxvalue
self.name = name
self.handle = _sem_open(self.name.encode('ascii'), value)
def __del__(self):
try:
res = pthread.sem_close(self.handle)
assert res == 0, "Issue while closing semaphores"
except AttributeError:
pass
def _is_mine(self):
return self.count > 0 and get_ident() == self.ident
def acquire(self, block=True, timeout=None):
if self.kind == RECURSIVE_MUTEX and self._is_mine():
self.count += 1
return True
if block and timeout is None:
res = pthread.sem_wait(self.handle)
elif not block or timeout <= 0:
res = pthread.sem_trywait(self.handle)
else:
res = _sem_timedwait(self.handle, timeout)
if res < 0:
e = ctypes.get_errno()
if e == errno.EINTR:
return None
elif e in [errno.EAGAIN, errno.ETIMEDOUT]:
return False
raiseFromErrno()
self.count += 1
self.ident = get_ident()
return True
def release(self):
if self.kind == RECURSIVE_MUTEX:
assert self._is_mine(), (
"attempt to release recursive lock not owned by thread")
if self.count > 1:
self.count -= 1
return
assert self.count == 1
else:
if sys.platform == 'darwin':
# Handle broken get_value for mac ==> only Lock will work
# as sem_get_value do not work properly
if self.maxvalue == 1:
if pthread.sem_trywait(self.handle) < 0:
e = ctypes.get_errno()
if e != errno.EAGAIN:
raise OSError(e, errno.errorcode[e])
else:
if pthread.sem_post(self.handle) < 0:
raiseFromErrno()
else:
raise ValueError(
"semaphore or lock released too many times")
else:
import warnings
warnings.warn("semaphore are broken on OSX, release might "
"increase its maximal value", RuntimeWarning)
else:
value = self._get_value()
if value >= self.maxvalue:
raise ValueError(
"semaphore or lock released too many times")
if pthread.sem_post(self.handle) < 0:
raiseFromErrno()
self.count -= 1
def _get_value(self):
value = ctypes.pointer(ctypes.c_int(-1))
if pthread.sem_getvalue(self.handle, value) < 0:
raiseFromErrno()
return value.contents.value
def _count(self):
return self.count
def _is_zero(self):
if sys.platform == 'darwin':
# Handle broken get_value for mac ==> only Lock will work
# as sem_get_value do not work properly
if pthread.sem_trywait(self.handle) < 0:
e = ctypes.get_errno()
if e == errno.EAGAIN:
return True
raise OSError(e, errno.errorcode[e])
else:
if pthread.sem_post(self.handle) < 0:
raiseFromErrno()
return False
else:
value = ctypes.pointer(ctypes.c_int(-1))
if pthread.sem_getvalue(self.handle, value) < 0:
raiseFromErrno()
return value.contents.value == 0
def _after_fork(self):
self.count = 0
@staticmethod
def _rebuild(handle, kind, maxvalue, name):
self = SemLock.__new__(SemLock)
self.count = 0
self.ident = 0
self.kind = kind
self.maxvalue = maxvalue
self.name = name
self.handle = _sem_open(name.encode('ascii'))
return self
def raiseFromErrno():
e = ctypes.get_errno()
raise OSError(e, errno.errorcode[e])