196 lines
7.4 KiB
Python
196 lines
7.4 KiB
Python
# Copyright 2015-2017 ARM Limited
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
|
|
import pandas as pd
|
|
import unittest
|
|
|
|
from trappy.plotter import AttrConf
|
|
from trappy.plotter.Constraint import Constraint, ConstraintManager
|
|
|
|
class TestConstraintManager(unittest.TestCase):
    """Test trappy.plotter.ConstraintManager"""

    def __init__(self, *args, **kwargs):
        """Init some common data for the tests"""

        # Two small traces sharing the same columns; "cpu" is used as the
        # pivot column by the pivoted tests below.
        self.dfrs = [pd.DataFrame({"load": [1, 2, 2, 3],
                                   "freq": [2, 3, 3, 4],
                                   "cpu": [0, 1, 0, 1]}),
                     pd.DataFrame({"load": [2, 3, 2, 1],
                                   "freq": [1, 2, 2, 1],
                                   "cpu": [1, 0, 1, 0]})]
        self.cols = ["load", "freq"]
        super(TestConstraintManager, self).__init__(*args, **kwargs)

    @staticmethod
    def _values(series):
        """Return the values of a pandas Series as a plain list

        dict.values() is a list on Python 2 but a view object on Python 3,
        and a values view never compares equal to a list (or to another
        view), so it has to be materialised before it is asserted on.
        """
        return list(series.to_dict().values())

    def test_one_constraint(self):
        """Test that the constraint manager works with one constraint"""

        dfr = self.dfrs[0]

        c_mgr = ConstraintManager(dfr, "load", None, AttrConf.PIVOT, {})

        self.assertEqual(len(c_mgr), 1)

        # next(iterator) works on both Python 2.6+ and Python 3, unlike
        # the Python 2 only iterator.next() method.
        constraint = next(iter(c_mgr))
        series = constraint.result[AttrConf.PIVOT_VAL]
        self.assertEqual(self._values(series), self._values(dfr["load"]))

    def test_no_pivot_multiple_traces(self):
        """Test that the constraint manager works with multiple traces and no pivots"""

        c_mgr = ConstraintManager(self.dfrs, "load", None, AttrConf.PIVOT, {})

        self.assertEqual(len(c_mgr), 2)

        for constraint, orig_dfr in zip(c_mgr, self.dfrs):
            series = constraint.result[AttrConf.PIVOT_VAL]
            self.assertEqual(self._values(series),
                             self._values(orig_dfr["load"]))

    def test_no_pivot_zipped_columns_and_traces(self):
        """Test the constraint manager with multiple columns and traces zipped"""

        c_mgr = ConstraintManager(self.dfrs, self.cols, None, AttrConf.PIVOT, {})

        self.assertEqual(len(c_mgr), 2)

        # The i-th constraint is expected to pair the i-th trace with the
        # i-th column.
        for constraint, orig_dfr, col in zip(c_mgr, self.dfrs, self.cols):
            series = constraint.result[AttrConf.PIVOT_VAL]
            self.assertEqual(self._values(series),
                             self._values(orig_dfr[col]))

    def test_no_pivot_multicolumns_multitraces(self):
        """Test the constraint manager with multiple traces that can have each multiple columns"""

        c_mgr = ConstraintManager(self.dfrs, self.cols, None, AttrConf.PIVOT,
                                  {}, zip_constraints=False)

        self.assertEqual(len(c_mgr), 4)

        # Expected order: every column of the first trace, then every
        # column of the second trace.
        expected_series = [dfr[col] for dfr in self.dfrs for col in self.cols]
        for constraint, orig_series in zip(c_mgr, expected_series):
            series = constraint.result[AttrConf.PIVOT_VAL]
            self.assertEqual(series.to_dict(), orig_series.to_dict())

    def test_no_pivot_filters(self):
        """Test the constraint manager with filters"""

        simple_filter = {"freq": [2]}

        c_mgr = ConstraintManager(self.dfrs, "load", None, AttrConf.PIVOT,
                                  simple_filter)

        num_constraints = len(c_mgr)
        self.assertEqual(num_constraints, 2)

        constraint_iter = iter(c_mgr)
        constraint = next(constraint_iter)
        self.assertEqual(len(constraint.result), 1)

        # In the second frame the rows whose "freq" is 2 carry the "load"
        # values 3 and 2.
        constraint = next(constraint_iter)
        series_second_frame = constraint.result[AttrConf.PIVOT_VAL]
        self.assertEqual(self._values(series_second_frame), [3, 2])

    def test_pivoted_data(self):
        """Test the constraint manager with a pivot and one trace"""

        c_mgr = ConstraintManager(self.dfrs[0], "load", None, "cpu", {})

        self.assertEqual(len(c_mgr), 1)

        constraint = next(iter(c_mgr))
        results = {pivot: self._values(series)
                   for pivot, series in constraint.result.items()}
        # "load" values of the first frame, grouped by their "cpu" value.
        expected_results = {0: [1, 2], 1: [2, 3]}

        self.assertEqual(results, expected_results)

    def test_pivoted_multitrace(self):
        """Test the constraint manager with a pivot and multiple traces"""

        c_mgr = ConstraintManager(self.dfrs, "load", None, "cpu", {})

        self.assertEqual(len(c_mgr), 2)

        constraint_iter = iter(c_mgr)
        constraint = next(constraint_iter)
        self.assertEqual(self._values(constraint.result[0]), [1, 2])

        constraint = next(constraint_iter)
        self.assertEqual(self._values(constraint.result[1]), [2, 2])

    def test_pivoted_multitraces_multicolumns(self):
        """Test the constraint manager with multiple traces and columns"""

        c_mgr = ConstraintManager(self.dfrs, ["load", "freq"], None, "cpu", {})
        self.assertEqual(len(c_mgr), 2)

        constraint_iter = iter(c_mgr)
        constraint = next(constraint_iter)
        self.assertEqual(self._values(constraint.result[1]), [2, 3])

        constraint = next(constraint_iter)
        self.assertEqual(self._values(constraint.result[0]), [2, 1])

    def test_pivoted_with_filters(self):
        """Test the constraint manager with pivoted data and filters"""

        simple_filter = {"load": [2]}
        c_mgr = ConstraintManager(self.dfrs[0], "freq", None, "cpu",
                                  simple_filter)

        self.assertEqual(len(c_mgr), 1)

        constraint = next(iter(c_mgr))
        result = constraint.result

        self.assertEqual(result[0].iloc[0], 3)
        self.assertEqual(result[1].iloc[0], 3)

    def test_constraint_with_window(self):
        """Test that the constraint manager can constrain to a window of time"""
        c_mgr = ConstraintManager(self.dfrs[0], "freq", None, AttrConf.PIVOT, {},
                                  window=(1, 3))

        constraint = next(iter(c_mgr))
        series = constraint.result[AttrConf.PIVOT_VAL]
        self.assertEqual(len(series), 3)

        # For the graph to plot a value at 0.75, the resulting series
        # must contain the value before 0.75.  Same for the upper limit.
        c_mgr = ConstraintManager(self.dfrs[0], "freq", None, AttrConf.PIVOT, {},
                                  window=(0.75, 1.5))

        constraint = next(iter(c_mgr))
        series = constraint.result[AttrConf.PIVOT_VAL]
        self.assertEqual(series.index.tolist(), [0, 1, 2])

        c_mgr = ConstraintManager(self.dfrs[0], "freq", None, AttrConf.PIVOT, {},
                                  window=(0, 2))

        constraint = next(iter(c_mgr))
        series = constraint.result[AttrConf.PIVOT_VAL]
        self.assertEqual(len(series), 3)
|
|
|
|
class TestConstraint(unittest.TestCase):
    """Test trappy.plotter.Constraint.Constraint"""

    def test_str_constraint(self):
        """Rendering a constraint with str() works for a non-string column"""
        # Column labels are deliberately integers rather than strings.
        int_column_frame = pd.DataFrame({12: [1, 2, 3], 13: [3, 4, 5]})

        constraint_kwargs = dict(template=None, trace_index=0,
                                 filters={}, window=None)
        subject = Constraint(int_column_frame, AttrConf.PIVOT, 12,
                             **constraint_kwargs)

        rendered = str(subject)
        self.assertEqual(rendered, "DataFrame 0:12")
|