160 lines
5.6 KiB
Python
Executable file
160 lines
5.6 KiB
Python
Executable file
#!/usr/bin/python
|
|
#
|
|
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import datetime, unittest
|
|
|
|
import mox
|
|
|
|
import common
|
|
|
|
# We want to import setup_django_lite_environment first so that the database
|
|
# is setup correctly.
|
|
from autotest_lib.frontend import setup_django_lite_environment
|
|
from autotest_lib.client.common_lib import utils
|
|
from autotest_lib.frontend.afe import models, rpc_interface
|
|
from django import test
|
|
from autotest_lib.server.cros import repair_utils
|
|
|
|
|
|
# See complete_failures_functional_tests.py for why we need this.
|
|
class MockDatetime(datetime.datetime):
|
|
"""Used to mock out parts of datetime.datetime."""
|
|
pass
|
|
|
|
|
|
# We use a mock rpc client object so that we instead directly use the server.
|
|
class MockAFE():
|
|
"""Used to mock out the rpc client."""
|
|
def run(self, func, **kwargs):
|
|
"""
|
|
The fake run call directly contacts the server.
|
|
|
|
@param func: The name of the remote function that is being called.
|
|
|
|
@param kwargs: The arguments to the remotely called function.
|
|
"""
|
|
return utils.strip_unicode(getattr(rpc_interface, func)(**kwargs))
|
|
|
|
|
|
class FindProblemTestTests(mox.MoxTestBase, test.TestCase):
|
|
"""Test that we properly find the last ran job."""
|
|
|
|
|
|
def setUp(self):
|
|
super(FindProblemTestTests, self).setUp()
|
|
self.mox.StubOutWithMock(MockDatetime, 'today')
|
|
|
|
self.datetime = datetime.datetime
|
|
datetime.datetime = MockDatetime
|
|
self._orig_cutoff = repair_utils._CUTOFF_AFTER_TIMEOUT_MINS
|
|
self._orig_timeout = repair_utils._DEFAULT_TEST_TIMEOUT_MINS
|
|
|
|
|
|
def tearDown(self):
|
|
repair_utils._DEFAULT_TEST_TIMEOUT_MINS = self._orig_timeout
|
|
repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = self._orig_cutoff
|
|
datetime.datetime = self.datetime
|
|
super(FindProblemTestTests, self).tearDown()
|
|
|
|
|
|
def test_should_get_most_recent_job(self):
|
|
"""Test that, for a given host, we get the last job ran on that host."""
|
|
|
|
host = models.Host(hostname='host')
|
|
host.save()
|
|
|
|
old_job = models.Job(owner='me', name='old_job',
|
|
created_on=datetime.datetime(2012, 1, 1))
|
|
old_job.save()
|
|
old_host_queue_entry = models.HostQueueEntry(
|
|
job=old_job, host=host, status='test',
|
|
started_on=datetime.datetime(2012, 1, 1, 1))
|
|
old_host_queue_entry.save()
|
|
|
|
new_job = models.Job(owner='me', name='new_job',
|
|
created_on=datetime.datetime(2012, 1, 1))
|
|
new_job.save()
|
|
new_host_queue_entry = models.HostQueueEntry(
|
|
job=new_job, host=host, status='test',
|
|
started_on=datetime.datetime(2012, 1, 1, 2))
|
|
new_host_queue_entry.save()
|
|
|
|
mock_rpc = MockAFE()
|
|
datetime.datetime.today().AndReturn(datetime.datetime(2012,1,1))
|
|
repair_utils._DEFAULT_TEST_TIMEOUT_MINS = 1440
|
|
|
|
repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = 60
|
|
|
|
self.mox.ReplayAll()
|
|
result = repair_utils._find_problem_test('host', mock_rpc)
|
|
|
|
self.assertEqual(result['job']['name'], 'new_job')
|
|
|
|
|
|
def test_should_get_job_for_specified_host_only(self):
|
|
"""Test that we only get a job that is for the given host."""
|
|
|
|
correct_job = models.Job(owner='me', name='correct_job',
|
|
created_on=datetime.datetime(2012, 1, 1))
|
|
correct_job.save()
|
|
correct_host = models.Host(hostname='correct_host')
|
|
correct_host.save()
|
|
correct_host_queue_entry = models.HostQueueEntry(
|
|
job=correct_job, host=correct_host, status='test',
|
|
started_on=datetime.datetime(2012, 1, 1, 1))
|
|
correct_host_queue_entry.save()
|
|
|
|
wrong_job = models.Job(owner='me', name='wrong_job',
|
|
created_on=datetime.datetime(2012, 1, 1))
|
|
wrong_job.save()
|
|
wrong_host = models.Host(hostname='wrong_host')
|
|
wrong_host.save()
|
|
wrong_host_queue_entry = models.HostQueueEntry(
|
|
job=wrong_job, host=wrong_host, status='test',
|
|
started_on=datetime.datetime(2012, 1, 1, 2))
|
|
wrong_host_queue_entry.save()
|
|
|
|
mock_rpc = MockAFE()
|
|
datetime.datetime.today().AndReturn(datetime.datetime(2012,1,1))
|
|
repair_utils._DEFAULT_TEST_TIMEOUT_MINS = 1440
|
|
|
|
repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = 60
|
|
|
|
self.mox.ReplayAll()
|
|
result = repair_utils._find_problem_test('correct_host', mock_rpc)
|
|
|
|
self.assertEqual(result['job']['name'], 'correct_job')
|
|
|
|
|
|
def test_return_jobs_ran_soon_after_max_job_runtime(self):
|
|
"""Test that we get jobs that are just past the max runtime."""
|
|
|
|
host = models.Host(hostname='host')
|
|
host.save()
|
|
|
|
new_job = models.Job(owner='me', name='new_job',
|
|
created_on=datetime.datetime(2012, 1, 1, 0, 0))
|
|
new_job.save()
|
|
new_host_queue_entry = models.HostQueueEntry(
|
|
job=new_job, host=host, status='test',
|
|
started_on=datetime.datetime(2012, 1, 1, 2))
|
|
new_host_queue_entry.save()
|
|
|
|
mock_rpc = MockAFE()
|
|
datetime.datetime.today().AndReturn(datetime.datetime(2012, 1, 2, 0,
|
|
30))
|
|
repair_utils._DEFAULT_TEST_TIMEOUT_MINS = 1440
|
|
|
|
repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = 60
|
|
|
|
self.mox.ReplayAll()
|
|
result = repair_utils._find_problem_test('host', mock_rpc)
|
|
|
|
self.assertEqual(result['job']['name'], 'new_job')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|