Committed by
Gerrit Code Review
Fix for ticket no. ONOS-4754
Change-Id: Ic878096dcf8957e9903e22e6f4bf2e8ecda6897c
Showing
2 changed files
with
22 additions
and
3 deletions
... | @@ -124,6 +124,14 @@ public class OpenFlowControllerImpl implements OpenFlowController { | ... | @@ -124,6 +124,14 @@ public class OpenFlowControllerImpl implements OpenFlowController { |
124 | private final ExecutorService executorBarrier = | 124 | private final ExecutorService executorBarrier = |
125 | Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d", log)); | 125 | Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d", log)); |
126 | 126 | ||
127 | + //Separate executor thread for handling error messages and barrier replies for same failed | ||
128 | + // transactions to avoid context switching of thread | ||
129 | + protected ExecutorService executorErrorMsgs = | ||
130 | + Executors.newSingleThreadExecutor(groupedThreads("onos/of", "event-error-msg-%d", log)); | ||
131 | + | ||
132 | + //concurrent hashmap to track failed transactions | ||
133 | + protected ConcurrentMap<Long, Boolean> errorMsgs = | ||
134 | + new ConcurrentHashMap<>(); | ||
127 | protected ConcurrentMap<Dpid, OpenFlowSwitch> connectedSwitches = | 135 | protected ConcurrentMap<Dpid, OpenFlowSwitch> connectedSwitches = |
128 | new ConcurrentHashMap<>(); | 136 | new ConcurrentHashMap<>(); |
129 | protected ConcurrentMap<Dpid, OpenFlowSwitch> activeMasterSwitches = | 137 | protected ConcurrentMap<Dpid, OpenFlowSwitch> activeMasterSwitches = |
... | @@ -299,7 +307,8 @@ public class OpenFlowControllerImpl implements OpenFlowController { | ... | @@ -299,7 +307,8 @@ public class OpenFlowControllerImpl implements OpenFlowController { |
299 | } | 307 | } |
300 | case ERROR: | 308 | case ERROR: |
301 | log.debug("Received error message from {}: {}", dpid, msg); | 309 | log.debug("Received error message from {}: {}", dpid, msg); |
302 | - executorMsgs.execute(new OFMessageHandler(dpid, msg)); | 310 | + errorMsgs.putIfAbsent(msg.getXid(), true); |
311 | + executorErrorMsgs.execute(new OFMessageHandler(dpid, msg)); | ||
303 | break; | 312 | break; |
304 | case STATS_REPLY: | 313 | case STATS_REPLY: |
305 | OFStatsReply reply = (OFStatsReply) msg; | 314 | OFStatsReply reply = (OFStatsReply) msg; |
... | @@ -407,7 +416,14 @@ public class OpenFlowControllerImpl implements OpenFlowController { | ... | @@ -407,7 +416,14 @@ public class OpenFlowControllerImpl implements OpenFlowController { |
407 | } | 416 | } |
408 | break; | 417 | break; |
409 | case BARRIER_REPLY: | 418 | case BARRIER_REPLY: |
419 | + if (errorMsgs.containsKey(msg.getXid())) { | ||
420 | + //To make oferror msg handling and corresponding barrier reply serialized, | ||
421 | + // executorErrorMsgs is used for both transaction | ||
422 | + errorMsgs.remove(msg.getXid()); | ||
423 | + executorErrorMsgs.execute(new OFMessageHandler(dpid, msg)); | ||
424 | + } else { | ||
410 | executorBarrier.execute(new OFMessageHandler(dpid, msg)); | 425 | executorBarrier.execute(new OFMessageHandler(dpid, msg)); |
426 | + } | ||
411 | break; | 427 | break; |
412 | case EXPERIMENTER: | 428 | case EXPERIMENTER: |
413 | long experimenter = ((OFExperimenter) msg).getExperimenter(); | 429 | long experimenter = ((OFExperimenter) msg).getExperimenter(); | ... | ... |
... | @@ -53,6 +53,7 @@ public class OpenFlowControllerImplPacketsTest { | ... | @@ -53,6 +53,7 @@ public class OpenFlowControllerImplPacketsTest { |
53 | TestExecutorService statsExecutorService; | 53 | TestExecutorService statsExecutorService; |
54 | TestExecutorService pktInExecutorService; | 54 | TestExecutorService pktInExecutorService; |
55 | TestExecutorService flowRmvExecutorService; | 55 | TestExecutorService flowRmvExecutorService; |
56 | + TestExecutorService errorMsgExecutorService; | ||
56 | 57 | ||
57 | /** | 58 | /** |
58 | * Mock packet listener that accumulates packets. | 59 | * Mock packet listener that accumulates packets. |
... | @@ -114,10 +115,12 @@ public class OpenFlowControllerImplPacketsTest { | ... | @@ -114,10 +115,12 @@ public class OpenFlowControllerImplPacketsTest { |
114 | statsExecutorService = new TestExecutorService(); | 115 | statsExecutorService = new TestExecutorService(); |
115 | pktInExecutorService = new TestExecutorService(); | 116 | pktInExecutorService = new TestExecutorService(); |
116 | flowRmvExecutorService = new TestExecutorService(); | 117 | flowRmvExecutorService = new TestExecutorService(); |
118 | + errorMsgExecutorService = new TestExecutorService(); | ||
117 | 119 | ||
118 | controller.executorMsgs = statsExecutorService; | 120 | controller.executorMsgs = statsExecutorService; |
119 | controller.executorPacketIn = pktInExecutorService; | 121 | controller.executorPacketIn = pktInExecutorService; |
120 | controller.executorFlowRemoved = flowRmvExecutorService; | 122 | controller.executorFlowRemoved = flowRmvExecutorService; |
123 | + controller.executorErrorMsgs = errorMsgExecutorService; | ||
121 | 124 | ||
122 | } | 125 | } |
123 | 126 | ||
... | @@ -168,8 +171,8 @@ public class OpenFlowControllerImplPacketsTest { | ... | @@ -168,8 +171,8 @@ public class OpenFlowControllerImplPacketsTest { |
168 | agent.addConnectedSwitch(dpid1, switch1); | 171 | agent.addConnectedSwitch(dpid1, switch1); |
169 | OfMessageAdapter errorPacket = new OfMessageAdapter(OFType.ERROR); | 172 | OfMessageAdapter errorPacket = new OfMessageAdapter(OFType.ERROR); |
170 | controller.processPacket(dpid1, errorPacket); | 173 | controller.processPacket(dpid1, errorPacket); |
171 | - assertThat(statsExecutorService.submittedMessages(), hasSize(1)); | 174 | + assertThat(errorMsgExecutorService.submittedMessages(), hasSize(1)); |
172 | - assertThat(statsExecutorService.submittedMessages().get(0), is(errorPacket)); | 175 | + assertThat(errorMsgExecutorService.submittedMessages().get(0), is(errorPacket)); |
173 | } | 176 | } |
174 | 177 | ||
175 | /** | 178 | /** | ... | ... |
-
Please register or login to post a comment