File Manager - Edit - /opt/imh-python/lib/python3.9/site-packages/twisted/conch/test/__pycache__/test_endpoints.cpython-39.pyc
a k�hP� � @ s� d Z ddlZddlmZ ddlmZ ddlmZ ddl m Z mZ ddlZddl mZmZmZ ddlmZ dd lmZ dd lmZ ddlmZ ddlmZmZmZmZ dd lmZm Z m!Z!m"Z" ddl#m$Z$m%Z% ddl&m'Z'm(Z( ddl)m*Z*m+Z+ ddl,m-Z- ddl.m/Z/ ddl0m1Z1 ddl2m3Z3 ddl4m5Z5 ddl6m7Z7m8Z8 ddl9m:Z: e5d��r"e5d��r"ddl;m<Z< ddl=m>Z>m?Z? ddl@mAZAmBZB ddlCmDZDmEZEmFZFmGZGmHZHmIZImJZJ ddlKmLZL ddlMmNZN dd lOmPZP dd!lQmRZR dd"lSmTZT dd#lUmVZV dd$lWmXZX dd%lYmZZZ dd&l[m\Z\m]Z]m^Z^m_Z_ n(d'Z`eaZTeaZZeaZReaZVeaZPeaZNeaZBeaZ?eaZ<dd(lbmcZcmdZd dd)l6meZe G d*d+� d+ec�ZfG d,d-� d-eP�ZgG d.d/� d/eP�ZhG d0d1� d1eP�ZiG d2d3� d3�ZjG d4d5� d5e'�ZkG d6d7� d7�ZlG d8d9� d9eZ�ZmG d:d;� d;eT�Znee$�G d<d=� d=��Zoee%�G d>d?� d?��ZpG d@dA� dA�ZqG dBdC� dCe:eq�ZrG dDdE� dEe:eq�ZsG dFdG� dGe:�ZtG dHdI� dI�ZuG dJdK� dKe:�ZvdS )Lz' Tests for L{twisted.conch.endpoints}. � N)�ENOSYS)�pack)�implementer)�verifyClass�verifyObject)� ConchError�HostKeyChanged�UserRejectedKey)� IConchUser)�'InMemoryUsernamePasswordDatabaseDontUse)�Portal)�IPv4Address)�CancelledError�Deferred�fail�succeed)�ConnectingCancelledError�ConnectionDone�ConnectionRefusedError�ProcessTerminated)�IAddress�IStreamClientEndpoint)�Factory�Protocol)�LogLevel�globalLogPublisher)� networkString)�Failure)�FilePath)�msg)� requireModule)�EventLoggingObserver�MemoryReactorClock)�TestCaseZcryptographyzpyasn1.type)� ConchUser)�InMemorySSHKeyDB�SSHPublicKeyChecker)� ConsoleUI�KnownHostsFile)�AuthenticationFailed�SSHCommandAddress�SSHCommandClientEndpoint�_ExistingConnectionHelper�_ISSHConnectionCreator�_NewConnectionHelper� _ReadFile)�common)�SSHAgentServer)� SSHChannel)� SSHConnection)� SSHFactory)�Key)�SSHClientTransport)�SSHUserAuthServer)�privateDSA_openssh�privateRSA_openssh� privateRSA_openssh_encrypted_aes�publicRSA_opensshz%can't run w/o cryptography and pyasn1)� FakeTransport�connect)�StringTransportc @ s e Zd ZdZdZdd� ZdS )�AbortableFakeTransportzC A L{FakeTransport} with added C{abortConnection} support. Fc C s d| _ dS )z} Abort the connection in a fake manner. 
This should really be implemented in the underlying module. TN��aborted��self� rD ��/root/rpmbuild/BUILDROOT/imh-python39-modules-3.9.7-92.el8.x86_64/opt/imh-python/lib/python3.9/site-packages/twisted/conch/test/test_endpoints.py�abortConnectionZ s z&AbortableFakeTransport.abortConnectionN)�__name__� __module__�__qualname__�__doc__rA rF rD rD rD rE r? S s r? c @ s e Zd ZdZdd� ZdS )�BrokenExecSessionzO L{BrokenExecSession} is a session on which exec requests always fail. c C s dS )z� Fail all exec requests. @param data: Information about what is being executed. @type data: L{bytes} @return: C{0} to indicate failure @rtype: L{int} r rD �rC �datarD rD rE �request_exech s zBrokenExecSession.request_execN�rG rH rI rJ rN rD rD rD rE rK c s rK c @ s e Zd ZdZdd� ZdS )�WorkingExecSessionzS L{WorkingExecSession} is a session on which exec requests always succeed. c C s dS )z� Succeed all exec requests. @param data: Information about what is being executed. @type data: L{bytes} @return: C{1} to indicate success @rtype: L{int} � rD rL rD rD rE rN z s zWorkingExecSession.request_execNrO rD rD rD rE rP u s rP c @ s e Zd ZdZdd� ZdS )�UnsatisfiedExecSessionz� L{UnsatisfiedExecSession} is a session on which exec requests are always delayed indefinitely, never succeeding or failing. c C s t � S )z� Delay all exec requests indefinitely. @param data: Information about what is being executed. @type data: L{bytes} @return: A L{Deferred} which will never fire. 
@rtype: L{Deferred} )r rL rD rD rE rN � s z#UnsatisfiedExecSession.request_execNrO rD rD rD rE rR � s rR c @ s e Zd Zdd� Zdd� ZdS )�TrivialRealmc C s i | _ d S �N)� channelLookuprB rD rD rE �__init__� s zTrivialRealm.__init__c G s t � }| j|_t|dd� fS )Nc S s d S rT rD rD rD rD rE �<lambda>� � z,TrivialRealm.requestAvatar.<locals>.<lambda>)r$ rU r )rC ZavatarIdZmindZ interfacesZavatarrD rD rE � requestAvatar� s zTrivialRealm.requestAvatarN)rG rH rI rV rY rD rD rD rE rS � s rS c @ s e Zd ZdZdd� ZdS )�AddressSpyFactoryNc C s || _ t�| |�S rT )�addressr � buildProtocol)rC r[ rD rD rE r\ � s zAddressSpyFactory.buildProtocol)rG rH rI r[ r\ rD rD rD rE rZ � s rZ c @ s$ e Zd Zdd� Zdd� Zdd� ZdS )�FixedResponseUIc C s || _ d S rT ��result)rC r_ rD rD rE rV � s zFixedResponseUI.__init__c C s t | j�S rT )r r_ �rC �textrD rD rE �prompt� s zFixedResponseUI.promptc C s d S rT rD r` rD rD rE �warn� s zFixedResponseUI.warnN)rG rH rI rV rb rc rD rD rD rE r] � s r] c @ s$ e Zd Zedd� �Zedd� �ZdS )�FakeClockSSHUserAuthServerc C s | j jjS )zy Use the C{attemptsBeforeDisconnect} value defined by the factory to make it easier to override. )� transport�factory�attemptsBeforeDisconnectrB rD rD rE rg � s z3FakeClockSSHUserAuthServer.attemptsBeforeDisconnectc C s | j jjS )z� Use the reactor defined by the factory, rather than the default global reactor, to simplify testing (by allowing an alternate implementation to be supplied by tests). 
)re rf �reactorrB rD rD rE �clock� s z FakeClockSSHUserAuthServer.clockN)rG rH rI �propertyrg ri rD rD rD rE rd � s rd c @ s2 e Zd Zedd� �Zedd� �Zeed�ZdZ dS )�CommandFactoryc C s dt jtd�iS �N� ssh-rsa)rM )r5 � fromStringr; rB rD rD rE � publicKeys� s zCommandFactory.publicKeysc C s dt jtd�iS rl )r5 rn r9 rB rD rD rE �privateKeys� s zCommandFactory.privateKeys)s ssh-userauths ssh-connectionr N) rG rH rI rj ro rp rd r3 Zservicesrg rD rD rD rE rk � s � rk c @ s e Zd ZdS )� MemoryAddressN)rG rH rI rD rD rD rE rq � s rq c @ s e Zd ZdZdd� Zdd� ZdS )�SingleUseMemoryEndpointa] L{SingleUseMemoryEndpoint} is a client endpoint which allows one connection to be set up and then exposes an API for moving around bytes related to that connection. @ivar pump: L{None} until a connection is attempted, then a L{IOPump} instance associated with the protocol which is connected. @type pump: L{IOPump} c C s d| _ || _dS )z� @param server: An L{IProtocol} provider to which the client will be connected. @type server: L{IProtocol} provider N)�pump�_server)rC �serverrD rD rE rV � s z SingleUseMemoryEndpoint.__init__c C sl | j d urtd��z|�t� �}W n ty: t� Y S 0 t| jt| jdd�|t|dd��| _ t |�S d S )Nz(SingleUseMemoryEndpoint was already usedT��isServerF) rs � Exceptionr\ rq � BaseExceptionr r= rt r? r )rC rf �protocolrD rD rE r= � s �zSingleUseMemoryEndpoint.connectN)rG rH rI rJ rV r= rD rD rD rE rr � s rr c @ s� e Zd ZdZdd� Zdd� Zdd� Zdd � Zd d� Zdd � Z dd� Z dd� Zdd� Zdd� Z dd� Zdd� Zdd� Zdd� Zdd� Zd d!� Zd*d#d$�Zd%d&� Zd'd(� Zd)S )+�"SSHCommandClientEndpointTestsMixina� Tests for L{SSHCommandClientEndpoint}, an L{IStreamClientEndpoint} implementations which connects a protocol with the stdin and stdout of a command running in an SSH session. These tests apply to L{SSHCommandClientEndpoint} whether it is constructed using L{SSHCommandClientEndpoint.existingConnection} or L{SSHCommandClientEndpoint.newConnection}. 
Subclasses must override L{create}, L{assertClientTransportState}, and L{finishConnection}. c C s� d| _ d| _d| _d| _t� | _t� | _t| j�| _ t � | _| j�| j| j� | j � | j� t� | _| j| j_| j | j_ | j�� | �| jj� tddd�| _tddd �| _d S ) Ns ssh.example.comi&� s users passwordZTCPz10.0.0.1i90 z192.168.100.200i1� )�hostname�port�user�passwordr"