218 lines
7.2 KiB
Python
218 lines
7.2 KiB
Python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import os
|
|
import subprocess
|
|
|
|
from autotest_lib.client.bin import utils
|
|
from autotest_lib.client.common_lib.cros import site_eap_certs
|
|
|
|
class HostapdServer(object):
|
|
"""Hostapd server instance wrapped in a context manager.
|
|
|
|
Simple interface to starting and controlling a hsotapd instance.
|
|
This can be combined with a virtual-ethernet setup to test 802.1x
|
|
on a wired interface.
|
|
|
|
Example usage:
|
|
with hostapd_server.HostapdServer(interface='veth_master') as hostapd:
|
|
hostapd.send_eap_packets()
|
|
|
|
"""
|
|
CONFIG_TEMPLATE = """
|
|
interface=%(interface)s
|
|
driver=%(driver)s
|
|
logger_syslog=-1
|
|
logger_syslog_level=2
|
|
logger_stdout=-1
|
|
logger_stdout_level=2
|
|
dump_file=%(config_directory)s/hostapd.dump
|
|
ctrl_interface=%(config_directory)s/%(control_directory)s
|
|
ieee8021x=1
|
|
eapol_key_index_workaround=0
|
|
eap_server=1
|
|
eap_user_file=%(config_directory)s/%(user_file)s
|
|
ca_cert=%(config_directory)s/%(ca_cert)s
|
|
server_cert=%(config_directory)s/%(server_cert)s
|
|
private_key=%(config_directory)s/%(server_key)s
|
|
use_pae_group_addr=1
|
|
eap_reauth_period=10
|
|
"""
|
|
CA_CERTIFICATE_FILE = 'ca.crt'
|
|
CONFIG_FILE = 'hostapd.conf'
|
|
CONTROL_DIRECTORY = 'hostapd.ctl'
|
|
EAP_PASSWORD = 'password'
|
|
EAP_PHASE2 = 'MSCHAPV2'
|
|
EAP_TYPE = 'PEAP'
|
|
EAP_USERNAME = 'test'
|
|
HOSTAPD_EXECUTABLE = 'hostapd'
|
|
HOSTAPD_CLIENT_EXECUTABLE = 'hostapd_cli'
|
|
SERVER_CERTIFICATE_FILE = 'server.crt'
|
|
SERVER_PRIVATE_KEY_FILE = 'server.key'
|
|
USER_AUTHENTICATION_TEMPLATE = """* %(type)s
|
|
"%(username)s"\t%(phase2)s\t"%(password)s"\t[2]
|
|
"""
|
|
USER_FILE = 'hostapd.eap_user'
|
|
# This is the default group MAC address to which EAP challenges
|
|
# are sent, absent any prior knowledge of a specific client on
|
|
# the link.
|
|
PAE_NEAREST_ADDRESS = '01:80:c2:00:00:03'
|
|
|
|
def __init__(self,
|
|
interface=None,
|
|
driver='wired',
|
|
config_directory='/tmp/hostapd-test'):
|
|
super(HostapdServer, self).__init__()
|
|
self._interface = interface
|
|
self._config_directory = config_directory
|
|
self._control_directory = '%s/%s' % (self._config_directory,
|
|
self.CONTROL_DIRECTORY)
|
|
self._driver = driver
|
|
self._process = None
|
|
|
|
|
|
def __enter__(self):
|
|
self.start()
|
|
return self
|
|
|
|
|
|
def __exit__(self, exception, value, traceback):
|
|
self.stop()
|
|
|
|
|
|
def write_config(self):
|
|
"""Write out a hostapd configuration file-set based on the caller
|
|
supplied parameters.
|
|
|
|
@return the file name of the top-level configuration file written.
|
|
|
|
"""
|
|
if not os.path.exists(self._config_directory):
|
|
os.mkdir(self._config_directory)
|
|
config_params = {
|
|
'ca_cert': self.CA_CERTIFICATE_FILE,
|
|
'config_directory' : self._config_directory,
|
|
'control_directory': self.CONTROL_DIRECTORY,
|
|
'driver': self._driver,
|
|
'interface': self._interface,
|
|
'server_cert': self.SERVER_CERTIFICATE_FILE,
|
|
'server_key': self.SERVER_PRIVATE_KEY_FILE,
|
|
'user_file': self.USER_FILE
|
|
}
|
|
authentication_params = {
|
|
'password': self.EAP_PASSWORD,
|
|
'phase2': self.EAP_PHASE2,
|
|
'username': self.EAP_USERNAME,
|
|
'type': self.EAP_TYPE
|
|
}
|
|
for filename, contents in (
|
|
( self.CA_CERTIFICATE_FILE, site_eap_certs.ca_cert_1 ),
|
|
( self.CONFIG_FILE, self.CONFIG_TEMPLATE % config_params),
|
|
( self.SERVER_CERTIFICATE_FILE, site_eap_certs.server_cert_1 ),
|
|
( self.SERVER_PRIVATE_KEY_FILE,
|
|
site_eap_certs.server_private_key_1 ),
|
|
( self.USER_FILE,
|
|
self.USER_AUTHENTICATION_TEMPLATE % authentication_params )):
|
|
config_file = '%s/%s' % (self._config_directory, filename)
|
|
with open(config_file, 'w') as f:
|
|
f.write(contents)
|
|
return '%s/%s' % (self._config_directory, self.CONFIG_FILE)
|
|
|
|
|
|
def start(self):
|
|
"""Start the hostap server."""
|
|
config_file = self.write_config()
|
|
self._process = subprocess.Popen(
|
|
[self.HOSTAPD_EXECUTABLE, '-dd', config_file])
|
|
|
|
|
|
def stop(self):
|
|
"""Stop the hostapd server."""
|
|
if self._process:
|
|
self._process.terminate()
|
|
self._process.wait()
|
|
self._process = None
|
|
|
|
|
|
def running(self):
|
|
"""Tests whether the hostapd process is still running.
|
|
|
|
@return True if the hostapd process is still running, False otherwise.
|
|
|
|
"""
|
|
if not self._process:
|
|
return False
|
|
|
|
if self._process.poll() != None:
|
|
# We have essentially reaped the proces, and it is no more.
|
|
self._process = None
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def send_eap_packets(self):
|
|
"""Start sending EAP packets to the nearest neighbor."""
|
|
self.send_command('new_sta %s' % self.PAE_NEAREST_ADDRESS)
|
|
|
|
|
|
def get_client_mib(self, client_mac_address):
|
|
"""Get a dict representing the MIB properties for |client_mac_address|.
|
|
|
|
@param client_mac_address string MAC address of the client.
|
|
@return dict containing mib properties.
|
|
|
|
"""
|
|
# Expected output of "hostapd cli <client_mac_address>":
|
|
#
|
|
# Selected interface 'veth_master'
|
|
# b6:f1:39:1d:ad:10
|
|
# dot1xPaePortNumber=0
|
|
# dot1xPaePortProtocolVersion=2
|
|
# [...]
|
|
result = self.send_command('sta %s' % client_mac_address)
|
|
client_mib = {}
|
|
found_client = False
|
|
for line in result.splitlines():
|
|
if found_client:
|
|
parts = line.split('=', 1)
|
|
if len(parts) == 2:
|
|
client_mib[parts[0]] = parts[1]
|
|
elif line == client_mac_address:
|
|
found_client = True
|
|
return client_mib
|
|
|
|
|
|
def send_command(self, command):
|
|
"""Send a command to the hostapd instance.
|
|
|
|
@param command string containing the command to run on hostapd.
|
|
@return string output of the command.
|
|
|
|
"""
|
|
return utils.system_output('%s -p %s %s' %
|
|
(self.HOSTAPD_CLIENT_EXECUTABLE,
|
|
self._control_directory, command))
|
|
|
|
|
|
def client_has_authenticated(self, client_mac_address):
|
|
"""Return whether |client_mac_address| has successfully authenticated.
|
|
|
|
@param client_mac_address string MAC address of the client.
|
|
@return True if client is authenticated.
|
|
|
|
"""
|
|
mib = self.get_client_mib(client_mac_address)
|
|
return mib.get('dot1xAuthAuthSuccessesWhileAuthenticating', '') == '1'
|
|
|
|
|
|
def client_has_logged_off(self, client_mac_address):
|
|
"""Return whether |client_mac_address| has logged-off.
|
|
|
|
@param client_mac_address string MAC address of the client.
|
|
@return True if client has logged off.
|
|
|
|
"""
|
|
mib = self.get_client_mib(client_mac_address)
|
|
return mib.get('dot1xAuthAuthEapLogoffWhileAuthenticated', '') == '1'
|