Imported Upstream version 6.10.0.49

Former-commit-id: 1d6753294b2993e1fbf92de9366bb9544db4189b
This commit is contained in:
Xamarin Public Jenkins (auto-signing)
2020-01-16 16:38:04 +00:00
parent d94e79959b
commit 468663ddbb
48518 changed files with 2789335 additions and 61176 deletions

View File

@@ -0,0 +1 @@
BasedOnStyle: LLVM

View File

@@ -0,0 +1,8 @@
LEVEL = ../../make
override CFLAGS_EXTRAS += -D__STDC_LIMIT_MACROS -D__STDC_FORMAT_MACROS
ENABLE_THREADS := YES
CXX_SOURCES := main.cpp
MAKE_DSYM :=NO
include $(LEVEL)/Makefile.rules

View File

@@ -0,0 +1,67 @@
from __future__ import print_function
import gdbremote_testcase
import lldbgdbserverutils
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteAttach(gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def attach_with_vAttach(self):
# Start the inferior, start the debug monitor, nothing is attached yet.
procs = self.prep_debug_monitor_and_inferior(
inferior_args=["sleep:60"])
self.assertIsNotNone(procs)
# Make sure the target process has been launched.
inferior = procs.get("inferior")
self.assertIsNotNone(inferior)
self.assertTrue(inferior.pid > 0)
self.assertTrue(
lldbgdbserverutils.process_is_running(
inferior.pid, True))
# Add attach packets.
self.test_sequence.add_log_lines([
# Do the attach.
"read packet: $vAttach;{:x}#00".format(inferior.pid),
# Expect a stop notification from the attach.
{"direction": "send",
"regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
"capture": {1: "stop_signal_hex"}},
], True)
self.add_process_info_collection_packets()
# Run the stream
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Gather process info response
process_info = self.parse_process_info_response(context)
self.assertIsNotNone(process_info)
# Ensure the process id matches what we expected.
pid_text = process_info.get('pid', None)
self.assertIsNotNone(pid_text)
reported_pid = int(pid_text, base=16)
self.assertEqual(reported_pid, inferior.pid)
@debugserver_test
def test_attach_with_vAttach_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_attach_manually()
self.attach_with_vAttach()
@llgs_test
def test_attach_with_vAttach_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_attach_manually()
self.attach_with_vAttach()

View File

@@ -0,0 +1,220 @@
from __future__ import print_function
import gdbremote_testcase
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteAuxvSupport(gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
AUXV_SUPPORT_FEATURE_NAME = "qXfer:auxv:read"
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def has_auxv_support(self):
inferior_args = ["message:main entered", "sleep:5"]
procs = self.prep_debug_monitor_and_inferior(
inferior_args=inferior_args)
# Don't do anything until we match the launched inferior main entry output.
# Then immediately interrupt the process.
# This prevents auxv data being asked for before it's ready and leaves
# us in a stopped state.
self.test_sequence.add_log_lines([
# Start the inferior...
"read packet: $c#63",
# ... match output....
{"type": "output_match", "regex": self.maybe_strict_output_regex(
r"message:main entered\r\n")},
], True)
# ... then interrupt.
self.add_interrupt_packets()
self.add_qSupported_packets()
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
features = self.parse_qSupported_response(context)
return self.AUXV_SUPPORT_FEATURE_NAME in features and features[
self.AUXV_SUPPORT_FEATURE_NAME] == "+"
def get_raw_auxv_data(self):
# Start up llgs and inferior, and check for auxv support.
if not self.has_auxv_support():
self.skipTest("auxv data not supported")
# Grab pointer size for target. We'll assume that is equivalent to an unsigned long on the target.
# Auxv is specified in terms of pairs of unsigned longs.
self.reset_test_sequence()
self.add_process_info_collection_packets()
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
proc_info = self.parse_process_info_response(context)
self.assertIsNotNone(proc_info)
self.assertTrue("ptrsize" in proc_info)
word_size = int(proc_info["ptrsize"])
OFFSET = 0
LENGTH = 0x400
# Grab the auxv data.
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: $qXfer:auxv:read::{:x},{:x}:#00".format(
OFFSET,
LENGTH),
{
"direction": "send",
"regex": re.compile(
r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
re.MULTILINE | re.DOTALL),
"capture": {
1: "response_type",
2: "content_raw"}}],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Ensure we end up with all auxv data in one packet.
# FIXME don't assume it all comes back in one packet.
self.assertEqual(context.get("response_type"), "l")
# Decode binary data.
content_raw = context.get("content_raw")
self.assertIsNotNone(content_raw)
return (word_size, self.decode_gdbremote_binary(content_raw))
def supports_auxv(self):
# When non-auxv platforms support llgs, skip the test on platforms
# that don't support auxv.
self.assertTrue(self.has_auxv_support())
#
# We skip the "supports_auxv" test on debugserver. The rest of the tests
# appropriately skip the auxv tests if the support flag is not present
# in the qSupported response, so the debugserver test bits are still there
# in case debugserver code one day does have auxv support and thus those
# tests don't get skipped.
#
@llgs_test
def test_supports_auxv_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.supports_auxv()
def auxv_data_is_correct_size(self):
(word_size, auxv_data) = self.get_raw_auxv_data()
self.assertIsNotNone(auxv_data)
# Ensure auxv data is a multiple of 2*word_size (there should be two
# unsigned long fields per auxv entry).
self.assertEqual(len(auxv_data) % (2 * word_size), 0)
# print("auxv contains {} entries".format(len(auxv_data) / (2*word_size)))
@debugserver_test
def test_auxv_data_is_correct_size_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.auxv_data_is_correct_size()
@llgs_test
def test_auxv_data_is_correct_size_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.auxv_data_is_correct_size()
def auxv_keys_look_valid(self):
(word_size, auxv_data) = self.get_raw_auxv_data()
self.assertIsNotNone(auxv_data)
# Grab endian.
self.reset_test_sequence()
self.add_process_info_collection_packets()
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
process_info = self.parse_process_info_response(context)
self.assertIsNotNone(process_info)
endian = process_info.get("endian")
self.assertIsNotNone(endian)
auxv_dict = self.build_auxv_dict(endian, word_size, auxv_data)
self.assertIsNotNone(auxv_dict)
# Verify keys look reasonable.
for auxv_key in auxv_dict:
self.assertTrue(auxv_key >= 1)
self.assertTrue(auxv_key <= 1000)
# print("auxv dict: {}".format(auxv_dict))
@debugserver_test
def test_auxv_keys_look_valid_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.auxv_keys_look_valid()
@llgs_test
def test_auxv_keys_look_valid_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.auxv_keys_look_valid()
def auxv_chunked_reads_work(self):
# Verify that multiple smaller offset,length reads of auxv data
# return the same data as a single larger read.
# Grab the auxv data with a single large read here.
(word_size, auxv_data) = self.get_raw_auxv_data()
self.assertIsNotNone(auxv_data)
# Grab endian.
self.reset_test_sequence()
self.add_process_info_collection_packets()
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
process_info = self.parse_process_info_response(context)
self.assertIsNotNone(process_info)
endian = process_info.get("endian")
self.assertIsNotNone(endian)
auxv_dict = self.build_auxv_dict(endian, word_size, auxv_data)
self.assertIsNotNone(auxv_dict)
iterated_auxv_data = self.read_binary_data_in_chunks(
"qXfer:auxv:read::", 2 * word_size)
self.assertIsNotNone(iterated_auxv_data)
auxv_dict_iterated = self.build_auxv_dict(
endian, word_size, iterated_auxv_data)
self.assertIsNotNone(auxv_dict_iterated)
# Verify both types of data collection returned same content.
self.assertEqual(auxv_dict_iterated, auxv_dict)
@debugserver_test
def test_auxv_chunked_reads_work_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.auxv_chunked_reads_work()
@llgs_test
def test_auxv_chunked_reads_work_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.auxv_chunked_reads_work()

View File

@@ -0,0 +1,160 @@
from __future__ import print_function
import gdbremote_testcase
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteExpeditedRegisters(
gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def gather_expedited_registers(self):
# Setup the stub and set the gdb remote command stream.
procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
self.test_sequence.add_log_lines([
# Start up the inferior.
"read packet: $c#63",
# Immediately tell it to stop. We want to see what it reports.
"read packet: {}".format(chr(3)),
{"direction": "send",
"regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
"capture": {1: "stop_result",
2: "key_vals_text"}},
], True)
# Run the gdb remote command stream.
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Pull out expedited registers.
key_vals_text = context.get("key_vals_text")
self.assertIsNotNone(key_vals_text)
expedited_registers = self.extract_registers_from_stop_notification(
key_vals_text)
self.assertIsNotNone(expedited_registers)
return expedited_registers
def stop_notification_contains_generic_register(
self, generic_register_name):
# Generate a stop reply, parse out expedited registers from stop
# notification.
expedited_registers = self.gather_expedited_registers()
self.assertIsNotNone(expedited_registers)
self.assertTrue(len(expedited_registers) > 0)
# Gather target register infos.
reg_infos = self.gather_register_infos()
# Find the generic register.
reg_info = self.find_generic_register_with_name(
reg_infos, generic_register_name)
self.assertIsNotNone(reg_info)
# Ensure the expedited registers contained it.
self.assertTrue(reg_info["lldb_register_index"] in expedited_registers)
# print("{} reg_info:{}".format(generic_register_name, reg_info))
def stop_notification_contains_any_registers(self):
# Generate a stop reply, parse out expedited registers from stop
# notification.
expedited_registers = self.gather_expedited_registers()
# Verify we have at least one expedited register.
self.assertTrue(len(expedited_registers) > 0)
@debugserver_test
def test_stop_notification_contains_any_registers_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_notification_contains_any_registers()
@llgs_test
def test_stop_notification_contains_any_registers_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_notification_contains_any_registers()
def stop_notification_contains_no_duplicate_registers(self):
# Generate a stop reply, parse out expedited registers from stop
# notification.
expedited_registers = self.gather_expedited_registers()
# Verify no expedited register was specified multiple times.
for (reg_num, value) in list(expedited_registers.items()):
if (isinstance(value, list)) and (len(value) > 0):
self.fail(
"expedited register number {} specified more than once ({} times)".format(
reg_num, len(value)))
@debugserver_test
def test_stop_notification_contains_no_duplicate_registers_debugserver(
self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_notification_contains_no_duplicate_registers()
@llgs_test
def test_stop_notification_contains_no_duplicate_registers_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_notification_contains_no_duplicate_registers()
def stop_notification_contains_pc_register(self):
self.stop_notification_contains_generic_register("pc")
@debugserver_test
def test_stop_notification_contains_pc_register_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_notification_contains_pc_register()
@llgs_test
def test_stop_notification_contains_pc_register_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_notification_contains_pc_register()
def stop_notification_contains_fp_register(self):
self.stop_notification_contains_generic_register("fp")
@debugserver_test
def test_stop_notification_contains_fp_register_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_notification_contains_fp_register()
@llgs_test
def test_stop_notification_contains_fp_register_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_notification_contains_fp_register()
def stop_notification_contains_sp_register(self):
self.stop_notification_contains_generic_register("sp")
@debugserver_test
def test_stop_notification_contains_sp_register_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_notification_contains_sp_register()
@llgs_test
def test_stop_notification_contains_sp_register_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_notification_contains_sp_register()

View File

@@ -0,0 +1,131 @@
from __future__ import print_function
# lldb test suite imports
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import TestBase
# gdb-remote-specific imports
import lldbgdbserverutils
from gdbremote_testcase import GdbRemoteTestCaseBase
class TestGdbRemoteHostInfo(GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
KNOWN_HOST_INFO_KEYS = set([
"arch",
"cputype",
"cpusubtype",
"distribution_id",
"endian",
"hostname",
"ostype",
"os_build",
"os_kernel",
"os_version",
"ptrsize",
"triple",
"vendor",
"watchpoint_exceptions_received",
"default_packet_timeout",
])
DARWIN_REQUIRED_HOST_INFO_KEYS = set([
"cputype",
"cpusubtype",
"endian",
"ostype",
"ptrsize",
"vendor",
"watchpoint_exceptions_received"
])
def add_host_info_collection_packets(self):
self.test_sequence.add_log_lines(
["read packet: $qHostInfo#9b",
{"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
"capture": {1: "host_info_raw"}}],
True)
def parse_host_info_response(self, context):
# Ensure we have a host info response.
self.assertIsNotNone(context)
host_info_raw = context.get("host_info_raw")
self.assertIsNotNone(host_info_raw)
# Pull out key:value; pairs.
host_info_dict = {match.group(1): match.group(2)
for match in re.finditer(r"([^:]+):([^;]+);",
host_info_raw)}
import pprint
print("\nqHostInfo response:")
pprint.pprint(host_info_dict)
# Validate keys are known.
for (key, val) in list(host_info_dict.items()):
self.assertTrue(key in self.KNOWN_HOST_INFO_KEYS,
"unknown qHostInfo key: " + key)
self.assertIsNotNone(val)
# Return the key:val pairs.
return host_info_dict
def get_qHostInfo_response(self):
# Launch the debug monitor stub, attaching to the inferior.
server = self.connect_to_debug_monitor()
self.assertIsNotNone(server)
self.add_no_ack_remote_stream()
# Request qHostInfo and get response
self.add_host_info_collection_packets()
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Parse qHostInfo response.
host_info = self.parse_host_info_response(context)
self.assertIsNotNone(host_info)
self.assertGreater(len(host_info), 0, "qHostInfo should have returned "
"at least one key:val pair.")
return host_info
def validate_darwin_minimum_host_info_keys(self, host_info_dict):
self.assertIsNotNone(host_info_dict)
missing_keys = [key for key in self.DARWIN_REQUIRED_HOST_INFO_KEYS
if key not in host_info_dict]
self.assertEquals(0, len(missing_keys),
"qHostInfo is missing the following required "
"keys: " + str(missing_keys))
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@debugserver_test
def test_qHostInfo_returns_at_least_one_key_val_pair_debugserver(self):
self.init_debugserver_test()
self.build()
self.get_qHostInfo_response()
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@llgs_test
def test_qHostInfo_returns_at_least_one_key_val_pair_llgs(self):
self.init_llgs_test()
self.build()
self.get_qHostInfo_response()
@skipUnlessDarwin
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@debugserver_test
def test_qHostInfo_contains_darwin_required_keys_debugserver(self):
self.init_debugserver_test()
self.build()
host_info_dict = self.get_qHostInfo_response()
self.validate_darwin_minimum_host_info_keys(host_info_dict)
@skipUnlessDarwin
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@llgs_test
def test_qHostInfo_contains_darwin_required_keys_llgs(self):
self.init_llgs_test()
self.build()
host_info_dict = self.get_qHostInfo_response()
self.validate_darwin_minimum_host_info_keys(host_info_dict)

View File

@@ -0,0 +1,59 @@
from __future__ import print_function
import gdbremote_testcase
import lldbgdbserverutils
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteKill(gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def attach_commandline_kill_after_initial_stop(self):
procs = self.prep_debug_monitor_and_inferior()
self.test_sequence.add_log_lines([
"read packet: $k#6b",
{"direction": "send", "regex": r"^\$X[0-9a-fA-F]+([^#]*)#[0-9A-Fa-f]{2}"},
], True)
if self.stub_sends_two_stop_notifications_on_kill:
# Add an expectation for a second X result for stubs that send two
# of these.
self.test_sequence.add_log_lines([
{"direction": "send", "regex": r"^\$X[0-9a-fA-F]+([^#]*)#[0-9A-Fa-f]{2}"},
], True)
self.expect_gdbremote_sequence()
# Wait a moment for completed and now-detached inferior process to
# clear.
time.sleep(1)
if not lldb.remote_platform:
# Process should be dead now. Reap results.
poll_result = procs["inferior"].poll()
self.assertIsNotNone(poll_result)
# Where possible, verify at the system level that the process is not
# running.
self.assertFalse(
lldbgdbserverutils.process_is_running(
procs["inferior"].pid, False))
@debugserver_test
def test_attach_commandline_kill_after_initial_stop_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_attach()
self.attach_commandline_kill_after_initial_stop()
@llgs_test
def test_attach_commandline_kill_after_initial_stop_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_attach()
self.attach_commandline_kill_after_initial_stop()

View File

@@ -0,0 +1,43 @@
from __future__ import print_function
import gdbremote_testcase
import lldbgdbserverutils
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteModuleInfo(gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
def module_info(self):
procs = self.prep_debug_monitor_and_inferior()
self.add_process_info_collection_packets()
context = self.expect_gdbremote_sequence()
info = self.parse_process_info_response(context)
self.test_sequence.add_log_lines([
'read packet: $jModulesInfo:[{"file":"%s","triple":"%s"}]]#00' % (
lldbutil.append_to_process_working_directory("a.out"),
info["triple"].decode('hex')),
{"direction": "send",
"regex": r'^\$\[{(.*)}\]\]#[0-9A-Fa-f]{2}',
"capture": {1: "spec"}},
], True)
context = self.expect_gdbremote_sequence()
spec = context.get("spec")
self.assertRegexpMatches(spec, '"file_path":".*"')
self.assertRegexpMatches(spec, '"file_offset":\d+')
self.assertRegexpMatches(spec, '"file_size":\d+')
self.assertRegexpMatches(spec, '"triple":"\w*-\w*-.*"')
self.assertRegexpMatches(spec, '"uuid":"[A-Fa-f0-9]+"')
@llgs_test
def test_module_info(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.module_info()

View File

@@ -0,0 +1,211 @@
from __future__ import print_function
import sys
import gdbremote_testcase
import lldbgdbserverutils
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteProcessInfo(gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
def qProcessInfo_returns_running_process(self):
procs = self.prep_debug_monitor_and_inferior()
self.add_process_info_collection_packets()
# Run the stream
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Gather process info response
process_info = self.parse_process_info_response(context)
self.assertIsNotNone(process_info)
# Ensure the process id looks reasonable.
pid_text = process_info.get("pid")
self.assertIsNotNone(pid_text)
pid = int(pid_text, base=16)
self.assertNotEqual(0, pid)
# If possible, verify that the process is running.
self.assertTrue(lldbgdbserverutils.process_is_running(pid, True))
@debugserver_test
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def test_qProcessInfo_returns_running_process_debugserver(self):
self.init_debugserver_test()
self.build()
self.qProcessInfo_returns_running_process()
@llgs_test
def test_qProcessInfo_returns_running_process_llgs(self):
self.init_llgs_test()
self.build()
self.qProcessInfo_returns_running_process()
def attach_commandline_qProcessInfo_reports_correct_pid(self):
procs = self.prep_debug_monitor_and_inferior()
self.assertIsNotNone(procs)
self.add_process_info_collection_packets()
# Run the stream
context = self.expect_gdbremote_sequence(timeout_seconds=8)
self.assertIsNotNone(context)
# Gather process info response
process_info = self.parse_process_info_response(context)
self.assertIsNotNone(process_info)
# Ensure the process id matches what we expected.
pid_text = process_info.get('pid', None)
self.assertIsNotNone(pid_text)
reported_pid = int(pid_text, base=16)
self.assertEqual(reported_pid, procs["inferior"].pid)
@debugserver_test
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def test_attach_commandline_qProcessInfo_reports_correct_pid_debugserver(
self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_attach()
self.attach_commandline_qProcessInfo_reports_correct_pid()
@llgs_test
def test_attach_commandline_qProcessInfo_reports_correct_pid_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_attach()
self.attach_commandline_qProcessInfo_reports_correct_pid()
def qProcessInfo_reports_valid_endian(self):
procs = self.prep_debug_monitor_and_inferior()
self.add_process_info_collection_packets()
# Run the stream
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Gather process info response
process_info = self.parse_process_info_response(context)
self.assertIsNotNone(process_info)
# Ensure the process id looks reasonable.
endian = process_info.get("endian")
self.assertIsNotNone(endian)
self.assertTrue(endian in ["little", "big", "pdp"])
@debugserver_test
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def test_qProcessInfo_reports_valid_endian_debugserver(self):
self.init_debugserver_test()
self.build()
self.qProcessInfo_reports_valid_endian()
@llgs_test
def test_qProcessInfo_reports_valid_endian_llgs(self):
self.init_llgs_test()
self.build()
self.qProcessInfo_reports_valid_endian()
def qProcessInfo_contains_keys(self, expected_key_set):
procs = self.prep_debug_monitor_and_inferior()
self.add_process_info_collection_packets()
# Run the stream
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Gather process info response
process_info = self.parse_process_info_response(context)
self.assertIsNotNone(process_info)
# Ensure the expected keys are present and non-None within the process
# info.
missing_key_set = set()
for expected_key in expected_key_set:
if expected_key not in process_info:
missing_key_set.add(expected_key)
self.assertEqual(
missing_key_set,
set(),
"the listed keys are missing in the qProcessInfo result")
def qProcessInfo_does_not_contain_keys(self, absent_key_set):
procs = self.prep_debug_monitor_and_inferior()
self.add_process_info_collection_packets()
# Run the stream
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Gather process info response
process_info = self.parse_process_info_response(context)
self.assertIsNotNone(process_info)
# Ensure the unexpected keys are not present
unexpected_key_set = set()
for unexpected_key in absent_key_set:
if unexpected_key in process_info:
unexpected_key_set.add(unexpected_key)
self.assertEqual(
unexpected_key_set,
set(),
"the listed keys were present but unexpected in qProcessInfo result")
@skipUnlessDarwin
@debugserver_test
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def test_qProcessInfo_contains_cputype_cpusubtype_debugserver_darwin(self):
self.init_debugserver_test()
self.build()
self.qProcessInfo_contains_keys(set(['cputype', 'cpusubtype']))
@skipUnlessDarwin
@llgs_test
def test_qProcessInfo_contains_cputype_cpusubtype_llgs_darwin(self):
self.init_llgs_test()
self.build()
self.qProcessInfo_contains_keys(set(['cputype', 'cpusubtype']))
@skipUnlessPlatform(["linux"])
@llgs_test
def test_qProcessInfo_contains_triple_llgs_linux(self):
self.init_llgs_test()
self.build()
self.qProcessInfo_contains_keys(set(['triple']))
@skipUnlessDarwin
@debugserver_test
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def test_qProcessInfo_does_not_contain_triple_debugserver_darwin(self):
self.init_debugserver_test()
self.build()
# We don't expect to see triple on darwin. If we do, we'll prefer triple
# to cputype/cpusubtype and skip some darwin-based ProcessGDBRemote ArchSpec setup
# for the remote Host and Process.
self.qProcessInfo_does_not_contain_keys(set(['triple']))
@skipUnlessDarwin
@llgs_test
def test_qProcessInfo_does_not_contain_triple_llgs_darwin(self):
self.init_llgs_test()
self.build()
# We don't expect to see triple on darwin. If we do, we'll prefer triple
# to cputype/cpusubtype and skip some darwin-based ProcessGDBRemote ArchSpec setup
# for the remote Host and Process.
self.qProcessInfo_does_not_contain_keys(set(['triple']))
@skipUnlessPlatform(["linux"])
@llgs_test
def test_qProcessInfo_does_not_contain_cputype_cpusubtype_llgs_linux(self):
self.init_llgs_test()
self.build()
self.qProcessInfo_does_not_contain_keys(set(['cputype', 'cpusubtype']))

View File

@@ -0,0 +1,137 @@
from __future__ import print_function
import gdbremote_testcase
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteRegisterState(gdbremote_testcase.GdbRemoteTestCaseBase):
"""Test QSaveRegisterState/QRestoreRegisterState support."""
mydir = TestBase.compute_mydir(__file__)
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def grp_register_save_restore_works(self, with_suffix):
# Start up the process, use thread suffix, grab main thread id.
inferior_args = ["message:main entered", "sleep:5"]
procs = self.prep_debug_monitor_and_inferior(
inferior_args=inferior_args)
self.add_process_info_collection_packets()
self.add_register_info_collection_packets()
if with_suffix:
self.add_thread_suffix_request_packets()
self.add_threadinfo_collection_packets()
self.test_sequence.add_log_lines([
# Start the inferior...
"read packet: $c#63",
# ... match output....
{"type": "output_match", "regex": self.maybe_strict_output_regex(
r"message:main entered\r\n")},
], True)
# ... then interrupt.
self.add_interrupt_packets()
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Gather process info.
process_info = self.parse_process_info_response(context)
endian = process_info.get("endian")
self.assertIsNotNone(endian)
# Gather register info.
reg_infos = self.parse_register_info_packets(context)
self.assertIsNotNone(reg_infos)
self.add_lldb_register_index(reg_infos)
# Pull out the register infos that we think we can bit flip
# successfully.
gpr_reg_infos = [
reg_info for reg_info in reg_infos if self.is_bit_flippable_register(reg_info)]
self.assertTrue(len(gpr_reg_infos) > 0)
# Gather thread info.
if with_suffix:
threads = self.parse_threadinfo_packets(context)
self.assertIsNotNone(threads)
thread_id = threads[0]
self.assertIsNotNone(thread_id)
# print("Running on thread: 0x{:x}".format(thread_id))
else:
thread_id = None
# Save register state.
self.reset_test_sequence()
self.add_QSaveRegisterState_packets(thread_id)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
(success, state_id) = self.parse_QSaveRegisterState_response(context)
self.assertTrue(success)
self.assertIsNotNone(state_id)
# print("saved register state id: {}".format(state_id))
# Remember initial register values.
initial_reg_values = self.read_register_values(
gpr_reg_infos, endian, thread_id=thread_id)
# print("initial_reg_values: {}".format(initial_reg_values))
# Flip gpr register values.
(successful_writes, failed_writes) = self.flip_all_bits_in_each_register_value(
gpr_reg_infos, endian, thread_id=thread_id)
# print("successful writes: {}, failed writes: {}".format(successful_writes, failed_writes))
self.assertTrue(successful_writes > 0)
flipped_reg_values = self.read_register_values(
gpr_reg_infos, endian, thread_id=thread_id)
# print("flipped_reg_values: {}".format(flipped_reg_values))
# Restore register values.
self.reset_test_sequence()
self.add_QRestoreRegisterState_packets(state_id, thread_id)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Verify registers match initial register values.
final_reg_values = self.read_register_values(
gpr_reg_infos, endian, thread_id=thread_id)
# print("final_reg_values: {}".format(final_reg_values))
self.assertIsNotNone(final_reg_values)
self.assertEqual(final_reg_values, initial_reg_values)
@debugserver_test
def test_grp_register_save_restore_works_with_suffix_debugserver(self):
USE_THREAD_SUFFIX = True
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.grp_register_save_restore_works(USE_THREAD_SUFFIX)
@llgs_test
def test_grp_register_save_restore_works_with_suffix_llgs(self):
USE_THREAD_SUFFIX = True
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.grp_register_save_restore_works(USE_THREAD_SUFFIX)
@debugserver_test
def test_grp_register_save_restore_works_no_suffix_debugserver(self):
USE_THREAD_SUFFIX = False
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.grp_register_save_restore_works(USE_THREAD_SUFFIX)
@llgs_test
def test_grp_register_save_restore_works_no_suffix_llgs(self):
USE_THREAD_SUFFIX = False
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.grp_register_save_restore_works(USE_THREAD_SUFFIX)

View File

@@ -0,0 +1,41 @@
from __future__ import print_function
import gdbremote_testcase
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteSingleStep(gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@debugserver_test
def test_single_step_only_steps_one_instruction_with_s_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.single_step_only_steps_one_instruction(
use_Hc_packet=True, step_instruction="s")
@llgs_test
@expectedFailureAndroid(
bugnumber="llvm.org/pr24739",
archs=[
"arm",
"aarch64"])
@expectedFailureAll(
oslist=["linux"],
archs=[
"arm",
"aarch64"],
bugnumber="llvm.org/pr24739")
@skipIf(triple='^mips')
def test_single_step_only_steps_one_instruction_with_s_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.single_step_only_steps_one_instruction(
use_Hc_packet=True, step_instruction="s")

View File

@@ -0,0 +1,303 @@
from __future__ import print_function
import json
import re
import gdbremote_testcase
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteThreadsInStopReply(
gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
ENABLE_THREADS_IN_STOP_REPLY_ENTRIES = [
"read packet: $QListThreadsInStopReply#21",
"send packet: $OK#00",
]
def gather_stop_reply_fields(self, post_startup_log_lines, thread_count,
field_names):
# Set up the inferior args.
inferior_args = []
for i in range(thread_count - 1):
inferior_args.append("thread:new")
inferior_args.append("sleep:10")
procs = self.prep_debug_monitor_and_inferior(
inferior_args=inferior_args)
self.add_register_info_collection_packets()
self.add_process_info_collection_packets()
# Assumes test_sequence has anything added needed to setup the initial state.
# (Like optionally enabling QThreadsInStopReply.)
if post_startup_log_lines:
self.test_sequence.add_log_lines(post_startup_log_lines, True)
self.test_sequence.add_log_lines([
"read packet: $c#63"
], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
hw_info = self.parse_hw_info(context)
# Give threads time to start up, then break.
time.sleep(1)
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: {}".format(
chr(3)),
{
"direction": "send",
"regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
"capture": {
1: "stop_result",
2: "key_vals_text"}},
],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Wait until all threads have started.
threads = self.wait_for_thread_count(thread_count, timeout_seconds=3)
self.assertIsNotNone(threads)
self.assertEqual(len(threads), thread_count)
# Run, then stop the process, grab the stop reply content.
self.reset_test_sequence()
self.test_sequence.add_log_lines(["read packet: $c#63",
"read packet: {}".format(chr(3)),
{"direction": "send",
"regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
"capture": {1: "stop_result",
2: "key_vals_text"}},
],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Parse the stop reply contents.
key_vals_text = context.get("key_vals_text")
self.assertIsNotNone(key_vals_text)
kv_dict = self.parse_key_val_dict(key_vals_text)
self.assertIsNotNone(kv_dict)
result = dict();
result["pc_register"] = hw_info["pc_register"]
result["little_endian"] = hw_info["little_endian"]
for key_field in field_names:
result[key_field] = kv_dict.get(key_field)
return result
def gather_stop_reply_threads(self, post_startup_log_lines, thread_count):
# Pull out threads from stop response.
stop_reply_threads_text = self.gather_stop_reply_fields(
post_startup_log_lines, thread_count, ["threads"])["threads"]
if stop_reply_threads_text:
return [int(thread_id, 16)
for thread_id in stop_reply_threads_text.split(",")]
else:
return []
def gather_stop_reply_pcs(self, post_startup_log_lines, thread_count):
results = self.gather_stop_reply_fields( post_startup_log_lines,
thread_count, ["threads", "thread-pcs"])
if not results:
return []
threads_text = results["threads"]
pcs_text = results["thread-pcs"]
thread_ids = threads_text.split(",")
pcs = pcs_text.split(",")
self.assertTrue(len(thread_ids) == len(pcs))
thread_pcs = dict()
for i in range(0, len(pcs)):
thread_pcs[int(thread_ids[i], 16)] = pcs[i]
result = dict()
result["thread_pcs"] = thread_pcs
result["pc_register"] = results["pc_register"]
result["little_endian"] = results["little_endian"]
return result
def switch_endian(self, egg):
return "".join(reversed(re.findall("..", egg)))
def parse_hw_info(self, context):
self.assertIsNotNone(context)
process_info = self.parse_process_info_response(context)
endian = process_info.get("endian")
reg_info = self.parse_register_info_packets(context)
(pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_info)
hw_info = dict()
hw_info["pc_register"] = pc_lldb_reg_index
hw_info["little_endian"] = (endian == "little")
return hw_info
def gather_threads_info_pcs(self, pc_register, little_endian):
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: $jThreadsInfo#c1",
{
"direction": "send",
"regex": r"^\$(.*)#[0-9a-fA-F]{2}$",
"capture": {
1: "threads_info"}},
],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
threads_info = context.get("threads_info")
register = str(pc_register)
# The jThreadsInfo response is not valid JSON data, so we have to
# clean it up first.
jthreads_info = json.loads(re.sub(r"}]", "}", threads_info))
thread_pcs = dict()
for thread_info in jthreads_info:
tid = thread_info["tid"]
pc = thread_info["registers"][register]
thread_pcs[tid] = self.switch_endian(pc) if little_endian else pc
return thread_pcs
def QListThreadsInStopReply_supported(self):
procs = self.prep_debug_monitor_and_inferior()
self.test_sequence.add_log_lines(
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@debugserver_test
def test_QListThreadsInStopReply_supported_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.QListThreadsInStopReply_supported()
@llgs_test
def test_QListThreadsInStopReply_supported_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.QListThreadsInStopReply_supported()
def stop_reply_reports_multiple_threads(self, thread_count):
# Gather threads from stop notification when QThreadsInStopReply is
# enabled.
stop_reply_threads = self.gather_stop_reply_threads(
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
self.assertEqual(len(stop_reply_threads), thread_count)
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@debugserver_test
def test_stop_reply_reports_multiple_threads_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_reports_multiple_threads(5)
@llgs_test
def test_stop_reply_reports_multiple_threads_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_reports_multiple_threads(5)
def no_QListThreadsInStopReply_supplies_no_threads(self, thread_count):
# Gather threads from stop notification when QThreadsInStopReply is not
# enabled.
stop_reply_threads = self.gather_stop_reply_threads(None, thread_count)
self.assertEqual(len(stop_reply_threads), 0)
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@debugserver_test
def test_no_QListThreadsInStopReply_supplies_no_threads_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.no_QListThreadsInStopReply_supplies_no_threads(5)
@llgs_test
def test_no_QListThreadsInStopReply_supplies_no_threads_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.no_QListThreadsInStopReply_supplies_no_threads(5)
def stop_reply_reports_correct_threads(self, thread_count):
# Gather threads from stop notification when QThreadsInStopReply is
# enabled.
stop_reply_threads = self.gather_stop_reply_threads(
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
self.assertEqual(len(stop_reply_threads), thread_count)
# Gather threads from q{f,s}ThreadInfo.
self.reset_test_sequence()
self.add_threadinfo_collection_packets()
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
threads = self.parse_threadinfo_packets(context)
self.assertIsNotNone(threads)
self.assertEqual(len(threads), thread_count)
# Ensure each thread in q{f,s}ThreadInfo appears in stop reply threads
for tid in threads:
self.assertTrue(tid in stop_reply_threads)
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@debugserver_test
def test_stop_reply_reports_correct_threads_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_reports_correct_threads(5)
@llgs_test
def test_stop_reply_reports_correct_threads_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_reports_correct_threads(5)
def stop_reply_contains_thread_pcs(self, thread_count):
results = self.gather_stop_reply_pcs(
self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, thread_count)
stop_reply_pcs = results["thread_pcs"]
pc_register = results["pc_register"]
little_endian = results["little_endian"]
self.assertEqual(len(stop_reply_pcs), thread_count)
threads_info_pcs = self.gather_threads_info_pcs(pc_register,
little_endian)
self.assertEqual(len(threads_info_pcs), thread_count)
for thread_id in stop_reply_pcs:
self.assertTrue(thread_id in threads_info_pcs)
self.assertTrue(int(stop_reply_pcs[thread_id], 16)
== int(threads_info_pcs[thread_id], 16))
@llgs_test
def test_stop_reply_contains_thread_pcs_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_contains_thread_pcs(5)
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@debugserver_test
def test_stop_reply_contains_thread_pcs_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.stop_reply_contains_thread_pcs(5)

View File

@@ -0,0 +1,182 @@
from __future__ import print_function
import sys
import unittest2
import gdbremote_testcase
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemote_qThreadStopInfo(gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
THREAD_COUNT = 5
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
@skipIfDarwinEmbedded # <rdar://problem/27005337>
def gather_stop_replies_via_qThreadStopInfo(self, thread_count):
# Set up the inferior args.
inferior_args = []
for i in range(thread_count - 1):
inferior_args.append("thread:new")
inferior_args.append("sleep:10")
procs = self.prep_debug_monitor_and_inferior(
inferior_args=inferior_args)
# Assumes test_sequence has anything added needed to setup the initial state.
# (Like optionally enabling QThreadsInStopReply.)
self.test_sequence.add_log_lines([
"read packet: $c#63"
], True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Give threads time to start up, then break.
time.sleep(1)
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: {}".format(
chr(3)),
{
"direction": "send",
"regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
"capture": {
1: "stop_result",
2: "key_vals_text"}},
],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Wait until all threads have started.
threads = self.wait_for_thread_count(thread_count, timeout_seconds=3)
self.assertIsNotNone(threads)
self.assertEqual(len(threads), thread_count)
# Grab stop reply for each thread via qThreadStopInfo{tid:hex}.
stop_replies = {}
thread_dicts = {}
for thread in threads:
# Run the qThreadStopInfo command.
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: $qThreadStopInfo{:x}#00".format(thread),
{
"direction": "send",
"regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
"capture": {
1: "stop_result",
2: "key_vals_text"}},
],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Parse stop reply contents.
key_vals_text = context.get("key_vals_text")
self.assertIsNotNone(key_vals_text)
kv_dict = self.parse_key_val_dict(key_vals_text)
self.assertIsNotNone(kv_dict)
# Verify there is a thread and that it matches the expected thread
# id.
kv_thread = kv_dict.get("thread")
self.assertIsNotNone(kv_thread)
kv_thread_id = int(kv_thread, 16)
self.assertEqual(kv_thread_id, thread)
# Grab the stop id reported.
stop_result_text = context.get("stop_result")
self.assertIsNotNone(stop_result_text)
stop_replies[kv_thread_id] = int(stop_result_text, 16)
# Hang on to the key-val dictionary for the thread.
thread_dicts[kv_thread_id] = kv_dict
return (stop_replies, thread_dicts)
def qThreadStopInfo_works_for_multiple_threads(self, thread_count):
(stop_replies, _) = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
self.assertEqual(len(stop_replies), thread_count)
@debugserver_test
def test_qThreadStopInfo_works_for_multiple_threads_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)
@llgs_test
def test_qThreadStopInfo_works_for_multiple_threads_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)
def qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
self, thread_count):
(stop_replies, _) = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
self.assertIsNotNone(stop_replies)
no_stop_reason_count = sum(
1 for stop_reason in list(
stop_replies.values()) if stop_reason == 0)
with_stop_reason_count = sum(
1 for stop_reason in list(
stop_replies.values()) if stop_reason != 0)
# All but one thread should report no stop reason.
self.assertEqual(no_stop_reason_count, thread_count - 1)
# Only one thread should should indicate a stop reason.
self.assertEqual(with_stop_reason_count, 1)
@debugserver_test
def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_debugserver(
self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
self.THREAD_COUNT)
@llgs_test
def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_llgs(
self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
self.THREAD_COUNT)
def qThreadStopInfo_has_valid_thread_names(
self, thread_count, expected_thread_name):
(_, thread_dicts) = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
self.assertIsNotNone(thread_dicts)
for thread_dict in list(thread_dicts.values()):
name = thread_dict.get("name")
self.assertIsNotNone(name)
self.assertEqual(name, expected_thread_name)
@unittest2.skip("MacOSX doesn't have a default thread name")
@debugserver_test
def test_qThreadStopInfo_has_valid_thread_names_debugserver(self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, "a.out")
# test requires OS with set, equal thread names by default.
@skipUnlessPlatform(["linux"])
@llgs_test
def test_qThreadStopInfo_has_valid_thread_names_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, "a.out")

View File

@@ -0,0 +1,159 @@
from __future__ import print_function
import gdbremote_testcase
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemote_vCont(gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
def vCont_supports_mode(self, mode, inferior_args=None):
# Setup the stub and set the gdb remote command stream.
procs = self.prep_debug_monitor_and_inferior(
inferior_args=inferior_args)
self.add_vCont_query_packets()
# Run the gdb remote command stream.
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
# Pull out supported modes.
supported_vCont_modes = self.parse_vCont_query_response(context)
self.assertIsNotNone(supported_vCont_modes)
# Verify we support the given mode.
self.assertTrue(mode in supported_vCont_modes)
def vCont_supports_c(self):
self.vCont_supports_mode("c")
def vCont_supports_C(self):
self.vCont_supports_mode("C")
def vCont_supports_s(self):
self.vCont_supports_mode("s")
def vCont_supports_S(self):
self.vCont_supports_mode("S")
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
@debugserver_test
def test_vCont_supports_c_debugserver(self):
self.init_debugserver_test()
self.build()
self.vCont_supports_c()
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
@llgs_test
def test_vCont_supports_c_llgs(self):
self.init_llgs_test()
self.build()
self.vCont_supports_c()
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
@debugserver_test
def test_vCont_supports_C_debugserver(self):
self.init_debugserver_test()
self.build()
self.vCont_supports_C()
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
@llgs_test
def test_vCont_supports_C_llgs(self):
self.init_llgs_test()
self.build()
self.vCont_supports_C()
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
@debugserver_test
def test_vCont_supports_s_debugserver(self):
self.init_debugserver_test()
self.build()
self.vCont_supports_s()
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
@llgs_test
def test_vCont_supports_s_llgs(self):
self.init_llgs_test()
self.build()
self.vCont_supports_s()
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
@debugserver_test
def test_vCont_supports_S_debugserver(self):
self.init_debugserver_test()
self.build()
self.vCont_supports_S()
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
@llgs_test
def test_vCont_supports_S_llgs(self):
self.init_llgs_test()
self.build()
self.vCont_supports_S()
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
@debugserver_test
def test_single_step_only_steps_one_instruction_with_Hc_vCont_s_debugserver(
self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.single_step_only_steps_one_instruction(
use_Hc_packet=True, step_instruction="vCont;s")
@llgs_test
@expectedFailureAndroid(
bugnumber="llvm.org/pr24739",
archs=[
"arm",
"aarch64"])
@expectedFailureAll(
oslist=["linux"],
archs=[
"arm",
"aarch64"],
bugnumber="llvm.org/pr24739")
@skipIf(triple='^mips')
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
def test_single_step_only_steps_one_instruction_with_Hc_vCont_s_llgs(self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.single_step_only_steps_one_instruction(
use_Hc_packet=True, step_instruction="vCont;s")
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
@debugserver_test
def test_single_step_only_steps_one_instruction_with_vCont_s_thread_debugserver(
self):
self.init_debugserver_test()
self.build()
self.set_inferior_startup_launch()
self.single_step_only_steps_one_instruction(
use_Hc_packet=False, step_instruction="vCont;s:{thread}")
@llgs_test
@expectedFailureAndroid(
bugnumber="llvm.org/pr24739",
archs=[
"arm",
"aarch64"])
@expectedFailureAll(
oslist=["linux"],
archs=[
"arm",
"aarch64"],
bugnumber="llvm.org/pr24739")
@skipIf(triple='^mips')
@expectedFailureAll(oslist=["ios", "tvos", "watchos", "bridgeos"], bugnumber="rdar://27005337")
def test_single_step_only_steps_one_instruction_with_vCont_s_thread_llgs(
self):
self.init_llgs_test()
self.build()
self.set_inferior_startup_launch()
self.single_step_only_steps_one_instruction(
use_Hc_packet=False, step_instruction="vCont;s:{thread}")

View File

@@ -0,0 +1,98 @@
from __future__ import print_function
import gdbremote_testcase
import lldbgdbserverutils
import re
import select
import socket
import time
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestStubReverseConnect(gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
_DEFAULT_TIMEOUT = 20
def setUp(self):
# Set up the test.
gdbremote_testcase.GdbRemoteTestCaseBase.setUp(self)
# Create a listener on a local port.
self.listener_socket = self.create_listener_socket()
self.assertIsNotNone(self.listener_socket)
self.listener_port = self.listener_socket.getsockname()[1]
def create_listener_socket(self, timeout_seconds=_DEFAULT_TIMEOUT):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.assertIsNotNone(sock)
sock.settimeout(timeout_seconds)
sock.bind(("127.0.0.1", 0))
sock.listen(1)
def tear_down_listener():
try:
sock.shutdown(socket.SHUT_RDWR)
except:
# ignore
None
self.addTearDownHook(tear_down_listener)
return sock
def reverse_connect_works(self):
# Indicate stub startup should do a reverse connect.
appended_stub_args = ["--reverse-connect"]
if self.debug_monitor_extra_args:
self.debug_monitor_extra_args += appended_stub_args
else:
self.debug_monitor_extra_args = appended_stub_args
self.stub_hostname = "127.0.0.1"
self.port = self.listener_port
triple = self.dbg.GetSelectedPlatform().GetTriple()
if re.match(".*-.*-.*-android", triple):
self.forward_adb_port(
self.port,
self.port,
"reverse",
self.stub_device)
# Start the stub.
server = self.launch_debug_monitor(logfile=sys.stdout)
self.assertIsNotNone(server)
self.assertTrue(
lldbgdbserverutils.process_is_running(
server.pid, True))
# Listen for the stub's connection to us.
(stub_socket, address) = self.listener_socket.accept()
self.assertIsNotNone(stub_socket)
self.assertIsNotNone(address)
print("connected to stub {} on {}".format(
address, stub_socket.getsockname()))
# Verify we can do the handshake. If that works, we'll call it good.
self.do_handshake(stub_socket, timeout_seconds=self._DEFAULT_TIMEOUT)
# Clean up.
stub_socket.shutdown(socket.SHUT_RDWR)
@debugserver_test
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def test_reverse_connect_works_debugserver(self):
self.init_debugserver_test(use_named_pipe=False)
self.set_inferior_startup_launch()
self.reverse_connect_works()
@llgs_test
@skipIfRemote # reverse connect is not a supported use case for now
def test_reverse_connect_works_llgs(self):
self.init_llgs_test(use_named_pipe=False)
self.set_inferior_startup_launch()
self.reverse_connect_works()

View File

@@ -0,0 +1,86 @@
from __future__ import print_function
import gdbremote_testcase
import lldbgdbserverutils
import os
import select
import tempfile
import time
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestStubSetSIDTestCase(gdbremote_testcase.GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
def get_stub_sid(self, extra_stub_args=None):
# Launch debugserver
if extra_stub_args:
self.debug_monitor_extra_args += extra_stub_args
server = self.launch_debug_monitor()
self.assertIsNotNone(server)
self.assertTrue(
lldbgdbserverutils.process_is_running(
server.pid, True))
# Get the process id for the stub.
return os.getsid(server.pid)
def sid_is_same_without_setsid(self):
stub_sid = self.get_stub_sid()
self.assertEqual(stub_sid, os.getsid(0))
def sid_is_different_with_setsid(self):
stub_sid = self.get_stub_sid(["--setsid"])
self.assertNotEqual(stub_sid, os.getsid(0))
def sid_is_different_with_S(self):
stub_sid = self.get_stub_sid(["-S"])
self.assertNotEqual(stub_sid, os.getsid(0))
@debugserver_test
@skipIfRemote # --setsid not used on remote platform and currently it is also impossible to get the sid of lldb-platform running on a remote target
def test_sid_is_same_without_setsid_debugserver(self):
self.init_debugserver_test()
self.set_inferior_startup_launch()
self.sid_is_same_without_setsid()
@llgs_test
@skipIfRemote # --setsid not used on remote platform and currently it is also impossible to get the sid of lldb-platform running on a remote target
@expectedFailureAll(oslist=['freebsd'])
def test_sid_is_same_without_setsid_llgs(self):
self.init_llgs_test()
self.set_inferior_startup_launch()
self.sid_is_same_without_setsid()
@debugserver_test
@skipIfRemote # --setsid not used on remote platform and currently it is also impossible to get the sid of lldb-platform running on a remote target
def test_sid_is_different_with_setsid_debugserver(self):
self.init_debugserver_test()
self.set_inferior_startup_launch()
self.sid_is_different_with_setsid()
@llgs_test
@skipIfRemote # --setsid not used on remote platform and currently it is also impossible to get the sid of lldb-platform running on a remote target
def test_sid_is_different_with_setsid_llgs(self):
self.init_llgs_test()
self.set_inferior_startup_launch()
self.sid_is_different_with_setsid()
@debugserver_test
@skipIfRemote # --setsid not used on remote platform and currently it is also impossible to get the sid of lldb-platform running on a remote target
def test_sid_is_different_with_S_debugserver(self):
self.init_debugserver_test()
self.set_inferior_startup_launch()
self.sid_is_different_with_S()
@llgs_test
@skipIfRemote # --setsid not used on remote platform and currently it is also impossible to get the sid of lldb-platform running on a remote target
def test_sid_is_different_with_S_llgs(self):
self.init_llgs_test()
self.set_inferior_startup_launch()
self.sid_is_different_with_S()

View File

@@ -0,0 +1,8 @@
LEVEL = ../../../make
override CFLAGS_EXTRAS += -D__STDC_LIMIT_MACROS -D__STDC_FORMAT_MACROS
ENABLE_THREADS := YES
CXX_SOURCES := main.cpp
MAKE_DSYM :=NO
include $(LEVEL)/Makefile.rules

View File

@@ -0,0 +1,127 @@
from __future__ import print_function
# lldb test suite imports
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import TestBase
# gdb-remote-specific imports
import lldbgdbserverutils
from gdbremote_testcase import GdbRemoteTestCaseBase
class TestGdbRemoteExitCode(GdbRemoteTestCaseBase):
mydir = TestBase.compute_mydir(__file__)
FAILED_LAUNCH_CODE = "E08"
def get_launch_fail_reason(self):
self.reset_test_sequence()
self.test_sequence.add_log_lines(
["read packet: $qLaunchSuccess#00"],
True)
self.test_sequence.add_log_lines(
[{"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
"capture": {1: "launch_result"}}],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
return context.get("launch_result")[1:]
def start_inferior(self):
launch_args = self.install_and_create_launch_args()
server = self.connect_to_debug_monitor()
self.assertIsNotNone(server)
self.add_no_ack_remote_stream()
self.test_sequence.add_log_lines(
["read packet: %s" % lldbgdbserverutils.build_gdbremote_A_packet(
launch_args)],
True)
self.test_sequence.add_log_lines(
[{"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
"capture": {1: "A_result"}}],
True)
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
launch_result = context.get("A_result")
self.assertIsNotNone(launch_result)
if launch_result == self.FAILED_LAUNCH_CODE:
fail_reason = self.get_launch_fail_reason()
self.fail("failed to launch inferior: " + fail_reason)
@debugserver_test
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def test_start_inferior_debugserver(self):
self.init_debugserver_test()
self.build()
self.start_inferior()
@llgs_test
def test_start_inferior_llgs(self):
self.init_llgs_test()
self.build()
self.start_inferior()
def inferior_exit_0(self):
launch_args = self.install_and_create_launch_args()
server = self.connect_to_debug_monitor()
self.assertIsNotNone(server)
self.add_no_ack_remote_stream()
self.add_verified_launch_packets(launch_args)
self.test_sequence.add_log_lines(
["read packet: $vCont;c#a8",
"send packet: $W00#00"],
True)
self.expect_gdbremote_sequence()
@debugserver_test
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def test_inferior_exit_0_debugserver(self):
self.init_debugserver_test()
self.build()
self.inferior_exit_0()
@llgs_test
def test_inferior_exit_0_llgs(self):
self.init_llgs_test()
self.build()
self.inferior_exit_0()
def inferior_exit_42(self):
launch_args = self.install_and_create_launch_args()
server = self.connect_to_debug_monitor()
self.assertIsNotNone(server)
RETVAL = 42
# build launch args
launch_args += ["retval:%d" % RETVAL]
self.add_no_ack_remote_stream()
self.add_verified_launch_packets(launch_args)
self.test_sequence.add_log_lines(
["read packet: $vCont;c#a8",
"send packet: $W{0:02x}#00".format(RETVAL)],
True)
self.expect_gdbremote_sequence()
@debugserver_test
@skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
def test_inferior_exit_42_debugserver(self):
self.init_debugserver_test()
self.build()
self.inferior_exit_42()
@llgs_test
def test_inferior_exit_42_llgs(self):
self.init_llgs_test()
self.build()
self.inferior_exit_42()

View File

@@ -0,0 +1,355 @@
#include <cstdlib>
#include <cstring>
#include <errno.h>
#include <inttypes.h>
#include <memory>
#include <pthread.h>
#include <setjmp.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <vector>
#if defined(__APPLE__)
__OSX_AVAILABLE_STARTING(__MAC_10_6, __IPHONE_3_2)
int pthread_threadid_np(pthread_t, __uint64_t *);
#elif defined(__linux__)
#include <sys/syscall.h>
#elif defined(__NetBSD__)
#include <lwp.h>
#endif
static const char *const RETVAL_PREFIX = "retval:";
static const char *const SLEEP_PREFIX = "sleep:";
static const char *const STDERR_PREFIX = "stderr:";
static const char *const SET_MESSAGE_PREFIX = "set-message:";
static const char *const PRINT_MESSAGE_COMMAND = "print-message:";
static const char *const GET_DATA_ADDRESS_PREFIX = "get-data-address-hex:";
static const char *const GET_STACK_ADDRESS_COMMAND = "get-stack-address-hex:";
static const char *const GET_HEAP_ADDRESS_COMMAND = "get-heap-address-hex:";
static const char *const GET_CODE_ADDRESS_PREFIX = "get-code-address-hex:";
static const char *const CALL_FUNCTION_PREFIX = "call-function:";
static const char *const THREAD_PREFIX = "thread:";
static const char *const THREAD_COMMAND_NEW = "new";
static const char *const THREAD_COMMAND_PRINT_IDS = "print-ids";
static const char *const THREAD_COMMAND_SEGFAULT = "segfault";
static bool g_print_thread_ids = false;
static pthread_mutex_t g_print_mutex = PTHREAD_MUTEX_INITIALIZER;
static bool g_threads_do_segfault = false;
static pthread_mutex_t g_jump_buffer_mutex = PTHREAD_MUTEX_INITIALIZER;
static jmp_buf g_jump_buffer;
static bool g_is_segfaulting = false;
static char g_message[256];
static volatile char g_c1 = '0';
static volatile char g_c2 = '1';
static void print_thread_id() {
// Put in the right magic here for your platform to spit out the thread id (tid)
// that debugserver/lldb-gdbserver would see as a TID. Otherwise, let the else
// clause print out the unsupported text so that the unit test knows to skip
// verifying thread ids.
#if defined(__APPLE__)
__uint64_t tid = 0;
pthread_threadid_np(pthread_self(), &tid);
printf("%" PRIx64, tid);
#elif defined(__linux__)
// This is a call to gettid() via syscall.
printf("%" PRIx64, static_cast<uint64_t>(syscall(__NR_gettid)));
#elif defined(__NetBSD__)
// Technically lwpid_t is 32-bit signed integer
printf("%" PRIx64, static_cast<uint64_t>(_lwp_self()));
#else
printf("{no-tid-support}");
#endif
}
static void signal_handler(int signo) {
const char *signal_name = nullptr;
switch (signo) {
case SIGUSR1:
signal_name = "SIGUSR1";
break;
case SIGSEGV:
signal_name = "SIGSEGV";
break;
default:
signal_name = nullptr;
}
// Print notice that we received the signal on a given thread.
pthread_mutex_lock(&g_print_mutex);
if (signal_name)
printf("received %s on thread id: ", signal_name);
else
printf("received signo %d (%s) on thread id: ", signo, strsignal(signo));
print_thread_id();
printf("\n");
pthread_mutex_unlock(&g_print_mutex);
// Reset the signal handler if we're one of the expected signal handlers.
switch (signo) {
case SIGSEGV:
if (g_is_segfaulting) {
// Fix up the pointer we're writing to. This needs to happen if nothing
// intercepts the SIGSEGV (i.e. if somebody runs this from the command
// line).
longjmp(g_jump_buffer, 1);
}
break;
case SIGUSR1:
if (g_is_segfaulting) {
// Fix up the pointer we're writing to. This is used to test gdb remote
// signal delivery. A SIGSEGV will be raised when the thread is created,
// switched out for a SIGUSR1, and then this code still needs to fix the
// seg fault. (i.e. if somebody runs this from the command line).
longjmp(g_jump_buffer, 1);
}
break;
}
// Reset the signal handler.
sig_t sig_result = signal(signo, signal_handler);
if (sig_result == SIG_ERR) {
fprintf(stderr, "failed to set signal handler: errno=%d\n", errno);
exit(1);
}
}
static void swap_chars() {
g_c1 = '1';
g_c2 = '0';
g_c1 = '0';
g_c2 = '1';
}
static void hello() {
pthread_mutex_lock(&g_print_mutex);
printf("hello, world\n");
pthread_mutex_unlock(&g_print_mutex);
}
static void *thread_func(void *arg) {
static pthread_mutex_t s_thread_index_mutex = PTHREAD_MUTEX_INITIALIZER;
static int s_thread_index = 1;
pthread_mutex_lock(&s_thread_index_mutex);
const int this_thread_index = s_thread_index++;
pthread_mutex_unlock(&s_thread_index_mutex);
if (g_print_thread_ids) {
pthread_mutex_lock(&g_print_mutex);
printf("thread %d id: ", this_thread_index);
print_thread_id();
printf("\n");
pthread_mutex_unlock(&g_print_mutex);
}
if (g_threads_do_segfault) {
// Sleep for a number of seconds based on the thread index.
// TODO add ability to send commands to test exe so we can
// handle timing more precisely. This is clunky. All we're
// trying to do is add predictability as to the timing of
// signal generation by created threads.
int sleep_seconds = 2 * (this_thread_index - 1);
while (sleep_seconds > 0)
sleep_seconds = sleep(sleep_seconds);
// Test creating a SEGV.
pthread_mutex_lock(&g_jump_buffer_mutex);
g_is_segfaulting = true;
int *bad_p = nullptr;
if (setjmp(g_jump_buffer) == 0) {
// Force a seg fault signal on this thread.
*bad_p = 0;
} else {
// Tell the system we're no longer seg faulting.
// Used by the SIGUSR1 signal handler that we inject
// in place of the SIGSEGV so it only tries to
// recover from the SIGSEGV if this seg fault code
// was in play.
g_is_segfaulting = false;
}
pthread_mutex_unlock(&g_jump_buffer_mutex);
pthread_mutex_lock(&g_print_mutex);
printf("thread ");
print_thread_id();
printf(": past SIGSEGV\n");
pthread_mutex_unlock(&g_print_mutex);
}
int sleep_seconds_remaining = 60;
while (sleep_seconds_remaining > 0) {
sleep_seconds_remaining = sleep(sleep_seconds_remaining);
}
return nullptr;
}
int main(int argc, char **argv) {
lldb_enable_attach();
std::vector<pthread_t> threads;
std::unique_ptr<uint8_t[]> heap_array_up;
int return_value = 0;
// Set the signal handler.
sig_t sig_result = signal(SIGALRM, signal_handler);
if (sig_result == SIG_ERR) {
fprintf(stderr, "failed to set SIGALRM signal handler: errno=%d\n", errno);
exit(1);
}
sig_result = signal(SIGUSR1, signal_handler);
if (sig_result == SIG_ERR) {
fprintf(stderr, "failed to set SIGUSR1 handler: errno=%d\n", errno);
exit(1);
}
sig_result = signal(SIGSEGV, signal_handler);
if (sig_result == SIG_ERR) {
fprintf(stderr, "failed to set SIGUSR1 handler: errno=%d\n", errno);
exit(1);
}
// Process command line args.
for (int i = 1; i < argc; ++i) {
if (std::strstr(argv[i], STDERR_PREFIX)) {
// Treat remainder as text to go to stderr.
fprintf(stderr, "%s\n", (argv[i] + strlen(STDERR_PREFIX)));
} else if (std::strstr(argv[i], RETVAL_PREFIX)) {
// Treat as the return value for the program.
return_value = std::atoi(argv[i] + strlen(RETVAL_PREFIX));
} else if (std::strstr(argv[i], SLEEP_PREFIX)) {
// Treat as the amount of time to have this process sleep (in seconds).
int sleep_seconds_remaining = std::atoi(argv[i] + strlen(SLEEP_PREFIX));
// Loop around, sleeping until all sleep time is used up. Note that
// signals will cause sleep to end early with the number of seconds
// remaining.
for (int i = 0; sleep_seconds_remaining > 0; ++i) {
sleep_seconds_remaining = sleep(sleep_seconds_remaining);
// std::cout << "sleep result (call " << i << "): " <<
// sleep_seconds_remaining << std::endl;
}
} else if (std::strstr(argv[i], SET_MESSAGE_PREFIX)) {
// Copy the contents after "set-message:" to the g_message buffer.
// Used for reading inferior memory and verifying contents match
// expectations.
strncpy(g_message, argv[i] + strlen(SET_MESSAGE_PREFIX),
sizeof(g_message));
// Ensure we're null terminated.
g_message[sizeof(g_message) - 1] = '\0';
} else if (std::strstr(argv[i], PRINT_MESSAGE_COMMAND)) {
pthread_mutex_lock(&g_print_mutex);
printf("message: %s\n", g_message);
pthread_mutex_unlock(&g_print_mutex);
} else if (std::strstr(argv[i], GET_DATA_ADDRESS_PREFIX)) {
volatile void *data_p = nullptr;
if (std::strstr(argv[i] + strlen(GET_DATA_ADDRESS_PREFIX), "g_message"))
data_p = &g_message[0];
else if (std::strstr(argv[i] + strlen(GET_DATA_ADDRESS_PREFIX), "g_c1"))
data_p = &g_c1;
else if (std::strstr(argv[i] + strlen(GET_DATA_ADDRESS_PREFIX), "g_c2"))
data_p = &g_c2;
pthread_mutex_lock(&g_print_mutex);
printf("data address: %p\n", data_p);
pthread_mutex_unlock(&g_print_mutex);
} else if (std::strstr(argv[i], GET_HEAP_ADDRESS_COMMAND)) {
// Create a byte array if not already present.
if (!heap_array_up)
heap_array_up.reset(new uint8_t[32]);
pthread_mutex_lock(&g_print_mutex);
printf("heap address: %p\n", heap_array_up.get());
pthread_mutex_unlock(&g_print_mutex);
} else if (std::strstr(argv[i], GET_STACK_ADDRESS_COMMAND)) {
pthread_mutex_lock(&g_print_mutex);
printf("stack address: %p\n", &return_value);
pthread_mutex_unlock(&g_print_mutex);
} else if (std::strstr(argv[i], GET_CODE_ADDRESS_PREFIX)) {
void (*func_p)() = nullptr;
if (std::strstr(argv[i] + strlen(GET_CODE_ADDRESS_PREFIX), "hello"))
func_p = hello;
else if (std::strstr(argv[i] + strlen(GET_CODE_ADDRESS_PREFIX),
"swap_chars"))
func_p = swap_chars;
pthread_mutex_lock(&g_print_mutex);
printf("code address: %p\n", func_p);
pthread_mutex_unlock(&g_print_mutex);
} else if (std::strstr(argv[i], CALL_FUNCTION_PREFIX)) {
// Defaut to providing the address of main.
if (std::strcmp(argv[i] + strlen(CALL_FUNCTION_PREFIX), "hello") == 0)
hello();
else if (std::strcmp(argv[i] + strlen(CALL_FUNCTION_PREFIX),
"swap_chars") == 0)
swap_chars();
else {
pthread_mutex_lock(&g_print_mutex);
printf("unknown function: %s\n",
argv[i] + strlen(CALL_FUNCTION_PREFIX));
pthread_mutex_unlock(&g_print_mutex);
}
} else if (std::strstr(argv[i], THREAD_PREFIX)) {
// Check if we're creating a new thread.
if (std::strstr(argv[i] + strlen(THREAD_PREFIX), THREAD_COMMAND_NEW)) {
// Create a new thread.
pthread_t new_thread;
const int err =
::pthread_create(&new_thread, nullptr, thread_func, nullptr);
if (err) {
fprintf(stderr, "pthread_create() failed with error code %d\n", err);
exit(err);
}
threads.push_back(new_thread);
} else if (std::strstr(argv[i] + strlen(THREAD_PREFIX),
THREAD_COMMAND_PRINT_IDS)) {
// Turn on thread id announcing.
g_print_thread_ids = true;
// And announce us.
pthread_mutex_lock(&g_print_mutex);
printf("thread 0 id: ");
print_thread_id();
printf("\n");
pthread_mutex_unlock(&g_print_mutex);
} else if (std::strstr(argv[i] + strlen(THREAD_PREFIX),
THREAD_COMMAND_SEGFAULT)) {
g_threads_do_segfault = true;
} else {
// At this point we don't do anything else with threads.
// Later use thread index and send command to thread.
}
} else {
// Treat the argument as text for stdout.
printf("%s\n", argv[i]);
}
}
// If we launched any threads, join them
for (std::vector<pthread_t>::iterator it = threads.begin();
it != threads.end(); ++it) {
void *thread_retval = nullptr;
const int err = ::pthread_join(*it, &thread_retval);
if (err != 0)
fprintf(stderr, "pthread_join() failed with error code %d\n", err);
}
return return_value;
}

Some files were not shown because too many files have changed in this diff Show More