upload android base code part7

This commit is contained in:
August 2018-08-08 18:09:17 +08:00
parent 4e516ec6ed
commit 841ae54672
25229 changed files with 1709508 additions and 0 deletions

View file

@ -0,0 +1,28 @@
// Copyright (C) 2017 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
python_library_host {
name: "adb_py",
srcs: [
"adb/*.py",
],
version: {
py2: {
enabled: true,
},
py3: {
enabled: false,
},
},
}

View file

@ -0,0 +1,13 @@
Copyright (C) 2016 The Android Open Source Project
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View file

@ -0,0 +1,17 @@
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from .device import * # pylint: disable=wildcard-import

View file

@ -0,0 +1,513 @@
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import atexit
import base64
import logging
import os
import re
import subprocess
class FindDeviceError(RuntimeError):
pass
class DeviceNotFoundError(FindDeviceError):
def __init__(self, serial):
self.serial = serial
super(DeviceNotFoundError, self).__init__(
'No device with serial {}'.format(serial))
class NoUniqueDeviceError(FindDeviceError):
def __init__(self):
super(NoUniqueDeviceError, self).__init__('No unique device')
class ShellError(RuntimeError):
def __init__(self, cmd, stdout, stderr, exit_code):
super(ShellError, self).__init__(
'`{0}` exited with code {1}'.format(cmd, exit_code))
self.cmd = cmd
self.stdout = stdout
self.stderr = stderr
self.exit_code = exit_code
def get_devices(adb_path='adb'):
with open(os.devnull, 'wb') as devnull:
subprocess.check_call([adb_path, 'start-server'], stdout=devnull,
stderr=devnull)
out = split_lines(subprocess.check_output([adb_path, 'devices']))
# The first line of `adb devices` just says "List of attached devices", so
# skip that.
devices = []
for line in out[1:]:
if not line.strip():
continue
if 'offline' in line:
continue
serial, _ = re.split(r'\s+', line, maxsplit=1)
devices.append(serial)
return devices
def _get_unique_device(product=None, adb_path='adb'):
devices = get_devices(adb_path=adb_path)
if len(devices) != 1:
raise NoUniqueDeviceError()
return AndroidDevice(devices[0], product, adb_path)
def _get_device_by_serial(serial, product=None, adb_path='adb'):
for device in get_devices(adb_path=adb_path):
if device == serial:
return AndroidDevice(serial, product, adb_path)
raise DeviceNotFoundError(serial)
def get_device(serial=None, product=None, adb_path='adb'):
"""Get a uniquely identified AndroidDevice if one is available.
Raises:
DeviceNotFoundError:
The serial specified by `serial` or $ANDROID_SERIAL is not
connected.
NoUniqueDeviceError:
Neither `serial` nor $ANDROID_SERIAL was set, and the number of
devices connected to the system is not 1. Having 0 connected
devices will also result in this error.
Returns:
An AndroidDevice associated with the first non-None identifier in the
following order of preference:
1) The `serial` argument.
2) The environment variable $ANDROID_SERIAL.
3) The single device connnected to the system.
"""
if serial is not None:
return _get_device_by_serial(serial, product, adb_path)
android_serial = os.getenv('ANDROID_SERIAL')
if android_serial is not None:
return _get_device_by_serial(android_serial, product, adb_path)
return _get_unique_device(product, adb_path=adb_path)
def _get_device_by_type(flag, adb_path):
with open(os.devnull, 'wb') as devnull:
subprocess.check_call([adb_path, 'start-server'], stdout=devnull,
stderr=devnull)
try:
serial = subprocess.check_output(
[adb_path, flag, 'get-serialno']).strip()
except subprocess.CalledProcessError:
raise RuntimeError('adb unexpectedly returned nonzero')
if serial == 'unknown':
raise NoUniqueDeviceError()
return _get_device_by_serial(serial, adb_path=adb_path)
def get_usb_device(adb_path='adb'):
"""Get the unique USB-connected AndroidDevice if it is available.
Raises:
NoUniqueDeviceError:
0 or multiple devices are connected via USB.
Returns:
An AndroidDevice associated with the unique USB-connected device.
"""
return _get_device_by_type('-d', adb_path=adb_path)
def get_emulator_device(adb_path='adb'):
"""Get the unique emulator AndroidDevice if it is available.
Raises:
NoUniqueDeviceError:
0 or multiple emulators are running.
Returns:
An AndroidDevice associated with the unique running emulator.
"""
return _get_device_by_type('-e', adb_path=adb_path)
# If necessary, modifies subprocess.check_output() or subprocess.Popen() args
# to run the subprocess via Windows PowerShell to work-around an issue in
# Python 2's subprocess class on Windows where it doesn't support Unicode.
def _get_subprocess_args(args):
# Only do this slow work-around if Unicode is in the cmd line on Windows.
# PowerShell takes 600-700ms to startup on a 2013-2014 machine, which is
# very slow.
if os.name != 'nt' or all(not isinstance(arg, unicode) for arg in args[0]):
return args
def escape_arg(arg):
# Escape for the parsing that the C Runtime does in Windows apps. In
# particular, this will take care of double-quotes.
arg = subprocess.list2cmdline([arg])
# Escape single-quote with another single-quote because we're about
# to...
arg = arg.replace(u"'", u"''")
# ...put the arg in a single-quoted string for PowerShell to parse.
arg = u"'" + arg + u"'"
return arg
# Escape command line args.
argv = map(escape_arg, args[0])
# Cause script errors (such as adb not found) to stop script immediately
# with an error.
ps_code = u'$ErrorActionPreference = "Stop"\r\n'
# Add current directory to the PATH var, to match cmd.exe/CreateProcess()
# behavior.
ps_code += u'$env:Path = ".;" + $env:Path\r\n'
# Precede by &, the PowerShell call operator, and separate args by space.
ps_code += u'& ' + u' '.join(argv)
# Make the PowerShell exit code the exit code of the subprocess.
ps_code += u'\r\nExit $LastExitCode'
# Encode as UTF-16LE (without Byte-Order-Mark) which Windows natively
# understands.
ps_code = ps_code.encode('utf-16le')
# Encode the PowerShell command as base64 and use the special
# -EncodedCommand option that base64 decodes. Base64 is just plain ASCII,
# so it should have no problem passing through Win32 CreateProcessA()
# (which python erroneously calls instead of CreateProcessW()).
return (['powershell.exe', '-NoProfile', '-NonInteractive',
'-EncodedCommand', base64.b64encode(ps_code)],) + args[1:]
# Call this instead of subprocess.check_output() to work-around issue in Python
# 2's subprocess class on Windows where it doesn't support Unicode.
def _subprocess_check_output(*args, **kwargs):
try:
return subprocess.check_output(*_get_subprocess_args(args), **kwargs)
except subprocess.CalledProcessError as e:
# Show real command line instead of the powershell.exe command line.
raise subprocess.CalledProcessError(e.returncode, args[0],
output=e.output)
# Call this instead of subprocess.Popen(). Like _subprocess_check_output().
def _subprocess_Popen(*args, **kwargs):
return subprocess.Popen(*_get_subprocess_args(args), **kwargs)
def split_lines(s):
"""Splits lines in a way that works even on Windows and old devices.
Windows will see \r\n instead of \n, old devices do the same, old devices
on Windows will see \r\r\n.
"""
# rstrip is used here to workaround a difference between splineslines and
# re.split:
# >>> 'foo\n'.splitlines()
# ['foo']
# >>> re.split(r'\n', 'foo\n')
# ['foo', '']
return re.split(r'[\r\n]+', s.rstrip())
def version(adb_path=None):
"""Get the version of adb (in terms of ADB_SERVER_VERSION)."""
adb_path = adb_path if adb_path is not None else ['adb']
version_output = subprocess.check_output(adb_path + ['version'])
pattern = r'^Android Debug Bridge version 1.0.(\d+)$'
result = re.match(pattern, version_output.splitlines()[0])
if not result:
return 0
return int(result.group(1))
class AndroidDevice(object):
# Delimiter string to indicate the start of the exit code.
_RETURN_CODE_DELIMITER = 'x'
# Follow any shell command with this string to get the exit
# status of a program since this isn't propagated by adb.
#
# The delimiter is needed because `printf 1; echo $?` would print
# "10", and we wouldn't be able to distinguish the exit code.
_RETURN_CODE_PROBE = [';', 'echo', '{0}$?'.format(_RETURN_CODE_DELIMITER)]
# Maximum search distance from the output end to find the delimiter.
# adb on Windows returns \r\n even if adbd returns \n. Some old devices
# seem to actually return \r\r\n.
_RETURN_CODE_SEARCH_LENGTH = len(
'{0}255\r\r\n'.format(_RETURN_CODE_DELIMITER))
def __init__(self, serial, product=None, adb_path='adb'):
self.serial = serial
self.product = product
self.adb_cmd = [adb_path]
if self.serial is not None:
self.adb_cmd.extend(['-s', serial])
if self.product is not None:
self.adb_cmd.extend(['-p', product])
self._linesep = None
self._features = None
@property
def linesep(self):
if self._linesep is None:
self._linesep = subprocess.check_output(self.adb_cmd +
['shell', 'echo'])
return self._linesep
@property
def features(self):
if self._features is None:
try:
self._features = split_lines(self._simple_call(['features']))
except subprocess.CalledProcessError:
self._features = []
return self._features
def has_shell_protocol(self):
return version(self.adb_cmd) >= 35 and 'shell_v2' in self.features
def _make_shell_cmd(self, user_cmd):
command = self.adb_cmd + ['shell'] + user_cmd
if not self.has_shell_protocol():
command += self._RETURN_CODE_PROBE
return command
def _parse_shell_output(self, out):
"""Finds the exit code string from shell output.
Args:
out: Shell output string.
Returns:
An (exit_code, output_string) tuple. The output string is
cleaned of any additional stuff we appended to find the
exit code.
Raises:
RuntimeError: Could not find the exit code in |out|.
"""
search_text = out
if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH:
# We don't want to search over massive amounts of data when we know
# the part we want is right at the end.
search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:]
partition = search_text.rpartition(self._RETURN_CODE_DELIMITER)
if partition[1] == '':
raise RuntimeError('Could not find exit status in shell output.')
result = int(partition[2])
# partition[0] won't contain the full text if search_text was
# truncated, pull from the original string instead.
out = out[:-len(partition[1]) - len(partition[2])]
return result, out
def _simple_call(self, cmd):
logging.info(' '.join(self.adb_cmd + cmd))
return _subprocess_check_output(
self.adb_cmd + cmd, stderr=subprocess.STDOUT)
def shell(self, cmd):
"""Calls `adb shell`
Args:
cmd: command to execute as a list of strings.
Returns:
A (stdout, stderr) tuple. Stderr may be combined into stdout
if the device doesn't support separate streams.
Raises:
ShellError: the exit code was non-zero.
"""
exit_code, stdout, stderr = self.shell_nocheck(cmd)
if exit_code != 0:
raise ShellError(cmd, stdout, stderr, exit_code)
return stdout, stderr
def shell_nocheck(self, cmd):
"""Calls `adb shell`
Args:
cmd: command to execute as a list of strings.
Returns:
An (exit_code, stdout, stderr) tuple. Stderr may be combined
into stdout if the device doesn't support separate streams.
"""
cmd = self._make_shell_cmd(cmd)
logging.info(' '.join(cmd))
p = _subprocess_Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if self.has_shell_protocol():
exit_code = p.returncode
else:
exit_code, stdout = self._parse_shell_output(stdout)
return exit_code, stdout, stderr
def shell_popen(self, cmd, kill_atexit=True, preexec_fn=None,
creationflags=0, **kwargs):
"""Calls `adb shell` and returns a handle to the adb process.
This function provides direct access to the subprocess used to run the
command, without special return code handling. Users that need the
return value must retrieve it themselves.
Args:
cmd: Array of command arguments to execute.
kill_atexit: Whether to kill the process upon exiting.
preexec_fn: Argument forwarded to subprocess.Popen.
creationflags: Argument forwarded to subprocess.Popen.
**kwargs: Arguments forwarded to subprocess.Popen.
Returns:
subprocess.Popen handle to the adb shell instance
"""
command = self.adb_cmd + ['shell'] + cmd
# Make sure a ctrl-c in the parent script doesn't kill gdbserver.
if os.name == 'nt':
creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP
else:
if preexec_fn is None:
preexec_fn = os.setpgrp
elif preexec_fn is not os.setpgrp:
fn = preexec_fn
def _wrapper():
fn()
os.setpgrp()
preexec_fn = _wrapper
p = _subprocess_Popen(command, creationflags=creationflags,
preexec_fn=preexec_fn, **kwargs)
if kill_atexit:
atexit.register(p.kill)
return p
def install(self, filename, replace=False):
cmd = ['install']
if replace:
cmd.append('-r')
cmd.append(filename)
return self._simple_call(cmd)
def push(self, local, remote, sync=False):
"""Transfer a local file or directory to the device.
Args:
local: The local file or directory to transfer.
remote: The remote path to which local should be transferred.
sync: If True, only transfers files that are newer on the host than
those on the device. If False, transfers all files.
Returns:
Exit status of the push command.
"""
cmd = ['push']
if sync:
cmd.append('--sync')
cmd.extend([local, remote])
return self._simple_call(cmd)
def pull(self, remote, local):
return self._simple_call(['pull', remote, local])
def sync(self, directory=None):
cmd = ['sync']
if directory is not None:
cmd.append(directory)
return self._simple_call(cmd)
def tcpip(self, port):
return self._simple_call(['tcpip', port])
def usb(self):
return self._simple_call(['usb'])
def reboot(self):
return self._simple_call(['reboot'])
def remount(self):
return self._simple_call(['remount'])
def root(self):
return self._simple_call(['root'])
def unroot(self):
return self._simple_call(['unroot'])
def connect(self, host):
return self._simple_call(['connect', host])
def disconnect(self, host):
return self._simple_call(['disconnect', host])
def forward(self, local, remote):
return self._simple_call(['forward', local, remote])
def forward_list(self):
return self._simple_call(['forward', '--list'])
def forward_no_rebind(self, local, remote):
return self._simple_call(['forward', '--no-rebind', local, remote])
def forward_remove(self, local):
return self._simple_call(['forward', '--remove', local])
def forward_remove_all(self):
return self._simple_call(['forward', '--remove-all'])
def reverse(self, remote, local):
return self._simple_call(['reverse', remote, local])
def reverse_list(self):
return self._simple_call(['reverse', '--list'])
def reverse_no_rebind(self, local, remote):
return self._simple_call(['reverse', '--no-rebind', local, remote])
def reverse_remove_all(self):
return self._simple_call(['reverse', '--remove-all'])
def reverse_remove(self, remote):
return self._simple_call(['reverse', '--remove', remote])
def wait(self):
return self._simple_call(['wait-for-device'])
def get_prop(self, prop_name):
output = split_lines(self.shell(['getprop', prop_name])[0])
if len(output) != 1:
raise RuntimeError('Too many lines in getprop output:\n' +
'\n'.join(output))
value = output[0]
if not value.strip():
return None
return value
def set_prop(self, prop_name, value):
self.shell(['setprop', prop_name, value])

View file

@ -0,0 +1,32 @@
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from distutils.core import setup
setup(
name='adb',
version='0.0.1',
description='A Python interface to the Android Debug Bridge.',
license='Apache 2.0',
keywords='adb android',
package_dir={'adb': ''},
packages=['adb'],
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache Software License',
]
)

View file

@ -0,0 +1,2 @@
This library provides access to the fastboot utility.
For fastboot bootloader tests, see platform/system/extra/tests/bootloader

View file

@ -0,0 +1,17 @@
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from .device import * # pylint: disable=wildcard-import

View file

@ -0,0 +1,235 @@
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Provides functionality to interact with a device via `fastboot`."""
import os
import re
import subprocess
class FastbootError(Exception):
"""Something went wrong interacting with fastboot."""
class FastbootDevice(object):
"""Class to interact with a fastboot device."""
# Prefix for INFO-type messages when printed by fastboot. If we want
# to parse the output from an INFO message we need to strip this off.
INFO_PREFIX = '(bootloader) '
def __init__(self, path='fastboot'):
"""Initialization.
Args:
path: path to the fastboot executable to test with.
Raises:
FastbootError: Failed to find a device in fastboot mode.
"""
self.path = path
# Make sure the fastboot executable is available.
try:
_subprocess_check_output([self.path, '--version'])
except OSError:
raise FastbootError('Could not execute `{}`'.format(self.path))
# Make sure exactly 1 fastboot device is available if <specific device>
# was not given as an argument. Do not try to find an adb device and
# put it in fastboot mode, it would be too easy to accidentally
# download to the wrong device.
if not self._check_single_device():
raise FastbootError('Requires exactly 1 device in fastboot mode')
def _check_single_device(self):
"""Returns True if there is exactly one fastboot device attached.
When ANDROID_SERIAL is set it checks that the device is available.
"""
if 'ANDROID_SERIAL' in os.environ:
try:
self.getvar('product')
return True
except subprocess.CalledProcessError:
return False
devices = _subprocess_check_output([self.path, 'devices']).splitlines()
return len(devices) == 1 and devices[0].split()[1] == 'fastboot'
def getvar(self, name):
"""Calls `fastboot getvar`.
To query all variables (fastboot getvar all) use getvar_all()
instead.
Args:
name: variable name to access.
Returns:
String value of variable |name| or None if not found.
"""
try:
output = _subprocess_check_output([self.path, 'getvar', name],
stderr=subprocess.STDOUT).splitlines()
except subprocess.CalledProcessError:
return None
# Output format is <name>:<whitespace><value>.
out = 0
if output[0] == "< waiting for any device >":
out = 1
result = re.search(r'{}:\s*(.*)'.format(name), output[out])
if result:
return result.group(1)
else:
return None
def getvar_all(self):
"""Calls `fastboot getvar all`.
Returns:
A {name, value} dictionary of variables.
"""
output = _subprocess_check_output([self.path, 'getvar', 'all'],
stderr=subprocess.STDOUT).splitlines()
all_vars = {}
for line in output:
result = re.search(r'(.*):\s*(.*)', line)
if result:
var_name = result.group(1)
# `getvar all` works by sending one INFO message per variable
# so we need to strip out the info prefix string.
if var_name.startswith(self.INFO_PREFIX):
var_name = var_name[len(self.INFO_PREFIX):]
# In addition to returning all variables the bootloader may
# also think it's supposed to query a return a variable named
# "all", so ignore this line if so. Fastboot also prints a
# summary line that we want to ignore.
if var_name != 'all' and 'total time' not in var_name:
all_vars[var_name] = result.group(2)
return all_vars
def flashall(self, wipe_user=True, slot=None, skip_secondary=False, quiet=True):
"""Calls `fastboot [-w] flashall`.
Args:
wipe_user: whether to set the -w flag or not.
slot: slot to flash if device supports A/B, otherwise default will be used.
skip_secondary: on A/B devices, flashes only the primary images if true.
quiet: True to hide output, false to send it to stdout.
"""
func = (_subprocess_check_output if quiet else subprocess.check_call)
command = [self.path, 'flashall']
if slot:
command.extend(['--slot', slot])
if skip_secondary:
command.append("--skip-secondary")
if wipe_user:
command.append('-w')
func(command, stderr=subprocess.STDOUT)
def flash(self, partition='cache', img=None, slot=None, quiet=True):
"""Calls `fastboot flash`.
Args:
partition: which partition to flash.
img: path to .img file, otherwise the default will be used.
slot: slot to flash if device supports A/B, otherwise default will be used.
quiet: True to hide output, false to send it to stdout.
"""
func = (_subprocess_check_output if quiet else subprocess.check_call)
command = [self.path, 'flash', partition]
if img:
command.append(img)
if slot:
command.extend(['--slot', slot])
if skip_secondary:
command.append("--skip-secondary")
func(command, stderr=subprocess.STDOUT)
def reboot(self, bootloader=False):
"""Calls `fastboot reboot [bootloader]`.
Args:
bootloader: True to reboot back to the bootloader.
"""
command = [self.path, 'reboot']
if bootloader:
command.append('bootloader')
_subprocess_check_output(command, stderr=subprocess.STDOUT)
def set_active(self, slot):
"""Calls `fastboot set_active <slot>`.
Args:
slot: The slot to set as the current slot."""
command = [self.path, 'set_active', slot]
_subprocess_check_output(command, stderr=subprocess.STDOUT)
# If necessary, modifies subprocess.check_output() or subprocess.Popen() args
# to run the subprocess via Windows PowerShell to work-around an issue in
# Python 2's subprocess class on Windows where it doesn't support Unicode.
def _get_subprocess_args(args):
# Only do this slow work-around if Unicode is in the cmd line on Windows.
# PowerShell takes 600-700ms to startup on a 2013-2014 machine, which is
# very slow.
if os.name != 'nt' or all(not isinstance(arg, unicode) for arg in args[0]):
return args
def escape_arg(arg):
# Escape for the parsing that the C Runtime does in Windows apps. In
# particular, this will take care of double-quotes.
arg = subprocess.list2cmdline([arg])
# Escape single-quote with another single-quote because we're about
# to...
arg = arg.replace(u"'", u"''")
# ...put the arg in a single-quoted string for PowerShell to parse.
arg = u"'" + arg + u"'"
return arg
# Escape command line args.
argv = map(escape_arg, args[0])
# Cause script errors (such as adb not found) to stop script immediately
# with an error.
ps_code = u'$ErrorActionPreference = "Stop"\r\n'
# Add current directory to the PATH var, to match cmd.exe/CreateProcess()
# behavior.
ps_code += u'$env:Path = ".;" + $env:Path\r\n'
# Precede by &, the PowerShell call operator, and separate args by space.
ps_code += u'& ' + u' '.join(argv)
# Make the PowerShell exit code the exit code of the subprocess.
ps_code += u'\r\nExit $LastExitCode'
# Encode as UTF-16LE (without Byte-Order-Mark) which Windows natively
# understands.
ps_code = ps_code.encode('utf-16le')
# Encode the PowerShell command as base64 and use the special
# -EncodedCommand option that base64 decodes. Base64 is just plain ASCII,
# so it should have no problem passing through Win32 CreateProcessA()
# (which python erroneously calls instead of CreateProcessW()).
return (['powershell.exe', '-NoProfile', '-NonInteractive',
'-EncodedCommand', base64.b64encode(ps_code)],) + args[1:]
# Call this instead of subprocess.check_output() to work-around issue in Python
# 2's subprocess class on Windows where it doesn't support Unicode.
def _subprocess_check_output(*args, **kwargs):
try:
return subprocess.check_output(*_get_subprocess_args(args), **kwargs)
except subprocess.CalledProcessError as e:
# Show real command line instead of the powershell.exe command line.
raise subprocess.CalledProcessError(e.returncode, args[0],
output=e.output)

View file

@ -0,0 +1,32 @@
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from distutils.core import setup
setup(
name='fastboot',
version='0.0.1',
description='A Python interface to the Fastboot utility.',
license='Apache 2.0',
keywords='fastboot android',
package_dir={'fastboot': ''},
packages=['fastboot'],
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache Software License',
]
)

View file

@ -0,0 +1,342 @@
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Helpers used by both gdbclient.py and ndk-gdb.py."""
import adb
import argparse
import atexit
import os
import subprocess
import sys
import tempfile
class ArgumentParser(argparse.ArgumentParser):
"""ArgumentParser subclass that provides adb device selection."""
def __init__(self):
super(ArgumentParser, self).__init__()
self.add_argument(
"--adb", dest="adb_path",
help="use specific adb command")
group = self.add_argument_group(title="device selection")
group = group.add_mutually_exclusive_group()
group.add_argument(
"-a", action="store_const", dest="device", const="-a",
help="directs commands to all interfaces")
group.add_argument(
"-d", action="store_const", dest="device", const="-d",
help="directs commands to the only connected USB device")
group.add_argument(
"-e", action="store_const", dest="device", const="-e",
help="directs commands to the only connected emulator")
group.add_argument(
"-s", metavar="SERIAL", action="store", dest="serial",
help="directs commands to device/emulator with the given serial")
def parse_args(self, args=None, namespace=None):
result = super(ArgumentParser, self).parse_args(args, namespace)
adb_path = result.adb_path or "adb"
# Try to run the specified adb command
try:
subprocess.check_output([adb_path, "version"],
stderr=subprocess.STDOUT)
except (OSError, subprocess.CalledProcessError):
msg = "ERROR: Unable to run adb executable (tried '{}')."
if not result.adb_path:
msg += "\n Try specifying its location with --adb."
sys.exit(msg.format(adb_path))
try:
if result.device == "-a":
result.device = adb.get_device(adb_path=adb_path)
elif result.device == "-d":
result.device = adb.get_usb_device(adb_path=adb_path)
elif result.device == "-e":
result.device = adb.get_emulator_device(adb_path=adb_path)
else:
result.device = adb.get_device(result.serial, adb_path=adb_path)
except (adb.DeviceNotFoundError, adb.NoUniqueDeviceError, RuntimeError):
# Don't error out if we can't find a device.
result.device = None
return result
def get_processes(device):
"""Return a dict from process name to list of running PIDs on the device."""
# Some custom ROMs use busybox instead of toolbox for ps. Without -w,
# busybox truncates the output, and very long package names like
# com.exampleisverylongtoolongbyfar.plasma exceed the limit.
#
# Perform the check for this on the device to avoid an adb roundtrip
# Some devices might not have readlink or which, so we need to handle
# this as well.
#
# Gracefully handle [ or readlink being missing by always using `ps` if
# readlink is missing. (API 18 has [, but not readlink).
ps_script = """
if $(ls /system/bin/readlink >/dev/null 2>&1); then
if [ $(readlink /system/bin/ps) == "toolbox" ]; then
ps;
else
ps -w;
fi
else
ps;
fi
"""
ps_script = " ".join([line.strip() for line in ps_script.splitlines()])
output, _ = device.shell([ps_script])
return parse_ps_output(output)
def parse_ps_output(output):
processes = dict()
output = adb.split_lines(output.replace("\r", ""))
columns = output.pop(0).split()
try:
pid_column = columns.index("PID")
except ValueError:
pid_column = 1
while output:
columns = output.pop().split()
process_name = columns[-1]
pid = int(columns[pid_column])
if process_name in processes:
processes[process_name].append(pid)
else:
processes[process_name] = [pid]
return processes
def get_pids(device, process_name):
processes = get_processes(device)
return processes.get(process_name, [])
def start_gdbserver(device, gdbserver_local_path, gdbserver_remote_path,
target_pid, run_cmd, debug_socket, port, run_as_cmd=None):
"""Start gdbserver in the background and forward necessary ports.
Args:
device: ADB device to start gdbserver on.
gdbserver_local_path: Host path to push gdbserver from, can be None.
gdbserver_remote_path: Device path to push gdbserver to.
target_pid: PID of device process to attach to.
run_cmd: Command to run on the device.
debug_socket: Device path to place gdbserver unix domain socket.
port: Host port to forward the debug_socket to.
run_as_cmd: run-as or su command to prepend to commands.
Returns:
Popen handle to the `adb shell` process gdbserver was started with.
"""
assert target_pid is None or run_cmd is None
# Push gdbserver to the target.
if gdbserver_local_path is not None:
device.push(gdbserver_local_path, gdbserver_remote_path)
# Run gdbserver.
gdbserver_cmd = [gdbserver_remote_path, "--once",
"+{}".format(debug_socket)]
if target_pid is not None:
gdbserver_cmd += ["--attach", str(target_pid)]
else:
gdbserver_cmd += run_cmd
forward_gdbserver_port(device, local=port, remote="localfilesystem:{}".format(debug_socket))
if run_as_cmd:
gdbserver_cmd = run_as_cmd + gdbserver_cmd
gdbserver_output_path = os.path.join(tempfile.gettempdir(),
"gdbclient.log")
print("Redirecting gdbserver output to {}".format(gdbserver_output_path))
gdbserver_output = file(gdbserver_output_path, 'w')
return device.shell_popen(gdbserver_cmd, stdout=gdbserver_output,
stderr=gdbserver_output)
def forward_gdbserver_port(device, local, remote):
"""Forwards local TCP port `port` to `remote` via `adb forward`."""
device.forward("tcp:{}".format(local), remote)
atexit.register(lambda: device.forward_remove("tcp:{}".format(local)))
def find_file(device, executable_path, sysroot, run_as_cmd=None):
"""Finds a device executable file.
This function first attempts to find the local file which will
contain debug symbols. If that fails, it will fall back to
downloading the stripped file from the device.
Args:
device: the AndroidDevice object to use.
executable_path: absolute path to the executable or symlink.
sysroot: absolute path to the built symbol sysroot.
run_as_cmd: if necessary, run-as or su command to prepend
Returns:
A tuple containing (<open file object>, <was found locally>).
Raises:
RuntimeError: could not find the executable binary.
ValueError: |executable_path| is not absolute.
"""
if not os.path.isabs(executable_path):
raise ValueError("'{}' is not an absolute path".format(executable_path))
def generate_files():
"""Yields (<file name>, <found locally>) tuples."""
# First look locally to avoid shelling into the device if possible.
# os.path.join() doesn't combine absolute paths, use + instead.
yield (sysroot + executable_path, True)
# Next check if the path is a symlink.
try:
target = device.shell(['readlink', '-e', '-n', executable_path])[0]
yield (sysroot + target, True)
except adb.ShellError:
pass
# Last, download the stripped executable from the device if necessary.
file_name = "gdbclient-binary-{}".format(os.getppid())
remote_temp_path = "/data/local/tmp/{}".format(file_name)
local_path = os.path.join(tempfile.gettempdir(), file_name)
cmd = ["cat", executable_path, ">", remote_temp_path]
if run_as_cmd:
cmd = run_as_cmd + cmd
try:
device.shell(cmd)
except adb.ShellError:
raise RuntimeError("Failed to copy '{}' to temporary folder on "
"device".format(executable_path))
device.pull(remote_temp_path, local_path)
yield (local_path, False)
for path, found_locally in generate_files():
if os.path.isfile(path):
return (open(path, "r"), found_locally)
raise RuntimeError('Could not find executable {}'.format(executable_path))
def find_executable_path(device, executable_name, run_as_cmd=None):
"""Find a device executable from its name
This function calls which on the device to retrieve the absolute path of
the executable.
Args:
device: the AndroidDevice object to use.
executable_name: the name of the executable to find.
run_as_cmd: if necessary, run-as or su command to prepend
Returns:
The absolute path of the executable.
Raises:
RuntimeError: could not find the executable.
"""
cmd = ["which", executable_name]
if run_as_cmd:
cmd = run_as_cmd + cmd
try:
output, _ = device.shell(cmd)
return output
except adb.ShellError:
raise RuntimeError("Could not find executable '{}' on "
"device".format(executable_name))
def find_binary(device, pid, sysroot, run_as_cmd=None):
"""Finds a device executable file corresponding to |pid|."""
return find_file(device, "/proc/{}/exe".format(pid), sysroot, run_as_cmd)
def get_binary_arch(binary_file):
"""Parse a binary's ELF header for arch."""
try:
binary_file.seek(0)
binary = binary_file.read(0x14)
except IOError:
raise RuntimeError("failed to read binary file")
ei_class = ord(binary[0x4]) # 1 = 32-bit, 2 = 64-bit
ei_data = ord(binary[0x5]) # Endianness
assert ei_class == 1 or ei_class == 2
if ei_data != 1:
raise RuntimeError("binary isn't little-endian?")
e_machine = ord(binary[0x13]) << 8 | ord(binary[0x12])
if e_machine == 0x28:
assert ei_class == 1
return "arm"
elif e_machine == 0xB7:
assert ei_class == 2
return "arm64"
elif e_machine == 0x03:
assert ei_class == 1
return "x86"
elif e_machine == 0x3E:
assert ei_class == 2
return "x86_64"
elif e_machine == 0x08:
if ei_class == 1:
return "mips"
else:
return "mips64"
else:
raise RuntimeError("unknown architecture: 0x{:x}".format(e_machine))
def start_gdb(gdb_path, gdb_commands, gdb_flags=None):
"""Start gdb in the background and block until it finishes.
Args:
gdb_path: Path of the gdb binary.
gdb_commands: Contents of GDB script to run.
gdb_flags: List of flags to append to gdb command.
"""
# Windows disallows opening the file while it's open for writing.
gdb_script_fd, gdb_script_path = tempfile.mkstemp()
os.write(gdb_script_fd, gdb_commands)
os.close(gdb_script_fd)
gdb_args = [gdb_path, "-x", gdb_script_path] + (gdb_flags or [])
kwargs = {}
if sys.platform.startswith("win"):
kwargs["creationflags"] = subprocess.CREATE_NEW_CONSOLE
gdb_process = subprocess.Popen(gdb_args, **kwargs)
while gdb_process.returncode is None:
try:
gdb_process.communicate()
except KeyboardInterrupt:
pass
os.unlink(gdb_script_path)