Shravan Ambati
Committed by Gerrit Code Review

Refactored Kafka Application to simplify dependencies

1. Fixed a bug in KafkaProducer; without this fix the app does not send data in GPB format.
2. Added two new services, KafkaProducerService and KafkaConfigService (see the usage sketch below).
3. Resolved a TODO in the register API so that it now returns Kafka server information.
4. Removed the use of LeadershipService and ClusterService, since we are not ready for clustering yet.
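For reviewers, a minimal usage sketch of how the two new services fit together, modeled on the EventMonitor changes further down. The class name KafkaUsageSketch and the constructor wiring are illustrative only; the real component obtains both services through OSGi @Reference injection.

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.onosproject.kafkaintegration.api.KafkaConfigService;
import org.onosproject.kafkaintegration.api.KafkaProducerService;

// Illustrative only; not part of this change.
public class KafkaUsageSketch {

    private final KafkaConfigService kafkaConfigService;
    private final KafkaProducerService kafkaProducer;

    public KafkaUsageSketch(KafkaConfigService configService, KafkaProducerService producer) {
        this.kafkaConfigService = configService;
        this.kafkaProducer = producer;
    }

    public void publish(String topic, byte[] payload)
            throws InterruptedException, ExecutionException {
        // Start the producer with whatever configuration KafkaConfigService currently reports.
        kafkaProducer.start(kafkaConfigService.getConfigParams());
        // Send one record and block until the broker acknowledges it.
        kafkaProducer.send(new ProducerRecord<>(topic, payload)).get();
        kafkaProducer.stop();
    }
}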

Change-Id: If20ef5238bb4629af0c6769129494eb44abf1d3c
Showing 18 changed files with 675 additions and 456 deletions
......@@ -39,6 +39,12 @@
<version>3.0.0-beta-2</version>
</dependency>
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.8.2.2_1</version>
</dependency>
</dependencies>
<build>
......
......@@ -14,16 +14,16 @@
*/
package org.onosproject.kafkaintegration.api;
import java.util.List;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
import com.google.common.annotations.Beta;
import java.util.List;
/**
* APIs for subscribing to ONOS event messages.
*/
......@@ -34,9 +34,9 @@ public interface EventSubscriptionService {
* Registers the external application to receive events generated in ONOS.
*
* @param appName Application Name
* @return unique consumer group identifier
* @return Registration Response DTO.
*/
EventSubscriberGroupId registerListener(String appName);
RegistrationResponse registerListener(String appName);
/**
* Removes the Registered Listener.
......
/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api;
import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
public interface KafkaConfigService {
/**
* Returns the Kafka Server Configuration Information.
*
* @return KafkaServerConfig DTO Object.
*/
KafkaServerConfig getConfigParams();
}
/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api;
import java.util.concurrent.Future;
import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* APIs for controlling the Kafka Producer.
*
*/
public interface KafkaProducerService {
/**
* Starts the Kafka Producer.
*
* @param config the Kafka Server Config
*/
void start(KafkaServerConfig config);
/**
* Stops the Kafka Producer.
*
*/
void stop();
/**
* Restarts the Kafka Producer.
*
* @param config the Kafka Server Config
*/
void restart(KafkaServerConfig config);
/**
* Sends message to Kafka Server.
*
* @param record a message to be sent
* @return metadata for a record that has been acknowledged
*/
public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record);
}
......@@ -84,6 +84,7 @@ public final class DefaultEventSubscriber implements EventSubscriber {
.addValue(subscriberGroupId.toString())
.add("eventType", eventType).toString();
}
/**
* To create an instance of the builder.
*
......@@ -92,6 +93,7 @@ public final class DefaultEventSubscriber implements EventSubscriber {
public static Builder builder() {
return new Builder();
}
/**
* Builder class for Event subscriber.
*/
......@@ -107,8 +109,7 @@ public final class DefaultEventSubscriber implements EventSubscriber {
}
@Override
public Builder setSubscriberGroupId(EventSubscriberGroupId
subscriberGroupId) {
public Builder setSubscriberGroupId(EventSubscriberGroupId subscriberGroupId) {
this.subscriberGroupId = subscriberGroupId;
return this;
}
......@@ -122,13 +123,11 @@ public final class DefaultEventSubscriber implements EventSubscriber {
@Override
public EventSubscriber build() {
checkNotNull(appName, "App name cannot be null");
checkNotNull(subscriberGroupId, "Subscriber group ID cannot " +
"be " +
"null");
checkNotNull(subscriberGroupId,
"Subscriber group ID cannot " + "be " + "null");
checkNotNull(eventType, "Event type cannot be null");
return new DefaultEventSubscriber(appName,
subscriberGroupId,
return new DefaultEventSubscriber(appName, subscriberGroupId,
eventType);
}
}
......
/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api.dto;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* DTO to hold Kafka Server Configuration information.
*
*/
public final class KafkaServerConfig {
private final String ipAddress;
private final String port;
private final int numOfRetries;
private final int maxInFlightRequestsPerConnection;
private final int acksRequired;
private final String keySerializer;
private final String valueSerializer;
private KafkaServerConfig(String ipAddress, String port, int numOfRetries,
int maxInFlightRequestsPerConnection,
int requestRequiredAcks, String keySerializer,
String valueSerializer) {
this.ipAddress = checkNotNull(ipAddress, "IP address cannot be null");
this.port = checkNotNull(port, "Port number cannot be null");
this.numOfRetries = numOfRetries;
this.maxInFlightRequestsPerConnection =
maxInFlightRequestsPerConnection;
this.acksRequired = requestRequiredAcks;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
}
public final String getIpAddress() {
return ipAddress;
}
public final String getPort() {
return port;
}
public final int getNumOfRetries() {
return numOfRetries;
}
public final int getMaxInFlightRequestsPerConnection() {
return maxInFlightRequestsPerConnection;
}
public final int getAcksRequired() {
return acksRequired;
}
public final String getKeySerializer() {
return keySerializer;
}
public final String getValueSerializer() {
return valueSerializer;
}
/**
* To create an instance of the builder.
*
* @return instance of builder
*/
public static Builder builder() {
return new Builder();
}
/**
* Builder class for KafkaServerConfig.
*/
public static final class Builder {
private String ipAddress;
private String port;
private int numOfRetries;
private int maxInFlightRequestsPerConnection;
private int acksRequired;
private String keySerializer;
private String valueSerializer;
public Builder ipAddress(String ipAddress) {
this.ipAddress = ipAddress;
return this;
}
public Builder port(String port) {
this.port = port;
return this;
}
public Builder numOfRetries(int numOfRetries) {
this.numOfRetries = numOfRetries;
return this;
}
public Builder maxInFlightRequestsPerConnection(int maxInFlightRequestsPerConnection) {
this.maxInFlightRequestsPerConnection =
maxInFlightRequestsPerConnection;
return this;
}
public Builder acksRequired(int acksRequired) {
this.acksRequired = acksRequired;
return this;
}
public Builder keySerializer(String keySerializer) {
this.keySerializer = keySerializer;
return this;
}
public Builder valueSerializer(String valueSerializer) {
this.valueSerializer = valueSerializer;
return this;
}
public KafkaServerConfig build() {
checkNotNull(ipAddress, "App name cannot be null");
checkNotNull(port, "Subscriber group ID cannot " + "be " + "null");
return new KafkaServerConfig(ipAddress, port, numOfRetries,
maxInFlightRequestsPerConnection,
acksRequired, keySerializer,
valueSerializer);
}
}
}
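For readers skimming the diff, a typical construction of this DTO looks like the following. The values mirror the defaults declared in KafkaConfigManager later in this change and are illustrative, not normative:

KafkaServerConfig config = KafkaServerConfig.builder()
        .ipAddress("localhost")
        .port("9092")
        .numOfRetries(1)
        .maxInFlightRequestsPerConnection(5)
        .acksRequired(1)
        .keySerializer("org.apache.kafka.common.serialization.StringSerializer")
        .valueSerializer("org.apache.kafka.common.serialization.ByteArraySerializer")
        .build();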
/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.api.dto;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.Objects;
/**
* DTO to hold Registration Response for requests from external apps.
*/
public final class RegistrationResponse {
private EventSubscriberGroupId groupId;
private String ipAddress;
private String port;
public RegistrationResponse(EventSubscriberGroupId groupId,
String ipAddress, String port) {
this.groupId = groupId;
this.ipAddress = ipAddress;
this.port = port;
}
public final EventSubscriberGroupId getGroupId() {
return groupId;
}
public final String getIpAddress() {
return ipAddress;
}
public final String getPort() {
return port;
}
@Override
public boolean equals(Object o) {
if (o instanceof RegistrationResponse) {
RegistrationResponse sub = (RegistrationResponse) o;
if (sub.groupId.equals(groupId) && sub.ipAddress.equals(ipAddress)
&& sub.port.equals(port)) {
return true;
}
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(groupId, ipAddress, port);
}
@Override
public String toString() {
return toStringHelper(this).add("subscriberGroupId", groupId)
.add("ipAddress", ipAddress).add("port", port).toString();
}
}
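When the refactored register API (see EventExporterWebResource at the end of this change) serializes this DTO, the external application receives a JSON body of the form {"groupId": "<subscriber-group-uuid>", "ipAddress": "<kafka-server-ip>", "port": "<kafka-server-port>"}. The field names come from the web resource changes below; the concrete values depend on the configured bootstrap server.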
......@@ -30,7 +30,8 @@
<packaging>bundle</packaging>
<description>
Kafka Integration Application.
This module is exclusive of REST calls and is only for the implementation of Apache Kafka.
This module is exclusive of REST calls and is only for the implementation
of the Application.
</description>
<dependencies>
......
......@@ -15,7 +15,15 @@
*/
package org.onosproject.kafkaintegration.impl;
import com.google.common.collect.ImmutableList;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -25,14 +33,15 @@ import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.kafkaintegration.api.EventSubscriptionService;
import org.onosproject.kafkaintegration.api.KafkaConfigService;
import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
import org.onosproject.kafkaintegration.api.dto.OnosEvent;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
import org.onosproject.kafkaintegration.listener.ListenerFactory;
import org.onosproject.kafkaintegration.listener.OnosEventListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.link.LinkService;
......@@ -42,14 +51,7 @@ import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
import com.google.common.collect.ImmutableList;
/**
* Implementation of Event Subscription Manager.
......@@ -82,6 +84,9 @@ public class EventSubscriptionManager implements EventSubscriptionService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected KafkaConfigService kafkaConfigService;
private ApplicationId appId;
@Activate
......@@ -100,13 +105,11 @@ public class EventSubscriptionManager implements EventSubscriptionService {
subscriptions = storageService
.<Type, List<EventSubscriber>>consistentMapBuilder()
.withName(SUBSCRIBED_APPS)
.withSerializer(Serializer.using(KryoNamespaces.API,
EventSubscriber.class,
OnosEvent.class,
OnosEvent.Type.class,
.withSerializer(Serializer
.using(KryoNamespaces.API, EventSubscriber.class,
OnosEvent.class, OnosEvent.Type.class,
DefaultEventSubscriber.class,
EventSubscriberGroupId.class,
UUID.class))
EventSubscriberGroupId.class, UUID.class))
.build().asJavaMap();
log.info("Started");
......@@ -118,15 +121,22 @@ public class EventSubscriptionManager implements EventSubscriptionService {
}
@Override
public EventSubscriberGroupId registerListener(String appName) {
public RegistrationResponse registerListener(String appName) {
// TODO: Remove it once ONOS provides a mechanism for external apps
// to register with the core service. See Jira - 4409
ApplicationId externalAppId = coreService.registerApplication(appName);
return registeredApps.computeIfAbsent(externalAppId,
EventSubscriberGroupId id =
registeredApps.computeIfAbsent(externalAppId,
(key) -> new EventSubscriberGroupId(UUID
.randomUUID()));
RegistrationResponse response = new RegistrationResponse(id,
kafkaConfigService.getConfigParams().getIpAddress(),
kafkaConfigService.getConfigParams().getPort());
return response;
}
@Override
......@@ -147,16 +157,11 @@ public class EventSubscriptionManager implements EventSubscriptionService {
+ "registered to make this request.");
}
if (!validGroupId(subscriber.subscriberGroupId(), subscriber.appName())) {
if (!validGroupId(subscriber.subscriberGroupId(),
subscriber.appName())) {
throw new InvalidGroupIdException("Incorrect group id in the request");
}
OnosEventListener onosListener = getListener(subscriber.eventType());
checkNotNull(onosListener, "No listener for the supported event type - {}", subscriber.eventType());
applyListenerAction(subscriber.eventType(), onosListener,
ListenerAction.START);
// update internal state
List<EventSubscriber> subscriptionList =
subscriptions.get(subscriber.eventType());
......@@ -233,12 +238,6 @@ public class EventSubscriptionManager implements EventSubscriptionService {
* @param eventType ONOS event type
* @return ONOS event listener
*/
private OnosEventListener getListener(Type eventType) {
checkNotNull(eventType);
ListenerFactory factory = ListenerFactory.getInstance();
OnosEventListener onosListener = factory.getListener(eventType);
return onosListener;
}
/**
* Checks if the group id is valid for this registered application.
......@@ -287,15 +286,6 @@ public class EventSubscriptionManager implements EventSubscriptionService {
// stop the listener.
List<EventSubscriber> subscribers =
subscriptions.get(subscriber.eventType());
if (subscribers.size() == 1) {
OnosEventListener onosListener =
getListener(subscriber.eventType());
checkNotNull(onosListener,
"No listener for the supported event type - {}",
subscriber.eventType());
applyListenerAction(subscriber.eventType(), onosListener,
ListenerAction.STOP);
}
// update internal state.
subscribers.remove(subscriber);
......@@ -308,7 +298,6 @@ public class EventSubscriptionManager implements EventSubscriptionService {
@Override
public List<EventSubscriber> getEventSubscribers(Type type) {
return subscriptions.getOrDefault(type, ImmutableList.of());
}
/**
......
......@@ -31,7 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component(immediate = true)
@Component(immediate = false)
public class KafkaStorageManager implements KafkaEventStorageService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......
......@@ -19,36 +19,32 @@ package org.onosproject.kafkaintegration.kafka;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.kafkaintegration.api.EventConversionService;
import org.onosproject.kafkaintegration.api.EventSubscriptionService;
import org.onosproject.kafkaintegration.api.KafkaProducerService;
import org.onosproject.kafkaintegration.api.KafkaConfigService;
import org.onosproject.kafkaintegration.api.dto.OnosEvent;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.link.LinkListener;
import org.onosproject.net.link.LinkService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Dictionary;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Encapsulates the behavior of monitoring various ONOS events.
*/
......@@ -63,152 +59,66 @@ public class EventMonitor {
protected EventConversionService eventConversionService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected KafkaProducerService kafkaProducer;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkService linkService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService componentConfigService;
protected KafkaConfigService kafkaConfigService;
private final DeviceListener deviceListener = new InternalDeviceListener();
private final LinkListener linkListener = new InternalLinkListener();
protected ExecutorService eventExecutor;
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final int RETRIES = 1;
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
private static final int REQUEST_REQUIRED_ACKS = 1;
private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
private static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer";
@Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
label = "Default host/post pair to establish initial connection to Kafka cluster.")
private String bootstrapServers = BOOTSTRAP_SERVERS;
@Property(name = "retries", intValue = RETRIES,
label = "Number of times the producer can retry to send after first failure")
private int retries = RETRIES;
@Property(name = "max.in.flight.requests.per.connection", intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
label = "The maximum number of unacknowledged requests the client will send before blocking")
private int maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
@Property(name = "request.required.acks", intValue = 1,
label = "Producer will get an acknowledgement after the leader has replicated the data")
private int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
@Property(name = "key.serializer", value = KEY_SERIALIZER,
label = "Serializer class for key that implements the Serializer interface.")
private String keySerializer = KEY_SERIALIZER;
@Property(name = "value.serializer", value = VALUE_SERIALIZER,
label = "Serializer class for value that implements the Serializer interface.")
private String valueSerializer = VALUE_SERIALIZER;
private Producer producer;
@Activate
protected void activate(ComponentContext context) {
componentConfigService.registerProperties(getClass());
protected void activate() {
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/onosEvents", "events-%d", log));
deviceService.addListener(deviceListener);
linkService.addListener(linkListener);
producer = new Producer(bootstrapServers, retries, maxInFlightRequestsPerConnection,
requestRequiredAcks, keySerializer, valueSerializer);
producer.start();
kafkaProducer.start(kafkaConfigService.getConfigParams());
log.info("Started");
}
@Deactivate
protected void deactivate() {
componentConfigService.unregisterProperties(getClass(), false);
deviceService.removeListener(deviceListener);
linkService.removeListener(linkListener);
producer.stop();
eventExecutor.shutdownNow();
eventExecutor = null;
log.info("Stopped");
}
@Modified
private void modified(ComponentContext context) {
if (context == null) {
bootstrapServers = BOOTSTRAP_SERVERS;
retries = RETRIES;
maxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
requestRequiredAcks = REQUEST_REQUIRED_ACKS;
keySerializer = KEY_SERIALIZER;
valueSerializer = VALUE_SERIALIZER;
return;
}
Dictionary properties = context.getProperties();
kafkaProducer.stop();
String newBootstrapServers = BOOTSTRAP_SERVERS;
int newRetries = RETRIES;
int newMaxInFlightRequestsPerConnection = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
int newRequestRequiredAcks = REQUEST_REQUIRED_ACKS;
try {
String s = get(properties, "bootstrapServers");
newBootstrapServers = isNullOrEmpty(s)
? bootstrapServers : s.trim();
s = get(properties, "retries");
newRetries = isNullOrEmpty(s)
? retries : Integer.parseInt(s.trim());
s = get(properties, "maxInFlightRequestsPerConnection");
newMaxInFlightRequestsPerConnection = isNullOrEmpty(s)
? maxInFlightRequestsPerConnection : Integer.parseInt(s.trim());
s = get(properties, "requestRequiredAcks");
newRequestRequiredAcks = isNullOrEmpty(s)
? requestRequiredAcks : Integer.parseInt(s.trim());
} catch (NumberFormatException | ClassCastException e) {
return;
}
boolean modified = newBootstrapServers != bootstrapServers ||
newRetries != retries ||
newMaxInFlightRequestsPerConnection != maxInFlightRequestsPerConnection ||
newRequestRequiredAcks != requestRequiredAcks;
if (modified) {
bootstrapServers = newBootstrapServers;
retries = newRetries;
maxInFlightRequestsPerConnection = newMaxInFlightRequestsPerConnection;
requestRequiredAcks = newRequestRequiredAcks;
if (producer != null) {
producer.stop();
}
producer = new Producer(bootstrapServers, retries, maxInFlightRequestsPerConnection,
requestRequiredAcks, keySerializer, valueSerializer);
producer.start();
log.info("Modified");
} else {
return;
}
log.info("Stopped");
}
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
OnosEvent onosEvent = eventConversionService.convertEvent(event);
eventExecutor.execute(() -> {
try {
String id = UUID.randomUUID().toString();
producer.send(new ProducerRecord<>(DEVICE.toString(),
id, event.subject().toString().getBytes())).get();
log.debug("Device event sent successfully.");
} catch (InterruptedException e) {
kafkaProducer.send(new ProducerRecord<>(DEVICE.toString(),
onosEvent.subject().toByteArray())).get();
log.debug("Event Type - {}, Subject {} sent successfully.",
DEVICE, onosEvent.subject());
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Exception thrown {}", e);
} catch (ExecutionException e2) {
log.error("Exception thrown {}", e2);
}
});
} else {
......@@ -221,17 +131,21 @@ public class EventMonitor {
@Override
public void event(LinkEvent event) {
if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
OnosEvent onosEvent = eventConversionService.convertEvent(event);
eventExecutor.execute(() -> {
try {
String id = UUID.randomUUID().toString();
producer.send(new ProducerRecord<>(LINK.toString(),
id, event.subject().toString().getBytes())).get();
log.debug("Link event sent successfully.");
} catch (InterruptedException e) {
kafkaProducer.send(new ProducerRecord<>(LINK.toString(),
onosEvent.subject().toByteArray())).get();
log.debug("Event Type - {}, Subject {} sent successfully.",
LINK, onosEvent.subject());
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Exception thrown {}", e);
} catch (ExecutionException e2) {
log.error("Exception thrown {}", e2);
}
});
} else {
......
/**
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.kafka;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import java.util.Dictionary;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.kafkaintegration.api.KafkaConfigService;
import org.onosproject.kafkaintegration.api.KafkaProducerService;
import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(immediate = true)
@Service
public class KafkaConfigManager implements KafkaConfigService {
private final Logger log = LoggerFactory.getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService componentConfigService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected KafkaProducerService producer;
public static final String BOOTSTRAP_SERVERS = "localhost:9092";
private String kafkaServerIp =
BOOTSTRAP_SERVERS.substring(0, BOOTSTRAP_SERVERS.indexOf(":"));
private String kafkaServerPortNum =
BOOTSTRAP_SERVERS.substring(BOOTSTRAP_SERVERS.indexOf(":") + 1,
BOOTSTRAP_SERVERS.length());
private static final int RETRIES = 1;
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5;
private static final int REQUEST_REQUIRED_ACKS = 1;
private static final String KEY_SERIALIZER =
"org.apache.kafka.common.serialization.StringSerializer";
private static final String VALUE_SERIALIZER =
"org.apache.kafka.common.serialization.ByteArraySerializer";
@Property(name = "bootstrap.servers", value = BOOTSTRAP_SERVERS,
label = "Default IP/Port pair to establish initial connection to Kafka cluster.")
protected String bootstrapServers = BOOTSTRAP_SERVERS;
@Property(name = "retries", intValue = RETRIES,
label = "Number of times the producer can retry to send after first failure")
protected int retries = RETRIES;
@Property(name = "max.in.flight.requests.per.connection",
intValue = MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
label = "The maximum number of unacknowledged requests the client will send before blocking")
protected int maxInFlightRequestsPerConnection =
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
@Property(name = "request.required.acks", intValue = 1,
label = "Producer will get an acknowledgement after the leader has replicated the data")
protected int requestRequiredAcks = REQUEST_REQUIRED_ACKS;
@Property(name = "key.serializer", value = KEY_SERIALIZER,
label = "Serializer class for key that implements the Serializer interface.")
protected String keySerializer = KEY_SERIALIZER;
@Property(name = "value.serializer", value = VALUE_SERIALIZER,
label = "Serializer class for value that implements the Serializer interface.")
protected String valueSerializer = VALUE_SERIALIZER;
@Activate
protected void activate(ComponentContext context) {
componentConfigService.registerProperties(getClass());
log.info("Started");
}
@Deactivate
protected void deactivate() {
componentConfigService.unregisterProperties(getClass(), false);
log.info("Stopped");
}
@Modified
private void modified(ComponentContext context) {
if (context == null) {
bootstrapServers = BOOTSTRAP_SERVERS;
retries = RETRIES;
maxInFlightRequestsPerConnection =
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION;
requestRequiredAcks = REQUEST_REQUIRED_ACKS;
keySerializer = KEY_SERIALIZER;
valueSerializer = VALUE_SERIALIZER;
return;
}
Dictionary<?, ?> properties = context.getProperties();
String newBootstrapServers;
int newRetries;
int newMaxInFlightRequestsPerConnection;
int newRequestRequiredAcks;
try {
String s = get(properties, "bootstrap.servers");
newBootstrapServers =
isNullOrEmpty(s) ? bootstrapServers : s.trim();
s = get(properties, "retries");
newRetries =
isNullOrEmpty(s) ? retries : Integer.parseInt(s.trim());
s = get(properties, "max.in.flight.requests.per.connection");
newMaxInFlightRequestsPerConnection =
isNullOrEmpty(s) ? maxInFlightRequestsPerConnection
: Integer.parseInt(s.trim());
s = get(properties, "request.required.acks");
newRequestRequiredAcks =
isNullOrEmpty(s) ? requestRequiredAcks
: Integer.parseInt(s.trim());
} catch (NumberFormatException | ClassCastException e) {
return;
}
if (configModified(newBootstrapServers, newRetries,
newMaxInFlightRequestsPerConnection,
newRequestRequiredAcks)) {
bootstrapServers = newBootstrapServers;
kafkaServerIp = bootstrapServers
.substring(0, bootstrapServers.indexOf(":"));
kafkaServerPortNum = bootstrapServers
.substring(bootstrapServers.indexOf(":") + 1,
bootstrapServers.length());
retries = newRetries;
maxInFlightRequestsPerConnection =
newMaxInFlightRequestsPerConnection;
requestRequiredAcks = newRequestRequiredAcks;
producer.restart(KafkaServerConfig.builder()
.ipAddress(kafkaServerIp).port(kafkaServerPortNum)
.numOfRetries(retries)
.maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
.acksRequired(requestRequiredAcks)
.keySerializer(keySerializer)
.valueSerializer(valueSerializer).build());
log.info("Kafka Server Config has been Modified - "
+ "bootstrapServers {}, retries {}, "
+ "maxInFlightRequestsPerConnection {}, "
+ "requestRequiredAcks {}", bootstrapServers, retries,
maxInFlightRequestsPerConnection, requestRequiredAcks);
} else {
return;
}
}
private boolean configModified(String newBootstrapServers, int newRetries,
int newMaxInFlightRequestsPerConnection,
int newRequestRequiredAcks) {
return !newBootstrapServers.equals(bootstrapServers)
|| newRetries != retries
|| newMaxInFlightRequestsPerConnection != maxInFlightRequestsPerConnection
|| newRequestRequiredAcks != requestRequiredAcks;
}
@Override
public KafkaServerConfig getConfigParams() {
String ipAddr =
bootstrapServers.substring(0, bootstrapServers.indexOf(":"));
String port =
bootstrapServers.substring(bootstrapServers.indexOf(":") + 1,
bootstrapServers.length());
return KafkaServerConfig.builder().ipAddress(ipAddr).port(port)
.numOfRetries(retries)
.maxInFlightRequestsPerConnection(maxInFlightRequestsPerConnection)
.acksRequired(requestRequiredAcks).keySerializer(keySerializer)
.valueSerializer(valueSerializer).build();
}
}
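Assuming the standard ONOS component configuration CLI, these properties can be tuned at runtime with a command along the lines of "cfg set org.onosproject.kafkaintegration.kafka.KafkaConfigManager bootstrap.servers <ip>:<port>". A change that passes configModified() then restarts the producer through KafkaProducerService.restart() with the rebuilt KafkaServerConfig.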
......@@ -16,50 +16,84 @@
package org.onosproject.kafkaintegration.kafka;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.onosproject.kafkaintegration.api.KafkaProducerService;
import org.onosproject.kafkaintegration.api.dto.KafkaServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* Implementation of Kafka Producer.
* Implementation of a Kafka Producer.
*/
public class Producer {
@Component
@Service
public class Producer implements KafkaProducerService {
private KafkaProducer<String, byte[]> kafkaProducer = null;
private final Logger log = LoggerFactory.getLogger(getClass());
Producer(String bootstrapServers, int retries, int maxInFlightRequestsPerConnection,
int requestRequiredAcks, String keySerializer, String valueSerializer) {
@Activate
protected void activate() {
log.info("Started");
}
Properties prop = new Properties();
prop.put("bootstrap.servers", bootstrapServers);
prop.put("retries", retries);
prop.put("max.in.flight.requests.per.connection", maxInFlightRequestsPerConnection);
prop.put("request.required.acks", requestRequiredAcks);
prop.put("key.serializer", keySerializer);
prop.put("value.serializer", valueSerializer);
@Deactivate
protected void deactivate() {
log.info("Stopped");
}
kafkaProducer = new KafkaProducer<>(prop);
@Override
public void start(KafkaServerConfig config) {
if (kafkaProducer != null) {
log.info("Producer has already started");
return;
}
public void start() {
log.info("Started");
String bootstrapServer =
new StringBuilder().append(config.getIpAddress()).append(":")
.append(config.getPort()).toString();
// Set Server Properties
Properties prop = new Properties();
prop.put("bootstrap.servers", bootstrapServer);
prop.put("retries", config.getNumOfRetries());
prop.put("max.in.flight.requests.per.connection",
config.getMaxInFlightRequestsPerConnection());
prop.put("request.required.acks", config.getAcksRequired());
prop.put("key.serializer", config.getKeySerializer());
prop.put("value.serializer", config.getValueSerializer());
kafkaProducer = new KafkaProducer<>(prop);
log.info("Kafka Producer has started.");
}
@Override
public void stop() {
if (kafkaProducer != null) {
kafkaProducer.close();
kafkaProducer = null;
}
log.info("Stopped");
log.info("Kafka Producer has Stopped");
}
@Override
public void restart(KafkaServerConfig config) {
stop();
start(config);
}
@Override
public Future<RecordMetadata> send(ProducerRecord<String, byte[]> record) {
return kafkaProducer.send(record);
}
......
/**
* Copyright 2016 Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.listener;
import com.google.protobuf.GeneratedMessage;
import org.onosproject.event.ListenerService;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.converter.ConversionFactory;
import org.onosproject.kafkaintegration.converter.EventConverter;
import org.onosproject.kafkaintegration.impl.KafkaPublisherManager;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
/**
* Listens for ONOS Device events.
*
*/
public final class DeviceEventsListener implements OnosEventListener {
private final Logger log = LoggerFactory.getLogger(getClass());
private boolean listenerRunning = false;
private InnerListener listener = null;
// Exists to defeat instantiation
private DeviceEventsListener() {
}
private static class SingletonHolder {
private static final DeviceEventsListener INSTANCE =
new DeviceEventsListener();
}
/**
* Returns a static reference to this listener.
*
* @return singleton object
*/
public static DeviceEventsListener getInstance() {
return SingletonHolder.INSTANCE;
}
@Override
public void startListener(Type e, ListenerService<?, ?> service) {
if (!listenerRunning) {
listener = new InnerListener();
DeviceService deviceService = (DeviceService) service;
deviceService.addListener(listener);
listenerRunning = true;
}
}
private class InnerListener implements DeviceListener {
@Override
public void event(DeviceEvent arg0) {
// Convert the event to protobuf format
ConversionFactory conversionFactory =
ConversionFactory.getInstance();
EventConverter converter = conversionFactory.getConverter(DEVICE);
GeneratedMessage message = converter.convertToProtoMessage(arg0);
// Call Dispatcher and publish event
KafkaPublisherManager.getInstance().publish(DEVICE, message);
}
}
@Override
public void stopListener(Type e, ListenerService<?, ?> service) {
if (listenerRunning) {
DeviceService deviceService = (DeviceService) service;
deviceService.removeListener(listener);
listener = null;
listenerRunning = false;
}
}
}
\ No newline at end of file
/**
* Copyright 2016 Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.listener;
import com.google.protobuf.GeneratedMessage;
import org.onosproject.event.ListenerService;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import org.onosproject.kafkaintegration.converter.ConversionFactory;
import org.onosproject.kafkaintegration.converter.EventConverter;
import org.onosproject.kafkaintegration.impl.KafkaPublisherManager;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.link.LinkListener;
import org.onosproject.net.link.LinkService;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Listens for ONOS Link Events.
*
*/
public final class LinkEventsListener implements OnosEventListener {
private boolean listenerRunning = false;
private InnerListener listener = null;
// Exists to defeat instantiation
private LinkEventsListener() {
}
private static class SingletonHolder {
private static final LinkEventsListener INSTANCE =
new LinkEventsListener();
}
/**
* Returns a static reference to this listener.
*
* @return singleton object
*/
public static LinkEventsListener getInstance() {
return SingletonHolder.INSTANCE;
}
@Override
public void startListener(Type e, ListenerService<?, ?> service) {
if (!listenerRunning) {
listener = new InnerListener();
LinkService linkService = (LinkService) service;
linkService.addListener(listener);
listenerRunning = true;
}
}
private class InnerListener implements LinkListener {
@Override
public void event(LinkEvent arg0) {
// Convert the event to protobuf format
ConversionFactory conversionFactory =
ConversionFactory.getInstance();
EventConverter converter = conversionFactory.getConverter(LINK);
GeneratedMessage message = converter.convertToProtoMessage(arg0);
// Call Dispatcher and publish event
KafkaPublisherManager.getInstance().publish(LINK, message);
}
}
@Override
public void stopListener(Type e, ListenerService<?, ?> service) {
if (listenerRunning) {
LinkService linkService = (LinkService) service;
linkService.removeListener(listener);
listenerRunning = false;
}
}
}
/**
* Copyright 2016 Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.kafkaintegration.listener;
import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
import java.util.HashMap;
import java.util.Map;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
/**
* Returns the appropriate listener object based on the ONOS event type.
*
*/
public final class ListenerFactory {
// Store listeners for all supported events
private Map<Type, OnosEventListener> listeners =
new HashMap<Type, OnosEventListener>() {
{
put(DEVICE, DeviceEventsListener.getInstance());
put(LINK, LinkEventsListener.getInstance());
}
};
// Exists to defeat instantiation
private ListenerFactory() {
}
private static class SingletonHolder {
private static final ListenerFactory INSTANCE = new ListenerFactory();
}
/**
* Returns a static reference to the Listener Factory.
*
* @return singleton object
*/
public static ListenerFactory getInstance() {
return SingletonHolder.INSTANCE;
}
/**
* Returns the listener object for the specified ONOS event type.
*
* @param event ONOS Event type
* @return return listener object
*/
public OnosEventListener getListener(Type event) {
return listeners.get(event);
}
}
......@@ -200,6 +200,7 @@
</execution>
<execution>
<id>swagger</id>
<phase>generate-resources</phase>
<goals>
<goal>swagger</goal>
</goals>
......
......@@ -18,12 +18,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onosproject.kafkaintegration.api.EventSubscriptionService;
import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
import org.onosproject.rest.AbstractWebResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
......@@ -62,7 +61,7 @@ public class EventExporterWebResource extends AbstractWebResource {
*
* @param appName The application trying to register
* @return 200 OK with UUID string which should be used as Kafka Consumer
* Group Id
* Group Id and Kafka Server, port information.
* @onos.rsModel KafkaRegistration
*/
@POST
......@@ -73,12 +72,16 @@ public class EventExporterWebResource extends AbstractWebResource {
EventSubscriptionService service = get(EventSubscriptionService.class);
EventSubscriberGroupId groupId = service.registerListener(appName);
RegistrationResponse response = service.registerListener(appName);
ObjectNode result = mapper().createObjectNode();
result.put("groupId", response.getGroupId().getId().toString());
result.put("ipAddress", response.getIpAddress());
result.put("port", response.getPort());
log.info("Registered app {}", appName);
// TODO: Should also return Kafka server information.
// Will glue this in when we have the config and Kafka modules ready
return ok(groupId.getId().toString()).build();
return ok(result.toString()).build();
}
/**
......