363 lines
12 KiB
Python
363 lines
12 KiB
Python
# Copyright 2015-2017 ARM Limited
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import unittest
|
|
from trappy.stats.Topology import Topology
|
|
from trappy.stats.Trigger import Trigger
|
|
from trappy.stats.Aggregator import MultiTriggerAggregator
|
|
|
|
import collections
|
|
import trappy
|
|
from trappy.base import Base
|
|
import pandas as pd
|
|
from pandas.util.testing import assert_series_equal
|
|
|
|
|
|
class TestTopology(unittest.TestCase):
|
|
|
|
def test_add_to_level(self):
|
|
"""Test level creation"""
|
|
|
|
level_groups = [[1, 2], [0, 3, 4, 5]]
|
|
level = "test_level"
|
|
topology = Topology()
|
|
topology.add_to_level(level, level_groups)
|
|
check_groups = topology.get_level(level)
|
|
|
|
self.assertTrue(topology.has_level(level))
|
|
self.assertEqual(level_groups, check_groups)
|
|
|
|
def test_flatten(self):
|
|
"""Test Topology: flatten"""
|
|
|
|
level_groups = [[1, 2], [0, 3, 4, 5]]
|
|
level = "test_level"
|
|
topology = Topology()
|
|
topology.add_to_level(level, level_groups)
|
|
flattened = [0, 1, 2, 3, 4, 5]
|
|
|
|
self.assertEqual(flattened, topology.flatten())
|
|
|
|
def test_cpu_topology_construction(self):
|
|
"""Test CPU Topology Construction"""
|
|
|
|
cluster_0 = [0, 3, 4, 5]
|
|
cluster_1 = [1, 2]
|
|
clusters = [cluster_0, cluster_1]
|
|
topology = Topology(clusters=clusters)
|
|
|
|
# Check cluster level creation
|
|
cluster_groups = [[0, 3, 4, 5], [1, 2]]
|
|
self.assertTrue(topology.has_level("cluster"))
|
|
self.assertEqual(cluster_groups, topology.get_level("cluster"))
|
|
|
|
# Check cpu level creation
|
|
cpu_groups = [[0], [1], [2], [3], [4], [5]]
|
|
self.assertTrue(topology.has_level("cpu"))
|
|
self.assertEqual(cpu_groups, topology.get_level("cpu"))
|
|
|
|
# Check "all" level
|
|
all_groups = [[0, 1, 2, 3, 4, 5]]
|
|
self.assertEqual(all_groups, topology.get_level("all"))
|
|
|
|
def test_level_span(self):
|
|
"""TestTopology: level_span"""
|
|
|
|
level_groups = [[1, 2], [0, 3, 4, 5]]
|
|
level = "test_level"
|
|
topology = Topology()
|
|
topology.add_to_level(level, level_groups)
|
|
|
|
self.assertEqual(topology.level_span(level), 2)
|
|
|
|
def test_group_index(self):
|
|
"""TestTopology: get_index"""
|
|
|
|
level_groups = [[1, 2], [0, 3, 4, 5]]
|
|
level = "test_level"
|
|
topology = Topology()
|
|
topology.add_to_level(level, level_groups)
|
|
|
|
self.assertEqual(topology.get_index(level, [1, 2]), 0)
|
|
self.assertEqual(topology.get_index(level, [0, 3, 4, 5]), 1)
|
|
|
|
class BaseTestStats(unittest.TestCase):
|
|
def setUp(self):
|
|
trace = trappy.BareTrace()
|
|
data = {
|
|
|
|
"identifier": [
|
|
0,
|
|
0,
|
|
0,
|
|
1,
|
|
1,
|
|
1,
|
|
],
|
|
"result": [
|
|
"fire",
|
|
"blank",
|
|
"fire",
|
|
"blank",
|
|
"fire",
|
|
"blank",
|
|
],
|
|
}
|
|
|
|
index = pd.Series([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], name="Time")
|
|
data_frame = pd.DataFrame(data, index=index)
|
|
trace.add_parsed_event("aim_and_fire", data_frame)
|
|
self._trace = trace
|
|
self.topology = Topology(clusters=[[0], [1]])
|
|
|
|
|
|
class TestTrigger(BaseTestStats):
|
|
|
|
def test_trigger_generation(self):
|
|
"""TestTrigger: generate"""
|
|
|
|
filters = {
|
|
"result": "fire"
|
|
}
|
|
|
|
event_class = self._trace.aim_and_fire
|
|
value = 1
|
|
pivot = "identifier"
|
|
|
|
trigger = Trigger(self._trace,
|
|
event_class,
|
|
filters,
|
|
value,
|
|
pivot)
|
|
|
|
expected = pd.Series([1, 1], index=pd.Index([0.1, 0.3], name="Time"))
|
|
assert_series_equal(expected, trigger.generate(0))
|
|
|
|
expected = pd.Series([1], index=pd.Index([0.5], name="Time"))
|
|
assert_series_equal(expected, trigger.generate(1))
|
|
|
|
def test_trigger_with_func(self):
|
|
"""Trigger works with a function or lambda as filter"""
|
|
|
|
def my_filter(val):
|
|
return val.startswith("fi")
|
|
|
|
trigger = Trigger(self._trace, self._trace.aim_and_fire,
|
|
filters={"result": my_filter}, value=1,
|
|
pivot="identifier")
|
|
|
|
expected = pd.Series([1], index=pd.Index([0.5], name="Time"))
|
|
assert_series_equal(expected, trigger.generate(1))
|
|
|
|
my_filters = {"result": lambda x: x.startswith("bl")}
|
|
trigger = Trigger(self._trace, self._trace.aim_and_fire,
|
|
filters=my_filters, value=1, pivot="identifier")
|
|
|
|
expected = pd.Series([1, 1], index=pd.Index([0.4, 0.6], name="Time"))
|
|
assert_series_equal(expected, trigger.generate(1))
|
|
|
|
def test_trigger_with_callable_class(self):
|
|
"""Trigger works with a callable class as filter"""
|
|
|
|
class my_filter(object):
|
|
def __init__(self, val_out):
|
|
self.prev_val = 0
|
|
self.val_out = val_out
|
|
|
|
def __call__(self, val):
|
|
ret = self.prev_val == self.val_out
|
|
self.prev_val = val
|
|
|
|
return ret
|
|
|
|
trigger = Trigger(self._trace, self._trace.aim_and_fire,
|
|
filters={"identifier": my_filter(1)}, value=1,
|
|
pivot="result")
|
|
|
|
expected = pd.Series([1], index=pd.Index([0.6], name="Time"))
|
|
assert_series_equal(expected, trigger.generate("blank"))
|
|
|
|
def test_filter_prev_values(self):
|
|
"""Trigger works with a filter that depends on previous values of the same pivot"""
|
|
|
|
# We generate an example in which we want a trigger whenever the
|
|
# identifier is no longer 1 for blank
|
|
|
|
class my_filter(object):
|
|
def __init__(self, val_out):
|
|
self.prev_val = 0
|
|
self.val_out = val_out
|
|
|
|
def __call__(self, val):
|
|
ret = self.prev_val == self.val_out
|
|
self.prev_val = val
|
|
|
|
return ret
|
|
|
|
trace = trappy.BareTrace()
|
|
data = collections.OrderedDict([
|
|
(0.1, ["blank", 1]),
|
|
(0.2, ["fire", 1]),
|
|
(0.3, ["blank", 0]), # value is no longer 1, trigger
|
|
(0.4, ["blank", 1]),
|
|
(0.5, ["fire", 0]), # This should NOT trigger
|
|
(0.6, ["blank", 0]), # value is no longer 1 for blank, trigger
|
|
])
|
|
data_frame = pd.DataFrame.from_dict(data, orient="index", )
|
|
data_frame.columns = ["result", "identifier"]
|
|
trace.add_parsed_event("aim_and_fire", data_frame)
|
|
|
|
trigger = Trigger(trace, trace.aim_and_fire,
|
|
filters={"identifier": my_filter(1)}, value=-1,
|
|
pivot="result")
|
|
|
|
expected = pd.Series([-1, -1], index=[0.3, 0.6])
|
|
assert_series_equal(expected, trigger.generate("blank"))
|
|
|
|
|
|
|
|
class TestAggregator(BaseTestStats):
|
|
|
|
def test_scalar_aggfunc_single_trigger(self):
|
|
"""TestAggregator: 1 trigger scalar aggfunc"""
|
|
|
|
def aggfunc(series):
|
|
return series.sum()
|
|
|
|
filters = {
|
|
"result": "fire"
|
|
}
|
|
|
|
event_class = self._trace.aim_and_fire
|
|
value = 1
|
|
pivot = "identifier"
|
|
|
|
trigger = Trigger(self._trace,
|
|
event_class,
|
|
filters,
|
|
value,
|
|
pivot)
|
|
|
|
aggregator = MultiTriggerAggregator([trigger],
|
|
self.topology,
|
|
aggfunc=aggfunc)
|
|
|
|
# There are three "fire" in total
|
|
# The all level in topology looks like
|
|
# [[0, 1]]
|
|
result = aggregator.aggregate(level="all")
|
|
self.assertEqual(result, [3.0])
|
|
|
|
# There are two "fire" on the first node group and a
|
|
# a single "fire" on the second node group at the cluster
|
|
# level which looks like
|
|
# [[0], [1]]
|
|
result = aggregator.aggregate(level="cluster")
|
|
self.assertEqual(result, [2.0, 1.0])
|
|
|
|
def test_vector_aggfunc_single_trigger(self):
|
|
"""TestAggregator: 1 trigger vector aggfunc"""
|
|
|
|
def aggfunc(series):
|
|
return series.cumsum()
|
|
|
|
filters = {
|
|
"result": "fire"
|
|
}
|
|
|
|
event_class = self._trace.aim_and_fire
|
|
value = 1
|
|
pivot = "identifier"
|
|
|
|
trigger = Trigger(self._trace, event_class, filters, value, pivot)
|
|
|
|
aggregator = MultiTriggerAggregator([trigger],
|
|
self.topology,
|
|
aggfunc=aggfunc)
|
|
|
|
# There are three "fire" in total
|
|
# The all level in topology looks like
|
|
# [[0, 1]]
|
|
result = aggregator.aggregate(level="all")
|
|
expected_result = pd.Series([1.0, 1.0, 2.0, 2.0, 3.0, 3.0],
|
|
index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
|
|
)
|
|
assert_series_equal(result[0], expected_result)
|
|
|
|
def test_vector_aggfunc_multiple_trigger(self):
|
|
"""TestAggregator: multi trigger vector aggfunc"""
|
|
|
|
def aggfunc(series):
|
|
return series.cumsum()
|
|
|
|
filters = {
|
|
"result": "fire"
|
|
}
|
|
|
|
event_class = self._trace.aim_and_fire
|
|
value = 1
|
|
pivot = "identifier"
|
|
|
|
trigger_fire = Trigger(self._trace,
|
|
event_class,
|
|
filters,
|
|
value,
|
|
pivot)
|
|
|
|
filters = {
|
|
"result": "blank"
|
|
}
|
|
value = -1
|
|
trigger_blank = Trigger(self._trace, event_class, filters, value,
|
|
pivot)
|
|
|
|
aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
|
|
self.topology,
|
|
aggfunc=aggfunc)
|
|
|
|
# There are three "fire" in total
|
|
# The all level in topology looks like
|
|
# [[0, 1]]
|
|
result = aggregator.aggregate(level="all")
|
|
expected_result = pd.Series([1.0, 0.0, 1.0, 0.0, 1.0, 0.0],
|
|
index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
|
|
)
|
|
assert_series_equal(result[0], expected_result)
|
|
|
|
def test_default_aggfunc_multiple_trigger(self):
|
|
"""MultiTriggerAggregator with the default aggfunc"""
|
|
|
|
trigger_fire = Trigger(self._trace, self._trace.aim_and_fire,
|
|
filters={"result": "fire"},
|
|
pivot="identifier", value=1)
|
|
|
|
trigger_blank = Trigger(self._trace, self._trace.aim_and_fire,
|
|
filters={"result": "blank"},
|
|
pivot="identifier", value=2)
|
|
|
|
aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
|
|
self.topology)
|
|
|
|
results = aggregator.aggregate(level="cpu")
|
|
expected_results = [
|
|
pd.Series([1., 2., 1., 0., 0., 0.],
|
|
index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
|
|
pd.Series([0., 0., 0., 2., 1., 2.],
|
|
index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
|
|
]
|
|
|
|
self.assertEquals(len(results), len(expected_results))
|
|
for result, expected in zip(results, expected_results):
|
|
assert_series_equal(result, expected)
|