Madan Jampani

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 72 changed files with 2360 additions and 623 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-apps</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-app-config</artifactId>
<packaging>bundle</packaging>
<description>ONOS simple network configuration reader</description>
<dependencies>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.4.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
package org.onlab.onos.config;
import java.util.Collections;
import java.util.List;
import org.codehaus.jackson.annotate.JsonProperty;
/**
* Object to store address configuration read from a JSON file.
*/
public class AddressConfiguration {
private List<AddressEntry> addresses;
/**
* Gets a list of addresses in the system.
*
* @return the list of addresses
*/
public List<AddressEntry> getAddresses() {
return Collections.unmodifiableList(addresses);
}
/**
* Sets a list of addresses in the system.
*
* @param addresses the list of addresses
*/
@JsonProperty("addresses")
public void setAddresses(List<AddressEntry> addresses) {
this.addresses = addresses;
}
}
package org.onlab.onos.config;
import java.util.List;
import org.codehaus.jackson.annotate.JsonProperty;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
/**
* Represents a set of addresses bound to a port.
*/
public class AddressEntry {
private String dpid;
private short portNumber;
private List<IpPrefix> ipAddresses;
private MacAddress macAddress;
public String getDpid() {
return dpid;
}
@JsonProperty("dpid")
public void setDpid(String strDpid) {
this.dpid = strDpid;
}
public short getPortNumber() {
return portNumber;
}
@JsonProperty("port")
public void setPortNumber(short portNumber) {
this.portNumber = portNumber;
}
public List<IpPrefix> getIpAddresses() {
return ipAddresses;
}
@JsonProperty("ips")
public void setIpAddresses(List<IpPrefix> ipAddresses) {
this.ipAddresses = ipAddresses;
}
public MacAddress getMacAddress() {
return macAddress;
}
@JsonProperty("mac")
public void setMacAddress(MacAddress macAddress) {
this.macAddress = macAddress;
}
}
package org.onlab.onos.config;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.codehaus.jackson.map.ObjectMapper;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.host.HostAdminService;
import org.onlab.onos.net.host.PortAddresses;
import org.slf4j.Logger;
import com.google.common.collect.Sets;
/**
* Simple configuration module to read in supplementary network configuration
* from a file.
*/
@Component(immediate = true)
public class NetworkConfigReader {
private final Logger log = getLogger(getClass());
private static final String DEFAULT_CONFIG_FILE = "config/addresses.json";
private String configFileName = DEFAULT_CONFIG_FILE;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostAdminService hostAdminService;
@Activate
protected void activate() {
log.info("Started network config reader");
log.info("Config file set to {}", configFileName);
AddressConfiguration config = readNetworkConfig();
if (config != null) {
for (AddressEntry entry : config.getAddresses()) {
ConnectPoint cp = new ConnectPoint(
DeviceId.deviceId(dpidToUri(entry.getDpid())),
PortNumber.portNumber(entry.getPortNumber()));
PortAddresses addresses = new PortAddresses(cp,
Sets.newHashSet(entry.getIpAddresses()),
entry.getMacAddress());
hostAdminService.bindAddressesToPort(addresses);
}
}
}
@Deactivate
protected void deactivate() {
log.info("Stopped");
}
private AddressConfiguration readNetworkConfig() {
File configFile = new File(configFileName);
ObjectMapper mapper = new ObjectMapper();
try {
AddressConfiguration config =
mapper.readValue(configFile, AddressConfiguration.class);
return config;
} catch (FileNotFoundException e) {
log.warn("Configuration file not found: {}", configFileName);
} catch (IOException e) {
log.error("Unable to read config from file:", e);
}
return null;
}
private static String dpidToUri(String dpid) {
return "of:" + dpid.replace(":", "");
}
}
/**
* Simple configuration module to read in supplementary network configuration
* from a file.
*/
package org.onlab.onos.config;
{
"interfaces" : [
{
"dpid" : "00:00:00:00:00:00:01",
"port" : "1",
"ips" : ["192.168.10.101/24"],
"mac" : "00:00:00:11:22:33"
},
{
"dpid" : "00:00:00:00:00:00:02",
"port" : "1",
"ips" : ["192.168.20.101/24", "192.168.30.101/24"]
},
{
"dpid" : "00:00:00:00:00:00:03",
"port" : "1",
"ips" : ["10.1.0.1/16"],
"mac" : "00:00:00:00:00:01"
}
]
}
......@@ -233,7 +233,7 @@ public class IOLoopTestClient {
}
@Override
protected void connect(SelectionKey key) {
protected void connect(SelectionKey key) throws IOException {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;
......
......@@ -26,7 +26,9 @@ import org.onlab.onos.net.packet.InboundPacket;
import org.onlab.onos.net.packet.PacketContext;
import org.onlab.onos.net.packet.PacketProcessor;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.proxyarp.ProxyArpService;
import org.onlab.onos.net.topology.TopologyService;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.slf4j.Logger;
......@@ -50,6 +52,9 @@ public class ReactiveForwarding {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ProxyArpService proxyArpService;
private ReactivePacketProcessor processor = new ReactivePacketProcessor();
private ApplicationId appId;
......@@ -85,6 +90,16 @@ public class ReactiveForwarding {
InboundPacket pkt = context.inPacket();
Ethernet ethPkt = pkt.parsed();
if (ethPkt.getEtherType() == Ethernet.TYPE_ARP) {
ARP arp = (ARP) ethPkt.getPayload();
if (arp.getOpCode() == ARP.OP_REPLY) {
proxyArpService.forward(ethPkt);
} else if (arp.getOpCode() == ARP.OP_REQUEST) {
proxyArpService.reply(ethPkt);
}
context.block();
return;
}
HostId id = HostId.hostId(ethPkt.getDestinationMAC());
// Do we know who this is for? If not, flood and bail.
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-apps</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-app-mobility</artifactId>
<packaging>bundle</packaging>
<description>ONOS simple Mobility app</description>
</project>
package org.onlab.onos.mobility;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.criteria.Criteria.EthCriterion;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.criteria.Criterion.Type;
import org.onlab.onos.net.host.HostEvent;
import org.onlab.onos.net.host.HostListener;
import org.onlab.onos.net.host.HostService;
import org.onlab.packet.MacAddress;
import org.slf4j.Logger;
import com.google.common.collect.Lists;
/**
* Sample mobility application. Cleans up flowmods when a host moves.
*/
@Component(immediate = true)
public class HostMobility {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private ApplicationId appId;
@Activate
public void activate() {
appId = ApplicationId.getAppId();
hostService.addListener(new InternalHostListener());
log.info("Started with Application ID {}", appId.id());
}
@Deactivate
public void deactivate() {
flowRuleService.removeFlowRulesById(appId);
log.info("Stopped");
}
public class InternalHostListener
implements HostListener {
@Override
public void event(HostEvent event) {
switch (event.type()) {
case HOST_ADDED:
case HOST_REMOVED:
case HOST_UPDATED:
// don't care if a host has been added, removed.
break;
case HOST_MOVED:
log.info("Host {} has moved; cleaning up.", event.subject());
cleanup(event.subject());
break;
default:
break;
}
}
/**
* For a given host, remove any flow rule which references it's addresses.
* @param host the host to clean up for
*/
private void cleanup(Host host) {
Iterable<Device> devices = deviceService.getDevices();
List<FlowRule> flowRules = Lists.newLinkedList();
for (Device device : devices) {
flowRules.addAll(cleanupDevice(device, host));
}
FlowRule[] flows = new FlowRule[flowRules.size()];
flows = flowRules.toArray(flows);
flowRuleService.removeFlowRules(flows);
}
private Collection<? extends FlowRule> cleanupDevice(Device device, Host host) {
List<FlowRule> flowRules = Lists.newLinkedList();
MacAddress mac = host.mac();
for (FlowRule rule : flowRuleService.getFlowEntries(device.id())) {
for (Criterion c : rule.selector().criteria()) {
if (c.type() == Type.ETH_DST || c.type() == Type.ETH_SRC) {
EthCriterion eth = (EthCriterion) c;
if (eth.mac().equals(mac)) {
flowRules.add(rule);
}
}
}
}
//TODO: handle ip cleanup
return flowRules;
}
}
}
/**
* Trivial application that provides simple form of reactive forwarding.
*/
package org.onlab.onos.mobility;
......@@ -20,6 +20,8 @@
<module>tvue</module>
<module>fwd</module>
<module>foo</module>
<module>mobility</module>
<module>config</module>
</modules>
<properties>
......
......@@ -9,6 +9,8 @@ import org.onlab.onos.net.ConnectPoint;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import com.google.common.base.MoreObjects;
/**
* Represents address information bound to a port.
*/
......@@ -83,4 +85,13 @@ public class PortAddresses {
public int hashCode() {
return Objects.hash(connectPoint, ipAddresses, macAddress);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("connect-point", connectPoint)
.add("ip-addresses", ipAddresses)
.add("mac-address", macAddress)
.toString();
}
}
......
package org.onlab.onos.net.link;
import java.util.Set;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import java.util.Set;
/**
* Service for interacting with the inventory of infrastructure links.
*/
......
......@@ -21,9 +21,16 @@ public interface ProxyArpService {
* Sends a reply for a given request. If the host is not known then the arp
* will be flooded at all edge ports.
*
* @param request
* @param eth
* an arp request
*/
void reply(Ethernet request);
void reply(Ethernet eth);
/**
* Forwards an ARP request to its destination. Floods at the edge the ARP request if the
* destination is not known.
* @param eth an ethernet frame containing an ARP request.
*/
void forward(Ethernet eth);
}
......
package org.onlab.onos.net.link;
import java.util.Set;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import java.util.Set;
/**
* Test adapter for link service.
*/
......@@ -63,4 +63,5 @@ public class LinkServiceAdapter implements LinkService {
public void removeListener(LinkListener listener) {
}
}
......
......@@ -161,7 +161,7 @@ implements FlowRuleService, FlowRuleProviderRegistry {
switch (stored.state()) {
case ADDED:
case PENDING_ADD:
frp.applyFlowRule(flowRule);
frp.applyFlowRule(stored);
break;
case PENDING_REMOVE:
case REMOVED:
......
......@@ -53,7 +53,7 @@ public class LinkManager
protected final AbstractListenerRegistry<LinkEvent, LinkListener>
listenerRegistry = new AbstractListenerRegistry<>();
private LinkStoreDelegate delegate = new InternalStoreDelegate();
private final LinkStoreDelegate delegate = new InternalStoreDelegate();
private final DeviceListener deviceListener = new InternalDeviceListener();
......
package org.onlab.onos.net.proxyarp.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkListener;
import org.onlab.onos.net.link.LinkService;
import org.onlab.onos.net.packet.DefaultOutboundPacket;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.proxyarp.ProxyArpService;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.VlanId;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
@Component(immediate = true)
@Service
public class ProxyArpManager implements ProxyArpService {
private final Logger log = getLogger(getClass());
private static final String MAC_ADDR_NULL = "Mac address cannot be null.";
private static final String REQUEST_NULL = "Arp request cannot be null.";
private static final String REQUEST_NOT_ARP = "Ethernet frame does not contain ARP request.";
private static final String NOT_ARP_REQUEST = "ARP is not a request.";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkService linkService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private final Multimap<Device, PortNumber> internalPorts =
HashMultimap.<Device, PortNumber>create();
private final Multimap<Device, PortNumber> externalPorts =
HashMultimap.<Device, PortNumber>create();
/**
* Listens to both device service and link service to determine
* whether a port is internal or external.
*/
@Activate
public void activate() {
deviceService.addListener(new InternalDeviceListener());
linkService.addListener(new InternalLinkListener());
determinePortLocations();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public boolean known(IpPrefix addr) {
checkNotNull(MAC_ADDR_NULL, addr);
Set<Host> hosts = hostService.getHostsByIp(addr);
return !hosts.isEmpty();
}
@Override
public void reply(Ethernet eth) {
checkNotNull(REQUEST_NULL, eth);
checkArgument(eth.getEtherType() == Ethernet.TYPE_ARP,
REQUEST_NOT_ARP);
ARP arp = (ARP) eth.getPayload();
checkArgument(arp.getOpCode() == ARP.OP_REQUEST, NOT_ARP_REQUEST);
VlanId vlan = VlanId.vlanId(eth.getVlanID());
Set<Host> hosts = hostService.getHostsByIp(IpPrefix.valueOf(arp
.getTargetProtocolAddress()));
Host dst = null;
Host src = hostService.getHost(HostId.hostId(eth.getSourceMAC(),
VlanId.vlanId(eth.getVlanID())));
for (Host host : hosts) {
if (host.vlan().equals(vlan)) {
dst = host;
break;
}
}
if (src == null || dst == null) {
flood(eth);
return;
}
Ethernet arpReply = buildArpReply(dst, eth);
// TODO: check send status with host service.
TrafficTreatment.Builder builder = new DefaultTrafficTreatment.Builder();
builder.setOutput(src.location().port());
packetService.emit(new DefaultOutboundPacket(src.location().deviceId(),
builder.build(), ByteBuffer.wrap(arpReply.serialize())));
}
@Override
public void forward(Ethernet eth) {
checkNotNull(REQUEST_NULL, eth);
checkArgument(eth.getEtherType() == Ethernet.TYPE_ARP,
REQUEST_NOT_ARP);
ARP arp = (ARP) eth.getPayload();
checkArgument(arp.getOpCode() == ARP.OP_REPLY, NOT_ARP_REQUEST);
Host h = hostService.getHost(HostId.hostId(eth.getDestinationMAC(),
VlanId.vlanId(eth.getVlanID())));
if (h == null) {
flood(eth);
} else {
TrafficTreatment.Builder builder = new DefaultTrafficTreatment.Builder();
builder.setOutput(h.location().port());
packetService.emit(new DefaultOutboundPacket(h.location().deviceId(),
builder.build(), ByteBuffer.wrap(eth.serialize())));
}
}
/**
* Flood the arp request at all edges in the network.
* @param request the arp request.
*/
private void flood(Ethernet request) {
TrafficTreatment.Builder builder = null;
ByteBuffer buf = ByteBuffer.wrap(request.serialize());
synchronized (externalPorts) {
for (Entry<Device, PortNumber> entry : externalPorts.entries()) {
builder = new DefaultTrafficTreatment.Builder();
builder.setOutput(entry.getValue());
packetService.emit(new DefaultOutboundPacket(entry.getKey().id(),
builder.build(), buf));
}
}
}
/**
* Determines the location of all known ports in the system.
*/
private void determinePortLocations() {
Iterable<Device> devices = deviceService.getDevices();
Iterable<Link> links = null;
List<PortNumber> ports = null;
for (Device d : devices) {
ports = buildPortNumberList(deviceService.getPorts(d.id()));
links = linkService.getLinks();
for (Link l : links) {
// for each link, mark the concerned ports as internal
// and the remaining ports are therefore external.
if (l.src().deviceId().equals(d)
&& ports.contains(l.src().port())) {
ports.remove(l.src().port());
internalPorts.put(d, l.src().port());
}
if (l.dst().deviceId().equals(d)
&& ports.contains(l.dst().port())) {
ports.remove(l.dst().port());
internalPorts.put(d, l.dst().port());
}
}
synchronized (externalPorts) {
externalPorts.putAll(d, ports);
}
}
}
private List<PortNumber> buildPortNumberList(List<Port> ports) {
List<PortNumber> portNumbers = Lists.newLinkedList();
for (Port p : ports) {
portNumbers.add(p.number());
}
return portNumbers;
}
/**
* Builds an arp reply based on a request.
* @param h the host we want to send to
* @param request the arp request we got
* @return an ethernet frame containing the arp reply
*/
private Ethernet buildArpReply(Host h, Ethernet request) {
Ethernet eth = new Ethernet();
eth.setDestinationMACAddress(request.getSourceMACAddress());
eth.setSourceMACAddress(h.mac().getAddress());
eth.setEtherType(Ethernet.TYPE_ARP);
eth.setVlanID(request.getVlanID());
ARP arp = new ARP();
arp.setOpCode(ARP.OP_REPLY);
arp.setProtocolType(ARP.PROTO_TYPE_IP);
arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
arp.setProtocolAddressLength((byte) IpPrefix.INET_LEN);
arp.setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH);
arp.setSenderHardwareAddress(h.mac().getAddress());
arp.setTargetHardwareAddress(request.getSourceMACAddress());
arp.setTargetProtocolAddress(((ARP) request.getPayload())
.getSenderProtocolAddress());
arp.setSenderProtocolAddress(h.ipAddresses().iterator().next().toInt());
eth.setPayload(arp);
return eth;
}
public class InternalLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
Link link = event.subject();
Device src = deviceService.getDevice(link.src().deviceId());
Device dst = deviceService.getDevice(link.dst().deviceId());
switch (event.type()) {
case LINK_ADDED:
synchronized (externalPorts) {
externalPorts.remove(src, link.src().port());
externalPorts.remove(dst, link.dst().port());
internalPorts.put(src, link.src().port());
internalPorts.put(dst, link.dst().port());
}
break;
case LINK_REMOVED:
synchronized (externalPorts) {
externalPorts.put(src, link.src().port());
externalPorts.put(dst, link.dst().port());
internalPorts.remove(src, link.src().port());
internalPorts.remove(dst, link.dst().port());
}
break;
case LINK_UPDATED:
// don't care about links being updated.
break;
default:
break;
}
}
}
public class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
Device device = event.subject();
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_MASTERSHIP_CHANGED:
case DEVICE_SUSPENDED:
case DEVICE_UPDATED:
case PORT_UPDATED:
// nothing to do in these cases; handled when links get reported
break;
case DEVICE_REMOVED:
synchronized (externalPorts) {
externalPorts.removeAll(device);
internalPorts.removeAll(device);
}
break;
case PORT_ADDED:
synchronized (externalPorts) {
externalPorts.put(device, event.port().number());
internalPorts.remove(device, event.port().number());
}
break;
case PORT_REMOVED:
synchronized (externalPorts) {
externalPorts.remove(device, event.port().number());
internalPorts.remove(device, event.port().number());
}
break;
default:
break;
}
}
}
}
/**
* Core subsystem for responding to arp requests.
*/
package org.onlab.onos.proxyarp.impl;
package org.onlab.onos.net.proxyarp.impl;
......
package org.onlab.onos.proxyarp.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.packet.DefaultOutboundPacket;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.proxyarp.ProxyArpService;
import org.onlab.onos.net.topology.TopologyService;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.VlanId;
public class ProxyArpManager implements ProxyArpService {
private static final String MAC_ADDR_NULL = "Mac address cannot be null.";
private static final String REQUEST_NULL = "Arp request cannot be null.";
private static final String REQUEST_NOT_ARP = "Ethernet frame does not contain ARP request.";
private static final String NOT_ARP_REQUEST = "ARP is not a request.";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
@Override
public boolean known(IpPrefix addr) {
checkNotNull(MAC_ADDR_NULL, addr);
Set<Host> hosts = hostService.getHostsByIp(addr);
return !hosts.isEmpty();
}
@Override
public void reply(Ethernet request) {
checkNotNull(REQUEST_NULL, request);
checkArgument(request.getEtherType() == Ethernet.TYPE_ARP,
REQUEST_NOT_ARP);
ARP arp = (ARP) request.getPayload();
checkArgument(arp.getOpCode() == ARP.OP_REQUEST, NOT_ARP_REQUEST);
VlanId vlan = VlanId.vlanId(request.getVlanID());
Set<Host> hosts = hostService.getHostsByIp(IpPrefix.valueOf(arp
.getTargetProtocolAddress()));
Host h = null;
for (Host host : hosts) {
if (host.vlan().equals(vlan)) {
h = host;
break;
}
}
if (h == null) {
flood(request);
return;
}
Ethernet arpReply = buildArpReply(h, request);
// TODO: check send status with host service.
TrafficTreatment.Builder builder = new DefaultTrafficTreatment.Builder();
builder.setOutput(h.location().port());
packetService.emit(new DefaultOutboundPacket(h.location().deviceId(),
builder.build(), ByteBuffer.wrap(arpReply.serialize())));
}
private void flood(Ethernet request) {
// TODO: flood on all edge ports.
}
private Ethernet buildArpReply(Host h, Ethernet request) {
Ethernet eth = new Ethernet();
eth.setDestinationMACAddress(request.getSourceMACAddress());
eth.setSourceMACAddress(h.mac().getAddress());
eth.setEtherType(Ethernet.TYPE_ARP);
ARP arp = new ARP();
arp.setOpCode(ARP.OP_REPLY);
arp.setSenderHardwareAddress(h.mac().getAddress());
arp.setTargetHardwareAddress(request.getSourceMACAddress());
arp.setTargetProtocolAddress(((ARP) request.getPayload())
.getSenderProtocolAddress());
arp.setSenderProtocolAddress(h.ipAddresses().iterator().next().toInt());
eth.setPayload(arp);
return eth;
}
}
......@@ -33,8 +33,11 @@ import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.device.impl.DistributedDeviceStore;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
......@@ -92,6 +95,7 @@ public class DistributedDeviceManagerTest {
private DistributedDeviceStore dstore;
private TestMastershipManager masterManager;
private EventDeliveryService eventService;
private KryoSerializationManager serializationMgr;
@Before
public void setUp() {
......@@ -107,7 +111,10 @@ public class DistributedDeviceManagerTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
dstore = new TestDistributedDeviceStore();
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
dstore = new TestDistributedDeviceStore(storeManager, serializationMgr);
dstore.activate();
mgr.store = dstore;
......@@ -133,6 +140,7 @@ public class DistributedDeviceManagerTest {
mgr.deactivate();
dstore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -298,8 +306,10 @@ public class DistributedDeviceManagerTest {
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore() {
this.storeService = storeManager;
public TestDistributedDeviceStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
/**
* Service for administering communications manager.
*/
public interface ClusterCommunicationAdminService {
/**
* Adds the node to the list of monitored nodes.
*
* @param node node to be added
*/
void addNode(DefaultControllerNode node);
/**
* Removes the node from the list of monitored nodes.
*
* @param node node to be removed
*/
void removeNode(DefaultControllerNode node);
/**
* Starts-up the communications engine.
*
* @param localNode local controller node
* @param delegate nodes delegate
*/
void startUp(DefaultControllerNode localNode, ClusterNodesDelegate delegate);
/**
* Clears all nodes and streams as part of leaving the cluster.
*/
void clearAllNodesAndStreams();
}
package org.onlab.onos.store.cluster.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMembershipMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;
/**
* Implements the cluster communication services to use by other stores.
*/
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService, ClusterCommunicationAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
private static final long START_TIMEOUT = 1000;
private static final int WORKERS = 3;
private ClusterConnectionListener connectionListener;
private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
private DefaultControllerNode localNode;
private ClusterNodesDelegate nodesDelegate;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SerializationService serializationService;
// Nodes to be monitored to make sure they have a connection.
private final Set<DefaultControllerNode> nodes = new HashSet<>();
// Means to track message streams to other nodes.
private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
// TODO: use something different that won't require synchronization
private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
private MembershipSubscriber membershipSubscriber = new MembershipSubscriber();
@Activate
public void activate() {
addSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
addSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
log.info("Activated but waiting for delegate");
}
@Deactivate
public void deactivate() {
removeSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
removeSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
connectionCustodian.cancel();
if (connectionListener != null) {
connectionListener.shutdown();
for (ClusterIOWorker worker : workers) {
worker.shutdown();
}
}
log.info("Stopped");
}
@Override
public boolean send(ClusterMessage message) {
boolean ok = true;
for (DefaultControllerNode node : nodes) {
if (!node.equals(localNode)) {
ok = send(message, node.id()) && ok;
}
}
return ok;
}
@Override
public boolean send(ClusterMessage message, NodeId toNodeId) {
ClusterMessageStream stream = streams.get(toNodeId);
if (stream != null && !toNodeId.equals(localNode.id())) {
try {
stream.write(message);
return true;
} catch (IOException e) {
log.warn("Unable to send message {} to node {}",
message.subject(), toNodeId);
}
}
return false;
}
@Override
public synchronized void addSubscriber(MessageSubject subject,
MessageSubscriber subscriber) {
subscribers.put(subject, subscriber);
}
@Override
public synchronized void removeSubscriber(MessageSubject subject,
MessageSubscriber subscriber) {
subscribers.remove(subject, subscriber);
}
@Override
public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
return ImmutableSet.copyOf(subscribers.get(subject));
}
@Override
public void addNode(DefaultControllerNode node) {
nodes.add(node);
}
@Override
public void removeNode(DefaultControllerNode node) {
send(new LeavingMemberMessage(node.id()));
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
stream.close();
}
}
@Override
public void startUp(DefaultControllerNode localNode,
ClusterNodesDelegate delegate) {
this.localNode = localNode;
this.nodesDelegate = delegate;
startCommunications();
startListening();
startInitiatingConnections();
log.info("Started");
}
@Override
public void clearAllNodesAndStreams() {
nodes.clear();
send(new LeavingMemberMessage(localNode.id()));
for (ClusterMessageStream stream : streams.values()) {
stream.close();
}
streams.clear();
}
/**
* Dispatches the specified message to all subscribers to its subject.
*
* @param message message to dispatch
* @param fromNodeId node from which the message was received
*/
void dispatch(ClusterMessage message, NodeId fromNodeId) {
Set<MessageSubscriber> set = getSubscribers(message.subject());
if (set != null) {
for (MessageSubscriber subscriber : set) {
subscriber.receive(message, fromNodeId);
}
}
}
/**
* Adds the stream associated with the specified node.
*
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
* @param tcpPort node TCP listen port
* @return controller node bound to the stream
*/
DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
ClusterMessageStream stream) {
DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
stream.setNode(node);
streams.put(node.id(), stream);
send(new NewMemberMessage(node.id(), node.ip(), node.tcpPort()));
return node;
}
/**
* Removes the stream associated with the specified node.
*
* @param node node whose stream to remove
*/
void removeNodeStream(DefaultControllerNode node) {
nodesDelegate.nodeVanished(node.id());
streams.remove(node.id());
}
/**
* Finds the least utilized IO worker.
*
* @return IO worker
*/
ClusterIOWorker findWorker() {
ClusterIOWorker leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (ClusterIOWorker worker : workers) {
int count = worker.streamCount();
if (count == 0) {
return worker;
}
if (count < minCount) {
leastUtilized = worker;
minCount = count;
}
}
return leastUtilized;
}
/**
* Kicks off the IO loops and waits for them to startup.
*/
private void startCommunications() {
HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
localNode.tcpPort());
for (int i = 0; i < WORKERS; i++) {
try {
ClusterIOWorker worker =
new ClusterIOWorker(this, serializationService, hello);
workers.add(worker);
commExecutors.execute(worker);
} catch (IOException e) {
log.warn("Unable to start communication worker", e);
}
}
// Wait for the IO loops to start
for (ClusterIOWorker loop : workers) {
if (!loop.awaitStart(START_TIMEOUT)) {
log.warn("Comm loop did not start on-time; moving on...");
}
}
}
/**
* Starts listening for connections from peer cluster members.
*/
private void startListening() {
try {
connectionListener =
new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
listenExecutor.execute(connectionListener);
if (!connectionListener.awaitStart(START_TIMEOUT)) {
log.warn("Listener did not start on-time; moving on...");
}
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
}
}
/**
* Attempts to connect to any nodes that do not have an associated connection.
*/
private void startInitiatingConnections() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
CONNECTION_CUSTODIAN_FREQUENCY);
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO worker.
*
* @param worker loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
*/
private void initiateConnection(DefaultControllerNode node,
ClusterIOWorker worker) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
ch.connect(sa);
worker.connectStream(ch);
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes) {
if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
try {
initiateConnection(node, findWorker());
} catch (IOException e) {
log.debug("Unable to connect", e);
}
}
}
}
}
private class MembershipSubscriber implements MessageSubscriber {
@Override
public void receive(ClusterMessage message, NodeId fromNodeId) {
MessageSubject subject = message.subject();
ClusterMembershipMessage cmm = (ClusterMembershipMessage) message;
if (message.subject() == MessageSubject.NEW_MEMBER) {
log.info("Node {} arrived", cmm.nodeId());
nodesDelegate.nodeDetected(cmm.nodeId(), cmm.ipAddress(), cmm.tcpPort());
} else if (subject == MessageSubject.LEAVING_MEMBER) {
log.info("Node {} is leaving", cmm.nodeId());
nodesDelegate.nodeRemoved(cmm.nodeId());
}
}
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AcceptorLoop;
import org.onlab.packet.IpPrefix;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import static java.net.InetAddress.getByAddress;
/**
* Listens to inbound connection requests and accepts them.
*/
public class ClusterConnectionListener extends AcceptorLoop {
private static final long SELECT_TIMEOUT = 50;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final boolean SO_NO_DELAY = false;
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
private final ClusterCommunicationManager manager;
ClusterConnectionListener(ClusterCommunicationManager manager,
IpPrefix ip, int tcpPort) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
this.manager = manager;
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
manager.findWorker().acceptStream(sc);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Objects;
import static org.onlab.packet.IpPrefix.valueOf;
/**
* Performs the IO operations related to a cluster-wide communications.
*/
public class ClusterIOWorker extends
IOLoop<ClusterMessage, ClusterMessageStream> {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long SELECT_TIMEOUT = 50;
private final ClusterCommunicationManager manager;
private final SerializationService serializationService;
private final ClusterMessage helloMessage;
/**
* Creates a new cluster IO worker.
*
* @param manager parent comms manager
* @param serializationService serialization service for encode/decode
* @param helloMessage hello message for greeting peers
* @throws IOException if errors occur during IO loop ignition
*/
ClusterIOWorker(ClusterCommunicationManager manager,
SerializationService serializationService,
ClusterMessage helloMessage) throws IOException {
super(SELECT_TIMEOUT);
this.manager = manager;
this.serializationService = serializationService;
this.helloMessage = helloMessage;
}
@Override
protected ClusterMessageStream createStream(ByteChannel byteChannel) {
return new ClusterMessageStream(serializationService, this, byteChannel);
}
@Override
protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
NodeId nodeId = getNodeId(messages, (ClusterMessageStream) stream);
for (ClusterMessage message : messages) {
manager.dispatch(message, nodeId);
}
}
// Retrieves the node from the stream. If one is not bound, it attempts
// to bind it using the knowledge that the first message must be a hello.
private NodeId getNodeId(List<ClusterMessage> messages, ClusterMessageStream stream) {
DefaultControllerNode node = stream.node();
if (node == null && !messages.isEmpty()) {
ClusterMessage firstMessage = messages.get(0);
if (firstMessage instanceof HelloMessage) {
HelloMessage hello = (HelloMessage) firstMessage;
node = manager.addNodeStream(hello.nodeId(), hello.ipAddress(),
hello.tcpPort(), stream);
}
}
return node != null ? node.id() : null;
}
@Override
public ClusterMessageStream acceptStream(SocketChannel channel) {
ClusterMessageStream stream = super.acceptStream(channel);
try {
InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
stream.write(helloMessage);
} catch (IOException e) {
log.warn("Unable to accept connection from an unknown end-point", e);
}
return stream;
}
@Override
protected void connect(SelectionKey key) throws IOException {
try {
super.connect(key);
ClusterMessageStream stream = (ClusterMessageStream) key.attachment();
stream.write(helloMessage);
} catch (IOException e) {
if (!Objects.equals(e.getMessage(), "Connection refused")) {
throw e;
}
}
}
@Override
protected void removeStream(MessageStream<ClusterMessage> stream) {
DefaultControllerNode node = ((ClusterMessageStream) stream).node();
if (node != null) {
log.info("Closed connection to node {}", node.id());
manager.removeNodeStream(node);
}
super.removeStream(stream);
}
}
......@@ -3,6 +3,8 @@ package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
......@@ -10,29 +12,29 @@ import java.nio.channels.ByteChannel;
import static com.google.common.base.Preconditions.checkState;
/**
* Stream for transferring TLV messages between cluster members.
* Stream for transferring messages between two cluster members.
*/
public class TLVMessageStream extends MessageStream<TLVMessage> {
public class ClusterMessageStream extends MessageStream<ClusterMessage> {
public static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
private static final long MARKER = 0xfeedcafecafefeedL;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final int COMM_IDLE_TIME = 500;
private DefaultControllerNode node;
private SerializationService serializationService;
/**
* Creates a message stream associated with the specified IO loop and
* backed by the given byte channel.
*
* @param serializationService service for encoding/decoding messages
* @param loop IO loop
* @param byteChannel backing byte channel
* @param bufferSize size of the backing byte buffers
* @param maxIdleMillis maximum number of millis the stream can be idle
*/
protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
int bufferSize, int maxIdleMillis) {
super(loop, byteChannel, bufferSize, maxIdleMillis);
public ClusterMessageStream(SerializationService serializationService,
IOLoop<ClusterMessage, ?> loop,
ByteChannel byteChannel) {
super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
this.serializationService = serializationService;
}
/**
......@@ -40,7 +42,7 @@ public class TLVMessageStream extends MessageStream<TLVMessage> {
*
* @return controller node
*/
DefaultControllerNode node() {
public DefaultControllerNode node() {
return node;
}
......@@ -49,47 +51,19 @@ public class TLVMessageStream extends MessageStream<TLVMessage> {
*
* @param node controller node
*/
void setNode(DefaultControllerNode node) {
public void setNode(DefaultControllerNode node) {
checkState(this.node == null, "Stream is already bound to a node");
this.node = node;
}
@Override
protected TLVMessage read(ByteBuffer buffer) {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
int type = buffer.getInt();
length = buffer.getInt();
// TODO: add deserialization hook here
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
return new TLVMessage(type, data);
protected ClusterMessage read(ByteBuffer buffer) {
return serializationService.decode(buffer);
}
@Override
protected void write(TLVMessage message, ByteBuffer buffer) {
buffer.putLong(MARKER);
buffer.putInt(message.type());
buffer.putInt(message.length());
// TODO: add serialization hook here
buffer.put(message.data());
protected void write(ClusterMessage message, ByteBuffer buffer) {
serializationService.encode(message, buffer);
}
}
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Simple back interface through which connection manager can interact with
* the cluster store.
*/
public interface ClusterNodesDelegate {
/**
* Notifies about cluster node coming online.
*
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
* @param tcpPort node TCP listen port
* @return the controller node
*/
DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort);
/**
* Notifies about cluster node going offline.
*
* @param nodeId identifier of the cluster node that vanished
*/
void nodeVanished(NodeId nodeId);
/**
* Notifies about remote request to remove node from cluster.
*
* @param nodeId identifier of the cluster node that was removed
*/
void nodeRemoved(NodeId nodeId);
}
......@@ -4,10 +4,9 @@ import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.nio.AcceptorLoop;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ClusterStoreDelegate;
......@@ -20,27 +19,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.onos.cluster.ControllerNode.State;
import static org.onlab.packet.IpPrefix.valueOf;
import static org.onlab.util.Tools.namedThreads;
/**
* Distributed implementation of the cluster nodes store.
......@@ -51,145 +35,69 @@ public class DistributedClusterStore
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private static final int HELLO_MSG = 1;
private static final int ECHO_MSG = 2;
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
private static final long START_TIMEOUT = 1000;
private static final long SELECT_TIMEOUT = 50;
private static final int WORKERS = 3;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final int COMM_IDLE_TIME = 500;
private static final boolean SO_NO_DELAY = false;
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
private DefaultControllerNode self;
private DefaultControllerNode localNode;
private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
// Means to track message streams to other nodes.
private final Map<NodeId, TLVMessageStream> streams = new ConcurrentHashMap<>();
private final Map<SocketChannel, DefaultControllerNode> nodesByChannel = new ConcurrentHashMap<>();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationAdminService communicationAdminService;
private ListenLoop listenLoop;
private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
@Activate
public void activate() {
loadClusterDefinition();
startCommunications();
startListening();
startInitiating();
establishSelfIdentity();
// Start-up the comm service and prime it with the loaded nodes.
communicationAdminService.startUp(localNode, nodesDelegate);
for (DefaultControllerNode node : nodes.values()) {
communicationAdminService.addNode(node);
}
log.info("Started");
}
@Deactivate
public void deactivate() {
listenLoop.shutdown();
for (CommLoop loop : commLoops) {
loop.shutdown();
}
log.info("Stopped");
}
// Loads the cluster definition file
/**
* Loads the cluster definition file.
*/
private void loadClusterDefinition() {
// ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
// try {
// Set<DefaultControllerNode> storedNodes = cds.read();
// for (DefaultControllerNode node : storedNodes) {
// nodes.put(node.id(), node);
// }
// } catch (IOException e) {
// log.error("Unable to read cluster definitions", e);
// }
// Establishes the controller's own identity.
IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
self = nodes.get(new NodeId(ip.toString()));
// As a fall-back, let's make sure we at least know who we are.
if (self == null) {
self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(self.id(), self);
}
}
// Kicks off the IO loops.
private void startCommunications() {
for (int i = 0; i < WORKERS; i++) {
ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
try {
CommLoop loop = new CommLoop();
commLoops.add(loop);
commExecutors.execute(loop);
} catch (IOException e) {
log.warn("Unable to start comm IO loop", e);
}
}
// Wait for the IO loops to start
for (CommLoop loop : commLoops) {
if (!loop.awaitStart(START_TIMEOUT)) {
log.warn("Comm loop did not start on-time; moving on...");
}
}
}
// Starts listening for connections from peer cluster members.
private void startListening() {
try {
listenLoop = new ListenLoop(self.ip(), self.tcpPort());
listenExecutor.execute(listenLoop);
if (!listenLoop.awaitStart(START_TIMEOUT)) {
log.warn("Listen loop did not start on-time; moving on...");
Set<DefaultControllerNode> storedNodes = cds.read();
for (DefaultControllerNode node : storedNodes) {
nodes.put(node.id(), node);
}
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
log.error("Unable to read cluster definitions", e);
}
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
* Determines who the local controller node is.
*/
private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
nodesByChannel.put(ch, node);
ch.configureBlocking(false);
ch.connect(sa);
loop.connectStream(ch);
}
private void establishSelfIdentity() {
// Establishes the controller's own identity.
IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
localNode = nodes.get(new NodeId(ip.toString()));
// Attempts to connect to any nodes that do not have an associated connection.
private void startInitiating() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY);
// As a fall-back, let's make sure we at least know who we are.
if (localNode == null) {
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(localNode.id(), localNode);
}
states.put(localNode.id(), State.ACTIVE);
}
@Override
public ControllerNode getLocalNode() {
return self;
return localNode;
}
@Override
......@@ -213,150 +121,48 @@ public class DistributedClusterStore
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
communicationAdminService.addNode(node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
nodes.remove(nodeId);
streams.remove(nodeId);
}
// Listens and accepts inbound connections from other cluster nodes.
private class ListenLoop extends AcceptorLoop {
ListenLoop(IpPrefix ip, int tcpPort) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
findLeastUtilizedLoop().acceptStream(sc);
}
}
private class CommLoop extends IOLoop<TLVMessage, TLVMessageStream> {
CommLoop() throws IOException {
super(SELECT_TIMEOUT);
}
@Override
protected TLVMessageStream createStream(ByteChannel byteChannel) {
return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
}
@Override
protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
TLVMessageStream tlvStream = (TLVMessageStream) stream;
for (TLVMessage message : messages) {
// TODO: add type-based dispatching here...
log.info("Got message {}", message.type());
// FIXME: hack to get going
if (message.type() == HELLO_MSG) {
processHello(message, tlvStream);
}
}
if (nodeId.equals(localNode.id())) {
// We are being ejected from the cluster, so remove all other nodes.
communicationAdminService.clearAllNodesAndStreams();
nodes.clear();
nodes.put(localNode.id(), localNode);
} else {
// Remove the other node.
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
communicationAdminService.removeNode(node);
}
@Override
public TLVMessageStream acceptStream(SocketChannel channel) {
TLVMessageStream stream = super.acceptStream(channel);
try {
InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
log.info("Accepted a new connection from node {}", IpPrefix.valueOf(sa.getAddress().getAddress()));
stream.write(createHello(self));
} catch (IOException e) {
log.warn("Unable to accept connection from an unknown end-point", e);
}
return stream;
}
// Entity to handle back calls from the connection manager.
private class InnerNodesDelegate implements ClusterNodesDelegate {
@Override
public TLVMessageStream connectStream(SocketChannel channel) {
TLVMessageStream stream = super.connectStream(channel);
DefaultControllerNode node = nodesByChannel.get(channel);
if (node != null) {
log.info("Opened connection to node {}", node.id());
nodesByChannel.remove(channel);
public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = nodes.get(nodeId);
if (node == null) {
node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
}
return stream;
states.put(nodeId, State.ACTIVE);
return node;
}
@Override
protected void connect(SelectionKey key) {
super.connect(key);
TLVMessageStream stream = (TLVMessageStream) key.attachment();
send(stream, createHello(self));
}
public void nodeVanished(NodeId nodeId) {
states.put(nodeId, State.INACTIVE);
}
// FIXME: pure hack for now
private void processHello(TLVMessage message, TLVMessageStream stream) {
String data = new String(message.data());
log.info("Processing hello with data [{}]", data);
String[] fields = new String(data).split(":");
DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]),
IpPrefix.valueOf(fields[1]),
Integer.parseInt(fields[2]));
stream.setNode(node);
nodes.put(node.id(), node);
streams.put(node.id(), stream);
}
// Sends message to the specified stream.
private void send(TLVMessageStream stream, TLVMessage message) {
try {
stream.write(message);
} catch (IOException e) {
log.warn("Unable to send message to {}", stream.node().id());
}
}
private TLVMessage createHello(DefaultControllerNode self) {
return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes());
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes.values()) {
if (node != self && !streams.containsKey(node.id())) {
try {
openConnection(node, findLeastUtilizedLoop());
} catch (IOException e) {
log.warn("Unable to connect", e);
}
}
}
public void nodeRemoved(NodeId nodeId) {
removeNode(nodeId);
}
}
// Finds the least utilities IO loop.
private CommLoop findLeastUtilizedLoop() {
CommLoop leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (CommLoop loop : commLoops) {
int count = loop.streamCount();
if (count == 0) {
return loop;
}
if (count < minCount) {
leastUtilized = loop;
minCount = count;
}
}
return leastUtilized;
}
}
......
package org.onlab.onos.store.cluster.impl;
import de.javakaffee.kryoserializers.URISerializer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.EchoMessage;
import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
import org.onlab.onos.store.serializers.DefaultPortSerializer;
import org.onlab.onos.store.serializers.DeviceIdSerializer;
import org.onlab.onos.store.serializers.IpPrefixSerializer;
import org.onlab.onos.store.serializers.LinkKeySerializer;
import org.onlab.onos.store.serializers.NodeIdSerializer;
import org.onlab.onos.store.serializers.PortNumberSerializer;
import org.onlab.onos.store.serializers.ProviderIdSerializer;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import static com.google.common.base.Preconditions.checkState;
/**
* Factory for parsing messages sent between cluster members.
*/
@Component(immediate = true)
@Service
public class MessageSerializer implements SerializationService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final int METADATA_LENGTH = 12; // 8 + 4
private static final int LENGTH_OFFSET = 8;
private static final long MARKER = 0xfeedcafebeaddeadL;
private KryoPool serializerPool;
@Activate
public void activate() {
setupKryoPool();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ControllerNode.State.class,
Device.Type.class,
DefaultControllerNode.class,
DefaultDevice.class,
MastershipRole.class,
Port.class,
Element.class,
Link.Type.class,
MessageSubject.class,
HelloMessage.class,
NewMemberMessage.class,
LeavingMemberMessage.class,
EchoMessage.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.build()
.populate(1);
}
@Override
public ClusterMessage decode(ByteBuffer buffer) {
try {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
length = buffer.getInt();
// TODO: sanity checking for length
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
return (ClusterMessage) serializerPool.deserialize(data);
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
log.warn("Unable to decode message due to: " + e);
}
return null;
}
@Override
public void encode(ClusterMessage message, ByteBuffer buffer) {
try {
byte[] data = serializerPool.serialize(message);
buffer.putLong(MARKER);
buffer.putInt(data.length + METADATA_LENGTH);
buffer.put(data);
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
log.warn("Unable to encode message due to: " + e);
}
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AbstractMessage;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Base message for cluster-wide communications using TLVs.
*/
public class TLVMessage extends AbstractMessage {
private final int type;
private final byte[] data;
/**
* Creates an immutable TLV message.
*
* @param type message type
* @param data message data bytes
*/
public TLVMessage(int type, byte[] data) {
this.length = data.length + TLVMessageStream.METADATA_LENGTH;
this.type = type;
this.data = data;
}
/**
* Returns the message type indicator.
*
* @return message type
*/
public int type() {
return type;
}
/**
* Returns the data bytes.
*
* @return message data
*/
public byte[] data() {
return data;
}
@Override
public int hashCode() {
return Objects.hash(type, data);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final TLVMessage other = (TLVMessage) obj;
return Objects.equals(this.type, other.type) &&
Objects.equals(this.data, other.data);
}
@Override
public String toString() {
return toStringHelper(this).add("type", type).add("length", length).toString();
}
}
/**
* Distributed cluster store and messaging subsystem implementation.
*/
package org.onlab.onos.store.cluster.impl;
\ No newline at end of file
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import java.util.Set;
/**
* Service for assisting communications between controller cluster nodes.
*/
public interface ClusterCommunicationService {
/**
* Sends a message to all controller nodes.
*
* @param message message to send
* @return true if the message was sent sucessfully to all nodes; false
* if there is no stream or if there was an error for some node
*/
boolean send(ClusterMessage message);
/**
* Sends a message to the specified controller node.
*
* @param message message to send
* @param toNodeId node identifier
* @return true if the message was sent sucessfully; false if there is
* no stream or if there was an error
*/
boolean send(ClusterMessage message, NodeId toNodeId);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param subscriber message subscriber
*/
void addSubscriber(MessageSubject subject, MessageSubscriber subscriber);
/**
* Removes the specified subscriber from the given message subject.
*
* @param subject message subject
* @param subscriber message subscriber
*/
void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber);
/**
* Returns the set of subscribers for the specified message subject.
*
* @param subject message subject
* @return set of message subscribers
*/
Set<MessageSubscriber> getSubscribers(MessageSubject subject);
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Base for cluster membership messages.
*/
public abstract class ClusterMembershipMessage extends ClusterMessage {
private NodeId nodeId;
private IpPrefix ipAddress;
private int tcpPort;
// For serialization
protected ClusterMembershipMessage() {
super(MessageSubject.HELLO);
nodeId = null;
ipAddress = null;
tcpPort = 0;
}
/**
* Creates a new membership message for the specified end-point data.
*
* @param subject message subject
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
protected ClusterMembershipMessage(MessageSubject subject, NodeId nodeId,
IpPrefix ipAddress, int tcpPort) {
super(subject);
this.nodeId = nodeId;
this.ipAddress = ipAddress;
this.tcpPort = tcpPort;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
/**
* Returns the sending node IP address.
*
* @return node IP address
*/
public IpPrefix ipAddress() {
return ipAddress;
}
/**
* Returns the sending node TCP listen port.
*
* @return TCP listen port
*/
public int tcpPort() {
return tcpPort;
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.nio.AbstractMessage;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Base message for cluster-wide communications.
*/
public abstract class ClusterMessage extends AbstractMessage {
private final MessageSubject subject;
/**
* Creates a cluster message.
*
* @param subject message subject
*/
protected ClusterMessage(MessageSubject subject) {
this.subject = subject;
}
/**
* Returns the message subject indicator.
*
* @return message subject
*/
public MessageSubject subject() {
return subject;
}
@Override
public String toString() {
return toStringHelper(this).add("subject", subject).add("length", length).toString();
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**l
* Echo heart-beat message that nodes send to each other.
*/
public class EchoMessage extends ClusterMessage {
private NodeId nodeId;
// For serialization
private EchoMessage() {
super(MessageSubject.HELLO);
nodeId = null;
}
/**
* Creates a new heart-beat echo message.
*
* @param nodeId sending node identification
*/
public EchoMessage(NodeId nodeId) {
super(MessageSubject.HELLO);
nodeId = nodeId;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Hello message that nodes use to greet each other.
*/
public class HelloMessage extends ClusterMembershipMessage {
// For serialization
private HelloMessage() {
}
/**
* Creates a new hello message for the specified end-point data.
*
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.HELLO, nodeId, ipAddress, tcpPort);
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**
* Announcement message that nodes use to gossip about team departures.
*/
public class LeavingMemberMessage extends ClusterMembershipMessage {
// For serialization
private LeavingMemberMessage() {
super();
}
/**
* Creates a new goodbye message.
*
* @param nodeId sending node identification
*/
public LeavingMemberMessage(NodeId nodeId) {
super(MessageSubject.LEAVING_MEMBER, nodeId, null, 0);
}
}
package org.onlab.onos.store.cluster.messaging;
/**
* Representation of a message subject.
*/
public enum MessageSubject {
/** Represents a first greeting message. */
HELLO,
/** Signifies announcement about new member. */
NEW_MEMBER,
/** Signifies announcement about leaving member. */
LEAVING_MEMBER,
/** Signifies a heart-beat message. */
ECHO
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**
* Represents a message consumer.
*/
public interface MessageSubscriber {
/**
* Receives the specified cluster message.
*
* @param message message to be received
* @param fromNodeId node from which the message was received
*/
void receive(ClusterMessage message, NodeId fromNodeId);
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Announcement message that nodes use to gossip about new arrivals.
*/
public class NewMemberMessage extends ClusterMembershipMessage {
// For serialization
private NewMemberMessage() {
}
/**
* Creates a new gossip message for the specified end-point data.
*
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
public NewMemberMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.NEW_MEMBER, nodeId, ipAddress, tcpPort);
}
}
package org.onlab.onos.store.cluster.messaging;
import java.nio.ByteBuffer;
/**
* Service for encoding &amp; decoding intra-cluster messages.
*/
public interface SerializationService {
/**
* Decodes the specified byte buffer to obtain the message within.
*
* @param buffer byte buffer with message(s)
* @return parsed message
*/
ClusterMessage decode(ByteBuffer buffer);
/**
* Encodes the specified message into the given byte buffer.
*
* @param message message to be encoded
* @param buffer byte buffer to receive the message data
*/
void encode(ClusterMessage message, ByteBuffer buffer);
}
/**
* Cluster messaging APIs for the use by the various distributed stores.
*/
package org.onlab.onos.store.cluster.messaging;
\ No newline at end of file
package org.onlab.onos.store.cluster.impl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests of the cluster communication manager.
*/
public class ClusterCommunicationManagerTest {
private static final NodeId N1 = new NodeId("n1");
private static final NodeId N2 = new NodeId("n2");
private static final int P1 = 9881;
private static final int P2 = 9882;
private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
private ClusterCommunicationManager ccm1;
private ClusterCommunicationManager ccm2;
private TestDelegate cnd1 = new TestDelegate();
private TestDelegate cnd2 = new TestDelegate();
private DefaultControllerNode node1 = new DefaultControllerNode(N1, IP, P1);
private DefaultControllerNode node2 = new DefaultControllerNode(N2, IP, P2);
@Before
public void setUp() {
MessageSerializer messageSerializer = new MessageSerializer();
messageSerializer.activate();
ccm1 = new ClusterCommunicationManager();
ccm1.serializationService = messageSerializer;
ccm1.activate();
ccm2 = new ClusterCommunicationManager();
ccm2.serializationService = messageSerializer;
ccm2.activate();
ccm1.startUp(node1, cnd1);
ccm2.startUp(node2, cnd2);
}
@After
public void tearDown() {
ccm1.deactivate();
ccm2.deactivate();
}
@Test
public void connect() throws Exception {
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
ccm1.addNode(node2);
validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
}
@Test
public void disconnect() throws Exception {
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
ccm1.addNode(node2);
validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
ccm1.deactivate();
//
// validateDelegateEvent(cnd2, Op.VANISHED, node1.id());
}
private void validateDelegateEvent(TestDelegate delegate, Op op, NodeId nodeId)
throws InterruptedException {
assertTrue("did not connect in time", delegate.latch.await(2500, TimeUnit.MILLISECONDS));
assertEquals("incorrect event", op, delegate.op);
assertEquals("incorrect event node", nodeId, delegate.nodeId);
}
enum Op { DETECTED, VANISHED, REMOVED };
private class TestDelegate implements ClusterNodesDelegate {
Op op;
CountDownLatch latch;
NodeId nodeId;
@Override
public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
latch(nodeId, Op.DETECTED);
return new DefaultControllerNode(nodeId, ip, tcpPort);
}
@Override
public void nodeVanished(NodeId nodeId) {
latch(nodeId, Op.VANISHED);
}
@Override
public void nodeRemoved(NodeId nodeId) {
latch(nodeId, Op.REMOVED);
}
private void latch(NodeId nodeId, Op op) {
this.op = op;
this.nodeId = nodeId;
latch.countDown();
}
}
}
\ No newline at end of file
......@@ -49,6 +49,7 @@ public class DistributedClusterStore
private final MembershipListener listener = new InternalMembershipListener();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Override
@Activate
public void activate() {
super.activate();
......@@ -56,7 +57,7 @@ public class DistributedClusterStore
rawNodes = theInstance.getMap("nodes");
OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
= new OptionalCacheLoader<>(storeService, rawNodes);
= new OptionalCacheLoader<>(kryoSerializationService, rawNodes);
nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
......
......@@ -52,7 +52,7 @@ implements MastershipStore {
rawMasters = theInstance.getMap("masters");
OptionalCacheLoader<DeviceId, NodeId> nodeLoader
= new OptionalCacheLoader<>(storeService, rawMasters);
= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
......
......@@ -15,6 +15,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -32,6 +33,9 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected KryoSerializationService kryoSerializationService;
protected HazelcastInstance theInstance;
@Activate
......@@ -46,7 +50,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return serialized object
*/
protected byte[] serialize(Object obj) {
return storeService.serialize(obj);
return kryoSerializationService.serialize(obj);
}
/**
......@@ -57,7 +61,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return deserialized object
*/
protected <T> T deserialize(byte[] bytes) {
return storeService.deserialize(bytes);
return kryoSerializationService.deserialize(bytes);
}
......
......@@ -2,6 +2,8 @@ package org.onlab.onos.store.common;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
import com.hazelcast.core.IMap;
......@@ -16,28 +18,28 @@ import com.hazelcast.core.IMap;
public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
private final StoreService storeService;
private final KryoSerializationService kryoSerializationService;
private IMap<byte[], byte[]> rawMap;
/**
* Constructor.
*
* @param storeService to use for serialization
* @param kryoSerializationService to use for serialization
* @param rawMap underlying IMap
*/
public OptionalCacheLoader(StoreService storeService, IMap<byte[], byte[]> rawMap) {
this.storeService = checkNotNull(storeService);
public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap<byte[], byte[]> rawMap) {
this.kryoSerializationService = checkNotNull(kryoSerializationService);
this.rawMap = checkNotNull(rawMap);
}
@Override
public Optional<V> load(K key) throws Exception {
byte[] keyBytes = storeService.serialize(key);
byte[] keyBytes = kryoSerializationService.serialize(key);
byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) {
return Optional.absent();
}
V dev = storeService.deserialize(valBytes);
V dev = kryoSerializationService.deserialize(valBytes);
return Optional.of(dev);
}
}
......
......@@ -5,46 +5,14 @@ import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import de.javakaffee.kryoserializers.URISerializer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
import org.onlab.onos.store.serializers.DefaultPortSerializer;
import org.onlab.onos.store.serializers.DeviceIdSerializer;
import org.onlab.onos.store.serializers.IpPrefixSerializer;
import org.onlab.onos.store.serializers.LinkKeySerializer;
import org.onlab.onos.store.serializers.NodeIdSerializer;
import org.onlab.onos.store.serializers.PortNumberSerializer;
import org.onlab.onos.store.serializers.ProviderIdSerializer;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
/**
* Auxiliary bootstrap of distributed store.
......@@ -58,55 +26,18 @@ public class StoreManager implements StoreService {
private final Logger log = LoggerFactory.getLogger(getClass());
protected HazelcastInstance instance;
private KryoPool serializerPool;
@Activate
public void activate() {
try {
Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
instance = Hazelcast.newHazelcastInstance(config);
setupKryoPool();
log.info("Started");
} catch (FileNotFoundException e) {
log.error("Unable to configure Hazelcast", e);
}
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ControllerNode.State.class,
Device.Type.class,
DefaultControllerNode.class,
DefaultDevice.class,
MastershipRole.class,
Port.class,
Element.class,
Link.Type.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.build()
.populate(10);
}
@Deactivate
public void deactivate() {
instance.shutdown();
......@@ -118,18 +49,4 @@ public class StoreManager implements StoreService {
return instance;
}
@Override
public byte[] serialize(final Object obj) {
return serializerPool.serialize(obj);
}
@Override
public <T> T deserialize(final byte[] bytes) {
if (bytes == null) {
return null;
}
return serializerPool.deserialize(bytes);
}
}
......
......@@ -15,22 +15,4 @@ public interface StoreService {
*/
HazelcastInstance getHazelcastInstance();
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @return serialized bytes
*/
public byte[] serialize(final Object obj);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param bytes bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final byte[] bytes);
}
......
......@@ -46,9 +46,8 @@ public class TestStoreManager extends StoreManager {
this.instance = instance;
}
// Hazelcast setup removed from original code.
@Override
public void activate() {
setupKryoPool();
// Hazelcast setup removed from original code.
}
}
......
......@@ -87,7 +87,7 @@ public class DistributedDeviceStore
// TODO decide on Map name scheme to avoid collision
rawDevices = theInstance.getMap("devices");
final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader
= new OptionalCacheLoader<>(storeService, rawDevices);
= new OptionalCacheLoader<>(kryoSerializationService, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
......@@ -97,7 +97,7 @@ public class DistributedDeviceStore
rawDevicePorts = theInstance.getMap("devicePorts");
final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader
= new OptionalCacheLoader<>(storeService, rawDevicePorts);
= new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
......
......@@ -70,7 +70,7 @@ public class DistributedLinkStore
// TODO decide on Map name scheme to avoid collision
rawLinks = theInstance.getMap("links");
final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
= new OptionalCacheLoader<>(storeService, rawLinks);
= new OptionalCacheLoader<>(kryoSerializationService, rawLinks);
links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
// refresh/populate cache based on notification from other instance
linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
......
......@@ -36,6 +36,8 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
......@@ -61,6 +63,7 @@ public class DistributedDeviceStoreTest {
private static final PortNumber P3 = PortNumber.portNumber(3);
private DistributedDeviceStore deviceStore;
private KryoSerializationManager serializationMgr;
private StoreManager storeManager;
......@@ -82,7 +85,10 @@ public class DistributedDeviceStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
deviceStore = new TestDistributedDeviceStore(storeManager);
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr);
deviceStore.activate();
}
......@@ -90,6 +96,8 @@ public class DistributedDeviceStoreTest {
public void tearDown() throws Exception {
deviceStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -384,8 +392,10 @@ public class DistributedDeviceStoreTest {
}
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore(StoreService storeService) {
public TestDistributedDeviceStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
}
......
......@@ -30,6 +30,8 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables;
import com.hazelcast.config.Config;
......@@ -49,6 +51,7 @@ public class DistributedLinkStoreTest {
private static final PortNumber P3 = PortNumber.portNumber(3);
private StoreManager storeManager;
private KryoSerializationManager serializationMgr;
private DistributedLinkStore linkStore;
......@@ -68,13 +71,17 @@ public class DistributedLinkStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
linkStore = new TestDistributedLinkStore(storeManager);
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
linkStore = new TestDistributedLinkStore(storeManager, serializationMgr);
linkStore.activate();
}
@After
public void tearDown() throws Exception {
linkStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -354,8 +361,10 @@ public class DistributedLinkStoreTest {
class TestDistributedLinkStore extends DistributedLinkStore {
TestDistributedLinkStore(StoreService storeService) {
TestDistributedLinkStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
}
......
package org.onlab.onos.store.serializers;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import de.javakaffee.kryoserializers.URISerializer;
/**
* Serialization service using Kryo.
*/
@Component(immediate = true)
@Service
public class KryoSerializationManager implements KryoSerializationService {
private final Logger log = LoggerFactory.getLogger(getClass());
private KryoPool serializerPool;
@Activate
public void activate() {
setupKryoPool();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ControllerNode.State.class,
Device.Type.class,
DefaultControllerNode.class,
DefaultDevice.class,
MastershipRole.class,
Port.class,
Element.class,
Link.Type.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.build()
.populate(1);
}
@Override
public byte[] serialize(final Object obj) {
return serializerPool.serialize(obj);
}
@Override
public <T> T deserialize(final byte[] bytes) {
if (bytes == null) {
return null;
}
return serializerPool.deserialize(bytes);
}
}
package org.onlab.onos.store.serializers;
// TODO: To be replaced with SerializationService from IOLoop activity
/**
* Service to serialize Objects into byte array.
*/
public interface KryoSerializationService {
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @return serialized bytes
*/
public byte[] serialize(final Object obj);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param bytes bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final byte[] bytes);
}
......@@ -105,11 +105,9 @@ public class SimpleFlowRuleStore
*/
if (flowEntries.containsEntry(did, f)) {
//synchronized (flowEntries) {
flowEntries.remove(did, f);
flowEntries.put(did, f);
flowEntriesById.remove(rule.appId(), rule);
//}
}
}
......
......@@ -19,6 +19,9 @@
<bundle>mvn:de.javakaffee/kryo-serializers/0.27</bundle>
<bundle>mvn:org.onlab.onos/onlab-nio/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.codehaus.jackson/jackson-core-asl/1.9.13</bundle>
<bundle>mvn:org.codehaus.jackson/jackson-mapper-asl/1.9.13</bundle>
</feature>
<feature name="onos-thirdparty-web" version="1.0.0"
......@@ -58,7 +61,7 @@
<bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-net/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-core-trivial" version="1.0.0"
......@@ -116,10 +119,24 @@
<bundle>mvn:org.onlab.onos/onos-app-fwd/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-app-mobility" version="1.0.0"
description="ONOS sample forwarding application">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-app-mobility/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-app-foo" version="1.0.0"
description="ONOS sample playground application">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-app-foo/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-app-config" version="1.0.0"
description="ONOS network config reader">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-app-config/1.0.0-SNAPSHOT</bundle>
</feature>
</features>
......
......@@ -98,6 +98,17 @@
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<!-- Web related -->
<dependency>
......
......@@ -93,7 +93,7 @@ public class FlowModBuilder {
OFFlowMod fm = factory.buildFlowDelete()
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
//.setActions(actions)
.setMatch(match)
.setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
.setPriority(priority)
......@@ -104,6 +104,9 @@ public class FlowModBuilder {
private List<OFAction> buildActions() {
List<OFAction> acts = new LinkedList<>();
if (treatment == null) {
return acts;
}
for (Instruction i : treatment.instructions()) {
switch (i.type()) {
case DROP:
......
......@@ -31,6 +31,7 @@ import org.onlab.onos.openflow.controller.OpenFlowPacketContext;
import org.onlab.onos.openflow.controller.PacketListener;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IPv4;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.VlanId;
import org.slf4j.Logger;
......@@ -92,8 +93,6 @@ public class OpenFlowHostProvider extends AbstractProvider implements HostProvid
public void handlePacket(OpenFlowPacketContext pktCtx) {
Ethernet eth = pktCtx.parsed();
// Potentially a new or moved host
if (eth.getEtherType() == Ethernet.TYPE_ARP) {
VlanId vlan = VlanId.vlanId(eth.getVlanID());
ConnectPoint heardOn = new ConnectPoint(deviceId(Dpid.uri(pktCtx.dpid())),
portNumber(pktCtx.inPort()));
......@@ -107,14 +106,24 @@ public class OpenFlowHostProvider extends AbstractProvider implements HostProvid
HostLocation hloc = new HostLocation(deviceId(Dpid.uri(pktCtx.dpid())),
portNumber(pktCtx.inPort()),
System.currentTimeMillis());
HostId hid = HostId.hostId(eth.getSourceMAC(), vlan);
// Potentially a new or moved host
if (eth.getEtherType() == Ethernet.TYPE_ARP) {
ARP arp = (ARP) eth.getPayload();
Set<IpPrefix> ips = newHashSet(IpPrefix.valueOf(arp.getSenderProtocolAddress()));
HostDescription hdescr =
new DefaultHostDescription(eth.getSourceMAC(), vlan, hloc, ips);
providerService.hostDetected(hid, hdescr);
} else if (eth.getEtherType() == Ethernet.TYPE_IPV4) {
IPv4 ip = (IPv4) eth.getPayload();
Set<IpPrefix> ips = newHashSet(IpPrefix.valueOf(ip.getSourceAddress()));
HostDescription hdescr =
new DefaultHostDescription(eth.getSourceMAC(), vlan, hloc, ips);
providerService.hostDetected(hid, hdescr);
}
// TODO: Use DHCP packets as well later...
......
......@@ -106,10 +106,6 @@ public class OpenFlowPacketProvider extends AbstractProvider implements PacketPr
for (Instruction inst : packet.treatment().instructions()) {
if (inst.type().equals(Instruction.Type.OUTPUT)) {
p = portDesc(((OutputInstruction) inst).port());
if (!sw.getPorts().contains(p)) {
log.warn("Tried to write out non-existint port {}", p.getPortNo());
continue;
}
OFPacketOut po = packetOut(sw, eth, p.getPortNo());
sw.sendMsg(po);
}
......
......@@ -154,9 +154,9 @@ public class OpenFlowPacketProviderTest {
assertEquals("message sent incorrectly", 0, sw.sent.size());
//to missing port
OutboundPacket portFailPkt = outPacket(DID, TR_MISSING, eth);
provider.emit(portFailPkt);
assertEquals("extra message sent", 1, sw.sent.size());
//OutboundPacket portFailPkt = outPacket(DID, TR_MISSING, eth);
//provider.emit(portFailPkt);
//assertEquals("extra message sent", 1, sw.sent.size());
}
......
......@@ -9,5 +9,5 @@
nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
onos-package
for node in $nodes; do printf "%s: " $node; onos-install -f $node; done
for node in $nodes; do (printf "%s: %s\n" "$node" "`onos-install -f $node`")& done
for node in $nodes; do onos-wait-for-start $node; done
......
......@@ -3,5 +3,7 @@
# ONOS remote command-line client.
#-------------------------------------------------------------------------------
[ "$1" = "-w" ] && shift && onos-wait-for-start $1
[ -n "$1" ] && OCI=$1 && shift
client -h $OCI "$@"
client -h $OCI "$@" 2>/dev/null
......
#!/bin/bash
#-------------------------------------------------------------------------------
# Remotely kills the ONOS service on the specified node.
#-------------------------------------------------------------------------------
[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
. $ONOS_ROOT/tools/build/envDefaults
ssh $ONOS_USER@${1:-$OCI} "kill -9 \$(ps -ef | grep karaf.jar | grep -v grep | cut -c10-15)"
\ No newline at end of file
# Default virtual box ONOS instances 1,2 & ONOS mininet box
export ONOS_NIC=192.168.56.*
export OC1="192.168.56.11"
export OC2="192.168.56.12"
export OCN="192.168.56.7"
package org.onlab.util;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
......@@ -8,6 +9,8 @@ import org.apache.commons.lang3.tuple.Pair;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
......@@ -174,6 +177,22 @@ public final class KryoPool {
}
/**
* Serializes given object to byte buffer using Kryo instance in pool.
*
* @param obj Object to serialize
* @param buffer to write to
*/
public void serialize(final Object obj, final ByteBuffer buffer) {
ByteBufferOutput out = new ByteBufferOutput(buffer);
Kryo kryo = getKryo();
try {
kryo.writeClassAndObject(out, obj);
} finally {
putKryo(kryo);
}
}
/**
* Deserializes given byte array to Object using Kryo instance in pool.
*
* @param bytes serialized bytes
......@@ -192,6 +211,24 @@ public final class KryoPool {
}
}
/**
* Deserializes given byte buffer to Object using Kryo instance in pool.
*
* @param buffer input with serialized bytes
* @param <T> deserialized Object type
* @return deserialized Object
*/
public <T> T deserialize(final ByteBuffer buffer) {
ByteBufferInput in = new ByteBufferInput(buffer);
Kryo kryo = getKryo();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
putKryo(kryo);
}
}
/**
* Creates a Kryo instance with {@link #registeredTypes} pre-registered.
......
......@@ -93,14 +93,9 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
*
* @param key selection key holding the pending connect operation.
*/
protected void connect(SelectionKey key) {
try {
protected void connect(SelectionKey key) throws IOException {
SocketChannel ch = (SocketChannel) key.channel();
ch.finishConnect();
} catch (IOException | IllegalStateException e) {
log.warn("Unable to complete connection", e);
}
if (key.isValid()) {
key.interestOps(SelectionKey.OP_READ);
}
......@@ -124,7 +119,11 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
// If there is a pending connect operation, complete it.
if (key.isConnectable()) {
try {
connect(key);
} catch (IOException | IllegalStateException e) {
log.warn("Unable to complete connection", e);
}
}
// If there is a read operation, slurp as much data as possible.
......
......@@ -10,6 +10,7 @@ import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -262,7 +263,7 @@ public abstract class MessageStream<M extends Message> {
try {
channel.write(outbound);
} catch (IOException e) {
if (!closed && !e.getMessage().equals("Broken pipe")) {
if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
log.warn("Unable to write data", e);
ioError = e;
}
......
......@@ -230,7 +230,7 @@ public class IOLoopTestClient {
}
@Override
protected void connect(SelectionKey key) {
protected void connect(SelectionKey key) throws IOException {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;
......