348 lines
12 KiB
Python
348 lines
12 KiB
Python
# Copyright 2016 The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import os
|
|
import re
|
|
|
|
|
|
class Device(object):
|
|
"""Create dict object for relay usb connection.
|
|
|
|
This class provides an interface to locate lab equipment without encoding
|
|
knowledge of the USB bus topology in the lab equipment device drivers.
|
|
"""
|
|
|
|
KEY_VID = 'vendor_id'
|
|
KEY_PID = 'product_id'
|
|
KEY_SN = 'serial_no'
|
|
KEY_INF = 'inf'
|
|
KEY_CFG = 'config'
|
|
KEY_NAME = 'name'
|
|
KEY_TTY = 'tty_path'
|
|
KEY_MFG = 'mfg'
|
|
KEY_PRD = 'product'
|
|
KEY_VER = 'version'
|
|
|
|
_instance = None
|
|
|
|
_USB_DEVICE_SYS_ROOT = '/sys/bus/usb/devices'
|
|
_DEV_ROOT = '/dev'
|
|
|
|
_SYS_VENDOR_ID = 'idVendor'
|
|
_SYS_PRODUCT_ID = 'idProduct'
|
|
_SYS_SERIAL_NO = 'serial'
|
|
_INF_CLASS = 'bInterfaceClass'
|
|
_INF_SUB_CLASS = 'bInterfaceSubClass'
|
|
_INF_PROTOCOL = 'bInterfaceProtocol'
|
|
_MFG_STRING = 'manufacturer'
|
|
_PRODUCT_STRING = 'product'
|
|
_VERSION_STRING = 'version'
|
|
|
|
_USB_CDC_ACM_CLASS = 0x02
|
|
_USB_CDC_ACM_SUB_CLASS = 0x02
|
|
_USB_CDC_ACM_PROTOCOL = 0x01
|
|
|
|
def __init__(self, name, vid, pid, cfg, inf):
|
|
self._device_list = []
|
|
|
|
self._build_device(name, vid, pid, cfg, inf)
|
|
|
|
self._walk_usb_tree(self._init_device_list_callback, None)
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
# The Device class should be a singleton. A lab test procedure may
|
|
# use multiple pieces of lab equipment and we do not want to have to
|
|
# create a new instance of the Device for each device.
|
|
if not cls._instance:
|
|
cls._instance = super(Device, cls).__new__(cls, *args, **kwargs)
|
|
return cls._instance
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exception_type, exception_value, traceback):
|
|
pass
|
|
|
|
def _build_device(self, name, vid, pid, cfg, inf):
|
|
"""Build relay device information.
|
|
|
|
Args:
|
|
name: device
|
|
vid: vendor ID
|
|
pid: product ID
|
|
cfg: configuration
|
|
inf: interface
|
|
|
|
Returns:
|
|
Nothing
|
|
"""
|
|
entry = {}
|
|
entry[self.KEY_NAME] = name
|
|
entry[self.KEY_VID] = int(vid, 16)
|
|
entry[self.KEY_PID] = int(pid, 16)
|
|
|
|
# The serial number string is optional in USB and not all devices
|
|
# use it. The relay devices do not use it then we specify 'None' in
|
|
# the lab configuration file.
|
|
entry[self.KEY_SN] = None
|
|
entry[self.KEY_CFG] = int(cfg)
|
|
entry[self.KEY_INF] = int(inf)
|
|
entry[self.KEY_TTY] = None
|
|
|
|
self._device_list.append(entry)
|
|
|
|
def _find_lab_device_entry(self, vendor_id, product_id, serial_no):
|
|
"""find a device in the lab device list.
|
|
|
|
Args:
|
|
vendor_id: unique vendor id for device
|
|
product_id: unique product id for device
|
|
serial_no: serial string for the device (may be None)
|
|
|
|
Returns:
|
|
device entry or None
|
|
"""
|
|
for device in self._device_list:
|
|
if device[self.KEY_VID] != vendor_id:
|
|
continue
|
|
if device[self.KEY_PID] != product_id:
|
|
continue
|
|
if device[self.KEY_SN] == serial_no:
|
|
return device
|
|
|
|
return None
|
|
|
|
def _read_sys_attr(self, root, attr):
|
|
"""read a sysfs attribute.
|
|
|
|
Args:
|
|
root: path of the sysfs directory
|
|
attr: attribute to read
|
|
|
|
Returns:
|
|
attribute value or None
|
|
"""
|
|
try:
|
|
path = os.path.join(root, attr)
|
|
with open(path) as f:
|
|
return f.readline().rstrip()
|
|
except IOError:
|
|
return None
|
|
|
|
def _read_sys_hex_attr(self, root, attr):
|
|
"""read a sysfs hexadecimal integer attribute.
|
|
|
|
Args:
|
|
root: path of the sysfs directory
|
|
attr: attribute to read
|
|
|
|
Returns:
|
|
attribute value or None
|
|
"""
|
|
try:
|
|
path = os.path.join(root, attr)
|
|
with open(path) as f:
|
|
return int(f.readline(), 16)
|
|
except IOError:
|
|
return None
|
|
|
|
def _is_cdc_acm(self, inf_path):
|
|
"""determine if the interface implements the CDC ACM class.
|
|
|
|
Args:
|
|
inf_path: directory entry for the inf under /sys/bus/usb/devices
|
|
|
|
Returns:
|
|
True if the inf is CDC ACM, false otherwise
|
|
"""
|
|
cls = self._read_sys_hex_attr(inf_path, self._INF_CLASS)
|
|
sub_cls = self._read_sys_hex_attr(inf_path, self._INF_SUB_CLASS)
|
|
proto = self._read_sys_hex_attr(inf_path, self._INF_PROTOCOL)
|
|
if self._USB_CDC_ACM_CLASS != cls:
|
|
return False
|
|
if self._USB_CDC_ACM_SUB_CLASS != sub_cls:
|
|
return False
|
|
if self._USB_CDC_ACM_PROTOCOL != proto:
|
|
return False
|
|
|
|
return True
|
|
|
|
def _read_tty_name(self, dir_entry, inf, cfg):
|
|
"""Get the path to the associated tty device.
|
|
|
|
Args:
|
|
dir_entry: directory entry for the device under /sys/bus/usb/devices
|
|
inf: Interface number of the device
|
|
cfg: Configuration number of the device
|
|
|
|
Returns:
|
|
Path to a tty device or None
|
|
"""
|
|
inf_path = os.path.join(self._USB_DEVICE_SYS_ROOT,
|
|
'%s:%d.%d' % (dir_entry, cfg, inf))
|
|
|
|
# first determine if this is a CDC-ACM or USB Serial device.
|
|
if self._is_cdc_acm(inf_path):
|
|
tty_list = os.listdir(os.path.join(inf_path, 'tty'))
|
|
|
|
# Each CDC-ACM interface should only have one tty device associated
|
|
# with it so just return the first item in the list.
|
|
return os.path.join(self._DEV_ROOT, tty_list[0])
|
|
else:
|
|
# USB Serial devices have a link to their ttyUSB* device in the inf
|
|
# directory
|
|
tty_re = re.compile(r'ttyUSB\d+$')
|
|
|
|
dir_list = os.listdir(inf_path)
|
|
for entry in dir_list:
|
|
if tty_re.match(entry):
|
|
return os.path.join(self._DEV_ROOT, entry)
|
|
|
|
return None
|
|
|
|
def _init_device_list_callback(self, _, dir_entry):
|
|
"""Callback function used with _walk_usb_tree for device list init.
|
|
|
|
Args:
|
|
_: Callback context (unused)
|
|
dir_entry: Directory entry reported by _walk_usb_tree
|
|
|
|
"""
|
|
path = os.path.join(self._USB_DEVICE_SYS_ROOT, dir_entry)
|
|
|
|
# The combination of vendor id, product id, and serial number
|
|
# should be sufficient to uniquely identify each piece of lab
|
|
# equipment.
|
|
vendor_id = self._read_sys_hex_attr(path, self._SYS_VENDOR_ID)
|
|
product_id = self._read_sys_hex_attr(path, self._SYS_PRODUCT_ID)
|
|
serial_no = self._read_sys_attr(path, self._SYS_SERIAL_NO)
|
|
|
|
# For each device try to match it with a device entry in the lab
|
|
# configuration.
|
|
device = self._find_lab_device_entry(vendor_id, product_id, serial_no)
|
|
if device:
|
|
# If the device is in the lab configuration then determine
|
|
# which tty device it associated with.
|
|
device[self.KEY_TTY] = self._read_tty_name(dir_entry,
|
|
device[self.KEY_INF],
|
|
device[self.KEY_CFG])
|
|
|
|
def _list_all_tty_devices_callback(self, dev_list, dir_entry):
|
|
"""Callback for _walk_usb_tree when listing all USB serial devices.
|
|
|
|
Args:
|
|
dev_list: Device list to fill
|
|
dir_entry: Directory entry reported by _walk_usb_tree
|
|
|
|
"""
|
|
dev_path = os.path.join(self._USB_DEVICE_SYS_ROOT, dir_entry)
|
|
|
|
# Determine if there are any interfaces in the sys directory for the
|
|
# USB Device.
|
|
inf_re = re.compile(r'\d+-\d+(\.\d+){0,}:(?P<cfg>\d+)\.(?P<inf>\d+)$')
|
|
inf_dir_list = os.listdir(dev_path)
|
|
|
|
for inf_entry in inf_dir_list:
|
|
inf_match = inf_re.match(inf_entry)
|
|
if inf_match is None:
|
|
continue
|
|
|
|
inf_dict = inf_match.groupdict()
|
|
inf = int(inf_dict['inf'])
|
|
cfg = int(inf_dict['cfg'])
|
|
|
|
# Check to see if there is a tty device associated with this
|
|
# interface.
|
|
tty_path = self._read_tty_name(dir_entry, inf, cfg)
|
|
if tty_path is None:
|
|
continue
|
|
|
|
# This is a TTY interface, create a dictionary of the relevant
|
|
# sysfs attributes for this device.
|
|
entry = {}
|
|
entry[self.KEY_TTY] = tty_path
|
|
entry[self.KEY_INF] = inf
|
|
entry[self.KEY_CFG] = cfg
|
|
entry[self.KEY_VID] = self._read_sys_hex_attr(dev_path,
|
|
self._SYS_VENDOR_ID)
|
|
entry[self.KEY_PID] = self._read_sys_hex_attr(dev_path,
|
|
self._SYS_PRODUCT_ID)
|
|
entry[self.KEY_SN] = self._read_sys_attr(dev_path,
|
|
self._SYS_SERIAL_NO)
|
|
entry[self.KEY_MFG] = self._read_sys_attr(dev_path,
|
|
self._MFG_STRING)
|
|
entry[self.KEY_PRD] = self._read_sys_attr(dev_path,
|
|
self._PRODUCT_STRING)
|
|
entry[self.KEY_VER] = self._read_sys_attr(dev_path,
|
|
self._VERSION_STRING)
|
|
|
|
# If this device is also in the lab device list then add the
|
|
# friendly name for it.
|
|
lab_device = self._find_lab_device_entry(entry[self.KEY_VID],
|
|
entry[self.KEY_PID],
|
|
entry[self.KEY_SN])
|
|
if lab_device is not None:
|
|
entry[self.KEY_NAME] = lab_device[self.KEY_NAME]
|
|
|
|
dev_list.append(entry)
|
|
|
|
def _walk_usb_tree(self, callback, context):
|
|
"""Walk the USB device and locate lab devices.
|
|
|
|
Traverse the USB device tree in /sys/bus/usb/devices and inspect each
|
|
device and see if it matches a device in the lab configuration. If
|
|
it does then get the path to the associated tty device.
|
|
|
|
Args:
|
|
callback: Callback to invoke when a USB device is found.
|
|
context: Context variable for callback.
|
|
|
|
Returns:
|
|
Nothing
|
|
"""
|
|
# Match only devices, exclude interfaces and root hubs
|
|
file_re = re.compile(r'\d+-\d+(\.\d+){0,}$')
|
|
dir_list = os.listdir(self._USB_DEVICE_SYS_ROOT)
|
|
|
|
for dir_entry in dir_list:
|
|
if file_re.match(dir_entry):
|
|
callback(context, dir_entry)
|
|
|
|
def get_tty_path(self, name):
|
|
"""Get the path to the tty device for a given lab device.
|
|
|
|
Args:
|
|
name: lab device identifier, e.g. 'rail', or 'bt_trigger'
|
|
|
|
Returns:
|
|
Path to the tty device otherwise None
|
|
"""
|
|
for dev in self._device_list:
|
|
if dev[self.KEY_NAME] == name and dev[self.KEY_NAME] is not None:
|
|
return dev[self.KEY_TTY]
|
|
|
|
return None
|
|
|
|
def get_tty_devices(self):
|
|
"""Get a list of all USB based tty devices attached to the machine.
|
|
|
|
Returns:
|
|
List of dictionaries where each dictionary contains a description of
|
|
the USB TTY device.
|
|
"""
|
|
all_dev_list = []
|
|
self._walk_usb_tree(self._list_all_tty_devices_callback, all_dev_list)
|
|
|
|
return all_dev_list
|
|
|