746 lines
26 KiB
Python
746 lines
26 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import argparse
|
|
import dbus
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import traceback
|
|
|
|
import at_transceiver
|
|
import global_state
|
|
import modem_configuration
|
|
import task_loop
|
|
import wardmodem_exceptions as wme
|
|
|
|
import common
|
|
from autotest_lib.client.bin import utils
|
|
from autotest_lib.client.common_lib import error
|
|
from autotest_lib.client.cros.cellular import net_interface
|
|
|
|
# Name of the directory/package holding the state machine modules.
# NOTE(review): not referenced anywhere in the visible part of this file; it
# happens to match the `state_machines` package that WardModem imports its
# state machine modules from -- confirm it is still needed.
STATE_MACHINE_DIR_NAME = 'state_machines'
|
|
|
|
class WardModem(object):
    """
    The main wardmodem object that replaces a physical modem.

    What it does:
      - Loads configuration data.
      - Accepts custom state machines from the test.
      - Builds objects and ties them together.
      - Exposes objects for further customization.

    What it does not do:
      - Tweak the different knobs provided by internal objects that it
        exposes for further customization.
        That is the responsibility of the WardModemContext.
      - Care about setting up / tearing down environment.
        Again, see WardModemContext.

    """

    def __init__(self,
                 replaced_modem=None,
                 state_machines=None,
                 modem_at_port_dev_name=None):
        """
        @param replaced_modem: Name of the modem being emulated. If left None,
                the base modem will be emulated. A list of valid modems can be
                found in the module modem_configuration

        @param state_machines: Objects of subtypes of StateMachine that override
                any state machine defined in the configuration files for the
                same well-known-name.

        @param modem_at_port_dev_name: The full path to the primary AT port of
                the physical modem. This is needed only if we're running in a
                mode where we pass on modemmanager commands to the modem. This
                should be a string of the form '/dev/XXX'

        @raises WardModemSetupException if |modem_at_port_dev_name| is given
                but is not a string starting with '/dev/'.

        """
        self._logger = logging.getLogger(__name__)

        if not state_machines:
            state_machines = []
        if modem_at_port_dev_name and not (
                isinstance(modem_at_port_dev_name, str) and
                modem_at_port_dev_name.startswith('/dev/')):
            raise wme.WardModemSetupException(
                    'Modem device name must be of the form "/dev/XXX", '
                    'where XXX is the udev device.')

        # The modem that wardmodem is intended to replace.
        self._replaced_modem = replaced_modem

        # Pseudo net interface exported to shill.
        self._net_interface = net_interface.PseudoNetInterface()

        # The internal task loop object. Readable through a property.
        self._task_loop = task_loop.TaskLoop()

        # The global state object shared by all state machines.
        self._state = global_state.GlobalState()

        # The configuration object for the replaced modem.
        self._modem_conf = modem_configuration.ModemConfiguration(
                replaced_modem)

        # Order matters: the state machines are registered with the
        # transceiver, so the transceiver must exist first.
        self._create_transceiver(modem_at_port_dev_name)
        self._setup_state_machines(state_machines)

        self._started = False


    def start(self):
        """
        Turns on the wardmodem.

        This call is blocking. It will return after |stop| is called.

        """
        self._logger.info('Starting wardmodem...')
        self._net_interface.Setup()
        self._task_loop.start()


    def stop(self):
        """
        Stops wardmodem and cleanup.

        After this call the |transceiver| property is no longer available,
        since the transceiver object is deleted here.

        """
        # We need to delete a bunch of stuff *before* the task_loop can be
        # stopped.
        self._logger.info('Stopping wardmodem.')
        self._net_interface.Teardown()
        del self._transceiver
        os.close(self._wm_at_port)
        os.close(self._mm_at_port)
        # Compare against None explicitly: 0 is a valid file descriptor and
        # must not be mistaken for "no physical modem port was opened".
        if self._modem_at_port is not None:
            os.close(self._modem_at_port)
        self.task_loop.stop()


    @property
    def modem(self):
        """
        The physical modem being replaced [read-only].

        @return string representing the replaced modem.

        """
        return self._replaced_modem


    @property
    def modem_conf(self):
        """
        The ModemConfiguration object loaded for the replaced modem [read-only].

        @return A ModemConfiguration object.

        """
        return self._modem_conf


    @property
    def transceiver(self):
        """
        The ATTransceiver that will orchestrate communication [read-only].

        @return ATTransceiver object.

        """
        return self._transceiver


    @property
    def task_loop(self):
        """
        The main loop for asynchronous operations [read-only].

        @return TaskLoop object.

        """
        return self._task_loop


    @property
    def state(self):
        """
        The global state object that must be shared by all state machines.

        @return GlobalState object.

        """
        return self._state


    @property
    def mm_at_port_pts_name(self):
        """
        Name of the pty terminal to be used by modemmanager.

        @return A string of the form 'pts/X' where X is the pty number.

        """
        dev_prefix = '/dev/'
        fullname = os.ttyname(self._mm_at_port)
        # fullname is of the form /dev/pts/X where X is a pts number.
        # We want to return just the pts/X part.
        assert fullname[0:len(dev_prefix)] == dev_prefix
        return fullname[len(dev_prefix):]


    def _create_transceiver(self, modem_at_port_dev_name):
        """
        Opens a pty pair and initialize ATTransceiver.

        If |modem_at_port_dev_name| is given, the device is also opened
        read/write. A failure to open it is only logged; the transceiver is
        then created without a physical modem port.

        @param modem_at_port_dev_name: The device name of the primary port.

        """
        self._modem_at_port = None
        if modem_at_port_dev_name:
            try:
                self._modem_at_port = os.open(modem_at_port_dev_name,
                                              os.O_RDWR)
            except (TypeError, OSError) as e:
                self._logger.warning(
                        'Could not open modem_port |%s|\nError:\n%s',
                        modem_at_port_dev_name, e)

        self._wm_at_port, self._mm_at_port = os.openpty()
        self._transceiver = at_transceiver.ATTransceiver(self._wm_at_port,
                                                         self._modem_conf,
                                                         self._modem_at_port)


    def _register_state_machine(self, machine, registered_names, log_format):
        """
        Registers |machine| with the transceiver and records its name.

        @param machine: The state machine object to register.

        @param registered_names: List of well-known-names registered so far.
                The name of |machine| is appended to it.

        @param log_format: Debug log format string taking the well-known-name
                and the class name of |machine|, in that order.

        """
        name = machine.get_well_known_name()
        registered_names.append(name)
        self._transceiver.register_state_machine(machine)
        self._logger.debug(log_format, name, machine.__class__.__name__)


    def _setup_state_machines(self, client_machines):
        """
        Creates the state machines looking at sources in the right order.

        Highest priority is given to the client provided state machines,
        followed by the modem specific (plugin) state machines and finally the
        generic (base) ones. A machine is skipped if a machine with the same
        well-known-name was already registered from a higher priority source.

        @param client_machines: The client provided state machine objects.

        @raises WardModemSetupException if two client provided state machines
                share the same well-known-name.

        """
        # Well-known-names of the state machines registered so far.
        registered_names = []

        # Client provided state machines may not clash with each other.
        for machine in client_machines:
            name = machine.get_well_known_name()
            if name in registered_names:
                raise wme.WardModemSetupException(
                        'Multiple state machines provided with '
                        'well-known-name |%s|' % name)
            self._register_state_machine(
                    machine, registered_names,
                    'Added client specified machine {%s --> %s}')

        # Now instantiate modem specific state machines, and finally the
        # generic ones. Both only fill in names that are not yet taken.
        lower_priority_sources = (
                (self._modem_conf.plugin_state_machines,
                 'Added modem specific machine {%s --> %s}'),
                (self._modem_conf.base_state_machines,
                 'Added default machine {%s --> %s}'),
        )
        for module_names, log_format in lower_priority_sources:
            for module_name in module_names:
                machine = self._create_state_machine(module_name)
                if machine.get_well_known_name() not in registered_names:
                    self._register_state_machine(machine, registered_names,
                                                 log_format)

        self._logger.info('Loaded state machines: %s', str(registered_names))

        # Also setup the fallback state machine
        self._transceiver.register_fallback_state_machine(
                self._modem_conf.fallback_machine,
                self._modem_conf.fallback_function)


    @staticmethod
    def _class_name_from_module(module_name):
        """
        Converts a module name to the name of its state machine class.

        viz, my_module_name --> MyModuleName

        @param module_name: Underscore separated module name.

        @returns The corresponding class name string.

        """
        return ''.join(part.title() for part in module_name.split('_'))


    def _create_state_machine(self, module_name):
        """
        Creates a state machine object given the |module_name|.

        There is a specific naming convention for these state machine
        definitions. If |module_name| is new_and_shiny_machine, the state
        machine class must be named NewAndShinyMachine.

        @param module_name: The name of the module from which the state machine
                should be created.

        @returns An object of type new_and_shiny_machine.NewAndShinyMachine, if
                it exists.

        @raises WardModemSetupException if |module_name| is not a known state
                machine module. Errors from looking up or constructing the
                class (e.g. AttributeError) are not translated.

        """
        class_name = self._class_name_from_module(module_name)
        module = self._import_state_machine_module_as_sm(module_name)
        return getattr(module, class_name)(
                self._state,
                self._transceiver,
                self._modem_conf)


    def _import_state_machine_module_as_sm(self, module_name):
        """
        Imports the state machine module |module_name|.

        The module is bound to the module-level global |sm| (kept for backward
        compatibility) and also returned, so that callers need not rely on the
        global.

        The imports are deliberately spelled out one by one instead of being
        computed from |module_name|: only the listed modules are accepted, and
        the import statements resolve exactly as they always have.

        @param module_name: Name of a module in the state_machines package.

        @returns The imported module.

        @raises WardModemSetupException if |module_name| is not one of the
                known state machine modules.

        """
        global sm
        if module_name == 'call_machine':
            from state_machines import call_machine as sm
        elif module_name == 'call_machine_e362':
            from state_machines import call_machine_e362 as sm
        elif module_name == 'level_indicators_machine':
            from state_machines import level_indicators_machine as sm
        elif module_name == 'modem_power_level_machine':
            from state_machines import modem_power_level_machine as sm
        elif module_name == 'network_identity_machine':
            from state_machines import network_identity_machine as sm
        elif module_name == 'network_operator_machine':
            from state_machines import network_operator_machine as sm
        elif module_name == 'network_registration_machine':
            from state_machines import network_registration_machine as sm
        elif module_name == 'request_response':
            from state_machines import request_response as sm
        else:
            raise wme.WardModemSetupException('Unknown state machine module: '
                                              '%s' % module_name)
        return sm
|
|
|
|
|
|
class WardModemContext(object):
    """
    Setup wardmodem according to the options provided.

    This context should be used by everyone to interact with WardModem.
    This context will
    (1) Setup wardmodem, setting the correct options on the internals exposed by
        the wardmodem object.
    (2) Manage the modemmanager instance during the context's lifetime.

    """

    # Passed as the |timeout| when polling for modemmanager to export the
    # wardmodem device (presumably seconds -- confirm against
    # utils.poll_for_condition).
    MODEMMANAGER_RESTART_TIMEOUT = 60

    def __init__(self, use_wardmodem=True, detach=True, args=None):
        """
        @param use_wardmodem: If False, this context is a no-op. Otherwise, the
                whole wardmodem magic is done.

        @param detach: A bool flag indicating whether wardmodem should be run in
                its own process. If True, |start| will return immediately,
                starting WardModem in its own process; Otherwise, |start| will
                block until |stop| is called.

        @param args: Options to setup WardModem. This is a list of string
                command line arguments accepted by the parser defined in
                |get_option_parser|.
                TODO(pprabhu) Also accept a dict of options to ease
                customization in tests.

        """
        self._logger = logging.getLogger(__name__)
        self._logger.info('Initializing wardmodem context.')

        # Initialize every attribute up front so that they exist on all code
        # paths, including the no-op context.
        self._use_wardmodem = use_wardmodem
        self._started = False
        self._wardmodem = None
        self._was_mm_running = False
        self._detach = detach
        self._options = None
        # pid of the forked wardmodem process; 0 means "no child to kill".
        self._child = 0

        if not self._use_wardmodem:
            self._logger.info('WardModemContext directed to do nothing. '
                              'All wardmodem actions are no-op.')
            self._logger.debug('........... Welcome to the real world Neo.')
            return

        self._logger.debug('Wardmodem arguments: detach: %s, args: %s',
                           detach, str(args))

        option_parser = self._get_option_parser()

        # XXX:HACK For some reason, parse_args picks up argv when the context is
        # created by an autotest test. Workaround: stash away the argv.
        argv_stash = sys.argv
        sys.argv = ['wardmodem']
        try:
            self._options = option_parser.parse_args(args)
        finally:
            # Restore argv even if argparse bails out (it raises SystemExit on
            # malformed arguments).
            sys.argv = argv_stash


    def __enter__(self):
        self.start()
        return self


    def __exit__(self, type, value, traceback):
        self.stop()
        # Don't suppress any exceptions raised in the 'with' block
        return False


    def start(self):
        """
        Start the WardModem, setting up the correct environment.

        If |detach| was True, this call will return immediately, running
        WardModem in its own process; Otherwise, this call will block and only
        return when |stop| is called.

        @raises WardModemSetupException if the context is already started, if
                the options are contradictory, or (in detach mode) if
                modemmanager does not export the wardmodem device in time.

        """
        if not self._use_wardmodem:
            return

        if self._started:
            raise wme.WardModemSetupException(
                    'Attempted to re-enter an already active wardmodem '
                    'context.')

        self._started = True
        self._wardmodem = WardModem(
                self._options.modem,
                modem_at_port_dev_name=self._options.modem_port)
        if not self._prepare_wardmodem(self._options):
            raise wme.WardModemSetupException(
                    'Contradictory wardmodem setup options detected.')

        self._prepare_mm()

        if not self._detach:
            self._wardmodem.start()
            return

        self._logger.debug('Will fork wardmodem process.')
        self._child = os.fork()
        if self._child == 0:
            # Setup a way to stop the child.
            def _exit_child(signum, frame):
                self._logger.info('Signal handler called with signal %s',
                                  signum)
                self._cleanup()
                os._exit(0)
            signal.signal(signal.SIGINT, _exit_child)
            signal.signal(signal.SIGTERM, _exit_child)
            # In detach mode, all uncaught exceptions raised by wardmodem
            # will be thrown here. Since this is a child process, they will
            # be lost.
            # At least log them before throwing them again, so that we know
            # something went wrong in wardmodem.
            try:
                self._wardmodem.start()
            except Exception:
                traceback.print_exc()
                raise

        else:
            # Wait here for the child to start before continuing.
            # We will continue once we know that modemmanager process has
            # detected the wardmodem device, and has exported it on its dbus
            # interface.
            utils.poll_for_condition(
                    self._check_for_modem,
                    exception=wme.WardModemSetupException(
                            'Could not cleanly restart modemmanager.'),
                    timeout=self.MODEMMANAGER_RESTART_TIMEOUT,
                    sleep_interval=1)
            self._logger.debug('Continuing the main process outside '
                               'wardmodem.')


    def stop(self):
        """
        Stops WardModem, restore environment to its previous state.

        In detach mode the child process is sent SIGINT and waited for; the
        child performs the cleanup itself. Otherwise the cleanup is done here.

        """
        if not self._use_wardmodem:
            return

        if not self._started:
            self._logger.warning('No wardmodem instance running! '
                                 'Nothing to stop.')
            return

        if self._detach:
            self._logger.debug('Attempting to kill child wardmodem process.')
            if self._child != 0:
                os.kill(self._child, signal.SIGINT)
                os.waitpid(self._child, 0)
                self._child = 0
            self._logger.debug('Finished waiting on child wardmodem process '
                               'to finish.')
        else:
            self._cleanup()
        self._started = False


    def _cleanup(self):
        """
        Restores modemmanager and then stops the wardmodem object.

        """
        # Restore mm before turning off wardmodem.
        self._restore_mm()
        self._wardmodem.stop()
        self._logger.info('Bye, Bye!')


    def _prepare_wardmodem(self, options):
        """
        Tweaks the internals exposed by WardModem post-creation according to the
        options provided.

        @param options: is an object returned by argparse.

        @returns False if "pass-through-mode" or "bridge-mode" was requested
                without a "modem-port" (nothing is modified in that case),
                True otherwise.

        """
        needs_modem_port = options.pass_through_mode or options.bridge_mode

        # Validate first, so that we bail out before touching wardmodem.
        if needs_modem_port and not options.modem_port:
            # Pick the mode name before formatting: '%' binds tighter than a
            # conditional expression.
            mode_name = ('bridge-mode' if options.bridge_mode
                         else 'pass-through-mode')
            self._logger.error('"modem-port" needed in %s mode.', mode_name)
            return False
        if options.modem_port and not needs_modem_port:
            self._logger.warning('Ignoring "modem-port" in normal mode.')

        if options.modem and options.pass_through_mode:
            self._logger.warning('Ignoring "modem" in pass-through-mode.')

        if options.at_terminator:
            self._wardmodem.transceiver.at_terminator = options.at_terminator

        if options.pass_through_mode:
            self._wardmodem.transceiver.mode = \
                    at_transceiver.ATTransceiverMode.PASS_THROUGH

        # Note: if both modes are requested, bridge-mode wins since it is
        # applied last.
        if options.bridge_mode:
            self._wardmodem.transceiver.mode = \
                    at_transceiver.ATTransceiverMode.SPLIT_VERIFY

        if options.fast:
            if options.pass_through_mode:
                self._logger.warning('Ignoring "fast" in pass-through-mode')
            else:
                self._wardmodem.task_loop.ignore_delays = True

        if options.randomize_delays:
            if options.fast:
                self._logger.warning('Ignoring option "randomize-delays" '
                                     '"fast" overrides "randomize-delays".')
            if options.pass_through_mode:
                self._logger.warning('Ignoring "randomize-delays" in '
                                     'pass-through-mode')
            if not options.fast and not options.pass_through_mode:
                self._wardmodem.task_loop.random_delays = True

        if options.max_randomized_delay:
            if (options.fast or not options.randomize_delays or
                options.pass_through_mode):
                self._logger.warning('Ignoring "max_randomized_delays"')
            else:
                self._wardmodem.task_loop.max_random_delay_ms = \
                        options.max_randomized_delay

        return True


    def _prepare_mm(self):
        """
        Starts modemmanager in test mode listening to the WardModem specified
        pty end-point.

        Remembers whether a ModemManager process was running beforehand, so
        that |_restore_mm| can restart the service later.

        """
        self._was_mm_running = False
        try:
            result = utils.run('pgrep ModemManager')
            if result.stdout:
                self._was_mm_running = True
        except error.CmdError:
            # No ModemManager process found.
            pass
        try:
            utils.run('initctl stop modemmanager')
        except error.CmdError:
            # Best effort: the service may not have been running.
            pass

        mm_opts = ' '.join([
                '--log-level=DEBUG',
                '--timestamps',
                '--test',
                '--debug',
                '--test-plugin=' + self._wardmodem.modem_conf.mm_plugin,
                '--test-at-port="' + self._wardmodem.mm_at_port_pts_name + '"',
                '--test-net-port=' +
                        net_interface.PseudoNetInterface.IFACE_NAME,
        ]) + ' '
        result = utils.run('ModemManager %s &' % mm_opts)
        self._logger.debug('ModemManager stderr:\n%s', result.stderr)


    def _check_for_modem(self):
        """
        Checks whether modemmanager has exported the wardmodem device.

        @returns True if the pty name used by wardmodem shows up in the objects
                managed by modemmanager on the system bus, False otherwise
                (including when the DBus call fails).

        """
        bus = dbus.SystemBus()
        try:
            manager = bus.get_object('org.freedesktop.ModemManager1',
                                     '/org/freedesktop/ModemManager1')
            imanager = dbus.Interface(manager,
                                      'org.freedesktop.DBus.ObjectManager')
            modems = imanager.GetManagedObjects()
        except dbus.exceptions.DBusException:
            # The ObjectManager interface on modemmanager is not up yet.
            return False
        # Check if a modem with the test at port has been exported
        return self._wardmodem.mm_at_port_pts_name in str(modems)


    def _restore_mm(self):
        """
        Stops the test instance of modemmanager and restore it to previous
        state.

        """
        try:
            utils.run('pgrep ModemManager')
        except error.CmdError:
            self._logger.debug('As expected: ModemManager in test mode killed.')
        else:
            self._logger.warning('ModemManager in test mode still running! '
                                 'Killing it ourselves.')
            try:
                utils.run('pkill -9 ModemManager')
            except error.CmdError:
                self._logger.warning('Failed to kill test ModemManager.')

        if self._was_mm_running:
            try:
                utils.run('initctl start modemmanager')
            except error.CmdError:
                self._logger.warning('Failed to restart modemmanager service.')


    def _get_option_parser(self):
        """
        Build an argparse parser to accept options from the user/test to tweak
        WardModem post-creation.

        @returns The argparse.ArgumentParser object.

        """
        parser = argparse.ArgumentParser(
                description='Run the wardmodem modem emulator.')

        modem_group = parser.add_argument_group(
                'Modem',
                'Description of the modem to emulate.')
        modem_group.add_argument(
                '--modem',
                help='The modem to emulate.')
        modem_group.add_argument(
                '--at-terminator',
                help='The string terminator to use.')

        physical_modem_group = parser.add_argument_group(
                'Physical modem',
                'Interaction with the physical modem on-board.')
        physical_modem_group.add_argument(
                '--pass-through-mode',
                default=False,
                nargs='?',
                const=True,
                help='Act as a transparent channel between the modem manager '
                     'and the physical modem. "--modem-port" option required.')
        physical_modem_group.add_argument(
                '--bridge-mode',
                default=False,
                nargs='?',
                const=True,
                help='Should we also setup a bridge with the real modem? Note '
                     'that the responses generated by wardmodem state machines '
                     'take precedence over those received from the physical '
                     'modem. The bridge is used for a soft-verification: A '
                     'warning is generated if the responses do not match. '
                     '"--modem-port" option required.')
        physical_modem_group.add_argument(
                '--modem-port',
                help='The primary port used by the real modem. ')

        behaviour_group = parser.add_argument_group(
                'Behaviour',
                'Tweak the behaviour of running wardmodem.')
        behaviour_group.add_argument(
                '--fast',
                default=False,
                nargs='?',
                const=True,
                help='Run the emulator with minimum delay between operations.')
        behaviour_group.add_argument(
                '--randomize-delays',
                default=False,
                nargs='?',
                const=True,
                help='Run emulator with randomized delays between operations.')
        behaviour_group.add_argument(
                '--max-randomized-delay',
                type=int,
                help='The maximum randomized delay added between operations in '
                     '"randomize-delays" mode.')

        return parser
|
|
|
|
|
|
# ##############################################################################
|
|
# Run WardModem as a script.
|
|
# ##############################################################################
|
|
# The WardModemContext driven by the script; assigned in main() and read by
# the signal handler.
_wardmodem_context = None

# Maps signal numbers to their names, e.g. signal.SIGINT --> 'SIGINT'.
# Only the 'SIGxxx' names are considered; names containing an underscore
# (such as 'SIG_DFL') are skipped.
SIGNAL_TO_NAMES_DICT = {
    getattr(signal, name): name
    for name in dir(signal)
    if name.startswith('SIG') and '_' not in name
}
|
|
|
|
def exit_wardmodem_script(signum, frame):
    """
    Signal handler to intercept Keyboard interrupt and stop the WardModem.

    Only SIGINT stops the wardmodem context; any other signal is merely
    logged.

    @param signum: The signal that was sent to the script

    @param frame: Current stack frame [ignored].

    """
    if signum != signal.SIGINT:
        logging.warning('Captured unexpected signal: %s',
                        SIGNAL_TO_NAMES_DICT.get(signum, str(signum)))
        return

    logging.info('Captured Ctrl-C. Exiting wardmodem.')
    _wardmodem_context.stop()
|
|
|
|
|
|
def main():
    """
    Entry function to wardmodem script.

    Resets the root logger configuration, creates a non-detached
    WardModemContext from the command line arguments, installs the SIGINT
    handler and starts the context. The start call blocks, since the context
    is created with detach=False.

    """
    global _wardmodem_context
    # HACK: I should not have logged anything before getting here, but
    # basicConfig wasn't doing anything: So, attempt to clean config.
    root = logging.getLogger()
    # Iterate over a copy: removeHandler mutates root.handlers, and removing
    # from the list being iterated skips every other handler. Any handler left
    # behind would turn basicConfig below into a no-op.
    for handler in list(root.handlers):
        root.removeHandler(handler)
    logger_format = ('[%(asctime)-15s][%(filename)s:%(lineno)s:%(levelname)s] '
                     '%(message)s')
    logging.basicConfig(format=logger_format,
                        level=logging.DEBUG)

    _wardmodem_context = WardModemContext(True, False, sys.argv[1:])
    logging.info('\n####################################################\n'
                 'Running wardmodem, hit Ctrl+C to exit.\n'
                 '####################################################\n')

    signal.signal(signal.SIGINT, exit_wardmodem_script)
    _wardmodem_context.start()
|
|
|
|
|
|
# Run the emulator only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()
|