Sanjana Agarwal
Committed by Gerrit Code Review

Made changes as per comments.

kafkaProducer is now non-static.

TODO: KafkaPublisherManager Service and not Singleton.
Kafka event publishing.

Change-Id: I5ec20a6e4950c38e822468d343521ab77475b7d3
Showing 36 changed files with 710 additions and 284 deletions
......@@ -38,6 +38,7 @@
<artifactId>protobuf-java</artifactId>
<version>3.0.0-beta-2</version>
</dependency>
</dependencies>
<build>
......
/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api;
import org.onosproject.event.Event;
import org.onosproject.kafkaintegration.api.dto.OnosEvent;
/**
* API for conversion of various ONOS events to Protobuf.
*
*/
public interface EventConversionService {
OnosEvent convertEvent(Event<?, ?> event);
}
......@@ -16,16 +16,19 @@ package org.onosproject.kafkaintegration.api;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
import com.google.common.annotations.Beta;
import java.util.List;
/**
* APIs for subscribing to Onos Event Messages.
*/
@Beta
public interface EventExporterService {
public interface EventSubscriptionService {
/**
* Registers the external application to receive events generated in ONOS.
......@@ -61,4 +64,12 @@ public interface EventExporterService {
*/
void unsubscribe(EventSubscriber subscriber)
throws InvalidGroupIdException, InvalidApplicationException;
/**
* Returns the event subscriber for various event types.
*
* @param type ONOS event type.
* @return List of event subscribers
*/
List<EventSubscriber> getEventSubscribers(Type type);
}
......
/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api;
import com.google.protobuf.GeneratedMessage;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
/**
* API for dispatching ONOS events.
*/
public interface KafkaPublisherService {
/**
* Publish the ONOS Event to all listeners.
*
* @param eventType the ONOS eventtype
* @param message generated Protocol buffer message from ONOS event data
*/
void publish(Type eventType, GeneratedMessage message);
}
......@@ -19,7 +19,7 @@ import org.onosproject.event.AbstractEvent;
import com.google.protobuf.GeneratedMessage;
/**
* Represents the converted Onos Event data into GPB format.
* Represents the converted Onos Event data into protobuf format.
*
*/
public class OnosEvent extends AbstractEvent<OnosEvent.Type, GeneratedMessage> {
......@@ -38,6 +38,19 @@ public class OnosEvent extends AbstractEvent<OnosEvent.Type, GeneratedMessage> {
* List of Event Types supported.
*/
public enum Type {
DEVICE, LINK;
/**
* Signifies Device events.
*/
DEVICE("DEVICE"),
/**
* Signifies Link events.
*/
LINK("LINK");
public String typeName;
Type(String name) {
typeName = name;
}
}
}
......
......@@ -19,6 +19,9 @@
featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
features="${project.artifactId}" apps="org.onosproject.incubator.protobuf">
<description>${project.description}</description>
<artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
<artifact>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</artifact>
<artifact>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</artifact>
<artifact>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</artifact>
<artifact>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</artifact>
</app>
......
......@@ -19,6 +19,8 @@
description="${project.description}">
<feature>onos-api</feature>
<bundle>mvn:${project.groupId}/onos-app-kafka-api/${project.version}</bundle>
<bundle>mvn:${project.groupId}/onos-app-kafka/${project.version}</bundle>
<bundle>mvn:${project.groupId}/onos-app-kafka-core/${project.version}</bundle>
<bundle>mvn:${project.groupId}/onos-app-kafka-web/${project.version}</bundle>
<bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/0.8.2.2_1</bundle>
</feature>
</features>
......
......@@ -26,12 +26,6 @@
<artifactId>onos-app-kafka</artifactId>
<packaging>bundle</packaging>
<description>
Kafka Integration Application.
This will export ONOS Events to Northbound Kafka Server.
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<onos.version>${project.version}</onos.version>
......@@ -41,21 +35,18 @@
<web.context>/onos/kafka</web.context>
<api.version>1.0.0</api.version>
<api.package>org.onosproject.kafkaintegration.rest</api.package>
<api.title>Kafka Integration Application REST API</api.title>
<api.description>
APIs for subscribing to Events generated by ONOS
</api.description>
<onos.app.category>Utility</onos.app.category>
<onos.app.url>https://wiki.onosproject.org/display/ONOS/Kafka+Integration</onos.app.url>
<onos.app.readme>Export ONOS events to a Northbound Kafka server</onos.app.readme>
<onos.app.requires>org.onosproject.incubator.protobuf</onos.app.requires>
</properties>
<dependencies>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<packaging>pom</packaging>
<description>
Kafka Integration Application.
This will export ONOS Events to Northbound Kafka Server.
</description>
<dependencies>
<dependency>
<groupId>org.onosproject</groupId>
......@@ -65,168 +56,22 @@
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-incubator-protobuf</artifactId>
<artifactId>onos-app-kafka-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-rest</artifactId>
<artifactId>onos-app-kafka-web</artifactId>
<version>${project.version}</version>
</dependency>
<!--Also need to update the app.xml and the features.xml -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.8.2.2_1</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.0.0-beta-2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>
${project.groupId}.${project.artifactId}
</Bundle-SymbolicName>
<_wab>src/main/webapp/</_wab>
<Include-Resource>
WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
{maven-resources}
</Include-Resource>
<Import-Package>
org.slf4j,
org.osgi.framework,
javax.ws.rs,
javax.ws.rs.core,
org.glassfish.jersey.servlet,
com.fasterxml.jackson.databind,
com.fasterxml.jackson.databind.node,
com.fasterxml.jackson.core,
org.onlab.packet.*,
org.onosproject.*,
org.onlab.util.*,
com.google.common.*,
com.google.protobuf.*
</Import-Package>
<Web-ContextPath>${web.context}</Web-ContextPath>
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
<executions>
<execution>
<id>generate-scr-srcdescriptor</id>
<goals>
<goal>scr</goal>
</goals>
</execution>
</executions>
<configuration>
<supportedProjectTypes>
<supportedProjectType>bundle</supportedProjectType>
<supportedProjectType>war</supportedProjectType>
</supportedProjectTypes>
</configuration>
</plugin>
<plugin>
<groupId>org.onosproject</groupId>
<artifactId>onos-maven-plugin</artifactId>
<executions>
<execution>
<id>cfg</id>
<phase>generate-resources</phase>
<goals>
<goal>cfg</goal>
</goals>
</execution>
<execution>
<id>swagger</id>
<goals>
<goal>swagger</goal>
</goals>
</execution>
<execution>
<id>app</id>
<phase>package</phase>
<goals>
<goal>app</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
</project>
\ No newline at end of file
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2016-present Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-kafka</artifactId>
<version>1.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>onos-app-kafka-core</artifactId>
<packaging>bundle</packaging>
<description>
Kafka Integration Application.
This module is exclusive of REST calls and is only for the implementation of Apache Kafka.
</description>
<dependencies>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-app-kafka-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.8.2.2_1</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-incubator-protobuf</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.0.0-beta-2</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-app-kafka-web</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
<executions>
<execution>
<id>generate-scr-srcdescriptor</id>
<goals>
<goal>scr</goal>
</goals>
</execution>
</executions>
<configuration>
<supportedProjectTypes>
<supportedProjectType>bundle</supportedProjectType>
<supportedProjectType>war</supportedProjectType>
</supportedProjectTypes>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
......@@ -14,13 +14,13 @@
*/
package org.onosproject.kafkaintegration.converter;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import java.util.HashMap;
import java.util.Map;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Returns the appropriate converter object based on the ONOS event type.
......
......@@ -14,6 +14,7 @@
*/
package org.onosproject.kafkaintegration.converter;
import com.google.protobuf.GeneratedMessage;
import org.onosproject.event.Event;
import org.onosproject.grpc.net.Device.DeviceCore;
import org.onosproject.grpc.net.Device.DeviceType;
......@@ -25,13 +26,10 @@ import org.onosproject.net.device.DeviceEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.GeneratedMessage;
/**
* Converts ONOS Device event message to GPB format.
*
* Converts ONOS Device event message to protobuf format.
*/
class DeviceEventConverter implements EventConverter {
public class DeviceEventConverter implements EventConverter {
private final Logger log = LoggerFactory.getLogger(getClass());
......@@ -73,14 +71,14 @@ class DeviceEventConverter implements EventConverter {
DeviceCore deviceCore =
DeviceCore.newBuilder()
.setChassisId(deviceEvent.subject().chassisId().id()
.toString())
.toString())
.setDeviceId(deviceEvent.subject().id().toString())
.setHwVersion(deviceEvent.subject().hwVersion())
.setManufacturer(deviceEvent.subject().manufacturer())
.setSerialNumber(deviceEvent.subject().serialNumber())
.setSwVersion(deviceEvent.subject().swVersion())
.setType(DeviceType
.valueOf(deviceEvent.subject().type().name()))
.valueOf(deviceEvent.subject().type().name()))
.build();
PortCore portCore = null;
......@@ -89,10 +87,10 @@ class DeviceEventConverter implements EventConverter {
PortCore.newBuilder()
.setIsEnabled(deviceEvent.port().isEnabled())
.setPortNumber(deviceEvent.port().number()
.toString())
.toString())
.setPortSpeed(deviceEvent.port().portSpeed())
.setType(PortType
.valueOf(deviceEvent.port().type().name()))
.valueOf(deviceEvent.port().type().name()))
.build();
notificationBuilder.setPort(portCore);
......
......@@ -20,7 +20,7 @@ import com.google.protobuf.GeneratedMessage;
/**
*
* APIs for converting between ONOS event objects and GPB data objects.
* APIs for converting between ONOS event objects and protobuf data objects.
*
*/
public interface EventConverter {
......@@ -30,7 +30,7 @@ public interface EventConverter {
* to Kafka.
*
* @param event ONOS Event object
* @return converted data in GPB format.
* @return converted data in protobuf format.
*/
public GeneratedMessage convertToProtoMessage(Event<?, ?> event);
GeneratedMessage convertToProtoMessage(Event<?, ?> event);
}
......
......@@ -28,10 +28,9 @@ import org.slf4j.LoggerFactory;
import com.google.protobuf.GeneratedMessage;
/**
* Converts for ONOS Link event message to GPB format.
*
* Converts for ONOS Link event message to protobuf format.
*/
class LinkEventConverter implements EventConverter {
public class LinkEventConverter implements EventConverter {
private final Logger log = LoggerFactory.getLogger(getClass());
......@@ -41,7 +40,7 @@ class LinkEventConverter implements EventConverter {
LinkEvent linkEvent = (LinkEvent) event;
if (!linkEventTypeSupported(linkEvent)) {
log.error("Unsupported Onos Event {}. There is no matching"
log.error("Unsupported Onos Event {}. There is no matching "
+ "proto Event type", linkEvent.type().toString());
return null;
}
......@@ -56,7 +55,6 @@ class LinkEventConverter implements EventConverter {
return true;
}
}
return false;
}
......@@ -66,8 +64,7 @@ class LinkEventConverter implements EventConverter {
.setLink(LinkCore.newBuilder()
.setState(LinkState
.valueOf(linkEvent.subject().state().name()))
.setType(LinkType
.valueOf(linkEvent.subject().type().name()))
.setType(LinkType.valueOf(linkEvent.subject().type().name()))
.setDst(ConnectPoint.newBuilder()
.setDeviceId(linkEvent.subject().dst()
.deviceId().toString())
......
/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.event.Event;
import org.onosproject.kafkaintegration.api.EventConversionService;
import org.onosproject.kafkaintegration.api.dto.OnosEvent;
import org.onosproject.kafkaintegration.converter.DeviceEventConverter;
import org.onosproject.kafkaintegration.converter.EventConverter;
import org.onosproject.kafkaintegration.converter.LinkEventConverter;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.link.LinkEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Implementation of Event Conversion Service.
*
*/
@Component(immediate = true)
@Service
public class EventConversionManager implements EventConversionService {
private final Logger log = LoggerFactory.getLogger(getClass());
private EventConverter deviceEventConverter;
private EventConverter linkEventConverter;
@Activate
protected void activate() {
deviceEventConverter = new DeviceEventConverter();
linkEventConverter = new LinkEventConverter();
log.info("Started");
}
@Deactivate
protected void deactivate() {
log.info("Stopped");
}
@Override
public OnosEvent convertEvent(Event<?, ?> event) {
if (event instanceof DeviceEvent) {
return new OnosEvent(DEVICE, deviceEventConverter.convertToProtoMessage(event));
} else if (event instanceof LinkEvent) {
return new OnosEvent(LINK, linkEventConverter.convertToProtoMessage(event));
} else {
throw new IllegalArgumentException("Unsupported event type");
}
}
}
/*
/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
......@@ -15,18 +15,7 @@
*/
package org.onosproject.kafkaintegration.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -35,7 +24,9 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.kafkaintegration.api.EventExporterService;
import org.onosproject.kafkaintegration.api.EventSubscriptionService;
import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.OnosEvent;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
......@@ -51,13 +42,22 @@ import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Implementation of Event Exporter Service.
* Implementation of Event Subscription Manager.
*
*/
@Component(immediate = true)
@Service
public class EventExporterManager implements EventExporterService {
public class EventSubscriptionManager implements EventSubscriptionService {
private final Logger log = LoggerFactory.getLogger(getClass());
......@@ -147,15 +147,12 @@ public class EventExporterManager implements EventExporterService {
+ "registered to make this request.");
}
if (!validGroupId(subscriber.subscriberGroupId(),
subscriber.appName())) {
if (!validGroupId(subscriber.subscriberGroupId(), subscriber.appName())) {
throw new InvalidGroupIdException("Incorrect group id in the request");
}
OnosEventListener onosListener = getListener(subscriber.eventType());
checkNotNull(onosListener,
"No listener for the supported event type - {}",
subscriber.eventType());
checkNotNull(onosListener, "No listener for the supported event type - {}", subscriber.eventType());
applyListenerAction(subscriber.eventType(), onosListener,
ListenerAction.START);
......@@ -169,7 +166,7 @@ public class EventExporterManager implements EventExporterService {
subscriptionList.add(subscriber);
subscriptions.put(subscriber.eventType(), subscriptionList);
log.info("Subscription for {} event by {} successfull",
log.info("Subscription for {} event by {} successful",
subscriber.eventType(), subscriber.appName());
}
......@@ -308,6 +305,12 @@ public class EventExporterManager implements EventExporterService {
subscriber.eventType());
}
@Override
public List<EventSubscriber> getEventSubscribers(Type type) {
return subscriptions.getOrDefault(type, ImmutableList.of());
}
/**
* Checks if the subscriber has already subscribed to the requested event
* type.
......
......@@ -21,7 +21,6 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.codec.CodecService;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.rest.SubscriberCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -16,6 +16,7 @@ package org.onosproject.kafkaintegration.impl;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.kafkaintegration.api.ExportableEventListener;
import org.onosproject.kafkaintegration.api.KafkaPublisherService;
import org.onosproject.kafkaintegration.api.dto.OnosEvent;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.slf4j.Logger;
......@@ -27,17 +28,19 @@ import com.google.protobuf.GeneratedMessage;
* Dispatch ONOS Events to all interested Listeners.
*
*/
public final class Dispatcher
extends AbstractListenerManager<OnosEvent, ExportableEventListener> {
public final class KafkaPublisherManager
extends AbstractListenerManager<OnosEvent, ExportableEventListener> implements KafkaPublisherService {
private final Logger log = LoggerFactory.getLogger(getClass());
// Exists to defeat instantiation
private Dispatcher() {
private KafkaPublisherManager() {
}
//TODO: If possible, get rid of Singleton implementation.
private static class SingletonHolder {
private static final Dispatcher INSTANCE = new Dispatcher();
private static final KafkaPublisherManager INSTANCE = new KafkaPublisherManager();
}
/**
......@@ -45,16 +48,11 @@ public final class Dispatcher
*
* @return singleton object
*/
public static Dispatcher getInstance() {
public static KafkaPublisherManager getInstance() {
return SingletonHolder.INSTANCE;
}
/**
* Publish the ONOS Event to all listeners.
*
* @param eventType the ONOS eventtype
* @param message generated Protocol buffer message from ONOS event data
*/
@Override
public void publish(Type eventType, GeneratedMessage message) {
log.debug("Dispatching ONOS Event {}", eventType);
post(new OnosEvent(eventType, message));
......
......@@ -14,11 +14,6 @@
*/
package org.onosproject.kafkaintegration.impl;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -31,6 +26,11 @@ import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component(immediate = true)
public class KafkaStorageManager implements KafkaEventStorageService {
......
......@@ -12,19 +12,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.rest;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.UUID;
package org.onosproject.kafkaintegration.impl;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onosproject.codec.CodecContext;
import org.onosproject.codec.JsonCodec;
import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.UUID;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Codec for encoding/decoding a Subscriber object to/from JSON.
......
/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* Implementation of Kafka Producer.
*/
public class Producer {
private KafkaProducer<String, byte[]> kafkaProducer = null;
private final Logger log = LoggerFactory.getLogger(getClass());
Producer(String bootstrapServers, int retries, int maxInFlightRequestsPerConnection,
int requestRequiredAcks, String keySerializer, String valueSerializer) {
Properties prop = new Properties();
prop.put("bootstrap.servers", bootstrapServers);
prop.put("retries", retries);
prop.put("max.in.flight.requests.per.connection", maxInFlightRequestsPerConnection);
prop.put("request.required.acks", requestRequiredAcks);
prop.put("key.serializer", keySerializer);
prop.put("value.serializer", valueSerializer);
kafkaProducer = new KafkaProducer<>(prop);
}
public void start() {
log.info("Started");
}
public void stop() {
if (kafkaProducer != null) {
kafkaProducer.close();
kafkaProducer = null;
}
log.info("Stopped");
}
public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record) {
return kafkaProducer.send(record);
}
}
/**
* Copyright 2016-present Open Networking Laboratory Licensed under the Apache
* License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* API implementation classes.
*/
package org.onosproject.kafkaintegration.kafka;
......@@ -14,29 +14,31 @@
*/
package org.onosproject.kafkaintegration.listener;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import com.google.protobuf.GeneratedMessage;
import org.onosproject.event.ListenerService;
import org.onosproject.kafkaintegration.impl.Dispatcher;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.converter.ConversionFactory;
import org.onosproject.kafkaintegration.converter.EventConverter;
import org.onosproject.kafkaintegration.impl.KafkaPublisherManager;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.GeneratedMessage;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
/**
* Listens for ONOS Device events.
*
*/
final class DeviceEventsListener implements OnosEventListener {
public final class DeviceEventsListener implements OnosEventListener {
private final Logger log = LoggerFactory.getLogger(getClass());
private boolean listenerRunning = false;
private InnerListener listener = null;
// Exists to defeat instantiation
private DeviceEventsListener() {
}
......@@ -70,14 +72,14 @@ final class DeviceEventsListener implements OnosEventListener {
@Override
public void event(DeviceEvent arg0) {
// Convert the event to GPB format
// Convert the event to protobuf format
ConversionFactory conversionFactory =
ConversionFactory.getInstance();
EventConverter converter = conversionFactory.getConverter(DEVICE);
GeneratedMessage message = converter.convertToProtoMessage(arg0);
// Call Dispatcher and publish event
Dispatcher.getInstance().publish(DEVICE, message);
KafkaPublisherManager.getInstance().publish(DEVICE, message);
}
}
......@@ -91,4 +93,4 @@ final class DeviceEventsListener implements OnosEventListener {
}
}
}
}
\ No newline at end of file
......
......@@ -14,29 +14,27 @@
*/
package org.onosproject.kafkaintegration.listener;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
import com.google.protobuf.GeneratedMessage;
import org.onosproject.event.ListenerService;
import org.onosproject.kafkaintegration.impl.Dispatcher;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.converter.ConversionFactory;
import org.onosproject.kafkaintegration.converter.EventConverter;
import org.onosproject.kafkaintegration.impl.KafkaPublisherManager;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.link.LinkListener;
import org.onosproject.net.link.LinkService;
import com.google.protobuf.GeneratedMessage;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Listens for ONOS Link Events.
*
*/
final class LinkEventsListener implements OnosEventListener {
public final class LinkEventsListener implements OnosEventListener {
private boolean listenerRunning = false;
private InnerListener listener = null;
// Exists to defeat instantiation
private LinkEventsListener() {
}
......@@ -70,14 +68,14 @@ final class LinkEventsListener implements OnosEventListener {
@Override
public void event(LinkEvent arg0) {
// Convert the event to GPB format
// Convert the event to protobuf format
ConversionFactory conversionFactory =
ConversionFactory.getInstance();
EventConverter converter = conversionFactory.getConverter(LINK);
GeneratedMessage message = converter.convertToProtoMessage(arg0);
// Call Dispatcher and publish event
Dispatcher.getInstance().publish(LINK, message);
KafkaPublisherManager.getInstance().publish(LINK, message);
}
}
......
......@@ -14,13 +14,13 @@
*/
package org.onosproject.kafkaintegration.listener;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import java.util.HashMap;
import java.util.Map;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Returns the appropriate listener object based on the ONOS event type.
......
......@@ -30,7 +30,6 @@ public interface OnosEventListener {
* @param service ONOS event listener for the specific event type
*/
void startListener(Type event, ListenerService<?, ?> service);
/**
* Stop the Listener for the specific ONOS event type.
*
......
......@@ -31,8 +31,9 @@
<modules>
<module>api</module>
<module>core</module>
<module>web</module>
<module>app</module>
</modules>
</project>
</project>
\ No newline at end of file
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2016-present Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-kafka</artifactId>
<version>1.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>onos-app-kafka-web</artifactId>
<properties>
<web.context>/onos/kafka</web.context>
<api.version>1.0.0</api.version>
<api.package>org.onosproject.kafkaintegration.rest</api.package>
<api.title>Kafka Integration Application REST API</api.title>
<api.description>
APIs for subscribing to Events generated by ONOS
</api.description>
</properties>
<packaging>bundle</packaging>
<dependencies>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-app-kafka-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-incubator-protobuf</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-rest</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.0.0-beta-2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>
${project.groupId}.${project.artifactId}
</Bundle-SymbolicName>
<_wab>src/main/webapp/</_wab>
<Include-Resource>
WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
{maven-resources}
</Include-Resource>
<Import-Package>
org.slf4j,
org.osgi.framework,
javax.ws.rs,
javax.ws.rs.core,
org.glassfish.jersey.servlet,
com.fasterxml.jackson.databind,
com.fasterxml.jackson.databind.node,
com.fasterxml.jackson.core,
org.onlab.packet.*,
org.onosproject.*,
org.onlab.util.*,
com.google.common.*,
com.google.protobuf.*
</Import-Package>
<Web-ContextPath>${web.context}</Web-ContextPath>
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
<executions>
<execution>
<id>generate-scr-srcdescriptor</id>
<goals>
<goal>scr</goal>
</goals>
</execution>
</executions>
<configuration>
<supportedProjectTypes>
<supportedProjectType>bundle</supportedProjectType>
<supportedProjectType>war</supportedProjectType>
</supportedProjectTypes>
</configuration>
</plugin>
<plugin>
<groupId>org.onosproject</groupId>
<artifactId>onos-maven-plugin</artifactId>
<executions>
<execution>
<id>cfg</id>
<phase>generate-resources</phase>
<goals>
<goal>cfg</goal>
</goals>
</execution>
<execution>
<id>swagger</id>
<goals>
<goal>swagger</goal>
</goals>
</execution>
<execution>
<id>app</id>
<phase>package</phase>
<goals>
<goal>app</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -14,11 +14,15 @@
*/
package org.onosproject.kafkaintegration.rest;
import static com.google.common.base.Preconditions.checkNotNull;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onosproject.kafkaintegration.api.EventSubscriptionService;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.rest.AbstractWebResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
......@@ -27,16 +31,11 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import org.onosproject.kafkaintegration.api.EventExporterService;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.rest.AbstractWebResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import static com.google.common.base.Preconditions.checkNotNull;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
/**
* Rest Interfaces for subscribing/unsubscribing to event notifications.
......@@ -52,11 +51,12 @@ public class EventExporterWebResource extends AbstractWebResource {
public static final String DEREGISTRATION_SUCCESSFUL =
"De-Registered Listener successfully";
public static final String EVENT_SUBSCRIPTION_SUCCESSFUL =
"Event Registration successfull";
"Event Registration successful";
public static final String EVENT_SUBSCRIPTION_UNSUCCESSFUL =
"Event subscription unsuccessful";
public static final String EVENT_SUBSCRIPTION_REMOVED =
"Event De-Registration successfull";
"Event De-Registration successful";
/**
* Registers a listener for ONOS Events.
*
......@@ -71,12 +71,11 @@ public class EventExporterWebResource extends AbstractWebResource {
@Path("register")
public Response registerKafkaListener(String appName) {
EventExporterService service = get(EventExporterService.class);
EventSubscriptionService service = get(EventSubscriptionService.class);
EventSubscriberGroupId groupId = service.registerListener(appName);
log.info("Registered app {}", appName);
// TODO: Should also return Kafka server information.
// Will glue this in when we have the config and Kafka modules ready
return ok(groupId.getId().toString()).build();
......@@ -92,7 +91,7 @@ public class EventExporterWebResource extends AbstractWebResource {
@DELETE
@Path("unregister")
public Response removeKafkaListener(String appName) {
EventExporterService service = get(EventExporterService.class);
EventSubscriptionService service = get(EventSubscriptionService.class);
service.unregisterListener(appName);
log.info("Unregistered app {}", appName);
......@@ -112,11 +111,12 @@ public class EventExporterWebResource extends AbstractWebResource {
@Path("subscribe")
public Response subscribe(InputStream input) {
EventExporterService service = get(EventExporterService.class);
EventSubscriptionService service = get(EventSubscriptionService.class);
try {
EventSubscriber sub = parseSubscriptionData(input);
service.subscribe(sub);
// It will subscribe to all the topics. Not only the one that is sent by the consumer.
} catch (Exception e) {
log.error(e.getMessage());
return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
......@@ -155,7 +155,7 @@ public class EventExporterWebResource extends AbstractWebResource {
@Path("unsubscribe")
public Response unsubscribe(InputStream input) {
EventExporterService service = get(EventExporterService.class);
EventSubscriptionService service = get(EventSubscriptionService.class);
try {
EventSubscriber sub = parseSubscriptionData(input);
......