Hyunsun Moon
Committed by Gerrit Code Review

CORD-160 Add connect and disconnect methods to ovsdb

Change-Id: I66e777f8ec9c5834e27b1dc685fdeb197e30ce0d
...@@ -228,4 +228,9 @@ public interface OvsdbClientService extends OvsdbRPC { ...@@ -228,4 +228,9 @@ public interface OvsdbClientService extends OvsdbRPC {
228 * @return ovsdb ports 228 * @return ovsdb ports
229 */ 229 */
230 Set<OvsdbPort> getLocalPorts(Iterable<String> ifaceids); 230 Set<OvsdbPort> getLocalPorts(Iterable<String> ifaceids);
231 +
232 + /**
233 + * Disconnects the ovsdb server.
234 + */
235 + void disconnect();
231 } 236 }
......
...@@ -15,6 +15,9 @@ ...@@ -15,6 +15,9 @@
15 */ 15 */
16 package org.onosproject.ovsdb.controller; 16 package org.onosproject.ovsdb.controller;
17 17
18 +import org.onlab.packet.IpAddress;
19 +import org.onlab.packet.TpPort;
20 +
18 import java.util.List; 21 import java.util.List;
19 22
20 /** 23 /**
...@@ -65,4 +68,12 @@ public interface OvsdbController { ...@@ -65,4 +68,12 @@ public interface OvsdbController {
65 * @return OvsdbClient ovsdb node information 68 * @return OvsdbClient ovsdb node information
66 */ 69 */
67 OvsdbClientService getOvsdbClient(OvsdbNodeId nodeId); 70 OvsdbClientService getOvsdbClient(OvsdbNodeId nodeId);
71 +
72 + /**
73 + * Connect to the ovsdb server with given ip address and port number.
74 + *
75 + * @param ip ip address
76 + * @param port port number
77 + */
78 + void connect(IpAddress ip, TpPort port);
68 } 79 }
......
...@@ -821,7 +821,7 @@ public class DefaultOvsdbClient ...@@ -821,7 +821,7 @@ public class DefaultOvsdbClient
821 Function<JsonNode, DatabaseSchema> rowFunction = new Function<JsonNode, DatabaseSchema>() { 821 Function<JsonNode, DatabaseSchema> rowFunction = new Function<JsonNode, DatabaseSchema>() {
822 @Override 822 @Override
823 public DatabaseSchema apply(JsonNode input) { 823 public DatabaseSchema apply(JsonNode input) {
824 - log.info("Get ovsdb database schema", dbName); 824 + log.info("Get ovsdb database schema {}", dbName);
825 DatabaseSchema dbSchema = FromJsonUtil 825 DatabaseSchema dbSchema = FromJsonUtil
826 .jsonNodeToDbSchema(dbName, input); 826 .jsonNodeToDbSchema(dbName, input);
827 if (dbSchema == null) { 827 if (dbSchema == null) {
...@@ -1185,4 +1185,10 @@ public class DefaultOvsdbClient ...@@ -1185,4 +1185,10 @@ public class DefaultOvsdbClient
1185 } 1185 }
1186 return ifaceid; 1186 return ifaceid;
1187 } 1187 }
1188 +
1189 + @Override
1190 + public void disconnect() {
1191 + channel.disconnect();
1192 + this.agent.removeConnectedNode(nodeId);
1193 + }
1188 } 1194 }
......
...@@ -15,25 +15,38 @@ ...@@ -15,25 +15,38 @@
15 */ 15 */
16 package org.onosproject.ovsdb.controller.impl; 16 package org.onosproject.ovsdb.controller.impl;
17 17
18 +import io.netty.bootstrap.Bootstrap;
18 import io.netty.bootstrap.ServerBootstrap; 19 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.buffer.PooledByteBufAllocator; 20 import io.netty.buffer.PooledByteBufAllocator;
20 import io.netty.channel.Channel; 21 import io.netty.channel.Channel;
22 +import io.netty.channel.ChannelDuplexHandler;
21 import io.netty.channel.ChannelFuture; 23 import io.netty.channel.ChannelFuture;
24 +import io.netty.channel.ChannelFutureListener;
25 +import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInitializer; 26 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.ChannelOption; 27 import io.netty.channel.ChannelOption;
28 +import io.netty.channel.ChannelPipeline;
29 +import io.netty.channel.EventLoop;
24 import io.netty.channel.EventLoopGroup; 30 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.ServerChannel; 31 import io.netty.channel.ServerChannel;
26 import io.netty.channel.nio.NioEventLoopGroup; 32 import io.netty.channel.nio.NioEventLoopGroup;
27 import io.netty.channel.socket.SocketChannel; 33 import io.netty.channel.socket.SocketChannel;
28 import io.netty.channel.socket.nio.NioServerSocketChannel; 34 import io.netty.channel.socket.nio.NioServerSocketChannel;
35 +import io.netty.channel.socket.nio.NioSocketChannel;
29 import io.netty.handler.codec.string.StringEncoder; 36 import io.netty.handler.codec.string.StringEncoder;
37 +import io.netty.handler.timeout.IdleState;
38 +import io.netty.handler.timeout.IdleStateEvent;
39 +import io.netty.handler.timeout.IdleStateHandler;
30 import io.netty.util.CharsetUtil; 40 import io.netty.util.CharsetUtil;
31 41
32 import java.net.InetSocketAddress; 42 import java.net.InetSocketAddress;
33 import java.util.concurrent.ExecutorService; 43 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors; 44 import java.util.concurrent.Executors;
45 +import java.util.concurrent.TimeUnit;
46 +import java.util.concurrent.atomic.AtomicInteger;
35 47
36 import org.onlab.packet.IpAddress; 48 import org.onlab.packet.IpAddress;
49 +import org.onlab.packet.TpPort;
37 import org.onosproject.ovsdb.controller.OvsdbConstant; 50 import org.onosproject.ovsdb.controller.OvsdbConstant;
38 import org.onosproject.ovsdb.controller.OvsdbNodeId; 51 import org.onosproject.ovsdb.controller.OvsdbNodeId;
39 import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient; 52 import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
...@@ -63,6 +76,9 @@ public class Controller { ...@@ -63,6 +76,9 @@ public class Controller {
63 private EventLoopGroup workerGroup; 76 private EventLoopGroup workerGroup;
64 private Class<? extends ServerChannel> serverChannelClass; 77 private Class<? extends ServerChannel> serverChannelClass;
65 78
79 + private static final int MAX_RETRY = 5;
80 + private static final int IDLE_TIMEOUT_SEC = 10;
81 +
66 /** 82 /**
67 * Initialization. 83 * Initialization.
68 */ 84 */
...@@ -198,4 +214,86 @@ public class Controller { ...@@ -198,4 +214,86 @@ public class Controller {
198 workerGroup.shutdownGracefully(); 214 workerGroup.shutdownGracefully();
199 bossGroup.shutdownGracefully(); 215 bossGroup.shutdownGracefully();
200 } 216 }
217 +
218 + /**
219 + * Connect to the ovsdb server with given ip address and port number.
220 + *
221 + * @param ip ip address
222 + * @param port port number
223 + */
224 + public void connect(IpAddress ip, TpPort port) {
225 + ChannelFutureListener listener = new ConnectionListener(this, ip, port);
226 + connectRetry(ip, port, listener);
227 + }
228 +
229 + private void connectRetry(IpAddress ip, TpPort port, ChannelFutureListener listener) {
230 + try {
231 + Bootstrap b = new Bootstrap();
232 + b.group(workerGroup)
233 + .channel(NioSocketChannel.class)
234 + .option(ChannelOption.TCP_NODELAY, true)
235 + .handler(new ChannelInitializer<SocketChannel>() {
236 +
237 + @Override
238 + protected void initChannel(SocketChannel channel) throws Exception {
239 + ChannelPipeline p = channel.pipeline();
240 + p.addLast(new MessageDecoder(),
241 + new StringEncoder(CharsetUtil.UTF_8),
242 + new IdleStateHandler(IDLE_TIMEOUT_SEC, 0, 0),
243 + new ConnectionHandler());
244 + }
245 + });
246 + b.remoteAddress(ip.toString(), port.toInt());
247 + b.connect().addListener(listener);
248 + } catch (Exception e) {
249 + log.warn("Connection to the ovsdb server {}:{} failed", ip.toString(), port.toString());
250 + }
251 + }
252 +
253 + private class ConnectionListener implements ChannelFutureListener {
254 + private Controller controller;
255 + private IpAddress ip;
256 + private TpPort port;
257 + private AtomicInteger count = new AtomicInteger();
258 +
259 + public ConnectionListener(Controller controller,
260 + IpAddress ip,
261 + TpPort port) {
262 + this.controller = controller;
263 + this.ip = ip;
264 + this.port = port;
265 + }
266 +
267 + @Override
268 + public void operationComplete(ChannelFuture channelFuture) throws Exception {
269 + if (!channelFuture.isSuccess()) {
270 + channelFuture.channel().close();
271 +
272 + if (count.incrementAndGet() < MAX_RETRY) {
273 + final EventLoop loop = channelFuture.channel().eventLoop();
274 +
275 + loop.schedule(() -> {
276 + controller.connectRetry(this.ip, this.port, this);
277 + }, 1L, TimeUnit.SECONDS);
278 + } else {
279 + log.info("Connection to the ovsdb {}:{} failed",
280 + this.ip.toString(), this.port.toString());
281 + }
282 + } else {
283 + handleNewNodeConnection(channelFuture.channel());
284 + }
285 + }
286 + }
287 +
288 + private class ConnectionHandler extends ChannelDuplexHandler {
289 +
290 + @Override
291 + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
292 + IdleStateEvent e = (IdleStateEvent) evt;
293 +
294 + if (e.state() == IdleState.READER_IDLE) {
295 + ctx.close();
296 + }
297 + }
298 + }
201 } 299 }
......
...@@ -33,6 +33,7 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -33,6 +33,7 @@ import org.apache.felix.scr.annotations.Deactivate;
33 import org.apache.felix.scr.annotations.Service; 33 import org.apache.felix.scr.annotations.Service;
34 import org.onlab.packet.IpAddress; 34 import org.onlab.packet.IpAddress;
35 import org.onlab.packet.MacAddress; 35 import org.onlab.packet.MacAddress;
36 +import org.onlab.packet.TpPort;
36 import org.onosproject.ovsdb.controller.DefaultEventSubject; 37 import org.onosproject.ovsdb.controller.DefaultEventSubject;
37 import org.onosproject.ovsdb.controller.EventSubject; 38 import org.onosproject.ovsdb.controller.EventSubject;
38 import org.onosproject.ovsdb.controller.OvsdbClientService; 39 import org.onosproject.ovsdb.controller.OvsdbClientService;
...@@ -142,6 +143,11 @@ public class OvsdbControllerImpl implements OvsdbController { ...@@ -142,6 +143,11 @@ public class OvsdbControllerImpl implements OvsdbController {
142 return ovsdbClients.get(nodeId); 143 return ovsdbClients.get(nodeId);
143 } 144 }
144 145
146 + @Override
147 + public void connect(IpAddress ip, TpPort port) {
148 + controller.connect(ip, port);
149 + }
150 +
145 /** 151 /**
146 * Implementation of an Ovsdb Agent which is responsible for keeping track 152 * Implementation of an Ovsdb Agent which is responsible for keeping track
147 * of connected node and the state in which they are. 153 * of connected node and the state in which they are.
......
...@@ -89,7 +89,7 @@ public final class OvsdbJsonRpcHandler extends ChannelInboundHandlerAdapter { ...@@ -89,7 +89,7 @@ public final class OvsdbJsonRpcHandler extends ChannelInboundHandlerAdapter {
89 */ 89 */
90 private void processOvsdbMessage(JsonNode jsonNode) { 90 private void processOvsdbMessage(JsonNode jsonNode) {
91 91
92 - log.info("Handle ovsdb message"); 92 + log.debug("Handle ovsdb message");
93 93
94 if (jsonNode.has("result")) { 94 if (jsonNode.has("result")) {
95 95
......
...@@ -27,6 +27,7 @@ import org.junit.After; ...@@ -27,6 +27,7 @@ import org.junit.After;
27 import org.junit.Before; 27 import org.junit.Before;
28 import org.junit.Test; 28 import org.junit.Test;
29 import org.onlab.packet.IpAddress; 29 import org.onlab.packet.IpAddress;
30 +import org.onlab.packet.TpPort;
30 import org.onosproject.net.DeviceId; 31 import org.onosproject.net.DeviceId;
31 import org.onosproject.net.MastershipRole; 32 import org.onosproject.net.MastershipRole;
32 import org.onosproject.net.device.DeviceDescription; 33 import org.onosproject.net.device.DeviceDescription;
...@@ -193,6 +194,10 @@ public class OvsdbDeviceProviderTest { ...@@ -193,6 +194,10 @@ public class OvsdbDeviceProviderTest {
193 return null; 194 return null;
194 } 195 }
195 196
197 + @Override
198 + public void connect(IpAddress ip, TpPort port) {
199 +
200 + }
196 } 201 }
197 202
198 } 203 }
......
...@@ -26,6 +26,7 @@ import org.junit.Before; ...@@ -26,6 +26,7 @@ import org.junit.Before;
26 import org.junit.Test; 26 import org.junit.Test;
27 import org.onlab.packet.IpAddress; 27 import org.onlab.packet.IpAddress;
28 import org.onlab.packet.MacAddress; 28 import org.onlab.packet.MacAddress;
29 +import org.onlab.packet.TpPort;
29 import org.onosproject.net.DeviceId; 30 import org.onosproject.net.DeviceId;
30 import org.onosproject.net.HostId; 31 import org.onosproject.net.HostId;
31 import org.onosproject.net.host.HostDescription; 32 import org.onosproject.net.host.HostDescription;
...@@ -201,5 +202,10 @@ public class OvsdbHostProviderTest { ...@@ -201,5 +202,10 @@ public class OvsdbHostProviderTest {
201 public OvsdbClientService getOvsdbClient(OvsdbNodeId nodeId) { 202 public OvsdbClientService getOvsdbClient(OvsdbNodeId nodeId) {
202 return null; 203 return null;
203 } 204 }
205 +
206 + @Override
207 + public void connect(IpAddress ip, TpPort port) {
208 +
209 + }
204 } 210 }
205 } 211 }
......