148 lines
4.1 KiB
Python
148 lines
4.1 KiB
Python
"""The ABAT harness interface
|
|
|
|
The interface as required for ABAT.
|
|
"""
|
|
|
|
__author__ = """Copyright Andy Whitcroft 2006"""
|
|
|
|
from autotest_lib.client.bin import utils
|
|
import os, harness, time, re
|
|
|
|
def autobench_load(fn):
    """Parse an autobench configuration file.

    Each line of the file is checked for an assignment to
    DATS_FREE_DISKS, DATS_FREE_PARTITIONS or INITRD_MODULES.  The value
    has surrounding whitespace and double quotes removed and is then
    split on whitespace.

    fn
        Path of the configuration file to read

    Returns a dictionary which may contain the keys 'disks',
    'partitions' and 'modules', each mapping to a list of strings.  A
    key is only present when its setting was found; when a setting
    occurs more than once the last occurrence wins.  A file that cannot
    be opened yields an empty dictionary.
    """
    settings = (
        ('disks', re.compile(r'^\s*DATS_FREE_DISKS\s*=(.*\S)\s*$')),
        ('partitions', re.compile(r'^\s*DATS_FREE_PARTITIONS\s*=(.*\S)\s*$')),
        ('modules', re.compile(r'^\s*INITRD_MODULES\s*=(.*\S)\s*$')),
    )

    conf = {}

    # A missing or unreadable configuration file is not an error: the
    # caller simply gets no settings.  Only I/O failures are caught so
    # that programming errors are not hidden.
    try:
        fd = open(fn, "r")
    except (IOError, OSError):
        return conf

    try:
        for ln in fd:
            for key, pattern in settings:
                m = pattern.match(ln)
                if m:
                    # Drop whitespace first so that a quote following
                    # "= " is still recognised as a surrounding quote.
                    conf[key] = m.group(1).strip().strip('"').split()
                    break
    finally:
        fd.close()

    return conf
|
|
|
|
|
|
class harness_ABAT(harness.harness):
    """The ABAT server harness

    Writes line-oriented progress messages (STATUS, SUMMARY, REBOOT and
    DONE) to the file named by the ABAT_STATUS environment variable.
    When that variable is not set the messages are silently dropped.

    Properties:
        job
            The job object for this job
        status
            The open status file, or None when ABAT_STATUS is unset
    """

    def __init__(self, job, harness_args):
        """
        job
            The job object for this job
        harness_args
            Not used by this harness
        """
        self.setup(job)

        status_path = os.environ.get('ABAT_STATUS')
        if status_path is not None:
            self.status = open(status_path, "w")
        else:
            self.status = None

    def __send(self, msg):
        """Write msg as a single line to the status file, if one is open.

        Trailing whitespace is removed from msg and the file is flushed
        after every message.
        """
        if not self.status:
            return
        self.status.write(msg.rstrip() + "\n")
        self.status.flush()

    def __send_status(self, code, subdir, operation, msg):
        """Send a STATUS message built from the four given fields."""
        self.__send("STATUS %s %s %s %s" % (code, subdir, operation, msg))

    def __root_device(self):
        """Return the /dev device which df reports as mounted on /.

        Returns None when no line of the 'df -lP' output matches.  If
        several lines match, the last one is used.
        """
        root = re.compile(r'^\S*(/dev/\S+).*\s/\s*$')

        device = None
        for line in utils.system_output('df -lP').split("\n"):
            m = root.match(line)
            if m:
                device = m.group(1)

        return device

    def run_start(self):
        """A run within this job is starting"""
        self.__send_status('GOOD', '----', '----', 'run starting')

        # Load up the autobench.conf if it exists.
        conf = autobench_load("/etc/autobench.conf")
        if 'partitions' in conf:
            self.job.config_set('partition.partitions',
                                conf['partitions'])

        # Search the boot loader configuration for the autobench entry,
        # and extract its args.  The last matching entry wins.
        args = None
        for entry in self.job.bootloader.get_entries().values():
            if entry['title'].startswith('autobench'):
                args = entry.get('args')

        if args:
            args = re.sub(r'autobench_args:.*', '', args)

            # Point root= at the device currently mounted on /.  If that
            # device cannot be determined the existing root= argument is
            # kept rather than failing on a None concatenation.
            device = self.__root_device()
            if device:
                args = re.sub(r'root=\S*', '', args)
                args += " root=" + device

            self.job.config_set('boot.default_args', args)

        # Turn off boot_once semantics.
        self.job.config_set('boot.set_default', True)

        # For RedHat installs we do not load up the module.conf
        # as they cannot be builtin.  Pass them as arguments.
        vendor = utils.get_os_vendor()
        if vendor in ['Red Hat', 'Fedora Core'] and 'modules' in conf:
            mkinitrd_args = ['--allow-missing']
            mkinitrd_args.extend(["--with " + mod for mod in conf['modules']])
            self.job.config_set('kernel.mkinitrd_extra_args',
                                " ".join(mkinitrd_args))

    def run_reboot(self):
        """A run within this job is performing a reboot
        (expect continue following reboot)
        """
        self.__send("REBOOT")

    def run_complete(self):
        """A run within this job is completing (all done)"""
        self.__send("DONE")

    def test_status_detail(self, code, subdir, operation, msg, tag,
                           optional_fields):
        """A test within this job is completing (detail)

        Only the first line of msg is reported; tag and optional_fields
        are not used by this harness.
        """
        # Send the first line with the status code as a STATUS message.
        first_line = msg.split("\n", 1)[0]
        self.__send_status(code, subdir, operation, first_line)

    def test_status(self, msg, tag):
        """Report msg to the status file, one SUMMARY message per line.

        tag is not used by this harness.
        """
        # Send each line as a SUMMARY message.
        for line in msg.split("\n"):
            self.__send("SUMMARY :" + line)
|