504 lines
18 KiB
Python
Executable file
504 lines
18 KiB
Python
Executable file
#
|
|
# Copyright (C) 2012 The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
# DEPRECATED
|
|
# Do not use flimflam.py in future development.
|
|
# Extend / migrate to shill_proxy suite of scripts instead.
|
|
|
|
import logging, time
|
|
|
|
import dbus
|
|
|
|
DEFAULT_CELLULAR_TIMEOUT = 60
|
|
|
|
def make_dbus_boolean(value):
|
|
value = value.upper()
|
|
if value in ["ON", "TRUE"]:
|
|
return dbus.Boolean(1)
|
|
elif value in ["OFF", "FALSE"]:
|
|
return dbus.Boolean(0)
|
|
else:
|
|
return dbus.Boolean(int(value))
|
|
|
|
#
|
|
# Convert a DBus value to a printable value; used
|
|
# to print properties returned via DBus
|
|
#
|
|
def convert_dbus_value(value, indent=0):
|
|
# DEPRECATED
|
|
spacer = ' ' * indent
|
|
if value.__class__ == dbus.Byte:
|
|
return int(value)
|
|
elif value.__class__ == dbus.Boolean:
|
|
return bool(value)
|
|
elif value.__class__ == dbus.Dictionary:
|
|
valstr = "{"
|
|
for key in value:
|
|
valstr += "\n" + spacer + " " + \
|
|
key + ": " + str(convert_dbus_value(value[key], indent + 4))
|
|
valstr += "\n" + spacer + "}"
|
|
return valstr
|
|
elif value.__class__ == dbus.Array:
|
|
valstr = "["
|
|
for val in value:
|
|
valstr += "\n" + spacer + " " + \
|
|
str(convert_dbus_value(val, indent + 4))
|
|
valstr += "\n" + spacer + "]"
|
|
return valstr
|
|
else:
|
|
return str(value)
|
|
|
|
class FlimFlam(object):
|
|
# DEPRECATED
|
|
|
|
SHILL_DBUS_INTERFACE = "org.chromium.flimflam"
|
|
UNKNOWN_METHOD = 'org.freedesktop.DBus.Error.UnknownMethod'
|
|
UNKNOWN_OBJECT = 'org.freedesktop.DBus.Error.UnknownObject'
|
|
|
|
DEVICE_WIMAX = 'wimax'
|
|
DEVICE_CELLULAR = 'cellular'
|
|
|
|
@staticmethod
|
|
def _GetContainerName(kind):
|
|
"""Map shill element names to the names of their collections."""
|
|
# For example, Device - > Devices.
|
|
# Just pulling this out so we can use a map if we start
|
|
# caring about "AvailableTechnologies"
|
|
return kind + "s"
|
|
|
|
@staticmethod
|
|
def WaitForServiceState(service, expected_states, timeout,
|
|
ignore_failure=False, property_name="State"):
|
|
"""Wait until service enters a state in expected_states or times out.
|
|
Args:
|
|
service: service to watch
|
|
expected_states: list of exit states
|
|
timeout: in seconds
|
|
ignore_failure: should the failure state be ignored?
|
|
property_name: name of service property
|
|
|
|
Returns: (state, seconds waited)
|
|
|
|
If the state is "failure" and ignore_failure is False we return
|
|
immediately without waiting for the timeout.
|
|
"""
|
|
|
|
state = None
|
|
start_time = time.time()
|
|
timeout = start_time + timeout
|
|
while time.time() < timeout:
|
|
properties = service.GetProperties(utf8_strings = True)
|
|
state = properties.get(property_name, None)
|
|
if ((state == "failure" and not ignore_failure) or
|
|
state in expected_states):
|
|
break
|
|
time.sleep(.5)
|
|
|
|
config_time = time.time() - start_time
|
|
# str() to remove DBus boxing
|
|
return (str(state), config_time)
|
|
|
|
@staticmethod
|
|
def DisconnectService(service, wait_timeout=15):
|
|
try:
|
|
service.Disconnect()
|
|
except dbus.exceptions.DBusException, error:
|
|
if error.get_dbus_name() not in [
|
|
FlimFlam.SHILL_DBUS_INTERFACE + ".Error.InProgress",
|
|
FlimFlam.SHILL_DBUS_INTERFACE + ".Error.NotConnected", ]:
|
|
raise error
|
|
return FlimFlam.WaitForServiceState(service, ['idle'], wait_timeout)
|
|
|
|
def __init__(self, bus=None):
|
|
if not bus:
|
|
bus = dbus.SystemBus()
|
|
self.bus = bus
|
|
shill = bus.get_object(FlimFlam.SHILL_DBUS_INTERFACE, "/")
|
|
self.manager = dbus.Interface(
|
|
shill,
|
|
FlimFlam.SHILL_DBUS_INTERFACE + ".Manager")
|
|
|
|
def _FindDevice(self, device_type, timeout):
|
|
""" Return the first device object that matches a given device type.
|
|
|
|
Wait until the device type is avilable or until timeout
|
|
|
|
Args:
|
|
device_type: string format of the type of device.
|
|
timeout: in seconds
|
|
|
|
Returns: Device or None
|
|
"""
|
|
timeout = time.time() + timeout
|
|
device_obj = None
|
|
while time.time() < timeout:
|
|
device_obj = self.FindElementByPropertySubstring('Device',
|
|
'Type',
|
|
device_type)
|
|
if device_obj:
|
|
break
|
|
time.sleep(1)
|
|
return device_obj
|
|
|
|
def FindCellularDevice(self, timeout=DEFAULT_CELLULAR_TIMEOUT):
|
|
return self._FindDevice(self.DEVICE_CELLULAR, timeout)
|
|
|
|
def FindWimaxDevice(self, timeout=30):
|
|
return self._FindDevice(self.DEVICE_WIMAX, timeout)
|
|
|
|
def _FindService(self, device_type, timeout):
|
|
"""Return the first service object that matches the device type.
|
|
|
|
Wait until a service is available or until the timeout.
|
|
|
|
Args:
|
|
device_type: string format of the type of device.
|
|
timeout: in seconds
|
|
|
|
Returns: service or None
|
|
"""
|
|
start_time = time.time()
|
|
timeout = start_time + timeout
|
|
service = None
|
|
while time.time() < timeout:
|
|
service = self.FindElementByPropertySubstring('Service',
|
|
'Type', device_type)
|
|
if service:
|
|
break
|
|
time.sleep(.5)
|
|
return service
|
|
|
|
def FindCellularService(self, timeout=DEFAULT_CELLULAR_TIMEOUT):
|
|
return self._FindService(self.DEVICE_CELLULAR, timeout)
|
|
|
|
def FindWimaxService(self, timeout=30):
|
|
return self._FindService(self.DEVICE_WIMAX, timeout)
|
|
|
|
def GetService(self, params):
|
|
path = self.manager.GetService(params)
|
|
return self.GetObjectInterface("Service", path)
|
|
|
|
def ConnectService(self, assoc_timeout=15, config_timeout=15,
|
|
async=False, service=None, service_type='',
|
|
retry=False, retries=1, retry_sleep=15,
|
|
save_creds=False,
|
|
**kwargs):
|
|
"""Connect to a service and wait until connection is up
|
|
Args:
|
|
assoc_timeout, config_timeout: Timeouts in seconds.
|
|
async: return immediately. do not wait for connection.
|
|
service: DBus service
|
|
service_type: If supplied, invoke type-specific code to find service.
|
|
retry: Retry connection after Connect failure.
|
|
retries: Number of retries to allow.
|
|
retry_sleep: Number of seconds to wait before retrying.
|
|
kwargs: Additional args for type-specific code
|
|
|
|
Returns:
|
|
(success, dictionary), where dictionary contains stats and diagnostics.
|
|
"""
|
|
output = {}
|
|
connected_states = ["ready", "portal", "online"]
|
|
|
|
# Retry connections on failure. Need to call GetService again as some
|
|
# Connect failure states are unrecoverable.
|
|
connect_success = False
|
|
while not connect_success:
|
|
if service_type == "wifi":
|
|
try:
|
|
# Sanity check to make sure the caller hasn't provided
|
|
# both a service and a service type. At which point its
|
|
# unclear what they actually want to do, so err on the
|
|
# side of caution and except out.
|
|
if service:
|
|
raise Exception('supplied service and service type')
|
|
params = {
|
|
"Type": service_type,
|
|
"Mode": kwargs["mode"],
|
|
"SSID": kwargs["ssid"],
|
|
"Security": kwargs.get("security", "none"),
|
|
"SaveCredentials": save_creds }
|
|
# Supply a passphrase only if it is non-empty.
|
|
passphrase = kwargs.get("passphrase", "")
|
|
if passphrase:
|
|
params["Passphrase"] = passphrase
|
|
path = self.manager.GetService(params)
|
|
service = self.GetObjectInterface("Service", path)
|
|
except Exception, e:
|
|
output["reason"] = "FAIL(GetService): exception %s" % e
|
|
return (False, output)
|
|
|
|
output["service"] = service
|
|
|
|
try:
|
|
service.Connect()
|
|
connect_success = True
|
|
except Exception, e:
|
|
if not retry or retries == 0:
|
|
output["reason"] = "FAIL(Connect): exception %s" % e
|
|
return (False, output)
|
|
else:
|
|
logging.info("INFO(Connect): connect failed. Retrying...")
|
|
retries -= 1
|
|
|
|
if not connect_success:
|
|
# FlimFlam can be a little funny sometimes. At least for In
|
|
# Progress errors, even though the service state may be failed,
|
|
# it is actually still trying to connect. As such, while we're
|
|
# waiting for retry, keep checking the service state to see if
|
|
# it actually succeeded in connecting.
|
|
state = FlimFlam.WaitForServiceState(
|
|
service=service,
|
|
expected_states=connected_states,
|
|
timeout=retry_sleep,
|
|
ignore_failure=True)[0]
|
|
|
|
if state in connected_states:
|
|
return (True, output)
|
|
|
|
# While service can be caller provided, it is also set by the
|
|
# GetService call above. If service was not caller provided we
|
|
# need to reset it to None so we don't fail the sanity check
|
|
# above.
|
|
if service_type != '':
|
|
service = None
|
|
|
|
if async:
|
|
return (True, output)
|
|
|
|
logging.info("Associating...")
|
|
(state, assoc_time) = (
|
|
FlimFlam.WaitForServiceState(service,
|
|
["configuration"] + connected_states,
|
|
assoc_timeout))
|
|
output["state"] = state
|
|
if state == "failure":
|
|
output["reason"] = "FAIL(assoc)"
|
|
if assoc_time > assoc_timeout:
|
|
output["reason"] = "TIMEOUT(assoc)"
|
|
output["assoc_time"] = assoc_time
|
|
if "reason" in output:
|
|
return (False, output)
|
|
|
|
|
|
(state, config_time) = (
|
|
FlimFlam.WaitForServiceState(service,
|
|
connected_states, config_timeout))
|
|
output["state"] = state
|
|
if state == "failure":
|
|
output["reason"] = "FAIL(config)"
|
|
if config_time > config_timeout:
|
|
output["reason"] = "TIMEOUT(config)"
|
|
output["config_time"] = config_time
|
|
|
|
if "reason" in output:
|
|
return (False, output)
|
|
|
|
return (True, output)
|
|
|
|
def GetObjectInterface(self, kind, path):
|
|
return dbus.Interface(
|
|
self.bus.get_object(FlimFlam.SHILL_DBUS_INTERFACE, path),
|
|
FlimFlam.SHILL_DBUS_INTERFACE + "." + kind)
|
|
|
|
def FindElementByNameSubstring(self, kind, substring):
|
|
properties = self.manager.GetProperties(utf8_strings = True)
|
|
for path in properties[FlimFlam._GetContainerName(kind)]:
|
|
if path.find(substring) >= 0:
|
|
return self.GetObjectInterface(kind, path)
|
|
return None
|
|
|
|
def FindElementByPropertySubstring(self, kind, prop, substring):
|
|
properties = self.manager.GetProperties(utf8_strings = True)
|
|
for path in properties[FlimFlam._GetContainerName(kind)]:
|
|
obj = self.GetObjectInterface(kind, path)
|
|
try:
|
|
obj_properties = obj.GetProperties(utf8_strings = True)
|
|
except dbus.exceptions.DBusException, error:
|
|
if (error.get_dbus_name() == self.UNKNOWN_METHOD or
|
|
error.get_dbus_name() == self.UNKNOWN_OBJECT):
|
|
# object disappeared; ignore and keep looking
|
|
continue
|
|
else:
|
|
raise error
|
|
if (prop in obj_properties and
|
|
obj_properties[prop].find(substring) >= 0):
|
|
return obj
|
|
return None
|
|
|
|
def GetObjectList(self, kind, properties=None):
|
|
if properties is None:
|
|
properties = self.manager.GetProperties(utf8_strings = True)
|
|
return [self.GetObjectInterface(kind, path)
|
|
for path in properties[FlimFlam._GetContainerName(kind)]]
|
|
|
|
def GetActiveProfile(self):
|
|
properties = self.manager.GetProperties(utf8_strings = True)
|
|
return self.GetObjectInterface("Profile", properties["ActiveProfile"])
|
|
|
|
def CreateProfile(self, ident):
|
|
path = self.manager.CreateProfile(ident)
|
|
return self.GetObjectInterface("Profile", path)
|
|
|
|
def RemoveProfile(self, ident):
|
|
self.manager.RemoveProfile(ident)
|
|
|
|
def PushProfile(self, ident):
|
|
path = self.manager.PushProfile(ident)
|
|
return self.GetObjectInterface("Profile", path)
|
|
|
|
def PopProfile(self, ident):
|
|
self.manager.PopProfile(ident)
|
|
|
|
def PopAnyProfile(self):
|
|
self.manager.PopAnyProfile()
|
|
|
|
def GetSystemState(self):
|
|
properties = self.manager.GetProperties(utf8_strings = True)
|
|
return properties["State"]
|
|
|
|
def GetDebugTags(self):
|
|
return self.manager.GetDebugTags()
|
|
|
|
def ListDebugTags(self):
|
|
return self.manager.ListDebugTags()
|
|
|
|
def SetDebugTags(self, taglist):
|
|
try:
|
|
self.manager.SetDebugTags(taglist)
|
|
self.SetDebugLevel(-4)
|
|
except dbus.exceptions.DBusException, error:
|
|
if error.get_dbus_name() not in [
|
|
"org.freedesktop.DBus.Error.UnknownMethod" ]:
|
|
raise error
|
|
|
|
def SetDebugLevel(self, level):
|
|
self.manager.SetDebugLevel(level)
|
|
|
|
def GetServiceOrder(self):
|
|
return self.manager.GetServiceOrder()
|
|
|
|
def SetServiceOrder(self, new_order):
|
|
old_order = self.GetServiceOrder()
|
|
self.manager.SetServiceOrder(new_order)
|
|
return (old_order, new_order)
|
|
|
|
def EnableTechnology(self, tech):
|
|
try:
|
|
self.manager.EnableTechnology(tech)
|
|
except dbus.exceptions.DBusException, error:
|
|
if error.get_dbus_name() not in [
|
|
FlimFlam.SHILL_DBUS_INTERFACE + ".Error.AlreadyEnabled",
|
|
FlimFlam.SHILL_DBUS_INTERFACE + ".Error.InProgress" ]:
|
|
raise error
|
|
|
|
def DisableTechnology(self, tech):
|
|
self.manager.DisableTechnology(tech, timeout=60)
|
|
|
|
def RequestScan(self, technology):
|
|
self.manager.RequestScan(technology)
|
|
|
|
def GetCountry(self):
|
|
properties = self.manager.GetProperties(utf8_strings = True)
|
|
return properties["Country"]
|
|
|
|
def SetCountry(self, country):
|
|
self.manager.SetProperty("Country", country)
|
|
|
|
def GetCheckPortalList(self):
|
|
properties = self.manager.GetProperties(utf8_strings = True)
|
|
return properties["CheckPortalList"]
|
|
|
|
def SetCheckPortalList(self, tech_list):
|
|
self.manager.SetProperty("CheckPortalList", tech_list)
|
|
|
|
def GetPortalURL(self):
|
|
properties = self.manager.GetProperties(utf8_strings = True)
|
|
return properties["PortalURL"]
|
|
|
|
def SetPortalURL(self, url):
|
|
self.manager.SetProperty("PortalURL", url)
|
|
|
|
def GetArpGateway(self):
|
|
properties = self.manager.GetProperties()
|
|
return properties["ArpGateway"]
|
|
|
|
def SetArpGateway(self, do_arp_gateway):
|
|
self.manager.SetProperty("ArpGateway", do_arp_gateway)
|
|
|
|
|
|
class DeviceManager(object):
|
|
# DEPRECATED
|
|
"""Use flimflam to isolate a given interface for testing.
|
|
|
|
DeviceManager can be used to turn off network devices that are not
|
|
under test so that they will not interfere with testing.
|
|
|
|
NB: Ethernet devices are special inside Flimflam. You will need to
|
|
take care of them via other means (like, for example, the
|
|
backchannel ethernet code in client autotests)
|
|
|
|
Sample usage:
|
|
|
|
device_manager = flimflam.DeviceManager()
|
|
try:
|
|
device_manager.ShutdownAllExcept('cellular')
|
|
use routing.getRouteFor()
|
|
to verify that only the expected device is used
|
|
do stuff to test cellular connections
|
|
finally:
|
|
device_manager.RestoreDevices()
|
|
"""
|
|
|
|
@staticmethod
|
|
def _EnableDevice(device, enable):
|
|
"""Enables/Disables a device in shill."""
|
|
if enable:
|
|
device.Enable()
|
|
else:
|
|
device.Disable()
|
|
|
|
def __init__(self, flim=None):
|
|
self.flim_ = flim or FlimFlam()
|
|
self.devices_to_restore_ = []
|
|
|
|
def ShutdownAllExcept(self, device_type):
|
|
"""Shutdown all devices except device_type ones."""
|
|
for device in self.flim_.GetObjectList('Device'):
|
|
device_properties = device.GetProperties(utf8_strings = True)
|
|
if (device_properties["Type"] != device_type):
|
|
logging.info("Powering off %s device %s",
|
|
device_properties["Type"],
|
|
device.object_path)
|
|
self.devices_to_restore_.append(device.object_path)
|
|
DeviceManager._EnableDevice(device, False)
|
|
|
|
def RestoreDevices(self):
|
|
"""Restore devices powered down in ShutdownAllExcept."""
|
|
should_raise = False
|
|
to_raise = Exception("Nothing to raise")
|
|
for device_path in self.devices_to_restore_:
|
|
try:
|
|
logging.info("Attempting to power on device %s", device_path)
|
|
device = self.flim_.GetObjectInterface("Device", device_path)
|
|
DeviceManager._EnableDevice(device, True)
|
|
except Exception, e:
|
|
# We want to keep on trying to power things on, so save an
|
|
# exception and continue
|
|
should_raise = True
|
|
to_raise = e
|
|
if should_raise:
|
|
raise to_raise
|