Hyunsun Moon
Committed by Gerrit Code Review

CORD-151 Implement initial compute node setup

Followings are changed
- Changed nodeStore from eventually consistent map to consistent map
- Removed ovsdb connection management(ovsdb controller has connection status)
- Not only one leader but all onos instances make ovsdb session

Following jobs are done
- Reads compute node and ovsdb access info from network config
- Initiates ovsdb connection to the nodes
- Creates integration bridge on each ovsdbs
- Creates vxlan tunnel port on each integration bridges

Change-Id: I8df4061fcb1eae9b0abd545b7a3f540be50607a9
...@@ -49,6 +49,11 @@ ...@@ -49,6 +49,11 @@
49 <artifactId>onos-core-serializers</artifactId> 49 <artifactId>onos-core-serializers</artifactId>
50 <version>${project.version}</version> 50 <version>${project.version}</version>
51 </dependency> 51 </dependency>
52 + <dependency>
53 + <groupId>org.onosproject</groupId>
54 + <artifactId>onos-ovsdb-api</artifactId>
55 + <version>${project.version}</version>
56 + </dependency>
52 </dependencies> 57 </dependencies>
53 58
54 </project> 59 </project>
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
15 */ 15 */
16 package org.onosproject.cordvtn; 16 package org.onosproject.cordvtn;
17 17
18 +import com.google.common.collect.Collections2;
19 +import com.google.common.collect.Sets;
18 import org.apache.felix.scr.annotations.Activate; 20 import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Component; 21 import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate; 22 import org.apache.felix.scr.annotations.Deactivate;
...@@ -22,31 +24,39 @@ import org.apache.felix.scr.annotations.Reference; ...@@ -22,31 +24,39 @@ import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality; 24 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 import org.apache.felix.scr.annotations.Service; 25 import org.apache.felix.scr.annotations.Service;
24 import org.onlab.util.KryoNamespace; 26 import org.onlab.util.KryoNamespace;
27 +import org.onosproject.cluster.ClusterService;
28 +import org.onosproject.core.ApplicationId;
25 import org.onosproject.core.CoreService; 29 import org.onosproject.core.CoreService;
30 +import org.onosproject.mastership.MastershipService;
26 import org.onosproject.net.Device; 31 import org.onosproject.net.Device;
27 import org.onosproject.net.DeviceId; 32 import org.onosproject.net.DeviceId;
28 import org.onosproject.net.Host; 33 import org.onosproject.net.Host;
34 +import org.onosproject.net.behaviour.ControllerInfo;
29 import org.onosproject.net.device.DeviceEvent; 35 import org.onosproject.net.device.DeviceEvent;
30 import org.onosproject.net.device.DeviceListener; 36 import org.onosproject.net.device.DeviceListener;
31 import org.onosproject.net.device.DeviceService; 37 import org.onosproject.net.device.DeviceService;
32 import org.onosproject.net.host.HostEvent; 38 import org.onosproject.net.host.HostEvent;
33 import org.onosproject.net.host.HostListener; 39 import org.onosproject.net.host.HostListener;
34 import org.onosproject.net.host.HostService; 40 import org.onosproject.net.host.HostService;
41 +import org.onosproject.ovsdb.controller.OvsdbClientService;
42 +import org.onosproject.ovsdb.controller.OvsdbController;
43 +import org.onosproject.ovsdb.controller.OvsdbNodeId;
35 import org.onosproject.store.serializers.KryoNamespaces; 44 import org.onosproject.store.serializers.KryoNamespaces;
36 -import org.onosproject.store.service.EventuallyConsistentMap; 45 +import org.onosproject.store.service.ConsistentMap;
37 -import org.onosproject.store.service.LogicalClockService; 46 +import org.onosproject.store.service.Serializer;
38 import org.onosproject.store.service.StorageService; 47 import org.onosproject.store.service.StorageService;
48 +import org.onosproject.store.service.Versioned;
39 import org.slf4j.Logger; 49 import org.slf4j.Logger;
40 50
51 +import java.util.ArrayList;
52 +import java.util.HashMap;
41 import java.util.List; 53 import java.util.List;
54 +import java.util.Map;
42 import java.util.concurrent.ExecutorService; 55 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors; 56 import java.util.concurrent.Executors;
44 -import java.util.stream.Collectors;
45 57
58 +import static com.google.common.base.Preconditions.checkNotNull;
46 import static org.onlab.util.Tools.groupedThreads; 59 import static org.onlab.util.Tools.groupedThreads;
47 -import static org.onosproject.cordvtn.OvsdbNode.State;
48 -import static org.onosproject.cordvtn.OvsdbNode.State.INIT;
49 -import static org.onosproject.cordvtn.OvsdbNode.State.DISCONNECT;
50 import static org.onosproject.net.Device.Type.SWITCH; 60 import static org.onosproject.net.Device.Type.SWITCH;
51 import static org.slf4j.LoggerFactory.getLogger; 61 import static org.slf4j.LoggerFactory.getLogger;
52 62
...@@ -63,7 +73,17 @@ public class CordVtn implements CordVtnService { ...@@ -63,7 +73,17 @@ public class CordVtn implements CordVtnService {
63 private static final int NUM_THREADS = 1; 73 private static final int NUM_THREADS = 1;
64 private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder() 74 private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder()
65 .register(KryoNamespaces.API) 75 .register(KryoNamespaces.API)
66 - .register(OvsdbNode.class); 76 + .register(DefaultOvsdbNode.class);
77 + private static final String DEFAULT_BRIDGE_NAME = "br-int";
78 + private static final Map<String, String> VXLAN_OPTIONS = new HashMap<String, String>() {
79 + {
80 + put("key", "flow");
81 + put("local_ip", "flow");
82 + put("remote_ip", "flow");
83 + }
84 + };
85 + private static final int DPID_BEGIN = 3;
86 + private static final int OFPORT = 6653;
67 87
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected CoreService coreService; 89 protected CoreService coreService;
...@@ -72,14 +92,20 @@ public class CordVtn implements CordVtnService { ...@@ -72,14 +92,20 @@ public class CordVtn implements CordVtnService {
72 protected StorageService storageService; 92 protected StorageService storageService;
73 93
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 - protected LogicalClockService clockService;
76 -
77 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected DeviceService deviceService; 95 protected DeviceService deviceService;
79 96
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected HostService hostService; 98 protected HostService hostService;
82 99
100 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 + protected MastershipService mastershipService;
102 +
103 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 + protected OvsdbController controller;
105 +
106 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 + protected ClusterService clusterService;
108 +
83 private final ExecutorService eventExecutor = Executors 109 private final ExecutorService eventExecutor = Executors
84 .newFixedThreadPool(NUM_THREADS, groupedThreads("onos/cordvtn", "event-handler")); 110 .newFixedThreadPool(NUM_THREADS, groupedThreads("onos/cordvtn", "event-handler"));
85 111
...@@ -90,15 +116,16 @@ public class CordVtn implements CordVtnService { ...@@ -90,15 +116,16 @@ public class CordVtn implements CordVtnService {
90 private final BridgeHandler bridgeHandler = new BridgeHandler(); 116 private final BridgeHandler bridgeHandler = new BridgeHandler();
91 private final VmHandler vmHandler = new VmHandler(); 117 private final VmHandler vmHandler = new VmHandler();
92 118
93 - private EventuallyConsistentMap<DeviceId, OvsdbNode> nodeStore; 119 + private ConsistentMap<DeviceId, OvsdbNode> nodeStore;
120 + private ApplicationId appId;
94 121
95 @Activate 122 @Activate
96 protected void activate() { 123 protected void activate() {
97 - coreService.registerApplication("org.onosproject.cordvtn"); 124 + appId = coreService.registerApplication("org.onosproject.cordvtn");
98 - nodeStore = storageService.<DeviceId, OvsdbNode>eventuallyConsistentMapBuilder() 125 + nodeStore = storageService.<DeviceId, OvsdbNode>consistentMapBuilder()
126 + .withSerializer(Serializer.using(NODE_SERIALIZER.build()))
99 .withName("cordvtn-nodestore") 127 .withName("cordvtn-nodestore")
100 - .withSerializer(NODE_SERIALIZER) 128 + .withApplicationId(appId)
101 - .withTimestampProvider((k, v) -> clockService.getTimestamp())
102 .build(); 129 .build();
103 130
104 deviceService.addListener(deviceListener); 131 deviceService.addListener(deviceListener);
...@@ -113,43 +140,59 @@ public class CordVtn implements CordVtnService { ...@@ -113,43 +140,59 @@ public class CordVtn implements CordVtnService {
113 hostService.removeListener(hostListener); 140 hostService.removeListener(hostListener);
114 141
115 eventExecutor.shutdown(); 142 eventExecutor.shutdown();
116 - nodeStore.destroy(); 143 + nodeStore.clear();
117 144
118 log.info("Stopped"); 145 log.info("Stopped");
119 } 146 }
120 147
121 @Override 148 @Override
122 - public void addNode(OvsdbNode ovsdbNode) { 149 + public void addNode(OvsdbNode ovsdb) {
123 - if (nodeStore.containsKey(ovsdbNode.deviceId())) { 150 + checkNotNull(ovsdb);
124 - log.warn("Node {} already exists", ovsdbNode.host()); 151 + nodeStore.put(ovsdb.deviceId(), ovsdb);
152 + }
153 +
154 + @Override
155 + public void deleteNode(OvsdbNode ovsdb) {
156 + checkNotNull(ovsdb);
157 +
158 + if (!nodeStore.containsKey(ovsdb.deviceId())) {
125 return; 159 return;
126 } 160 }
127 - nodeStore.put(ovsdbNode.deviceId(), ovsdbNode); 161 +
128 - if (ovsdbNode.state() != INIT) { 162 + // check ovsdb and integration bridge connection state first
129 - updateNode(ovsdbNode, INIT); 163 + if (isNodeConnected(ovsdb)) {
164 + log.warn("Cannot delete connected node {}", ovsdb.host());
165 + } else {
166 + nodeStore.remove(ovsdb.deviceId());
130 } 167 }
131 } 168 }
132 169
133 @Override 170 @Override
134 - public void deleteNode(OvsdbNode ovsdbNode) { 171 + public void connect(OvsdbNode ovsdb) {
135 - if (!nodeStore.containsKey(ovsdbNode.deviceId())) { 172 + checkNotNull(ovsdb);
136 - log.warn("Node {} does not exist", ovsdbNode.host()); 173 +
174 + if (!nodeStore.containsKey(ovsdb.deviceId())) {
175 + log.warn("Node {} does not exist", ovsdb.host());
137 return; 176 return;
138 } 177 }
139 - updateNode(ovsdbNode, DISCONNECT); 178 + controller.connect(ovsdb.ip(), ovsdb.port());
140 } 179 }
141 180
142 @Override 181 @Override
143 - public void updateNode(OvsdbNode ovsdbNode, State state) { 182 + public void disconnect(OvsdbNode ovsdb) {
144 - if (!nodeStore.containsKey(ovsdbNode.deviceId())) { 183 + checkNotNull(ovsdb);
145 - log.warn("Node {} does not exist", ovsdbNode.host()); 184 +
185 + if (!nodeStore.containsKey(ovsdb.deviceId())) {
186 + log.warn("Node {} does not exist", ovsdb.host());
146 return; 187 return;
147 } 188 }
148 - DefaultOvsdbNode updatedNode = new DefaultOvsdbNode(ovsdbNode.host(), 189 +
149 - ovsdbNode.ip(), 190 + OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb);
150 - ovsdbNode.port(), 191 + checkNotNull(ovsdbClient);
151 - state); 192 +
152 - nodeStore.put(ovsdbNode.deviceId(), updatedNode); 193 + if (ovsdbClient.isConnected()) {
194 + ovsdbClient.disconnect();
195 + }
153 } 196 }
154 197
155 @Override 198 @Override
...@@ -159,14 +202,42 @@ public class CordVtn implements CordVtnService { ...@@ -159,14 +202,42 @@ public class CordVtn implements CordVtnService {
159 202
160 @Override 203 @Override
161 public OvsdbNode getNode(DeviceId deviceId) { 204 public OvsdbNode getNode(DeviceId deviceId) {
162 - return nodeStore.get(deviceId); 205 + Versioned<OvsdbNode> ovsdb = nodeStore.get(deviceId);
206 + if (ovsdb != null) {
207 + return ovsdb.value();
208 + } else {
209 + return null;
210 + }
163 } 211 }
164 212
165 @Override 213 @Override
166 public List<OvsdbNode> getNodes() { 214 public List<OvsdbNode> getNodes() {
167 - return nodeStore.values() 215 + List<OvsdbNode> ovsdbs = new ArrayList<>();
168 - .stream() 216 + ovsdbs.addAll(Collections2.transform(nodeStore.values(), Versioned::value));
169 - .collect(Collectors.toList()); 217 + return ovsdbs;
218 + }
219 +
220 + @Override
221 + public boolean isNodeConnected(OvsdbNode ovsdb) {
222 + checkNotNull(ovsdb);
223 +
224 + OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb);
225 + if (ovsdbClient == null) {
226 + return false;
227 + } else {
228 + return ovsdbClient.isConnected();
229 + }
230 + }
231 +
232 + private OvsdbClientService getOvsdbClient(OvsdbNode ovsdb) {
233 + checkNotNull(ovsdb);
234 +
235 + OvsdbClientService ovsdbClient = controller.getOvsdbClient(
236 + new OvsdbNodeId(ovsdb.ip(), ovsdb.port().toInt()));
237 + if (ovsdbClient == null) {
238 + log.warn("Couldn't find ovsdb client of node {}", ovsdb.host());
239 + }
240 + return ovsdbClient;
170 } 241 }
171 242
172 private class InternalDeviceListener implements DeviceListener { 243 private class InternalDeviceListener implements DeviceListener {
...@@ -182,6 +253,7 @@ public class CordVtn implements CordVtnService { ...@@ -182,6 +253,7 @@ public class CordVtn implements CordVtnService {
182 break; 253 break;
183 case DEVICE_AVAILABILITY_CHANGED: 254 case DEVICE_AVAILABILITY_CHANGED:
184 eventExecutor.submit(() -> handler.disconnected(device)); 255 eventExecutor.submit(() -> handler.disconnected(device));
256 + // TODO handle the case that the device is recovered
185 break; 257 break;
186 default: 258 default:
187 break; 259 break;
...@@ -212,14 +284,27 @@ public class CordVtn implements CordVtnService { ...@@ -212,14 +284,27 @@ public class CordVtn implements CordVtnService {
212 284
213 @Override 285 @Override
214 public void connected(Device device) { 286 public void connected(Device device) {
215 - // create bridge and set bridgeId 287 + log.info("Ovsdb {} is connected", device.id());
216 - // set node state connected 288 +
289 + if (!mastershipService.isLocalMaster(device.id())) {
290 + return;
291 + }
292 +
293 + // TODO change to use bridge config
294 + OvsdbNode ovsdb = getNode(device.id());
295 + OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb);
296 +
297 + List<ControllerInfo> controllers = new ArrayList<>();
298 + Sets.newHashSet(clusterService.getNodes()).forEach(controller ->
299 + controllers.add(new ControllerInfo(controller.ip(), OFPORT, "tcp")));
300 + String dpid = ovsdb.intBrId().toString().substring(DPID_BEGIN);
301 +
302 + ovsdbClient.createBridge(DEFAULT_BRIDGE_NAME, dpid, controllers);
217 } 303 }
218 304
219 @Override 305 @Override
220 public void disconnected(Device device) { 306 public void disconnected(Device device) {
221 - // set node state disconnected if the node exists 307 + log.warn("Ovsdb {} is disconnected", device.id());
222 - // which means that the node is not deleted explicitly
223 } 308 }
224 } 309 }
225 310
...@@ -227,12 +312,29 @@ public class CordVtn implements CordVtnService { ...@@ -227,12 +312,29 @@ public class CordVtn implements CordVtnService {
227 312
228 @Override 313 @Override
229 public void connected(Device device) { 314 public void connected(Device device) {
230 - // create vxlan port 315 + log.info("Integration Bridge {} is detected", device.id());
316 +
317 + OvsdbNode ovsdb = getNodes().stream()
318 + .filter(node -> node.intBrId().equals(device.id()))
319 + .findFirst().get();
320 +
321 + if (ovsdb == null) {
322 + log.warn("Couldn't find OVSDB associated with {}", device.id());
323 + return;
324 + }
325 +
326 + if (!mastershipService.isLocalMaster(ovsdb.deviceId())) {
327 + return;
328 + }
329 +
330 + // TODO change to use tunnel config and tunnel description
331 + OvsdbClientService ovsdbClient = getOvsdbClient(ovsdb);
332 + ovsdbClient.createTunnel(DEFAULT_BRIDGE_NAME, "vxlan", "vxlan", VXLAN_OPTIONS);
231 } 333 }
232 334
233 @Override 335 @Override
234 public void disconnected(Device device) { 336 public void disconnected(Device device) {
235 - 337 + log.info("Integration Bridge {} is vanished", device.id());
236 } 338 }
237 } 339 }
238 340
...@@ -240,12 +342,12 @@ public class CordVtn implements CordVtnService { ...@@ -240,12 +342,12 @@ public class CordVtn implements CordVtnService {
240 342
241 @Override 343 @Override
242 public void connected(Host host) { 344 public void connected(Host host) {
243 - // install flow rules for this vm 345 + log.info("VM {} is detected", host.id());
244 } 346 }
245 347
246 @Override 348 @Override
247 public void disconnected(Host host) { 349 public void disconnected(Host host) {
248 - // uninstall flow rules associated with this vm 350 + log.info("VM {} is vanished", host.id());
249 } 351 }
250 } 352 }
251 } 353 }
......
...@@ -20,6 +20,7 @@ import com.google.common.collect.Sets; ...@@ -20,6 +20,7 @@ import com.google.common.collect.Sets;
20 import org.onlab.packet.IpAddress; 20 import org.onlab.packet.IpAddress;
21 import org.onlab.packet.TpPort; 21 import org.onlab.packet.TpPort;
22 import org.onosproject.core.ApplicationId; 22 import org.onosproject.core.ApplicationId;
23 +import org.onosproject.net.DeviceId;
23 import org.onosproject.net.config.Config; 24 import org.onosproject.net.config.Config;
24 25
25 import java.util.Set; 26 import java.util.Set;
...@@ -35,6 +36,7 @@ public class CordVtnConfig extends Config<ApplicationId> { ...@@ -35,6 +36,7 @@ public class CordVtnConfig extends Config<ApplicationId> {
35 public static final String HOST = "host"; 36 public static final String HOST = "host";
36 public static final String IP = "ip"; 37 public static final String IP = "ip";
37 public static final String PORT = "port"; 38 public static final String PORT = "port";
39 + public static final String BRIDGE_ID = "bridgeId";
38 40
39 /** 41 /**
40 * Returns the set of ovsdb nodes read from network config. 42 * Returns the set of ovsdb nodes read from network config.
...@@ -51,7 +53,8 @@ public class CordVtnConfig extends Config<ApplicationId> { ...@@ -51,7 +53,8 @@ public class CordVtnConfig extends Config<ApplicationId> {
51 nodes.forEach(jsonNode -> ovsdbNodes.add(new OvsdbNodeConfig( 53 nodes.forEach(jsonNode -> ovsdbNodes.add(new OvsdbNodeConfig(
52 jsonNode.path(HOST).asText(), 54 jsonNode.path(HOST).asText(),
53 IpAddress.valueOf(jsonNode.path(IP).asText()), 55 IpAddress.valueOf(jsonNode.path(IP).asText()),
54 - TpPort.tpPort(jsonNode.path(PORT).asInt())))); 56 + TpPort.tpPort(jsonNode.path(PORT).asInt()),
57 + DeviceId.deviceId(jsonNode.path(BRIDGE_ID).asText()))));
55 58
56 return ovsdbNodes; 59 return ovsdbNodes;
57 } 60 }
...@@ -64,11 +67,13 @@ public class CordVtnConfig extends Config<ApplicationId> { ...@@ -64,11 +67,13 @@ public class CordVtnConfig extends Config<ApplicationId> {
64 private final String host; 67 private final String host;
65 private final IpAddress ip; 68 private final IpAddress ip;
66 private final TpPort port; 69 private final TpPort port;
70 + private final DeviceId bridgeId;
67 71
68 - public OvsdbNodeConfig(String host, IpAddress ip, TpPort port) { 72 + public OvsdbNodeConfig(String host, IpAddress ip, TpPort port, DeviceId bridgeId) {
69 this.host = checkNotNull(host); 73 this.host = checkNotNull(host);
70 this.ip = checkNotNull(ip); 74 this.ip = checkNotNull(ip);
71 this.port = checkNotNull(port); 75 this.port = checkNotNull(port);
76 + this.bridgeId = checkNotNull(bridgeId);
72 } 77 }
73 78
74 /** 79 /**
...@@ -97,5 +102,9 @@ public class CordVtnConfig extends Config<ApplicationId> { ...@@ -97,5 +102,9 @@ public class CordVtnConfig extends Config<ApplicationId> {
97 public TpPort port() { 102 public TpPort port() {
98 return this.port; 103 return this.port;
99 } 104 }
105 +
106 + public DeviceId bridgeId() {
107 + return this.bridgeId;
108 + }
100 } 109 }
101 } 110 }
......
...@@ -20,11 +20,6 @@ import org.apache.felix.scr.annotations.Component; ...@@ -20,11 +20,6 @@ import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate; 20 import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Reference; 21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality; 22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 -import org.onosproject.cluster.ClusterService;
24 -import org.onosproject.cluster.LeadershipEvent;
25 -import org.onosproject.cluster.LeadershipEventListener;
26 -import org.onosproject.cluster.LeadershipService;
27 -import org.onosproject.cluster.NodeId;
28 import org.onosproject.core.ApplicationId; 23 import org.onosproject.core.ApplicationId;
29 import org.onosproject.core.CoreService; 24 import org.onosproject.core.CoreService;
30 import org.onosproject.net.config.ConfigFactory; 25 import org.onosproject.net.config.ConfigFactory;
...@@ -35,7 +30,6 @@ import org.onosproject.net.config.NetworkConfigService; ...@@ -35,7 +30,6 @@ import org.onosproject.net.config.NetworkConfigService;
35 import org.onosproject.net.config.basics.SubjectFactories; 30 import org.onosproject.net.config.basics.SubjectFactories;
36 import org.slf4j.Logger; 31 import org.slf4j.Logger;
37 32
38 -import static org.onosproject.cordvtn.OvsdbNode.State.INIT;
39 import static org.slf4j.LoggerFactory.getLogger; 33 import static org.slf4j.LoggerFactory.getLogger;
40 34
41 /** 35 /**
...@@ -58,12 +52,6 @@ public class CordVtnConfigManager { ...@@ -58,12 +52,6 @@ public class CordVtnConfigManager {
58 protected NetworkConfigService configService; 52 protected NetworkConfigService configService;
59 53
60 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 54 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 - protected LeadershipService leadershipService;
62 -
63 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 - protected ClusterService clusterService;
65 -
66 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected CordVtnService cordVtnService; 55 protected CordVtnService cordVtnService;
68 56
69 private final ConfigFactory configFactory = 57 private final ConfigFactory configFactory =
...@@ -74,29 +62,22 @@ public class CordVtnConfigManager { ...@@ -74,29 +62,22 @@ public class CordVtnConfigManager {
74 } 62 }
75 }; 63 };
76 64
77 - private final LeadershipEventListener leadershipListener = new InternalLeadershipListener();
78 private final NetworkConfigListener configListener = new InternalConfigListener(); 65 private final NetworkConfigListener configListener = new InternalConfigListener();
79 66
80 - private NodeId local;
81 private ApplicationId appId; 67 private ApplicationId appId;
82 68
83 @Activate 69 @Activate
84 protected void active() { 70 protected void active() {
85 - local = clusterService.getLocalNode().id();
86 appId = coreService.getAppId(CordVtnService.CORDVTN_APP_ID); 71 appId = coreService.getAppId(CordVtnService.CORDVTN_APP_ID);
87 72
88 configService.addListener(configListener); 73 configService.addListener(configListener);
89 configRegistry.registerConfigFactory(configFactory); 74 configRegistry.registerConfigFactory(configFactory);
90 75
91 - leadershipService.addListener(leadershipListener); 76 + readConfiguration();
92 - leadershipService.runForLeadership(CordVtnService.CORDVTN_APP_ID);
93 } 77 }
94 78
95 @Deactivate 79 @Deactivate
96 protected void deactivate() { 80 protected void deactivate() {
97 - leadershipService.removeListener(leadershipListener);
98 - leadershipService.withdraw(appId.name());
99 -
100 configRegistry.unregisterConfigFactory(configFactory); 81 configRegistry.unregisterConfigFactory(configFactory);
101 configService.removeListener(configListener); 82 configService.removeListener(configListener);
102 } 83 }
...@@ -110,30 +91,13 @@ public class CordVtnConfigManager { ...@@ -110,30 +91,13 @@ public class CordVtnConfigManager {
110 } 91 }
111 92
112 config.ovsdbNodes().forEach(node -> { 93 config.ovsdbNodes().forEach(node -> {
113 - DefaultOvsdbNode ovsdbNode = 94 + DefaultOvsdbNode ovsdb = new DefaultOvsdbNode(
114 - new DefaultOvsdbNode(node.host(), node.ip(), node.port(), INIT); 95 + node.host(), node.ip(), node.port(), node.bridgeId());
115 - cordVtnService.addNode(ovsdbNode); 96 + cordVtnService.addNode(ovsdb);
116 - log.info("Add new node {}", node.host()); 97 + cordVtnService.connect(ovsdb);
117 }); 98 });
118 } 99 }
119 100
120 - private synchronized void processLeadershipChange(NodeId leader) {
121 - if (leader == null || !leader.equals(local)) {
122 - return;
123 - }
124 - readConfiguration();
125 - }
126 -
127 - private class InternalLeadershipListener implements LeadershipEventListener {
128 -
129 - @Override
130 - public void event(LeadershipEvent event) {
131 - if (event.subject().topic().equals(appId.name())) {
132 - processLeadershipChange(event.subject().leader());
133 - }
134 - }
135 - }
136 -
137 private class InternalConfigListener implements NetworkConfigListener { 101 private class InternalConfigListener implements NetworkConfigListener {
138 102
139 @Override 103 @Override
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
15 */ 15 */
16 package org.onosproject.cordvtn; 16 package org.onosproject.cordvtn;
17 17
18 -import org.onosproject.cordvtn.OvsdbNode.State;
19 import org.onosproject.net.DeviceId; 18 import org.onosproject.net.DeviceId;
20 19
21 import java.util.List; 20 import java.util.List;
...@@ -29,25 +28,30 @@ public interface CordVtnService { ...@@ -29,25 +28,30 @@ public interface CordVtnService {
29 /** 28 /**
30 * Adds a new node to the service. 29 * Adds a new node to the service.
31 * 30 *
32 - * @param ovsdbNode ovsdb node 31 + * @param ovsdb ovsdb node
33 */ 32 */
34 - void addNode(OvsdbNode ovsdbNode); 33 + void addNode(OvsdbNode ovsdb);
35 34
36 /** 35 /**
37 * Deletes a node from the service. 36 * Deletes a node from the service.
38 * 37 *
39 - * @param ovsdbNode ovsdb node 38 + * @param ovsdb ovsdb node
40 */ 39 */
41 - void deleteNode(OvsdbNode ovsdbNode); 40 + void deleteNode(OvsdbNode ovsdb);
42 41
43 /** 42 /**
44 - * Updates ovsdb node. 43 + * Connect to a node.
45 - * It only used for updating node's connection state.
46 * 44 *
47 - * @param ovsdbNode ovsdb node 45 + * @param ovsdb ovsdb node
48 - * @param state ovsdb connection state
49 */ 46 */
50 - void updateNode(OvsdbNode ovsdbNode, State state); 47 + void connect(OvsdbNode ovsdb);
48 +
49 + /**
50 + * Disconnect a node.
51 + *
52 + * @param ovsdb ovsdb node
53 + */
54 + void disconnect(OvsdbNode ovsdb);
51 55
52 /** 56 /**
53 * Returns the number of the nodes known to the service. 57 * Returns the number of the nodes known to the service.
...@@ -65,6 +69,14 @@ public interface CordVtnService { ...@@ -65,6 +69,14 @@ public interface CordVtnService {
65 OvsdbNode getNode(DeviceId deviceId); 69 OvsdbNode getNode(DeviceId deviceId);
66 70
67 /** 71 /**
72 + * Returns connection state of the node.
73 + *
74 + * @param ovsdb ovsdb node
75 + * @return true if the node is connected, false otherwise
76 + */
77 + boolean isNodeConnected(OvsdbNode ovsdb);
78 +
79 + /**
68 * Returns all nodes known to the service. 80 * Returns all nodes known to the service.
69 * 81 *
70 * @return list of nodes 82 * @return list of nodes
......
...@@ -30,13 +30,13 @@ public class DefaultOvsdbNode implements OvsdbNode { ...@@ -30,13 +30,13 @@ public class DefaultOvsdbNode implements OvsdbNode {
30 private final String host; 30 private final String host;
31 private final IpAddress ip; 31 private final IpAddress ip;
32 private final TpPort port; 32 private final TpPort port;
33 - private final State state; 33 + private final DeviceId brId;
34 34
35 - public DefaultOvsdbNode(String host, IpAddress ip, TpPort port, State state) { 35 + public DefaultOvsdbNode(String host, IpAddress ip, TpPort port, DeviceId brId) {
36 this.host = host; 36 this.host = host;
37 this.ip = ip; 37 this.ip = ip;
38 this.port = port; 38 this.port = port;
39 - this.state = state; 39 + this.brId = brId;
40 } 40 }
41 41
42 @Override 42 @Override
...@@ -55,8 +55,8 @@ public class DefaultOvsdbNode implements OvsdbNode { ...@@ -55,8 +55,8 @@ public class DefaultOvsdbNode implements OvsdbNode {
55 } 55 }
56 56
57 @Override 57 @Override
58 - public State state() { 58 + public DeviceId intBrId() {
59 - return this.state; 59 + return this.brId;
60 } 60 }
61 61
62 @Override 62 @Override
...@@ -65,11 +65,6 @@ public class DefaultOvsdbNode implements OvsdbNode { ...@@ -65,11 +65,6 @@ public class DefaultOvsdbNode implements OvsdbNode {
65 } 65 }
66 66
67 @Override 67 @Override
68 - public DeviceId intBrId() {
69 - return DeviceId.deviceId("of:" + this.host);
70 - }
71 -
72 - @Override
73 public boolean equals(Object o) { 68 public boolean equals(Object o) {
74 if (this == o) { 69 if (this == o) {
75 return true; 70 return true;
...@@ -79,7 +74,8 @@ public class DefaultOvsdbNode implements OvsdbNode { ...@@ -79,7 +74,8 @@ public class DefaultOvsdbNode implements OvsdbNode {
79 DefaultOvsdbNode that = (DefaultOvsdbNode) o; 74 DefaultOvsdbNode that = (DefaultOvsdbNode) o;
80 if (this.host.equals(that.host) && 75 if (this.host.equals(that.host) &&
81 this.ip.equals(that.ip) && 76 this.ip.equals(that.ip) &&
82 - this.port.equals(that.port)) { 77 + this.port.equals(that.port) &&
78 + this.brId.equals(that.brId)) {
83 return true; 79 return true;
84 } 80 }
85 } 81 }
...@@ -97,7 +93,7 @@ public class DefaultOvsdbNode implements OvsdbNode { ...@@ -97,7 +93,7 @@ public class DefaultOvsdbNode implements OvsdbNode {
97 .add("host", host) 93 .add("host", host)
98 .add("ip", ip) 94 .add("ip", ip)
99 .add("port", port) 95 .add("port", port)
100 - .add("state", state) 96 + .add("bridgeId", brId)
101 .toString(); 97 .toString();
102 } 98 }
103 } 99 }
......
1 -/*
2 - * Copyright 2014-2015 Open Networking Laboratory
3 - *
4 - * Licensed under the Apache License, Version 2.0 (the "License");
5 - * you may not use this file except in compliance with the License.
6 - * You may obtain a copy of the License at
7 - *
8 - * http://www.apache.org/licenses/LICENSE-2.0
9 - *
10 - * Unless required by applicable law or agreed to in writing, software
11 - * distributed under the License is distributed on an "AS IS" BASIS,
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 - * See the License for the specific language governing permissions and
14 - * limitations under the License.
15 - */
16 -package org.onosproject.cordvtn;
17 -
18 -import org.apache.felix.scr.annotations.Activate;
19 -import org.apache.felix.scr.annotations.Component;
20 -import org.apache.felix.scr.annotations.Deactivate;
21 -import org.apache.felix.scr.annotations.Reference;
22 -import org.apache.felix.scr.annotations.ReferenceCardinality;
23 -import org.onosproject.cluster.ClusterService;
24 -import org.onosproject.cluster.LeadershipService;
25 -import org.onosproject.cluster.NodeId;
26 -import org.onosproject.mastership.MastershipService;
27 -import org.onosproject.net.Device;
28 -import org.onosproject.net.device.DeviceEvent;
29 -import org.onosproject.net.device.DeviceListener;
30 -import org.onosproject.net.device.DeviceService;
31 -import org.slf4j.Logger;
32 -
33 -import java.util.concurrent.Executors;
34 -import java.util.concurrent.ScheduledExecutorService;
35 -import java.util.concurrent.TimeUnit;
36 -
37 -import static org.onlab.util.Tools.groupedThreads;
38 -import static org.onosproject.cordvtn.OvsdbNode.State.CONNECTED;
39 -import static org.onosproject.cordvtn.OvsdbNode.State.DISCONNECTED;
40 -import static org.onosproject.cordvtn.OvsdbNode.State.READY;
41 -import static org.slf4j.LoggerFactory.getLogger;
42 -
43 -/**
44 - * Provides the connection state management of all nodes registered to the service
45 - * so that the nodes keep connected unless it is requested to be deleted.
46 - */
47 -@Component(immediate = true)
48 -public class NodeConnectionManager {
49 - protected final Logger log = getLogger(getClass());
50 -
51 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
52 - MastershipService mastershipService;
53 -
54 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
55 - LeadershipService leadershipService;
56 -
57 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
58 - ClusterService clusterService;
59 -
60 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 - DeviceService deviceService;
62 -
63 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 - CordVtnService cordVtnService;
65 -
66 - private static final int DELAY_SEC = 5;
67 -
68 - private final DeviceListener deviceListener = new InternalDeviceListener();
69 - private final ScheduledExecutorService connectionExecutor = Executors
70 - .newSingleThreadScheduledExecutor(groupedThreads("onos/cordvtn", "connection-manager"));
71 -
72 - private NodeId localId;
73 -
74 - @Activate
75 - protected void activate() {
76 - localId = clusterService.getLocalNode().id();
77 - deviceService.addListener(deviceListener);
78 -
79 - connectionExecutor.scheduleWithFixedDelay(() -> cordVtnService.getNodes()
80 - .stream()
81 - .filter(node -> localId.equals(getMaster(node)))
82 - .forEach(node -> {
83 - connect(node);
84 - disconnect(node);
85 - }), 0, DELAY_SEC, TimeUnit.SECONDS);
86 - }
87 -
88 - @Deactivate
89 - public void stop() {
90 - connectionExecutor.shutdown();
91 - deviceService.removeListener(deviceListener);
92 - }
93 -
94 - public void connect(OvsdbNode ovsdbNode) {
95 - switch (ovsdbNode.state()) {
96 - case INIT:
97 - case DISCONNECTED:
98 - setPassiveMode(ovsdbNode);
99 - case READY:
100 - setupConnection(ovsdbNode);
101 - break;
102 - default:
103 - break;
104 - }
105 - }
106 -
107 - public void disconnect(OvsdbNode ovsdbNode) {
108 - switch (ovsdbNode.state()) {
109 - case DISCONNECT:
110 - // TODO: disconnect
111 - break;
112 - default:
113 - break;
114 - }
115 - }
116 -
117 - private class InternalDeviceListener implements DeviceListener {
118 -
119 - @Override
120 - public void event(DeviceEvent event) {
121 - Device device = event.subject();
122 - if (device.type() != Device.Type.CONTROLLER) {
123 - return;
124 - }
125 -
126 - DefaultOvsdbNode node;
127 - switch (event.type()) {
128 - case DEVICE_ADDED:
129 - node = (DefaultOvsdbNode) cordVtnService.getNode(device.id());
130 - if (node != null) {
131 - cordVtnService.updateNode(node, CONNECTED);
132 - }
133 - break;
134 - case DEVICE_AVAILABILITY_CHANGED:
135 - node = (DefaultOvsdbNode) cordVtnService.getNode(device.id());
136 - if (node != null) {
137 - cordVtnService.updateNode(node, DISCONNECTED);
138 - }
139 - break;
140 - default:
141 - break;
142 - }
143 - }
144 - }
145 -
146 - private NodeId getMaster(OvsdbNode ovsdbNode) {
147 - NodeId master = mastershipService.getMasterFor(ovsdbNode.intBrId());
148 -
149 - // master is null if there's no such device
150 - if (master == null) {
151 - master = leadershipService.getLeader(CordVtnService.CORDVTN_APP_ID);
152 - }
153 - return master;
154 - }
155 -
156 - private void setPassiveMode(OvsdbNode ovsdbNode) {
157 - // TODO: need ovsdb client implementation first
158 - // TODO: set the remove ovsdb server passive mode
159 - cordVtnService.updateNode(ovsdbNode, READY);
160 - }
161 -
162 - private void setupConnection(OvsdbNode ovsdbNode) {
163 - // TODO initiate connection
164 - }
165 -}
...@@ -23,12 +23,6 @@ import org.onosproject.net.DeviceId; ...@@ -23,12 +23,6 @@ import org.onosproject.net.DeviceId;
23 * Representation of a node with ovsdb server. 23 * Representation of a node with ovsdb server.
24 */ 24 */
25 public interface OvsdbNode { 25 public interface OvsdbNode {
26 - /**
27 - * Ovsdb connection state.
28 - */
29 - enum State {
30 - INIT, READY, CONNECTED, DISCONNECT, DISCONNECTED
31 - }
32 26
33 /** 27 /**
34 * Returns the IP address of the ovsdb server. 28 * Returns the IP address of the ovsdb server.
...@@ -53,13 +47,6 @@ public interface OvsdbNode { ...@@ -53,13 +47,6 @@ public interface OvsdbNode {
53 String host(); 47 String host();
54 48
55 /** 49 /**
56 - * Returns the connection state of the ovsdb server.
57 - *
58 - * @return connection state
59 - */
60 - State state();
61 -
62 - /**
63 * Returns the device id of the ovsdb server. 50 * Returns the device id of the ovsdb server.
64 * 51 *
65 * @return device id 52 * @return device id
......