HIGUCHI Yuta
Committed by Gerrit Code Review

ONOS-3323 gRPC implementation of Remote Service

- Start modelling Device related service (ONOS-3306)
- exclude machine generated code from doc

Change-Id: Idffbcd883f813de79c6f05fedc9475f308efcc31
......@@ -46,11 +46,13 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.1</version>
<version>2.10.3</version>
<configuration>
<show>package</show>
<excludePackageNames>@external-excludes
</excludePackageNames>
<excludePackageNames>@external-excludes</excludePackageNames>
<sourceFileExcludes>
<sourceFileExclude>**/generated-sources/**</sourceFileExclude>
</sourceFileExcludes>
<docfilessubdirs>true</docfilessubdirs>
<doctitle>ONOS Java API (1.4.0-SNAPSHOT)</doctitle>
<groups>
......
......@@ -51,8 +51,11 @@
<show>package</show>
<docfilessubdirs>true</docfilessubdirs>
<doctitle>ONOS Java API (1.4.0-SNAPSHOT)</doctitle>
<excludePackageNames>@internal-excludes
</excludePackageNames>
<excludePackageNames>@internal-excludes</excludePackageNames>
<sourceFileExcludes>
<sourceFileExclude>**/generated-sources/**</sourceFileExclude>
</sourceFileExcludes>
<groups>
<group>
<title>Network Model &amp; Services</title>
......
......@@ -36,6 +36,7 @@
<module>net</module>
<module>store</module>
<module>rpc</module>
<module>rpc-grpc</module>
</modules>
<dependencies>
......
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Copyright 2015 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0" name="${project.artifactId}-${project.version}">
<feature name="${project.artifactId}" version="${project.version}"
description="${project.description}">
<feature>onos-api</feature>
<bundle>mvn:com.google.protobuf/protobuf-java/3.0.0-beta-1</bundle>
<bundle>mvn:io.netty/netty-common/4.1.0.Beta6</bundle>
<bundle>mvn:io.netty/netty-buffer/4.1.0.Beta6</bundle>
<bundle>mvn:io.netty/netty-transport/4.1.0.Beta6</bundle>
<bundle>mvn:io.netty/netty-handler/4.1.0.Beta6</bundle>
<bundle>mvn:io.netty/netty-codec/4.1.0.Beta6</bundle>
<bundle>mvn:io.netty/netty-codec-http/4.1.0.Beta6</bundle>
<bundle>mvn:io.netty/netty-codec-http2/4.1.0.Beta6</bundle>
<bundle>mvn:io.netty/netty-resolver/4.1.0.Beta6</bundle>
<bundle>mvn:com.twitter/hpack/0.11.0</bundle>
<!-- TODO: Create shaded jar for these. -->
<bundle>wrap:mvn:com.google.auth/google-auth-library-credentials/0.3.0$Bundle-SymbolicName=com.google.auth.google-auth-library-credentials&amp;Bundle-Version=0.3.0</bundle>
<bundle>wrap:mvn:com.google.auth/google-auth-library-oauth2-http/0.3.0$Bundle-SymbolicName=com.google.auth.google-auth-library-oauth2-http&amp;Bundle-Version=0.3.0</bundle>
<bundle>wrap:mvn:io.grpc/grpc-all/0.9.0$Bundle-SymbolicName=io.grpc.grpc-all&amp;Bundle-Version=0.9.0&amp;Import-Package=io.netty.*;version=4.1.0.Beta6,javax.net.ssl,com.google.protobuf.nano;resolution:=optional,okio;resolution:=optional,*</bundle>
<bundle>mvn:${project.groupId}/${project.artifactId}/${project.version}</bundle>
</feature>
</features>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2015 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>onos-incubator</artifactId>
<groupId>org.onosproject</groupId>
<version>1.4.0-SNAPSHOT</version>
</parent>
<artifactId>onos-incubator-rpc-grpc</artifactId>
<packaging>bundle</packaging>
<description>ONOS inter-cluster RPC based on gRPC</description>
<url>http://onosproject.org</url>
<properties>
<onos.app.name>org.onosproject.incubator.rpc.grpc</onos.app.name>
<onos.app.requires>org.onosproject.incubator.rpc</onos.app.requires>
<!-- Note: update feature.xml when updating -->
<grpc.version>0.9.0</grpc.version>
<grpc.netty.version>4.1.0.Beta6</grpc.netty.version>
</properties>
<pluginRepositories>
<pluginRepository>
<id>protoc-plugin</id>
<url>https://dl.bintray.com/sergei-ivanov/maven/</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-incubator-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
</dependency>
<!--
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
</dependency>
-->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-auth</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
<executions>
<execution>
<id>generate-scr-srcdescriptor</id>
<goals>
<goal>scr</goal>
</goals>
</execution>
</executions>
<configuration>
<!-- avoid searching into wrong source path -->
<scanClasses>true</scanClasses>
<supportedProjectTypes>
<supportedProjectType>bundle</supportedProjectType>
<supportedProjectType>war</supportedProjectType>
</supportedProjectTypes>
</configuration>
</plugin>
<plugin>
<groupId>org.onosproject</groupId>
<artifactId>onos-maven-plugin</artifactId>
<executions>
<execution>
<id>cfg</id>
<phase>generate-resources</phase>
<goals>
<goal>cfg</goal>
</goals>
</execution>
<execution>
<id>swagger</id>
<phase>generate-sources</phase>
<goals>
<goal>swagger</goal>
</goals>
</execution>
<execution>
<id>app</id>
<phase>package</phase>
<goals>
<goal>app</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.google.protobuf.tools</groupId>
<artifactId>maven-protoc-plugin</artifactId>
<version>0.4.2</version>
<configuration>
<!-- The version of protoc must match protobuf-java. If you don't
depend on protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on. -->
<protocArtifact>com.google.protobuf:protoc:3.0.0-beta-1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.9.1</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources/protobuf/java</source>
<source>${project.build.directory}/generated-sources/protobuf/grpc-java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- gRPC requires more recent version of netty -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${grpc.netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${grpc.netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${grpc.netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${grpc.netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${grpc.netty.version}</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hpack</artifactId>
<!-- 0.11.0 and later are published as a bundle -->
<version>0.11.0</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import java.util.Map;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.provider.AbstractProviderRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
// gRPC Client side
/**
* Proxy object to handle DeviceProviderRegistry calls.
*
* RPC wise, this will start/stop bidirectional streaming service sessions.
*/
final class DeviceProviderRegistryClientProxy
extends AbstractProviderRegistry<DeviceProvider, DeviceProviderService>
implements DeviceProviderRegistry {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Channel channel;
private final Map<DeviceProvider, DeviceProviderServiceClientProxy> pServices;
DeviceProviderRegistryClientProxy(ManagedChannel channel) {
this.channel = channel;
pServices = Maps.newIdentityHashMap();
}
@Override
protected synchronized DeviceProviderService createProviderService(DeviceProvider provider) {
// Create session
DeviceProviderServiceClientProxy pService = new DeviceProviderServiceClientProxy(provider, channel);
log.debug("Created DeviceProviderServiceClientProxy", pService);
DeviceProviderServiceClientProxy old = pServices.put(provider, pService);
if (old != null) {
// sanity check, can go away
log.warn("Duplicate registration detected for {}", provider.id());
}
return pService;
}
@Override
public synchronized void unregister(DeviceProvider provider) {
DeviceProviderServiceClientProxy pService = pServices.remove(provider);
log.debug("Unregistering DeviceProviderServiceClientProxy", pService);
super.unregister(provider);
if (pService != null) {
pService.shutdown();
}
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.stream.Collectors.toList;
import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
import static org.onosproject.net.DeviceId.deviceId;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.onosproject.grpc.Device.DeviceProviderMsg;
import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
import org.onosproject.grpc.Device.IsReachableRequest;
import org.onosproject.grpc.Device.RoleChanged;
import org.onosproject.grpc.Device.TriggerProbe;
import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpcStub;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.MoreObjects;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
// gRPC Client side
// gRPC wise, this object represents bidirectional streaming service session
// and deals with outgoing message stream
/**
* DeviceProviderService instance associated with given DeviceProvider.
*/
final class DeviceProviderServiceClientProxy
extends AbstractProviderService<DeviceProvider>
implements DeviceProviderService {
private final Logger log = LoggerFactory.getLogger(getClass());
private final StreamObserver<DeviceProviderServiceMsg> devProvService;
private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
private final Channel channel;
DeviceProviderServiceClientProxy(DeviceProvider provider, Channel channel) {
super(provider);
this.channel = channel;
DeviceProviderRegistryRpcStub stub = DeviceProviderRegistryRpcGrpc.newStub(channel);
log.debug("Calling RPC register({}) against {}", provider.id(), channel.authority());
devProvService = stub.register(new DeviceProviderClientProxy(provider));
// send initialize message
DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
builder.setRegisterProvider(builder.getRegisterProviderBuilder()
.setProviderScheme(provider.id().scheme())
.build());
devProvService.onNext(builder.build());
}
@Override
public void deviceConnected(DeviceId deviceId,
DeviceDescription deviceDescription) {
checkValidity();
DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
builder.setDeviceConnected(builder.getDeviceConnectedBuilder()
.setDeviceId(deviceId.toString())
.setDeviceDescription(translate(deviceDescription))
.build());
devProvService.onNext(builder.build());
}
@Override
public void deviceDisconnected(DeviceId deviceId) {
checkValidity();
DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
builder.setDeviceDisconnected(builder.getDeviceDisconnectedBuilder()
.setDeviceId(deviceId.toString())
.build());
devProvService.onNext(builder.build());
}
@Override
public void updatePorts(DeviceId deviceId,
List<PortDescription> portDescriptions) {
checkValidity();
DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
List<org.onosproject.grpc.Port.PortDescription> portDescs =
portDescriptions.stream()
.map(GrpcDeviceUtils::translate)
.collect(toList());
builder.setUpdatePorts(builder.getUpdatePortsBuilder()
.setDeviceId(deviceId.toString())
.addAllPortDescriptions(portDescs)
.build());
devProvService.onNext(builder.build());
}
@Override
public void portStatusChanged(DeviceId deviceId,
PortDescription portDescription) {
checkValidity();
DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
builder.setPortStatusChanged(builder.getPortStatusChangedBuilder()
.setDeviceId(deviceId.toString())
.setPortDescription(translate(portDescription))
.build());
devProvService.onNext(builder.build());
}
@Override
public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
MastershipRole response) {
checkValidity();
DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
builder.setReceivedRoleReply(builder.getReceivedRoleReplyBuilder()
.setDeviceId(deviceId.toString())
.setRequested(translate(requested))
.setResponse(translate(response))
.build());
devProvService.onNext(builder.build());
}
@Override
public void updatePortStatistics(DeviceId deviceId,
Collection<PortStatistics> portStatistics) {
checkValidity();
DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
List<org.onosproject.grpc.Port.PortStatistics> portStats =
portStatistics.stream()
.map(GrpcDeviceUtils::translate)
.collect(toList());
builder.setUpdatePortStatistics(builder.getUpdatePortStatisticsBuilder()
.setDeviceId(deviceId.toString())
.addAllPortStatistics(portStats)
.build());
devProvService.onNext(builder.build());
}
/**
* Shutdown this session.
*/
public void shutdown() {
if (hasShutdown.compareAndSet(false, true)) {
log.info("Shutting down session over {}", channel.authority());
// initiate clean shutdown from client
devProvService.onCompleted();
invalidate();
}
}
/**
* Abnormally terminate this session.
* @param t error details
*/
public void shutdown(Throwable t) {
if (hasShutdown.compareAndSet(false, true)) {
log.error("Shutting down session over {}", channel.authority());
// initiate abnormal termination from client
devProvService.onError(t);
invalidate();
}
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("channel", channel.authority())
.add("hasShutdown", hasShutdown.get())
.toString();
}
// gRPC wise, this object handles incoming message stream
/**
* Translates DeviceProvider instructions received from RPC to Java calls.
*/
private final class DeviceProviderClientProxy
implements StreamObserver<DeviceProviderMsg> {
private final DeviceProvider provider;
DeviceProviderClientProxy(DeviceProvider provider) {
this.provider = checkNotNull(provider);
}
@Override
public void onNext(DeviceProviderMsg msg) {
try {
log.trace("DeviceProviderClientProxy received: {}", msg);
onMethod(msg);
} catch (Exception e) {
log.error("Exception caught handling {} at DeviceProviderClientProxy", msg, e);
// initiate shutdown from client
shutdown(e);
}
}
/**
* Translates received RPC message to {@link DeviceProvider} method calls.
* @param msg DeviceProvider message
*/
private void onMethod(DeviceProviderMsg msg) {
switch (msg.getMethodCase()) {
case TRIGGER_PROBE:
TriggerProbe triggerProbe = msg.getTriggerProbe();
provider.triggerProbe(deviceId(triggerProbe.getDeviceId()));
break;
case ROLE_CHANGED:
RoleChanged roleChanged = msg.getRoleChanged();
provider.roleChanged(deviceId(roleChanged.getDeviceId()),
translate(roleChanged.getNewRole()));
break;
case IS_REACHABLE_REQUEST:
IsReachableRequest isReachableRequest = msg.getIsReachableRequest();
// check if reachable
boolean reachable = provider.isReachable(deviceId(isReachableRequest.getDeviceId()));
int xid = isReachableRequest.getXid();
// send response back DeviceProviderService channel
DeviceProviderServiceMsg.Builder builder = DeviceProviderServiceMsg.newBuilder();
builder.setIsReachableResponse(builder.getIsReachableResponseBuilder()
.setXid(xid)
.setIsReachable(reachable)
.build());
devProvService.onNext(builder.build());
break;
case METHOD_NOT_SET:
default:
log.warn("Unexpected method, ignoring", msg);
break;
}
}
@Override
public void onCompleted() {
log.info("DeviceProviderClientProxy completed");
// session terminated from remote
// TODO unregister...? how?
//devProvService.onCompleted();
}
@Override
public void onError(Throwable t) {
log.error("DeviceProviderClientProxy#onError", t);
// session terminated from remote
// TODO unregister...? how?
//devProvService.onError(t);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("channel", channel.authority())
.toString();
}
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.onlab.packet.ChassisId;
import org.onosproject.grpc.Device.DeviceType;
import org.onosproject.grpc.Port.PortType;
import org.onosproject.net.Annotations;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.Port;
import org.onosproject.net.Port.Type;
import org.onosproject.net.PortNumber;
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DefaultPortStatistics;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.device.PortStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.api.client.repackaged.com.google.common.annotations.Beta;
/**
* gRPC message conversion related utilities.
*/
@Beta
public final class GrpcDeviceUtils {
private static final Logger log = LoggerFactory.getLogger(GrpcDeviceUtils.class);
/**
* Translates gRPC enum MastershipRole to ONOS enum.
*
* @param role mastership role in gRPC enum
* @return equivalent in ONOS enum
*/
public static MastershipRole translate(org.onosproject.grpc.Device.MastershipRole role) {
switch (role) {
case NONE:
return MastershipRole.NONE;
case MASTER:
return MastershipRole.MASTER;
case STANDBY:
return MastershipRole.STANDBY;
case UNRECOGNIZED:
log.warn("Unrecognized MastershipRole gRPC message: {}", role);
default:
return MastershipRole.NONE;
}
}
/**
* Translates ONOS enum MastershipRole to gRPC enum.
*
* @param newRole ONOS' mastership role
* @return equivalent in gRPC message enum
*/
public static org.onosproject.grpc.Device.MastershipRole translate(MastershipRole newRole) {
switch (newRole) {
case MASTER:
return org.onosproject.grpc.Device.MastershipRole.MASTER;
case STANDBY:
return org.onosproject.grpc.Device.MastershipRole.STANDBY;
case NONE:
default:
return org.onosproject.grpc.Device.MastershipRole.NONE;
}
}
/**
* Translates gRPC DeviceDescription to {@link DeviceDescription}.
*
* @param deviceDescription gRPC message
* @return {@link DeviceDescription}
*/
public static DeviceDescription translate(org.onosproject.grpc.Device.DeviceDescription deviceDescription) {
URI uri = URI.create(deviceDescription.getDeviceUri());
Device.Type type = translate(deviceDescription.getType());
String manufacturer = deviceDescription.getManufacturer();
String hwVersion = deviceDescription.getHwVersion();
String swVersion = deviceDescription.getSwVersion();
String serialNumber = deviceDescription.getSerialNumber();
ChassisId chassis = new ChassisId(deviceDescription.getChassisId());
return new DefaultDeviceDescription(uri, type, manufacturer,
hwVersion, swVersion, serialNumber,
chassis,
asAnnotations(deviceDescription.getAnnotations()));
}
/**
* Translates {@link DeviceDescription} to gRPC DeviceDescription message.
*
* @param deviceDescription {@link DeviceDescription}
* @return gRPC DeviceDescription message
*/
public static org.onosproject.grpc.Device.DeviceDescription translate(DeviceDescription deviceDescription) {
return org.onosproject.grpc.Device.DeviceDescription.newBuilder()
.setDeviceUri(deviceDescription.deviceUri().toString())
.setType(translate(deviceDescription.type()))
.setManufacturer(deviceDescription.manufacturer())
.setHwVersion(deviceDescription.hwVersion())
.setSwVersion(deviceDescription.swVersion())
.setSerialNumber(deviceDescription.serialNumber())
.setChassisId(deviceDescription.chassisId().toString())
.putAllAnnotations(asMap(deviceDescription.annotations()))
.build();
}
/**
* Translates gRPC DeviceType to {@link Device.Type}.
*
* @param type gRPC message
* @return {@link Device.Type}
*/
public static Device.Type translate(org.onosproject.grpc.Device.DeviceType type) {
switch (type) {
case BALANCER:
return Device.Type.BALANCER;
case CONTROLLER:
return Device.Type.CONTROLLER;
case FIBER_SWITCH:
return Device.Type.FIBER_SWITCH;
case FIREWALL:
return Device.Type.FIREWALL;
case IDS:
return Device.Type.IDS;
case IPS:
return Device.Type.IPS;
case MICROWAVE:
return Device.Type.MICROWAVE;
case OTHER:
return Device.Type.OTHER;
case OTN:
return Device.Type.OTN;
case ROADM:
return Device.Type.ROADM;
case ROADM_OTN:
return Device.Type.ROADM_OTN;
case ROUTER:
return Device.Type.ROUTER;
case SWITCH:
return Device.Type.SWITCH;
case VIRTUAL:
return Device.Type.VIRTUAL;
case UNRECOGNIZED:
default:
log.warn("Unexpected DeviceType: {}", type);
return Device.Type.OTHER;
}
}
/**
* Translates {@link Type} to gRPC DeviceType.
*
* @param type {@link Type}
* @return gRPC message
*/
public static DeviceType translate(Device.Type type) {
switch (type) {
case BALANCER:
return DeviceType.BALANCER;
case CONTROLLER:
return DeviceType.CONTROLLER;
case FIBER_SWITCH:
return DeviceType.FIBER_SWITCH;
case FIREWALL:
return DeviceType.FIREWALL;
case IDS:
return DeviceType.IDS;
case IPS:
return DeviceType.IPS;
case MICROWAVE:
return DeviceType.MICROWAVE;
case OTHER:
return DeviceType.OTHER;
case OTN:
return DeviceType.OTN;
case ROADM:
return DeviceType.ROADM;
case ROADM_OTN:
return DeviceType.ROADM_OTN;
case ROUTER:
return DeviceType.ROUTER;
case SWITCH:
return DeviceType.SWITCH;
case VIRTUAL:
return DeviceType.VIRTUAL;
default:
log.warn("Unexpected Device.Type: {}", type);
return DeviceType.OTHER;
}
}
/**
* Translates gRPC PortDescription message to {@link PortDescription}.
*
* @param portDescription gRPC message
* @return {@link PortDescription}
*/
public static PortDescription translate(org.onosproject.grpc.Port.PortDescription portDescription) {
PortNumber number = PortNumber.fromString(portDescription.getPortNumber());
boolean isEnabled = portDescription.getIsEnabled();
Port.Type type = translate(portDescription.getType());
long portSpeed = portDescription.getPortSpeed();
SparseAnnotations annotations = asAnnotations(portDescription.getAnnotations());
// TODO How to deal with more specific Port...
return new DefaultPortDescription(number, isEnabled, type, portSpeed, annotations);
}
/**
* Translates {@link PortDescription} to gRPC PortDescription message.
*
* @param portDescription {@link PortDescription}
* @return gRPC PortDescription message
*/
public static org.onosproject.grpc.Port.PortDescription translate(PortDescription portDescription) {
// TODO How to deal with more specific Port...
return org.onosproject.grpc.Port.PortDescription.newBuilder()
.setPortNumber(portDescription.portNumber().toString())
.setIsEnabled(portDescription.isEnabled())
.setType(translate(portDescription.type()))
.setPortSpeed(portDescription.portSpeed())
.putAllAnnotations(asMap(portDescription.annotations()))
.build();
}
/**
* Translates gRPC PortType to {@link Port.Type}.
*
* @param type gRPC message
* @return {@link Port.Type}
*/
public static Port.Type translate(PortType type) {
switch (type) {
case COPPER:
return Type.COPPER;
case FIBER:
return Type.FIBER;
case OCH:
return Type.OCH;
case ODUCLT:
return Type.ODUCLT;
case OMS:
return Type.OMS;
case PACKET:
return Type.PACKET;
case VIRTUAL:
return Type.VIRTUAL;
case UNRECOGNIZED:
default:
log.warn("Unexpected PortType: {}", type);
return Type.COPPER;
}
}
/**
* Translates {@link Port.Type} to gRPC PortType.
*
* @param type {@link Port.Type}
* @return gRPC message
*/
public static PortType translate(Port.Type type) {
switch (type) {
case COPPER:
return PortType.COPPER;
case FIBER:
return PortType.FIBER;
case OCH:
return PortType.OCH;
case ODUCLT:
return PortType.ODUCLT;
case OMS:
return PortType.OMS;
case PACKET:
return PortType.PACKET;
case VIRTUAL:
return PortType.VIRTUAL;
default:
log.warn("Unexpected Port.Type: {}", type);
return PortType.COPPER;
}
}
/**
* Translates gRPC PortStatistics message to {@link PortStatistics}.
*
* @param portStatistics gRPC PortStatistics message
* @return {@link PortStatistics}
*/
public static PortStatistics translate(org.onosproject.grpc.Port.PortStatistics portStatistics) {
// TODO implement adding missing fields
return DefaultPortStatistics.builder()
.setPort(portStatistics.getPort())
.setPacketsReceived(portStatistics.getPacketsReceived())
.setPacketsSent(portStatistics.getPacketsSent())
.build();
}
/**
* Translates {@link PortStatistics} to gRPC PortStatistics message.
*
* @param portStatistics {@link PortStatistics}
* @return gRPC PortStatistics message
*/
public static org.onosproject.grpc.Port.PortStatistics translate(PortStatistics portStatistics) {
// TODO implement adding missing fields
return org.onosproject.grpc.Port.PortStatistics.newBuilder()
.setPort(portStatistics.port())
.setPacketsReceived(portStatistics.packetsReceived())
.setPacketsSent(portStatistics.packetsSent())
.build();
}
// may be this can be moved to Annotation itself or AnnotationsUtils
/**
* Converts Annotations to Map of Strings.
*
* @param annotations {@link Annotations}
* @return Map of annotation key and values
*/
public static Map<String, String> asMap(Annotations annotations) {
if (annotations instanceof DefaultAnnotations) {
return ((DefaultAnnotations) annotations).asMap();
}
Map<String, String> map = new HashMap<>();
annotations.keys()
.forEach(k -> map.put(k, annotations.value(k)));
return map;
}
// may be this can be moved to Annotation itself or AnnotationsUtils
/**
* Converts Map of Strings to {@link SparseAnnotations}.
*
* @param annotations Map of annotation key and values
* @return {@link SparseAnnotations}
*/
public static SparseAnnotations asAnnotations(Map<String, String> annotations) {
DefaultAnnotations.Builder builder = DefaultAnnotations.builder();
annotations.entrySet().forEach(e -> {
if (e.getValue() != null) {
builder.set(e.getKey(), e.getValue());
} else {
builder.remove(e.getKey());
}
});
return builder.build();
}
// Utility class not intended for instantiation.
private GrpcDeviceUtils() {}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import org.onosproject.incubator.rpc.RemoteServiceContext;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.MoreObjects;
import io.grpc.ManagedChannel;
// gRPC Client side
// Probably there should be plug-in mechanism in the future.
/**
* RemoteServiceContext based on gRPC.
*
* <p>
* Currently it supports {@link DeviceProviderRegistry}.
*/
public class GrpcRemoteServiceContext implements RemoteServiceContext {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<Class<? extends Object>, Object> services = new ConcurrentHashMap<>();
private final ManagedChannel channel;
public GrpcRemoteServiceContext(ManagedChannel channel) {
this.channel = checkNotNull(channel);
services.put(DeviceProviderRegistry.class, new DeviceProviderRegistryClientProxy(channel));
}
@Override
public <T> T get(Class<T> serviceClass) {
@SuppressWarnings("unchecked")
T service = (T) services.get(serviceClass);
if (service != null) {
return service;
}
log.error("{} not supported", serviceClass);
throw new NoSuchElementException(serviceClass.getTypeName() + " not supported");
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("services", services.keySet())
.add("channel", channel.authority())
.toString();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import static com.google.common.base.Preconditions.checkArgument;
import java.net.URI;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.incubator.rpc.RemoteServiceContext;
import org.onosproject.incubator.rpc.RemoteServiceContextProvider;
import org.onosproject.incubator.rpc.RemoteServiceContextProviderService;
import org.onosproject.incubator.rpc.RemoteServiceProviderRegistry;
import org.onosproject.net.provider.ProviderId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.grpc.ManagedChannel;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
// gRPC Client side
/**
* RemoteServiceContextProvider based on gRPC.
*/
@Component(immediate = true)
public class GrpcRemoteServiceProvider implements RemoteServiceContextProvider {
public static final String GRPC_SCHEME = "grpc";
public static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
private static final ProviderId PID = new ProviderId(GRPC_SCHEME, RPC_PROVIDER_NAME);
private final Logger log = LoggerFactory.getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected RemoteServiceProviderRegistry rpcRegistry;
private Map<URI, ManagedChannel> channels = new ConcurrentHashMap<>();
private RemoteServiceContextProviderService providerService;
@Activate
protected void activate() {
providerService = rpcRegistry.register(this);
// FIXME remove me. test code to see if gRPC loads in karaf
//getChannel(URI.create("grpc://localhost:8080"));
log.info("Started");
}
@Deactivate
protected void deactivate() {
rpcRegistry.unregister(this);
// shutdown all channels
channels.values().stream()
.forEach(ManagedChannel::shutdown);
// Should we wait for shutdown? How?
channels.clear();
log.info("Stopped");
}
@Override
public ProviderId id() {
return PID;
}
@Override
public RemoteServiceContext get(URI uri) {
// Create gRPC client
return new GrpcRemoteServiceContext(getChannel(uri));
}
private ManagedChannel getChannel(URI uri) {
checkArgument(Objects.equals(GRPC_SCHEME, uri.getScheme()),
"Invalid URI scheme: %s", uri.getScheme());
return channels.compute(uri, (u, ch) -> {
if (ch != null && !ch.isShutdown()) {
return ch;
} else {
return createChannel(u);
}
});
}
private ManagedChannel createChannel(URI uri) {
log.debug("Creating channel for {}", uri);
return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort())
.negotiationType(NegotiationType.PLAINTEXT)
.build();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.stream.Collectors.toList;
import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
import static org.onosproject.net.DeviceId.deviceId;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.grpc.Device.DeviceConnected;
import org.onosproject.grpc.Device.DeviceDisconnected;
import org.onosproject.grpc.Device.DeviceProviderMsg;
import org.onosproject.grpc.Device.DeviceProviderServiceMsg;
import org.onosproject.grpc.Device.IsReachableResponse;
import org.onosproject.grpc.Device.PortStatusChanged;
import org.onosproject.grpc.Device.ReceivedRoleReply;
import org.onosproject.grpc.Device.RegisterProvider;
import org.onosproject.grpc.Device.UpdatePortStatistics;
import org.onosproject.grpc.Device.UpdatePorts;
import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.provider.ProviderId;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Sets;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
// gRPC Server on Metro-side
// Translates request received on RPC channel, and calls corresponding Service on
// Metro-ONOS cluster.
/**
* Server side implementation of gRPC based RemoteService.
*/
@Component(immediate = true)
public class GrpcRemoteServiceServer {
private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
// TODO pick a number
public static final int DEFAULT_LISTEN_PORT = 11984;
private final Logger log = LoggerFactory.getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceProviderRegistry deviceProviderRegistry;
@Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
label = "Port to listen on")
protected int listenPort = DEFAULT_LISTEN_PORT;
private Server server;
private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
@Activate
protected void activate(ComponentContext context) throws IOException {
modified(context);
log.debug("Server starting on {}", listenPort);
try {
server = NettyServerBuilder.forPort(listenPort)
.addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
.build().start();
} catch (IOException e) {
log.error("Failed to start gRPC server", e);
throw e;
}
log.info("Started on {}", listenPort);
}
@Deactivate
protected void deactivate() {
registeredProviders.stream()
.forEach(deviceProviderRegistry::unregister);
server.shutdown();
// Should we wait for shutdown?
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
// TODO support dynamic reconfiguration and restarting server?
}
// RPC Server-side code
// RPC session Factory
/**
* Relays DeviceProviderRegistry calls from RPC client.
*/
class DeviceProviderRegistryServerProxy implements DeviceProviderRegistryRpc {
@Override
public StreamObserver<DeviceProviderServiceMsg> register(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
log.trace("DeviceProviderRegistryServerProxy#register called!");
DeviceProviderServerProxy provider = new DeviceProviderServerProxy(toDeviceProvider);
return new DeviceProviderServiceServerProxy(provider, toDeviceProvider);
}
}
// Lower -> Upper Controller message
// RPC Server-side code
// RPC session handler
private final class DeviceProviderServiceServerProxy
implements StreamObserver<DeviceProviderServiceMsg> {
// intentionally shadowing
private final Logger log = LoggerFactory.getLogger(getClass());
private final DeviceProviderServerProxy pairedProvider;
private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
private final Cache<Integer, CompletableFuture<Boolean>> outstandingIsReachable;
// wrapped providerService
private DeviceProviderService deviceProviderService;
DeviceProviderServiceServerProxy(DeviceProviderServerProxy provider,
StreamObserver<DeviceProviderMsg> toDeviceProvider) {
this.pairedProvider = provider;
this.toDeviceProvider = toDeviceProvider;
outstandingIsReachable = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.build();
// pair RPC session in other direction
provider.pair(this);
}
@Override
public void onNext(DeviceProviderServiceMsg msg) {
try {
log.trace("DeviceProviderServiceServerProxy received: {}", msg);
onMethod(msg);
} catch (Exception e) {
log.error("Exception thrown handling {}", msg, e);
onError(e);
throw e;
}
}
/**
* Translates received RPC message to {@link DeviceProviderService} method calls.
* @param msg DeviceProviderService message
*/
private void onMethod(DeviceProviderServiceMsg msg) {
switch (msg.getMethodCase()) {
case REGISTER_PROVIDER:
RegisterProvider registerProvider = msg.getRegisterProvider();
// TODO Do we care about provider name?
pairedProvider.setProviderId(new ProviderId(registerProvider.getProviderScheme(), RPC_PROVIDER_NAME));
registeredProviders.add(pairedProvider);
deviceProviderService = deviceProviderRegistry.register(pairedProvider);
break;
case DEVICE_CONNECTED:
DeviceConnected deviceConnected = msg.getDeviceConnected();
deviceProviderService.deviceConnected(deviceId(deviceConnected.getDeviceId()),
translate(deviceConnected.getDeviceDescription()));
break;
case DEVICE_DISCONNECTED:
DeviceDisconnected deviceDisconnected = msg.getDeviceDisconnected();
deviceProviderService.deviceDisconnected(deviceId(deviceDisconnected.getDeviceId()));
break;
case UPDATE_PORTS:
UpdatePorts updatePorts = msg.getUpdatePorts();
deviceProviderService.updatePorts(deviceId(updatePorts.getDeviceId()),
updatePorts.getPortDescriptionsList()
.stream()
.map(GrpcDeviceUtils::translate)
.collect(toList()));
break;
case PORT_STATUS_CHANGED:
PortStatusChanged portStatusChanged = msg.getPortStatusChanged();
deviceProviderService.portStatusChanged(deviceId(portStatusChanged.getDeviceId()),
translate(portStatusChanged.getPortDescription()));
break;
case RECEIVED_ROLE_REPLY:
ReceivedRoleReply receivedRoleReply = msg.getReceivedRoleReply();
deviceProviderService.receivedRoleReply(deviceId(receivedRoleReply.getDeviceId()),
translate(receivedRoleReply.getRequested()),
translate(receivedRoleReply.getResponse()));
break;
case UPDATE_PORT_STATISTICS:
UpdatePortStatistics updatePortStatistics = msg.getUpdatePortStatistics();
deviceProviderService.updatePortStatistics(deviceId(updatePortStatistics.getDeviceId()),
updatePortStatistics.getPortStatisticsList()
.stream()
.map(GrpcDeviceUtils::translate)
.collect(toList()));
break;
// return value of DeviceProvider#isReachable
case IS_REACHABLE_RESPONSE:
IsReachableResponse isReachableResponse = msg.getIsReachableResponse();
int xid = isReachableResponse.getXid();
boolean isReachable = isReachableResponse.getIsReachable();
CompletableFuture<Boolean> result = outstandingIsReachable.asMap().remove(xid);
if (result != null) {
result.complete(isReachable);
}
break;
case METHOD_NOT_SET:
default:
log.warn("Unexpected message received {}", msg);
break;
}
}
@Override
public void onCompleted() {
log.info("DeviceProviderServiceServerProxy completed");
deviceProviderRegistry.unregister(pairedProvider);
registeredProviders.remove(pairedProvider);
toDeviceProvider.onCompleted();
}
@Override
public void onError(Throwable e) {
log.error("DeviceProviderServiceServerProxy#onError", e);
deviceProviderRegistry.unregister(pairedProvider);
registeredProviders.remove(pairedProvider);
// TODO What is the proper clean up for bi-di stream on error?
// sample suggests no-op
toDeviceProvider.onError(e);
}
/**
* Registers Future for {@link DeviceProvider#isReachable(DeviceId)} return value.
* @param xid IsReachable call ID.
* @param reply Future to
*/
void register(int xid, CompletableFuture<Boolean> reply) {
outstandingIsReachable.put(xid, reply);
}
}
// Upper -> Lower Controller message
/**
* Relay DeviceProvider calls to RPC client.
*/
private final class DeviceProviderServerProxy
implements DeviceProvider {
private final Logger log = LoggerFactory.getLogger(getClass());
// xid for isReachable calls
private final AtomicInteger xidPool = new AtomicInteger();
private final StreamObserver<DeviceProviderMsg> toDeviceProvider;
private DeviceProviderServiceServerProxy deviceProviderServiceProxy = null;
private ProviderId providerId;
DeviceProviderServerProxy(StreamObserver<DeviceProviderMsg> toDeviceProvider) {
this.toDeviceProvider = toDeviceProvider;
}
void setProviderId(ProviderId pid) {
this.providerId = pid;
}
/**
* Registers RPC stream in other direction.
* @param deviceProviderServiceProxy {@link DeviceProviderServiceServerProxy}
*/
void pair(DeviceProviderServiceServerProxy deviceProviderServiceProxy) {
this.deviceProviderServiceProxy = deviceProviderServiceProxy;
}
@Override
public void triggerProbe(DeviceId deviceId) {
log.trace("triggerProbe({})", deviceId);
DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
msgBuilder.setTriggerProbe(msgBuilder.getTriggerProbeBuilder()
.setDeviceId(deviceId.toString())
.build());
DeviceProviderMsg triggerProbeMsg = msgBuilder.build();
toDeviceProvider.onNext(triggerProbeMsg);
// TODO Catch Exceptions and call onError()
}
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
log.trace("roleChanged({}, {})", deviceId, newRole);
DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
msgBuilder.setRoleChanged(msgBuilder.getRoleChangedBuilder()
.setDeviceId(deviceId.toString())
.setNewRole(translate(newRole))
.build());
toDeviceProvider.onNext(msgBuilder.build());
// TODO Catch Exceptions and call onError()
}
@Override
public boolean isReachable(DeviceId deviceId) {
log.trace("isReachable({})", deviceId);
CompletableFuture<Boolean> result = new CompletableFuture<>();
final int xid = xidPool.incrementAndGet();
DeviceProviderMsg.Builder msgBuilder = DeviceProviderMsg.newBuilder();
msgBuilder.setIsReachableRequest(msgBuilder.getIsReachableRequestBuilder()
.setXid(xid)
.setDeviceId(deviceId.toString())
.build());
// Associate xid and register above future some where
// in DeviceProviderService channel to receive reply
if (deviceProviderServiceProxy != null) {
deviceProviderServiceProxy.register(xid, result);
}
// send message down RPC
toDeviceProvider.onNext(msgBuilder.build());
// wait for reply
try {
return result.get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.debug("isReachable({}) was Interrupted", deviceId, e);
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
log.warn("isReachable({}) Timed out", deviceId, e);
} catch (ExecutionException e) {
log.error("isReachable({}) Execution failed", deviceId, e);
// close session?
}
return false;
// TODO Catch Exceptions and call onError()
}
@Override
public ProviderId id() {
return checkNotNull(providerId, "not initialized yet");
}
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* gRPC based RemoteServiceProvider implementation.
*/
package org.onosproject.incubator.rpc.grpc;
syntax = "proto3";
option java_package = "org.onosproject.grpc";
import "Port.proto";
package Device;
enum DeviceType {
OTHER = 0;
SWITCH = 1;
ROUTER = 2;
ROADM = 3;
OTN = 4;
ROADM_OTN = 5;
FIREWALL = 6;
BALANCER = 7;
IPS = 8;
IDS = 9;
CONTROLLER = 10;
VIRTUAL = 11;
FIBER_SWITCH = 12;
MICROWAVE = 13;
}
message DeviceDescription {
string device_Uri = 1;
DeviceType type = 2;
string manufacturer = 3;
string hw_version = 4;
string sw_version = 5;
string serial_number = 6;
string chassis_id = 7;
map<string, string> annotations = 8;
}
enum MastershipRole {
NONE = 0;
MASTER = 1;
STANDBY = 2;
}
message DeviceConnected {
// DeviceID as String DeviceId#toString
string device_id = 1;
DeviceDescription device_description = 2;
}
message DeviceDisconnected {
// DeviceID as String DeviceId#toString
string device_id = 1;
}
message UpdatePorts {
// DeviceID as String DeviceId#toString
string device_id = 1;
repeated Port.PortDescription port_descriptions= 2;
}
message PortStatusChanged {
// DeviceID as String DeviceId#toString
string device_id = 1;
Port.PortDescription port_description= 2;
}
message ReceivedRoleReply {
// DeviceID as String DeviceId#toString
string device_id = 1;
MastershipRole requested = 2;
MastershipRole response = 3;
}
message UpdatePortStatistics {
// DeviceID as String DeviceId#toString
string device_id = 1;
repeated Port.PortStatistics port_statistics = 2;
}
message RegisterProvider {
// DeviceProvider's ProviderId scheme
string provider_scheme = 1;
}
message DeviceProviderServiceMsg {
oneof method {
DeviceConnected device_connected= 1;
DeviceDisconnected device_disconnected = 2;
UpdatePorts update_ports= 3;
PortStatusChanged port_status_changed = 4;
ReceivedRoleReply received_role_reply = 5;
UpdatePortStatistics update_port_statistics = 6;
// This message is for return value of DeviceProvider#isReachable
IsReachableResponse is_reachable_response = 7;
// This MUST be the 1st message over the stream
RegisterProvider register_provider = 8;
}
}
message TriggerProbe {
// DeviceID as String DeviceId#toString
string device_id = 1;
}
message RoleChanged {
// DeviceID as String DeviceId#toString
string device_id = 1;
MastershipRole new_role = 2;
}
message IsReachableRequest {
int32 xid = 1;
// DeviceID as String DeviceId#toString
string device_id = 2;
}
message IsReachableResponse {
int32 xid = 1;
bool is_reachable = 2;
}
message DeviceProviderMsg {
oneof method {
TriggerProbe trigger_probe = 1;
RoleChanged role_changed = 2;
IsReachableRequest is_reachable_request= 3;
}
}
service DeviceProviderRegistryRpc {
rpc Register(stream DeviceProviderServiceMsg) returns (stream DeviceProviderMsg);
}
syntax = "proto3";
option java_package = "org.onosproject.grpc";
package Port;
enum PortType {
// Signifies copper-based connectivity.
COPPER = 0;
// Signifies optical fiber-based connectivity.
FIBER = 1;
// Signifies optical fiber-based packet port.
PACKET = 2;
// Signifies optical fiber-based optical tributary port (called T-port).
//The signal from the client side will be formed into a ITU G.709 (OTN) frame.
ODUCLT = 3;
// Signifies optical fiber-based Line-side port (called L-port).
OCH = 4;
// Signifies optical fiber-based WDM port (called W-port).
//Optical Multiplexing Section (See ITU G.709).
OMS = 5;
// Signifies virtual port.
VIRTUAL = 6;
}
// TODO What are we going to do with more specific PortDescription ...
message PortDescription {
// PortNumber as String PortNumber#toString
string port_number = 1;
bool is_enabled = 2;
PortType type = 3;
int64 port_speed = 4;
map<string, string> annotations = 8;
}
message PortStatistics {
int32 port = 1;
int64 packets_received = 2;
int64 packets_sent = 3;
// TODO add all other fields
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import static org.junit.Assert.*;
import static org.onosproject.net.DeviceId.deviceId;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.ChassisId;
import org.onosproject.incubator.rpc.RemoteServiceContext;
import org.onosproject.incubator.rpc.RemoteServiceContextProvider;
import org.onosproject.incubator.rpc.RemoteServiceContextProviderService;
import org.onosproject.incubator.rpc.RemoteServiceProviderRegistry;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device.Type;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.provider.AbstractProviderRegistry;
import org.onosproject.net.provider.AbstractProviderService;
import org.onosproject.net.provider.ProviderId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
/**
* Set of tests of the gRPC RemoteService components.
*/
public class GrpcRemoteServiceTest {
private static final DeviceId DEVICE_ID = deviceId("dev:000001");
private final Logger log = LoggerFactory.getLogger(getClass());
private static final ProviderId PID = new ProviderId("test", "com.exmaple.test");
private static final URI DURI = URI.create("dev:000001");
private static final String MFR = "mfr";
private static final String HW = "hw";
private static final String SW = "sw";
private static final String SN = "serial";
private static final ChassisId CHASSIS = new ChassisId(42);
private static final SparseAnnotations ANON = DefaultAnnotations.builder()
.set("foo", "var")
.build();
private static final PortNumber PORT = PortNumber.portNumber(99);
private static final DeviceDescription DDESC
= new DefaultDeviceDescription(DURI, Type.SWITCH, MFR, HW, SW, SN,
CHASSIS, ANON);
private GrpcRemoteServiceServer server;
private GrpcRemoteServiceProvider client;
private DeviceProvider svSideDeviceProvider;
private MTestDeviceProviderService svDeviceProviderService;
private CountDownLatch serverReady;
private URI uri;
public static int pickListenPort() {
try {
// pick unused port
ServerSocket socket = new ServerSocket(0);
int port = socket.getLocalPort();
socket.close();
return port;
} catch (IOException e) {
// something went wrong, try picking randomly
return RandomUtils.nextInt(49152, 0xFFFF + 1);
}
}
@Before
public void setUp() throws Exception {
serverReady = new CountDownLatch(1);
server = new GrpcRemoteServiceServer();
server.deviceProviderRegistry = new MTestDeviceProviderRegistry();
// todo: pass proper ComponentContext
server.listenPort = pickListenPort();
uri = URI.create("grpc://localhost:" + server.listenPort);
server.activate(null);
client = new GrpcRemoteServiceProvider();
client.rpcRegistry = new NoOpRemoteServiceProviderRegistry();
client.activate();
}
@After
public void tearDown() {
client.deactivate();
server.deactivate();
}
private static void assertEqualsButNotSame(Object expected, Object actual) {
assertEquals(expected, actual);
assertNotSame("Cannot be same instance if it properly went through gRPC",
expected, actual);
}
@Test
public void basics() throws InterruptedException {
RemoteServiceContext remoteServiceContext = client.get(uri);
assertNotNull(remoteServiceContext);
DeviceProviderRegistry deviceProviderRegistry = remoteServiceContext.get(DeviceProviderRegistry.class);
assertNotNull(deviceProviderRegistry);
CTestDeviceProvider clDeviceProvider = new CTestDeviceProvider();
DeviceProviderService clDeviceProviderService = deviceProviderRegistry.register(clDeviceProvider);
assertTrue(serverReady.await(10, TimeUnit.SECONDS));
// client to server communication
clDeviceProviderService.deviceConnected(DEVICE_ID, DDESC);
assertTrue(svDeviceProviderService.deviceConnected.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.deviceConnectedDid);
assertEqualsButNotSame(DDESC, svDeviceProviderService.deviceConnectedDesc);
PortDescription portDescription = new DefaultPortDescription(PORT, true, ANON);
List<PortDescription> portDescriptions = ImmutableList.of(portDescription);
clDeviceProviderService.updatePorts(DEVICE_ID, portDescriptions);
assertTrue(svDeviceProviderService.updatePorts.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.updatePortsDid);
assertEqualsButNotSame(portDescriptions, svDeviceProviderService.updatePortsDescs);
MastershipRole cRole = MastershipRole.MASTER;
MastershipRole dRole = MastershipRole.STANDBY;
clDeviceProviderService.receivedRoleReply(DEVICE_ID, cRole, dRole);
assertTrue(svDeviceProviderService.receivedRoleReply.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.receivedRoleReplyDid);
assertEquals(cRole, svDeviceProviderService.receivedRoleReplyRequested);
assertEquals(dRole, svDeviceProviderService.receivedRoleReplyResponse);
clDeviceProviderService.portStatusChanged(DEVICE_ID, portDescription);
assertTrue(svDeviceProviderService.portStatusChanged.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.portStatusChangedDid);
assertEqualsButNotSame(portDescription, svDeviceProviderService.portStatusChangedDesc);
Collection<PortStatistics> portStatistics = Collections.emptyList();
clDeviceProviderService.updatePortStatistics(DEVICE_ID, portStatistics);
assertTrue(svDeviceProviderService.updatePortStatistics.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.updatePortStatisticsDid);
assertEqualsButNotSame(portStatistics, svDeviceProviderService.updatePortStatisticsStats);
clDeviceProviderService.deviceDisconnected(DEVICE_ID);
assertTrue(svDeviceProviderService.deviceDisconnected.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(DEVICE_ID, svDeviceProviderService.deviceDisconnectedDid);
// server to client communication
svSideDeviceProvider.triggerProbe(DEVICE_ID);
assertTrue(clDeviceProvider.triggerProbe.await(10, TimeUnit.SECONDS));
assertEquals(DEVICE_ID, clDeviceProvider.triggerProbeDid);
assertNotSame("Cannot be same instance if it properly went through gRPC",
DEVICE_ID, clDeviceProvider.triggerProbeDid);
svSideDeviceProvider.roleChanged(DEVICE_ID, MastershipRole.STANDBY);
assertTrue(clDeviceProvider.roleChanged.await(10, TimeUnit.SECONDS));
assertEquals(DEVICE_ID, clDeviceProvider.roleChangedDid);
assertNotSame("Cannot be same instance if it properly went through gRPC",
DEVICE_ID, clDeviceProvider.roleChangedDid);
assertEquals(MastershipRole.STANDBY, clDeviceProvider.roleChangedNewRole);
clDeviceProvider.isReachableReply = false;
assertEquals(clDeviceProvider.isReachableReply,
svSideDeviceProvider.isReachable(DEVICE_ID));
assertTrue(clDeviceProvider.isReachable.await(10, TimeUnit.SECONDS));
assertEquals(DEVICE_ID, clDeviceProvider.isReachableDid);
assertNotSame("Cannot be same instance if it properly went through gRPC",
DEVICE_ID, clDeviceProvider.isReachableDid);
}
/**
* Device Provider on CO side.
*/
public class CTestDeviceProvider implements DeviceProvider {
final CountDownLatch triggerProbe = new CountDownLatch(1);
DeviceId triggerProbeDid;
final CountDownLatch roleChanged = new CountDownLatch(1);
DeviceId roleChangedDid;
MastershipRole roleChangedNewRole;
final CountDownLatch isReachable = new CountDownLatch(1);
DeviceId isReachableDid;
boolean isReachableReply = false;
@Override
public ProviderId id() {
return PID;
}
@Override
public void triggerProbe(DeviceId deviceId) {
log.info("triggerProbe({}) on Client called", deviceId);
triggerProbeDid = deviceId;
triggerProbe.countDown();
}
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
log.info("roleChanged({},{}) on Client called", deviceId, newRole);
roleChangedDid = deviceId;
roleChangedNewRole = newRole;
roleChanged.countDown();
}
@Override
public boolean isReachable(DeviceId deviceId) {
log.info("isReachable({}) on Client called", deviceId);
isReachableDid = deviceId;
isReachable.countDown();
return isReachableReply;
}
}
class NoOpRemoteServiceProviderRegistry
implements RemoteServiceProviderRegistry {
@Override
public RemoteServiceContextProviderService register(RemoteServiceContextProvider provider) {
return new RemoteServiceContextProviderService() {
@Override
public RemoteServiceContextProvider provider() {
return provider;
}
};
}
@Override
public void unregister(RemoteServiceContextProvider provider) {
}
@Override
public Set<ProviderId> getProviders() {
return Collections.emptySet();
}
}
/**
* DeviceProvider on Metro side.
*/
public class MTestDeviceProviderRegistry
extends AbstractProviderRegistry<DeviceProvider, DeviceProviderService>
implements DeviceProviderRegistry {
@Override
protected DeviceProviderService createProviderService(DeviceProvider provider) {
log.info("createProviderService({})", provider);
svSideDeviceProvider = provider;
svDeviceProviderService = new MTestDeviceProviderService(provider);
serverReady.countDown();
return svDeviceProviderService;
}
}
private final class MTestDeviceProviderService
extends AbstractProviderService<DeviceProvider>
implements DeviceProviderService {
public MTestDeviceProviderService(DeviceProvider provider) {
super(provider);
}
final CountDownLatch deviceConnected = new CountDownLatch(1);
DeviceId deviceConnectedDid;
DeviceDescription deviceConnectedDesc;
@Override
public void deviceConnected(DeviceId deviceId,
DeviceDescription deviceDescription) {
log.info("deviceConnected({}, {}) on Server called", deviceId, deviceDescription);
deviceConnectedDid = deviceId;
deviceConnectedDesc = deviceDescription;
deviceConnected.countDown();
}
final CountDownLatch updatePorts = new CountDownLatch(1);
DeviceId updatePortsDid;
List<PortDescription> updatePortsDescs;
@Override
public void updatePorts(DeviceId deviceId,
List<PortDescription> portDescriptions) {
log.info("updatePorts({}, {}) on Server called", deviceId, portDescriptions);
updatePortsDid = deviceId;
updatePortsDescs = portDescriptions;
updatePorts.countDown();
}
final CountDownLatch receivedRoleReply = new CountDownLatch(1);
DeviceId receivedRoleReplyDid;
MastershipRole receivedRoleReplyRequested;
MastershipRole receivedRoleReplyResponse;
@Override
public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
MastershipRole response) {
log.info("receivedRoleReply({}, {}, {}) on Server called", deviceId, requested, response);
receivedRoleReplyDid = deviceId;
receivedRoleReplyRequested = requested;
receivedRoleReplyResponse = response;
receivedRoleReply.countDown();
}
final CountDownLatch portStatusChanged = new CountDownLatch(1);
DeviceId portStatusChangedDid;
PortDescription portStatusChangedDesc;
@Override
public void portStatusChanged(DeviceId deviceId,
PortDescription portDescription) {
log.info("portStatusChanged({}, {}) on Server called", deviceId, portDescription);
portStatusChangedDid = deviceId;
portStatusChangedDesc = portDescription;
portStatusChanged.countDown();
}
final CountDownLatch updatePortStatistics = new CountDownLatch(1);
DeviceId updatePortStatisticsDid;
Collection<PortStatistics> updatePortStatisticsStats;
@Override
public void updatePortStatistics(DeviceId deviceId,
Collection<PortStatistics> portStatistics) {
log.info("updatePortStatistics({}, {}) on Server called", deviceId, portStatistics);
updatePortStatisticsDid = deviceId;
updatePortStatisticsStats = portStatistics;
updatePortStatistics.countDown();
}
final CountDownLatch deviceDisconnected = new CountDownLatch(1);
DeviceId deviceDisconnectedDid;
@Override
public void deviceDisconnected(DeviceId deviceId) {
log.info("deviceDisconnected({}) on Server called", deviceId);
deviceDisconnectedDid = deviceId;
deviceDisconnected.countDown();
}
}
}