Toggle navigation
Toggle navigation
This project
Loading...
Sign in
홍길동
/
onos
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Snippets
Network
Create a new issue
Builds
Commits
Issue Boards
Authored by
tom
2014-10-08 20:22:13 -0700
Browse Files
Options
Browse Files
Download
Plain Diff
Commit
2f182f6bac735d0e6157ad51b59284767a78d430
2f182f6b
2 parents
1a17eb47
c7e1cb63
Merge remote-tracking branch 'origin/master'
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
355 additions
and
104 deletions
core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java
core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java
0 → 100644
View file @
2f182f6
package
org
.
onlab
.
onos
.
net
.
flow
;
import
java.util.List
;
/**
* Interface capturing the result of a batch operation.
*
*/
public
interface
BatchOperationResult
<
T
>
{
/**
* Returns whether the operation was successful.
* @return true if successful, false otherwise
*/
boolean
isSuccess
();
/**
* Obtains a list of items which failed.
* @return a list of failures
*/
List
<
T
>
failedItems
();
}
core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
View file @
2f182f6
package
org
.
onlab
.
onos
.
net
.
flow
;
public
class
CompletedBatchOperation
{
import
java.util.List
;
import
com.google.common.collect.ImmutableList
;
public
class
CompletedBatchOperation
implements
BatchOperationResult
<
FlowEntry
>
{
private
final
boolean
success
;
private
final
List
<
FlowEntry
>
failures
;
public
CompletedBatchOperation
(
boolean
success
,
List
<
FlowEntry
>
failures
)
{
this
.
success
=
success
;
this
.
failures
=
ImmutableList
.
copyOf
(
failures
);
}
@Override
public
boolean
isSuccess
()
{
return
success
;
}
@Override
public
List
<
FlowEntry
>
failedItems
()
{
return
failures
;
}
}
...
...
core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowEntry.java
View file @
2f182f6
...
...
@@ -17,6 +17,10 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
private
long
lastSeen
=
-
1
;
private
final
int
errType
;
private
final
int
errCode
;
public
DefaultFlowEntry
(
DeviceId
deviceId
,
TrafficSelector
selector
,
TrafficTreatment
treatment
,
int
priority
,
FlowEntryState
state
,
...
...
@@ -27,6 +31,8 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
this
.
life
=
life
;
this
.
packets
=
packets
;
this
.
bytes
=
bytes
;
this
.
errCode
=
-
1
;
this
.
errType
=
-
1
;
this
.
lastSeen
=
System
.
currentTimeMillis
();
}
...
...
@@ -37,6 +43,8 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
this
.
life
=
life
;
this
.
packets
=
packets
;
this
.
bytes
=
bytes
;
this
.
errCode
=
-
1
;
this
.
errType
=
-
1
;
this
.
lastSeen
=
System
.
currentTimeMillis
();
}
...
...
@@ -46,9 +54,18 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
this
.
life
=
0
;
this
.
packets
=
0
;
this
.
bytes
=
0
;
this
.
errCode
=
-
1
;
this
.
errType
=
-
1
;
this
.
lastSeen
=
System
.
currentTimeMillis
();
}
public
DefaultFlowEntry
(
FlowRule
rule
,
int
errType
,
int
errCode
)
{
super
(
rule
);
this
.
state
=
FlowEntryState
.
FAILED
;
this
.
errType
=
errType
;
this
.
errCode
=
errCode
;
}
@Override
public
long
life
()
{
return
life
;
...
...
@@ -100,6 +117,16 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
}
@Override
public
int
errType
()
{
return
this
.
errType
;
}
@Override
public
int
errCode
()
{
return
this
.
errCode
;
}
@Override
public
String
toString
()
{
return
toStringHelper
(
this
)
.
add
(
"rule"
,
super
.
toString
())
...
...
@@ -108,4 +135,6 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
}
}
...
...
core/api/src/main/java/org/onlab/onos/net/flow/FlowEntry.java
View file @
2f182f6
...
...
@@ -29,7 +29,12 @@ public interface FlowEntry extends FlowRule {
/**
* Flow has been removed from flow table and can be purged.
*/
REMOVED
REMOVED
,
/**
* Indicates that the installation of this flow has failed.
*/
FAILED
}
/**
...
...
@@ -95,4 +100,16 @@ public interface FlowEntry extends FlowRule {
*/
void
setBytes
(
long
bytes
);
/**
* Indicates the error type.
* @return an integer value of the error
*/
int
errType
();
/**
* Indicates the error code.
* @return an integer value of the error
*/
int
errCode
();
}
...
...
core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
View file @
2f182f6
...
...
@@ -37,6 +37,12 @@ public interface FlowRuleProvider extends Provider {
*/
void
removeRulesById
(
ApplicationId
id
,
FlowRule
...
flowRules
);
Future
<
Void
>
executeBatch
(
BatchOperation
<
FlowRuleBatchEntry
>
batch
);
/**
* Installs a batch of flow rules. Each flowrule is associated to an
* operation which results in either addition, removal or modification.
* @param batch a batch of flow rules
* @return a future indicating the status of this execution
*/
Future
<
CompletedBatchOperation
>
executeBatch
(
BatchOperation
<
FlowRuleBatchEntry
>
batch
);
}
...
...
core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
View file @
2f182f6
...
...
@@ -5,10 +5,12 @@ import static org.slf4j.LoggerFactory.getLogger;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.concurrent.CancellationException
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.Future
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
import
java.util.concurrent.atomic.AtomicReference
;
import
org.apache.felix.scr.annotations.Activate
;
import
org.apache.felix.scr.annotations.Component
;
...
...
@@ -26,6 +28,7 @@ import org.onlab.onos.net.flow.CompletedBatchOperation;
import
org.onlab.onos.net.flow.FlowEntry
;
import
org.onlab.onos.net.flow.FlowRule
;
import
org.onlab.onos.net.flow.FlowRuleBatchEntry
;
import
org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation
;
import
org.onlab.onos.net.flow.FlowRuleBatchOperation
;
import
org.onlab.onos.net.flow.FlowRuleEvent
;
import
org.onlab.onos.net.flow.FlowRuleListener
;
...
...
@@ -52,6 +55,8 @@ public class FlowRuleManager
extends
AbstractProviderRegistry
<
FlowRuleProvider
,
FlowRuleProviderService
>
implements
FlowRuleService
,
FlowRuleProviderRegistry
{
enum
BatchState
{
STARTED
,
FINISHED
,
CANCELLED
};
public
static
final
String
FLOW_RULE_NULL
=
"FlowRule cannot be null"
;
private
final
Logger
log
=
getLogger
(
getClass
());
...
...
@@ -144,7 +149,7 @@ public class FlowRuleManager
FlowRuleBatchOperation
batch
)
{
Multimap
<
FlowRuleProvider
,
FlowRuleBatchEntry
>
batches
=
ArrayListMultimap
.
create
();
List
<
Future
<
Void
>>
futures
=
Lists
.
newArrayList
();
List
<
Future
<
CompletedBatchOperation
>>
futures
=
Lists
.
newArrayList
();
for
(
FlowRuleBatchEntry
fbe
:
batch
.
getOperations
())
{
final
FlowRule
f
=
fbe
.
getTarget
();
final
Device
device
=
deviceService
.
getDevice
(
f
.
deviceId
());
...
...
@@ -165,10 +170,10 @@ public class FlowRuleManager
for
(
FlowRuleProvider
provider
:
batches
.
keySet
())
{
FlowRuleBatchOperation
b
=
new
FlowRuleBatchOperation
(
batches
.
get
(
provider
));
Future
<
Void
>
future
=
provider
.
executeBatch
(
b
);
Future
<
CompletedBatchOperation
>
future
=
provider
.
executeBatch
(
b
);
futures
.
add
(
future
);
}
return
new
FlowRuleBatchFuture
(
futures
);
return
new
FlowRuleBatchFuture
(
futures
,
batches
);
}
@Override
...
...
@@ -341,59 +346,140 @@ public class FlowRuleManager
private
class
FlowRuleBatchFuture
implements
Future
<
CompletedBatchOperation
>
{
private
final
List
<
Future
<
Void
>>
futures
;
private
final
List
<
Future
<
CompletedBatchOperation
>>
futures
;
private
final
Multimap
<
FlowRuleProvider
,
FlowRuleBatchEntry
>
batches
;
private
final
AtomicReference
<
BatchState
>
state
;
private
CompletedBatchOperation
overall
;
public
FlowRuleBatchFuture
(
List
<
Future
<
Void
>>
futures
)
{
public
FlowRuleBatchFuture
(
List
<
Future
<
CompletedBatchOperation
>>
futures
,
Multimap
<
FlowRuleProvider
,
FlowRuleBatchEntry
>
batches
)
{
this
.
futures
=
futures
;
this
.
batches
=
batches
;
state
=
new
AtomicReference
<
FlowRuleManager
.
BatchState
>();
state
.
set
(
BatchState
.
STARTED
);
}
@Override
public
boolean
cancel
(
boolean
mayInterruptIfRunning
)
{
// TODO Auto-generated method stub
return
false
;
if
(
state
.
get
()
==
BatchState
.
FINISHED
)
{
return
false
;
}
if
(!
state
.
compareAndSet
(
BatchState
.
STARTED
,
BatchState
.
CANCELLED
))
{
return
false
;
}
cleanUpBatch
();
for
(
Future
<
CompletedBatchOperation
>
f
:
futures
)
{
f
.
cancel
(
mayInterruptIfRunning
);
}
return
true
;
}
@Override
public
boolean
isCancelled
()
{
// TODO Auto-generated method stub
return
false
;
return
state
.
get
()
==
BatchState
.
CANCELLED
;
}
@Override
public
boolean
isDone
()
{
boolean
isDone
=
true
;
for
(
Future
<
Void
>
future
:
futures
)
{
isDone
&=
future
.
isDone
();
}
return
isDone
;
return
state
.
get
()
==
BatchState
.
FINISHED
;
}
@Override
public
CompletedBatchOperation
get
()
throws
InterruptedException
,
ExecutionException
{
// TODO Auto-generated method stub
for
(
Future
<
Void
>
future
:
futures
)
{
future
.
get
();
ExecutionException
{
if
(
isDone
())
{
return
overall
;
}
boolean
success
=
true
;
List
<
FlowEntry
>
failed
=
Lists
.
newLinkedList
();
CompletedBatchOperation
completed
;
for
(
Future
<
CompletedBatchOperation
>
future
:
futures
)
{
completed
=
future
.
get
();
success
=
validateBatchOperation
(
failed
,
completed
,
future
);
}
return
new
CompletedBatchOperation
();
return
finalizeBatchOperation
(
success
,
failed
);
}
@Override
public
CompletedBatchOperation
get
(
long
timeout
,
TimeUnit
unit
)
throws
InterruptedException
,
ExecutionException
,
TimeoutException
{
// TODO we should decrement the timeout
if
(
isDone
())
{
return
overall
;
}
boolean
success
=
true
;
List
<
FlowEntry
>
failed
=
Lists
.
newLinkedList
();
CompletedBatchOperation
completed
;
long
start
=
System
.
nanoTime
();
long
end
=
start
+
unit
.
toNanos
(
timeout
);
for
(
Future
<
Void
>
future
:
futures
)
{
for
(
Future
<
CompletedBatchOperation
>
future
:
futures
)
{
long
now
=
System
.
nanoTime
();
long
thisTimeout
=
end
-
now
;
future
.
get
(
thisTimeout
,
TimeUnit
.
NANOSECONDS
);
completed
=
future
.
get
(
thisTimeout
,
TimeUnit
.
NANOSECONDS
);
success
=
validateBatchOperation
(
failed
,
completed
,
future
);
}
return
new
CompletedBatchOperation
(
);
return
finalizeBatchOperation
(
success
,
failed
);
}
private
boolean
validateBatchOperation
(
List
<
FlowEntry
>
failed
,
CompletedBatchOperation
completed
,
Future
<
CompletedBatchOperation
>
future
)
{
if
(
isCancelled
())
{
throw
new
CancellationException
();
}
if
(!
completed
.
isSuccess
())
{
failed
.
addAll
(
completed
.
failedItems
());
cleanUpBatch
();
cancelAllSubBatches
();
return
false
;
}
return
true
;
}
private
void
cancelAllSubBatches
()
{
for
(
Future
<
CompletedBatchOperation
>
f
:
futures
)
{
f
.
cancel
(
true
);
}
}
private
CompletedBatchOperation
finalizeBatchOperation
(
boolean
success
,
List
<
FlowEntry
>
failed
)
{
synchronized
(
this
)
{
if
(!
state
.
compareAndSet
(
BatchState
.
STARTED
,
BatchState
.
FINISHED
))
{
if
(
state
.
get
()
==
BatchState
.
FINISHED
)
{
return
overall
;
}
throw
new
CancellationException
();
}
overall
=
new
CompletedBatchOperation
(
success
,
failed
);
return
overall
;
}
}
private
void
cleanUpBatch
()
{
for
(
FlowRuleBatchEntry
fbe
:
batches
.
values
())
{
if
(
fbe
.
getOperator
()
==
FlowRuleOperation
.
ADD
||
fbe
.
getOperator
()
==
FlowRuleOperation
.
MODIFY
)
{
store
.
deleteFlowRule
(
fbe
.
getTarget
());
}
else
if
(
fbe
.
getOperator
()
==
FlowRuleOperation
.
REMOVE
)
{
store
.
storeFlowRule
(
fbe
.
getTarget
());
}
}
}
}
}
...
...
core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
View file @
2f182f6
...
...
@@ -19,8 +19,11 @@ import java.util.Map;
import
java.util.Objects
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Future
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
import
org.apache.felix.scr.annotations.Activate
;
import
org.apache.felix.scr.annotations.Component
;
...
...
@@ -516,9 +519,14 @@ public class IntentManager
public
void
run
()
{
for
(
Iterator
<
Future
<
CompletedBatchOperation
>>
i
=
futures
.
iterator
();
i
.
hasNext
();)
{
Future
<
CompletedBatchOperation
>
future
=
i
.
next
();
if
(
future
.
isDone
())
{
// TODO: we may want to get the future here
try
{
// TODO: we may want to get the future here and go back to the future.
CompletedBatchOperation
completed
=
future
.
get
(
100
,
TimeUnit
.
NANOSECONDS
);
// TODO check if future succeeded and if not report fail items
i
.
remove
();
}
catch
(
TimeoutException
|
InterruptedException
|
ExecutionException
te
)
{
log
.
debug
(
"Intallations of intent {} is still pending"
,
intent
);
}
}
if
(
futures
.
isEmpty
())
{
...
...
core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
View file @
2f182f6
...
...
@@ -28,6 +28,7 @@ import org.onlab.onos.net.Port;
import
org.onlab.onos.net.PortNumber
;
import
org.onlab.onos.net.device.DeviceListener
;
import
org.onlab.onos.net.device.DeviceService
;
import
org.onlab.onos.net.flow.CompletedBatchOperation
;
import
org.onlab.onos.net.flow.DefaultFlowEntry
;
import
org.onlab.onos.net.flow.DefaultFlowRule
;
import
org.onlab.onos.net.flow.FlowEntry
;
...
...
@@ -408,7 +409,7 @@ public class FlowRuleManagerTest {
}
@Override
public
Future
<
Void
>
executeBatch
(
public
Future
<
CompletedBatchOperation
>
executeBatch
(
BatchOperation
<
FlowRuleBatchEntry
>
batch
)
{
// TODO Auto-generated method stub
return
null
;
...
...
providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
View file @
2f182f6
...
...
@@ -27,6 +27,8 @@ import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModVlanPcp
import
org.onlab.onos.net.flow.instructions.L3ModificationInstruction
;
import
org.onlab.onos.net.flow.instructions.L3ModificationInstruction.ModIPInstruction
;
import
org.projectfloodlight.openflow.protocol.OFFactory
;
import
org.projectfloodlight.openflow.protocol.OFFlowAdd
;
import
org.projectfloodlight.openflow.protocol.OFFlowDelete
;
import
org.projectfloodlight.openflow.protocol.OFFlowMod
;
import
org.projectfloodlight.openflow.protocol.OFFlowModFlags
;
import
org.projectfloodlight.openflow.protocol.action.OFAction
;
...
...
@@ -68,12 +70,13 @@ public class FlowModBuilder {
this
.
cookie
=
flowRule
.
id
();
}
public
OFFlow
Mo
d
buildFlowAdd
()
{
public
OFFlow
Ad
d
buildFlowAdd
()
{
Match
match
=
buildMatch
();
List
<
OFAction
>
actions
=
buildActions
();
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod
fm
=
factory
.
buildFlowAdd
()
OFFlowAdd
fm
=
factory
.
buildFlowAdd
()
.
setXid
(
cookie
.
value
())
.
setCookie
(
U64
.
of
(
cookie
.
value
()))
.
setBufferId
(
OFBufferId
.
NO_BUFFER
)
.
setActions
(
actions
)
...
...
@@ -92,6 +95,7 @@ public class FlowModBuilder {
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod
fm
=
factory
.
buildFlowModify
()
.
setXid
(
cookie
.
value
())
.
setCookie
(
U64
.
of
(
cookie
.
value
()))
.
setBufferId
(
OFBufferId
.
NO_BUFFER
)
.
setActions
(
actions
)
...
...
@@ -104,11 +108,12 @@ public class FlowModBuilder {
}
public
OFFlow
Mod
buildFlowDel
()
{
public
OFFlow
Delete
buildFlowDel
()
{
Match
match
=
buildMatch
();
List
<
OFAction
>
actions
=
buildActions
();
OFFlowMod
fm
=
factory
.
buildFlowDelete
()
OFFlowDelete
fm
=
factory
.
buildFlowDelete
()
.
setXid
(
cookie
.
value
())
.
setCookie
(
U64
.
of
(
cookie
.
value
()))
.
setBufferId
(
OFBufferId
.
NO_BUFFER
)
.
setActions
(
actions
)
...
...
providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
View file @
2f182f6
...
...
@@ -2,6 +2,7 @@ package org.onlab.onos.provider.of.flow.impl;
import
static
org
.
slf4j
.
LoggerFactory
.
getLogger
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
...
...
@@ -21,9 +22,12 @@ import org.apache.felix.scr.annotations.Reference;
import
org.apache.felix.scr.annotations.ReferenceCardinality
;
import
org.onlab.onos.ApplicationId
;
import
org.onlab.onos.net.DeviceId
;
import
org.onlab.onos.net.flow.CompletedBatchOperation
;
import
org.onlab.onos.net.flow.DefaultFlowEntry
;
import
org.onlab.onos.net.flow.FlowEntry
;
import
org.onlab.onos.net.flow.FlowRule
;
import
org.onlab.onos.net.flow.FlowRuleBatchEntry
;
import
org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation
;
import
org.onlab.onos.net.flow.FlowRuleProvider
;
import
org.onlab.onos.net.flow.FlowRuleProviderRegistry
;
import
org.onlab.onos.net.flow.FlowRuleProviderService
;
...
...
@@ -40,6 +44,7 @@ import org.onlab.onos.openflow.controller.RoleState;
import
org.projectfloodlight.openflow.protocol.OFActionType
;
import
org.projectfloodlight.openflow.protocol.OFBarrierRequest
;
import
org.projectfloodlight.openflow.protocol.OFErrorMsg
;
import
org.projectfloodlight.openflow.protocol.OFFlowMod
;
import
org.projectfloodlight.openflow.protocol.OFFlowRemoved
;
import
org.projectfloodlight.openflow.protocol.OFFlowStatsEntry
;
import
org.projectfloodlight.openflow.protocol.OFFlowStatsReply
;
...
...
@@ -52,6 +57,11 @@ import org.projectfloodlight.openflow.protocol.OFStatsType;
import
org.projectfloodlight.openflow.protocol.OFVersion
;
import
org.projectfloodlight.openflow.protocol.action.OFAction
;
import
org.projectfloodlight.openflow.protocol.action.OFActionOutput
;
import
org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg
;
import
org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg
;
import
org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg
;
import
org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg
;
import
org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg
;
import
org.projectfloodlight.openflow.protocol.instruction.OFInstruction
;
import
org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions
;
import
org.projectfloodlight.openflow.types.OFPort
;
...
...
@@ -70,6 +80,8 @@ import com.google.common.collect.Multimap;
@Component
(
immediate
=
true
)
public
class
OpenFlowRuleProvider
extends
AbstractProvider
implements
FlowRuleProvider
{
enum
BatchState
{
STARTED
,
FINISHED
,
CANCELLED
};
private
final
Logger
log
=
getLogger
(
getClass
());
@Reference
(
cardinality
=
ReferenceCardinality
.
MANDATORY_UNARY
)
...
...
@@ -88,6 +100,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private
final
Map
<
Long
,
InstallationFuture
>
pendingFutures
=
new
ConcurrentHashMap
<
Long
,
InstallationFuture
>();
private
final
Map
<
Long
,
InstallationFuture
>
pendingFMs
=
new
ConcurrentHashMap
<
Long
,
InstallationFuture
>();
/**
* Creates an OpenFlow host provider.
*/
...
...
@@ -143,9 +158,47 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
removeFlowRule
(
flowRules
);
}
@Override
public
Future
<
CompletedBatchOperation
>
executeBatch
(
BatchOperation
<
FlowRuleBatchEntry
>
batch
)
{
final
Set
<
Dpid
>
sws
=
new
HashSet
<
Dpid
>();
final
Map
<
Long
,
FlowRuleBatchEntry
>
fmXids
=
new
HashMap
<
Long
,
FlowRuleBatchEntry
>();
OFFlowMod
mod
=
null
;
for
(
FlowRuleBatchEntry
fbe
:
batch
.
getOperations
())
{
FlowRule
flowRule
=
fbe
.
getTarget
();
OpenFlowSwitch
sw
=
controller
.
getSwitch
(
Dpid
.
dpid
(
flowRule
.
deviceId
().
uri
()));
sws
.
add
(
new
Dpid
(
sw
.
getId
()));
FlowModBuilder
builder
=
new
FlowModBuilder
(
flowRule
,
sw
.
factory
());
switch
(
fbe
.
getOperator
())
{
case
ADD:
mod
=
builder
.
buildFlowAdd
();
break
;
case
REMOVE:
mod
=
builder
.
buildFlowDel
();
break
;
case
MODIFY:
mod
=
builder
.
buildFlowMod
();
break
;
default
:
log
.
error
(
"Unsupported batch operation {}"
,
fbe
.
getOperator
());
}
if
(
mod
!=
null
)
{
sw
.
sendMsg
(
mod
);
fmXids
.
put
(
mod
.
getXid
(),
fbe
);
}
else
{
log
.
error
(
"Conversion of flowrule {} failed."
,
flowRule
);
}
}
InstallationFuture
installation
=
new
InstallationFuture
(
sws
,
fmXids
);
for
(
Long
xid
:
fmXids
.
keySet
())
{
pendingFMs
.
put
(
xid
,
installation
);
}
pendingFutures
.
put
(
U32
.
f
(
batch
.
hashCode
()),
installation
);
installation
.
verify
(
batch
.
hashCode
());
return
installation
;
}
//TODO: InternalFlowRuleProvider listening to stats and error and flowremoved.
// possibly barriers as well. May not be internal at all...
private
class
InternalFlowProvider
implements
OpenFlowSwitchListener
,
OpenFlowEventListener
{
...
...
@@ -175,7 +228,6 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
InstallationFuture
future
=
null
;
switch
(
msg
.
getType
())
{
case
FLOW_REMOVED:
//TODO: make this better
OFFlowRemoved
removed
=
(
OFFlowRemoved
)
msg
;
FlowEntry
fr
=
new
FlowEntryBuilder
(
dpid
,
removed
).
build
();
...
...
@@ -191,7 +243,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
break
;
case
ERROR:
future
=
pendingF
uture
s
.
get
(
msg
.
getXid
());
future
=
pendingF
M
s
.
get
(
msg
.
getXid
());
if
(
future
!=
null
)
{
future
.
fail
((
OFErrorMsg
)
msg
,
dpid
);
}
...
...
@@ -203,10 +255,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public
void
roleAssertFailed
(
Dpid
dpid
,
RoleState
role
)
{
// TODO Auto-generated method stub
}
public
void
roleAssertFailed
(
Dpid
dpid
,
RoleState
role
)
{}
private
synchronized
void
pushFlowMetrics
(
Dpid
dpid
,
OFStatsReply
stats
)
{
if
(
stats
.
getStatsType
()
!=
OFStatsType
.
FLOW
)
{
...
...
@@ -230,7 +279,6 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
private
boolean
tableMissRule
(
Dpid
dpid
,
OFFlowStatsEntry
reply
)
{
// TODO NEED TO FIND A BETTER WAY TO AVOID DOING THIS
if
(
reply
.
getVersion
().
equals
(
OFVersion
.
OF_10
)
||
reply
.
getMatch
().
getMatchFields
().
iterator
().
hasNext
())
{
return
false
;
...
...
@@ -251,104 +299,91 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
return
false
;
}
}
@Override
public
Future
<
Void
>
executeBatch
(
BatchOperation
<
FlowRuleBatchEntry
>
batch
)
{
final
Set
<
Dpid
>
sws
=
new
HashSet
<
Dpid
>();
for
(
FlowRuleBatchEntry
fbe
:
batch
.
getOperations
())
{
FlowRule
flowRule
=
fbe
.
getTarget
();
OpenFlowSwitch
sw
=
controller
.
getSwitch
(
Dpid
.
dpid
(
flowRule
.
deviceId
().
uri
()));
sws
.
add
(
new
Dpid
(
sw
.
getId
()));
switch
(
fbe
.
getOperator
())
{
case
ADD:
//TODO: Track XID for each flowmod
sw
.
sendMsg
(
new
FlowModBuilder
(
flowRule
,
sw
.
factory
()).
buildFlowAdd
());
break
;
case
REMOVE:
//TODO: Track XID for each flowmod
sw
.
sendMsg
(
new
FlowModBuilder
(
flowRule
,
sw
.
factory
()).
buildFlowDel
());
break
;
case
MODIFY:
//TODO: Track XID for each flowmod
sw
.
sendMsg
(
new
FlowModBuilder
(
flowRule
,
sw
.
factory
()).
buildFlowMod
());
break
;
default
:
log
.
error
(
"Unsupported batch operation {}"
,
fbe
.
getOperator
());
}
}
InstallationFuture
installation
=
new
InstallationFuture
(
sws
);
pendingFutures
.
put
(
U32
.
f
(
batch
.
hashCode
()),
installation
);
installation
.
verify
(
batch
.
hashCode
());
return
installation
;
}
private
class
InstallationFuture
implements
Future
<
Void
>
{
private
class
InstallationFuture
implements
Future
<
CompletedBatchOperation
>
{
private
final
Set
<
Dpid
>
sws
;
private
final
AtomicBoolean
ok
=
new
AtomicBoolean
(
true
);
private
final
Map
<
Long
,
FlowRuleBatchEntry
>
fms
;
private
final
List
<
FlowEntry
>
offendingFlowMods
=
Lists
.
newLinkedList
();
private
final
CountDownLatch
countDownLatch
;
private
Integer
pendingXid
;
private
BatchState
state
;
public
InstallationFuture
(
Set
<
Dpid
>
sws
)
{
public
InstallationFuture
(
Set
<
Dpid
>
sws
,
Map
<
Long
,
FlowRuleBatchEntry
>
fmXids
)
{
this
.
state
=
BatchState
.
STARTED
;
this
.
sws
=
sws
;
this
.
fms
=
fmXids
;
countDownLatch
=
new
CountDownLatch
(
sws
.
size
());
}
public
void
fail
(
OFErrorMsg
msg
,
Dpid
dpid
)
{
ok
.
set
(
false
);
//TODO add reason to flowentry
FlowEntry
fe
=
null
;
FlowRuleBatchEntry
fbe
=
fms
.
get
(
msg
.
getXid
());
FlowRule
offending
=
fbe
.
getTarget
();
//TODO handle specific error msgs
//offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
switch
(
msg
.
getErrType
())
{
case
BAD_ACTION:
OFBadActionErrorMsg
bad
=
(
OFBadActionErrorMsg
)
msg
;
fe
=
new
DefaultFlowEntry
(
offending
,
bad
.
getErrType
().
ordinal
(),
bad
.
getCode
().
ordinal
());
break
;
case
BAD_INSTRUCTION:
OFBadInstructionErrorMsg
badins
=
(
OFBadInstructionErrorMsg
)
msg
;
fe
=
new
DefaultFlowEntry
(
offending
,
badins
.
getErrType
().
ordinal
(),
badins
.
getCode
().
ordinal
());
break
;
case
BAD_MATCH:
OFBadMatchErrorMsg
badMatch
=
(
OFBadMatchErrorMsg
)
msg
;
fe
=
new
DefaultFlowEntry
(
offending
,
badMatch
.
getErrType
().
ordinal
(),
badMatch
.
getCode
().
ordinal
());
break
;
case
BAD_REQUEST:
break
;
case
EXPERIMENTER:
OFBadRequestErrorMsg
badReq
=
(
OFBadRequestErrorMsg
)
msg
;
fe
=
new
DefaultFlowEntry
(
offending
,
badReq
.
getErrType
().
ordinal
(),
badReq
.
getCode
().
ordinal
());
break
;
case
FLOW_MOD_FAILED:
OFFlowModFailedErrorMsg
fmFail
=
(
OFFlowModFailedErrorMsg
)
msg
;
fe
=
new
DefaultFlowEntry
(
offending
,
fmFail
.
getErrType
().
ordinal
(),
fmFail
.
getCode
().
ordinal
());
break
;
case
EXPERIMENTER:
case
GROUP_MOD_FAILED:
break
;
case
HELLO_FAILED:
break
;
case
METER_MOD_FAILED:
break
;
case
PORT_MOD_FAILED:
break
;
case
QUEUE_OP_FAILED:
break
;
case
ROLE_REQUEST_FAILED:
break
;
case
SWITCH_CONFIG_FAILED:
break
;
case
TABLE_FEATURES_FAILED:
break
;
case
TABLE_MOD_FAILED:
fe
=
new
DefaultFlowEntry
(
offending
,
msg
.
getErrType
().
ordinal
(),
0
);
break
;
default
:
break
;
log
.
error
(
"Unknown error type {}"
,
msg
.
getErrType
())
;
}
offendingFlowMods
.
add
(
fe
);
}
public
void
satisfyRequirement
(
Dpid
dpid
)
{
log
.
warn
(
"Satisfaction from switch {}"
,
dpid
);
sws
.
remove
(
dpid
);
countDownLatch
.
countDown
();
cleanUp
();
}
public
void
verify
(
Integer
id
)
{
pendingXid
=
id
;
for
(
Dpid
dpid
:
sws
)
{
OpenFlowSwitch
sw
=
controller
.
getSwitch
(
dpid
);
OFBarrierRequest
.
Builder
builder
=
sw
.
factory
()
...
...
@@ -356,41 +391,59 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
.
setXid
(
id
);
sw
.
sendMsg
(
builder
.
build
());
}
}
@Override
public
boolean
cancel
(
boolean
mayInterruptIfRunning
)
{
// TODO Auto-generated method stub
return
false
;
this
.
state
=
BatchState
.
CANCELLED
;
cleanUp
();
for
(
FlowRuleBatchEntry
fbe
:
fms
.
values
())
{
if
(
fbe
.
getOperator
()
==
FlowRuleOperation
.
ADD
||
fbe
.
getOperator
()
==
FlowRuleOperation
.
MODIFY
)
{
removeFlowRule
(
fbe
.
getTarget
());
}
else
if
(
fbe
.
getOperator
()
==
FlowRuleOperation
.
REMOVE
)
{
applyRule
(
fbe
.
getTarget
());
}
}
return
isCancelled
();
}
@Override
public
boolean
isCancelled
()
{
// TODO Auto-generated method stub
return
false
;
return
this
.
state
==
BatchState
.
CANCELLED
;
}
@Override
public
boolean
isDone
()
{
return
sws
.
isEmpty
()
;
return
this
.
state
==
BatchState
.
FINISHED
;
}
@Override
public
Void
get
()
throws
InterruptedException
,
ExecutionException
{
public
CompletedBatchOperation
get
()
throws
InterruptedException
,
ExecutionException
{
countDownLatch
.
await
();
//return offendingFlowMods
;
return
n
ull
;
this
.
state
=
BatchState
.
FINISHED
;
return
n
ew
CompletedBatchOperation
(
ok
.
get
(),
offendingFlowMods
)
;
}
@Override
public
Void
get
(
long
timeout
,
TimeUnit
unit
)
public
CompletedBatchOperation
get
(
long
timeout
,
TimeUnit
unit
)
throws
InterruptedException
,
ExecutionException
,
TimeoutException
{
countDownLatch
.
await
(
timeout
,
unit
);
//return offendingFlowMods;
return
null
;
if
(
countDownLatch
.
await
(
timeout
,
unit
))
{
this
.
state
=
BatchState
.
FINISHED
;
return
new
CompletedBatchOperation
(
ok
.
get
(),
offendingFlowMods
);
}
throw
new
TimeoutException
();
}
private
void
cleanUp
()
{
if
(
sws
.
isEmpty
())
{
pendingFutures
.
remove
(
pendingXid
);
for
(
Long
xid
:
fms
.
keySet
())
{
pendingFMs
.
remove
(
xid
);
}
}
}
}
...
...
Please
register
or
login
to post a comment