"""Piotr API
This module allows to interact with/instrument Piotr virtual devices in a
pythonic way. It exposes a main class, :py:class:`Piotr`, that is able to
interact with running instances and to automate things.
"""
import re
import signal
import logging
import psutil
from os import kill, killpg
from os.path import basename
from time import sleep
from piotr.cmdline import CmdlineModule, module, command
from piotr.user import UserDirectory as ud
from piotr.util import confirm, enforce_root
from piotr.qemu import QemuGuestEnum, QemuPlatforms
from piotr.qemu.guest import QemuGuest, QemuGuestNotFound
from piotr.device import Device as _Device, create_device
from piotr.package import package_create, package_import, BadPackageError
from piotr.exceptions import DeviceConfigError
GDB_PROT_DONE = 'done'
GDB_PROT_CONN = 'connected'
GDB_PROT_RUN = 'running'
[docs]class DeviceNotFound(Exception):
pass
# import avatar2 if available
try:
from avatar2.protocols.gdb import GDBProtocol
from avatar2.targets import TargetStates
from avatar2.message import AvatarMessage, UpdateStateMessage, BreakpointHitMessage
use_avatar2 = True
class Debugger(object):
"""This class is a wrapper for avatar2's GDBProtocol class that will handle
gdb-multiarch.
"""
TARGET_UNKNOWN=-1
TARGET_STOPPED=0
TARGET_RUNNING=1
TARGET_EXITED=2
def __init__(self, inst_name, pid, ip=None, port=4444, gdb='gdb-multiarch'):
self.log = logging.getLogger('gdb-%s-%d' % (inst_name, pid))
self.state = self.TARGET_UNKNOWN
self.__dbg_ip = '127.0.0.1' if ip is None else ip
self.__port = port
self.__debugger = GDBProtocol(
origin=self,
gdb_executable=gdb,
async_message_handler=self.on_debugger_msg
)
self.__debugger.remote_connect(self.__dbg_ip, self.__port)
self.__pid = pid
self.__gdb_executable = gdb
def __str__(self):
return 'Debugger(pid=%d, gdb="%s", ip="", port=%d)' % (
self.__pid,
self.__gdb_executable,
self.dbg_ip,
self.__port
)
def __repr__(self):
return str(self)
def on_debugger_msg(self, message):
"""Process async message sent by gdb
:param message: message
"""
if isinstance(message, UpdateStateMessage):
if message.state == TargetStates.RUNNING:
self.state = self.TARGET_RUNNING
elif message.state == TargetStates.STOPPED:
self.state = self.TARGET_STOPPED
elif message.state == TargetStates.EXITED:
self.state = self.TARGET_EXITED
elif isinstance(message, BreakpointHitMessage):
# breakpoint has been hit, target is stopped
self.state = self.TARGET_STOPPED
def wait(self, blocking=True):
"""Wait for target to be stopped."""
while blocking:
if self.state == self.TARGET_STOPPED or self.state == self.TARGET_EXITED:
break
sleep(0.001)
def cont(self, blocking=True):
"""Continue execution."""
if self.__debugger.cont():
while blocking:
if self.state == self.TARGET_RUNNING:
break
sleep(0.001)
return True
else:
return False
def stop(self):
"""Stop execution of target."""
self.__debugger.stop()
def set_abi(self, abi):
"""Set ABI."""
self.__debugger.set_abi(abi)
def set_breakpoint(self, line, hardware=False, temporary=False, regex=False, condition=None, ignore_count=0, thread=0, pending=False):
"""Inserts a breakpoint
:param hardware: Hardware breakpoint
:type hardware: bool
:param temporary: Tempory breakpoint
:type temporary: bool
:param regex: If set, inserts breakpoints matching the regex
:type regex: str
:param condition: If set, inserts a breakpoint with specified condition
:type condition: str
:param ignore_count: Amount of times the bp should be ignored
:type ignore_count: int
:param thread: Threadno in which this breakpoints should be added
:type thread: int
:returns: The number of the breakpoint
"""
return self.debugger.set_breakpoint(
line,
hardware=hardware,
temporary=temporary,
regex=regex,
condition=condition,
ignore_count=ignore_count,
thread=thread,
pending=pending
)
def remove_breakpoint(self, bkpt):
"""Deletes a breakpoint"""
return self.__debugger.remove_breakpoint(bkpt)
def write_memory(self, address, wordsize, val, num_words=1, raw=False):
"""Writes memory
:param address: Address to write to
:param wordsize: the size of the write (1, 2, 4 or 8)
:param val: the written value
:type val: int if num_words == 1 and raw == False
list if num_words > 1 and raw == False
str or byte if raw == True
:param num_words: The amount of words to read
:param raw: Specifies whether to write in raw or word mode
:returns: True on success else False
"""
return self.__debugger.write_memory(
address,
wordsize,
val,
num_words=num_words,
raw=raw
)
def read_memory(self, address, wordsize=4, num_words=1, raw=False):
"""reads memory
:param address: Address to write to
:param wordsize: the size of a read word (1, 2, 4 or 8)
:param num_words: the amount of read words
:param raw: Whether the read memory should be returned unprocessed
:return: The read memory
"""
return self.__debugger.read_memory(
address,
wordsize=wordsize,
num_words=num_words,
raw=raw
)
def read_register(self, reg):
"""Read register"""
return self.__debugger.read_register(reg)
def write_register(self, reg, value):
return self.__debugger.write_register(reg, value)
def step(self):
return self.__debugger.step()
def console_command(self, cmd, rexpect=GDB_PROT_DONE):
return self.__debugger.console_command(cmd, rexpect=rexpect)
def get_symbol(self, symbol):
return self.__debugger.get_symbol(symbol)
def set_gdb_variable(self, variable, value):
return self.__debugger.set_gdb_variable(variable, value)
def set_fork_mode(self, mode='child'):
return self.__debugger.set_gdb_variable(
'follow-fork-mode',
mode
)
except Exception as exc:
use_avatar2 = False
class Debugger(object):
def __init__(self, inst_name, pid, ip=None, port=4444, gdb='gdb-multiarch'):
raise Exception('Debugger class cannot be used because Avatar2 is not installed.')
[docs]class Process:
"""This class holds information about a process on the emulated system."""
def __init__(self, pid=-1, user=None, path=None):
self.__pid = pid
self.__user = user
self.__path = path
def __repr__(self):
return str(self)
def __str__(self):
return 'Process(pid=%d, user="%s", path="%s")' % (
self.__pid,
self.__user if self.__user is not None else '?',
self.__path if self.__path is not None else '?'
)
@property
def pid(self):
return self.__pid
@property
def user(self):
return self.__user
@property
def path(self):
return self.__path
[docs]class Instance:
"""
This class represents a Piotr running device instance and allows to interact
with it, execute commands into the emulated host and target (if Qemu agent
is supported by the device), enumerate and manipulate process.
:param guest: Qemu guest
:type guest: piotr.qemu.QemuPlatform
:param device: Name of parent device, as referenced in Piotr devices list
:type device: str
"""
def __init__(self, guest, device):
self.__guest= guest
self.__pid = guest.get_pid()
self.__device = Device(device)
self.__sock = guest.get_sock()
self.__name = guest.get_name()
self.debugger = None
def __str__(self):
return 'Instance(name="%s", device="%s")' % (self.__name, self.__device.name)
def __repr__(self):
return str(self)
@property
def device(self):
return self.__device
@property
def name(self):
return self.__name
@property
def sock(self):
return self.__sock
[docs] def stop(self):
"""
Stop instance.
"""
# Loop on all processes and look for our instance
for p in psutil.process_iter():
p_env = p.environ()
if 'PIOTR_INSTNAME' in p_env and p_env['PIOTR_INSTNAME'] == self.__name:
p.send_signal(signal.SIGTERM)
# Clean the execution environment
self.__guest.stop()
[docs] def exec_host(self, command, wait=True):
"""
Execute a command in host.
"""
nibbles = command.split(' ')
if len(nibbles) > 0:
executable = nibbles[0]
args = [arg for arg in nibbles[1:] if len(arg)>0]
guest = QemuGuest(self.__name)
return guest.run_command(executable, args, wait=wait)
else:
return None
[docs] def exec_target(self, command, wait=True):
"""Execute a command in guest.
:param command: command to execute
:type command: str
:param return_output: wait for the process to end and return output
:type return_output: bool
:return: command output or executable PID
"""
nibbles = command.split(' ')
if len(nibbles) > 0:
executable = nibbles[0]
args = [arg for arg in nibbles[1:] if len(arg)>0]
guest = QemuGuest(self.__name)
guest_args = ['/target']
guest_args.extend([executable])
guest_args.extend(args)
return guest.run_command(
'chroot', guest_args,
wait=wait
)
else:
# Error.
return None
[docs] def debug(self, pid, ip='0.0.0.0', port=4444, gdb_executable='gdb-multiarch'):
"""
Runs a gdb server and attaches it to a process.
"""
# Attach a gdbserver to the target PID
guest = QemuGuest(self.__name)
result = guest.run_command(
'gdbserver', [
'%s:%d' % (ip, port),
'--attach',
'%d' % pid
],
wait=False
)
# is it a fail ?
if result is None:
return None
# If avatar2 is installed, return a GDBTarget
if use_avatar2:
# Wait for gdbserver to be available
sleep(2)
# Instanciate our GDB
self.debugger = Debugger(
self.__name,
pid,
ip=None if ip=='0.0.0.0' else ip,
port=port,
gdb=gdb_executable
)
return self.debugger
else:
return result
[docs] def ps(self):
"""
Run `ps` on this target and return a list of processes.
:return: list of processes
"""
# Runs ps command
guest = QemuGuest(self.__name)
result = guest.run_command(
'ps', [],
wait=True
)
# Parse result
processes = []
for l in result.split(b'\n')[1:]:
res = re.match(b'^\s+([0-9]+)\s+(\S+)\s+(.*)$', l)
if res is not None:
processes.append(
Process(
pid=int(res.group(1)),
user=res.group(2).decode('latin-1'),
path=res.group(3).decode('latin-1')
)
)
return processes
[docs] def pid(self, process_name):
"""Find process ID from process name
:param process_name: process name
:type process_name: str
:return: PID of the process, None on error
"""
processes = self.ps()
for process in processes:
if process.path.split(' ')[0] == process_name:
return process.pid
# Not found :(
return None
[docs] def kill(self, pid, sig=9):
"""
Kill a processus.
"""
guest = QemuGuest(self.__name)
result = guest.run_command(
'kill', [
'-%d' % sig,
'%d' % pid
],
wait=True
)
return result
[docs] def target_start(self):
"""
Run target in host.
"""
self.exec_host('target-start', wait=False)
[docs] def get_sysroot(self):
"""Return this instance root path"""
return self.__device.get_sysroot()
[docs]class Device:
"""
API Device object.
"""
def __init__(self, deviceName):
self.__device = deviceName
def __str__(self):
return 'Device(name="%s")' % self.__device
def __repr__(self):
return str(self)
@property
def name(self):
return self.__device
[docs] def run(self, alias=None, background=False):
"""
Run emulated device.
"""
if ud.get().hasDevice(self.__device):
device = _Device(self.__device)
device_name = alias
# Check if we can emulate this platform
if QemuPlatforms.has(device.getPlatform()):
platform_clazz = QemuPlatforms.get(device.getPlatform())
else:
warning('Device', 'Unknown qemu platform `%s`. Falling back to `virt`.' % device.getPlatform())
platform_clazz = QemuPlatforms.get('virt')
guest = platform_clazz(device, name=device_name)
# Check if specific privileges are required.
if guest.requires_privileges():
enforce_root()
# Start emulated device
guest.start(background=background)
if not background:
guest.wait()
guest.stop()
return None
else:
return Instance(
guest,
self.__device
)
else:
raise DeviceNotFound()
[docs] def get_sysroot(self):
"""Return this device root fs path"""
if ud.get().hasDevice(self.__device):
device = _Device(self.__device)
return device.getRootfsPath()
else:
return None
[docs]class Piotr:
"""
Piotr main API.
"""
[docs] @staticmethod
def devices():
"""
Enumerate registered devices.
"""
for device in ud.get().getDevices():
yield Device(device.name)
[docs] @staticmethod
def instances():
"""
Enumerate running instances.
"""
qemu_enum = QemuGuestEnum()
for pid,instance,sock,inst_name in qemu_enum.listGuests():
yield Instance(pid, instance, sock, inst_name)
[docs] @staticmethod
def instance(inst_name):
"""
Find an existing instance by name
"""
for instance in Piotr.instances():
if instance.name == inst_name:
return instance