TestInterruptThreadNames.py
5.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""Test that we get thread names when interrupting a process."""
import time
import lldb
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestInterruptThreadNames(TestBase):
    """Check that thread names are available after interrupting a process.

    The inferior is launched asynchronously, interrupted once it reports
    that its threads are set up, and then inspected for three threads named
    "main thread", "second thread" and "third thread".
    """

    mydir = TestBase.compute_mydir(__file__)

    # Process states that wait_for_stop() treats as "no longer running".
    _NOT_RUNNING_STATES = (
        lldb.eStateStopped,
        lldb.eStateCrashed,
        lldb.eStateDetached,
        lldb.eStateExited,
    )

    @skipUnlessDarwin
    @add_test_categories(['pyapi'])
    def test_with_python_api(self):
        """Test that we get thread names when interrupting a process."""
        self.build()
        exe = self.getBuildArtifact("a.out")

        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        launch_info = lldb.SBLaunchInfo(None)
        error = lldb.SBError()
        # Async mode: Launch() returns immediately and state changes are
        # delivered as events, which the wait_for_* helpers consume.
        self.dbg.SetAsync(True)
        process = target.Launch(launch_info, error)
        self.assertTrue(process, PROCESS_IS_VALID)

        listener = self.dbg.GetListener()
        broadcaster = process.GetBroadcaster()
        rc = broadcaster.AddListener(
            listener, lldb.SBProcess.eBroadcastBitStateChanged)
        self.assertNotEqual(rc, 0, "Unable to add listener to process")
        self.assertTrue(
            self.wait_for_running(process, listener),
            "Check that process is up and running")

        inferior_set_up = self.wait_until_program_setup_complete(
            process, listener)
        self.assertTrue(
            inferior_set_up.IsValid()
            and inferior_set_up.GetValueAsSigned() == 1,
            "Check that the program was able to create its threads within the allotted time")

        self.check_number_of_threads(process)

        # Index the threads by name; if two threads share a name the one
        # with the higher index wins.
        threads_by_name = {}
        for idx in range(process.GetNumThreads()):
            thread = process.GetThreadAtIndex(idx)
            threads_by_name[thread.GetName()] = thread

        # A default-constructed SBThread is invalid, so a missing name is
        # caught by check_expected_threads_present().
        main_thread = threads_by_name.get("main thread", lldb.SBThread())
        second_thread = threads_by_name.get("second thread", lldb.SBThread())
        third_thread = threads_by_name.get("third thread", lldb.SBThread())

        self.check_expected_threads_present(
            main_thread, second_thread, third_thread)

        process.Kill()

    def wait_until_program_setup_complete(self, process, listener):
        """Poll the inferior until it reports that its threads are running.

        The process will set a global variable 'threads_up_and_running' to 1
        when it has completed its setup. Sleep, pause the program, check to
        see if the global has that value, and continue if it does not. At
        most five attempts are made.

        Returns the SBValue from the last evaluation of
        'threads_up_and_running'; the caller checks whether it is valid and
        equal to 1. If the value is 1 the process is left stopped, otherwise
        it has been continued again.
        """
        # When running the testsuite against a remote arm device, it may take
        # a little longer for the process to start up. Use a "can't possibly
        # take longer than this" value.
        if self.getArchitecture() in ('arm64', 'armv7'):
            delay_seconds = 10
        else:
            delay_seconds = 1

        inferior_set_up = lldb.SBValue()
        for _ in range(5):
            time.sleep(delay_seconds)
            process.SendAsyncInterrupt()
            self.assertTrue(
                self.wait_for_stop(process, listener),
                "Check that process is paused")
            inferior_set_up = process.GetTarget().CreateValueFromExpression(
                "threads_up_and_running", "threads_up_and_running")
            if (inferior_set_up.IsValid()
                    and inferior_set_up.GetValueAsSigned() == 1):
                break
            # Not ready yet: let the inferior run and try again.
            process.Continue()
        return inferior_set_up

    def wait_for_running(self, process, listener):
        """Wait for the process to reach the running state.

        Listen to the process events until we get an event saying that the
        process is running. Retry up to five times (2 second wait each) in
        case we get other events that are not what we're looking for.

        Returns True if the process is running, False if the retries ran out.
        """
        if process.GetState() == lldb.eStateRunning:
            return True

        for _ in range(5):
            event = lldb.SBEvent()
            listener.WaitForEvent(2, event)
            if (event.GetType() == lldb.SBProcess.eBroadcastBitStateChanged
                    and process.GetState() == lldb.eStateRunning):
                return True
        return False

    def wait_for_stop(self, process, listener):
        """Wait for the process to stop running.

        Listen to the process events until we get an event saying the process
        is stopped. Retry up to five times (2 second wait each) in case we
        get other events that we are not looking for.

        Returns True once the process state is stopped, crashed, detached or
        exited; False if the retries ran out first.
        """
        # The state is read once per check so that all four comparisons see
        # the same value. An earlier version had a second branch returning
        # False for crashed/detached/exited, but it could not be reached
        # after the check below, so it has been dropped.
        if process.GetState() in self._NOT_RUNNING_STATES:
            return True

        for _ in range(5):
            event = lldb.SBEvent()
            listener.WaitForEvent(2, event)
            if (event.GetType() == lldb.SBProcess.eBroadcastBitStateChanged
                    and process.GetState() in self._NOT_RUNNING_STATES):
                return True
        return False

    def check_number_of_threads(self, process):
        """Assert that the process currently has exactly three threads."""
        self.assertEqual(
            process.GetNumThreads(), 3,
            "Check that the process has three threads when sitting at the stopper() breakpoint")

    def check_expected_threads_present(self, main_thread, second_thread, third_thread):
        """Assert that all three SBThread objects passed in are valid."""
        self.assertTrue(
            main_thread.IsValid() and second_thread.IsValid() and third_thread.IsValid(),
            "Got all three expected threads")