Pavlin Radoslavov

Moved the BGP Route Intent synchronization mechanism from the Router class

to the new class IntentSynchronizer.

Also, minor cleanup in some of the method names and access scope.

Change-Id: I38257cd1d9516ef3b3dd50f23c28015054c73d70
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.sdnip;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.net.flow.criteria.Criteria.IPCriterion;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
import org.onlab.packet.Ip4Prefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class IntentSynchronizer {
private static final Logger log =
LoggerFactory.getLogger(IntentSynchronizer.class);
private final ApplicationId appId;
private final IntentService intentService;
private final Map<Ip4Prefix, MultiPointToSinglePointIntent> pushedRouteIntents;
//
// State to deal with SDN-IP Leader election and pushing Intents
//
private final ExecutorService bgpIntentsSynchronizerExecutor;
private final Semaphore intentsSynchronizerSemaphore = new Semaphore(0);
private volatile boolean isElectedLeader = false;
private volatile boolean isActivatedLeader = false;
/**
* Class constructor.
*
* @param appId the Application ID
* @param intentService the intent service
*/
IntentSynchronizer(ApplicationId appId, IntentService intentService) {
this.appId = appId;
this.intentService = intentService;
pushedRouteIntents = new ConcurrentHashMap<>();
bgpIntentsSynchronizerExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat("bgp-intents-synchronizer-%d").build());
}
/**
* Starts the synchronizer.
*/
public void start() {
bgpIntentsSynchronizerExecutor.execute(new Runnable() {
@Override
public void run() {
doIntentSynchronizationThread();
}
});
}
/**
* Stops the synchronizer.
*/
public void stop() {
// Stop the thread(s)
bgpIntentsSynchronizerExecutor.shutdownNow();
//
// Withdraw all SDN-IP intents
//
if (!isElectedLeader) {
return; // Nothing to do: not the leader anymore
}
log.debug("Withdrawing all SDN-IP Route Intents...");
for (Intent intent : intentService.getIntents()) {
if (!(intent instanceof MultiPointToSinglePointIntent)
|| !intent.appId().equals(appId)) {
continue;
}
intentService.withdraw(intent);
}
pushedRouteIntents.clear();
}
//@Override TODO hook this up to something
public void leaderChanged(boolean isLeader) {
log.debug("Leader changed: {}", isLeader);
if (!isLeader) {
this.isElectedLeader = false;
this.isActivatedLeader = false;
return; // Nothing to do
}
this.isActivatedLeader = false;
this.isElectedLeader = true;
//
// Tell the Intents Synchronizer thread to start the synchronization
//
intentsSynchronizerSemaphore.release();
}
/**
* Gets the pushed route intents.
*
* @return the pushed route intents
*/
public Collection<MultiPointToSinglePointIntent> getPushedRouteIntents() {
List<MultiPointToSinglePointIntent> pushedIntents = new LinkedList<>();
for (Map.Entry<Ip4Prefix, MultiPointToSinglePointIntent> entry :
pushedRouteIntents.entrySet()) {
pushedIntents.add(entry.getValue());
}
return pushedIntents;
}
/**
* Thread for Intent Synchronization.
*/
private void doIntentSynchronizationThread() {
boolean interrupted = false;
try {
while (!interrupted) {
try {
intentsSynchronizerSemaphore.acquire();
//
// Drain all permits, because a single synchronization is
// sufficient.
//
intentsSynchronizerSemaphore.drainPermits();
} catch (InterruptedException e) {
log.debug("Interrupted while waiting to become " +
"Intent Synchronization leader");
interrupted = true;
break;
}
syncIntents();
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
/**
* Submits a multi-point-to-single-point intent.
*
* @param prefix the IPv4 matching prefix for the intent to submit
* @param intent the intent to submit
*/
void submitRouteIntent(Ip4Prefix prefix,
MultiPointToSinglePointIntent intent) {
synchronized (this) {
if (isElectedLeader && isActivatedLeader) {
log.debug("Intent installation: adding Intent for prefix: {}",
prefix);
intentService.submit(intent);
}
// Maintain the Intent
pushedRouteIntents.put(prefix, intent);
}
}
/**
* Withdraws a multi-point-to-single-point intent.
*
* @param prefix the IPv4 matching prefix for the intent to withdraw.
*/
void withdrawRouteIntent(Ip4Prefix prefix) {
synchronized (this) {
MultiPointToSinglePointIntent intent =
pushedRouteIntents.remove(prefix);
if (intent == null) {
log.debug("There is no intent in pushedRouteIntents to delete " +
"for prefix: {}", prefix);
return;
}
if (isElectedLeader && isActivatedLeader) {
log.debug("Intent installation: deleting Intent for prefix: {}",
prefix);
intentService.withdraw(intent);
}
}
}
/**
* Performs Intents Synchronization between the internally stored Route
* Intents and the installed Route Intents.
*/
private void syncIntents() {
synchronized (this) {
if (!isElectedLeader) {
return; // Nothing to do: not the leader anymore
}
log.debug("Syncing SDN-IP Route Intents...");
Map<Ip4Prefix, MultiPointToSinglePointIntent> fetchedIntents =
new HashMap<>();
//
// Fetch all intents, and classify the Multi-Point-to-Point Intents
// based on the matching prefix.
//
for (Intent intent : intentService.getIntents()) {
if (!(intent instanceof MultiPointToSinglePointIntent)
|| !intent.appId().equals(appId)) {
continue;
}
MultiPointToSinglePointIntent mp2pIntent =
(MultiPointToSinglePointIntent) intent;
Criterion c =
mp2pIntent.selector().getCriterion(Criterion.Type.IPV4_DST);
if (c != null && c instanceof IPCriterion) {
IPCriterion ipCriterion = (IPCriterion) c;
Ip4Prefix ip4Prefix = ipCriterion.ip().getIp4Prefix();
if (ip4Prefix == null) {
// TODO: For now we support only IPv4
continue;
}
fetchedIntents.put(ip4Prefix, mp2pIntent);
} else {
log.warn("No IPV4_DST criterion found for intent {}",
mp2pIntent.id());
}
}
//
// Compare for each prefix the local IN-MEMORY Intents with the
// FETCHED Intents:
// - If the IN-MEMORY Intent is same as the FETCHED Intent, store
// the FETCHED Intent in the local memory (i.e., override the
// IN-MEMORY Intent) to preserve the original Intent ID
// - if the IN-MEMORY Intent is not same as the FETCHED Intent,
// delete the FETCHED Intent, and push/install the IN-MEMORY
// Intent.
// - If there is an IN-MEMORY Intent for a prefix, but no FETCHED
// Intent for same prefix, then push/install the IN-MEMORY
// Intent.
// - If there is a FETCHED Intent for a prefix, but no IN-MEMORY
// Intent for same prefix, then delete/withdraw the FETCHED
// Intent.
//
Collection<Pair<Ip4Prefix, MultiPointToSinglePointIntent>>
storeInMemoryIntents = new LinkedList<>();
Collection<Pair<Ip4Prefix, MultiPointToSinglePointIntent>>
addIntents = new LinkedList<>();
Collection<Pair<Ip4Prefix, MultiPointToSinglePointIntent>>
deleteIntents = new LinkedList<>();
for (Map.Entry<Ip4Prefix, MultiPointToSinglePointIntent> entry :
pushedRouteIntents.entrySet()) {
Ip4Prefix prefix = entry.getKey();
MultiPointToSinglePointIntent inMemoryIntent =
entry.getValue();
MultiPointToSinglePointIntent fetchedIntent =
fetchedIntents.get(prefix);
if (fetchedIntent == null) {
//
// No FETCHED Intent for same prefix: push the IN-MEMORY
// Intent.
//
addIntents.add(Pair.of(prefix, inMemoryIntent));
continue;
}
IntentState state = intentService.getIntentState(fetchedIntent.id());
if (state == IntentState.WITHDRAWING ||
state == IntentState.WITHDRAWN) {
// The intent has been withdrawn but according to our route
// table it should be installed. We'll reinstall it.
addIntents.add(Pair.of(prefix, inMemoryIntent));
}
//
// If IN-MEMORY Intent is same as the FETCHED Intent,
// store the FETCHED Intent in the local memory.
//
if (compareMultiPointToSinglePointIntents(inMemoryIntent,
fetchedIntent)) {
storeInMemoryIntents.add(Pair.of(prefix, fetchedIntent));
} else {
//
// The IN-MEMORY Intent is not same as the FETCHED Intent,
// hence delete the FETCHED Intent, and install the
// IN-MEMORY Intent.
//
deleteIntents.add(Pair.of(prefix, fetchedIntent));
addIntents.add(Pair.of(prefix, inMemoryIntent));
}
fetchedIntents.remove(prefix);
}
//
// Any remaining FETCHED Intents have to be deleted/withdrawn
//
for (Map.Entry<Ip4Prefix, MultiPointToSinglePointIntent> entry :
fetchedIntents.entrySet()) {
Ip4Prefix prefix = entry.getKey();
MultiPointToSinglePointIntent fetchedIntent = entry.getValue();
deleteIntents.add(Pair.of(prefix, fetchedIntent));
}
//
// Perform the actions:
// 1. Store in memory fetched intents that are same. Can be done
// even if we are not the leader anymore
// 2. Delete intents: check if the leader before each operation
// 3. Add intents: check if the leader before each operation
//
for (Pair<Ip4Prefix, MultiPointToSinglePointIntent> pair :
storeInMemoryIntents) {
Ip4Prefix prefix = pair.getLeft();
MultiPointToSinglePointIntent intent = pair.getRight();
log.debug("Intent synchronization: updating in-memory " +
"Intent for prefix: {}", prefix);
pushedRouteIntents.put(prefix, intent);
}
//
isActivatedLeader = true; // Allow push of Intents
for (Pair<Ip4Prefix, MultiPointToSinglePointIntent> pair :
deleteIntents) {
Ip4Prefix prefix = pair.getLeft();
MultiPointToSinglePointIntent intent = pair.getRight();
if (!isElectedLeader) {
isActivatedLeader = false;
return;
}
log.debug("Intent synchronization: deleting Intent for " +
"prefix: {}", prefix);
intentService.withdraw(intent);
}
//
for (Pair<Ip4Prefix, MultiPointToSinglePointIntent> pair :
addIntents) {
Ip4Prefix prefix = pair.getLeft();
MultiPointToSinglePointIntent intent = pair.getRight();
if (!isElectedLeader) {
isActivatedLeader = false;
return;
}
log.debug("Intent synchronization: adding Intent for " +
"prefix: {}", prefix);
intentService.submit(intent);
}
if (!isElectedLeader) {
isActivatedLeader = false;
}
log.debug("Syncing SDN-IP routes completed.");
}
}
/**
* Compares two Multi-point to Single Point Intents whether they represent
* same logical intention.
*
* @param intent1 the first Intent to compare
* @param intent2 the second Intent to compare
* @return true if both Intents represent same logical intention, otherwise
* false
*/
private boolean compareMultiPointToSinglePointIntents(
MultiPointToSinglePointIntent intent1,
MultiPointToSinglePointIntent intent2) {
return Objects.equal(intent1.appId(), intent2.appId()) &&
Objects.equal(intent1.selector(), intent2.selector()) &&
Objects.equal(intent1.treatment(), intent2.treatment()) &&
Objects.equal(intent1.ingressPoints(), intent2.ingressPoints()) &&
Objects.equal(intent1.egressPoint(), intent2.egressPoint());
}
}
......@@ -96,6 +96,13 @@ public class PeerConnectivityManager {
}
/**
* Stops the peer connectivity manager.
*/
public void stop() {
// TODO: Implement it
}
/**
* Sets up paths to establish connectivity between all internal
* {@link BgpSpeaker}s and all external {@link BgpPeer}s.
*/
......
......@@ -16,21 +16,16 @@
package org.onlab.onos.sdnip;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Host;
......@@ -38,15 +33,9 @@ import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.flow.criteria.Criteria.IPCriterion;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.criteria.Criterion.Type;
import org.onlab.onos.net.host.HostEvent;
import org.onlab.onos.net.host.HostListener;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
import org.onlab.onos.sdnip.config.BgpPeer;
import org.onlab.onos.sdnip.config.Interface;
......@@ -59,7 +48,6 @@ import org.onlab.packet.MacAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
......@@ -76,100 +64,81 @@ import com.googlecode.concurrenttrees.radixinverted.InvertedRadixTree;
public class Router implements RouteListener {
private static final Logger log = LoggerFactory.getLogger(Router.class);
// For routes announced by local BGP daemon in SDN network,
// the next hop will be 0.0.0.0.
private static final Ip4Address LOCAL_NEXT_HOP =
Ip4Address.valueOf("0.0.0.0");
// Store all route updates in a radix tree.
// The key in this tree is the binary string of prefix of the route.
private InvertedRadixTree<RouteEntry> bgpRoutes;
// Stores all incoming route updates in a queue.
private BlockingQueue<RouteUpdate> routeUpdates;
private final BlockingQueue<RouteUpdate> routeUpdates;
// The Ip4Address is the next hop address of each route update.
private SetMultimap<Ip4Address, RouteEntry> routesWaitingOnArp;
private ConcurrentHashMap<Ip4Prefix, MultiPointToSinglePointIntent> pushedRouteIntents;
private IntentService intentService;
private HostService hostService;
private SdnIpConfigService configService;
private InterfaceService interfaceService;
private ExecutorService bgpUpdatesExecutor;
private ExecutorService bgpIntentsSynchronizerExecutor;
private final SetMultimap<Ip4Address, RouteEntry> routesWaitingOnArp;
private final ApplicationId appId;
//
// State to deal with SDN-IP Leader election and pushing Intents
//
private Semaphore intentsSynchronizerSemaphore = new Semaphore(0);
private volatile boolean isElectedLeader = false;
private volatile boolean isActivatedLeader = false;
// For routes announced by local BGP daemon in SDN network,
// the next hop will be 0.0.0.0.
public static final Ip4Address LOCAL_NEXT_HOP =
Ip4Address.valueOf("0.0.0.0");
private final IntentSynchronizer intentSynchronizer;
private final HostService hostService;
private final SdnIpConfigService configService;
private final InterfaceService interfaceService;
private final ExecutorService bgpUpdatesExecutor;
private final HostListener hostListener;
/**
* Class constructor.
*
* @param appId the application ID
* @param intentService the intent service
* @param intentSynchronizer the intent synchronizer
* @param hostService the host service
* @param configService the configuration service
* @param interfaceService the interface service
*/
public Router(ApplicationId appId, IntentService intentService,
public Router(ApplicationId appId, IntentSynchronizer intentSynchronizer,
HostService hostService, SdnIpConfigService configService,
InterfaceService interfaceService) {
this.appId = appId;
this.intentService = intentService;
this.intentSynchronizer = intentSynchronizer;
this.hostService = hostService;
this.configService = configService;
this.interfaceService = interfaceService;
this.hostListener = new InternalHostListener();
bgpRoutes = new ConcurrentInvertedRadixTree<>(
new DefaultByteArrayNodeFactory());
routeUpdates = new LinkedBlockingQueue<>();
routesWaitingOnArp = Multimaps.synchronizedSetMultimap(
HashMultimap.<Ip4Address, RouteEntry>create());
pushedRouteIntents = new ConcurrentHashMap<>();
bgpUpdatesExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("bgp-updates-%d").build());
bgpIntentsSynchronizerExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat("bgp-intents-synchronizer-%d").build());
this.hostService.addListener(new InternalHostListener());
}
/**
* Starts the router.
*/
public void start() {
this.hostService.addListener(hostListener);
bgpUpdatesExecutor.execute(new Runnable() {
@Override
public void run() {
doUpdatesThread();
}
});
bgpIntentsSynchronizerExecutor.execute(new Runnable() {
@Override
public void run() {
doIntentSynchronizationThread();
}
});
}
/**
* Shuts the router down.
* Stops the router.
*/
public void shutdown() {
// Stop all threads
public void stop() {
this.hostService.removeListener(hostListener);
// Stop the thread(s)
bgpUpdatesExecutor.shutdownNow();
bgpIntentsSynchronizerExecutor.shutdownNow();
synchronized (this) {
// Cleanup all local state
......@@ -177,41 +146,7 @@ public class Router implements RouteListener {
new DefaultByteArrayNodeFactory());
routeUpdates.clear();
routesWaitingOnArp.clear();
pushedRouteIntents.clear();
//
// Withdraw all SDN-IP intents
//
if (!isElectedLeader) {
return; // Nothing to do: not the leader anymore
}
log.debug("Withdrawing all SDN-IP Route Intents...");
for (Intent intent : intentService.getIntents()) {
if (!(intent instanceof MultiPointToSinglePointIntent)
|| !intent.appId().equals(appId)) {
continue;
}
intentService.withdraw(intent);
}
}
}
//@Override TODO hook this up to something
public void leaderChanged(boolean isLeader) {
log.debug("Leader changed: {}", isLeader);
if (!isLeader) {
this.isElectedLeader = false;
this.isActivatedLeader = false;
return; // Nothing to do
}
this.isActivatedLeader = false;
this.isElectedLeader = true;
//
// Tell the Intents Synchronizer thread to start the synchronization
//
intentsSynchronizerSemaphore.release();
}
@Override
......@@ -227,35 +162,6 @@ public class Router implements RouteListener {
}
/**
* Thread for Intent Synchronization.
*/
private void doIntentSynchronizationThread() {
boolean interrupted = false;
try {
while (!interrupted) {
try {
intentsSynchronizerSemaphore.acquire();
//
// Drain all permits, because a single synchronization is
// sufficient.
//
intentsSynchronizerSemaphore.drainPermits();
} catch (InterruptedException e) {
log.debug("Interrupted while waiting to become " +
"Intent Synchronization leader");
interrupted = true;
break;
}
syncIntents();
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
/**
* Thread for handling route updates.
*/
private void doUpdatesThread() {
......@@ -290,194 +196,6 @@ public class Router implements RouteListener {
}
/**
* Performs Intents Synchronization between the internally stored Route
* Intents and the installed Route Intents.
*/
private void syncIntents() {
synchronized (this) {
if (!isElectedLeader) {
return; // Nothing to do: not the leader anymore
}
log.debug("Syncing SDN-IP Route Intents...");
Map<Ip4Prefix, MultiPointToSinglePointIntent> fetchedIntents =
new HashMap<>();
//
// Fetch all intents, and classify the Multi-Point-to-Point Intents
// based on the matching prefix.
//
for (Intent intent : intentService.getIntents()) {
if (!(intent instanceof MultiPointToSinglePointIntent)
|| !intent.appId().equals(appId)) {
continue;
}
MultiPointToSinglePointIntent mp2pIntent =
(MultiPointToSinglePointIntent) intent;
Criterion c = mp2pIntent.selector().getCriterion(Type.IPV4_DST);
if (c != null && c instanceof IPCriterion) {
IPCriterion ipCriterion = (IPCriterion) c;
Ip4Prefix ip4Prefix = ipCriterion.ip().getIp4Prefix();
if (ip4Prefix == null) {
// TODO: For now we support only IPv4
continue;
}
fetchedIntents.put(ip4Prefix, mp2pIntent);
} else {
log.warn("No IPV4_DST criterion found for intent {}",
mp2pIntent.id());
}
}
//
// Compare for each prefix the local IN-MEMORY Intents with the
// FETCHED Intents:
// - If the IN-MEMORY Intent is same as the FETCHED Intent, store
// the FETCHED Intent in the local memory (i.e., override the
// IN-MEMORY Intent) to preserve the original Intent ID
// - if the IN-MEMORY Intent is not same as the FETCHED Intent,
// delete the FETCHED Intent, and push/install the IN-MEMORY
// Intent.
// - If there is an IN-MEMORY Intent for a prefix, but no FETCHED
// Intent for same prefix, then push/install the IN-MEMORY
// Intent.
// - If there is a FETCHED Intent for a prefix, but no IN-MEMORY
// Intent for same prefix, then delete/withdraw the FETCHED
// Intent.
//
Collection<Pair<Ip4Prefix, MultiPointToSinglePointIntent>>
storeInMemoryIntents = new LinkedList<>();
Collection<Pair<Ip4Prefix, MultiPointToSinglePointIntent>>
addIntents = new LinkedList<>();
Collection<Pair<Ip4Prefix, MultiPointToSinglePointIntent>>
deleteIntents = new LinkedList<>();
for (Map.Entry<Ip4Prefix, MultiPointToSinglePointIntent> entry :
pushedRouteIntents.entrySet()) {
Ip4Prefix prefix = entry.getKey();
MultiPointToSinglePointIntent inMemoryIntent =
entry.getValue();
MultiPointToSinglePointIntent fetchedIntent =
fetchedIntents.get(prefix);
if (fetchedIntent == null) {
//
// No FETCHED Intent for same prefix: push the IN-MEMORY
// Intent.
//
addIntents.add(Pair.of(prefix, inMemoryIntent));
continue;
}
IntentState state = intentService.getIntentState(fetchedIntent.id());
if (state == IntentState.WITHDRAWING ||
state == IntentState.WITHDRAWN) {
// The intent has been withdrawn but according to our route
// table it should be installed. We'll reinstall it.
addIntents.add(Pair.of(prefix, inMemoryIntent));
}
//
// If IN-MEMORY Intent is same as the FETCHED Intent,
// store the FETCHED Intent in the local memory.
//
if (compareMultiPointToSinglePointIntents(inMemoryIntent,
fetchedIntent)) {
storeInMemoryIntents.add(Pair.of(prefix, fetchedIntent));
} else {
//
// The IN-MEMORY Intent is not same as the FETCHED Intent,
// hence delete the FETCHED Intent, and install the
// IN-MEMORY Intent.
//
deleteIntents.add(Pair.of(prefix, fetchedIntent));
addIntents.add(Pair.of(prefix, inMemoryIntent));
}
fetchedIntents.remove(prefix);
}
//
// Any remaining FETCHED Intents have to be deleted/withdrawn
//
for (Map.Entry<Ip4Prefix, MultiPointToSinglePointIntent> entry :
fetchedIntents.entrySet()) {
Ip4Prefix prefix = entry.getKey();
MultiPointToSinglePointIntent fetchedIntent = entry.getValue();
deleteIntents.add(Pair.of(prefix, fetchedIntent));
}
//
// Perform the actions:
// 1. Store in memory fetched intents that are same. Can be done
// even if we are not the leader anymore
// 2. Delete intents: check if the leader before each operation
// 3. Add intents: check if the leader before each operation
//
for (Pair<Ip4Prefix, MultiPointToSinglePointIntent> pair :
storeInMemoryIntents) {
Ip4Prefix prefix = pair.getLeft();
MultiPointToSinglePointIntent intent = pair.getRight();
log.debug("Intent synchronization: updating in-memory " +
"Intent for prefix: {}", prefix);
pushedRouteIntents.put(prefix, intent);
}
//
isActivatedLeader = true; // Allow push of Intents
for (Pair<Ip4Prefix, MultiPointToSinglePointIntent> pair :
deleteIntents) {
Ip4Prefix prefix = pair.getLeft();
MultiPointToSinglePointIntent intent = pair.getRight();
if (!isElectedLeader) {
isActivatedLeader = false;
return;
}
log.debug("Intent synchronization: deleting Intent for " +
"prefix: {}", prefix);
intentService.withdraw(intent);
}
//
for (Pair<Ip4Prefix, MultiPointToSinglePointIntent> pair :
addIntents) {
Ip4Prefix prefix = pair.getLeft();
MultiPointToSinglePointIntent intent = pair.getRight();
if (!isElectedLeader) {
isActivatedLeader = false;
return;
}
log.debug("Intent synchronization: adding Intent for " +
"prefix: {}", prefix);
intentService.submit(intent);
}
if (!isElectedLeader) {
isActivatedLeader = false;
}
log.debug("Syncing SDN-IP routes completed.");
}
}
/**
* Compares two Multi-point to Single Point Intents whether they represent
* same logical intention.
*
* @param intent1 the first Intent to compare
* @param intent2 the second Intent to compare
* @return true if both Intents represent same logical intention, otherwise
* false
*/
private boolean compareMultiPointToSinglePointIntents(
MultiPointToSinglePointIntent intent1,
MultiPointToSinglePointIntent intent2) {
return Objects.equal(intent1.appId(), intent2.appId()) &&
Objects.equal(intent1.selector(), intent2.selector()) &&
Objects.equal(intent1.treatment(), intent2.treatment()) &&
Objects.equal(intent1.ingressPoints(), intent2.ingressPoints()) &&
Objects.equal(intent1.egressPoint(), intent2.egressPoint());
}
/**
* Processes adding a route entry.
* <p>
* Put new route entry into the radix tree. If there was an existing
......@@ -488,7 +206,7 @@ public class Router implements RouteListener {
*
* @param routeEntry the route entry to add
*/
protected void processRouteAdd(RouteEntry routeEntry) {
void processRouteAdd(RouteEntry routeEntry) {
synchronized (this) {
log.debug("Processing route add: {}", routeEntry);
......@@ -610,17 +328,8 @@ public class Router implements RouteListener {
log.debug("Adding intent for prefix {}, next hop mac {}",
prefix, nextHopMacAddress);
MultiPointToSinglePointIntent pushedIntent =
pushedRouteIntents.get(prefix);
// Just for testing.
if (pushedIntent != null) {
log.error("There should not be a pushed intent: {}", pushedIntent);
}
ConnectPoint egressPort = egressInterface.connectPoint();
Set<ConnectPoint> ingressPorts = new HashSet<>();
ConnectPoint egressPort = egressInterface.connectPoint();
for (Interface intf : interfaceService.getInterfaces()) {
if (!intf.connectPoint().equals(egressInterface.connectPoint())) {
......@@ -644,14 +353,7 @@ public class Router implements RouteListener {
new MultiPointToSinglePointIntent(appId, selector, treatment,
ingressPorts, egressPort);
if (isElectedLeader && isActivatedLeader) {
log.debug("Intent installation: adding Intent for prefix: {}",
prefix);
intentService.submit(intent);
}
// Maintain the Intent
pushedRouteIntents.put(prefix, intent);
intentSynchronizer.submitRouteIntent(prefix, intent);
}
/**
......@@ -663,7 +365,7 @@ public class Router implements RouteListener {
*
* @param routeEntry the route entry to delete
*/
protected void processRouteDelete(RouteEntry routeEntry) {
void processRouteDelete(RouteEntry routeEntry) {
synchronized (this) {
log.debug("Processing route delete: {}", routeEntry);
Ip4Prefix prefix = routeEntry.prefix();
......@@ -691,21 +393,7 @@ public class Router implements RouteListener {
private void executeRouteDelete(RouteEntry routeEntry) {
log.debug("Executing route delete: {}", routeEntry);
Ip4Prefix prefix = routeEntry.prefix();
MultiPointToSinglePointIntent intent =
pushedRouteIntents.remove(prefix);
if (intent == null) {
log.debug("There is no intent in pushedRouteIntents to delete " +
"for prefix: {}", prefix);
} else {
if (isElectedLeader && isActivatedLeader) {
log.debug("Intent installation: deleting Intent for prefix: {}",
prefix);
intentService.withdraw(intent);
}
}
intentSynchronizer.withdrawRouteIntent(routeEntry.prefix());
}
/**
......@@ -774,21 +462,6 @@ public class Router implements RouteListener {
}
/**
* Gets the pushed route intents.
*
* @return the pushed route intents
*/
public Collection<MultiPointToSinglePointIntent> getPushedRouteIntents() {
List<MultiPointToSinglePointIntent> pushedIntents = new LinkedList<>();
for (Map.Entry<Ip4Prefix, MultiPointToSinglePointIntent> entry :
pushedRouteIntents.entrySet()) {
pushedIntents.add(entry.getValue());
}
return pushedIntents;
}
/**
* Listener for host events.
*/
class InternalHostListener implements HostListener {
......
......@@ -55,6 +55,7 @@ public class SdnIp implements SdnIpService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
private IntentSynchronizer intentSynchronizer;
private SdnIpConfigReader config;
private PeerConnectivityManager peerConnectivity;
private Router router;
......@@ -64,35 +65,41 @@ public class SdnIp implements SdnIpService {
protected void activate() {
log.debug("SDN-IP started");
ApplicationId appId = coreService.registerApplication(SDN_IP_APP);
config = new SdnIpConfigReader();
config.init();
InterfaceService interfaceService = new HostToInterfaceAdaptor(hostService);
InterfaceService interfaceService =
new HostToInterfaceAdaptor(hostService);
ApplicationId appId = coreService.registerApplication(SDN_IP_APP);
intentSynchronizer = new IntentSynchronizer(appId, intentService);
intentSynchronizer.start();
peerConnectivity = new PeerConnectivityManager(appId, config,
interfaceService, intentService);
peerConnectivity.start();
router = new Router(appId, intentService, hostService, config, interfaceService);
router = new Router(appId, intentSynchronizer, hostService, config,
interfaceService);
router.start();
// Manually set the router as the leader to allow testing
// Manually set the instance as the leader to allow testing
// TODO change this when we get a leader election
router.leaderChanged(true);
intentSynchronizer.leaderChanged(true);
bgpSessionManager = new BgpSessionManager(router);
bgpSessionManager.startUp(2000); // TODO
// TODO: the local BGP listen port number should be configurable
bgpSessionManager.start(2000);
// TODO need to disable link discovery on external ports
}
@Deactivate
protected void deactivate() {
bgpSessionManager.shutDown();
router.shutdown();
bgpSessionManager.stop();
router.stop();
peerConnectivity.stop();
intentSynchronizer.stop();
log.info("Stopped");
}
......@@ -114,7 +121,7 @@ public class SdnIp implements SdnIpService {
@Override
public void modifyPrimary(boolean isPrimary) {
router.leaderChanged(isPrimary);
intentSynchronizer.leaderChanged(isPrimary);
}
static String dpidToUri(String dpid) {
......
......@@ -181,8 +181,8 @@ public class BgpSessionManager {
* @param listenPortNumber the port number to listen on. By default
* it should be BgpConstants.BGP_PORT (179)
*/
public void startUp(int listenPortNumber) {
log.debug("BGP Session Manager startUp()");
public void start(int listenPortNumber) {
log.debug("BGP Session Manager start.");
isShutdown = false;
ChannelFactory channelFactory =
......@@ -222,9 +222,9 @@ public class BgpSessionManager {
}
/**
* Shuts down the BGP Session Manager operation.
* Stops the BGP Session Manager operation.
*/
public void shutDown() {
public void stop() {
isShutdown = true;
allChannels.close().awaitUninterruptibly();
serverBootstrap.releaseExternalResources();
......
......@@ -54,7 +54,8 @@ import com.googlecode.concurrenttrees.radixinverted.ConcurrentInvertedRadixTree;
import com.googlecode.concurrenttrees.radixinverted.InvertedRadixTree;
/**
* This class tests the intent synchronization function in Router class.
* This class tests the intent synchronization function in the
* IntentSynchronizer class.
*/
public class IntentSyncTest {
......@@ -74,6 +75,7 @@ public class IntentSyncTest {
DeviceId.deviceId("of:0000000000000003"),
PortNumber.portNumber(1));
private IntentSynchronizer intentSynchronizer;
private Router router;
private static final ApplicationId APPID = new ApplicationId() {
......@@ -94,7 +96,8 @@ public class IntentSyncTest {
setUpHostService();
intentService = createMock(IntentService.class);
router = new Router(APPID, intentService,
intentSynchronizer = new IntentSynchronizer(APPID, intentService);
router = new Router(APPID, intentSynchronizer,
hostService, null, interfaceService);
}
......@@ -260,7 +263,7 @@ public class IntentSyncTest {
// Compose a intent, which is equal to intent5 but the id is different.
MultiPointToSinglePointIntent intent5New =
staticIntentBuilder(intent5, routeEntry5, "00:00:00:00:00:01");
assertTrue(TestUtils.callMethod(router,
assertTrue(TestUtils.callMethod(intentSynchronizer,
"compareMultiPointToSinglePointIntents",
new Class<?>[] {MultiPointToSinglePointIntent.class,
MultiPointToSinglePointIntent.class},
......@@ -296,7 +299,8 @@ public class IntentSyncTest {
pushedRouteIntents.put(routeEntry5.prefix(), intent5New);
pushedRouteIntents.put(routeEntry6.prefix(), intent6);
pushedRouteIntents.put(routeEntry7.prefix(), intent7);
TestUtils.setField(router, "pushedRouteIntents", pushedRouteIntents);
TestUtils.setField(intentSynchronizer, "pushedRouteIntents",
pushedRouteIntents);
// Set up expectation
reset(intentService);
......@@ -327,8 +331,9 @@ public class IntentSyncTest {
replay(intentService);
// Start the test
router.leaderChanged(true);
TestUtils.callMethod(router, "syncIntents", new Class<?>[] {});
intentSynchronizer.leaderChanged(true);
TestUtils.callMethod(intentSynchronizer, "syncIntents",
new Class<?>[] {});
// Verify
assertEquals(router.getRoutes().size(), 6);
......@@ -338,12 +343,12 @@ public class IntentSyncTest {
assertTrue(router.getRoutes().contains(routeEntry5));
assertTrue(router.getRoutes().contains(routeEntry6));
assertEquals(router.getPushedRouteIntents().size(), 6);
assertTrue(router.getPushedRouteIntents().contains(intent1));
assertTrue(router.getPushedRouteIntents().contains(intent3));
assertTrue(router.getPushedRouteIntents().contains(intent4Update));
assertTrue(router.getPushedRouteIntents().contains(intent5));
assertTrue(router.getPushedRouteIntents().contains(intent6));
assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 6);
assertTrue(intentSynchronizer.getPushedRouteIntents().contains(intent1));
assertTrue(intentSynchronizer.getPushedRouteIntents().contains(intent3));
assertTrue(intentSynchronizer.getPushedRouteIntents().contains(intent4Update));
assertTrue(intentSynchronizer.getPushedRouteIntents().contains(intent5));
assertTrue(intentSynchronizer.getPushedRouteIntents().contains(intent6));
verify(intentService);
}
......
......@@ -102,6 +102,7 @@ public class RouterTest {
}
};
private IntentSynchronizer intentSynchronizer;
private Router router;
@Before
......@@ -113,7 +114,8 @@ public class RouterTest {
intentService = createMock(IntentService.class);
router = new Router(APPID, intentService,
intentSynchronizer = new IntentSynchronizer(APPID, intentService);
router = new Router(APPID, intentSynchronizer,
hostService, sdnIpConfigService, interfaceService);
}
......@@ -258,15 +260,15 @@ public class RouterTest {
replay(intentService);
// Call the processRouteAdd() method in Router class
router.leaderChanged(true);
TestUtils.setField(router, "isActivatedLeader", true);
intentSynchronizer.leaderChanged(true);
TestUtils.setField(intentSynchronizer, "isActivatedLeader", true);
router.processRouteAdd(routeEntry);
// Verify
assertEquals(router.getRoutes().size(), 1);
assertTrue(router.getRoutes().contains(routeEntry));
assertEquals(router.getPushedRouteIntents().size(), 1);
assertEquals(router.getPushedRouteIntents().iterator().next(),
assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 1);
assertEquals(intentSynchronizer.getPushedRouteIntents().iterator().next(),
intent);
verify(intentService);
}
......@@ -338,15 +340,15 @@ public class RouterTest {
replay(intentService);
// Call the processRouteAdd() method in Router class
router.leaderChanged(true);
TestUtils.setField(router, "isActivatedLeader", true);
intentSynchronizer.leaderChanged(true);
TestUtils.setField(intentSynchronizer, "isActivatedLeader", true);
router.processRouteAdd(routeEntryUpdate);
// Verify
assertEquals(router.getRoutes().size(), 1);
assertTrue(router.getRoutes().contains(routeEntryUpdate));
assertEquals(router.getPushedRouteIntents().size(), 1);
assertEquals(router.getPushedRouteIntents().iterator().next(),
assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 1);
assertEquals(intentSynchronizer.getPushedRouteIntents().iterator().next(),
intentNew);
verify(intentService);
}
......@@ -389,13 +391,13 @@ public class RouterTest {
replay(intentService);
// Call route deleting method in Router class
router.leaderChanged(true);
TestUtils.setField(router, "isActivatedLeader", true);
intentSynchronizer.leaderChanged(true);
TestUtils.setField(intentSynchronizer, "isActivatedLeader", true);
router.processRouteDelete(routeEntry);
// Verify
assertEquals(router.getRoutes().size(), 0);
assertEquals(router.getPushedRouteIntents().size(), 0);
assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 0);
verify(intentService);
}
......@@ -416,14 +418,14 @@ public class RouterTest {
replay(intentService);
// Call the processRouteAdd() method in Router class
router.leaderChanged(true);
TestUtils.setField(router, "isActivatedLeader", true);
intentSynchronizer.leaderChanged(true);
TestUtils.setField(intentSynchronizer, "isActivatedLeader", true);
router.processRouteAdd(routeEntry);
// Verify
assertEquals(router.getRoutes().size(), 1);
assertTrue(router.getRoutes().contains(routeEntry));
assertEquals(router.getPushedRouteIntents().size(), 0);
assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 0);
verify(intentService);
}
}
......
......@@ -92,6 +92,7 @@ public class RouterTestWithAsyncArp {
DeviceId.deviceId("of:0000000000000003"),
PortNumber.portNumber(1));
private IntentSynchronizer intentSynchronizer;
private Router router;
private InternalHostListener internalHostListener;
......@@ -114,7 +115,8 @@ public class RouterTestWithAsyncArp {
hostService = createMock(HostService.class);
intentService = createMock(IntentService.class);
router = new Router(APPID, intentService,
intentSynchronizer = new IntentSynchronizer(APPID, intentService);
router = new Router(APPID, intentSynchronizer,
hostService, sdnIpConfigService, interfaceService);
internalHostListener = router.new InternalHostListener();
}
......@@ -211,8 +213,8 @@ public class RouterTestWithAsyncArp {
replay(intentService);
// Call the processRouteAdd() method in Router class
router.leaderChanged(true);
TestUtils.setField(router, "isActivatedLeader", true);
intentSynchronizer.leaderChanged(true);
TestUtils.setField(intentSynchronizer, "isActivatedLeader", true);
router.processRouteAdd(routeEntry);
Host host = new DefaultHost(ProviderId.NONE, HostId.NONE,
......@@ -227,9 +229,9 @@ public class RouterTestWithAsyncArp {
// Verify
assertEquals(router.getRoutes().size(), 1);
assertTrue(router.getRoutes().contains(routeEntry));
assertEquals(router.getPushedRouteIntents().size(), 1);
assertEquals(router.getPushedRouteIntents().iterator().next(),
intent);
assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 1);
assertEquals(intentSynchronizer.getPushedRouteIntents().iterator().next(),
intent);
verify(intentService);
verify(hostService);
......@@ -294,8 +296,8 @@ public class RouterTestWithAsyncArp {
replay(intentService);
// Call the processRouteAdd() method in Router class
router.leaderChanged(true);
TestUtils.setField(router, "isActivatedLeader", true);
intentSynchronizer.leaderChanged(true);
TestUtils.setField(intentSynchronizer, "isActivatedLeader", true);
router.processRouteAdd(routeEntryUpdate);
Host host = new DefaultHost(ProviderId.NONE, HostId.NONE,
......@@ -310,8 +312,8 @@ public class RouterTestWithAsyncArp {
// Verify
assertEquals(router.getRoutes().size(), 1);
assertTrue(router.getRoutes().contains(routeEntryUpdate));
assertEquals(router.getPushedRouteIntents().size(), 1);
assertEquals(router.getPushedRouteIntents().iterator().next(),
assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 1);
assertEquals(intentSynchronizer.getPushedRouteIntents().iterator().next(),
intentNew);
verify(intentService);
verify(hostService);
......@@ -342,13 +344,13 @@ public class RouterTestWithAsyncArp {
replay(intentService);
// Call route deleting method in Router class
router.leaderChanged(true);
TestUtils.setField(router, "isActivatedLeader", true);
intentSynchronizer.leaderChanged(true);
TestUtils.setField(intentSynchronizer, "isActivatedLeader", true);
router.processRouteDelete(routeEntry);
// Verify
assertEquals(router.getRoutes().size(), 0);
assertEquals(router.getPushedRouteIntents().size(), 0);
assertEquals(intentSynchronizer.getPushedRouteIntents().size(), 0);
verify(intentService);
}
......
......@@ -66,6 +66,7 @@ public class SdnIpTest {
private static final int MIN_PREFIX_LENGTH = 1;
private static final int MAX_PREFIX_LENGTH = 32;
private IntentSynchronizer intentSynchronizer;
static Router router;
private SdnIpConfigService sdnIpConfigService;
......@@ -111,7 +112,8 @@ public class SdnIpTest {
intentService = createMock(IntentService.class);
random = new Random();
router = new Router(APPID, intentService, hostService,
intentSynchronizer = new IntentSynchronizer(APPID, intentService);
router = new Router(APPID, intentSynchronizer, hostService,
sdnIpConfigService, interfaceService);
}
......@@ -228,8 +230,8 @@ public class SdnIpTest {
replay(intentService);
router.leaderChanged(true);
TestUtils.setField(router, "isActivatedLeader", true);
intentSynchronizer.leaderChanged(true);
TestUtils.setField(intentSynchronizer, "isActivatedLeader", true);
// Add route updates
for (RouteUpdate update : routeUpdates) {
......@@ -239,7 +241,8 @@ public class SdnIpTest {
latch.await(5000, TimeUnit.MILLISECONDS);
assertEquals(router.getRoutes().size(), numRoutes);
assertEquals(router.getPushedRouteIntents().size(), numRoutes);
assertEquals(intentSynchronizer.getPushedRouteIntents().size(),
numRoutes);
verify(intentService);
}
......@@ -295,8 +298,8 @@ public class SdnIpTest {
replay(intentService);
router.leaderChanged(true);
TestUtils.setField(router, "isActivatedLeader", true);
intentSynchronizer.leaderChanged(true);
TestUtils.setField(intentSynchronizer, "isActivatedLeader", true);
// Send the add updates first
for (RouteUpdate update : routeUpdates) {
......@@ -314,7 +317,7 @@ public class SdnIpTest {
deleteCount.await(5000, TimeUnit.MILLISECONDS);
assertEquals(0, router.getRoutes().size());
assertEquals(0, router.getPushedRouteIntents().size());
assertEquals(0, intentSynchronizer.getPushedRouteIntents().size());
verify(intentService);
}
......
......@@ -95,7 +95,7 @@ public class BgpSessionManagerTest {
//
bgpSessionManager = new BgpSessionManager(dummyRouteListener);
// NOTE: We use port 0 to bind on any available port
bgpSessionManager.startUp(0);
bgpSessionManager.start(0);
// Get the port number the BGP Session Manager is listening on
Channel serverChannel = TestUtils.getField(bgpSessionManager,
......@@ -136,7 +136,7 @@ public class BgpSessionManagerTest {
@After
public void tearDown() throws Exception {
bgpSessionManager.shutDown();
bgpSessionManager.stop();
bgpSessionManager = null;
}
......