当前位置: 移动技术网 > IT编程>脚本编程>Python > 开源工具 remoto 遇到的问题追踪与记录

开源工具 remoto 遇到的问题追踪与记录

2020年07月30日  | 移动技术网IT编程  | 我要评论
记录一次开发错误定位问题,在比较早之前碰到过这个问题,当时选择了回避,使用 paramiko 代替这个 remoto 模块。今天又碰到了这个问题,出于学习目的,打算认真研究这个问题,通过翻看源码,发现是自己的疏忽大意,忽略一个参数。故记录此次学习记录,以此为戒!1. 问题背景在 ceph-deploy 工具中我接触到了一个远程执行命令的好工具:remoto。但是后续在使用该模块进行编码时,遇到了一个问题,下面来仔细描述下这个问题。具体环境:三台服务器:R10-P01-DN-001.gd.cn、R10

记录一次开发错误定位问题,在比较早之前碰到过这个问题,当时选择了回避,使用 paramiko 代替这个 remoto 模块。今天又碰到了这个问题,出于学习目的,打算认真研究这个问题,通过翻看源码,发现是自己的疏忽大意,忽略一个参数。故记录此次学习记录,以此为戒!

1. 问题背景

在 ceph-deploy 工具中我接触到了一个远程执行命令的好工具:remoto。但是后续在使用该模块进行编码时,遇到了一个问题,下面来仔细描述下这个问题。

具体环境:

三台服务器:R10-P01-DN-001.gd.cn、R10-P01-DN-002.gd.cn、R10-P01-DN-002.gd.cn,其中 01 为主节点,对 01~03的节点都 ssh 免密。目前在 01 节点安装了 python3 而 02~03 只有 python2。

问题重现:

# 在python3中执行,访问001节点,正常

[store@R10-P01-DN-001 redis-agent]$ python3
Python 3.6.10 (default, Jun 19 2020, 10:51:42) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import remoto
>>> from remoto.process import check
>>> conn = remoto.Connection('R10-P01-DN-001.gd.cn')
>>> check(conn, ['hostname'])
INFO:R10-P01-DN-001.gd.cn:Running command: hostname
(['R10-P01-DN-001.gd.cn'], [], 0)

# 在 python3中执行,访问002或者003,异常
>>> conn = remoto.Connection('R10-P01-DN-002.gd.cn')
bash: python3: command not found
ERROR:R10-P01-DN-001.gd.cn:Can't communicate with remote host, possibly because python3 is not installed there
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/execnet/gateway_base.py", line 997, in _send
    message.to_io(self._io)
  File "/opt/python3.6/lib/python3.6/site-packages/execnet/gateway_base.py", line 443, in to_io
    io.write(header + self.data)
  File "/opt/python3.6/lib/python3.6/site-packages/execnet/gateway_base.py", line 410, in write
    self.outfile.flush()
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/python3.6/lib/python3.6/site-packages/remoto/backends/__init__.py", line 35, in __init__
    self.gateway = self._make_gateway(hostname)
  File "/opt/python3.6/lib/python3.6/site-packages/remoto/backends/__init__.py", line 48, in _make_gateway
    gateway.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False)
  File "/opt/python3.6/lib/python3.6/site-packages/execnet/gateway.py", line 72, in reconfigure
    self._send(Message.RECONFIGURE, data=data)
  File "/opt/python3.6/lib/python3.6/site-packages/execnet/gateway_base.py", line 1003, in _send
    raise IOError("cannot send (already closed?)")
OSError: cannot send (already closed?)
# python2中正常,因为对端主机也有python2

[store@R10-P01-DN-001 redis-agent]$ python
Python 2.7.5 (default, Apr  2 2020, 01:29:16)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import remoto
>>> from remoto.process import check
>>> conn = remoto.Connection('R10-P01-DN-002.gd.cn')
>>> check(conn, ['hostname', '-s'])
INFO:R10-P01-DN-001.gd.cn:Running command: hostname -s
([u'R10-P01-DN-002'], [], 0)

这个问题看着会比较明显,就是如果使用的 python3,就会去对端找 python3,找不到就会抛错。我们现在深入分析下源码内容,找到相应的逻辑判断语句进行修改。

2. 源码深入分析

remoto 工具是封装的 execnet 模块,想要彻底搞懂 remoto 模块代码,我们就必须先掌握好 execnet 模块的使用。我们从这个地址开始学习 execnet 模块,以下都是来自官方的例子:

# Execute source code in subprocess, communicate through a channel¶
>>> import execnet
>>> gw = execnet.makegateway()
>>> channel = gw.remote_exec("channel.send(channel.receive()+1)")
>>> channel.send(1)
>>> channel.receive()
2

下面这个例子是整个 remoto 的核心,它在远程执行一个函数,实现交互:

# 远程执行一个函数

import execnet

def multiplier(channel, factor):
    while not channel.isclosed():
        # 收到channel消息
        param = channel.receive()
        # 通过channel将结果返回
        channel.send(param * factor)

# 建立通道
gw = execnet.makegateway()
#远程执行函数multiplier(),后面的是传递给它的参数;返回channel,从中可以拿到返回的结果。
channel = gw.remote_exec(multiplier, factor=10)

for i in range(5):
    # 给channel发送数据
    channel.send(i)
    # 从channel拿到结果
    result = channel.receive()
    assert result == i * 10
    
# 关闭通道
gw.exit()

在这里插入图片描述
翻看 remoto 源码,可知 remoto 源码最核心的两个代码文件分别为:

  • backends/__init__.py:定义了 BaseConnection 类;
  • process.py:定义了最关键的 run()check() 方法;

先来看连接类,其实就是封装的 execnet 模块:

# 忽略导入模块
# ...

class BaseConnection(object):
    """
    Base class for Connection objects. Provides a generic interface to execnet
    for setting up the connection
    """
    executable = ''
    remote_import_system = 'legacy'

    def __init__(self, hostname, logger=None, sudo=False, threads=1, eager=True,
                 detect_sudo=False, interpreter=None, ssh_options=None):
        self.sudo = sudo
        self.hostname = hostname
        self.ssh_options = ssh_options
        self.logger = logger or basic_remote_logger()
        self.remote_module = None
        self.channel = None
        self.global_timeout = None  # wait for ever

        self.interpreter = interpreter or 'python%s' % sys.version_info[0]

        if eager:
            try:
                if detect_sudo:
                    self.sudo = self._detect_sudo()
                # 这里执行
                self.gateway = self._make_gateway(hostname)
            except OSError:
                self.logger.error(
                    "Can't communicate with remote host, possibly because "
                    "%s is not installed there" % self.interpreter
                )
                raise

    def _make_gateway(self, hostname):
        gateway = execnet.makegateway(
            self._make_connection_string(hostname)
        )
        # 这里报错
        gateway.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False)
        return gateway

    def _detect_sudo(self, _execnet=None):
        """
        ``sudo`` detection has to create a different connection to the remote
        host so that we can reliably ensure that ``getuser()`` will return the
        right information.

        After getting the user info it closes the connection and returns
        a boolean
        """
        exc = _execnet or execnet
        gw = exc.makegateway(
            self._make_connection_string(self.hostname, use_sudo=False)
        )

        channel = gw.remote_exec(
            'import getpass; channel.send(getpass.getuser())'
        )

        result = channel.receive()
        gw.exit()

        if result == 'root':
            return False
        self.logger.debug('connection detected need for sudo')
        return True

    ##############################################################################
    def _make_connection_string(self, hostname, _needs_ssh=None, use_sudo=None):
        _needs_ssh = _needs_ssh or needs_ssh
        interpreter = self.interpreter
        if use_sudo is not None:
            if use_sudo:
                interpreter = 'sudo ' + interpreter
        elif self.sudo:
            interpreter = 'sudo ' + interpreter
        if _needs_ssh(hostname):
            if self.ssh_options:
                return 'ssh=%s %s//python=%s' % (
                    self.ssh_options, hostname, interpreter
                )
            else:
                return 'ssh=%s//python=%s' % (hostname, interpreter)
        return 'popen//python=%s' % interpreter
    ##############################################################################

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.exit()
        return False

    def cmd(self, cmd):
        """
        In the base connection class, this method just returns the ``cmd``
        as-is. Other implementations will end up doing transformations to the
        command by prefixing it with other flags needed. See
        :class:`KubernetesConnection` for an example
        """
        return cmd

    ############################################################################
    def execute(self, function, **kw):
        return self.gateway.remote_exec(function, **kw)
    ###########################################################################

    def exit(self):
        self.gateway.exit()

    def import_module(self, module):
        """
        Allows remote execution of a local module. Depending on the
        ``remote_import_system`` attribute it may use execnet's implementation
        or remoto's own based on JSON.

        .. note:: It is not possible to use execnet's remote execution model on
                  connections that aren't SSH or Local.
        """
        if self.remote_import_system is not None:
            if self.remote_import_system == 'json':
                self.remote_module = JsonModuleExecute(self, module, self.logger)
            else:
                self.remote_module = LegacyModuleExecute(self.gateway, module, self.logger)
        else:
            self.remote_module = LegacyModuleExecute(self.gateway, module, self.logger)
        return self.remote_module
    
# ...

前面问题涉及到的重点函数就是 _make_connection_string()execute(),后面结合 process.py 代码进行分析。

# 源码位置: remoto/process.py

# ...


def _remote_check(channel, cmd, **kw):
    import subprocess
    stdin = kw.pop('stdin', None)
    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, **kw
    )

    if stdin:
        if not isinstance(stdin, bytes):
            stdin.encode('utf-8', errors='ignore')
        stdout_stream, stderr_stream = process.communicate(stdin)
    else:
        stdout_stream = process.stdout.read()
        stderr_stream = process.stderr.read()

    try:
        stdout_stream = stdout_stream.decode('utf-8')
        stderr_stream = stderr_stream.decode('utf-8')
    except AttributeError:
        pass

    stdout = stdout_stream.splitlines()
    stderr = stderr_stream.splitlines()
    channel.send((stdout, stderr, process.wait()))


def check(conn, command, exit=False, timeout=None, **kw):
    """
    Execute a remote command with ``subprocess.Popen`` but report back the
    results in a tuple with three items: stdout, stderr, and exit status.

    This helper function *does not* provide any logging as it is the caller's
    responsibility to do so.
    """
    command = conn.cmd(command)

    stop_on_error = kw.pop('stop_on_error', True)
    timeout = timeout or conn.global_timeout
    if not kw.get('env'):
        # get the remote environment's env so we can explicitly add
        # the path without wiping out everything
        kw = extend_env(conn, kw)

    conn.logger.info('Running command: %s' % ' '.join(admin_command(conn.sudo, command)))
    result = conn.execute(_remote_check, cmd=command, **kw)
    response = None
    try:
        response = result.receive(timeout)
    except Exception as err:
        # the things we need to do here :(
        # because execnet magic, we cannot catch this as
        # `except TimeoutError`
        if err.__class__.__name__ == 'TimeoutError':
            msg = 'No data was received after %s seconds, disconnecting...' % timeout
            conn.logger.warning(msg)
            # there is no stdout, stderr, or exit code but make the exit code
            # an error condition (non-zero) regardless
            return [], [], -1
        else:
            remote_trace = traceback.format_exc()
            remote_error = RemoteError(remote_trace)
            if remote_error.exception_name == 'RuntimeError':
                conn.logger.error(remote_error.exception_line)
            else:
                for tb_line in remote_trace.split('\n'):
                    conn.logger.error(tb_line)
            if stop_on_error:
                raise RuntimeError(
                    'Failed to execute command: %s' % ' '.join(command)
                )
    if exit:
        conn.exit()
    return response

可以看到 check() 方法最核心的语句如下:

conn.execute(_remote_check, cmd=command, **kw)

它就是使用 execnet 模块中的远程调用函数方法,远程调用 _remote_check(),这个函数是用于执行 shell 命令并得到相应的返回结果。这里会选择相应的 python 解释器来执行该函数。

回过头来思考下,我们是在做连接的时候就报错了:

>>> conn = remoto.Connection('R10-P01-DN-002.gd.cn')
# 省略异常信息

再看看 BaseConnection 类的初始化过程:

# ...

class BaseConnection(object):
    # ...
    
    def __init__(self, hostname, logger=None, sudo=False, threads=1, eager=True,
                 detect_sudo=False, interpreter=None, ssh_options=None):
        self.sudo = sudo
        self.hostname = hostname
        self.ssh_options = ssh_options
        self.logger = logger or basic_remote_logger()
        self.remote_module = None
        self.channel = None
        self.global_timeout = None  # wait for ever

        self.interpreter = interpreter or 'python%s' % sys.version_info[0]

        if eager:
            try:
                if detect_sudo:
                    self.sudo = self._detect_sudo()
                # 这里执行
                self.gateway = self._make_gateway(hostname)
            except OSError:
                self.logger.error(
                    "Can't communicate with remote host, possibly because "
                    "%s is not installed there" % self.interpreter
                )
                raise

    def _make_gateway(self, hostname):
        gateway = execnet.makegateway(
            self._make_connection_string(hostname)
        )
        # 这里报错
        gateway.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False)
        return gateway
    
    # ...
    
    def _make_connection_string(self, hostname, _needs_ssh=None, use_sudo=None):
        _needs_ssh = _needs_ssh or needs_ssh
        interpreter = self.interpreter
        if use_sudo is not None:
            if use_sudo:
                interpreter = 'sudo ' + interpreter
        elif self.sudo:
            interpreter = 'sudo ' + interpreter
        if _needs_ssh(hostname):
            if self.ssh_options:
                return 'ssh=%s %s//python=%s' % (
                    self.ssh_options, hostname, interpreter
                )
            else:
                return 'ssh=%s//python=%s' % (hostname, interpreter)
        return 'popen//python=%s' % interpreter

可以看到,在 Connection 类初始过程中会对 execnet 模块设置 python 解释器,如果没有设置 self.interpreter,其赋值逻辑如下:

self.interpreter = interpreter or 'python%s' % sys.version_info[0]

这正是我们前面看到的现象,默认使用当前系统的 python 版本。于是我们只需要在连接远端主机时,设置 python 为 python2 版本即可解决这个问题:

3. 问题定位与解决

[store@R10-P01-DN-001 redis-agent]$ python3
Python 3.6.10 (default, Jun 19 2020, 10:51:42) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import remoto
>>> conn = remoto.Connection('R10-P01-DN-002.gd.cn', interpreter='/bin/python')
>>> 
>>> from remoto.process import check
>>> check(conn, ['hostname'])
INFO:R10-P01-DN-001.gd.cn:Running command: hostname
(['R10-P01-DN-002.gd.cn'], [], 0)
>>> 

可以看到,最后不会出现前面那个报错。但这样指定 python 解释器话,远端执行的代码就必须时能用 python2 跑起来的,这点要注意!

4. 小结

花了两个小时看源码还是非常有收获的,理解了 remoto 模块两个方法背后的原理,顺带找到了之前调用出错的原因,终于又可以使用 remoto 模块愉快编程了!

本文地址:https://blog.csdn.net/qq_40085317/article/details/107666454

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网