530 lines
22 KiB
Python
530 lines
22 KiB
Python
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""Rdb server module.
|
|
"""
|
|
|
|
import logging
|
|
|
|
import common
|
|
|
|
from django.core import exceptions as django_exceptions
|
|
from django.db.models import fields
|
|
from django.db.models import Q
|
|
from autotest_lib.frontend.afe import models
|
|
from autotest_lib.scheduler import rdb_cache_manager
|
|
from autotest_lib.scheduler import rdb_hosts
|
|
from autotest_lib.scheduler import rdb_requests
|
|
from autotest_lib.scheduler import rdb_utils
|
|
from autotest_lib.server import utils
|
|
|
|
try:
|
|
from chromite.lib import metrics
|
|
except ImportError:
|
|
metrics = utils.metrics_mock
|
|
|
|
|
|
_rdb_timer_name = 'chromeos/autotest/scheduler/rdb/durations/%s'
|
|
_is_master = not utils.is_shard()
|
|
|
|
# Qeury managers: Provide a layer of abstraction over the database by
|
|
# encapsulating common query patterns used by the rdb.
|
|
class BaseHostQueryManager(object):
|
|
"""Base manager for host queries on all hosts.
|
|
"""
|
|
|
|
host_objects = models.Host.objects
|
|
|
|
|
|
def update_hosts(self, host_ids, **kwargs):
|
|
"""Update fields on a hosts.
|
|
|
|
@param host_ids: A list of ids of hosts to update.
|
|
@param kwargs: A key value dictionary corresponding to column, value
|
|
in the host database.
|
|
"""
|
|
self.host_objects.filter(id__in=host_ids).update(**kwargs)
|
|
|
|
|
|
@rdb_hosts.return_rdb_host
|
|
def get_hosts(self, ids):
|
|
"""Get host objects for the given ids.
|
|
|
|
@param ids: The ids for which we need host objects.
|
|
|
|
@returns: A list of RDBServerHostWrapper objects, ordered by host_id.
|
|
"""
|
|
return self.host_objects.filter(id__in=ids).order_by('id')
|
|
|
|
|
|
@rdb_hosts.return_rdb_host
|
|
def find_hosts(self, deps, acls):
|
|
"""Finds valid hosts matching deps, acls.
|
|
|
|
@param deps: A list of dependencies to match.
|
|
@param acls: A list of acls, at least one of which must coincide with
|
|
an acl group the chosen host is in.
|
|
|
|
@return: A set of matching hosts available.
|
|
"""
|
|
hosts_available = self.host_objects.filter(invalid=0)
|
|
queries = [Q(labels__id=dep) for dep in deps]
|
|
queries += [Q(aclgroup__id__in=acls)]
|
|
for query in queries:
|
|
hosts_available = hosts_available.filter(query)
|
|
return set(hosts_available)
|
|
|
|
|
|
class AvailableHostQueryManager(BaseHostQueryManager):
|
|
"""Query manager for requests on un-leased, un-locked hosts.
|
|
"""
|
|
|
|
host_objects = models.Host.leased_objects
|
|
|
|
|
|
# Request Handlers: Used in conjunction with requests in rdb_utils, these
|
|
# handlers acquire hosts for a request and record the acquisition in
|
|
# an response_map dictionary keyed on the request itself, with the host/hosts
|
|
# as values.
|
|
class BaseHostRequestHandler(object):
|
|
"""Handler for requests related to hosts, leased or unleased.
|
|
|
|
This class is only capable of blindly returning host information.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.host_query_manager = BaseHostQueryManager()
|
|
self.response_map = {}
|
|
|
|
|
|
def update_response_map(self, request, response, append=False):
|
|
"""Record a response for a request.
|
|
|
|
The response_map only contains requests that were either satisfied, or
|
|
that ran into an exception. Often this translates to reserving hosts
|
|
against a request. If the rdb hit an exception processing a request, the
|
|
exception gets recorded in the map for the client to reraise.
|
|
|
|
@param response: A response for the request.
|
|
@param request: The request that has reserved these hosts.
|
|
@param append: Boolean, whether to append new hosts in
|
|
|response| for existing request.
|
|
Will not append if existing response is
|
|
a list of exceptions.
|
|
|
|
@raises RDBException: If an empty values is added to the map.
|
|
"""
|
|
if not response:
|
|
raise rdb_utils.RDBException('response_map dict can only contain '
|
|
'valid responses. Request %s, response %s is invalid.' %
|
|
(request, response))
|
|
exist_response = self.response_map.setdefault(request, [])
|
|
if exist_response and not append:
|
|
raise rdb_utils.RDBException('Request %s already has response %s '
|
|
'the rdb cannot return multiple '
|
|
'responses for the same request.' %
|
|
(request, response))
|
|
if exist_response and append and not isinstance(
|
|
exist_response[0], rdb_hosts.RDBHost):
|
|
# Do not append if existing response contains exception.
|
|
return
|
|
exist_response.extend(response)
|
|
|
|
|
|
def _check_response_map(self):
|
|
"""Verify that we never give the same host to different requests.
|
|
|
|
@raises RDBException: If the same host is assigned to multiple requests.
|
|
"""
|
|
unique_hosts = set([])
|
|
for request, response in self.response_map.iteritems():
|
|
# Each value in the response map can only either be a list of
|
|
# RDBHosts or a list of RDBExceptions, not a mix of both.
|
|
if isinstance(response[0], rdb_hosts.RDBHost):
|
|
if any([host in unique_hosts for host in response]):
|
|
raise rdb_utils.RDBException(
|
|
'Assigning the same host to multiple requests. New '
|
|
'hosts %s, request %s, response_map: %s' %
|
|
(response, request, self.response_map))
|
|
else:
|
|
unique_hosts = unique_hosts.union(response)
|
|
|
|
|
|
def _record_exceptions(self, request, exceptions):
|
|
"""Record a list of exceptions for a request.
|
|
|
|
@param request: The request for which the exceptions were hit.
|
|
@param exceptions: The exceptions hit while processing the request.
|
|
"""
|
|
rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
|
|
self.update_response_map(request, rdb_exceptions)
|
|
|
|
|
|
def get_response(self):
|
|
"""Convert all RDBServerHostWrapper objects to host info dictionaries.
|
|
|
|
@return: A dictionary mapping requests to a list of matching host_infos.
|
|
|
|
@raises RDBException: If the same host is assigned to multiple requests.
|
|
"""
|
|
self._check_response_map()
|
|
for request, response in self.response_map.iteritems():
|
|
self.response_map[request] = [reply.wire_format()
|
|
for reply in response]
|
|
return self.response_map
|
|
|
|
|
|
def update_hosts(self, update_requests):
|
|
"""Updates host tables with a payload.
|
|
|
|
@param update_requests: A list of update requests, as defined in
|
|
rdb_requests.UpdateHostRequest.
|
|
"""
|
|
# Last payload for a host_id wins in the case of conflicting requests.
|
|
unique_host_requests = {}
|
|
for request in update_requests:
|
|
if unique_host_requests.get(request.host_id):
|
|
unique_host_requests[request.host_id].update(request.payload)
|
|
else:
|
|
unique_host_requests[request.host_id] = request.payload
|
|
|
|
# Batch similar payloads so we can do them in one table scan.
|
|
similar_requests = {}
|
|
for host_id, payload in unique_host_requests.iteritems():
|
|
similar_requests.setdefault(payload, []).append(host_id)
|
|
|
|
# If fields of the update don't match columns in the database,
|
|
# record the exception in the response map. This also means later
|
|
# updates will get applied even if previous updates fail.
|
|
for payload, hosts in similar_requests.iteritems():
|
|
try:
|
|
response = self.host_query_manager.update_hosts(hosts, **payload)
|
|
except (django_exceptions.FieldError,
|
|
fields.FieldDoesNotExist, ValueError) as e:
|
|
for host in hosts:
|
|
# Since update requests have a consistent hash this will map
|
|
# to the same key as the original request.
|
|
request = rdb_requests.UpdateHostRequest(
|
|
host_id=host, payload=payload).get_request()
|
|
self._record_exceptions(request, [e])
|
|
|
|
|
|
def batch_get_hosts(self, host_requests):
|
|
"""Get hosts matching the requests.
|
|
|
|
This method does not acquire the hosts, i.e it reserves hosts against
|
|
requests leaving their leased state untouched.
|
|
|
|
@param host_requests: A list of requests, as defined in
|
|
rdb_utils.BaseHostRequest.
|
|
"""
|
|
host_ids = set([request.host_id for request in host_requests])
|
|
host_map = {}
|
|
|
|
# This list will not contain available hosts if executed using
|
|
# an AvailableHostQueryManager.
|
|
for host in self.host_query_manager.get_hosts(host_ids):
|
|
host_map[host.id] = host
|
|
for request in host_requests:
|
|
if request.host_id in host_map:
|
|
self.update_response_map(request, [host_map[request.host_id]])
|
|
else:
|
|
logging.warning('rdb could not get host for request: %s, it '
|
|
'is already leased or locked', request)
|
|
|
|
|
|
class AvailableHostRequestHandler(BaseHostRequestHandler):
|
|
"""Handler for requests related to available (unleased and unlocked) hosts.
|
|
|
|
This class is capable of acquiring or validating hosts for requests.
|
|
"""
|
|
|
|
|
|
def __init__(self):
|
|
self.host_query_manager = AvailableHostQueryManager()
|
|
self.cache = rdb_cache_manager.RDBHostCacheManager()
|
|
self.response_map = {}
|
|
self.unsatisfied_requests = 0
|
|
self.leased_hosts_count = 0
|
|
self.request_accountant = None
|
|
|
|
|
|
@metrics.SecondsTimerDecorator(_rdb_timer_name % 'lease_hosts')
|
|
def lease_hosts(self, hosts):
|
|
"""Leases a list of hosts.
|
|
|
|
@param hosts: A list of RDBServerHostWrapper instances to lease.
|
|
|
|
@return: The list of RDBServerHostWrappers that were successfully
|
|
leased.
|
|
"""
|
|
#TODO(beeps): crbug.com/353183.
|
|
unleased_hosts = set(hosts)
|
|
leased_hosts = set([])
|
|
for host in unleased_hosts:
|
|
try:
|
|
host.lease()
|
|
except rdb_utils.RDBException as e:
|
|
logging.error('Unable to lease host %s: %s', host.hostname, e)
|
|
else:
|
|
leased_hosts.add(host)
|
|
return list(leased_hosts)
|
|
|
|
|
|
@classmethod
|
|
def valid_host_assignment(cls, request, host):
|
|
"""Check if a host, request pairing is valid.
|
|
|
|
@param request: The request to match against the host.
|
|
@param host: An RDBServerHostWrapper instance.
|
|
|
|
@return: True if the host, request assignment is valid.
|
|
|
|
@raises RDBException: If the request already has another host_ids
|
|
associated with it.
|
|
"""
|
|
if request.host_id and request.host_id != host.id:
|
|
raise rdb_utils.RDBException(
|
|
'Cannot assign a different host for request: %s, it '
|
|
'already has one: %s ' % (request, host.id))
|
|
|
|
# Getting all labels and acls might result in large queries, so
|
|
# bail early if the host is already leased.
|
|
if host.leased:
|
|
return False
|
|
# If a host is invalid it must be a one time host added to the
|
|
# afe specifically for this purpose, so it doesn't require acl checking.
|
|
acl_match = (request.acls.intersection(host.acls) or host.invalid)
|
|
label_match = (request.deps.intersection(host.labels) == request.deps)
|
|
return acl_match and label_match
|
|
|
|
|
|
@classmethod
|
|
def _sort_hosts_by_preferred_deps(cls, hosts, preferred_deps):
|
|
"""Sort hosts in the order of how many preferred deps it has.
|
|
|
|
This allows rdb always choose the hosts with the most preferred deps
|
|
for a request. One important use case is including cros-version as
|
|
a preferred dependence. By choosing a host with the same cros-version,
|
|
we can save the time on provisioning it. Note this is not guaranteed
|
|
if preferred_deps contains other labels as well.
|
|
|
|
@param hosts: A list of hosts to sort.
|
|
@param preferred_deps: A list of deps that are preferred.
|
|
|
|
@return: A list of sorted hosts.
|
|
|
|
"""
|
|
hosts = sorted(
|
|
hosts,
|
|
key=lambda host: len(set(preferred_deps) & set(host.labels)),
|
|
reverse=True)
|
|
return hosts
|
|
|
|
|
|
@rdb_cache_manager.memoize_hosts
|
|
def _acquire_hosts(self, request, hosts_required, is_acquire_min_duts=False,
|
|
**kwargs):
|
|
"""Acquire hosts for a group of similar requests.
|
|
|
|
Find and acquire hosts that can satisfy a group of requests.
|
|
1. If the caching decorator doesn't pass in a list of matching hosts
|
|
via the MEMOIZE_KEY this method will directly check the database for
|
|
matching hosts.
|
|
2. If all matching hosts are not leased for this request, the remaining
|
|
hosts are returned to the caching decorator, to place in the cache.
|
|
|
|
@param hosts_required: Number of hosts required to satisfy request.
|
|
@param request: The request for hosts.
|
|
@param is_acquire_min_duts: Boolean. Indicate whether this is to
|
|
acquire minimum required duts, only used
|
|
for stats purpose.
|
|
|
|
@return: The list of excess matching hosts.
|
|
"""
|
|
hosts = kwargs.get(rdb_cache_manager.MEMOIZE_KEY, [])
|
|
if not hosts:
|
|
hosts = self.host_query_manager.find_hosts(
|
|
request.deps, request.acls)
|
|
|
|
# <-----[:attempt_lease_hosts](evicted)--------> <-(returned, cached)->
|
|
# | -leased_hosts- | -stale cached hosts- | -unleased matching- |
|
|
# --used this request---used by earlier request----------unused--------
|
|
hosts = self._sort_hosts_by_preferred_deps(
|
|
hosts, request.preferred_deps)
|
|
attempt_lease_hosts = min(len(hosts), hosts_required)
|
|
leased_host_count = 0
|
|
if attempt_lease_hosts:
|
|
leased_hosts = self.lease_hosts(hosts[:attempt_lease_hosts])
|
|
if leased_hosts:
|
|
self.update_response_map(request, leased_hosts, append=True)
|
|
|
|
# [:attempt_leased_hosts] - leased_hosts will include hosts that
|
|
# failed leasing, most likely because they're already leased, so
|
|
# don't cache them again.
|
|
leased_host_count = len(leased_hosts)
|
|
failed_leasing = attempt_lease_hosts - leased_host_count
|
|
if failed_leasing > 0:
|
|
# For the sake of simplicity this calculation assumes that
|
|
# leasing only fails if there's a stale cached host already
|
|
# leased by a previous request, ergo, we can only get here
|
|
# through a cache hit.
|
|
line_length = len(hosts)
|
|
self.cache.stale_entries.append(
|
|
(float(failed_leasing)/line_length) * 100)
|
|
self.leased_hosts_count += leased_host_count
|
|
if is_acquire_min_duts:
|
|
self.request_accountant.record_acquire_min_duts(
|
|
request, hosts_required, leased_host_count)
|
|
self.unsatisfied_requests += max(hosts_required - leased_host_count, 0)
|
|
# Cache the unleased matching hosts against the request.
|
|
return hosts[attempt_lease_hosts:]
|
|
|
|
|
|
@metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_acquire_hosts')
|
|
def batch_acquire_hosts(self, host_requests):
|
|
"""Acquire hosts for a list of requests.
|
|
|
|
The act of acquisition involves finding and leasing a set of hosts
|
|
that match the parameters of a request. Each acquired host is added
|
|
to the response_map dictionary as an RDBServerHostWrapper.
|
|
|
|
@param host_requests: A list of requests to acquire hosts.
|
|
"""
|
|
distinct_requests = 0
|
|
|
|
logging.debug('Processing %s host acquisition requests',
|
|
len(host_requests))
|
|
metrics.Gauge('chromeos/autotest/scheduler/pending_host_acq_requests'
|
|
).set(len(host_requests))
|
|
|
|
self.request_accountant = rdb_utils.RequestAccountant(host_requests)
|
|
# First pass tries to satisfy min_duts for each suite.
|
|
for request in self.request_accountant.requests:
|
|
to_acquire = self.request_accountant.get_min_duts(request)
|
|
if to_acquire > 0:
|
|
self._acquire_hosts(request, to_acquire,
|
|
is_acquire_min_duts=True)
|
|
distinct_requests += 1
|
|
|
|
# Second pass tries to allocate duts to the rest unsatisfied requests.
|
|
for request in self.request_accountant.requests:
|
|
to_acquire = self.request_accountant.get_duts(request)
|
|
if to_acquire > 0:
|
|
self._acquire_hosts(request, to_acquire,
|
|
is_acquire_min_duts=False)
|
|
|
|
self.cache.record_stats()
|
|
logging.debug('Host acquisition stats: distinct requests: %s, leased '
|
|
'hosts: %s, unsatisfied requests: %s', distinct_requests,
|
|
self.leased_hosts_count, self.unsatisfied_requests)
|
|
|
|
|
|
@metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_validate_hosts')
|
|
def batch_validate_hosts(self, requests):
|
|
"""Validate requests with hosts.
|
|
|
|
Reserve all hosts, check each one for validity and discard invalid
|
|
request-host pairings. Lease the remaining hsots.
|
|
|
|
@param requests: A list of requests to validate.
|
|
|
|
@raises RDBException: If multiple hosts or the wrong host is returned
|
|
for a response.
|
|
"""
|
|
# The following cases are possible for frontend requests:
|
|
# 1. Multiple requests for 1 host, with different acls/deps/priority:
|
|
# These form distinct requests because they hash differently.
|
|
# The response map will contain entries like: {r1: h1, r2: h1}
|
|
# after the batch_get_hosts call. There are 2 sub-cases:
|
|
# a. Same deps/acls, different priority:
|
|
# Since we sort the requests based on priority, the
|
|
# higher priority request r1, will lease h1. The
|
|
# validation of r2, h1 will fail because of the r1 lease.
|
|
# b. Different deps/acls, only one of which matches the host:
|
|
# The matching request will lease h1. The other host
|
|
# pairing will get dropped from the response map.
|
|
# 2. Multiple requests with the same acls/deps/priority and 1 host:
|
|
# These all have the same request hash, so the response map will
|
|
# contain: {r: h}, regardless of the number of r's. If this is not
|
|
# a valid host assignment it will get dropped from the response.
|
|
self.batch_get_hosts(set(requests))
|
|
for request in sorted(self.response_map.keys(),
|
|
key=lambda request: request.priority, reverse=True):
|
|
hosts = self.response_map[request]
|
|
if len(hosts) > 1:
|
|
raise rdb_utils.RDBException('Got multiple hosts for a single '
|
|
'request. Hosts: %s, request %s.' % (hosts, request))
|
|
# Job-shard is 1:1 mapping. Because a job can only belongs
|
|
# to one shard, or belongs to master, we disallow frontend job
|
|
# that spans hosts on and off shards or across multiple shards,
|
|
# which would otherwise break the 1:1 mapping.
|
|
# As such, on master, if a request asks for multiple hosts and
|
|
# if any host is found on shard, we assume other requested hosts
|
|
# would also be on the same shard. We can safely drop this request.
|
|
ignore_request = _is_master and any(
|
|
[host.shard_id for host in hosts])
|
|
if (not ignore_request and
|
|
(self.valid_host_assignment(request, hosts[0]) and
|
|
self.lease_hosts(hosts))):
|
|
continue
|
|
del self.response_map[request]
|
|
logging.warning('Request %s was not able to lease host %s',
|
|
request, hosts[0])
|
|
|
|
|
|
# Request dispatchers: Create the appropriate request handler, send a list
|
|
# of requests to one of its methods. The corresponding request handler in
|
|
# rdb_lib must understand how to match each request with a response from a
|
|
# dispatcher, the easiest way to achieve this is to returned the response_map
|
|
# attribute of the request handler, after making the appropriate requests.
|
|
def get_hosts(host_requests):
|
|
"""Get host information about the requested hosts.
|
|
|
|
@param host_requests: A list of requests as defined in BaseHostRequest.
|
|
@return: A dictionary mapping each request to a list of hosts.
|
|
"""
|
|
rdb_handler = BaseHostRequestHandler()
|
|
rdb_handler.batch_get_hosts(host_requests)
|
|
return rdb_handler.get_response()
|
|
|
|
|
|
def update_hosts(update_requests):
|
|
"""Update hosts.
|
|
|
|
@param update_requests: A list of updates to host tables
|
|
as defined in UpdateHostRequest.
|
|
"""
|
|
rdb_handler = BaseHostRequestHandler()
|
|
rdb_handler.update_hosts(update_requests)
|
|
return rdb_handler.get_response()
|
|
|
|
|
|
def rdb_host_request_dispatcher(host_requests):
|
|
"""Dispatcher for all host acquisition queries.
|
|
|
|
@param host_requests: A list of requests for acquiring hosts, as defined in
|
|
AcquireHostRequest.
|
|
@return: A dictionary mapping each request to a list of hosts, or
|
|
an empty list if none could satisfy the request. Eg:
|
|
{AcquireHostRequest.template: [host_info_dictionaries]}
|
|
"""
|
|
validation_requests = []
|
|
require_hosts_requests = []
|
|
|
|
# Validation requests are made by a job scheduled against a specific host
|
|
# specific host (eg: through the frontend) and only require the rdb to
|
|
# match the parameters of the host against the request. Acquisition
|
|
# requests are made by jobs that need hosts (eg: suites) and the rdb needs
|
|
# to find hosts matching the parameters of the request.
|
|
for request in host_requests:
|
|
if request.host_id:
|
|
validation_requests.append(request)
|
|
else:
|
|
require_hosts_requests.append(request)
|
|
|
|
rdb_handler = AvailableHostRequestHandler()
|
|
rdb_handler.batch_validate_hosts(validation_requests)
|
|
rdb_handler.batch_acquire_hosts(require_hosts_requests)
|
|
return rdb_handler.get_response()
|