Committed by
Gerrit Code Review
HazelcastLinkResourceStore
Change-Id: Ic5d6bf9b54b023368a883e3665484900ccda44e7
Showing
3 changed files
with
868 additions
and
0 deletions
1 | +/* | ||
2 | + * Copyright 2014 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onlab.onos.store.hz; | ||
17 | + | ||
18 | +import static com.google.common.base.Preconditions.checkNotNull; | ||
19 | + | ||
20 | +import java.util.ArrayList; | ||
21 | +import java.util.Collection; | ||
22 | +import java.util.HashSet; | ||
23 | +import java.util.Set; | ||
24 | +import java.util.concurrent.TimeUnit; | ||
25 | + | ||
26 | +import org.onlab.onos.store.serializers.StoreSerializer; | ||
27 | + | ||
28 | +import com.hazelcast.core.TransactionalMap; | ||
29 | +import com.hazelcast.query.Predicate; | ||
30 | + | ||
31 | +// TODO: implement Predicate, etc. if we need them. | ||
32 | +/** | ||
33 | + * Wrapper around TransactionalMap<byte[], byte[]> which serializes/deserializes | ||
34 | + * key and value using StoreSerializer. | ||
35 | + * | ||
36 | + * @param <K> key type | ||
37 | + * @param <V> value type | ||
38 | + */ | ||
39 | +public class STxMap<K, V> implements TransactionalMap<K, V> { | ||
40 | + | ||
41 | + private final TransactionalMap<byte[], byte[]> m; | ||
42 | + private final StoreSerializer serializer; | ||
43 | + | ||
44 | + /** | ||
45 | + * Creates a STxMap instance. | ||
46 | + * | ||
47 | + * @param baseMap base IMap to use | ||
48 | + * @param serializer serializer to use for both key and value | ||
49 | + */ | ||
50 | + public STxMap(TransactionalMap<byte[], byte[]> baseMap, StoreSerializer serializer) { | ||
51 | + this.m = checkNotNull(baseMap); | ||
52 | + this.serializer = checkNotNull(serializer); | ||
53 | + } | ||
54 | + | ||
55 | + @Override | ||
56 | + public int size() { | ||
57 | + return m.size(); | ||
58 | + } | ||
59 | + | ||
60 | + @Override | ||
61 | + public boolean isEmpty() { | ||
62 | + return m.isEmpty(); | ||
63 | + } | ||
64 | + | ||
65 | + @Deprecated | ||
66 | + @Override | ||
67 | + public Object getId() { | ||
68 | + return m.getId(); | ||
69 | + } | ||
70 | + | ||
71 | + @Override | ||
72 | + public String getPartitionKey() { | ||
73 | + return m.getPartitionKey(); | ||
74 | + } | ||
75 | + | ||
76 | + @Override | ||
77 | + public String getName() { | ||
78 | + return m.getName(); | ||
79 | + } | ||
80 | + | ||
81 | + @Override | ||
82 | + public String getServiceName() { | ||
83 | + return m.getServiceName(); | ||
84 | + } | ||
85 | + | ||
86 | + @Override | ||
87 | + public void destroy() { | ||
88 | + m.destroy(); | ||
89 | + } | ||
90 | + | ||
91 | + @Override | ||
92 | + public boolean containsKey(Object key) { | ||
93 | + return m.containsKey(serializeKey(key)); | ||
94 | + } | ||
95 | + | ||
96 | + @Override | ||
97 | + public V get(Object key) { | ||
98 | + return deserializeVal(m.get(serializeKey(key))); | ||
99 | + } | ||
100 | + | ||
101 | + @Override | ||
102 | + public V getForUpdate(Object key) { | ||
103 | + // TODO Auto-generated method stub | ||
104 | + return deserializeVal(m.getForUpdate(serializeKey(key))); | ||
105 | + } | ||
106 | + | ||
107 | + @Override | ||
108 | + public V put(K key, V value) { | ||
109 | + return deserializeVal(m.put(serializeKey(key), serializeVal(value))); | ||
110 | + } | ||
111 | + | ||
112 | + @Override | ||
113 | + public V remove(Object key) { | ||
114 | + return deserializeVal(m.remove(serializeKey(key))); | ||
115 | + } | ||
116 | + | ||
117 | + @Override | ||
118 | + public boolean remove(Object key, Object value) { | ||
119 | + return m.remove(serializeKey(key), serializeVal(value)); | ||
120 | + } | ||
121 | + | ||
122 | + @Override | ||
123 | + public void delete(Object key) { | ||
124 | + m.delete(serializeKey(key)); | ||
125 | + } | ||
126 | + | ||
127 | + @Override | ||
128 | + public V put(K key, V value, long ttl, TimeUnit timeunit) { | ||
129 | + return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit)); | ||
130 | + } | ||
131 | + | ||
132 | + @Override | ||
133 | + public V putIfAbsent(K key, V value) { | ||
134 | + return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value))); | ||
135 | + } | ||
136 | + | ||
137 | + @Override | ||
138 | + public boolean replace(K key, V oldValue, V newValue) { | ||
139 | + return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue)); | ||
140 | + } | ||
141 | + | ||
142 | + @Override | ||
143 | + public V replace(K key, V value) { | ||
144 | + return deserializeVal(m.replace(serializeKey(key), serializeVal(value))); | ||
145 | + } | ||
146 | + | ||
147 | + @Override | ||
148 | + public void set(K key, V value) { | ||
149 | + m.set(serializeKey(key), serializeVal(value)); | ||
150 | + } | ||
151 | + | ||
152 | + | ||
153 | + @Override | ||
154 | + public Set<K> keySet() { | ||
155 | + return deserializeKeySet(m.keySet()); | ||
156 | + } | ||
157 | + | ||
158 | + @Override | ||
159 | + public Collection<V> values() { | ||
160 | + return deserializeVals(m.values()); | ||
161 | + } | ||
162 | + | ||
163 | + @Deprecated // marking method not implemented | ||
164 | + @SuppressWarnings("rawtypes") | ||
165 | + @Override | ||
166 | + public Set<K> keySet(Predicate predicate) { | ||
167 | + throw new UnsupportedOperationException(); | ||
168 | + } | ||
169 | + | ||
170 | + @Deprecated // marking method not implemented | ||
171 | + @SuppressWarnings("rawtypes") | ||
172 | + @Override | ||
173 | + public Collection<V> values(Predicate predicate) { | ||
174 | + throw new UnsupportedOperationException(); | ||
175 | + } | ||
176 | + | ||
177 | + private byte[] serializeKey(Object key) { | ||
178 | + return serializer.encode(key); | ||
179 | + } | ||
180 | + | ||
181 | + private K deserializeKey(byte[] key) { | ||
182 | + return serializer.decode(key); | ||
183 | + } | ||
184 | + | ||
185 | + private byte[] serializeVal(Object val) { | ||
186 | + return serializer.encode(val); | ||
187 | + } | ||
188 | + | ||
189 | + private V deserializeVal(byte[] val) { | ||
190 | + if (val == null) { | ||
191 | + return null; | ||
192 | + } | ||
193 | + return serializer.decode(val.clone()); | ||
194 | + } | ||
195 | + | ||
196 | + private Set<K> deserializeKeySet(Set<byte[]> keys) { | ||
197 | + Set<K> dsk = new HashSet<>(keys.size()); | ||
198 | + for (byte[] key : keys) { | ||
199 | + dsk.add(deserializeKey(key)); | ||
200 | + } | ||
201 | + return dsk; | ||
202 | + } | ||
203 | + | ||
204 | + private Collection<V> deserializeVals(Collection<byte[]> vals) { | ||
205 | + Collection<V> dsl = new ArrayList<>(vals.size()); | ||
206 | + for (byte[] val : vals) { | ||
207 | + dsl.add(deserializeVal(val)); | ||
208 | + } | ||
209 | + return dsl; | ||
210 | + } | ||
211 | +} |
core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/HazelcastLinkResourceStore.java
0 → 100644
1 | +/* | ||
2 | + * Copyright 2014 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onlab.onos.store.resource.impl; | ||
17 | + | ||
18 | +import java.util.ArrayList; | ||
19 | +import java.util.Collection; | ||
20 | +import java.util.Collections; | ||
21 | +import java.util.HashMap; | ||
22 | +import java.util.HashSet; | ||
23 | +import java.util.List; | ||
24 | +import java.util.Map; | ||
25 | +import java.util.Set; | ||
26 | + | ||
27 | +import org.apache.felix.scr.annotations.Activate; | ||
28 | +import org.apache.felix.scr.annotations.Component; | ||
29 | +import org.apache.felix.scr.annotations.Deactivate; | ||
30 | +import org.apache.felix.scr.annotations.Reference; | ||
31 | +import org.apache.felix.scr.annotations.ReferenceCardinality; | ||
32 | +import org.apache.felix.scr.annotations.Service; | ||
33 | +import org.onlab.onos.net.Link; | ||
34 | +import org.onlab.onos.net.LinkKey; | ||
35 | +import org.onlab.onos.net.intent.IntentId; | ||
36 | +import org.onlab.onos.net.link.LinkService; | ||
37 | +import org.onlab.onos.net.resource.Bandwidth; | ||
38 | +import org.onlab.onos.net.resource.BandwidthResourceAllocation; | ||
39 | +import org.onlab.onos.net.resource.Lambda; | ||
40 | +import org.onlab.onos.net.resource.LambdaResourceAllocation; | ||
41 | +import org.onlab.onos.net.resource.LinkResourceAllocations; | ||
42 | +import org.onlab.onos.net.resource.LinkResourceEvent; | ||
43 | +import org.onlab.onos.net.resource.LinkResourceStore; | ||
44 | +import org.onlab.onos.net.resource.ResourceAllocation; | ||
45 | +import org.onlab.onos.net.resource.ResourceType; | ||
46 | +import org.onlab.onos.store.StoreDelegate; | ||
47 | +import org.onlab.onos.store.hz.AbstractHazelcastStore; | ||
48 | +import org.onlab.onos.store.hz.STxMap; | ||
49 | +import org.slf4j.Logger; | ||
50 | + | ||
51 | +import com.google.common.collect.ImmutableList; | ||
52 | +import com.google.common.collect.ImmutableSet; | ||
53 | +import com.google.common.collect.Sets; | ||
54 | +import com.hazelcast.core.TransactionalMap; | ||
55 | +import com.hazelcast.transaction.TransactionContext; | ||
56 | +import com.hazelcast.transaction.TransactionException; | ||
57 | +import com.hazelcast.transaction.TransactionOptions; | ||
58 | +import com.hazelcast.transaction.TransactionOptions.TransactionType; | ||
59 | + | ||
60 | +import static com.google.common.base.Preconditions.checkNotNull; | ||
61 | +import static com.google.common.base.Preconditions.checkState; | ||
62 | +import static org.slf4j.LoggerFactory.getLogger; | ||
63 | + | ||
64 | +/** | ||
65 | + * Manages link resources using Hazelcast. | ||
66 | + */ | ||
67 | +@Component(immediate = true, enabled = false) | ||
68 | +@Service | ||
69 | +public class HazelcastLinkResourceStore | ||
70 | + extends AbstractHazelcastStore<LinkResourceEvent, StoreDelegate<LinkResourceEvent>> | ||
71 | + implements LinkResourceStore { | ||
72 | + | ||
73 | + | ||
74 | + private final Logger log = getLogger(getClass()); | ||
75 | + | ||
76 | + // FIXME: what is the Bandwidth unit? | ||
77 | + private static final Bandwidth DEFAULT_BANDWIDTH = Bandwidth.valueOf(1_000); | ||
78 | + | ||
79 | + private static final Bandwidth EMPTY_BW = Bandwidth.valueOf(0); | ||
80 | + | ||
81 | + // table to store current allocations | ||
82 | + /** LinkKey -> List<LinkResourceAllocations>. */ | ||
83 | + private static final String LINK_RESOURCE_ALLOCATIONS = "LinkResourceAllocations"; | ||
84 | + | ||
85 | + /** IntentId -> LinkResourceAllocations. */ | ||
86 | + private static final String INTENT_ALLOCATIONS = "IntentAllocations"; | ||
87 | + | ||
88 | + | ||
89 | + // TODO make this configurable | ||
90 | + // number of retries to attempt on allocation failure, due to | ||
91 | + // concurrent update | ||
92 | + private static int maxAllocateRetries = 5; | ||
93 | + | ||
94 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
95 | + protected LinkService linkService; | ||
96 | + | ||
97 | + // Link annotation key name to use as bandwidth | ||
98 | + private String bandwidthAnnotation = "bandwidth"; | ||
99 | + // Link annotation key name to use as max lambda | ||
100 | + private String wavesAnnotation = "optical.waves"; | ||
101 | + | ||
102 | + @Override | ||
103 | + @Activate | ||
104 | + public void activate() { | ||
105 | + super.activate(); | ||
106 | + log.info("Started"); | ||
107 | + } | ||
108 | + | ||
109 | + @Deactivate | ||
110 | + public void deactivate() { | ||
111 | + log.info("Stopped"); | ||
112 | + } | ||
113 | + | ||
114 | + private STxMap<IntentId, LinkResourceAllocations> getIntentAllocs(TransactionContext tx) { | ||
115 | + TransactionalMap<byte[], byte[]> raw = tx.getMap(INTENT_ALLOCATIONS); | ||
116 | + return new STxMap<>(raw, serializer); | ||
117 | + } | ||
118 | + | ||
119 | + private STxMap<LinkKey, List<LinkResourceAllocations>> getLinkAllocs(TransactionContext tx) { | ||
120 | + TransactionalMap<byte[], byte[]> raw = tx.getMap(LINK_RESOURCE_ALLOCATIONS); | ||
121 | + return new STxMap<>(raw, serializer); | ||
122 | + } | ||
123 | + | ||
124 | + private Set<? extends ResourceAllocation> getResourceCapacity(ResourceType type, Link link) { | ||
125 | + // TODO: plugin/provider mechanism to add resource type in the future? | ||
126 | + if (type == ResourceType.BANDWIDTH) { | ||
127 | + return ImmutableSet.of(getBandwidthResourceCapacity(link)); | ||
128 | + } | ||
129 | + if (type == ResourceType.LAMBDA) { | ||
130 | + return getLambdaResourceCapacity(link); | ||
131 | + } | ||
132 | + return null; | ||
133 | + } | ||
134 | + | ||
135 | + private Set<LambdaResourceAllocation> getLambdaResourceCapacity(Link link) { | ||
136 | + // FIXME enumerate all the possible link/port lambdas | ||
137 | + Set<LambdaResourceAllocation> allocations = new HashSet<>(); | ||
138 | + try { | ||
139 | + final int waves = Integer.parseInt(link.annotations().value(wavesAnnotation)); | ||
140 | + for (int i = 1; i <= waves; i++) { | ||
141 | + allocations.add(new LambdaResourceAllocation(Lambda.valueOf(i))); | ||
142 | + } | ||
143 | + } catch (NumberFormatException e) { | ||
144 | + log.debug("No {} annotation on link %s", wavesAnnotation, link); | ||
145 | + } | ||
146 | + return allocations; | ||
147 | + } | ||
148 | + | ||
149 | + private BandwidthResourceAllocation getBandwidthResourceCapacity(Link link) { | ||
150 | + | ||
151 | + // if Link annotation exist, use them | ||
152 | + // if all fails, use DEFAULT_BANDWIDTH | ||
153 | + | ||
154 | + Bandwidth bandwidth = null; | ||
155 | + String strBw = link.annotations().value(bandwidthAnnotation); | ||
156 | + if (strBw != null) { | ||
157 | + try { | ||
158 | + bandwidth = Bandwidth.valueOf(Double.parseDouble(strBw)); | ||
159 | + } catch (NumberFormatException e) { | ||
160 | + // do nothings | ||
161 | + bandwidth = null; | ||
162 | + } | ||
163 | + } | ||
164 | + | ||
165 | + if (bandwidth == null) { | ||
166 | + // fall back, use fixed default | ||
167 | + bandwidth = DEFAULT_BANDWIDTH; | ||
168 | + } | ||
169 | + return new BandwidthResourceAllocation(bandwidth); | ||
170 | + } | ||
171 | + | ||
172 | + private Map<ResourceType, Set<? extends ResourceAllocation>> getResourceCapacity(Link link) { | ||
173 | + Map<ResourceType, Set<? extends ResourceAllocation>> caps = new HashMap<>(); | ||
174 | + for (ResourceType type : ResourceType.values()) { | ||
175 | + Set<? extends ResourceAllocation> cap = getResourceCapacity(type, link); | ||
176 | + if (cap != null) { | ||
177 | + caps.put(type, cap); | ||
178 | + } | ||
179 | + } | ||
180 | + return caps; | ||
181 | + } | ||
182 | + | ||
183 | + @Override | ||
184 | + public Set<ResourceAllocation> getFreeResources(Link link) { | ||
185 | + Map<ResourceType, Set<? extends ResourceAllocation>> freeResources = getFreeResourcesEx(link); | ||
186 | + Set<ResourceAllocation> allFree = new HashSet<>(); | ||
187 | + for (Set<? extends ResourceAllocation> r:freeResources.values()) { | ||
188 | + allFree.addAll(r); | ||
189 | + } | ||
190 | + return allFree; | ||
191 | + } | ||
192 | + | ||
193 | + private Map<ResourceType, Set<? extends ResourceAllocation>> getFreeResourcesEx(Link link) { | ||
194 | + // returns capacity - allocated | ||
195 | + | ||
196 | + checkNotNull(link); | ||
197 | + Map<ResourceType, Set<? extends ResourceAllocation>> free = new HashMap<>(); | ||
198 | + final Map<ResourceType, Set<? extends ResourceAllocation>> caps = getResourceCapacity(link); | ||
199 | + final Iterable<LinkResourceAllocations> allocations = getAllocations(link); | ||
200 | + | ||
201 | + for (ResourceType type : ResourceType.values()) { | ||
202 | + // there should be class/category of resources | ||
203 | + switch (type) { | ||
204 | + case BANDWIDTH: | ||
205 | + { | ||
206 | + Set<? extends ResourceAllocation> bw = caps.get(ResourceType.BANDWIDTH); | ||
207 | + if (bw == null || bw.isEmpty()) { | ||
208 | + bw = Sets.newHashSet(new BandwidthResourceAllocation(EMPTY_BW)); | ||
209 | + } | ||
210 | + | ||
211 | + BandwidthResourceAllocation cap = (BandwidthResourceAllocation) bw.iterator().next(); | ||
212 | + double freeBw = cap.bandwidth().toDouble(); | ||
213 | + | ||
214 | + // enumerate current allocations, subtracting resources | ||
215 | + for (LinkResourceAllocations alloc : allocations) { | ||
216 | + Set<ResourceAllocation> types = alloc.getResourceAllocation(link); | ||
217 | + for (ResourceAllocation a : types) { | ||
218 | + if (a instanceof BandwidthResourceAllocation) { | ||
219 | + BandwidthResourceAllocation bwA = (BandwidthResourceAllocation) a; | ||
220 | + freeBw -= bwA.bandwidth().toDouble(); | ||
221 | + } | ||
222 | + } | ||
223 | + } | ||
224 | + | ||
225 | + free.put(type, Sets.newHashSet(new BandwidthResourceAllocation(Bandwidth.valueOf(freeBw)))); | ||
226 | + break; | ||
227 | + } | ||
228 | + | ||
229 | + case LAMBDA: | ||
230 | + { | ||
231 | + Set<? extends ResourceAllocation> lmd = caps.get(type); | ||
232 | + if (lmd == null || lmd.isEmpty()) { | ||
233 | + // nothing left | ||
234 | + break; | ||
235 | + } | ||
236 | + Set<LambdaResourceAllocation> freeL = new HashSet<>(); | ||
237 | + for (ResourceAllocation r : lmd) { | ||
238 | + if (r instanceof LambdaResourceAllocation) { | ||
239 | + freeL.add((LambdaResourceAllocation) r); | ||
240 | + } | ||
241 | + } | ||
242 | + | ||
243 | + // enumerate current allocations, removing resources | ||
244 | + for (LinkResourceAllocations alloc : allocations) { | ||
245 | + Set<ResourceAllocation> types = alloc.getResourceAllocation(link); | ||
246 | + for (ResourceAllocation a : types) { | ||
247 | + if (a instanceof LambdaResourceAllocation) { | ||
248 | + freeL.remove(a); | ||
249 | + } | ||
250 | + } | ||
251 | + } | ||
252 | + | ||
253 | + free.put(type, freeL); | ||
254 | + break; | ||
255 | + } | ||
256 | + | ||
257 | + default: | ||
258 | + break; | ||
259 | + } | ||
260 | + } | ||
261 | + return free; | ||
262 | + } | ||
263 | + | ||
264 | + @Override | ||
265 | + public void allocateResources(LinkResourceAllocations allocations) { | ||
266 | + checkNotNull(allocations); | ||
267 | + | ||
268 | + for (int i = 0; i < maxAllocateRetries; ++i) { | ||
269 | + TransactionContext tx = theInstance.newTransactionContext(); | ||
270 | + tx.beginTransaction(); | ||
271 | + try { | ||
272 | + | ||
273 | + STxMap<IntentId, LinkResourceAllocations> intentAllocs = getIntentAllocs(tx); | ||
274 | + // should this be conditional write? | ||
275 | + intentAllocs.put(allocations.intendId(), allocations); | ||
276 | + | ||
277 | + for (Link link : allocations.links()) { | ||
278 | + allocateLinkResource(tx, link, allocations); | ||
279 | + } | ||
280 | + | ||
281 | + tx.commitTransaction(); | ||
282 | + return; | ||
283 | + } catch (TransactionException e) { | ||
284 | + log.debug("Failed to commit allocations for {}. [retry={}]", | ||
285 | + allocations.intendId(), i); | ||
286 | + log.trace(" details {} ", allocations, e); | ||
287 | + continue; | ||
288 | + } catch (Exception e) { | ||
289 | + log.error("Exception thrown, rolling back", e); | ||
290 | + tx.rollbackTransaction(); | ||
291 | + throw e; | ||
292 | + } | ||
293 | + } | ||
294 | + } | ||
295 | + | ||
296 | + private void allocateLinkResource(TransactionContext tx, Link link, | ||
297 | + LinkResourceAllocations allocations) { | ||
298 | + | ||
299 | + // requested resources | ||
300 | + Set<ResourceAllocation> reqs = allocations.getResourceAllocation(link); | ||
301 | + | ||
302 | + Map<ResourceType, Set<? extends ResourceAllocation>> available = getFreeResourcesEx(link); | ||
303 | + for (ResourceAllocation req : reqs) { | ||
304 | + Set<? extends ResourceAllocation> avail = available.get(req.type()); | ||
305 | + if (req instanceof BandwidthResourceAllocation) { | ||
306 | + // check if allocation should be accepted | ||
307 | + if (avail.isEmpty()) { | ||
308 | + checkState(!avail.isEmpty(), | ||
309 | + "There's no Bandwidth resource on %s?", | ||
310 | + link); | ||
311 | + } | ||
312 | + BandwidthResourceAllocation bw = (BandwidthResourceAllocation) avail.iterator().next(); | ||
313 | + double bwLeft = bw.bandwidth().toDouble(); | ||
314 | + bwLeft -= ((BandwidthResourceAllocation) req).bandwidth().toDouble(); | ||
315 | + if (bwLeft < 0) { | ||
316 | + // FIXME throw appropriate Exception | ||
317 | + checkState(bwLeft >= 0, | ||
318 | + "There's no Bandwidth left on %s. %s", | ||
319 | + link, bwLeft); | ||
320 | + } | ||
321 | + } else if (req instanceof LambdaResourceAllocation) { | ||
322 | + | ||
323 | + // check if allocation should be accepted | ||
324 | + if (!avail.contains(req)) { | ||
325 | + // requested lambda was not available | ||
326 | + // FIXME throw appropriate exception | ||
327 | + checkState(avail.contains(req), | ||
328 | + "Allocating %s on %s failed", | ||
329 | + req, link); | ||
330 | + } | ||
331 | + } | ||
332 | + } | ||
333 | + // all requests allocatable => add allocation | ||
334 | + final LinkKey linkKey = LinkKey.linkKey(link); | ||
335 | + STxMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx); | ||
336 | + final List<LinkResourceAllocations> before = linkAllocs.get(linkKey); | ||
337 | + List<LinkResourceAllocations> after = new ArrayList<>(before.size() + 1); | ||
338 | + after.addAll(before); | ||
339 | + after.add(allocations); | ||
340 | + linkAllocs.replace(linkKey, before, after); | ||
341 | + } | ||
342 | + | ||
343 | + @Override | ||
344 | + public LinkResourceEvent releaseResources(LinkResourceAllocations allocations) { | ||
345 | + checkNotNull(allocations); | ||
346 | + | ||
347 | + final IntentId intendId = allocations.intendId(); | ||
348 | + final Collection<Link> links = allocations.links(); | ||
349 | + | ||
350 | + boolean success = false; | ||
351 | + do { | ||
352 | + // TODO: smaller tx unit to lower the chance of collisions? | ||
353 | + TransactionContext tx = theInstance.newTransactionContext(); | ||
354 | + tx.beginTransaction(); | ||
355 | + try { | ||
356 | + STxMap<IntentId, LinkResourceAllocations> intentAllocs = getIntentAllocs(tx); | ||
357 | + intentAllocs.remove(intendId); | ||
358 | + | ||
359 | + STxMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx); | ||
360 | + | ||
361 | + for (Link link : links) { | ||
362 | + final LinkKey linkId = LinkKey.linkKey(link); | ||
363 | + | ||
364 | + List<LinkResourceAllocations> before = linkAllocs.get(linkId); | ||
365 | + if (before == null || before.isEmpty()) { | ||
366 | + // something is wrong, but it is already freed | ||
367 | + log.warn("There was no resource left to release on {}", linkId); | ||
368 | + continue; | ||
369 | + } | ||
370 | + List<LinkResourceAllocations> after = new ArrayList<>(before); | ||
371 | + after.remove(allocations); | ||
372 | + linkAllocs.replace(linkId, before, after); | ||
373 | + } | ||
374 | + | ||
375 | + tx.commitTransaction(); | ||
376 | + success = true; | ||
377 | + } catch (TransactionException e) { | ||
378 | + log.debug("Transaction failed, retrying"); | ||
379 | + } catch (Exception e) { | ||
380 | + log.error("Exception thrown during releaseResource {}", | ||
381 | + allocations, e); | ||
382 | + tx.rollbackTransaction(); | ||
383 | + throw e; | ||
384 | + } | ||
385 | + } while (!success); | ||
386 | + | ||
387 | + // Issue events to force recompilation of intents. | ||
388 | + final List<LinkResourceAllocations> releasedResources = | ||
389 | + ImmutableList.of(allocations); | ||
390 | + return new LinkResourceEvent( | ||
391 | + LinkResourceEvent.Type.ADDITIONAL_RESOURCES_AVAILABLE, | ||
392 | + releasedResources); | ||
393 | + } | ||
394 | + | ||
395 | + @Override | ||
396 | + public LinkResourceAllocations getAllocations(IntentId intentId) { | ||
397 | + checkNotNull(intentId); | ||
398 | + TransactionOptions opt = new TransactionOptions(); | ||
399 | + // read-only and will never be commited, thus does not need durability | ||
400 | + opt.setTransactionType(TransactionType.LOCAL); | ||
401 | + TransactionContext tx = theInstance.newTransactionContext(opt); | ||
402 | + tx.beginTransaction(); | ||
403 | + try { | ||
404 | + STxMap<IntentId, LinkResourceAllocations> intentAllocs = getIntentAllocs(tx); | ||
405 | + return intentAllocs.get(intentId); | ||
406 | + } finally { | ||
407 | + tx.rollbackTransaction(); | ||
408 | + } | ||
409 | + } | ||
410 | + | ||
411 | + @Override | ||
412 | + public List<LinkResourceAllocations> getAllocations(Link link) { | ||
413 | + checkNotNull(link); | ||
414 | + final LinkKey key = LinkKey.linkKey(link); | ||
415 | + | ||
416 | + TransactionOptions opt = new TransactionOptions(); | ||
417 | + // read-only and will never be commited, thus does not need durability | ||
418 | + opt.setTransactionType(TransactionType.LOCAL); | ||
419 | + TransactionContext tx = theInstance.newTransactionContext(opt); | ||
420 | + tx.beginTransaction(); | ||
421 | + List<LinkResourceAllocations> res = null; | ||
422 | + try { | ||
423 | + STxMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx); | ||
424 | + res = linkAllocs.get(key); | ||
425 | + } finally { | ||
426 | + tx.rollbackTransaction(); | ||
427 | + } | ||
428 | + | ||
429 | + if (res == null) { | ||
430 | + // try to add empty list | ||
431 | + TransactionContext tx2 = theInstance.newTransactionContext(); | ||
432 | + tx2.beginTransaction(); | ||
433 | + try { | ||
434 | + res = getLinkAllocs(tx2).putIfAbsent(key, new ArrayList<>()); | ||
435 | + tx2.commitTransaction(); | ||
436 | + if (res == null) { | ||
437 | + return Collections.emptyList(); | ||
438 | + } else { | ||
439 | + return res; | ||
440 | + } | ||
441 | + } catch (TransactionException e) { | ||
442 | + // concurrently added? | ||
443 | + return getAllocations(link); | ||
444 | + } catch (Exception e) { | ||
445 | + tx.rollbackTransaction(); | ||
446 | + } | ||
447 | + } | ||
448 | + return res; | ||
449 | + | ||
450 | + } | ||
451 | + | ||
452 | + @Override | ||
453 | + public Iterable<LinkResourceAllocations> getAllocations() { | ||
454 | + TransactionContext tx = theInstance.newTransactionContext(); | ||
455 | + tx.beginTransaction(); | ||
456 | + try { | ||
457 | + STxMap<IntentId, LinkResourceAllocations> intentAllocs = getIntentAllocs(tx); | ||
458 | + return intentAllocs.values(); | ||
459 | + } finally { | ||
460 | + tx.rollbackTransaction(); | ||
461 | + } | ||
462 | + } | ||
463 | +} |
core/store/dist/src/test/java/org/onlab/onos/store/resource/impl/HazelcastLinkResourceStoreTest.java
0 → 100644
1 | +/* | ||
2 | + * Copyright 2014 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onlab.onos.store.resource.impl; | ||
17 | + | ||
18 | +import java.util.HashSet; | ||
19 | +import java.util.Set; | ||
20 | + | ||
21 | +import org.junit.After; | ||
22 | +import org.junit.Before; | ||
23 | +import org.junit.Test; | ||
24 | +import org.onlab.onos.net.AnnotationKeys; | ||
25 | +import org.onlab.onos.net.Annotations; | ||
26 | +import org.onlab.onos.net.ConnectPoint; | ||
27 | +import org.onlab.onos.net.DefaultAnnotations; | ||
28 | +import org.onlab.onos.net.DefaultLink; | ||
29 | +import org.onlab.onos.net.Link; | ||
30 | +import org.onlab.onos.net.provider.ProviderId; | ||
31 | +import org.onlab.onos.net.resource.Bandwidth; | ||
32 | +import org.onlab.onos.net.resource.BandwidthResourceAllocation; | ||
33 | +import org.onlab.onos.net.resource.LambdaResourceAllocation; | ||
34 | +import org.onlab.onos.net.resource.LinkResourceAllocations; | ||
35 | +import org.onlab.onos.net.resource.LinkResourceStore; | ||
36 | +import org.onlab.onos.net.resource.ResourceAllocation; | ||
37 | +import org.onlab.onos.net.resource.ResourceType; | ||
38 | +import org.onlab.onos.store.hz.StoreService; | ||
39 | +import org.onlab.onos.store.hz.TestStoreManager; | ||
40 | + | ||
41 | +import com.hazelcast.config.Config; | ||
42 | +import com.hazelcast.core.Hazelcast; | ||
43 | + | ||
44 | +import static org.junit.Assert.assertEquals; | ||
45 | +import static org.junit.Assert.assertFalse; | ||
46 | +import static org.junit.Assert.assertNotNull; | ||
47 | +import static org.onlab.onos.net.DeviceId.deviceId; | ||
48 | +import static org.onlab.onos.net.Link.Type.DIRECT; | ||
49 | +import static org.onlab.onos.net.PortNumber.portNumber; | ||
50 | + | ||
51 | +/** | ||
52 | + * Test of the simple LinkResourceStore implementation. | ||
53 | + */ | ||
54 | +public class HazelcastLinkResourceStoreTest { | ||
55 | + | ||
56 | + private LinkResourceStore store; | ||
57 | + private HazelcastLinkResourceStore storeImpl; | ||
58 | + private Link link1; | ||
59 | + private Link link2; | ||
60 | + private Link link3; | ||
61 | + private TestStoreManager storeMgr; | ||
62 | + | ||
63 | + /** | ||
64 | + * Returns {@link Link} object. | ||
65 | + * | ||
66 | + * @param dev1 source device | ||
67 | + * @param port1 source port | ||
68 | + * @param dev2 destination device | ||
69 | + * @param port2 destination port | ||
70 | + * @return created {@link Link} object | ||
71 | + */ | ||
72 | + private Link newLink(String dev1, int port1, String dev2, int port2) { | ||
73 | + Annotations annotations = DefaultAnnotations.builder() | ||
74 | + .set(AnnotationKeys.OPTICAL_WAVES, "80") | ||
75 | + .set(AnnotationKeys.BANDWIDTH, "1000000") | ||
76 | + .build(); | ||
77 | + return new DefaultLink( | ||
78 | + new ProviderId("of", "foo"), | ||
79 | + new ConnectPoint(deviceId(dev1), portNumber(port1)), | ||
80 | + new ConnectPoint(deviceId(dev2), portNumber(port2)), | ||
81 | + DIRECT, annotations); | ||
82 | + } | ||
83 | + | ||
84 | + @Before | ||
85 | + public void setUp() throws Exception { | ||
86 | + | ||
87 | + Config config = TestStoreManager.getTestConfig(); | ||
88 | + | ||
89 | + storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); | ||
90 | + storeMgr.activate(); | ||
91 | + | ||
92 | + | ||
93 | + storeImpl = new TestHazelcastLinkResourceStore(storeMgr); | ||
94 | + storeImpl.activate(); | ||
95 | + store = storeImpl; | ||
96 | + | ||
97 | + link1 = newLink("of:1", 1, "of:2", 2); | ||
98 | + link2 = newLink("of:2", 1, "of:3", 2); | ||
99 | + link3 = newLink("of:3", 1, "of:4", 2); | ||
100 | + } | ||
101 | + | ||
102 | + @After | ||
103 | + public void tearDown() throws Exception { | ||
104 | + storeImpl.deactivate(); | ||
105 | + | ||
106 | + storeMgr.deactivate(); | ||
107 | + } | ||
108 | + | ||
109 | + /** | ||
110 | + * Tests constructor and activate method. | ||
111 | + */ | ||
112 | + @Test | ||
113 | + public void testConstructorAndActivate() { | ||
114 | + final Iterable<LinkResourceAllocations> allAllocations = store.getAllocations(); | ||
115 | + assertNotNull(allAllocations); | ||
116 | + assertFalse(allAllocations.iterator().hasNext()); | ||
117 | + | ||
118 | + final Iterable<LinkResourceAllocations> linkAllocations = | ||
119 | + store.getAllocations(link1); | ||
120 | + assertNotNull(linkAllocations); | ||
121 | + assertFalse(linkAllocations.iterator().hasNext()); | ||
122 | + | ||
123 | + final Set<ResourceAllocation> res = store.getFreeResources(link2); | ||
124 | + assertNotNull(res); | ||
125 | + } | ||
126 | + | ||
127 | + /** | ||
128 | + * Picks up and returns one of bandwidth allocations from a given set. | ||
129 | + * | ||
130 | + * @param resources the set of {@link ResourceAllocation}s | ||
131 | + * @return {@link BandwidthResourceAllocation} object if found, null | ||
132 | + * otherwise | ||
133 | + */ | ||
134 | + private BandwidthResourceAllocation getBandwidthObj(Set<ResourceAllocation> resources) { | ||
135 | + for (ResourceAllocation res : resources) { | ||
136 | + if (res.type() == ResourceType.BANDWIDTH) { | ||
137 | + return ((BandwidthResourceAllocation) res); | ||
138 | + } | ||
139 | + } | ||
140 | + return null; | ||
141 | + } | ||
142 | + | ||
143 | + /** | ||
144 | + * Returns all lambda allocations from a given set. | ||
145 | + * | ||
146 | + * @param resources the set of {@link ResourceAllocation}s | ||
147 | + * @return a set of {@link LambdaResourceAllocation} objects | ||
148 | + */ | ||
149 | + private Set<LambdaResourceAllocation> getLambdaObjs(Set<ResourceAllocation> resources) { | ||
150 | + Set<LambdaResourceAllocation> lambdaResources = new HashSet<>(); | ||
151 | + for (ResourceAllocation res : resources) { | ||
152 | + if (res.type() == ResourceType.LAMBDA) { | ||
153 | + lambdaResources.add((LambdaResourceAllocation) res); | ||
154 | + } | ||
155 | + } | ||
156 | + return lambdaResources; | ||
157 | + } | ||
158 | + | ||
159 | + /** | ||
160 | + * Tests initial free bandwidth for a link. | ||
161 | + */ | ||
162 | + @Test | ||
163 | + public void testInitialBandwidth() { | ||
164 | + final Set<ResourceAllocation> freeRes = store.getFreeResources(link1); | ||
165 | + assertNotNull(freeRes); | ||
166 | + | ||
167 | + final BandwidthResourceAllocation alloc = getBandwidthObj(freeRes); | ||
168 | + assertNotNull(alloc); | ||
169 | + | ||
170 | + assertEquals(Bandwidth.valueOf(1000000.0), alloc.bandwidth()); | ||
171 | + } | ||
172 | + | ||
173 | + /** | ||
174 | + * Tests initial free lambda for a link. | ||
175 | + */ | ||
176 | + @Test | ||
177 | + public void testInitialLambdas() { | ||
178 | + final Set<ResourceAllocation> freeRes = store.getFreeResources(link3); | ||
179 | + assertNotNull(freeRes); | ||
180 | + | ||
181 | + final Set<LambdaResourceAllocation> res = getLambdaObjs(freeRes); | ||
182 | + assertNotNull(res); | ||
183 | + assertEquals(80, res.size()); | ||
184 | + } | ||
185 | + | ||
186 | + public static final class TestHazelcastLinkResourceStore | ||
187 | + extends HazelcastLinkResourceStore { | ||
188 | + | ||
189 | + public TestHazelcastLinkResourceStore(StoreService storeMgr) { | ||
190 | + super.storeService = storeMgr; | ||
191 | + } | ||
192 | + | ||
193 | + } | ||
194 | +} |
-
Please register or login to post a comment