Ayaka Koshibe

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 41 changed files with 1115 additions and 539 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-apps</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-app-mobility</artifactId>
<packaging>bundle</packaging>
<description>ONOS simple Mobility app</description>
</project>
package org.onlab.onos.mobility;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.criteria.Criteria.EthCriterion;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.criteria.Criterion.Type;
import org.onlab.onos.net.host.HostEvent;
import org.onlab.onos.net.host.HostListener;
import org.onlab.onos.net.host.HostService;
import org.onlab.packet.MacAddress;
import org.slf4j.Logger;
import com.google.common.collect.Lists;
/**
* Sample mobility application. Cleans up flowmods when a host moves.
*/
@Component(immediate = true)
public class HostMobility {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private ApplicationId appId;
@Activate
public void activate() {
appId = ApplicationId.getAppId();
hostService.addListener(new InternalHostListener());
log.info("Started with Application ID {}", appId.id());
}
@Deactivate
public void deactivate() {
flowRuleService.removeFlowRulesById(appId);
log.info("Stopped");
}
public class InternalHostListener
implements HostListener {
@Override
public void event(HostEvent event) {
switch (event.type()) {
case HOST_ADDED:
case HOST_REMOVED:
case HOST_UPDATED:
// don't care if a host has been added, removed.
break;
case HOST_MOVED:
log.info("Host {} has moved; cleaning up.", event.subject());
cleanup(event.subject());
break;
default:
break;
}
}
/**
* For a given host, remove any flow rule which references it's addresses.
* @param host the host to clean up for
*/
private void cleanup(Host host) {
Iterable<Device> devices = deviceService.getDevices();
List<FlowRule> flowRules = Lists.newLinkedList();
for (Device device : devices) {
flowRules.addAll(cleanupDevice(device, host));
}
FlowRule[] flows = new FlowRule[flowRules.size()];
flows = flowRules.toArray(flows);
flowRuleService.removeFlowRules(flows);
}
private Collection<? extends FlowRule> cleanupDevice(Device device, Host host) {
List<FlowRule> flowRules = Lists.newLinkedList();
MacAddress mac = host.mac();
for (FlowRule rule : flowRuleService.getFlowEntries(device.id())) {
for (Criterion c : rule.selector().criteria()) {
if (c.type() == Type.ETH_DST || c.type() == Type.ETH_SRC) {
EthCriterion eth = (EthCriterion) c;
if (eth.mac().equals(mac)) {
flowRules.add(rule);
}
}
}
}
//TODO: handle ip cleanup
return flowRules;
}
}
}
/**
* Trivial application that provides simple form of reactive forwarding.
*/
package org.onlab.onos.mobility;
......@@ -20,6 +20,7 @@
<module>tvue</module>
<module>fwd</module>
<module>foo</module>
<module>mobility</module>
<module>config</module>
</modules>
......
......@@ -161,7 +161,7 @@ implements FlowRuleService, FlowRuleProviderRegistry {
switch (stored.state()) {
case ADDED:
case PENDING_ADD:
frp.applyFlowRule(flowRule);
frp.applyFlowRule(stored);
break;
case PENDING_REMOVE:
case REMOVED:
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
/**
* Service for administering communications manager.
*/
public interface ClusterCommunicationAdminService {
/**
* Adds the node to the list of monitored nodes.
*
* @param node node to be added
*/
void addNode(DefaultControllerNode node);
/**
* Removes the node from the list of monitored nodes.
*
* @param node node to be removed
*/
void removeNode(DefaultControllerNode node);
/**
* Starts-up the communications engine.
*
* @param localNode local controller node
* @param delegate nodes delegate
*/
void startUp(DefaultControllerNode localNode, ClusterNodesDelegate delegate);
/**
* Clears all nodes and streams as part of leaving the cluster.
*/
void clearAllNodesAndStreams();
}
package org.onlab.onos.store.cluster.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMembershipMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;
/**
* Implements the cluster communication services to use by other stores.
*/
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService, ClusterCommunicationAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
private static final long START_TIMEOUT = 1000;
private static final int WORKERS = 3;
private ClusterConnectionListener connectionListener;
private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
private DefaultControllerNode localNode;
private ClusterNodesDelegate nodesDelegate;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SerializationService serializationService;
// Nodes to be monitored to make sure they have a connection.
private final Set<DefaultControllerNode> nodes = new HashSet<>();
// Means to track message streams to other nodes.
private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
// TODO: use something different that won't require synchronization
private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
private MembershipSubscriber membershipSubscriber = new MembershipSubscriber();
@Activate
public void activate() {
addSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
addSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
log.info("Activated but waiting for delegate");
}
@Deactivate
public void deactivate() {
removeSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
removeSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
connectionCustodian.cancel();
if (connectionListener != null) {
connectionListener.shutdown();
for (ClusterIOWorker worker : workers) {
worker.shutdown();
}
}
log.info("Stopped");
}
@Override
public boolean send(ClusterMessage message) {
boolean ok = true;
for (DefaultControllerNode node : nodes) {
if (!node.equals(localNode)) {
ok = send(message, node.id()) && ok;
}
}
return ok;
}
@Override
public boolean send(ClusterMessage message, NodeId toNodeId) {
ClusterMessageStream stream = streams.get(toNodeId);
if (stream != null && !toNodeId.equals(localNode.id())) {
try {
stream.write(message);
return true;
} catch (IOException e) {
log.warn("Unable to send message {} to node {}",
message.subject(), toNodeId);
}
}
return false;
}
@Override
public synchronized void addSubscriber(MessageSubject subject,
MessageSubscriber subscriber) {
subscribers.put(subject, subscriber);
}
@Override
public synchronized void removeSubscriber(MessageSubject subject,
MessageSubscriber subscriber) {
subscribers.remove(subject, subscriber);
}
@Override
public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
return ImmutableSet.copyOf(subscribers.get(subject));
}
@Override
public void addNode(DefaultControllerNode node) {
nodes.add(node);
}
@Override
public void removeNode(DefaultControllerNode node) {
send(new LeavingMemberMessage(node.id()));
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
stream.close();
}
}
@Override
public void startUp(DefaultControllerNode localNode,
ClusterNodesDelegate delegate) {
this.localNode = localNode;
this.nodesDelegate = delegate;
startCommunications();
startListening();
startInitiatingConnections();
log.info("Started");
}
@Override
public void clearAllNodesAndStreams() {
nodes.clear();
send(new LeavingMemberMessage(localNode.id()));
for (ClusterMessageStream stream : streams.values()) {
stream.close();
}
streams.clear();
}
/**
* Dispatches the specified message to all subscribers to its subject.
*
* @param message message to dispatch
* @param fromNodeId node from which the message was received
*/
void dispatch(ClusterMessage message, NodeId fromNodeId) {
Set<MessageSubscriber> set = getSubscribers(message.subject());
if (set != null) {
for (MessageSubscriber subscriber : set) {
subscriber.receive(message, fromNodeId);
}
}
}
/**
* Adds the stream associated with the specified node.
*
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
* @param tcpPort node TCP listen port
* @return controller node bound to the stream
*/
DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
ClusterMessageStream stream) {
DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
stream.setNode(node);
streams.put(node.id(), stream);
send(new NewMemberMessage(node.id(), node.ip(), node.tcpPort()));
return node;
}
/**
* Removes the stream associated with the specified node.
*
* @param node node whose stream to remove
*/
void removeNodeStream(DefaultControllerNode node) {
nodesDelegate.nodeVanished(node.id());
streams.remove(node.id());
}
/**
* Finds the least utilized IO worker.
*
* @return IO worker
*/
ClusterIOWorker findWorker() {
ClusterIOWorker leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (ClusterIOWorker worker : workers) {
int count = worker.streamCount();
if (count == 0) {
return worker;
}
if (count < minCount) {
leastUtilized = worker;
minCount = count;
}
}
return leastUtilized;
}
/**
* Kicks off the IO loops and waits for them to startup.
*/
private void startCommunications() {
HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
localNode.tcpPort());
for (int i = 0; i < WORKERS; i++) {
try {
ClusterIOWorker worker =
new ClusterIOWorker(this, serializationService, hello);
workers.add(worker);
commExecutors.execute(worker);
} catch (IOException e) {
log.warn("Unable to start communication worker", e);
}
}
// Wait for the IO loops to start
for (ClusterIOWorker loop : workers) {
if (!loop.awaitStart(START_TIMEOUT)) {
log.warn("Comm loop did not start on-time; moving on...");
}
}
}
/**
* Starts listening for connections from peer cluster members.
*/
private void startListening() {
try {
connectionListener =
new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
listenExecutor.execute(connectionListener);
if (!connectionListener.awaitStart(START_TIMEOUT)) {
log.warn("Listener did not start on-time; moving on...");
}
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
}
}
/**
* Attempts to connect to any nodes that do not have an associated connection.
*/
private void startInitiatingConnections() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
CONNECTION_CUSTODIAN_FREQUENCY);
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO worker.
*
* @param worker loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
*/
private void initiateConnection(DefaultControllerNode node,
ClusterIOWorker worker) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
ch.connect(sa);
worker.connectStream(ch);
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes) {
if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
try {
initiateConnection(node, findWorker());
} catch (IOException e) {
log.debug("Unable to connect", e);
}
}
}
}
}
private class MembershipSubscriber implements MessageSubscriber {
@Override
public void receive(ClusterMessage message, NodeId fromNodeId) {
MessageSubject subject = message.subject();
ClusterMembershipMessage cmm = (ClusterMembershipMessage) message;
if (message.subject() == MessageSubject.NEW_MEMBER) {
log.info("Node {} arrived", cmm.nodeId());
nodesDelegate.nodeDetected(cmm.nodeId(), cmm.ipAddress(), cmm.tcpPort());
} else if (subject == MessageSubject.LEAVING_MEMBER) {
log.info("Node {} is leaving", cmm.nodeId());
nodesDelegate.nodeRemoved(cmm.nodeId());
}
}
}
}
......@@ -23,12 +23,12 @@ public class ClusterConnectionListener extends AcceptorLoop {
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
private final WorkerFinder workerFinder;
private final ClusterCommunicationManager manager;
ClusterConnectionListener(IpPrefix ip, int tcpPort,
WorkerFinder workerFinder) throws IOException {
ClusterConnectionListener(ClusterCommunicationManager manager,
IpPrefix ip, int tcpPort) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
this.workerFinder = workerFinder;
this.manager = manager;
}
@Override
......@@ -41,7 +41,7 @@ public class ClusterConnectionListener extends AcceptorLoop {
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
workerFinder.findWorker().acceptStream(sc);
manager.findWorker().acceptStream(sc);
}
}
......
......@@ -3,8 +3,9 @@ package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -29,27 +30,23 @@ public class ClusterIOWorker extends
private static final long SELECT_TIMEOUT = 50;
private final ConnectionManager connectionManager;
private final CommunicationsDelegate commsDelegate;
private final ClusterCommunicationManager manager;
private final SerializationService serializationService;
private final ClusterMessage helloMessage;
/**
* Creates a new cluster IO worker.
*
* @param connectionManager parent connection manager
* @param commsDelegate communications delegate for dispatching
* @param manager parent comms manager
* @param serializationService serialization service for encode/decode
* @param helloMessage hello message for greeting peers
* @throws IOException if errors occur during IO loop ignition
*/
ClusterIOWorker(ConnectionManager connectionManager,
CommunicationsDelegate commsDelegate,
ClusterIOWorker(ClusterCommunicationManager manager,
SerializationService serializationService,
ClusterMessage helloMessage) throws IOException {
super(SELECT_TIMEOUT);
this.connectionManager = connectionManager;
this.commsDelegate = commsDelegate;
this.manager = manager;
this.serializationService = serializationService;
this.helloMessage = helloMessage;
}
......@@ -61,11 +58,27 @@ public class ClusterIOWorker extends
@Override
protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
NodeId nodeId = getNodeId(messages, (ClusterMessageStream) stream);
for (ClusterMessage message : messages) {
commsDelegate.dispatch(message);
manager.dispatch(message, nodeId);
}
}
// Retrieves the node from the stream. If one is not bound, it attempts
// to bind it using the knowledge that the first message must be a hello.
private NodeId getNodeId(List<ClusterMessage> messages, ClusterMessageStream stream) {
DefaultControllerNode node = stream.node();
if (node == null && !messages.isEmpty()) {
ClusterMessage firstMessage = messages.get(0);
if (firstMessage instanceof HelloMessage) {
HelloMessage hello = (HelloMessage) firstMessage;
node = manager.addNodeStream(hello.nodeId(), hello.ipAddress(),
hello.tcpPort(), stream);
}
}
return node != null ? node.id() : null;
}
@Override
public ClusterMessageStream acceptStream(SocketChannel channel) {
ClusterMessageStream stream = super.acceptStream(channel);
......@@ -99,7 +112,7 @@ public class ClusterIOWorker extends
DefaultControllerNode node = ((ClusterMessageStream) stream).node();
if (node != null) {
log.info("Closed connection to node {}", node.id());
connectionManager.removeNodeStream(node);
manager.removeNodeStream(node);
}
super.removeStream(stream);
}
......
package org.onlab.onos.store.cluster.messaging;
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Simple back interface through which connection manager can interact with
......@@ -9,17 +11,27 @@ import org.onlab.onos.cluster.DefaultControllerNode;
public interface ClusterNodesDelegate {
/**
* Notifies about a new cluster node being detected.
* Notifies about cluster node coming online.
*
* @param node newly detected cluster node
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
* @param tcpPort node TCP listen port
* @return the controller node
*/
void nodeDetected(DefaultControllerNode node);
DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort);
/**
* Notifies about cluster node going offline.
*
* @param node cluster node that vanished
* @param nodeId identifier of the cluster node that vanished
*/
void nodeVanished(DefaultControllerNode node);
void nodeVanished(NodeId nodeId);
/**
* Notifies about remote request to remove node from cluster.
*
* @param nodeId identifier of the cluster node that was removed
*/
void nodeRemoved(NodeId nodeId);
}
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
/**
* Simple back interface for interacting with the communications service.
*/
public interface CommunicationsDelegate {
/**
* Dispatches the specified message to all registered subscribers.
*
* @param message message to be dispatched
*/
void dispatch(ClusterMessage message);
/**
* Sets the sender.
*
* @param messageSender message sender
*/
void setSender(MessageSender messageSender);
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;
/**
* Manages connections to other controller cluster nodes.
*/
public class ConnectionManager implements MessageSender {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
private static final long START_TIMEOUT = 1000;
private static final int WORKERS = 3;
private ClusterConnectionListener connectionListener;
private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
private final DefaultControllerNode localNode;
private final ClusterNodesDelegate nodesDelegate;
private final CommunicationsDelegate commsDelegate;
private final SerializationService serializationService;
// Nodes to be monitored to make sure they have a connection.
private final Set<DefaultControllerNode> nodes = new HashSet<>();
// Means to track message streams to other nodes.
private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
private final WorkerFinder workerFinder = new LeastUtilitiedWorkerFinder();
/**
* Creates a new connection manager.
*/
ConnectionManager(DefaultControllerNode localNode,
ClusterNodesDelegate nodesDelegate,
CommunicationsDelegate commsDelegate,
SerializationService serializationService) {
this.localNode = localNode;
this.nodesDelegate = nodesDelegate;
this.commsDelegate = commsDelegate;
this.serializationService = serializationService;
commsDelegate.setSender(this);
startCommunications();
startListening();
startInitiating();
log.info("Started");
}
/**
* Shuts down the connection manager.
*/
void shutdown() {
connectionListener.shutdown();
for (ClusterIOWorker worker : workers) {
worker.shutdown();
}
log.info("Stopped");
}
/**
* Adds the node to the list of monitored nodes.
*
* @param node node to be added
*/
void addNode(DefaultControllerNode node) {
nodes.add(node);
}
/**
* Removes the node from the list of monitored nodes.
*
* @param node node to be removed
*/
void removeNode(DefaultControllerNode node) {
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
stream.close();
}
}
/**
* Removes the stream associated with the specified node.
*
* @param node node whose stream to remove
*/
void removeNodeStream(DefaultControllerNode node) {
nodesDelegate.nodeVanished(node);
streams.remove(node.id());
}
@Override
public boolean send(NodeId nodeId, ClusterMessage message) {
ClusterMessageStream stream = streams.get(nodeId);
if (stream != null) {
try {
stream.write(message);
return true;
} catch (IOException e) {
log.warn("Unable to send a message about {} to node {}",
message.subject(), nodeId);
}
}
return false;
}
/**
* Kicks off the IO loops and waits for them to startup.
*/
private void startCommunications() {
HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
localNode.tcpPort());
for (int i = 0; i < WORKERS; i++) {
try {
ClusterIOWorker worker =
new ClusterIOWorker(this, commsDelegate,
serializationService, hello);
workers.add(worker);
commExecutors.execute(worker);
} catch (IOException e) {
log.warn("Unable to start communication worker", e);
}
}
// Wait for the IO loops to start
for (ClusterIOWorker loop : workers) {
if (!loop.awaitStart(START_TIMEOUT)) {
log.warn("Comm loop did not start on-time; moving on...");
}
}
}
/**
* Starts listening for connections from peer cluster members.
*/
private void startListening() {
try {
connectionListener =
new ClusterConnectionListener(localNode.ip(), localNode.tcpPort(),
workerFinder);
listenExecutor.execute(connectionListener);
if (!connectionListener.awaitStart(START_TIMEOUT)) {
log.warn("Listener did not start on-time; moving on...");
}
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
}
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
*/
private void initiateConnection(DefaultControllerNode node,
ClusterIOWorker loop) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
ch.connect(sa);
loop.connectStream(ch);
}
/**
* Attempts to connect to any nodes that do not have an associated connection.
*/
private void startInitiating() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
CONNECTION_CUSTODIAN_FREQUENCY);
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes) {
if (node != localNode && !streams.containsKey(node.id())) {
try {
initiateConnection(node, workerFinder.findWorker());
} catch (IOException e) {
log.debug("Unable to connect", e);
}
}
}
}
}
// Finds the least utilitied IO loop.
private class LeastUtilitiedWorkerFinder implements WorkerFinder {
@Override
public ClusterIOWorker findWorker() {
ClusterIOWorker leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (ClusterIOWorker worker : workers) {
int count = worker.streamCount();
if (count == 0) {
return worker;
}
if (count < minCount) {
leastUtilized = worker;
minCount = count;
}
}
return leastUtilized;
}
}
}
......@@ -14,7 +14,6 @@ import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -43,20 +42,20 @@ public class DistributedClusterStore
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private CommunicationsDelegate commsDelegate;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private SerializationService serializationService;
private ClusterCommunicationAdminService communicationAdminService;
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
private ConnectionManager connectionManager;
@Activate
public void activate() {
loadClusterDefinition();
establishSelfIdentity();
connectionManager = new ConnectionManager(localNode, nodesDelegate,
commsDelegate, serializationService);
// Start-up the comm service and prime it with the loaded nodes.
communicationAdminService.startUp(localNode, nodesDelegate);
for (DefaultControllerNode node : nodes.values()) {
communicationAdminService.addNode(node);
}
log.info("Started");
}
......@@ -92,8 +91,8 @@ public class DistributedClusterStore
if (localNode == null) {
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(localNode.id(), localNode);
states.put(localNode.id(), State.ACTIVE);
}
states.put(localNode.id(), State.ACTIVE);
}
@Override
......@@ -122,29 +121,48 @@ public class DistributedClusterStore
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
connectionManager.addNode(node);
communicationAdminService.addNode(node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
if (nodeId.equals(localNode.id())) {
// We are being ejected from the cluster, so remove all other nodes.
communicationAdminService.clearAllNodesAndStreams();
nodes.clear();
nodes.put(localNode.id(), localNode);
} else {
// Remove the other node.
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
connectionManager.removeNode(node);
communicationAdminService.removeNode(node);
}
}
}
// Entity to handle back calls from the connection manager.
private class InnerNodesDelegate implements ClusterNodesDelegate {
@Override
public void nodeDetected(DefaultControllerNode node) {
nodes.put(node.id(), node);
states.put(node.id(), State.ACTIVE);
public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = nodes.get(nodeId);
if (node == null) {
node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
}
states.put(nodeId, State.ACTIVE);
return node;
}
@Override
public void nodeVanished(DefaultControllerNode node) {
states.put(node.id(), State.INACTIVE);
public void nodeVanished(NodeId nodeId) {
states.put(nodeId, State.INACTIVE);
}
@Override
public void nodeRemoved(NodeId nodeId) {
removeNode(nodeId);
}
}
}
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
/**
* Created by tom on 9/29/14.
*/
public interface MessageSender {
/**
* Sends the specified message to the given cluster node.
*
* @param nodeId node identifier
* @param message mesage to send
* @return true if the message was sent sucessfully; false if there is
* no stream or if there was an error
*/
boolean send(NodeId nodeId, ClusterMessage message);
}
package org.onlab.onos.store.cluster.impl;
import de.javakaffee.kryoserializers.URISerializer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.EchoMessage;
import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
import org.onlab.onos.store.serializers.DefaultPortSerializer;
import org.onlab.onos.store.serializers.DeviceIdSerializer;
import org.onlab.onos.store.serializers.IpPrefixSerializer;
import org.onlab.onos.store.serializers.LinkKeySerializer;
import org.onlab.onos.store.serializers.NodeIdSerializer;
import org.onlab.onos.store.serializers.PortNumberSerializer;
import org.onlab.onos.store.serializers.ProviderIdSerializer;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import static com.google.common.base.Preconditions.checkState;
/**
* Factory for parsing messages sent between cluster members.
*/
@Component(immediate = true)
@Service
public class MessageSerializer implements SerializationService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final int METADATA_LENGTH = 12; // 8 + 4
private static final int LENGTH_OFFSET = 8;
private static final long MARKER = 0xfeedcafebeaddeadL;
private KryoPool serializerPool;
@Activate
public void activate() {
setupKryoPool();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ControllerNode.State.class,
Device.Type.class,
DefaultControllerNode.class,
DefaultDevice.class,
MastershipRole.class,
Port.class,
Element.class,
Link.Type.class,
MessageSubject.class,
HelloMessage.class,
NewMemberMessage.class,
LeavingMemberMessage.class,
EchoMessage.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.build()
.populate(1);
}
@Override
public ClusterMessage decode(ByteBuffer buffer) {
try {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
length = buffer.getInt();
// TODO: sanity checking for length
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
return (ClusterMessage) serializerPool.deserialize(data);
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
log.warn("Unable to decode message due to: " + e);
}
return null;
}
@Override
public void encode(ClusterMessage message, ByteBuffer buffer) {
try {
byte[] data = serializerPool.serialize(message);
buffer.putLong(MARKER);
buffer.putInt(data.length + METADATA_LENGTH);
buffer.put(data);
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
log.warn("Unable to encode message due to: " + e);
}
}
}
package org.onlab.onos.store.cluster.impl;
/**
* Provides means to find a worker IO loop.
*/
public interface WorkerFinder {
/**
* Finds a suitable worker.
*
* @return available worker
*/
ClusterIOWorker findWorker();
}
/**
* Distributed cluster store and messaging subsystem implementation.
*/
package org.onlab.onos.store.cluster.impl;
\ No newline at end of file
......@@ -10,6 +10,15 @@ import java.util.Set;
public interface ClusterCommunicationService {
/**
* Sends a message to all controller nodes.
*
* @param message message to send
* @return true if the message was sent sucessfully to all nodes; false
* if there is no stream or if there was an error for some node
*/
boolean send(ClusterMessage message);
/**
* Sends a message to the specified controller node.
*
* @param message message to send
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Base for cluster membership messages.
*/
public abstract class ClusterMembershipMessage extends ClusterMessage {
private NodeId nodeId;
private IpPrefix ipAddress;
private int tcpPort;
// For serialization
protected ClusterMembershipMessage() {
super(MessageSubject.HELLO);
nodeId = null;
ipAddress = null;
tcpPort = 0;
}
/**
* Creates a new membership message for the specified end-point data.
*
* @param subject message subject
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
protected ClusterMembershipMessage(MessageSubject subject, NodeId nodeId,
IpPrefix ipAddress, int tcpPort) {
super(subject);
this.nodeId = nodeId;
this.ipAddress = ipAddress;
this.tcpPort = tcpPort;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
/**
* Returns the sending node IP address.
*
* @return node IP address
*/
public IpPrefix ipAddress() {
return ipAddress;
}
/**
* Returns the sending node TCP listen port.
*
* @return TCP listen port
*/
public int tcpPort() {
return tcpPort;
}
}
......@@ -6,18 +6,10 @@ import org.onlab.packet.IpPrefix;
/**
* Hello message that nodes use to greet each other.
*/
public class HelloMessage extends ClusterMessage {
private NodeId nodeId;
private IpPrefix ipAddress;
private int tcpPort;
public class HelloMessage extends ClusterMembershipMessage {
// For serialization
private HelloMessage() {
super(MessageSubject.HELLO);
nodeId = null;
ipAddress = null;
tcpPort = 0;
}
/**
......@@ -28,36 +20,7 @@ public class HelloMessage extends ClusterMessage {
* @param tcpPort sending node TCP port
*/
public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.HELLO);
nodeId = nodeId;
ipAddress = ipAddress;
tcpPort = tcpPort;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
super(MessageSubject.HELLO, nodeId, ipAddress, tcpPort);
}
/**
* Returns the sending node IP address.
*
* @return node IP address
*/
public IpPrefix ipAddress() {
return ipAddress;
}
/**
* Returns the sending node TCP listen port.
*
* @return TCP listen port
*/
public int tcpPort() {
return tcpPort;
}
}
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**
* Announcement message that nodes use to gossip about team departures.
*/
public class LeavingMemberMessage extends ClusterMembershipMessage {
// For serialization
private LeavingMemberMessage() {
super();
}
/**
* Creates a new goodbye message.
*
* @param nodeId sending node identification
*/
public LeavingMemberMessage(NodeId nodeId) {
super(MessageSubject.LEAVING_MEMBER, nodeId, null, 0);
}
}
......@@ -8,6 +8,12 @@ public enum MessageSubject {
/** Represents a first greeting message. */
HELLO,
/** Signifies announcement about new member. */
NEW_MEMBER,
/** Signifies announcement about leaving member. */
LEAVING_MEMBER,
/** Signifies a heart-beat message. */
ECHO
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**
* Represents a message consumer.
*/
......@@ -9,7 +11,8 @@ public interface MessageSubscriber {
* Receives the specified cluster message.
*
* @param message message to be received
* @param fromNodeId node from which the message was received
*/
void receive(ClusterMessage message);
void receive(ClusterMessage message, NodeId fromNodeId);
}
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Announcement message that nodes use to gossip about new arrivals.
*/
public class NewMemberMessage extends ClusterMembershipMessage {
// For serialization
private NewMemberMessage() {
}
/**
* Creates a new gossip message for the specified end-point data.
*
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
public NewMemberMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.NEW_MEMBER, nodeId, ipAddress, tcpPort);
}
}
......@@ -3,12 +3,12 @@ package org.onlab.onos.store.cluster.messaging;
import java.nio.ByteBuffer;
/**
* Service for serializing/deserializing intra-cluster messages.
* Service for encoding &amp; decoding intra-cluster messages.
*/
public interface SerializationService {
/**
* Decodes the specified byte buffer to obtain a message within.
* Decodes the specified byte buffer to obtain the message within.
*
* @param buffer byte buffer with message(s)
* @return parsed message
......
package org.onlab.onos.store.cluster.messaging.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
import org.onlab.onos.store.cluster.impl.MessageSender;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import java.util.Set;
/**
* Implements the cluster communication services to use by other stores.
*/
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService, CommunicationsDelegate {
// TODO: use something different that won't require synchronization
private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
private MessageSender messageSender;
@Override
public boolean send(ClusterMessage message, NodeId toNodeId) {
return messageSender.send(toNodeId, message);
}
@Override
public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
subscribers.put(subject, subscriber);
}
@Override
public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
subscribers.remove(subject, subscriber);
}
@Override
public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
return ImmutableSet.copyOf(subscribers.get(subject));
}
@Override
public void dispatch(ClusterMessage message) {
Set<MessageSubscriber> set = getSubscribers(message.subject());
if (set != null) {
for (MessageSubscriber subscriber : set) {
subscriber.receive(message);
}
}
}
@Override
public void setSender(MessageSender messageSender) {
this.messageSender = messageSender;
}
}
package org.onlab.onos.store.cluster.messaging.impl;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import java.nio.ByteBuffer;
import static com.google.common.base.Preconditions.checkState;
/**
* Factory for parsing messages sent between cluster members.
*/
public class MessageSerializer implements SerializationService {
private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
private static final long MARKER = 0xfeedcafebeaddeadL;
@Override
public ClusterMessage decode(ByteBuffer buffer) {
try {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
int subjectOrdinal = buffer.getInt();
MessageSubject subject = MessageSubject.values()[subjectOrdinal];
length = buffer.getInt();
// TODO: sanity checking for length
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
// TODO: add deserialization hook here; for now this hack
return null; // actually deserialize
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
e.printStackTrace();
}
return null;
}
@Override
public void encode(ClusterMessage message, ByteBuffer buffer) {
try {
int i = 0;
// Type based lookup for proper encoder
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
e.printStackTrace();
}
}
}
/**
* Cluster messaging APIs for the use by the various distributed stores.
*/
package org.onlab.onos.store.cluster.messaging;
\ No newline at end of file
package org.onlab.onos.store.cluster.impl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests of the cluster communication manager.
*/
public class ClusterCommunicationManagerTest {
private static final NodeId N1 = new NodeId("n1");
private static final NodeId N2 = new NodeId("n2");
private static final int P1 = 9881;
private static final int P2 = 9882;
private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
private ClusterCommunicationManager ccm1;
private ClusterCommunicationManager ccm2;
private TestDelegate cnd1 = new TestDelegate();
private TestDelegate cnd2 = new TestDelegate();
private DefaultControllerNode node1 = new DefaultControllerNode(N1, IP, P1);
private DefaultControllerNode node2 = new DefaultControllerNode(N2, IP, P2);
@Before
public void setUp() {
MessageSerializer messageSerializer = new MessageSerializer();
messageSerializer.activate();
ccm1 = new ClusterCommunicationManager();
ccm1.serializationService = messageSerializer;
ccm1.activate();
ccm2 = new ClusterCommunicationManager();
ccm2.serializationService = messageSerializer;
ccm2.activate();
ccm1.startUp(node1, cnd1);
ccm2.startUp(node2, cnd2);
}
@After
public void tearDown() {
ccm1.deactivate();
ccm2.deactivate();
}
@Test
public void connect() throws Exception {
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
ccm1.addNode(node2);
validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
}
@Test
public void disconnect() throws Exception {
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
ccm1.addNode(node2);
validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
ccm1.deactivate();
//
// validateDelegateEvent(cnd2, Op.VANISHED, node1.id());
}
private void validateDelegateEvent(TestDelegate delegate, Op op, NodeId nodeId)
throws InterruptedException {
assertTrue("did not connect in time", delegate.latch.await(2500, TimeUnit.MILLISECONDS));
assertEquals("incorrect event", op, delegate.op);
assertEquals("incorrect event node", nodeId, delegate.nodeId);
}
enum Op { DETECTED, VANISHED, REMOVED };
private class TestDelegate implements ClusterNodesDelegate {
Op op;
CountDownLatch latch;
NodeId nodeId;
@Override
public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
latch(nodeId, Op.DETECTED);
return new DefaultControllerNode(nodeId, ip, tcpPort);
}
@Override
public void nodeVanished(NodeId nodeId) {
latch(nodeId, Op.VANISHED);
}
@Override
public void nodeRemoved(NodeId nodeId) {
latch(nodeId, Op.REMOVED);
}
private void latch(NodeId nodeId, Op op) {
this.op = op;
this.nodeId = nodeId;
latch.countDown();
}
}
}
\ No newline at end of file
......@@ -105,11 +105,9 @@ public class SimpleFlowRuleStore
*/
if (flowEntries.containsEntry(did, f)) {
//synchronized (flowEntries) {
flowEntries.remove(did, f);
flowEntries.put(did, f);
flowEntriesById.remove(rule.appId(), rule);
//}
}
}
......
......@@ -119,6 +119,14 @@
<bundle>mvn:org.onlab.onos/onos-app-fwd/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-app-mobility" version="1.0.0"
description="ONOS sample forwarding application">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-app-mobility/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-app-foo" version="1.0.0"
description="ONOS sample playground application">
<feature>onos-api</feature>
......
......@@ -243,6 +243,8 @@ public abstract class AbstractOpenFlowSwitch implements OpenFlowSwitchDriver {
if (role == RoleState.SLAVE || role == RoleState.EQUAL) {
this.role = role;
}
} else {
this.role = role;
}
} catch (IOException e) {
log.error("Unable to write to switch {}.", this.dpid);
......
......@@ -651,7 +651,7 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
* @param error The error message
*/
protected void logError(OFChannelHandler h, OFErrorMsg error) {
log.error("{} from switch {} in state {}",
log.debug("{} from switch {} in state {}",
new Object[] {
error,
h.getSwitchInfoString(),
......
......@@ -77,6 +77,7 @@ public class FlowModBuilder {
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
.setIdleTimeout(10)
.setMatch(match)
.setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
.setPriority(priority)
......@@ -104,6 +105,9 @@ public class FlowModBuilder {
private List<OFAction> buildActions() {
List<OFAction> acts = new LinkedList<>();
if (treatment == null) {
return acts;
}
for (Instruction i : treatment.instructions()) {
switch (i.type()) {
case DROP:
......
......@@ -31,6 +31,7 @@ import org.onlab.onos.openflow.controller.OpenFlowPacketContext;
import org.onlab.onos.openflow.controller.PacketListener;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IPv4;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.VlanId;
import org.slf4j.Logger;
......@@ -92,8 +93,6 @@ public class OpenFlowHostProvider extends AbstractProvider implements HostProvid
public void handlePacket(OpenFlowPacketContext pktCtx) {
Ethernet eth = pktCtx.parsed();
// Potentially a new or moved host
if (eth.getEtherType() == Ethernet.TYPE_ARP) {
VlanId vlan = VlanId.vlanId(eth.getVlanID());
ConnectPoint heardOn = new ConnectPoint(deviceId(Dpid.uri(pktCtx.dpid())),
portNumber(pktCtx.inPort()));
......@@ -107,14 +106,24 @@ public class OpenFlowHostProvider extends AbstractProvider implements HostProvid
HostLocation hloc = new HostLocation(deviceId(Dpid.uri(pktCtx.dpid())),
portNumber(pktCtx.inPort()),
System.currentTimeMillis());
HostId hid = HostId.hostId(eth.getSourceMAC(), vlan);
// Potentially a new or moved host
if (eth.getEtherType() == Ethernet.TYPE_ARP) {
ARP arp = (ARP) eth.getPayload();
Set<IpPrefix> ips = newHashSet(IpPrefix.valueOf(arp.getSenderProtocolAddress()));
HostDescription hdescr =
new DefaultHostDescription(eth.getSourceMAC(), vlan, hloc, ips);
providerService.hostDetected(hid, hdescr);
} else if (eth.getEtherType() == Ethernet.TYPE_IPV4) {
IPv4 ip = (IPv4) eth.getPayload();
Set<IpPrefix> ips = newHashSet(IpPrefix.valueOf(ip.getSourceAddress()));
HostDescription hdescr =
new DefaultHostDescription(eth.getSourceMAC(), vlan, hloc, ips);
providerService.hostDetected(hid, hdescr);
}
// TODO: Use DHCP packets as well later...
......
......@@ -106,10 +106,6 @@ public class OpenFlowPacketProvider extends AbstractProvider implements PacketPr
for (Instruction inst : packet.treatment().instructions()) {
if (inst.type().equals(Instruction.Type.OUTPUT)) {
p = portDesc(((OutputInstruction) inst).port());
/*if (!sw.getPorts().contains(p)) {
log.warn("Tried to write out non-existent port {}", p.getPortNo());
continue;
}*/
OFPacketOut po = packetOut(sw, eth, p.getPortNo());
sw.sendMsg(po);
}
......
......@@ -3,5 +3,7 @@
# ONOS remote command-line client.
#-------------------------------------------------------------------------------
[ "$1" = "-w" ] && shift && onos-wait-for-start $1
[ -n "$1" ] && OCI=$1 && shift
client -h $OCI "$@"
client -h $OCI "$@" 2>/dev/null
......
#!/bin/bash
#-------------------------------------------------------------------------------
# Remotely kills the ONOS service on the specified node.
#-------------------------------------------------------------------------------
[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
. $ONOS_ROOT/tools/build/envDefaults
ssh $ONOS_USER@${1:-$OCI} "kill -9 \$(ps -ef | grep karaf.jar | grep -v grep | cut -c10-15)"
\ No newline at end of file
# ProxMox-based cell of ONOS instances 1,2 & ONOS mininet box
. $ONOS_ROOT/tools/test/cells/.reset
export ONOS_NIC="10.128.4.*"
export OC1="10.128.4.60"
#export OC2="192.168.97.131"
#export OCN="192.168.97.130"
# Default virtual box ONOS instances 1,2 & ONOS mininet box
export ONOS_NIC=192.168.56.*
export OC1="192.168.56.11"
export OC2="192.168.56.12"
export OCN="192.168.56.7"