Source code for monk_tf.conn

# -*- coding: utf-8 -*-
#
# MONK automated test framework
#
# Copyright (C) 2013 DResearch Fahrzeugelektronik GmbH
# Written and maintained by MONK Developers <project-monk@dresearch-fe.de>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version # 3 of the License, or (at your option) any later version.  #

"""
This module implements connection handling. Using the classes from this module
you can connect directly to a :term:`target device` via serial or ssh.
Example::

    import monk_tf.conn as mc
    # create a serial connection
    serial=mc.SerialConn(port="/dev/ttyUSB3", user="tester", pw="test")
    # create a ssh connection
    ssh=mc.SshConn(host="192.168.2.123", user="tester", pw="test")
    # send a command
    print serial.cmd("ls -al")
    [...]
    # send a command
    ssh.cmd("ls -al")
    [...]
"""

import os
import sys
import re
import logging

import pexpect
from pexpect import fdpexpect

[docs]class ConnectionBase(object): """ is the base class for all connections. Don't instantiate this class directly. This class implements the behaviour of cmd() interactions, and it makes sure that it doesn't login a user that is already logged in. Extending this class requires to implement _get_exp() and _login(). """ def __init__(self): if hasattr(self, "name"): self._logger = logging.getLogger(self.name) else: self._logger = logging.getLogger(type(self).__name__) self._logger.debug("hi.") @property
[docs] def exp(self): """ the pexpect object - Don't bother with this if you don't know what it means. """ try: return self._exp except AttributeError as e: self._exp = self._get_exp() return self._exp
[docs] def login(self, user=None, pw=None, timeout=30): """ attempts to authenticate to the connection. Default for user and password are the one's given to the connection on instantiation. :param user: the username :param pw: the password :param timeout: how long the connection waits to see whether it is logged in already """ self._logger.debug("login({},{},{})".format(user, pw, timeout)) try: self.exp.sendline("") self.exp.expect(self.prompt, timeout=timeout) self._logger.debug("already logged in") except pexpect.TIMEOUT as e: self._login(user, pw)
[docs] def cmd(self, msg, expect=None, timeout=30): """ send a shell command and retreive its output. :param msg: the shell command :param expect: a regex that represents the end of an interaction. Defaults to the prompt set on connection instantiation :param timeout: how long a command call should wait for its desired result :return: the stdout and stderr of the shell command """ self._logger.debug("cmd({},{},{},{})".format( msg, expect, timeout, login_timeout)) self.login(timeout=login_timeout or timeout) self.exp.sendline(msg) expect_msg = re.escape(msg[:5]) + "[^\n]*\r\n" self._logger.debug("expect:" + expect_msg.encode("string-escape")) self.exp.expect(expect_msg, timeout=timeout) self._logger.debug("expect:" + (expect or self.prompt).encode("string-escape")) self.exp.expect(expect or self.prompt, timeout=timeout) self._logger.debug("cmd({}) result='{}' expect-match='{}'".format( str(msg[:15]).encode("string_escape") + ("[...]" if len(msg) > 15 else ""), str(self.exp.before[:50]).encode("string-escape") + ("[...]" if len(self.exp.before) > 50 else ""), str(self.exp.after[:50]).encode("string-escape") + ("[...]" if len(self.exp.after) > 50 else ""), )) return self.exp.before
def __del__(self): self._logger.debug("bye.") self.exp.close()
[docs]class SerialConn(ConnectionBase): """ implements a serial connection. """ def __init__(self, name, port, user, pw, prompt="\r?\n?[^\n]*#"): """ :param name: the name of the connection :param port: the path to the device file that is used for this connection :param user: the user name for the login :param pw: the password for the login :param prompt: the default prompt to check for """ self.name = name self.port = port self.user = user self.pw = pw self.prompt = prompt super(SerialConn, self).__init__() def _get_exp(self): spawn = fdpexpect.fdspawn(os.open(self.port, os.O_RDWR|os.O_NONBLOCK|os.O_NOCTTY)) #spawn.logfile = sys.stdout return spawn def _login(self, user=None, pw=None): self._logger.debug("serial._login({},{})".format(user, pw)) self.exp.expect("[lL]ogin: ") self.exp.sendline(user or self.user) self.exp.expect("[pP]assword: ") self.exp.sendline(pw or self.pw) self.exp.expect(self.prompt)
[docs]class SshConn(ConnectionBase): """ implements an ssh connection. """ def __init__(self, name, host, user, pw, prompt="\r?\n?[^\n]*#"): """ :param name: the name of the connection :param host: the URL to the device :param user: the user name for the login :param pw: the password for the login :param prompt: the default prompt to check for """ self.name = name self.host= host self.user = user self.pw = pw self.prompt = prompt super(SshConn, self).__init__() def _get_exp(self): return pexpect.spawn("ssh {}@{} -o TCPKeepAlive=yes -o ServerAliveInterval=5 -o ServerAliveCountMax=3".format( self.user, self.host )) def _login(self, user=None, pw=None): self._logger.debug("ssh._login({},{})".format(user, pw)) self.exp.expect("[pP]assword: ") self.exp.sendline(pw or self.pw) self.exp.expect(self.prompt)