Added graceful shutdown for upstart service.
Reworked slightly the mastership & device managers and stores to make it work (sort-of) in the distributed env.
Showing
30 changed files
with
604 additions
and
381 deletions
... | @@ -3,9 +3,11 @@ package org.onlab.onos.cli.net; | ... | @@ -3,9 +3,11 @@ package org.onlab.onos.cli.net; |
3 | import org.apache.karaf.shell.commands.Argument; | 3 | import org.apache.karaf.shell.commands.Argument; |
4 | import org.apache.karaf.shell.commands.Command; | 4 | import org.apache.karaf.shell.commands.Command; |
5 | import org.onlab.onos.cli.AbstractShellCommand; | 5 | import org.onlab.onos.cli.AbstractShellCommand; |
6 | -import org.onlab.onos.net.DeviceId; | 6 | +import org.onlab.onos.cluster.MastershipAdminService; |
7 | +import org.onlab.onos.cluster.NodeId; | ||
7 | import org.onlab.onos.net.MastershipRole; | 8 | import org.onlab.onos.net.MastershipRole; |
8 | -import org.onlab.onos.net.device.DeviceAdminService; | 9 | + |
10 | +import static org.onlab.onos.net.DeviceId.deviceId; | ||
9 | 11 | ||
10 | /** | 12 | /** |
11 | * Sets role of the controller node for the given infrastructure device. | 13 | * Sets role of the controller node for the given infrastructure device. |
... | @@ -18,15 +20,19 @@ public class DeviceRoleCommand extends AbstractShellCommand { | ... | @@ -18,15 +20,19 @@ public class DeviceRoleCommand extends AbstractShellCommand { |
18 | required = true, multiValued = false) | 20 | required = true, multiValued = false) |
19 | String uri = null; | 21 | String uri = null; |
20 | 22 | ||
21 | - @Argument(index = 1, name = "role", description = "Mastership role", | 23 | + @Argument(index = 1, name = "node", description = "Node ID", |
24 | + required = true, multiValued = false) | ||
25 | + String node = null; | ||
26 | + | ||
27 | + @Argument(index = 2, name = "role", description = "Mastership role", | ||
22 | required = true, multiValued = false) | 28 | required = true, multiValued = false) |
23 | String role = null; | 29 | String role = null; |
24 | 30 | ||
25 | @Override | 31 | @Override |
26 | protected void execute() { | 32 | protected void execute() { |
33 | + MastershipAdminService service = get(MastershipAdminService.class); | ||
27 | MastershipRole mastershipRole = MastershipRole.valueOf(role.toUpperCase()); | 34 | MastershipRole mastershipRole = MastershipRole.valueOf(role.toUpperCase()); |
28 | - get(DeviceAdminService.class).setRole(DeviceId.deviceId(uri), | 35 | + service.setRole(new NodeId(node), deviceId(uri), mastershipRole); |
29 | - mastershipRole); | ||
30 | } | 36 | } |
31 | 37 | ||
32 | } | 38 | } | ... | ... |
1 | +package org.onlab.onos.cluster; | ||
2 | + | ||
3 | +/** | ||
4 | + * Service for administering the cluster node membership. | ||
5 | + */ | ||
6 | +public interface ClusterAdminService { | ||
7 | + | ||
8 | + /** | ||
9 | + * Removes the specified node from the cluster node list. | ||
10 | + * | ||
11 | + * @param nodeId controller node identifier | ||
12 | + */ | ||
13 | + void removeNode(NodeId nodeId); | ||
14 | + | ||
15 | +} |
... | @@ -8,7 +8,7 @@ import java.util.Set; | ... | @@ -8,7 +8,7 @@ import java.util.Set; |
8 | public interface ClusterStore { | 8 | public interface ClusterStore { |
9 | 9 | ||
10 | /** | 10 | /** |
11 | - * Returns the local controller instance. | 11 | + * Returns the local controller node. |
12 | * | 12 | * |
13 | * @return local controller instance | 13 | * @return local controller instance |
14 | */ | 14 | */ |
... | @@ -22,7 +22,7 @@ public interface ClusterStore { | ... | @@ -22,7 +22,7 @@ public interface ClusterStore { |
22 | Set<ControllerNode> getNodes(); | 22 | Set<ControllerNode> getNodes(); |
23 | 23 | ||
24 | /** | 24 | /** |
25 | - * Returns the specified controller instance. | 25 | + * Returns the specified controller node. |
26 | * | 26 | * |
27 | * @param nodeId controller instance identifier | 27 | * @param nodeId controller instance identifier |
28 | * @return controller instance | 28 | * @return controller instance |
... | @@ -30,11 +30,18 @@ public interface ClusterStore { | ... | @@ -30,11 +30,18 @@ public interface ClusterStore { |
30 | ControllerNode getNode(NodeId nodeId); | 30 | ControllerNode getNode(NodeId nodeId); |
31 | 31 | ||
32 | /** | 32 | /** |
33 | - * Returns the availability state of the specified controller instance. | 33 | + * Returns the availability state of the specified controller node. |
34 | * | 34 | * |
35 | * @param nodeId controller instance identifier | 35 | * @param nodeId controller instance identifier |
36 | * @return availability state | 36 | * @return availability state |
37 | */ | 37 | */ |
38 | ControllerNode.State getState(NodeId nodeId); | 38 | ControllerNode.State getState(NodeId nodeId); |
39 | 39 | ||
40 | + /** | ||
41 | + * Removes the specified node from the inventory of cluster nodes. | ||
42 | + * | ||
43 | + * @param nodeId controller instance identifier | ||
44 | + */ | ||
45 | + void removeNode(NodeId nodeId); | ||
46 | + | ||
40 | } | 47 | } | ... | ... |
1 | -package org.onlab.onos.cluster; | ||
2 | - | ||
3 | -import org.onlab.onos.net.provider.Provider; | ||
4 | - | ||
5 | -/** | ||
6 | - * Abstraction of a mastership information provider. | ||
7 | - */ | ||
8 | -public interface MastershipProvider extends Provider { | ||
9 | - // do we get role info from the local OFcontroller impl? | ||
10 | - // needs to also read from distributed store and emit events? | ||
11 | - // roleChanged(DeviceId deviceId, MastershipRole newRole); | ||
12 | -} |
1 | -package org.onlab.onos.cluster; | ||
2 | - | ||
3 | -import org.onlab.onos.net.DeviceId; | ||
4 | -import org.onlab.onos.net.MastershipRole; | ||
5 | -import org.onlab.onos.net.provider.ProviderService; | ||
6 | - | ||
7 | -public interface MastershipProviderService extends | ||
8 | - ProviderService<MastershipProvider> { | ||
9 | - | ||
10 | - /** | ||
11 | - * Signals the core that mastership has changed for a device. | ||
12 | - * | ||
13 | - * @param deviceId the device ID | ||
14 | - * @param role the new mastership role of this controller instance | ||
15 | - */ | ||
16 | - void roleChanged(NodeId nodeId, DeviceId deviceId, MastershipRole role); | ||
17 | - | ||
18 | -} |
... | @@ -14,6 +14,32 @@ import org.onlab.onos.net.MastershipRole; | ... | @@ -14,6 +14,32 @@ import org.onlab.onos.net.MastershipRole; |
14 | public interface MastershipService { | 14 | public interface MastershipService { |
15 | 15 | ||
16 | /** | 16 | /** |
17 | + * Returns the role of the local node for the specified device, without | ||
18 | + * triggering master selection. | ||
19 | + * | ||
20 | + * @return role of the current node | ||
21 | + */ | ||
22 | + MastershipRole getLocalRole(DeviceId deviceId); | ||
23 | + | ||
24 | + /** | ||
25 | + * Returns the mastership status of the local controller for a given | ||
26 | + * device forcing master selection if necessary. | ||
27 | + * | ||
28 | + * @param deviceId the the identifier of the device | ||
29 | + * @return the role of this controller instance | ||
30 | + */ | ||
31 | + MastershipRole requestRoleFor(DeviceId deviceId); | ||
32 | + | ||
33 | + /** | ||
34 | + * Abandons mastership of the specified device on the local node thus | ||
35 | + * forcing selection of a new master. If the local node is not a master | ||
36 | + * for this device, no action will be taken. | ||
37 | + * | ||
38 | + * @param deviceId the identifier of the device | ||
39 | + */ | ||
40 | + void relinquishMastership(DeviceId deviceId); | ||
41 | + | ||
42 | + /** | ||
17 | * Returns the current master for a given device. | 43 | * Returns the current master for a given device. |
18 | * | 44 | * |
19 | * @param deviceId the identifier of the device | 45 | * @param deviceId the identifier of the device |
... | @@ -30,17 +56,6 @@ public interface MastershipService { | ... | @@ -30,17 +56,6 @@ public interface MastershipService { |
30 | Set<DeviceId> getDevicesOf(NodeId nodeId); | 56 | Set<DeviceId> getDevicesOf(NodeId nodeId); |
31 | 57 | ||
32 | /** | 58 | /** |
33 | - * Returns the mastership status of this controller for a given device. | ||
34 | - * | ||
35 | - * @param deviceId the the identifier of the device | ||
36 | - * @return the role of this controller instance | ||
37 | - */ | ||
38 | - MastershipRole requestRoleFor(DeviceId deviceId); | ||
39 | - | ||
40 | - // TODO: add facet for requesting a different master than the current one; | ||
41 | - // abandon mastership (due to loss of connection) | ||
42 | - | ||
43 | - /** | ||
44 | * Adds the specified mastership change listener. | 59 | * Adds the specified mastership change listener. |
45 | * | 60 | * |
46 | * @param listener the mastership listener | 61 | * @param listener the mastership listener | ... | ... |
... | @@ -14,26 +14,21 @@ public interface MastershipStore { | ... | @@ -14,26 +14,21 @@ public interface MastershipStore { |
14 | // three things to map: NodeId, DeviceId, MastershipRole | 14 | // three things to map: NodeId, DeviceId, MastershipRole |
15 | 15 | ||
16 | /** | 16 | /** |
17 | - * Sets a device's role for a specified controller instance. | 17 | + * Requests role of the local node for the specified device. |
18 | * | 18 | * |
19 | - * @param instance controller instance identifier | ||
20 | * @param deviceId device identifier | 19 | * @param deviceId device identifier |
21 | - * @param role new role | 20 | + * @return established or newly negotiated mastership role |
22 | - * @return a mastership event | ||
23 | */ | 21 | */ |
24 | - MastershipEvent setRole(NodeId instance, DeviceId deviceId, | 22 | + MastershipRole requestRole(DeviceId deviceId); |
25 | - MastershipRole role); | ||
26 | 23 | ||
27 | /** | 24 | /** |
28 | - * Adds or updates mastership information for a device. | 25 | + * Returns the role of a device for a specific controller instance. |
29 | * | 26 | * |
30 | - * @param instance controller instance identifier | 27 | + * @param nodeId the instance identifier |
31 | - * @param deviceId device identifier | 28 | + * @param deviceId the device identifiers |
32 | - * @param role new role | 29 | + * @return the role |
33 | - * @return a mastership event | ||
34 | */ | 30 | */ |
35 | - MastershipEvent addOrUpdateDevice(NodeId instance, DeviceId deviceId, | 31 | + MastershipRole getRole(NodeId nodeId, DeviceId deviceId); |
36 | - MastershipRole role); | ||
37 | 32 | ||
38 | /** | 33 | /** |
39 | * Returns the master for a device. | 34 | * Returns the master for a device. |
... | @@ -52,11 +47,13 @@ public interface MastershipStore { | ... | @@ -52,11 +47,13 @@ public interface MastershipStore { |
52 | Set<DeviceId> getDevices(NodeId nodeId); | 47 | Set<DeviceId> getDevices(NodeId nodeId); |
53 | 48 | ||
54 | /** | 49 | /** |
55 | - * Returns the role of a device for a specific controller instance. | 50 | + * Sets a device's role for a specified controller instance. |
56 | * | 51 | * |
57 | - * @param nodeId the instance identifier | 52 | + * @param nodeId controller instance identifier |
58 | - * @param deviceId the device identifiers | 53 | + * @param deviceId device identifier |
59 | - * @return the role | 54 | + * @param role new role |
55 | + * @return a mastership event | ||
60 | */ | 56 | */ |
61 | - MastershipRole getRole(NodeId nodeId, DeviceId deviceId); | 57 | + MastershipEvent setRole(NodeId nodeId, DeviceId deviceId, |
58 | + MastershipRole role); | ||
62 | } | 59 | } | ... | ... |
... | @@ -9,11 +9,6 @@ public class NodeId { | ... | @@ -9,11 +9,6 @@ public class NodeId { |
9 | 9 | ||
10 | private final String id; | 10 | private final String id; |
11 | 11 | ||
12 | - // Default constructor for serialization | ||
13 | - protected NodeId() { | ||
14 | - id = null; | ||
15 | - } | ||
16 | - | ||
17 | /** | 12 | /** |
18 | * Creates a new cluster node identifier from the specified string. | 13 | * Creates a new cluster node identifier from the specified string. |
19 | * | 14 | * | ... | ... |
1 | package org.onlab.onos.net.device; | 1 | package org.onlab.onos.net.device; |
2 | 2 | ||
3 | import org.onlab.onos.net.DeviceId; | 3 | import org.onlab.onos.net.DeviceId; |
4 | -import org.onlab.onos.net.MastershipRole; | ||
5 | 4 | ||
6 | /** | 5 | /** |
7 | * Service for administering the inventory of infrastructure devices. | 6 | * Service for administering the inventory of infrastructure devices. |
... | @@ -9,16 +8,6 @@ import org.onlab.onos.net.MastershipRole; | ... | @@ -9,16 +8,6 @@ import org.onlab.onos.net.MastershipRole; |
9 | public interface DeviceAdminService { | 8 | public interface DeviceAdminService { |
10 | 9 | ||
11 | /** | 10 | /** |
12 | - * Applies the current mastership role for the specified device. | ||
13 | - * | ||
14 | - * @param deviceId device identifier | ||
15 | - * @param role requested role | ||
16 | - * @deprecated Will be removed in favor of MastershipAdminService.setRole() | ||
17 | - */ | ||
18 | -// @Deprecated | ||
19 | - void setRole(DeviceId deviceId, MastershipRole role); | ||
20 | - | ||
21 | - /** | ||
22 | * Removes the device with the specified identifier. | 11 | * Removes the device with the specified identifier. |
23 | * | 12 | * |
24 | * @param deviceId device identifier | 13 | * @param deviceId device identifier | ... | ... |
... | @@ -2,7 +2,6 @@ package org.onlab.onos.net.device; | ... | @@ -2,7 +2,6 @@ package org.onlab.onos.net.device; |
2 | 2 | ||
3 | import org.onlab.onos.net.Device; | 3 | import org.onlab.onos.net.Device; |
4 | import org.onlab.onos.net.DeviceId; | 4 | import org.onlab.onos.net.DeviceId; |
5 | -import org.onlab.onos.net.MastershipRole; | ||
6 | import org.onlab.onos.net.Port; | 5 | import org.onlab.onos.net.Port; |
7 | import org.onlab.onos.net.PortNumber; | 6 | import org.onlab.onos.net.PortNumber; |
8 | import org.onlab.onos.net.provider.ProviderId; | 7 | import org.onlab.onos.net.provider.ProviderId; |
... | @@ -104,23 +103,6 @@ public interface DeviceStore { | ... | @@ -104,23 +103,6 @@ public interface DeviceStore { |
104 | boolean isAvailable(DeviceId deviceId); | 103 | boolean isAvailable(DeviceId deviceId); |
105 | 104 | ||
106 | /** | 105 | /** |
107 | - * Returns the mastership role determined for this device. | ||
108 | - * | ||
109 | - * @param deviceId device identifier | ||
110 | - * @return mastership role | ||
111 | - */ | ||
112 | - MastershipRole getRole(DeviceId deviceId); | ||
113 | - | ||
114 | - /** | ||
115 | - * Administratively sets the role of the specified device. | ||
116 | - * | ||
117 | - * @param deviceId device identifier | ||
118 | - * @param role mastership role to apply | ||
119 | - * @return mastership role change event or null if no change | ||
120 | - */ | ||
121 | - DeviceEvent setRole(DeviceId deviceId, MastershipRole role); | ||
122 | - | ||
123 | - /** | ||
124 | * Administratively removes the specified device from the store. | 106 | * Administratively removes the specified device from the store. |
125 | * | 107 | * |
126 | * @param deviceId device to be removed | 108 | * @param deviceId device to be removed | ... | ... |
1 | +package org.onlab.onos.cluster; | ||
2 | + | ||
3 | +import java.util.Set; | ||
4 | + | ||
5 | +/** | ||
6 | + * Test adapter for the cluster service. | ||
7 | + */ | ||
8 | +public class ClusterServiceAdapter implements ClusterService { | ||
9 | + @Override | ||
10 | + public ControllerNode getLocalNode() { | ||
11 | + return null; | ||
12 | + } | ||
13 | + | ||
14 | + @Override | ||
15 | + public Set<ControllerNode> getNodes() { | ||
16 | + return null; | ||
17 | + } | ||
18 | + | ||
19 | + @Override | ||
20 | + public ControllerNode getNode(NodeId nodeId) { | ||
21 | + return null; | ||
22 | + } | ||
23 | + | ||
24 | + @Override | ||
25 | + public ControllerNode.State getState(NodeId nodeId) { | ||
26 | + return null; | ||
27 | + } | ||
28 | + | ||
29 | + @Override | ||
30 | + public void addListener(ClusterEventListener listener) { | ||
31 | + } | ||
32 | + | ||
33 | + @Override | ||
34 | + public void removeListener(ClusterEventListener listener) { | ||
35 | + } | ||
36 | +} |
1 | +package org.onlab.onos.cluster; | ||
2 | + | ||
3 | +import org.onlab.onos.net.DeviceId; | ||
4 | +import org.onlab.onos.net.MastershipRole; | ||
5 | + | ||
6 | +import java.util.Set; | ||
7 | + | ||
8 | +/** | ||
9 | + * Test adapter for mastership service. | ||
10 | + */ | ||
11 | +public class MastershipServiceAdapter implements MastershipService { | ||
12 | + @Override | ||
13 | + public MastershipRole getLocalRole(DeviceId deviceId) { | ||
14 | + return null; | ||
15 | + } | ||
16 | + | ||
17 | + @Override | ||
18 | + public MastershipRole requestRoleFor(DeviceId deviceId) { | ||
19 | + return null; | ||
20 | + } | ||
21 | + | ||
22 | + @Override | ||
23 | + public void relinquishMastership(DeviceId deviceId) { | ||
24 | + } | ||
25 | + | ||
26 | + @Override | ||
27 | + public NodeId getMasterFor(DeviceId deviceId) { | ||
28 | + return null; | ||
29 | + } | ||
30 | + | ||
31 | + @Override | ||
32 | + public Set<DeviceId> getDevicesOf(NodeId nodeId) { | ||
33 | + return null; | ||
34 | + } | ||
35 | + | ||
36 | + @Override | ||
37 | + public void addListener(MastershipListener listener) { | ||
38 | + } | ||
39 | + | ||
40 | + @Override | ||
41 | + public void removeListener(MastershipListener listener) { | ||
42 | + } | ||
43 | +} |
... | @@ -6,6 +6,7 @@ import org.apache.felix.scr.annotations.Deactivate; | ... | @@ -6,6 +6,7 @@ import org.apache.felix.scr.annotations.Deactivate; |
6 | import org.apache.felix.scr.annotations.Reference; | 6 | import org.apache.felix.scr.annotations.Reference; |
7 | import org.apache.felix.scr.annotations.ReferenceCardinality; | 7 | import org.apache.felix.scr.annotations.ReferenceCardinality; |
8 | import org.apache.felix.scr.annotations.Service; | 8 | import org.apache.felix.scr.annotations.Service; |
9 | +import org.onlab.onos.cluster.ClusterAdminService; | ||
9 | import org.onlab.onos.cluster.ClusterEvent; | 10 | import org.onlab.onos.cluster.ClusterEvent; |
10 | import org.onlab.onos.cluster.ClusterEventListener; | 11 | import org.onlab.onos.cluster.ClusterEventListener; |
11 | import org.onlab.onos.cluster.ClusterService; | 12 | import org.onlab.onos.cluster.ClusterService; |
... | @@ -26,7 +27,7 @@ import static org.slf4j.LoggerFactory.getLogger; | ... | @@ -26,7 +27,7 @@ import static org.slf4j.LoggerFactory.getLogger; |
26 | */ | 27 | */ |
27 | @Component(immediate = true) | 28 | @Component(immediate = true) |
28 | @Service | 29 | @Service |
29 | -public class ClusterManager implements ClusterService { | 30 | +public class ClusterManager implements ClusterService, ClusterAdminService { |
30 | 31 | ||
31 | public static final String INSTANCE_ID_NULL = "Instance ID cannot be null"; | 32 | public static final String INSTANCE_ID_NULL = "Instance ID cannot be null"; |
32 | private final Logger log = getLogger(getClass()); | 33 | private final Logger log = getLogger(getClass()); |
... | @@ -75,6 +76,12 @@ public class ClusterManager implements ClusterService { | ... | @@ -75,6 +76,12 @@ public class ClusterManager implements ClusterService { |
75 | } | 76 | } |
76 | 77 | ||
77 | @Override | 78 | @Override |
79 | + public void removeNode(NodeId nodeId) { | ||
80 | + checkNotNull(nodeId, INSTANCE_ID_NULL); | ||
81 | + store.removeNode(nodeId); | ||
82 | + } | ||
83 | + | ||
84 | + @Override | ||
78 | public void addListener(ClusterEventListener listener) { | 85 | public void addListener(ClusterEventListener listener) { |
79 | listenerRegistry.addListener(listener); | 86 | listenerRegistry.addListener(listener); |
80 | } | 87 | } | ... | ... |
1 | package org.onlab.onos.cluster.impl; | 1 | package org.onlab.onos.cluster.impl; |
2 | 2 | ||
3 | -import static org.slf4j.LoggerFactory.getLogger; | ||
4 | - | ||
5 | -import java.util.Set; | ||
6 | - | ||
7 | import org.apache.felix.scr.annotations.Activate; | 3 | import org.apache.felix.scr.annotations.Activate; |
8 | import org.apache.felix.scr.annotations.Component; | 4 | import org.apache.felix.scr.annotations.Component; |
9 | import org.apache.felix.scr.annotations.Deactivate; | 5 | import org.apache.felix.scr.annotations.Deactivate; |
... | @@ -14,8 +10,6 @@ import org.onlab.onos.cluster.ClusterService; | ... | @@ -14,8 +10,6 @@ import org.onlab.onos.cluster.ClusterService; |
14 | import org.onlab.onos.cluster.MastershipAdminService; | 10 | import org.onlab.onos.cluster.MastershipAdminService; |
15 | import org.onlab.onos.cluster.MastershipEvent; | 11 | import org.onlab.onos.cluster.MastershipEvent; |
16 | import org.onlab.onos.cluster.MastershipListener; | 12 | import org.onlab.onos.cluster.MastershipListener; |
17 | -import org.onlab.onos.cluster.MastershipProvider; | ||
18 | -import org.onlab.onos.cluster.MastershipProviderService; | ||
19 | import org.onlab.onos.cluster.MastershipService; | 13 | import org.onlab.onos.cluster.MastershipService; |
20 | import org.onlab.onos.cluster.MastershipStore; | 14 | import org.onlab.onos.cluster.MastershipStore; |
21 | import org.onlab.onos.cluster.NodeId; | 15 | import org.onlab.onos.cluster.NodeId; |
... | @@ -23,16 +17,16 @@ import org.onlab.onos.event.AbstractListenerRegistry; | ... | @@ -23,16 +17,16 @@ import org.onlab.onos.event.AbstractListenerRegistry; |
23 | import org.onlab.onos.event.EventDeliveryService; | 17 | import org.onlab.onos.event.EventDeliveryService; |
24 | import org.onlab.onos.net.DeviceId; | 18 | import org.onlab.onos.net.DeviceId; |
25 | import org.onlab.onos.net.MastershipRole; | 19 | import org.onlab.onos.net.MastershipRole; |
26 | -import org.onlab.onos.net.provider.AbstractProviderRegistry; | ||
27 | -import org.onlab.onos.net.provider.AbstractProviderService; | ||
28 | import org.slf4j.Logger; | 20 | import org.slf4j.Logger; |
29 | 21 | ||
22 | +import java.util.Set; | ||
23 | + | ||
30 | import static com.google.common.base.Preconditions.checkNotNull; | 24 | import static com.google.common.base.Preconditions.checkNotNull; |
25 | +import static org.slf4j.LoggerFactory.getLogger; | ||
31 | 26 | ||
32 | @Component(immediate = true) | 27 | @Component(immediate = true) |
33 | @Service | 28 | @Service |
34 | public class MastershipManager | 29 | public class MastershipManager |
35 | - extends AbstractProviderRegistry<MastershipProvider, MastershipProviderService> | ||
36 | implements MastershipService, MastershipAdminService { | 30 | implements MastershipService, MastershipAdminService { |
37 | 31 | ||
38 | private static final String NODE_ID_NULL = "Node ID cannot be null"; | 32 | private static final String NODE_ID_NULL = "Node ID cannot be null"; |
... | @@ -77,22 +71,33 @@ public class MastershipManager | ... | @@ -77,22 +71,33 @@ public class MastershipManager |
77 | } | 71 | } |
78 | 72 | ||
79 | @Override | 73 | @Override |
80 | - public NodeId getMasterFor(DeviceId deviceId) { | 74 | + public MastershipRole getLocalRole(DeviceId deviceId) { |
81 | checkNotNull(deviceId, DEVICE_ID_NULL); | 75 | checkNotNull(deviceId, DEVICE_ID_NULL); |
82 | - return store.getMaster(deviceId); | 76 | + return store.getRole(clusterService.getLocalNode().id(), deviceId); |
83 | } | 77 | } |
84 | 78 | ||
85 | @Override | 79 | @Override |
86 | - public Set<DeviceId> getDevicesOf(NodeId nodeId) { | 80 | + public void relinquishMastership(DeviceId deviceId) { |
87 | - checkNotNull(nodeId, NODE_ID_NULL); | 81 | + checkNotNull(deviceId, DEVICE_ID_NULL); |
88 | - return store.getDevices(nodeId); | 82 | + // FIXME: add method to store to give up mastership and trigger new master selection process |
89 | } | 83 | } |
90 | 84 | ||
91 | @Override | 85 | @Override |
92 | public MastershipRole requestRoleFor(DeviceId deviceId) { | 86 | public MastershipRole requestRoleFor(DeviceId deviceId) { |
93 | checkNotNull(deviceId, DEVICE_ID_NULL); | 87 | checkNotNull(deviceId, DEVICE_ID_NULL); |
94 | - NodeId id = clusterService.getLocalNode().id(); | 88 | + return store.requestRole(deviceId); |
95 | - return store.getRole(id, deviceId); | 89 | + } |
90 | + | ||
91 | + @Override | ||
92 | + public NodeId getMasterFor(DeviceId deviceId) { | ||
93 | + checkNotNull(deviceId, DEVICE_ID_NULL); | ||
94 | + return store.getMaster(deviceId); | ||
95 | + } | ||
96 | + | ||
97 | + @Override | ||
98 | + public Set<DeviceId> getDevicesOf(NodeId nodeId) { | ||
99 | + checkNotNull(nodeId, NODE_ID_NULL); | ||
100 | + return store.getDevices(nodeId); | ||
96 | } | 101 | } |
97 | 102 | ||
98 | @Override | 103 | @Override |
... | @@ -107,28 +112,7 @@ public class MastershipManager | ... | @@ -107,28 +112,7 @@ public class MastershipManager |
107 | listenerRegistry.removeListener(listener); | 112 | listenerRegistry.removeListener(listener); |
108 | } | 113 | } |
109 | 114 | ||
110 | - @Override | 115 | + // FIXME: provide wiring to allow events to be triggered by changes within the store |
111 | - protected MastershipProviderService createProviderService( | ||
112 | - MastershipProvider provider) { | ||
113 | - return new InternalMastershipProviderService(provider); | ||
114 | - } | ||
115 | - | ||
116 | - private class InternalMastershipProviderService | ||
117 | - extends AbstractProviderService<MastershipProvider> | ||
118 | - implements MastershipProviderService { | ||
119 | - | ||
120 | - protected InternalMastershipProviderService(MastershipProvider provider) { | ||
121 | - super(provider); | ||
122 | - } | ||
123 | - | ||
124 | - @Override | ||
125 | - public void roleChanged(NodeId nodeId, DeviceId deviceId, MastershipRole role) { | ||
126 | - // TODO Auto-generated method stub | ||
127 | - MastershipEvent event = | ||
128 | - store.addOrUpdateDevice(nodeId, deviceId, role); | ||
129 | - post(event); | ||
130 | - } | ||
131 | - } | ||
132 | 116 | ||
133 | // Posts the specified event to the local event dispatcher. | 117 | // Posts the specified event to the local event dispatcher. |
134 | private void post(MastershipEvent event) { | 118 | private void post(MastershipEvent event) { | ... | ... |
... | @@ -6,6 +6,9 @@ import org.apache.felix.scr.annotations.Deactivate; | ... | @@ -6,6 +6,9 @@ import org.apache.felix.scr.annotations.Deactivate; |
6 | import org.apache.felix.scr.annotations.Reference; | 6 | import org.apache.felix.scr.annotations.Reference; |
7 | import org.apache.felix.scr.annotations.ReferenceCardinality; | 7 | import org.apache.felix.scr.annotations.ReferenceCardinality; |
8 | import org.apache.felix.scr.annotations.Service; | 8 | import org.apache.felix.scr.annotations.Service; |
9 | +import org.onlab.onos.cluster.ClusterService; | ||
10 | +import org.onlab.onos.cluster.MastershipEvent; | ||
11 | +import org.onlab.onos.cluster.MastershipListener; | ||
9 | import org.onlab.onos.cluster.MastershipService; | 12 | import org.onlab.onos.cluster.MastershipService; |
10 | import org.onlab.onos.event.AbstractListenerRegistry; | 13 | import org.onlab.onos.event.AbstractListenerRegistry; |
11 | import org.onlab.onos.event.EventDeliveryService; | 14 | import org.onlab.onos.event.EventDeliveryService; |
... | @@ -54,6 +57,8 @@ public class DeviceManager | ... | @@ -54,6 +57,8 @@ public class DeviceManager |
54 | protected final AbstractListenerRegistry<DeviceEvent, DeviceListener> | 57 | protected final AbstractListenerRegistry<DeviceEvent, DeviceListener> |
55 | listenerRegistry = new AbstractListenerRegistry<>(); | 58 | listenerRegistry = new AbstractListenerRegistry<>(); |
56 | 59 | ||
60 | + private final MastershipListener mastershipListener = new InnerMastershipListener(); | ||
61 | + | ||
57 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 62 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
58 | protected DeviceStore store; | 63 | protected DeviceStore store; |
59 | 64 | ||
... | @@ -61,16 +66,21 @@ public class DeviceManager | ... | @@ -61,16 +66,21 @@ public class DeviceManager |
61 | protected EventDeliveryService eventDispatcher; | 66 | protected EventDeliveryService eventDispatcher; |
62 | 67 | ||
63 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 68 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
69 | + protected ClusterService clusterService; | ||
70 | + | ||
71 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
64 | protected MastershipService mastershipService; | 72 | protected MastershipService mastershipService; |
65 | 73 | ||
66 | @Activate | 74 | @Activate |
67 | public void activate() { | 75 | public void activate() { |
68 | eventDispatcher.addSink(DeviceEvent.class, listenerRegistry); | 76 | eventDispatcher.addSink(DeviceEvent.class, listenerRegistry); |
77 | + mastershipService.addListener(mastershipListener); | ||
69 | log.info("Started"); | 78 | log.info("Started"); |
70 | } | 79 | } |
71 | 80 | ||
72 | @Deactivate | 81 | @Deactivate |
73 | public void deactivate() { | 82 | public void deactivate() { |
83 | + mastershipService.removeListener(mastershipListener); | ||
74 | eventDispatcher.removeSink(DeviceEvent.class); | 84 | eventDispatcher.removeSink(DeviceEvent.class); |
75 | log.info("Stopped"); | 85 | log.info("Stopped"); |
76 | } | 86 | } |
... | @@ -94,7 +104,7 @@ public class DeviceManager | ... | @@ -94,7 +104,7 @@ public class DeviceManager |
94 | @Override | 104 | @Override |
95 | public MastershipRole getRole(DeviceId deviceId) { | 105 | public MastershipRole getRole(DeviceId deviceId) { |
96 | checkNotNull(deviceId, DEVICE_ID_NULL); | 106 | checkNotNull(deviceId, DEVICE_ID_NULL); |
97 | - return store.getRole(deviceId); | 107 | + return mastershipService.getLocalRole(deviceId); |
98 | } | 108 | } |
99 | 109 | ||
100 | @Override | 110 | @Override |
... | @@ -116,18 +126,15 @@ public class DeviceManager | ... | @@ -116,18 +126,15 @@ public class DeviceManager |
116 | return store.isAvailable(deviceId); | 126 | return store.isAvailable(deviceId); |
117 | } | 127 | } |
118 | 128 | ||
119 | - @Override | 129 | + // Applies the specified role to the device; ignores NONE |
120 | - public void setRole(DeviceId deviceId, MastershipRole newRole) { | 130 | + private void applyRole(DeviceId deviceId, MastershipRole newRole) { |
121 | - checkNotNull(deviceId, DEVICE_ID_NULL); | 131 | + if (newRole != MastershipRole.NONE) { |
122 | - checkNotNull(newRole, ROLE_NULL); | 132 | + Device device = store.getDevice(deviceId); |
123 | - DeviceEvent event = store.setRole(deviceId, newRole); | ||
124 | - if (event != null) { | ||
125 | - Device device = event.subject(); | ||
126 | DeviceProvider provider = getProvider(device.providerId()); | 133 | DeviceProvider provider = getProvider(device.providerId()); |
127 | if (provider != null) { | 134 | if (provider != null) { |
128 | provider.roleChanged(device, newRole); | 135 | provider.roleChanged(device, newRole); |
129 | } | 136 | } |
130 | - post(event); | 137 | + post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device)); |
131 | } | 138 | } |
132 | } | 139 | } |
133 | 140 | ||
... | @@ -176,12 +183,9 @@ public class DeviceManager | ... | @@ -176,12 +183,9 @@ public class DeviceManager |
176 | // If there was a change of any kind, trigger role selection process. | 183 | // If there was a change of any kind, trigger role selection process. |
177 | if (event != null) { | 184 | if (event != null) { |
178 | log.info("Device {} connected", deviceId); | 185 | log.info("Device {} connected", deviceId); |
179 | - if (event.type().equals(DEVICE_ADDED)) { | 186 | + mastershipService.requestRoleFor(deviceId); |
180 | - MastershipRole role = mastershipService.requestRoleFor(deviceId); | 187 | + provider().roleChanged(event.subject(), |
181 | - store.setRole(deviceId, role); | 188 | + mastershipService.getLocalRole(deviceId)); |
182 | - } | ||
183 | - Device device = event.subject(); | ||
184 | - provider().roleChanged(device, store.getRole(device.id())); | ||
185 | post(event); | 189 | post(event); |
186 | } | 190 | } |
187 | } | 191 | } |
... | @@ -229,4 +233,14 @@ public class DeviceManager | ... | @@ -229,4 +233,14 @@ public class DeviceManager |
229 | } | 233 | } |
230 | } | 234 | } |
231 | 235 | ||
236 | + // Intercepts mastership events | ||
237 | + private class InnerMastershipListener implements MastershipListener { | ||
238 | + @Override | ||
239 | + public void event(MastershipEvent event) { | ||
240 | + // FIXME: for now we're taking action only on becoming master | ||
241 | + if (event.master().equals(clusterService.getLocalNode().id())) { | ||
242 | + applyRole(event.subject(), MastershipRole.MASTER); | ||
243 | + } | ||
244 | + } | ||
245 | + } | ||
232 | } | 246 | } | ... | ... |
1 | package org.onlab.onos.net.device.impl; | 1 | package org.onlab.onos.net.device.impl; |
2 | 2 | ||
3 | +import com.google.common.collect.Sets; | ||
3 | import org.junit.After; | 4 | import org.junit.After; |
4 | import org.junit.Before; | 5 | import org.junit.Before; |
6 | +import org.junit.Ignore; | ||
5 | import org.junit.Test; | 7 | import org.junit.Test; |
6 | -import org.onlab.onos.cluster.MastershipListener; | 8 | +import org.onlab.onos.cluster.MastershipServiceAdapter; |
7 | -import org.onlab.onos.cluster.MastershipService; | ||
8 | import org.onlab.onos.cluster.NodeId; | 9 | import org.onlab.onos.cluster.NodeId; |
9 | import org.onlab.onos.event.Event; | 10 | import org.onlab.onos.event.Event; |
11 | +import org.onlab.onos.event.impl.TestEventDispatcher; | ||
10 | import org.onlab.onos.net.Device; | 12 | import org.onlab.onos.net.Device; |
11 | import org.onlab.onos.net.DeviceId; | 13 | import org.onlab.onos.net.DeviceId; |
12 | import org.onlab.onos.net.MastershipRole; | 14 | import org.onlab.onos.net.MastershipRole; |
... | @@ -25,11 +27,8 @@ import org.onlab.onos.net.device.DeviceService; | ... | @@ -25,11 +27,8 @@ import org.onlab.onos.net.device.DeviceService; |
25 | import org.onlab.onos.net.device.PortDescription; | 27 | import org.onlab.onos.net.device.PortDescription; |
26 | import org.onlab.onos.net.provider.AbstractProvider; | 28 | import org.onlab.onos.net.provider.AbstractProvider; |
27 | import org.onlab.onos.net.provider.ProviderId; | 29 | import org.onlab.onos.net.provider.ProviderId; |
28 | -import org.onlab.onos.event.impl.TestEventDispatcher; | ||
29 | import org.onlab.onos.net.trivial.impl.SimpleDeviceStore; | 30 | import org.onlab.onos.net.trivial.impl.SimpleDeviceStore; |
30 | 31 | ||
31 | -import com.google.common.collect.Sets; | ||
32 | - | ||
33 | import java.util.ArrayList; | 32 | import java.util.ArrayList; |
34 | import java.util.Iterator; | 33 | import java.util.Iterator; |
35 | import java.util.List; | 34 | import java.util.List; |
... | @@ -151,10 +150,10 @@ public class DeviceManagerTest { | ... | @@ -151,10 +150,10 @@ public class DeviceManagerTest { |
151 | assertEquals("incorrect role", MastershipRole.MASTER, service.getRole(DID1)); | 150 | assertEquals("incorrect role", MastershipRole.MASTER, service.getRole(DID1)); |
152 | } | 151 | } |
153 | 152 | ||
153 | + @Ignore("disabled until we settle the device-mastership wiring") | ||
154 | @Test | 154 | @Test |
155 | public void setRole() throws InterruptedException { | 155 | public void setRole() throws InterruptedException { |
156 | connectDevice(DID1, SW1); | 156 | connectDevice(DID1, SW1); |
157 | - admin.setRole(DID1, MastershipRole.STANDBY); | ||
158 | validateEvents(DEVICE_ADDED, DEVICE_MASTERSHIP_CHANGED); | 157 | validateEvents(DEVICE_ADDED, DEVICE_MASTERSHIP_CHANGED); |
159 | assertEquals("incorrect role", MastershipRole.STANDBY, service.getRole(DID1)); | 158 | assertEquals("incorrect role", MastershipRole.STANDBY, service.getRole(DID1)); |
160 | assertEquals("incorrect device", DID1, provider.deviceReceived.id()); | 159 | assertEquals("incorrect device", DID1, provider.deviceReceived.id()); |
... | @@ -259,11 +258,10 @@ public class DeviceManagerTest { | ... | @@ -259,11 +258,10 @@ public class DeviceManagerTest { |
259 | } | 258 | } |
260 | } | 259 | } |
261 | 260 | ||
262 | - private static class TestMastershipService implements MastershipService { | 261 | + private static class TestMastershipService extends MastershipServiceAdapter { |
263 | - | ||
264 | @Override | 262 | @Override |
265 | - public NodeId getMasterFor(DeviceId deviceId) { | 263 | + public MastershipRole getLocalRole(DeviceId deviceId) { |
266 | - return null; | 264 | + return MastershipRole.MASTER; |
267 | } | 265 | } |
268 | 266 | ||
269 | @Override | 267 | @Override |
... | @@ -275,15 +273,6 @@ public class DeviceManagerTest { | ... | @@ -275,15 +273,6 @@ public class DeviceManagerTest { |
275 | public MastershipRole requestRoleFor(DeviceId deviceId) { | 273 | public MastershipRole requestRoleFor(DeviceId deviceId) { |
276 | return MastershipRole.MASTER; | 274 | return MastershipRole.MASTER; |
277 | } | 275 | } |
278 | - | ||
279 | - @Override | ||
280 | - public void addListener(MastershipListener listener) { | ||
281 | - } | ||
282 | - | ||
283 | - @Override | ||
284 | - public void removeListener(MastershipListener listener) { | ||
285 | - } | ||
286 | - | ||
287 | } | 276 | } |
288 | 277 | ||
289 | } | 278 | } | ... | ... |
... | @@ -5,12 +5,10 @@ import com.google.common.collect.Sets; | ... | @@ -5,12 +5,10 @@ import com.google.common.collect.Sets; |
5 | import com.hazelcast.config.Config; | 5 | import com.hazelcast.config.Config; |
6 | import com.hazelcast.core.Hazelcast; | 6 | import com.hazelcast.core.Hazelcast; |
7 | import com.hazelcast.core.HazelcastInstance; | 7 | import com.hazelcast.core.HazelcastInstance; |
8 | - | ||
9 | import org.junit.After; | 8 | import org.junit.After; |
10 | import org.junit.Before; | 9 | import org.junit.Before; |
11 | import org.junit.Test; | 10 | import org.junit.Test; |
12 | -import org.onlab.onos.cluster.MastershipListener; | 11 | +import org.onlab.onos.cluster.MastershipServiceAdapter; |
13 | -import org.onlab.onos.cluster.MastershipService; | ||
14 | import org.onlab.onos.cluster.NodeId; | 12 | import org.onlab.onos.cluster.NodeId; |
15 | import org.onlab.onos.event.Event; | 13 | import org.onlab.onos.event.Event; |
16 | import org.onlab.onos.event.impl.TestEventDispatcher; | 14 | import org.onlab.onos.event.impl.TestEventDispatcher; |
... | @@ -50,6 +48,7 @@ import static org.onlab.onos.net.device.DeviceEvent.Type.*; | ... | @@ -50,6 +48,7 @@ import static org.onlab.onos.net.device.DeviceEvent.Type.*; |
50 | // FIXME This test is painfully slow starting up Hazelcast on each test cases, | 48 | // FIXME This test is painfully slow starting up Hazelcast on each test cases, |
51 | // turning it off in repository for now. | 49 | // turning it off in repository for now. |
52 | // FIXME DistributedDeviceStore should have it's own test cases. | 50 | // FIXME DistributedDeviceStore should have it's own test cases. |
51 | + | ||
53 | /** | 52 | /** |
54 | * Test codifying the device service & device provider service contracts. | 53 | * Test codifying the device service & device provider service contracts. |
55 | */ | 54 | */ |
... | @@ -91,11 +90,11 @@ public class DistributedDeviceManagerTest { | ... | @@ -91,11 +90,11 @@ public class DistributedDeviceManagerTest { |
91 | config.getGroupConfig().setName(UUID.randomUUID().toString()); | 90 | config.getGroupConfig().setName(UUID.randomUUID().toString()); |
92 | // quickly form single node cluster | 91 | // quickly form single node cluster |
93 | config.getNetworkConfig().getJoin() | 92 | config.getNetworkConfig().getJoin() |
94 | - .getTcpIpConfig() | 93 | + .getTcpIpConfig() |
95 | - .setEnabled(true).setConnectionTimeoutSeconds(0); | 94 | + .setEnabled(true).setConnectionTimeoutSeconds(0); |
96 | config.getNetworkConfig().getJoin() | 95 | config.getNetworkConfig().getJoin() |
97 | - .getMulticastConfig() | 96 | + .getMulticastConfig() |
98 | - .setEnabled(false); | 97 | + .setEnabled(false); |
99 | 98 | ||
100 | storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); | 99 | storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); |
101 | storeManager.activate(); | 100 | storeManager.activate(); |
... | @@ -186,16 +185,6 @@ public class DistributedDeviceManagerTest { | ... | @@ -186,16 +185,6 @@ public class DistributedDeviceManagerTest { |
186 | } | 185 | } |
187 | 186 | ||
188 | @Test | 187 | @Test |
189 | - public void setRole() throws InterruptedException { | ||
190 | - connectDevice(DID1, SW1); | ||
191 | - admin.setRole(DID1, MastershipRole.STANDBY); | ||
192 | - validateEvents(DEVICE_ADDED, DEVICE_MASTERSHIP_CHANGED); | ||
193 | - assertEquals("incorrect role", MastershipRole.STANDBY, service.getRole(DID1)); | ||
194 | - assertEquals("incorrect device", DID1, provider.deviceReceived.id()); | ||
195 | - assertEquals("incorrect role", MastershipRole.STANDBY, provider.roleReceived); | ||
196 | - } | ||
197 | - | ||
198 | - @Test | ||
199 | public void updatePorts() { | 188 | public void updatePorts() { |
200 | connectDevice(DID1, SW1); | 189 | connectDevice(DID1, SW1); |
201 | List<PortDescription> pds = new ArrayList<>(); | 190 | List<PortDescription> pds = new ArrayList<>(); |
... | @@ -310,11 +299,10 @@ public class DistributedDeviceManagerTest { | ... | @@ -310,11 +299,10 @@ public class DistributedDeviceManagerTest { |
310 | } | 299 | } |
311 | } | 300 | } |
312 | 301 | ||
313 | - private static class TestMastershipService implements MastershipService { | 302 | + private static class TestMastershipService extends MastershipServiceAdapter { |
314 | - | ||
315 | @Override | 303 | @Override |
316 | - public NodeId getMasterFor(DeviceId deviceId) { | 304 | + public MastershipRole getLocalRole(DeviceId deviceId) { |
317 | - return null; | 305 | + return MastershipRole.MASTER; |
318 | } | 306 | } |
319 | 307 | ||
320 | @Override | 308 | @Override |
... | @@ -326,15 +314,6 @@ public class DistributedDeviceManagerTest { | ... | @@ -326,15 +314,6 @@ public class DistributedDeviceManagerTest { |
326 | public MastershipRole requestRoleFor(DeviceId deviceId) { | 314 | public MastershipRole requestRoleFor(DeviceId deviceId) { |
327 | return MastershipRole.MASTER; | 315 | return MastershipRole.MASTER; |
328 | } | 316 | } |
329 | - | ||
330 | - @Override | ||
331 | - public void addListener(MastershipListener listener) { | ||
332 | - } | ||
333 | - | ||
334 | - @Override | ||
335 | - public void removeListener(MastershipListener listener) { | ||
336 | - } | ||
337 | - | ||
338 | } | 317 | } |
339 | 318 | ||
340 | } | 319 | } | ... | ... |
1 | package org.onlab.onos.store.cluster.impl; | 1 | package org.onlab.onos.store.cluster.impl; |
2 | 2 | ||
3 | +import com.google.common.base.Optional; | ||
4 | +import com.google.common.cache.LoadingCache; | ||
3 | import com.google.common.collect.ImmutableSet; | 5 | import com.google.common.collect.ImmutableSet; |
4 | -import com.hazelcast.core.HazelcastInstance; | 6 | +import com.hazelcast.core.IMap; |
5 | import com.hazelcast.core.Member; | 7 | import com.hazelcast.core.Member; |
8 | +import com.hazelcast.core.MemberAttributeEvent; | ||
9 | +import com.hazelcast.core.MembershipEvent; | ||
10 | +import com.hazelcast.core.MembershipListener; | ||
6 | import org.apache.felix.scr.annotations.Activate; | 11 | import org.apache.felix.scr.annotations.Activate; |
7 | import org.apache.felix.scr.annotations.Component; | 12 | import org.apache.felix.scr.annotations.Component; |
8 | import org.apache.felix.scr.annotations.Deactivate; | 13 | import org.apache.felix.scr.annotations.Deactivate; |
9 | -import org.apache.felix.scr.annotations.Reference; | ||
10 | -import org.apache.felix.scr.annotations.ReferenceCardinality; | ||
11 | import org.apache.felix.scr.annotations.Service; | 14 | import org.apache.felix.scr.annotations.Service; |
12 | import org.onlab.onos.cluster.ClusterStore; | 15 | import org.onlab.onos.cluster.ClusterStore; |
13 | import org.onlab.onos.cluster.ControllerNode; | 16 | import org.onlab.onos.cluster.ControllerNode; |
14 | import org.onlab.onos.cluster.DefaultControllerNode; | 17 | import org.onlab.onos.cluster.DefaultControllerNode; |
15 | import org.onlab.onos.cluster.NodeId; | 18 | import org.onlab.onos.cluster.NodeId; |
16 | -import org.onlab.onos.store.StoreService; | 19 | +import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache; |
20 | +import org.onlab.onos.store.impl.AbstractDistributedStore; | ||
21 | +import org.onlab.onos.store.impl.OptionalCacheLoader; | ||
17 | import org.onlab.packet.IpPrefix; | 22 | import org.onlab.packet.IpPrefix; |
18 | -import org.slf4j.Logger; | ||
19 | 23 | ||
24 | +import java.util.Map; | ||
20 | import java.util.Set; | 25 | import java.util.Set; |
26 | +import java.util.concurrent.ConcurrentHashMap; | ||
21 | 27 | ||
22 | -import static org.slf4j.LoggerFactory.getLogger; | 28 | +import static com.google.common.cache.CacheBuilder.newBuilder; |
29 | +import static org.onlab.onos.cluster.ControllerNode.State; | ||
23 | 30 | ||
24 | /** | 31 | /** |
25 | * Distributed implementation of the cluster nodes store. | 32 | * Distributed implementation of the cluster nodes store. |
26 | */ | 33 | */ |
27 | @Component(immediate = true) | 34 | @Component(immediate = true) |
28 | @Service | 35 | @Service |
29 | -public class DistributedClusterStore implements ClusterStore { | 36 | +public class DistributedClusterStore extends AbstractDistributedStore |
37 | + implements ClusterStore { | ||
30 | 38 | ||
31 | - private final Logger log = getLogger(getClass()); | 39 | + private IMap<byte[], byte[]> rawNodes; |
40 | + private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes; | ||
32 | 41 | ||
33 | - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 42 | + private String listenerId; |
34 | - protected StoreService storeService; | 43 | + private final MembershipListener listener = new InnerMembershipListener(); |
35 | - | 44 | + private final Map<NodeId, State> states = new ConcurrentHashMap<>(); |
36 | - private HazelcastInstance theInstance; | ||
37 | - | ||
38 | - // FIXME: experimental implementation; enhance to assure persistence and | ||
39 | - // visibility to nodes that are not currently in the cluster | ||
40 | 45 | ||
41 | @Activate | 46 | @Activate |
42 | public void activate() { | 47 | public void activate() { |
48 | + super.activate(); | ||
49 | + listenerId = theInstance.getCluster().addMembershipListener(listener); | ||
50 | + | ||
51 | + rawNodes = theInstance.getMap("nodes"); | ||
52 | + OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader | ||
53 | + = new OptionalCacheLoader<>(storeService, rawNodes); | ||
54 | + nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); | ||
55 | + rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true); | ||
56 | + | ||
57 | + loadClusterNodes(); | ||
58 | + | ||
43 | log.info("Started"); | 59 | log.info("Started"); |
44 | - theInstance = storeService.getHazelcastInstance(); | 60 | + } |
45 | 61 | ||
62 | + // Loads the initial set of cluster nodes | ||
63 | + private void loadClusterNodes() { | ||
64 | + for (Member member : theInstance.getCluster().getMembers()) { | ||
65 | + addMember(member); | ||
66 | + } | ||
46 | } | 67 | } |
47 | 68 | ||
48 | @Deactivate | 69 | @Deactivate |
49 | public void deactivate() { | 70 | public void deactivate() { |
71 | + theInstance.getCluster().removeMembershipListener(listenerId); | ||
50 | log.info("Stopped"); | 72 | log.info("Stopped"); |
51 | } | 73 | } |
52 | 74 | ||
... | @@ -58,30 +80,71 @@ public class DistributedClusterStore implements ClusterStore { | ... | @@ -58,30 +80,71 @@ public class DistributedClusterStore implements ClusterStore { |
58 | @Override | 80 | @Override |
59 | public Set<ControllerNode> getNodes() { | 81 | public Set<ControllerNode> getNodes() { |
60 | ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder(); | 82 | ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder(); |
61 | - for (Member member : theInstance.getCluster().getMembers()) { | 83 | + for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) { |
62 | - builder.add(node(member)); | 84 | + builder.add(optional.get()); |
63 | } | 85 | } |
64 | return builder.build(); | 86 | return builder.build(); |
65 | } | 87 | } |
66 | 88 | ||
67 | @Override | 89 | @Override |
68 | public ControllerNode getNode(NodeId nodeId) { | 90 | public ControllerNode getNode(NodeId nodeId) { |
69 | - for (Member member : theInstance.getCluster().getMembers()) { | 91 | + return nodes.getUnchecked(nodeId).orNull(); |
70 | - if (member.getUuid().equals(nodeId.toString())) { | 92 | + } |
71 | - return node(member); | 93 | + |
72 | - } | 94 | + @Override |
73 | - } | 95 | + public State getState(NodeId nodeId) { |
74 | - return null; | 96 | + State state = states.get(nodeId); |
97 | + return state == null ? State.INACTIVE : state; | ||
75 | } | 98 | } |
76 | 99 | ||
77 | @Override | 100 | @Override |
78 | - public ControllerNode.State getState(NodeId nodeId) { | 101 | + public void removeNode(NodeId nodeId) { |
79 | - return ControllerNode.State.ACTIVE; | 102 | + synchronized (this) { |
103 | + rawNodes.remove(serialize(nodeId)); | ||
104 | + nodes.invalidate(nodeId); | ||
105 | + } | ||
106 | + } | ||
107 | + | ||
108 | + // Adds a new node based on the specified member | ||
109 | + private synchronized void addMember(Member member) { | ||
110 | + DefaultControllerNode node = node(member); | ||
111 | + rawNodes.put(serialize(node.id()), serialize(node)); | ||
112 | + nodes.put(node.id(), Optional.of(node)); | ||
113 | + states.put(node.id(), State.ACTIVE); | ||
80 | } | 114 | } |
81 | 115 | ||
82 | // Creates a controller node descriptor from the Hazelcast member. | 116 | // Creates a controller node descriptor from the Hazelcast member. |
83 | - private ControllerNode node(Member member) { | 117 | + private DefaultControllerNode node(Member member) { |
84 | - return new DefaultControllerNode(new NodeId(member.getUuid()), | 118 | + IpPrefix ip = memberAddress(member); |
85 | - IpPrefix.valueOf(member.getSocketAddress().getAddress().getAddress())); | 119 | + return new DefaultControllerNode(new NodeId(ip.toString()), ip); |
120 | + } | ||
121 | + | ||
122 | + private IpPrefix memberAddress(Member member) { | ||
123 | + byte[] address = member.getSocketAddress().getAddress().getAddress(); | ||
124 | + return IpPrefix.valueOf(address); | ||
125 | + } | ||
126 | + | ||
127 | + // Interceptor for membership events. | ||
128 | + private class InnerMembershipListener implements MembershipListener { | ||
129 | + @Override | ||
130 | + public void memberAdded(MembershipEvent membershipEvent) { | ||
131 | + log.info("Member {} added", membershipEvent.getMember()); | ||
132 | + addMember(membershipEvent.getMember()); | ||
133 | + } | ||
134 | + | ||
135 | + @Override | ||
136 | + public void memberRemoved(MembershipEvent membershipEvent) { | ||
137 | + log.info("Member {} removed", membershipEvent.getMember()); | ||
138 | + states.put(new NodeId(memberAddress(membershipEvent.getMember()).toString()), | ||
139 | + State.INACTIVE); | ||
140 | + } | ||
141 | + | ||
142 | + @Override | ||
143 | + public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) { | ||
144 | + log.info("Member {} attribute {} changed to {}", | ||
145 | + memberAttributeEvent.getMember(), | ||
146 | + memberAttributeEvent.getKey(), | ||
147 | + memberAttributeEvent.getValue()); | ||
148 | + } | ||
86 | } | 149 | } |
87 | } | 150 | } | ... | ... |
core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
0 → 100644
1 | +package org.onlab.onos.store.cluster.impl; | ||
2 | + | ||
3 | +import com.google.common.base.Optional; | ||
4 | +import com.google.common.cache.LoadingCache; | ||
5 | +import com.google.common.collect.ImmutableSet; | ||
6 | +import com.hazelcast.core.IMap; | ||
7 | +import org.apache.felix.scr.annotations.Activate; | ||
8 | +import org.apache.felix.scr.annotations.Component; | ||
9 | +import org.apache.felix.scr.annotations.Deactivate; | ||
10 | +import org.apache.felix.scr.annotations.Reference; | ||
11 | +import org.apache.felix.scr.annotations.ReferenceCardinality; | ||
12 | +import org.apache.felix.scr.annotations.Service; | ||
13 | +import org.onlab.onos.cluster.ClusterService; | ||
14 | +import org.onlab.onos.cluster.MastershipEvent; | ||
15 | +import org.onlab.onos.cluster.MastershipStore; | ||
16 | +import org.onlab.onos.cluster.NodeId; | ||
17 | +import org.onlab.onos.net.DeviceId; | ||
18 | +import org.onlab.onos.net.MastershipRole; | ||
19 | +import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache; | ||
20 | +import org.onlab.onos.store.impl.AbstractDistributedStore; | ||
21 | +import org.onlab.onos.store.impl.OptionalCacheLoader; | ||
22 | + | ||
23 | +import java.util.Map; | ||
24 | +import java.util.Objects; | ||
25 | +import java.util.Set; | ||
26 | + | ||
27 | +import static com.google.common.cache.CacheBuilder.newBuilder; | ||
28 | + | ||
29 | +/** | ||
30 | + * Distributed implementation of the cluster nodes store. | ||
31 | + */ | ||
32 | +@Component(immediate = true) | ||
33 | +@Service | ||
34 | +public class DistributedMastershipStore extends AbstractDistributedStore | ||
35 | + implements MastershipStore { | ||
36 | + | ||
37 | + private IMap<byte[], byte[]> rawMasters; | ||
38 | + private LoadingCache<DeviceId, Optional<NodeId>> masters; | ||
39 | + | ||
40 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
41 | + protected ClusterService clusterService; | ||
42 | + | ||
43 | + @Activate | ||
44 | + public void activate() { | ||
45 | + super.activate(); | ||
46 | + | ||
47 | + rawMasters = theInstance.getMap("masters"); | ||
48 | + OptionalCacheLoader<DeviceId, NodeId> nodeLoader | ||
49 | + = new OptionalCacheLoader<>(storeService, rawMasters); | ||
50 | + masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); | ||
51 | + rawMasters.addEntryListener(new RemoteEventHandler<>(masters), true); | ||
52 | + | ||
53 | + log.info("Started"); | ||
54 | + } | ||
55 | + | ||
56 | + @Deactivate | ||
57 | + public void deactivate() { | ||
58 | + log.info("Stopped"); | ||
59 | + } | ||
60 | + | ||
61 | + @Override | ||
62 | + public MastershipEvent setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) { | ||
63 | + synchronized (this) { | ||
64 | + NodeId currentMaster = getMaster(deviceId); | ||
65 | + if (role == MastershipRole.MASTER && Objects.equals(currentMaster, nodeId)) { | ||
66 | + return null; | ||
67 | + } | ||
68 | + | ||
69 | + // FIXME: for now implementing semantics of setMaster | ||
70 | + rawMasters.put(serialize(deviceId), serialize(nodeId)); | ||
71 | + masters.put(deviceId, Optional.of(nodeId)); | ||
72 | + return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId); | ||
73 | + } | ||
74 | + } | ||
75 | + | ||
76 | + @Override | ||
77 | + public NodeId getMaster(DeviceId deviceId) { | ||
78 | + return masters.getUnchecked(deviceId).orNull(); | ||
79 | + } | ||
80 | + | ||
81 | + @Override | ||
82 | + public Set<DeviceId> getDevices(NodeId nodeId) { | ||
83 | + ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder(); | ||
84 | + for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) { | ||
85 | + if (nodeId.equals(entry.getValue().get())) { | ||
86 | + builder.add(entry.getKey()); | ||
87 | + } | ||
88 | + } | ||
89 | + return builder.build(); | ||
90 | + } | ||
91 | + | ||
92 | + @Override | ||
93 | + public MastershipRole requestRole(DeviceId deviceId) { | ||
94 | + // FIXME: for now we are 'selecting' as master whoever asks | ||
95 | + setRole(clusterService.getLocalNode().id(), deviceId, MastershipRole.MASTER); | ||
96 | + return MastershipRole.MASTER; | ||
97 | + } | ||
98 | + | ||
99 | + @Override | ||
100 | + public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) { | ||
101 | + NodeId master = masters.getUnchecked(deviceId).orNull(); | ||
102 | + return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY; | ||
103 | + } | ||
104 | + | ||
105 | +} |
1 | package org.onlab.onos.store.device.impl; | 1 | package org.onlab.onos.store.device.impl; |
2 | 2 | ||
3 | import com.google.common.base.Optional; | 3 | import com.google.common.base.Optional; |
4 | -import com.google.common.cache.CacheBuilder; | ||
5 | import com.google.common.cache.LoadingCache; | 4 | import com.google.common.cache.LoadingCache; |
6 | import com.google.common.collect.ImmutableList; | 5 | import com.google.common.collect.ImmutableList; |
7 | import com.google.common.collect.ImmutableSet; | 6 | import com.google.common.collect.ImmutableSet; |
8 | import com.google.common.collect.ImmutableSet.Builder; | 7 | import com.google.common.collect.ImmutableSet.Builder; |
9 | -import com.hazelcast.core.EntryAdapter; | ||
10 | -import com.hazelcast.core.EntryEvent; | ||
11 | -import com.hazelcast.core.HazelcastInstance; | ||
12 | import com.hazelcast.core.IMap; | 8 | import com.hazelcast.core.IMap; |
13 | import com.hazelcast.core.ISet; | 9 | import com.hazelcast.core.ISet; |
14 | -import com.hazelcast.core.MapEvent; | ||
15 | - | ||
16 | import org.apache.felix.scr.annotations.Activate; | 10 | import org.apache.felix.scr.annotations.Activate; |
17 | import org.apache.felix.scr.annotations.Component; | 11 | import org.apache.felix.scr.annotations.Component; |
18 | import org.apache.felix.scr.annotations.Deactivate; | 12 | import org.apache.felix.scr.annotations.Deactivate; |
19 | -import org.apache.felix.scr.annotations.Reference; | ||
20 | -import org.apache.felix.scr.annotations.ReferenceCardinality; | ||
21 | import org.apache.felix.scr.annotations.Service; | 13 | import org.apache.felix.scr.annotations.Service; |
22 | import org.onlab.onos.net.DefaultDevice; | 14 | import org.onlab.onos.net.DefaultDevice; |
23 | import org.onlab.onos.net.DefaultPort; | 15 | import org.onlab.onos.net.DefaultPort; |
... | @@ -31,8 +23,8 @@ import org.onlab.onos.net.device.DeviceEvent; | ... | @@ -31,8 +23,8 @@ import org.onlab.onos.net.device.DeviceEvent; |
31 | import org.onlab.onos.net.device.DeviceStore; | 23 | import org.onlab.onos.net.device.DeviceStore; |
32 | import org.onlab.onos.net.device.PortDescription; | 24 | import org.onlab.onos.net.device.PortDescription; |
33 | import org.onlab.onos.net.provider.ProviderId; | 25 | import org.onlab.onos.net.provider.ProviderId; |
34 | -import org.onlab.onos.store.StoreService; | ||
35 | import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache; | 26 | import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache; |
27 | +import org.onlab.onos.store.impl.AbstractDistributedStore; | ||
36 | import org.onlab.onos.store.impl.OptionalCacheLoader; | 28 | import org.onlab.onos.store.impl.OptionalCacheLoader; |
37 | import org.slf4j.Logger; | 29 | import org.slf4j.Logger; |
38 | 30 | ||
... | @@ -47,17 +39,17 @@ import java.util.Objects; | ... | @@ -47,17 +39,17 @@ import java.util.Objects; |
47 | import java.util.Set; | 39 | import java.util.Set; |
48 | 40 | ||
49 | import static com.google.common.base.Preconditions.checkArgument; | 41 | import static com.google.common.base.Preconditions.checkArgument; |
50 | -import static com.google.common.base.Preconditions.checkNotNull; | 42 | +import static com.google.common.cache.CacheBuilder.newBuilder; |
51 | import static org.onlab.onos.net.device.DeviceEvent.Type.*; | 43 | import static org.onlab.onos.net.device.DeviceEvent.Type.*; |
52 | import static org.slf4j.LoggerFactory.getLogger; | 44 | import static org.slf4j.LoggerFactory.getLogger; |
53 | 45 | ||
54 | - | ||
55 | /** | 46 | /** |
56 | * Manages inventory of infrastructure devices using Hazelcast-backed map. | 47 | * Manages inventory of infrastructure devices using Hazelcast-backed map. |
57 | */ | 48 | */ |
58 | @Component(immediate = true) | 49 | @Component(immediate = true) |
59 | @Service | 50 | @Service |
60 | -public class DistributedDeviceStore implements DeviceStore { | 51 | +public class DistributedDeviceStore extends AbstractDistributedStore |
52 | + implements DeviceStore { | ||
61 | 53 | ||
62 | private final Logger log = getLogger(getClass()); | 54 | private final Logger log = getLogger(getClass()); |
63 | 55 | ||
... | @@ -79,16 +71,9 @@ public class DistributedDeviceStore implements DeviceStore { | ... | @@ -79,16 +71,9 @@ public class DistributedDeviceStore implements DeviceStore { |
79 | private IMap<byte[], byte[]> rawDevicePorts; | 71 | private IMap<byte[], byte[]> rawDevicePorts; |
80 | private LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> devicePorts; | 72 | private LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> devicePorts; |
81 | 73 | ||
82 | - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
83 | - protected StoreService storeService; | ||
84 | - | ||
85 | - protected HazelcastInstance theInstance; | ||
86 | - | ||
87 | - | ||
88 | @Activate | 74 | @Activate |
89 | public void activate() { | 75 | public void activate() { |
90 | - log.info("Started"); | 76 | + super.activate(); |
91 | - theInstance = storeService.getHazelcastInstance(); | ||
92 | 77 | ||
93 | // IMap event handler needs value | 78 | // IMap event handler needs value |
94 | final boolean includeValue = true; | 79 | final boolean includeValue = true; |
... | @@ -96,40 +81,29 @@ public class DistributedDeviceStore implements DeviceStore { | ... | @@ -96,40 +81,29 @@ public class DistributedDeviceStore implements DeviceStore { |
96 | // TODO decide on Map name scheme to avoid collision | 81 | // TODO decide on Map name scheme to avoid collision |
97 | rawDevices = theInstance.getMap("devices"); | 82 | rawDevices = theInstance.getMap("devices"); |
98 | final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader | 83 | final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader |
99 | - = new OptionalCacheLoader<>(storeService, rawDevices); | 84 | + = new OptionalCacheLoader<>(storeService, rawDevices); |
100 | - devices = new AbsentInvalidatingLoadingCache<>( | 85 | + devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader)); |
101 | - CacheBuilder.newBuilder() | ||
102 | - .build(deviceLoader)); | ||
103 | // refresh/populate cache based on notification from other instance | 86 | // refresh/populate cache based on notification from other instance |
104 | - rawDevices.addEntryListener( | 87 | + rawDevices.addEntryListener(new RemoteEventHandler<>(devices), includeValue); |
105 | - new RemoteEventHandler<>(devices), | ||
106 | - includeValue); | ||
107 | 88 | ||
108 | rawRoles = theInstance.getMap("roles"); | 89 | rawRoles = theInstance.getMap("roles"); |
109 | final OptionalCacheLoader<DeviceId, MastershipRole> rolesLoader | 90 | final OptionalCacheLoader<DeviceId, MastershipRole> rolesLoader |
110 | - = new OptionalCacheLoader<>(storeService, rawRoles); | 91 | + = new OptionalCacheLoader<>(storeService, rawRoles); |
111 | - roles = new AbsentInvalidatingLoadingCache<>( | 92 | + roles = new AbsentInvalidatingLoadingCache<>(newBuilder().build(rolesLoader)); |
112 | - CacheBuilder.newBuilder() | ||
113 | - .build(rolesLoader)); | ||
114 | // refresh/populate cache based on notification from other instance | 93 | // refresh/populate cache based on notification from other instance |
115 | - rawRoles.addEntryListener( | 94 | + rawRoles.addEntryListener(new RemoteEventHandler<>(roles), includeValue); |
116 | - new RemoteEventHandler<>(roles), | ||
117 | - includeValue); | ||
118 | 95 | ||
119 | // TODO cache availableDevices | 96 | // TODO cache availableDevices |
120 | availableDevices = theInstance.getSet("availableDevices"); | 97 | availableDevices = theInstance.getSet("availableDevices"); |
121 | 98 | ||
122 | rawDevicePorts = theInstance.getMap("devicePorts"); | 99 | rawDevicePorts = theInstance.getMap("devicePorts"); |
123 | final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader | 100 | final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader |
124 | - = new OptionalCacheLoader<>(storeService, rawDevicePorts); | 101 | + = new OptionalCacheLoader<>(storeService, rawDevicePorts); |
125 | - devicePorts = new AbsentInvalidatingLoadingCache<>( | 102 | + devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader)); |
126 | - CacheBuilder.newBuilder() | ||
127 | - .build(devicePortLoader)); | ||
128 | // refresh/populate cache based on notification from other instance | 103 | // refresh/populate cache based on notification from other instance |
129 | - rawDevicePorts.addEntryListener( | 104 | + rawDevicePorts.addEntryListener(new RemoteEventHandler<>(devicePorts), includeValue); |
130 | - new RemoteEventHandler<>(devicePorts), | ||
131 | - includeValue); | ||
132 | 105 | ||
106 | + log.info("Started"); | ||
133 | } | 107 | } |
134 | 108 | ||
135 | @Deactivate | 109 | @Deactivate |
... | @@ -369,25 +343,6 @@ public class DistributedDeviceStore implements DeviceStore { | ... | @@ -369,25 +343,6 @@ public class DistributedDeviceStore implements DeviceStore { |
369 | } | 343 | } |
370 | 344 | ||
371 | @Override | 345 | @Override |
372 | - public MastershipRole getRole(DeviceId deviceId) { | ||
373 | - MastershipRole role = roles.getUnchecked(deviceId).orNull(); | ||
374 | - return role != null ? role : MastershipRole.NONE; | ||
375 | - } | ||
376 | - | ||
377 | - @Override | ||
378 | - public DeviceEvent setRole(DeviceId deviceId, MastershipRole role) { | ||
379 | - synchronized (this) { | ||
380 | - Device device = getDevice(deviceId); | ||
381 | - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); | ||
382 | - MastershipRole oldRole = deserialize( | ||
383 | - rawRoles.put(serialize(deviceId), serialize(role))); | ||
384 | - roles.put(deviceId, Optional.of(role)); | ||
385 | - return oldRole == role ? null : | ||
386 | - new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device, null); | ||
387 | - } | ||
388 | - } | ||
389 | - | ||
390 | - @Override | ||
391 | public DeviceEvent removeDevice(DeviceId deviceId) { | 346 | public DeviceEvent removeDevice(DeviceId deviceId) { |
392 | synchronized (this) { | 347 | synchronized (this) { |
393 | byte[] deviceIdBytes = serialize(deviceId); | 348 | byte[] deviceIdBytes = serialize(deviceId); |
... | @@ -403,54 +358,5 @@ public class DistributedDeviceStore implements DeviceStore { | ... | @@ -403,54 +358,5 @@ public class DistributedDeviceStore implements DeviceStore { |
403 | } | 358 | } |
404 | 359 | ||
405 | // TODO cache serialized DeviceID if we suffer from serialization cost | 360 | // TODO cache serialized DeviceID if we suffer from serialization cost |
406 | - private byte[] serialize(final Object obj) { | ||
407 | - return storeService.serialize(obj); | ||
408 | - } | ||
409 | - | ||
410 | - private <T> T deserialize(final byte[] bytes) { | ||
411 | - return storeService.deserialize(bytes); | ||
412 | - } | ||
413 | - | ||
414 | - /** | ||
415 | - * An IMap EntryListener, which reflects each remote event to cache. | ||
416 | - * | ||
417 | - * @param <K> IMap key type after deserialization | ||
418 | - * @param <V> IMap value type after deserialization | ||
419 | - */ | ||
420 | - public final class RemoteEventHandler<K, V> extends | ||
421 | - EntryAdapter<byte[], byte[]> { | ||
422 | - | ||
423 | - private LoadingCache<K, Optional<V>> cache; | ||
424 | - | ||
425 | - /** | ||
426 | - * Constructor. | ||
427 | - * | ||
428 | - * @param cache cache to update | ||
429 | - */ | ||
430 | - public RemoteEventHandler( | ||
431 | - LoadingCache<K, Optional<V>> cache) { | ||
432 | - this.cache = checkNotNull(cache); | ||
433 | - } | ||
434 | - | ||
435 | - @Override | ||
436 | - public void mapCleared(MapEvent event) { | ||
437 | - cache.invalidateAll(); | ||
438 | - } | ||
439 | - | ||
440 | - @Override | ||
441 | - public void entryUpdated(EntryEvent<byte[], byte[]> event) { | ||
442 | - cache.put(storeService.<K>deserialize(event.getKey()), | ||
443 | - Optional.of(storeService.<V>deserialize(event.getValue()))); | ||
444 | - } | ||
445 | 361 | ||
446 | - @Override | ||
447 | - public void entryRemoved(EntryEvent<byte[], byte[]> event) { | ||
448 | - cache.invalidate(storeService.<K>deserialize(event.getKey())); | ||
449 | - } | ||
450 | - | ||
451 | - @Override | ||
452 | - public void entryAdded(EntryEvent<byte[], byte[]> event) { | ||
453 | - entryUpdated(event); | ||
454 | - } | ||
455 | - } | ||
456 | } | 362 | } | ... | ... |
1 | +package org.onlab.onos.store.impl; | ||
2 | + | ||
3 | +import com.google.common.base.Optional; | ||
4 | +import com.google.common.cache.LoadingCache; | ||
5 | +import com.hazelcast.core.EntryAdapter; | ||
6 | +import com.hazelcast.core.EntryEvent; | ||
7 | +import com.hazelcast.core.HazelcastInstance; | ||
8 | +import com.hazelcast.core.MapEvent; | ||
9 | +import org.apache.felix.scr.annotations.Activate; | ||
10 | +import org.apache.felix.scr.annotations.Component; | ||
11 | +import org.apache.felix.scr.annotations.Reference; | ||
12 | +import org.apache.felix.scr.annotations.ReferenceCardinality; | ||
13 | +import org.onlab.onos.store.StoreService; | ||
14 | +import org.slf4j.Logger; | ||
15 | + | ||
16 | +import static com.google.common.base.Preconditions.checkNotNull; | ||
17 | +import static org.slf4j.LoggerFactory.getLogger; | ||
18 | + | ||
19 | +/** | ||
20 | + * Abstraction of a distributed store based on Hazelcast. | ||
21 | + */ | ||
22 | +@Component(componentAbstract = true) | ||
23 | +public abstract class AbstractDistributedStore { | ||
24 | + | ||
25 | + protected final Logger log = getLogger(getClass()); | ||
26 | + | ||
27 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
28 | + protected StoreService storeService; | ||
29 | + | ||
30 | + protected HazelcastInstance theInstance; | ||
31 | + | ||
32 | + @Activate | ||
33 | + public void activate() { | ||
34 | + theInstance = storeService.getHazelcastInstance(); | ||
35 | + } | ||
36 | + | ||
37 | + /** | ||
38 | + * Serializes the specified object using the backing store service. | ||
39 | + * | ||
40 | + * @param obj object to be serialized | ||
41 | + * @return serialized object | ||
42 | + */ | ||
43 | + protected byte[] serialize(Object obj) { | ||
44 | + return storeService.serialize(obj); | ||
45 | + } | ||
46 | + | ||
47 | + /** | ||
48 | + * Deserializes the specified object using the backing store service. | ||
49 | + * | ||
50 | + * @param bytes bytes to be deserialized | ||
51 | + * @param <T> type of object | ||
52 | + * @return deserialized object | ||
53 | + */ | ||
54 | + protected <T> T deserialize(byte[] bytes) { | ||
55 | + return storeService.deserialize(bytes); | ||
56 | + } | ||
57 | + | ||
58 | + | ||
59 | + /** | ||
60 | + * An IMap entry listener, which reflects each remote event to the cache. | ||
61 | + * | ||
62 | + * @param <K> IMap key type after deserialization | ||
63 | + * @param <V> IMap value type after deserialization | ||
64 | + */ | ||
65 | + public final class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> { | ||
66 | + | ||
67 | + private LoadingCache<K, Optional<V>> cache; | ||
68 | + | ||
69 | + /** | ||
70 | + * Constructor. | ||
71 | + * | ||
72 | + * @param cache cache to update | ||
73 | + */ | ||
74 | + public RemoteEventHandler(LoadingCache<K, Optional<V>> cache) { | ||
75 | + this.cache = checkNotNull(cache); | ||
76 | + } | ||
77 | + | ||
78 | + @Override | ||
79 | + public void mapCleared(MapEvent event) { | ||
80 | + cache.invalidateAll(); | ||
81 | + } | ||
82 | + | ||
83 | + @Override | ||
84 | + public void entryUpdated(EntryEvent<byte[], byte[]> event) { | ||
85 | + cache.put(storeService.<K>deserialize(event.getKey()), | ||
86 | + Optional.of(storeService.<V>deserialize(event.getValue()))); | ||
87 | + } | ||
88 | + | ||
89 | + @Override | ||
90 | + public void entryRemoved(EntryEvent<byte[], byte[]> event) { | ||
91 | + cache.invalidate(storeService.<K>deserialize(event.getKey())); | ||
92 | + } | ||
93 | + | ||
94 | + @Override | ||
95 | + public void entryAdded(EntryEvent<byte[], byte[]> event) { | ||
96 | + entryUpdated(event); | ||
97 | + } | ||
98 | + } | ||
99 | + | ||
100 | +} |
... | @@ -9,6 +9,9 @@ import org.apache.felix.scr.annotations.Activate; | ... | @@ -9,6 +9,9 @@ import org.apache.felix.scr.annotations.Activate; |
9 | import org.apache.felix.scr.annotations.Component; | 9 | import org.apache.felix.scr.annotations.Component; |
10 | import org.apache.felix.scr.annotations.Deactivate; | 10 | import org.apache.felix.scr.annotations.Deactivate; |
11 | import org.apache.felix.scr.annotations.Service; | 11 | import org.apache.felix.scr.annotations.Service; |
12 | +import org.onlab.onos.cluster.ControllerNode; | ||
13 | +import org.onlab.onos.cluster.DefaultControllerNode; | ||
14 | +import org.onlab.onos.cluster.NodeId; | ||
12 | import org.onlab.onos.net.DefaultDevice; | 15 | import org.onlab.onos.net.DefaultDevice; |
13 | import org.onlab.onos.net.DefaultPort; | 16 | import org.onlab.onos.net.DefaultPort; |
14 | import org.onlab.onos.net.Device; | 17 | import org.onlab.onos.net.Device; |
... | @@ -21,8 +24,11 @@ import org.onlab.onos.net.provider.ProviderId; | ... | @@ -21,8 +24,11 @@ import org.onlab.onos.net.provider.ProviderId; |
21 | import org.onlab.onos.store.StoreService; | 24 | import org.onlab.onos.store.StoreService; |
22 | import org.onlab.onos.store.serializers.DefaultPortSerializer; | 25 | import org.onlab.onos.store.serializers.DefaultPortSerializer; |
23 | import org.onlab.onos.store.serializers.DeviceIdSerializer; | 26 | import org.onlab.onos.store.serializers.DeviceIdSerializer; |
27 | +import org.onlab.onos.store.serializers.IpPrefixSerializer; | ||
28 | +import org.onlab.onos.store.serializers.NodeIdSerializer; | ||
24 | import org.onlab.onos.store.serializers.PortNumberSerializer; | 29 | import org.onlab.onos.store.serializers.PortNumberSerializer; |
25 | import org.onlab.onos.store.serializers.ProviderIdSerializer; | 30 | import org.onlab.onos.store.serializers.ProviderIdSerializer; |
31 | +import org.onlab.packet.IpPrefix; | ||
26 | import org.onlab.util.KryoPool; | 32 | import org.onlab.util.KryoPool; |
27 | import org.slf4j.Logger; | 33 | import org.slf4j.Logger; |
28 | import org.slf4j.LoggerFactory; | 34 | import org.slf4j.LoggerFactory; |
... | @@ -65,18 +71,21 @@ public class StoreManager implements StoreService { | ... | @@ -65,18 +71,21 @@ public class StoreManager implements StoreService { |
65 | protected void setupKryoPool() { | 71 | protected void setupKryoPool() { |
66 | // FIXME Slice out types used in common to separate pool/namespace. | 72 | // FIXME Slice out types used in common to separate pool/namespace. |
67 | serializerPool = KryoPool.newBuilder() | 73 | serializerPool = KryoPool.newBuilder() |
68 | - .register( | 74 | + .register(ArrayList.class, |
69 | - ArrayList.class, | 75 | + HashMap.class, |
70 | - HashMap.class, | ||
71 | 76 | ||
72 | - Device.Type.class, | 77 | + ControllerNode.State.class, |
78 | + Device.Type.class, | ||
73 | 79 | ||
74 | - DefaultDevice.class, | 80 | + DefaultControllerNode.class, |
75 | - MastershipRole.class, | 81 | + DefaultDevice.class, |
76 | - Port.class, | 82 | + MastershipRole.class, |
77 | - Element.class | 83 | + Port.class, |
84 | + Element.class | ||
78 | ) | 85 | ) |
86 | + .register(IpPrefix.class, new IpPrefixSerializer()) | ||
79 | .register(URI.class, new URISerializer()) | 87 | .register(URI.class, new URISerializer()) |
88 | + .register(NodeId.class, new NodeIdSerializer()) | ||
80 | .register(ProviderId.class, new ProviderIdSerializer()) | 89 | .register(ProviderId.class, new ProviderIdSerializer()) |
81 | .register(DeviceId.class, new DeviceIdSerializer()) | 90 | .register(DeviceId.class, new DeviceIdSerializer()) |
82 | .register(PortNumber.class, new PortNumberSerializer()) | 91 | .register(PortNumber.class, new PortNumberSerializer()) | ... | ... |
1 | +package org.onlab.onos.store.serializers; | ||
2 | + | ||
3 | +import com.esotericsoftware.kryo.Kryo; | ||
4 | +import com.esotericsoftware.kryo.Serializer; | ||
5 | +import com.esotericsoftware.kryo.io.Input; | ||
6 | +import com.esotericsoftware.kryo.io.Output; | ||
7 | +import org.onlab.onos.cluster.NodeId; | ||
8 | + | ||
9 | +/** | ||
10 | + * Kryo Serializer for {@link org.onlab.onos.cluster.NodeId}. | ||
11 | + */ | ||
12 | +public final class NodeIdSerializer extends Serializer<NodeId> { | ||
13 | + | ||
14 | + @Override | ||
15 | + public void write(Kryo kryo, Output output, NodeId object) { | ||
16 | + kryo.writeObject(output, object.toString()); | ||
17 | + } | ||
18 | + | ||
19 | + @Override | ||
20 | + public NodeId read(Kryo kryo, Input input, Class<NodeId> type) { | ||
21 | + final String id = kryo.readObject(input, String.class); | ||
22 | + return new NodeId(id); | ||
23 | + } | ||
24 | +} |
... | @@ -62,4 +62,8 @@ public class SimpleClusterStore implements ClusterStore { | ... | @@ -62,4 +62,8 @@ public class SimpleClusterStore implements ClusterStore { |
62 | return ControllerNode.State.ACTIVE; | 62 | return ControllerNode.State.ACTIVE; |
63 | } | 63 | } |
64 | 64 | ||
65 | + @Override | ||
66 | + public void removeNode(NodeId nodeId) { | ||
67 | + } | ||
68 | + | ||
65 | } | 69 | } | ... | ... |
... | @@ -248,23 +248,6 @@ public class SimpleDeviceStore implements DeviceStore { | ... | @@ -248,23 +248,6 @@ public class SimpleDeviceStore implements DeviceStore { |
248 | } | 248 | } |
249 | 249 | ||
250 | @Override | 250 | @Override |
251 | - public MastershipRole getRole(DeviceId deviceId) { | ||
252 | - MastershipRole role = roles.get(deviceId); | ||
253 | - return role != null ? role : MastershipRole.NONE; | ||
254 | - } | ||
255 | - | ||
256 | - @Override | ||
257 | - public DeviceEvent setRole(DeviceId deviceId, MastershipRole role) { | ||
258 | - synchronized (this) { | ||
259 | - Device device = getDevice(deviceId); | ||
260 | - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); | ||
261 | - MastershipRole oldRole = roles.put(deviceId, role); | ||
262 | - return oldRole == role ? null : | ||
263 | - new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device, null); | ||
264 | - } | ||
265 | - } | ||
266 | - | ||
267 | - @Override | ||
268 | public DeviceEvent removeDevice(DeviceId deviceId) { | 251 | public DeviceEvent removeDevice(DeviceId deviceId) { |
269 | synchronized (this) { | 252 | synchronized (this) { |
270 | roles.remove(deviceId); | 253 | roles.remove(deviceId); | ... | ... |
... | @@ -38,7 +38,7 @@ public class SimpleMastershipStore implements MastershipStore { | ... | @@ -38,7 +38,7 @@ public class SimpleMastershipStore implements MastershipStore { |
38 | private ControllerNode instance; | 38 | private ControllerNode instance; |
39 | 39 | ||
40 | protected final ConcurrentMap<DeviceId, MastershipRole> roleMap = | 40 | protected final ConcurrentMap<DeviceId, MastershipRole> roleMap = |
41 | - new ConcurrentHashMap<DeviceId, MastershipRole>(); | 41 | + new ConcurrentHashMap<>(); |
42 | 42 | ||
43 | @Activate | 43 | @Activate |
44 | public void activate() { | 44 | public void activate() { |
... | @@ -53,7 +53,7 @@ public class SimpleMastershipStore implements MastershipStore { | ... | @@ -53,7 +53,7 @@ public class SimpleMastershipStore implements MastershipStore { |
53 | 53 | ||
54 | @Override | 54 | @Override |
55 | public MastershipEvent setRole(NodeId nodeId, DeviceId deviceId, | 55 | public MastershipEvent setRole(NodeId nodeId, DeviceId deviceId, |
56 | - MastershipRole role) { | 56 | + MastershipRole role) { |
57 | if (roleMap.get(deviceId) == null) { | 57 | if (roleMap.get(deviceId) == null) { |
58 | return null; | 58 | return null; |
59 | } | 59 | } |
... | @@ -62,14 +62,6 @@ public class SimpleMastershipStore implements MastershipStore { | ... | @@ -62,14 +62,6 @@ public class SimpleMastershipStore implements MastershipStore { |
62 | } | 62 | } |
63 | 63 | ||
64 | @Override | 64 | @Override |
65 | - public MastershipEvent addOrUpdateDevice(NodeId instance, | ||
66 | - DeviceId deviceId, MastershipRole role) { | ||
67 | - //TODO refine when we do listeners | ||
68 | - roleMap.put(deviceId, role); | ||
69 | - return null; | ||
70 | - } | ||
71 | - | ||
72 | - @Override | ||
73 | public NodeId getMaster(DeviceId deviceId) { | 65 | public NodeId getMaster(DeviceId deviceId) { |
74 | return instance.id(); | 66 | return instance.id(); |
75 | } | 67 | } |
... | @@ -80,6 +72,11 @@ public class SimpleMastershipStore implements MastershipStore { | ... | @@ -80,6 +72,11 @@ public class SimpleMastershipStore implements MastershipStore { |
80 | } | 72 | } |
81 | 73 | ||
82 | @Override | 74 | @Override |
75 | + public MastershipRole requestRole(DeviceId deviceId) { | ||
76 | + return getRole(instance.id(), deviceId); | ||
77 | + } | ||
78 | + | ||
79 | + @Override | ||
83 | public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) { | 80 | public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) { |
84 | MastershipRole role = roleMap.get(deviceId); | 81 | MastershipRole role = roleMap.get(deviceId); |
85 | if (role == null) { | 82 | if (role == null) { | ... | ... |
... | @@ -51,7 +51,7 @@ perl -pi.old -e "s|^(featuresRepositories=.*)|\1,mvn:org.onlab.onos/onos-feature | ... | @@ -51,7 +51,7 @@ perl -pi.old -e "s|^(featuresRepositories=.*)|\1,mvn:org.onlab.onos/onos-feature |
51 | $ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg | 51 | $ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg |
52 | 52 | ||
53 | # Patch the Apache Karaf distribution file to load ONOS features | 53 | # Patch the Apache Karaf distribution file to load ONOS features |
54 | -perl -pi.old -e 's|^(featuresBoot=.*)|\1,onos-api,onos-core-trivial,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-tvue,onos-app-fwd|' \ | 54 | +perl -pi.old -e 's|^(featuresBoot=.*)|\1,onos-api,onos-core,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd|' \ |
55 | $ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg | 55 | $ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg |
56 | 56 | ||
57 | # Patch the Apache Karaf distribution with ONOS branding bundle | 57 | # Patch the Apache Karaf distribution with ONOS branding bundle | ... | ... |
... | @@ -13,6 +13,11 @@ respawn | ... | @@ -13,6 +13,11 @@ respawn |
13 | env LANG=en_US.UTF-8 | 13 | env LANG=en_US.UTF-8 |
14 | env JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 | 14 | env JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 |
15 | 15 | ||
16 | +pre-stop script | ||
17 | + /opt/onos/bin/onos halt 2>/opt/onos/var/stderr.log | ||
18 | + sleep 3 | ||
19 | +end script | ||
20 | + | ||
16 | script | 21 | script |
17 | [ -f /opt/onos/options ] && . /opt/onos/options | 22 | [ -f /opt/onos/options ] && . /opt/onos/options |
18 | start-stop-daemon --signal INT --start --chuid sdn \ | 23 | start-stop-daemon --signal INT --start --chuid sdn \ | ... | ... |
... | @@ -24,8 +24,9 @@ ssh $remote " | ... | @@ -24,8 +24,9 @@ ssh $remote " |
24 | ln -s $ONOS_INSTALL_DIR/$KARAF_DIST/data/log /opt/onos/log | 24 | ln -s $ONOS_INSTALL_DIR/$KARAF_DIST/data/log /opt/onos/log |
25 | mkdir $ONOS_INSTALL_DIR/var | 25 | mkdir $ONOS_INSTALL_DIR/var |
26 | 26 | ||
27 | - # Install the upstart configuration file. | 27 | + # Install the upstart configuration file and setup options for debugging |
28 | sudo cp $ONOS_INSTALL_DIR/debian/onos.conf /etc/init/onos.conf | 28 | sudo cp $ONOS_INSTALL_DIR/debian/onos.conf /etc/init/onos.conf |
29 | + echo 'export ONOS_OPTS=debug' > $ONOS_INSTALL_DIR/options | ||
29 | 30 | ||
30 | # Remove any previous ON.Lab bits from ~/.m2 repo | 31 | # Remove any previous ON.Lab bits from ~/.m2 repo |
31 | rm -fr ~/.m2/repository/org/onlab | 32 | rm -fr ~/.m2/repository/org/onlab | ... | ... |
... | @@ -10,7 +10,5 @@ remote=$ONOS_USER@${1:-$OCI} | ... | @@ -10,7 +10,5 @@ remote=$ONOS_USER@${1:-$OCI} |
10 | 10 | ||
11 | ssh $remote " | 11 | ssh $remote " |
12 | sudo service onos stop 1>/dev/null 2>/dev/null | 12 | sudo service onos stop 1>/dev/null 2>/dev/null |
13 | - [ -f $ONOS_INSTALL_DIR/bin/onos ] && \ | ||
14 | - $ONOS_INSTALL_DIR/bin/onos halt 2>/dev/null | ||
15 | sudo rm -fr $ONOS_INSTALL_DIR | 13 | sudo rm -fr $ONOS_INSTALL_DIR |
16 | " | 14 | " | ... | ... |
-
Please register or login to post a comment