1129 lines
38 KiB
Python
1129 lines
38 KiB
Python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""
|
|
Provides graphics related utils, like capturing screenshots or checking on
|
|
the state of the graphics driver.
|
|
"""
|
|
|
|
import collections
|
|
import contextlib
|
|
import glob
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
#import traceback
|
|
# Please limit the use of the uinput library to this file. Try not to spread
|
|
# dependencies and abstract as much as possible to make switching to a different
|
|
# input library in the future easier.
|
|
import uinput
|
|
|
|
from autotest_lib.client.bin import test
|
|
from autotest_lib.client.bin import utils
|
|
from autotest_lib.client.common_lib import error
|
|
from autotest_lib.client.common_lib import test as test_utils
|
|
from autotest_lib.client.cros import power_utils
|
|
from autotest_lib.client.cros.graphics import gbm
|
|
from functools import wraps
|
|
|
|
|
|
class GraphicsTest(test.test):
|
|
"""Base class for graphics test.
|
|
|
|
GraphicsTest is the base class for graphics tests.
|
|
Every subclass of GraphicsTest should call GraphicsTests initialize/cleanup
|
|
method as they will do GraphicsStateChecker as well as report states to
|
|
Chrome Perf dashboard.
|
|
|
|
Attributes:
|
|
_test_failure_description(str): Failure name reported to chrome perf
|
|
dashboard. (Default: Failures)
|
|
_test_failure_report_enable(bool): Enable/Disable reporting
|
|
failures to chrome perf dashboard
|
|
automatically. (Default: True)
|
|
"""
|
|
version = 1
|
|
_GSC = None
|
|
|
|
_test_failure_description = "Failures"
|
|
_test_failure_report_enable = True
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
"""Initialize flag setting."""
|
|
super(GraphicsTest, self).__init__(*args, **kwargs)
|
|
self._failures = []
|
|
|
|
def initialize(self, raise_error_on_hang=False, *args, **kwargs):
|
|
"""Initial state checker and report initial value to perf dashboard."""
|
|
self._GSC = GraphicsStateChecker(raise_error_on_hang)
|
|
|
|
self.output_perf_value(
|
|
description='Timeout_Reboot',
|
|
value=1,
|
|
units='count',
|
|
higher_is_better=False
|
|
)
|
|
|
|
if hasattr(super(GraphicsTest, self), "initialize"):
|
|
test_utils._cherry_pick_call(super(GraphicsTest, self).initialize,
|
|
*args, **kwargs)
|
|
|
|
def cleanup(self, *args, **kwargs):
|
|
"""Finalize state checker and report values to perf dashboard."""
|
|
if self._GSC:
|
|
self._GSC.finalize()
|
|
|
|
self.output_perf_value(
|
|
description='Timeout_Reboot',
|
|
value=0,
|
|
units='count',
|
|
higher_is_better=False,
|
|
replace_existing_values=True
|
|
)
|
|
|
|
logging.debug('GraphicsTest recorded failures: %s', self.get_failures())
|
|
if self._test_failure_report_enable:
|
|
self.output_perf_value(
|
|
description=self._test_failure_description,
|
|
value=len(self._failures),
|
|
units='count',
|
|
higher_is_better=False
|
|
)
|
|
|
|
if hasattr(super(GraphicsTest, self), "cleanup"):
|
|
test_utils._cherry_pick_call(super(GraphicsTest, self).cleanup,
|
|
*args, **kwargs)
|
|
|
|
@contextlib.contextmanager
|
|
def failure_report(self, name):
|
|
"""Record the failure of an operation to the self._failures.
|
|
|
|
Records if the operation taken inside executed normally or not.
|
|
If the operation taken inside raise unexpected failure, failure named
|
|
|name|, will be added to the self._failures list and reported to the
|
|
chrome perf dashboard in the cleanup stage.
|
|
|
|
Usage:
|
|
# Record failure of doSomething
|
|
with failure_report('doSomething'):
|
|
doSomething()
|
|
"""
|
|
# Assume failed at the beginning
|
|
self.add_failures(name)
|
|
yield {}
|
|
self.remove_failures(name)
|
|
|
|
@classmethod
|
|
def failure_report_decorator(cls, name):
|
|
"""Record the failure if the function failed to finish.
|
|
This method should only decorate to functions of GraphicsTest.
|
|
In addition, functions with this decorator should be called with no
|
|
unnamed arguments.
|
|
Usage:
|
|
@GraphicsTest.test_run_decorator('graphics_test')
|
|
def Foo(self, bar='test'):
|
|
return doStuff()
|
|
|
|
is equivalent to
|
|
|
|
def Foo(self, bar):
|
|
with failure_reporter('graphics_test'):
|
|
return doStuff()
|
|
|
|
# Incorrect usage.
|
|
@GraphicsTest.test_run_decorator('graphics_test')
|
|
def Foo(self, bar='test'):
|
|
pass
|
|
self.Foo('test_name', bar='test_name') # call Foo with named args
|
|
|
|
# Incorrect usage.
|
|
@GraphicsTest.test_run_decorator('graphics_test')
|
|
def Foo(self, bar='test'):
|
|
pass
|
|
self.Foo('test_name') # call Foo with unnamed args
|
|
"""
|
|
def decorator(fn):
|
|
@wraps(fn)
|
|
def wrapper(*args, **kwargs):
|
|
if len(args) > 1:
|
|
raise error.TestError('Unnamed arguments is not accepted. '
|
|
'Please apply this decorator to '
|
|
'function without unnamed args.')
|
|
# A member function of GraphicsTest is decorated. The first
|
|
# argument is the instance itself.
|
|
instance = args[0]
|
|
with instance.failure_report(name):
|
|
# Cherry pick the arguments for the wrapped function.
|
|
d_args, d_kwargs = test_utils._cherry_pick_args(fn, args,
|
|
kwargs)
|
|
return fn(instance, *d_args, **d_kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
def add_failures(self, failure_description):
|
|
"""
|
|
Add a record to failures list which will report back to chrome perf
|
|
dashboard at the cleanup stage.
|
|
"""
|
|
self._failures.append(failure_description)
|
|
|
|
def remove_failures(self, failure_description):
|
|
"""
|
|
Remove a record from failures list which will report back to chrome perf
|
|
dashboard at the cleanup stage.
|
|
"""
|
|
self._failures.remove(failure_description)
|
|
|
|
def get_failures(self):
|
|
"""
|
|
Get currently recorded failures list.
|
|
"""
|
|
return list(self._failures)
|
|
|
|
|
|
def screen_disable_blanking():
|
|
""" Called from power_Backlight to disable screen blanking. """
|
|
# We don't have to worry about unexpected screensavers or DPMS here.
|
|
return
|
|
|
|
|
|
def screen_disable_energy_saving():
|
|
""" Called from power_Consumption to immediately disable energy saving. """
|
|
# All we need to do here is enable displays via Chrome.
|
|
power_utils.set_display_power(power_utils.DISPLAY_POWER_ALL_ON)
|
|
return
|
|
|
|
|
|
def screen_toggle_fullscreen():
|
|
"""Toggles fullscreen mode."""
|
|
press_keys(['KEY_F11'])
|
|
|
|
|
|
def screen_toggle_mirrored():
|
|
"""Toggles the mirrored screen."""
|
|
press_keys(['KEY_LEFTCTRL', 'KEY_F4'])
|
|
|
|
|
|
def hide_cursor():
|
|
"""Hides mouse cursor."""
|
|
# Send a keystroke to hide the cursor.
|
|
press_keys(['KEY_UP'])
|
|
|
|
|
|
def hide_typing_cursor():
|
|
"""Hides typing cursor."""
|
|
# Press the tab key to move outside the typing bar.
|
|
press_keys(['KEY_TAB'])
|
|
|
|
|
|
def screen_wakeup():
|
|
"""Wake up the screen if it is dark."""
|
|
# Move the mouse a little bit to wake up the screen.
|
|
device = _get_uinput_device_mouse_rel()
|
|
_uinput_emit(device, 'REL_X', 1)
|
|
_uinput_emit(device, 'REL_X', -1)
|
|
|
|
|
|
def switch_screen_on(on):
|
|
"""
|
|
Turn the touch screen on/off.
|
|
|
|
@param on: On or off.
|
|
"""
|
|
raise error.TestFail('switch_screen_on is not implemented.')
|
|
|
|
|
|
# Don't create a device during build_packages or for tests that don't need it.
|
|
uinput_device_keyboard = None
|
|
uinput_device_touch = None
|
|
uinput_device_mouse_rel = None
|
|
|
|
# Don't add more events to this list than are used. For a complete list of
|
|
# available events check python2.7/site-packages/uinput/ev.py.
|
|
UINPUT_DEVICE_EVENTS_KEYBOARD = [
|
|
uinput.KEY_F4,
|
|
uinput.KEY_F11,
|
|
uinput.KEY_KPPLUS,
|
|
uinput.KEY_KPMINUS,
|
|
uinput.KEY_LEFTCTRL,
|
|
uinput.KEY_TAB,
|
|
uinput.KEY_UP,
|
|
uinput.KEY_DOWN,
|
|
uinput.KEY_LEFT,
|
|
uinput.KEY_RIGHT
|
|
]
|
|
# TODO(ihf): Find an ABS sequence that actually works.
|
|
UINPUT_DEVICE_EVENTS_TOUCH = [
|
|
uinput.BTN_TOUCH,
|
|
uinput.ABS_MT_SLOT,
|
|
uinput.ABS_MT_POSITION_X + (0, 2560, 0, 0),
|
|
uinput.ABS_MT_POSITION_Y + (0, 1700, 0, 0),
|
|
uinput.ABS_MT_TRACKING_ID + (0, 10, 0, 0),
|
|
uinput.BTN_TOUCH
|
|
]
|
|
UINPUT_DEVICE_EVENTS_MOUSE_REL = [
|
|
uinput.REL_X,
|
|
uinput.REL_Y,
|
|
uinput.BTN_MOUSE,
|
|
uinput.BTN_LEFT,
|
|
uinput.BTN_RIGHT
|
|
]
|
|
|
|
|
|
def _get_uinput_device_keyboard():
|
|
"""
|
|
Lazy initialize device and return it. We don't want to create a device
|
|
during build_packages or for tests that don't need it, hence init with None.
|
|
"""
|
|
global uinput_device_keyboard
|
|
if uinput_device_keyboard is None:
|
|
uinput_device_keyboard = uinput.Device(UINPUT_DEVICE_EVENTS_KEYBOARD)
|
|
return uinput_device_keyboard
|
|
|
|
|
|
def _get_uinput_device_mouse_rel():
|
|
"""
|
|
Lazy initialize device and return it. We don't want to create a device
|
|
during build_packages or for tests that don't need it, hence init with None.
|
|
"""
|
|
global uinput_device_mouse_rel
|
|
if uinput_device_mouse_rel is None:
|
|
uinput_device_mouse_rel = uinput.Device(UINPUT_DEVICE_EVENTS_MOUSE_REL)
|
|
return uinput_device_mouse_rel
|
|
|
|
|
|
def _get_uinput_device_touch():
|
|
"""
|
|
Lazy initialize device and return it. We don't want to create a device
|
|
during build_packages or for tests that don't need it, hence init with None.
|
|
"""
|
|
global uinput_device_touch
|
|
if uinput_device_touch is None:
|
|
uinput_device_touch = uinput.Device(UINPUT_DEVICE_EVENTS_TOUCH)
|
|
return uinput_device_touch
|
|
|
|
|
|
def _uinput_translate_name(event_name):
|
|
"""
|
|
Translates string |event_name| to uinput event.
|
|
"""
|
|
return getattr(uinput, event_name)
|
|
|
|
|
|
def _uinput_emit(device, event_name, value, syn=True):
|
|
"""
|
|
Wrapper for uinput.emit. Emits event with value.
|
|
Example: ('REL_X', 20), ('BTN_RIGHT', 1)
|
|
"""
|
|
event = _uinput_translate_name(event_name)
|
|
device.emit(event, value, syn)
|
|
|
|
|
|
def _uinput_emit_click(device, event_name, syn=True):
|
|
"""
|
|
Wrapper for uinput.emit_click. Emits click event. Only KEY and BTN events
|
|
are accepted, otherwise ValueError is raised. Example: 'KEY_A'
|
|
"""
|
|
event = _uinput_translate_name(event_name)
|
|
device.emit_click(event, syn)
|
|
|
|
|
|
def _uinput_emit_combo(device, event_names, syn=True):
|
|
"""
|
|
Wrapper for uinput.emit_combo. Emits sequence of events.
|
|
Example: ['KEY_LEFTCTRL', 'KEY_LEFTALT', 'KEY_F5']
|
|
"""
|
|
events = [_uinput_translate_name(en) for en in event_names]
|
|
device.emit_combo(events, syn)
|
|
|
|
|
|
def press_keys(key_list):
|
|
"""Presses the given keys as one combination.
|
|
|
|
Please do not leak uinput dependencies outside of the file.
|
|
|
|
@param key: A list of key strings, e.g. ['LEFTCTRL', 'F4']
|
|
"""
|
|
_uinput_emit_combo(_get_uinput_device_keyboard(), key_list)
|
|
|
|
|
|
def click_mouse():
|
|
"""Just click the mouse.
|
|
Presumably only hacky tests use this function.
|
|
"""
|
|
logging.info('click_mouse()')
|
|
# Move a little to make the cursor appear.
|
|
device = _get_uinput_device_mouse_rel()
|
|
_uinput_emit(device, 'REL_X', 1)
|
|
# Some sleeping is needed otherwise events disappear.
|
|
time.sleep(0.1)
|
|
# Move cursor back to not drift.
|
|
_uinput_emit(device, 'REL_X', -1)
|
|
time.sleep(0.1)
|
|
# Click down.
|
|
_uinput_emit(device, 'BTN_LEFT', 1)
|
|
time.sleep(0.2)
|
|
# Release click.
|
|
_uinput_emit(device, 'BTN_LEFT', 0)
|
|
|
|
|
|
# TODO(ihf): this function is broken. Make it work.
|
|
def activate_focus_at(rel_x, rel_y):
|
|
"""Clicks with the mouse at screen position (x, y).
|
|
|
|
This is a pretty hacky method. Using this will probably lead to
|
|
flaky tests as page layout changes over time.
|
|
@param rel_x: relative horizontal position between 0 and 1.
|
|
@param rel_y: relattive vertical position between 0 and 1.
|
|
"""
|
|
width, height = get_internal_resolution()
|
|
device = _get_uinput_device_touch()
|
|
_uinput_emit(device, 'ABS_MT_SLOT', 0, syn=False)
|
|
_uinput_emit(device, 'ABS_MT_TRACKING_ID', 1, syn=False)
|
|
_uinput_emit(device, 'ABS_MT_POSITION_X', int(rel_x * width), syn=False)
|
|
_uinput_emit(device, 'ABS_MT_POSITION_Y', int(rel_y * height), syn=False)
|
|
_uinput_emit(device, 'BTN_TOUCH', 1, syn=True)
|
|
time.sleep(0.2)
|
|
_uinput_emit(device, 'BTN_TOUCH', 0, syn=True)
|
|
|
|
|
|
def take_screenshot(resultsdir, fname_prefix, extension='png'):
|
|
"""Take screenshot and save to a new file in the results dir.
|
|
Args:
|
|
@param resultsdir: Directory to store the output in.
|
|
@param fname_prefix: Prefix for the output fname.
|
|
@param extension: String indicating file format ('png', 'jpg', etc).
|
|
Returns:
|
|
the path of the saved screenshot file
|
|
"""
|
|
|
|
old_exc_type = sys.exc_info()[0]
|
|
|
|
next_index = len(glob.glob(
|
|
os.path.join(resultsdir, '%s-*.%s' % (fname_prefix, extension))))
|
|
screenshot_file = os.path.join(
|
|
resultsdir, '%s-%d.%s' % (fname_prefix, next_index, extension))
|
|
logging.info('Saving screenshot to %s.', screenshot_file)
|
|
|
|
try:
|
|
image = gbm.crtcScreenshot()
|
|
image.save(screenshot_file)
|
|
except Exception as err:
|
|
# Do not raise an exception if the screenshot fails while processing
|
|
# another exception.
|
|
if old_exc_type is None:
|
|
raise
|
|
logging.error(err)
|
|
|
|
return screenshot_file
|
|
|
|
|
|
def take_screenshot_crop_by_height(fullpath, final_height, x_offset_pixels,
|
|
y_offset_pixels):
|
|
"""
|
|
Take a screenshot, crop to final height starting at given (x, y) coordinate.
|
|
Image width will be adjusted to maintain original aspect ratio).
|
|
|
|
@param fullpath: path, fullpath of the file that will become the image file.
|
|
@param final_height: integer, height in pixels of resulting image.
|
|
@param x_offset_pixels: integer, number of pixels from left margin
|
|
to begin cropping.
|
|
@param y_offset_pixels: integer, number of pixels from top margin
|
|
to begin cropping.
|
|
"""
|
|
image = gbm.crtcScreenshot()
|
|
image.crop()
|
|
width, height = image.size
|
|
# Preserve aspect ratio: Wf / Wi == Hf / Hi
|
|
final_width = int(width * (float(final_height) / height))
|
|
box = (x_offset_pixels, y_offset_pixels,
|
|
x_offset_pixels + final_width, y_offset_pixels + final_height)
|
|
cropped = image.crop(box)
|
|
cropped.save(fullpath)
|
|
return fullpath
|
|
|
|
|
|
def take_screenshot_crop_x(fullpath, box=None):
|
|
"""
|
|
Take a screenshot using import tool, crop according to dim given by the box.
|
|
@param fullpath: path, full path to save the image to.
|
|
@param box: 4-tuple giving the upper left and lower right pixel coordinates.
|
|
"""
|
|
|
|
if box:
|
|
img_w, img_h, upperx, uppery = box
|
|
cmd = ('/usr/local/bin/import -window root -depth 8 -crop '
|
|
'%dx%d+%d+%d' % (img_w, img_h, upperx, uppery))
|
|
else:
|
|
cmd = ('/usr/local/bin/import -window root -depth 8')
|
|
|
|
old_exc_type = sys.exc_info()[0]
|
|
try:
|
|
utils.system('%s %s' % (cmd, fullpath))
|
|
except Exception as err:
|
|
# Do not raise an exception if the screenshot fails while processing
|
|
# another exception.
|
|
if old_exc_type is None:
|
|
raise
|
|
logging.error(err)
|
|
|
|
|
|
def take_screenshot_crop(fullpath, box=None, crtc_id=None):
|
|
"""
|
|
Take a screenshot using import tool, crop according to dim given by the box.
|
|
@param fullpath: path, full path to save the image to.
|
|
@param box: 4-tuple giving the upper left and lower right pixel coordinates.
|
|
"""
|
|
if crtc_id is not None:
|
|
image = gbm.crtcScreenshot(crtc_id)
|
|
else:
|
|
image = gbm.crtcScreenshot(get_internal_crtc())
|
|
if box:
|
|
image = image.crop(box)
|
|
image.save(fullpath)
|
|
return fullpath
|
|
|
|
|
|
_MODETEST_CONNECTOR_PATTERN = re.compile(
|
|
r'^(\d+)\s+\d+\s+(connected|disconnected)\s+(\S+)\s+\d+x\d+\s+\d+\s+\d+')
|
|
|
|
_MODETEST_MODE_PATTERN = re.compile(
|
|
r'\s+.+\d+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+flags:.+type:'
|
|
r' preferred')
|
|
|
|
_MODETEST_CRTCS_START_PATTERN = re.compile(r'^id\s+fb\s+pos\s+size')
|
|
|
|
_MODETEST_CRTC_PATTERN = re.compile(
|
|
r'^(\d+)\s+(\d+)\s+\((\d+),(\d+)\)\s+\((\d+)x(\d+)\)')
|
|
|
|
Connector = collections.namedtuple(
|
|
'Connector', [
|
|
'cid', # connector id (integer)
|
|
'ctype', # connector type, e.g. 'eDP', 'HDMI-A', 'DP'
|
|
'connected', # boolean
|
|
'size', # current screen size, e.g. (1024, 768)
|
|
'encoder', # encoder id (integer)
|
|
# list of resolution tuples, e.g. [(1920,1080), (1600,900), ...]
|
|
'modes',
|
|
])
|
|
|
|
CRTC = collections.namedtuple(
|
|
'CRTC', [
|
|
'id', # crtc id
|
|
'fb', # fb id
|
|
'pos', # position, e.g. (0,0)
|
|
'size', # size, e.g. (1366,768)
|
|
])
|
|
|
|
|
|
def get_display_resolution():
|
|
"""
|
|
Parses output of modetest to determine the display resolution of the dut.
|
|
@return: tuple, (w,h) resolution of device under test.
|
|
"""
|
|
connectors = get_modetest_connectors()
|
|
for connector in connectors:
|
|
if connector.connected:
|
|
return connector.size
|
|
return None
|
|
|
|
|
|
def _get_num_outputs_connected():
|
|
"""
|
|
Parses output of modetest to determine the number of connected displays
|
|
@return: The number of connected displays
|
|
"""
|
|
connected = 0
|
|
connectors = get_modetest_connectors()
|
|
for connector in connectors:
|
|
if connector.connected:
|
|
connected = connected + 1
|
|
|
|
return connected
|
|
|
|
|
|
def get_num_outputs_on():
|
|
"""
|
|
Retrieves the number of connected outputs that are on.
|
|
|
|
Return value: integer value of number of connected outputs that are on.
|
|
"""
|
|
|
|
return _get_num_outputs_connected()
|
|
|
|
|
|
def get_modetest_connectors():
|
|
"""
|
|
Retrieves a list of Connectors using modetest.
|
|
|
|
Return value: List of Connectors.
|
|
"""
|
|
connectors = []
|
|
modetest_output = utils.system_output('modetest -c')
|
|
for line in modetest_output.splitlines():
|
|
# First search for a new connector.
|
|
connector_match = re.match(_MODETEST_CONNECTOR_PATTERN, line)
|
|
if connector_match is not None:
|
|
cid = int(connector_match.group(1))
|
|
connected = False
|
|
if connector_match.group(2) == 'connected':
|
|
connected = True
|
|
ctype = connector_match.group(3)
|
|
size = (-1, -1)
|
|
encoder = -1
|
|
modes = None
|
|
connectors.append(
|
|
Connector(cid, ctype, connected, size, encoder, modes))
|
|
else:
|
|
# See if we find corresponding line with modes, sizes etc.
|
|
mode_match = re.match(_MODETEST_MODE_PATTERN, line)
|
|
if mode_match is not None:
|
|
size = (int(mode_match.group(1)), int(mode_match.group(2)))
|
|
# Update display size of last connector in list.
|
|
c = connectors.pop()
|
|
connectors.append(
|
|
Connector(
|
|
c.cid, c.ctype, c.connected, size, c.encoder,
|
|
c.modes))
|
|
return connectors
|
|
|
|
|
|
def get_modetest_crtcs():
|
|
"""
|
|
Returns a list of CRTC data.
|
|
|
|
Sample:
|
|
[CRTC(id=19, fb=50, pos=(0, 0), size=(1366, 768)),
|
|
CRTC(id=22, fb=54, pos=(0, 0), size=(1920, 1080))]
|
|
"""
|
|
crtcs = []
|
|
modetest_output = utils.system_output('modetest -p')
|
|
found = False
|
|
for line in modetest_output.splitlines():
|
|
if found:
|
|
crtc_match = re.match(_MODETEST_CRTC_PATTERN, line)
|
|
if crtc_match is not None:
|
|
crtc_id = int(crtc_match.group(1))
|
|
fb = int(crtc_match.group(2))
|
|
x = int(crtc_match.group(3))
|
|
y = int(crtc_match.group(4))
|
|
width = int(crtc_match.group(5))
|
|
height = int(crtc_match.group(6))
|
|
# CRTCs with fb=0 are disabled, but lets skip anything with
|
|
# trivial width/height just in case.
|
|
if not (fb == 0 or width == 0 or height == 0):
|
|
crtcs.append(CRTC(crtc_id, fb, (x, y), (width, height)))
|
|
elif line and not line[0].isspace():
|
|
return crtcs
|
|
if re.match(_MODETEST_CRTCS_START_PATTERN, line) is not None:
|
|
found = True
|
|
return crtcs
|
|
|
|
|
|
def get_modetest_output_state():
|
|
"""
|
|
Reduce the output of get_modetest_connectors to a dictionary of connector/active states.
|
|
"""
|
|
connectors = get_modetest_connectors()
|
|
outputs = {}
|
|
for connector in connectors:
|
|
# TODO(ihf): Figure out why modetest output needs filtering.
|
|
if connector.connected:
|
|
outputs[connector.ctype] = connector.connected
|
|
return outputs
|
|
|
|
|
|
def get_output_rect(output):
|
|
"""Gets the size and position of the given output on the screen buffer.
|
|
|
|
@param output: The output name as a string.
|
|
|
|
@return A tuple of the rectangle (width, height, fb_offset_x,
|
|
fb_offset_y) of ints.
|
|
"""
|
|
connectors = get_modetest_connectors()
|
|
for connector in connectors:
|
|
if connector.ctype == output:
|
|
# Concatenate two 2-tuples to 4-tuple.
|
|
return connector.size + (0, 0) # TODO(ihf): Should we use CRTC.pos?
|
|
return (0, 0, 0, 0)
|
|
|
|
|
|
def get_internal_resolution():
|
|
if has_internal_display():
|
|
crtcs = get_modetest_crtcs()
|
|
if len(crtcs) > 0:
|
|
return crtcs[0].size
|
|
return (-1, -1)
|
|
|
|
|
|
def has_internal_display():
|
|
"""Checks whether the DUT is equipped with an internal display.
|
|
|
|
@return True if internal display is present; False otherwise.
|
|
"""
|
|
return bool(get_internal_connector_name())
|
|
|
|
|
|
def get_external_resolution():
|
|
"""Gets the resolution of the external display.
|
|
|
|
@return A tuple of (width, height) or None if no external display is
|
|
connected.
|
|
"""
|
|
offset = 1 if has_internal_display() else 0
|
|
crtcs = get_modetest_crtcs()
|
|
if len(crtcs) > offset and crtcs[offset].size != (0, 0):
|
|
return crtcs[offset].size
|
|
return None
|
|
|
|
|
|
def get_display_output_state():
|
|
"""
|
|
Retrieves output status of connected display(s).
|
|
|
|
Return value: dictionary of connected display states.
|
|
"""
|
|
return get_modetest_output_state()
|
|
|
|
|
|
def set_modetest_output(output_name, enable):
|
|
# TODO(ihf): figure out what to do here. Don't think this is the right command.
|
|
# modetest -s <connector_id>[,<connector_id>][@<crtc_id>]:<mode>[-<vrefresh>][@<format>] set a mode
|
|
pass
|
|
|
|
|
|
def set_display_output(output_name, enable):
|
|
"""
|
|
Sets the output given by |output_name| on or off.
|
|
"""
|
|
set_modetest_output(output_name, enable)
|
|
|
|
|
|
# TODO(ihf): Fix this for multiple external connectors.
|
|
def get_external_crtc(index=0):
|
|
offset = 1 if has_internal_display() else 0
|
|
crtcs = get_modetest_crtcs()
|
|
if len(crtcs) > offset + index:
|
|
return crtcs[offset + index].id
|
|
return -1
|
|
|
|
|
|
def get_internal_crtc():
|
|
if has_internal_display():
|
|
crtcs = get_modetest_crtcs()
|
|
if len(crtcs) > 0:
|
|
return crtcs[0].id
|
|
return -1
|
|
|
|
|
|
# TODO(ihf): Fix this for multiple external connectors.
|
|
def get_external_connector_name():
|
|
"""Gets the name of the external output connector.
|
|
|
|
@return The external output connector name as a string, if any.
|
|
Otherwise, return False.
|
|
"""
|
|
outputs = get_display_output_state()
|
|
for output in outputs.iterkeys():
|
|
if outputs[output] and (output.startswith('HDMI')
|
|
or output.startswith('DP')
|
|
or output.startswith('DVI')
|
|
or output.startswith('VGA')):
|
|
return output
|
|
return False
|
|
|
|
|
|
def get_internal_connector_name():
|
|
"""Gets the name of the internal output connector.
|
|
|
|
@return The internal output connector name as a string, if any.
|
|
Otherwise, return False.
|
|
"""
|
|
outputs = get_display_output_state()
|
|
for output in outputs.iterkeys():
|
|
# reference: chromium_org/chromeos/display/output_util.cc
|
|
if (output.startswith('eDP')
|
|
or output.startswith('LVDS')
|
|
or output.startswith('DSI')):
|
|
return output
|
|
return False
|
|
|
|
|
|
def wait_output_connected(output):
|
|
"""Wait for output to connect.
|
|
|
|
@param output: The output name as a string.
|
|
|
|
@return: True if output is connected; False otherwise.
|
|
"""
|
|
def _is_connected(output):
|
|
"""Helper function."""
|
|
outputs = get_display_output_state()
|
|
if output not in outputs:
|
|
return False
|
|
return outputs[output]
|
|
|
|
return utils.wait_for_value(lambda: _is_connected(output),
|
|
expected_value=True)
|
|
|
|
|
|
def set_content_protection(output_name, state):
|
|
"""
|
|
Sets the content protection to the given state.
|
|
|
|
@param output_name: The output name as a string.
|
|
@param state: One of the states 'Undesired', 'Desired', or 'Enabled'
|
|
|
|
"""
|
|
raise error.TestFail('freon: set_content_protection not implemented')
|
|
|
|
|
|
def get_content_protection(output_name):
|
|
"""
|
|
Gets the state of the content protection.
|
|
|
|
@param output_name: The output name as a string.
|
|
@return: A string of the state, like 'Undesired', 'Desired', or 'Enabled'.
|
|
False if not supported.
|
|
|
|
"""
|
|
raise error.TestFail('freon: get_content_protection not implemented')
|
|
|
|
|
|
def is_sw_rasterizer():
|
|
"""Return true if OpenGL is using a software rendering."""
|
|
cmd = utils.wflinfo_cmd() + ' | grep "OpenGL renderer string"'
|
|
output = utils.run(cmd)
|
|
result = output.stdout.splitlines()[0]
|
|
logging.info('wflinfo: %s', result)
|
|
# TODO(ihf): Find exhaustive error conditions (especially ARM).
|
|
return 'llvmpipe' in result.lower() or 'soft' in result.lower()
|
|
|
|
|
|
def get_gles_version():
|
|
cmd = utils.wflinfo_cmd()
|
|
wflinfo = utils.system_output(cmd, retain_output=False, ignore_status=False)
|
|
# OpenGL version string: OpenGL ES 3.0 Mesa 10.5.0-devel
|
|
version = re.findall(r'OpenGL version string: '
|
|
r'OpenGL ES ([0-9]+).([0-9]+)', wflinfo)
|
|
if version:
|
|
version_major = int(version[0][0])
|
|
version_minor = int(version[0][1])
|
|
return (version_major, version_minor)
|
|
return (None, None)
|
|
|
|
|
|
def get_egl_version():
|
|
cmd = 'eglinfo'
|
|
eglinfo = utils.system_output(cmd, retain_output=False, ignore_status=False)
|
|
# EGL version string: 1.4 (DRI2)
|
|
version = re.findall(r'EGL version string: ([0-9]+).([0-9]+)', eglinfo)
|
|
if version:
|
|
version_major = int(version[0][0])
|
|
version_minor = int(version[0][1])
|
|
return (version_major, version_minor)
|
|
return (None, None)
|
|
|
|
|
|
class GraphicsKernelMemory(object):
|
|
"""
|
|
Reads from sysfs to determine kernel gem objects and memory info.
|
|
"""
|
|
# These are sysfs fields that will be read by this test. For different
|
|
# architectures, the sysfs field paths are different. The "paths" are given
|
|
# as lists of strings because the actual path may vary depending on the
|
|
# system. This test will read from the first sysfs path in the list that is
|
|
# present.
|
|
# e.g. ".../memory" vs ".../gpu_memory" -- if the system has either one of
|
|
# these, the test will read from that path.
|
|
amdgpu_fields = {
|
|
'gem_objects': ['/sys/kernel/debug/dri/0/amdgpu_gem_info'],
|
|
'memory': ['/sys/kernel/debug/dri/0/amdgpu_gtt_mm'],
|
|
}
|
|
arm_fields = {}
|
|
exynos_fields = {
|
|
'gem_objects': ['/sys/kernel/debug/dri/?/exynos_gem_objects'],
|
|
'memory': ['/sys/class/misc/mali0/device/memory',
|
|
'/sys/class/misc/mali0/device/gpu_memory'],
|
|
}
|
|
mediatek_fields = {} # TODO(crosbug.com/p/58189) add nodes
|
|
# TODO Add memory nodes once the GPU patches landed.
|
|
rockchip_fields = {}
|
|
tegra_fields = {
|
|
'memory': ['/sys/kernel/debug/memblock/memory'],
|
|
}
|
|
i915_fields = {
|
|
'gem_objects': ['/sys/kernel/debug/dri/0/i915_gem_objects'],
|
|
'memory': ['/sys/kernel/debug/dri/0/i915_gem_gtt'],
|
|
}
|
|
|
|
arch_fields = {
|
|
'amdgpu': amdgpu_fields,
|
|
'arm': arm_fields,
|
|
'exynos5': exynos_fields,
|
|
'i915': i915_fields,
|
|
'mediatek': mediatek_fields,
|
|
'rockchip': rockchip_fields,
|
|
'tegra': tegra_fields,
|
|
}
|
|
|
|
num_errors = 0
|
|
|
|
def __init__(self):
|
|
self._initial_memory = self.get_memory_keyvals()
|
|
|
|
def get_memory_difference_keyvals(self):
|
|
"""
|
|
Reads the graphics memory values and return the difference between now
|
|
and the memory usage at initialization stage as keyvals.
|
|
"""
|
|
current_memory = self.get_memory_keyvals()
|
|
return {key: self._initial_memory[key] - current_memory[key]
|
|
for key in self._initial_memory}
|
|
|
|
def get_memory_keyvals(self):
|
|
"""
|
|
Reads the graphics memory values and returns them as keyvals.
|
|
"""
|
|
keyvals = {}
|
|
|
|
# Get architecture type and list of sysfs fields to read.
|
|
soc = utils.get_cpu_soc_family()
|
|
|
|
arch = utils.get_cpu_arch()
|
|
if arch == 'x86_64' or arch == 'i386':
|
|
pci_vga_device = utils.run("lspci | grep VGA").stdout.rstrip('\n')
|
|
if "Advanced Micro Devices" in pci_vga_device:
|
|
soc = 'amdgpu'
|
|
elif "Intel Corporation" in pci_vga_device:
|
|
soc = 'i915'
|
|
|
|
if not soc in self.arch_fields:
|
|
raise error.TestFail('Error: Architecture "%s" not yet supported.' % soc)
|
|
fields = self.arch_fields[soc]
|
|
|
|
for field_name in fields:
|
|
possible_field_paths = fields[field_name]
|
|
field_value = None
|
|
for path in possible_field_paths:
|
|
if utils.system('ls %s' % path):
|
|
continue
|
|
field_value = utils.system_output('cat %s' % path)
|
|
break
|
|
|
|
if not field_value:
|
|
logging.error('Unable to find any sysfs paths for field "%s"',
|
|
field_name)
|
|
self.num_errors += 1
|
|
continue
|
|
|
|
parsed_results = GraphicsKernelMemory._parse_sysfs(field_value)
|
|
|
|
for key in parsed_results:
|
|
keyvals['%s_%s' % (field_name, key)] = parsed_results[key]
|
|
|
|
if 'bytes' in parsed_results and parsed_results['bytes'] == 0:
|
|
logging.error('%s reported 0 bytes', field_name)
|
|
self.num_errors += 1
|
|
|
|
keyvals['meminfo_MemUsed'] = (utils.read_from_meminfo('MemTotal') -
|
|
utils.read_from_meminfo('MemFree'))
|
|
keyvals['meminfo_SwapUsed'] = (utils.read_from_meminfo('SwapTotal') -
|
|
utils.read_from_meminfo('SwapFree'))
|
|
return keyvals
|
|
|
|
@staticmethod
|
|
def _parse_sysfs(output):
|
|
"""
|
|
Parses output of graphics memory sysfs to determine the number of
|
|
buffer objects and bytes.
|
|
|
|
Arguments:
|
|
output Unprocessed sysfs output
|
|
Return value:
|
|
Dictionary containing integer values of number bytes and objects.
|
|
They may have the keys 'bytes' and 'objects', respectively. However
|
|
the result may not contain both of these values.
|
|
"""
|
|
results = {}
|
|
labels = ['bytes', 'objects']
|
|
|
|
for line in output.split('\n'):
|
|
# Strip any commas to make parsing easier.
|
|
line_words = line.replace(',', '').split()
|
|
|
|
prev_word = None
|
|
for word in line_words:
|
|
# When a label has been found, the previous word should be the
|
|
# value. e.g. "3200 bytes"
|
|
if word in labels and word not in results and prev_word:
|
|
logging.info(prev_word)
|
|
results[word] = int(prev_word)
|
|
|
|
prev_word = word
|
|
|
|
# Once all values has been parsed, return.
|
|
if len(results) == len(labels):
|
|
return results
|
|
|
|
return results
|
|
|
|
|
|
class GraphicsStateChecker(object):
|
|
"""
|
|
Analyzes the state of the GPU and log history. Should be instantiated at the
|
|
beginning of each graphics_* test.
|
|
"""
|
|
crash_blacklist = []
|
|
dirty_writeback_centisecs = 0
|
|
existing_hangs = {}
|
|
|
|
_BROWSER_VERSION_COMMAND = '/opt/google/chrome/chrome --version'
|
|
_HANGCHECK = ['drm:i915_hangcheck_elapsed', 'drm:i915_hangcheck_hung',
|
|
'Hangcheck timer elapsed...']
|
|
_HANGCHECK_WARNING = ['render ring idle']
|
|
_MESSAGES_FILE = '/var/log/messages'
|
|
|
|
def __init__(self, raise_error_on_hang=True):
|
|
"""
|
|
Analyzes the initial state of the GPU and log history.
|
|
"""
|
|
# Attempt flushing system logs every second instead of every 10 minutes.
|
|
self.dirty_writeback_centisecs = utils.get_dirty_writeback_centisecs()
|
|
utils.set_dirty_writeback_centisecs(100)
|
|
self._raise_error_on_hang = raise_error_on_hang
|
|
logging.info(utils.get_board_with_frequency_and_memory())
|
|
self.graphics_kernel_memory = GraphicsKernelMemory()
|
|
|
|
if utils.get_cpu_arch() != 'arm':
|
|
if is_sw_rasterizer():
|
|
raise error.TestFail('Refusing to run on SW rasterizer.')
|
|
logging.info('Initialize: Checking for old GPU hangs...')
|
|
messages = open(self._MESSAGES_FILE, 'r')
|
|
for line in messages:
|
|
for hang in self._HANGCHECK:
|
|
if hang in line:
|
|
logging.info(line)
|
|
self.existing_hangs[line] = line
|
|
messages.close()
|
|
|
|
def finalize(self):
|
|
"""
|
|
Analyzes the state of the GPU, log history and emits warnings or errors
|
|
if the state changed since initialize. Also makes a note of the Chrome
|
|
version for later usage in the perf-dashboard.
|
|
"""
|
|
utils.set_dirty_writeback_centisecs(self.dirty_writeback_centisecs)
|
|
new_gpu_hang = False
|
|
new_gpu_warning = False
|
|
if utils.get_cpu_arch() != 'arm':
|
|
logging.info('Cleanup: Checking for new GPU hangs...')
|
|
messages = open(self._MESSAGES_FILE, 'r')
|
|
for line in messages:
|
|
for hang in self._HANGCHECK:
|
|
if hang in line:
|
|
if not line in self.existing_hangs.keys():
|
|
logging.info(line)
|
|
for warn in self._HANGCHECK_WARNING:
|
|
if warn in line:
|
|
new_gpu_warning = True
|
|
logging.warning(
|
|
'Saw GPU hang warning during test.')
|
|
else:
|
|
logging.warning('Saw GPU hang during test.')
|
|
new_gpu_hang = True
|
|
messages.close()
|
|
|
|
if is_sw_rasterizer():
|
|
logging.warning('Finished test on SW rasterizer.')
|
|
raise error.TestFail('Finished test on SW rasterizer.')
|
|
if self._raise_error_on_hang and new_gpu_hang:
|
|
raise error.TestError('Detected GPU hang during test.')
|
|
if new_gpu_hang:
|
|
raise error.TestWarn('Detected GPU hang during test.')
|
|
if new_gpu_warning:
|
|
raise error.TestWarn('Detected GPU warning during test.')
|
|
|
|
def get_memory_access_errors(self):
|
|
""" Returns the number of errors while reading memory stats. """
|
|
return self.graphics_kernel_memory.num_errors
|
|
|
|
def get_memory_difference_keyvals(self):
|
|
return self.graphics_kernel_memory.get_memory_difference_keyvals()
|
|
|
|
def get_memory_keyvals(self):
|
|
""" Returns memory stats. """
|
|
return self.graphics_kernel_memory.get_memory_keyvals()
|
|
|
|
class GraphicsApiHelper(object):
|
|
"""
|
|
Report on the available graphics APIs.
|
|
Ex. gles2, gles3, gles31, and vk
|
|
"""
|
|
_supported_apis = []
|
|
|
|
DEQP_BASEDIR = os.path.join('/usr', 'local', 'deqp')
|
|
DEQP_EXECUTABLE = {
|
|
'gles2': os.path.join('modules', 'gles2', 'deqp-gles2'),
|
|
'gles3': os.path.join('modules', 'gles3', 'deqp-gles3'),
|
|
'gles31': os.path.join('modules', 'gles31', 'deqp-gles31'),
|
|
'vk': os.path.join('external', 'vulkancts', 'modules',
|
|
'vulkan', 'deqp-vk')
|
|
}
|
|
|
|
def __init__(self):
|
|
# Determine which executable should be run. Right now never egl.
|
|
major, minor = get_gles_version()
|
|
logging.info('Found gles%d.%d.', major, minor)
|
|
if major is None or minor is None:
|
|
raise error.TestFail(
|
|
'Failed: Could not get gles version information (%d, %d).' %
|
|
(major, minor)
|
|
)
|
|
if major >= 2:
|
|
self._supported_apis.append('gles2')
|
|
if major >= 3:
|
|
self._supported_apis.append('gles3')
|
|
if major > 3 or minor >= 1:
|
|
self._supported_apis.append('gles31')
|
|
|
|
# If libvulkan is installed, then assume the board supports vulkan.
|
|
has_libvulkan = False
|
|
for libdir in ('/usr/lib', '/usr/lib64',
|
|
'/usr/local/lib', '/usr/local/lib64'):
|
|
if os.path.exists(os.path.join(libdir, 'libvulkan.so')):
|
|
has_libvulkan = True
|
|
|
|
if has_libvulkan:
|
|
executable_path = os.path.join(
|
|
self.DEQP_BASEDIR,
|
|
self.DEQP_EXECUTABLE['vk']
|
|
)
|
|
if os.path.exists(executable_path):
|
|
self._supported_apis.append('vk')
|
|
else:
|
|
logging.warning('Found libvulkan.so but did not find deqp-vk '
|
|
'binary for testing.')
|
|
|
|
def get_supported_apis(self):
|
|
"""Return the list of supported apis. eg. gles2, gles3, vk etc.
|
|
@returns: a copy of the supported api list will be returned
|
|
"""
|
|
return list(self._supported_apis)
|
|
|
|
def get_deqp_executable(self, api):
|
|
"""Return the path to the api executable."""
|
|
if api not in self.DEQP_EXECUTABLE:
|
|
raise KeyError(
|
|
"%s is not a supported api for GraphicsApiHelper." % api
|
|
)
|
|
|
|
executable = os.path.join(
|
|
self.DEQP_BASEDIR,
|
|
self.DEQP_EXECUTABLE[api]
|
|
)
|
|
return executable
|