# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
|
|
This module is designed to report metadata in a separated thread to avoid the
|
|
performance overhead of sending data to Elasticsearch using HTTP.
|
|
|
|
"""
|
|
|
|
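# A minimal usage sketch (illustrative; the import path and dictionary keys
# below are assumptions, not part of this module):
#
#     from autotest_lib.site_utils import metadata_reporter
#
#     metadata_reporter.start()
#     metadata_reporter.queue({'hostname': 'host1'})
#     ...
#     metadata_reporter.abort()
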
import logging
import Queue
import socket
import time
import threading

import common
from autotest_lib.client.common_lib import utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_METADATA_METRICS_PREFIX = 'chromeos/autotest/es_metadata_reporter/'

# Number of seconds to wait before checking the queue again for data to upload.
_REPORT_INTERVAL_SECONDS = 5

_MAX_METADATA_QUEUE_SIZE = 1000000
_MAX_UPLOAD_SIZE = 50000
# The number of seconds that uploads may fail continuously. After that, each
# upload attempt will be limited to _MIN_RETRY_ENTRIES entries.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry when the previous uploads failed continuously for
# the duration of _MAX_UPLOAD_FAIL_DURATION.
_MIN_RETRY_ENTRIES = 10
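# With these defaults, the retry batch size doubles on each reporting cycle up
# to _MAX_UPLOAD_SIZE (50000), and falls back to _MIN_RETRY_ENTRIES (10) once
# uploads have failed continuously for _MAX_UPLOAD_FAIL_DURATION (600)
# seconds; see _run() below.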
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

_report_lock = threading.Lock()
_abort = threading.Event()
_queue_full = threading.Event()
_metrics_fields = {}


def _get_metrics_fields():
    """Get the fields information to be uploaded to metrics."""
    if not _metrics_fields:
        _metrics_fields['hostname'] = socket.gethostname()

    return _metrics_fields


def queue(data):
    """Queue metadata to be uploaded in the reporter thread.

    If the queue is full, an error is logged the first time the queue becomes
    full. The call does not block and does not raise Queue.Full, so it adds no
    overhead to the caller, e.g., the scheduler.

    @param data: A metadata entry, which should be a dictionary.
    """
    if not is_running():
        return

    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)


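# For example (an illustrative sketch; the dictionary keys are hypothetical):
#
#     queue({'hostname': 'host1', 'time_recorded': time.time()})
#
# The call returns immediately; if the reporter is not running or the queue is
# full, the entry is dropped instead of blocking the caller.

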
def _run():
    """Report metadata in the queue until being aborted."""
    # Time when uploads started to fail. None if the last upload succeeded.
    first_failed_upload = None
    upload_size = _MIN_RETRY_ENTRIES

    try:
        while not _abort.is_set():
            start_time = time.time()
            data_list = []
            # Fall back to a small batch after a long run of failures;
            # otherwise double the batch size, capped at _MAX_UPLOAD_SIZE.
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                upload_size = _MIN_RETRY_ENTRIES
            else:
                upload_size = min(upload_size * 2, _MAX_UPLOAD_SIZE)
            while not metadata_queue.empty() and len(data_list) < upload_size:
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                # Elasticsearch reporting is deprecated, so nothing is
                # actually uploaded; the attempt is recorded with
                # success=False.
                success = False
                fields = _get_metrics_fields().copy()
                fields['success'] = success
                metrics.Gauge(
                        _METADATA_METRICS_PREFIX + 'upload/batch_sizes').set(
                                len(data_list), fields=fields)
                metrics.Counter(
                        _METADATA_METRICS_PREFIX + 'upload/attempts').increment(
                                fields=fields)

            metrics.Gauge(_METADATA_METRICS_PREFIX + 'queue_size').set(
                    metadata_queue.qsize(), fields=_get_metrics_fields())
            sleep_time = _REPORT_INTERVAL_SECONDS - (time.time() - start_time)
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.exception('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def is_running():
    """Check if metadata_reporter is running.

    @return: True if metadata_reporter is running.
    """
    return _report_lock.locked()


def start():
    """Start the thread to report metadata."""
    # The lock makes sure there is only one reporting thread working.
    if is_running():
        logging.error('There is already a metadata reporter thread.')
        return

    logging.warning('Elasticsearch db is deprecated, no metadata will be '
                    'reported.')

    _report_lock.acquire()
    reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    reporting_thread.setDaemon(True)
    reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call will wait up to _REPORT_INTERVAL_SECONDS (5) seconds for existing
    data to be uploaded.
    """
    if not is_running():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    _abort.wait(_REPORT_INTERVAL_SECONDS)
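

if __name__ == '__main__':
    # A minimal manual smoke test (an illustrative sketch, not part of the
    # original module): start the reporter, queue a few hypothetical entries,
    # then shut it down.
    logging.basicConfig(level=logging.INFO)
    start()
    for i in range(3):
        queue({'entry_id': i})
    time.sleep(1)
    abort()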