# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
|
|
This module is designed to report metadata in a separated thread to avoid the
|
|
performance overhead of sending data to Elasticsearch using HTTP.
|
|
|
|
"""
|
|
|
|
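# A minimal usage sketch (illustrative only; the real caller is the scheduler
# process, and the exact import path depends on where this module lives):
#
#   metadata_reporter.start()           # spawn the reporter thread once
#   metadata_reporter.queue(metadata)   # buffer a metadata dictionary
#   metadata_reporter.abort()           # flush and stop during shutdown
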
import logging
import Queue
import time
import threading

import common
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.scheduler import email_manager
# The metadata_reporter thread runs inside the scheduler process, so it does
# not need to set up django; otherwise, the following import would be needed:
# from autotest_lib.frontend import setup_django_environment
from autotest_lib.site_utils import server_manager_utils


# Number of seconds to wait before checking the queue again for data to upload.
_REPORT_INTERVAL_SECONDS = 5

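# Maximum number of metadata entries to buffer. Once the queue is full,
# queue() drops new entries (logging an error the first time).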
_MAX_METADATA_QUEUE_SIZE = 1000000
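# Maximum number of entries to include in a single bulk upload.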
_MAX_UPLOAD_SIZE = 50000
# The number of seconds for upload to fail continuously. After that, each
# upload attempt is limited to _MIN_RETRY_ENTRIES entries until an upload
# succeeds again.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry when the previous upload failed continuously for
# the duration of _MAX_UPLOAD_FAIL_DURATION.
_MIN_RETRY_ENTRIES = 10
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

_report_lock = threading.Lock()
_abort = threading.Event()
_queue_full = threading.Event()

def queue(data):
    """Queue metadata to be uploaded in the reporter thread.

    If the queue is full, an error is logged the first time the queue becomes
    full. The call does not block or raise Queue.Full, so it adds no
    performance overhead to the caller, e.g., the scheduler.

    @param data: A metadata entry, which should be a dictionary.
    """
    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)


def _email_alert():
    """Send an email alert that metadata upload has been failing continuously
    for _MAX_UPLOAD_FAIL_DURATION seconds.
    """
    if not server_manager_utils.use_server_db():
        logging.debug('Server database is not used, email alert is skipped.')
        return
    try:
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')
    except server_manager_utils.ServerActionError:
        # Only email alert if the server is a scheduler, not a shard.
        return
    subject = ('Metadata upload has been failing for %d seconds' %
               _MAX_UPLOAD_FAIL_DURATION)
    email_manager.manager.enqueue_notify_email(subject, '')
    email_manager.manager.send_queued_emails()


def _run():
    """Report metadata in the queue until being aborted.
    """
    # Time when upload first started failing. None if the last upload
    # succeeded.
    first_failed_upload = None
    # True if an email alert was sent after upload had been failing
    # continuously for _MAX_UPLOAD_FAIL_DURATION seconds.
    email_alert = False
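    # Number of entries to upload in the next attempt; grows after successful
    # cycles and drops back to _MIN_RETRY_ENTRIES after prolonged failures.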
    upload_size = _MIN_RETRY_ENTRIES
    try:
        while True:
            start_time = time.time()
            data_list = []
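            # If uploads have been failing continuously for longer than
            # _MAX_UPLOAD_FAIL_DURATION seconds, throttle the next attempt
            # down to _MIN_RETRY_ENTRIES entries and send a one-time email
            # alert; otherwise double the batch size, up to _MAX_UPLOAD_SIZE.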
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                upload_size = _MIN_RETRY_ENTRIES
                if not email_alert:
                    _email_alert()
                    email_alert = True
            else:
                upload_size = min(upload_size * 2, _MAX_UPLOAD_SIZE)
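            # Drain up to upload_size entries from the queue without blocking.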
            while (not metadata_queue.empty() and len(data_list) < upload_size):
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                if autotest_es.bulk_post(data_list=data_list):
                    time_used = time.time() - start_time
                    logging.info('%d entries of metadata uploaded in %s '
                                 'seconds.', len(data_list), time_used)
                    autotest_stats.Timer('metadata_reporter').send(
                            'time_used', time_used)
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_uploaded', len(data_list))
                    first_failed_upload = None
                    email_alert = False
                else:
                    logging.warn('Failed to upload %d entries of metadata, '
                                 'they will be retried later.', len(data_list))
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_failed', len(data_list))
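                    # Put the failed entries back into the queue so they are
                    # retried in a later cycle.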
                    for data in data_list:
                        queue(data)
                    if not first_failed_upload:
                        first_failed_upload = time.time()
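            # Sleep for the rest of the report interval (at least 0.5 second),
            # but wake up early if an abort is requested.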
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.error('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def start():
    """Start the thread to report metadata.
    """
    # The lock makes sure there is only one reporting thread working.
    if _report_lock.locked():
        logging.error('There is already a metadata reporter thread.')
        return

    _report_lock.acquire()
    reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    reporting_thread.setDaemon(True)
    reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call will wait up to _REPORT_INTERVAL_SECONDS seconds for existing
    data to be uploaded.
    """
    if not _report_lock.locked():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    _abort.wait(_REPORT_INTERVAL_SECONDS)