o
    ҷh[                     @  s  U d dl mZ d dlZd dlmZmZ d dlmZmZ ddl	m
Z
mZ ddlmZ ddlmZmZ er9d d	lmZ eg ee f Zd
ed< eg ef Zd
ed< ededZededZG dd dZejG dd deZejG dd deZejdde_ejdde_ddd8dd Zd9d"d#Z d:d'd(Z!d;d*d+Z"G d,d- d-Z#G d.d/ d/eZ$G d0d1 d1eZ%d<d3d4Z&d=d6d7Z'dS )>    )annotationsN)	AwaitableCallable)TYPE_CHECKINGTypeVar   )_core_utilStapledStream)ReceiveStream
SendStream)	TypeAliasr   	AsyncHookSyncHookSendStreamT)boundReceiveStreamTc                   @  s`   e Zd ZdddZdddZdddZdddZdddZdddZddddZ	ddddZ
dS )_UnboundedByteQueuereturnNonec                 C  s(   t  | _d| _t | _td| _d S )NFz%another task is already fetching data)		bytearray_data_closedr   
ParkingLot_lotr	   ConflictDetector_fetch_lockself r    O/var/www/html/venv/lib/python3.10/site-packages/trio/testing/_memory_streams.py__init__   s   

z_UnboundedByteQueue.__init__c                 C  s   d| _ | j  d S NT)r   r   
unpark_allr   r    r    r!   close(   s   z_UnboundedByteQueue.closec                 C  s   t  | _|   d S N)r   r   r%   r   r    r    r!   close_and_wipe,   s   z"_UnboundedByteQueue.close_and_wipedatabytes | bytearray | memoryviewc                 C  s,   | j rtd|  j|7  _| j  d S )Nzvirtual connection closed)r   r   ClosedResourceErrorr   r   r$   r   r(   r    r    r!   put0   s   
z_UnboundedByteQueue.put	max_bytes
int | Nonec                 C  s*   |d u rd S t |}|dk rtdd S )N   max_bytes must be >= 1)operatorindex
ValueErrorr   r-   r    r    r!   _check_max_bytes6   s   
z$_UnboundedByteQueue._check_max_bytesr   c                 C  sT   | j s| jsJ |d u rt| j}| jr'| jd | }| jd |= |s%J |S t S r&   )r   r   lenr   )r   r-   chunkr    r    r!   	_get_impl=   s   
z_UnboundedByteQueue._get_implNc                 C  sP   | j  | | | js| jstj| |W  d    S 1 s!w   Y  d S r&   )r   r5   r   r   r   
WouldBlockr8   r4   r    r    r!   
get_nowaitI   s   
$z_UnboundedByteQueue.get_nowaitc                   sl   | j ( | | | js| js| j I d H  nt I d H  | |W  d    S 1 s/w   Y  d S r&   )	r   r5   r   r   r   parkr   
checkpointr8   r4   r    r    r!   getP   s   
$z_UnboundedByteQueue.getr   r   r(   r)   r   r   )r-   r.   r   r   r-   r.   r   r   r&   )__name__
__module____qualname__r"   r%   r'   r,   r5   r8   r:   r=   r    r    r    r!   r      s    





r   c                   @  sb   e Zd ZdZ			ddd
dZdddZd ddZd ddZd ddZd!d"ddZ	d!d"ddZ
dS )#MemorySendStreama  An in-memory :class:`~trio.abc.SendStream`.

    Args:
      send_all_hook: An async function, or None. Called from
          :meth:`send_all`. Can do whatever you like.
      wait_send_all_might_not_block_hook: An async function, or None. Called
          from :meth:`wait_send_all_might_not_block`. Can do whatever you
          like.
      close_hook: A synchronous function, or None. Called from :meth:`close`
          and :meth:`aclose`. Can do whatever you like.

    .. attribute:: send_all_hook
                   wait_send_all_might_not_block_hook
                   close_hook

       All of these hooks are also exposed as attributes on the object, and
       you can change them at any time.

    Nsend_all_hookAsyncHook | None"wait_send_all_might_not_block_hook
close_hookSyncHook | Noner   r   c                 C  s*   t d| _t | _|| _|| _|| _d S )N!another task is using this stream)r	   r   _conflict_detectorr   	_outgoingrE   rG   rH   )r   rE   rG   rH   r    r    r!   r"   p   s   
zMemorySendStream.__init__r(   r)   c                   s~   | j 1 t I dH  t I dH  | j| | jdur-|  I dH  W d   dS W d   dS 1 s8w   Y  dS )z}Places the given data into the object's internal buffer, and then
        calls the :attr:`send_all_hook` (if any).

        N)rK   r   r<   rL   r,   rE   r+   r    r    r!   send_all~   s   
"zMemorySendStream.send_allc                   s~   | j 1 t I dH  t I dH  | jd | jdur-|  I dH  W d   dS W d   dS 1 s8w   Y  dS )znCalls the :attr:`wait_send_all_might_not_block_hook` (if any), and
        then returns immediately.

        N    )rK   r   r<   rL   r,   rG   r   r    r    r!   wait_send_all_might_not_block   s   
"z.MemorySendStream.wait_send_all_might_not_blockc                 C  s$   | j   | jdur|   dS dS )z^Marks this stream as closed, and then calls the :attr:`close_hook`
        (if any).

        N)rL   r%   rH   r   r    r    r!   r%      s   

zMemorySendStream.closec                      |    t I dH  dS z!Same as :meth:`close`, but async.Nr%   r   r<   r   r    r    r!   aclose      zMemorySendStream.acloser-   r.   r   c                   s   | j |I dH S )a  Retrieves data from the internal buffer, blocking if necessary.

        Args:
          max_bytes (int or None): The maximum amount of data to
              retrieve. None (the default) means to retrieve all the data
              that's present (but still blocks until at least one byte is
              available).

        Returns:
          If this stream has been closed, an empty bytearray. Otherwise, the
          requested data.

        N)rL   r=   r4   r    r    r!   get_data   s   zMemorySendStream.get_datac                 C  s   | j |S )zRetrieves data from the internal buffer, but doesn't block.

        See :meth:`get_data` for details.

        Raises:
          trio.WouldBlock: if no data is available to retrieve.

        )rL   r:   r4   r    r    r!   get_data_nowait   s   	z MemorySendStream.get_data_nowait)NNN)rE   rF   rG   rF   rH   rI   r   r   r?   r>   r&   r@   )rA   rB   rC   __doc__r"   rM   rO   r%   rS   rU   rV   r    r    r    r!   rD   Z   s    



rD   c                   @  sT   e Zd ZdZ		ddd	d
ZddddZdddZdddZdddZdddZ	dS ) MemoryReceiveStreama  An in-memory :class:`~trio.abc.ReceiveStream`.

    Args:
      receive_some_hook: An async function, or None. Called from
          :meth:`receive_some`. Can do whatever you like.
      close_hook: A synchronous function, or None. Called from :meth:`close`
          and :meth:`aclose`. Can do whatever you like.

    .. attribute:: receive_some_hook
                   close_hook

       Both hooks are also exposed as attributes on the object, and you can
       change them at any time.

    Nreceive_some_hookrF   rH   rI   r   r   c                 C  s*   t d| _t | _d| _|| _|| _d S )NrJ   F)r	   r   rK   r   	_incomingr   rY   rH   )r   rY   rH   r    r    r!   r"      s   
zMemoryReceiveStream.__init__r-   r.   r   c                   s   | j 9 t I dH  t I dH  | jrtj| jdur%|  I dH  | j|I dH }| jr4tj|W  d   S 1 s@w   Y  dS )zCalls the :attr:`receive_some_hook` (if any), and then retrieves
        data from the internal buffer, blocking if necessary.

        N)rK   r   r<   r   r*   rY   rZ   r=   )r   r-   r(   r    r    r!   receive_some   s   
$z MemoryReceiveStream.receive_somec                 C  s*   d| _ | j  | jdur|   dS dS )zfDiscards any pending data from the internal buffer, and marks this
        stream as closed.

        TN)r   rZ   r'   rH   r   r    r    r!   r%     s
   

zMemoryReceiveStream.closec                   rP   rQ   rR   r   r    r    r!   rS     rT   zMemoryReceiveStream.acloser(   r)   c                 C  s   | j | dS )z.Appends the given data to the internal buffer.N)rZ   r,   r+   r    r    r!   put_data  s   zMemoryReceiveStream.put_datac                 C  s   | j   dS )z2Adds an end-of-file marker to the internal buffer.N)rZ   r%   r   r    r    r!   put_eof  s   zMemoryReceiveStream.put_eof)NN)rY   rF   rH   rI   r   r   r&   r@   r>   r?   )
rA   rB   rC   rW   r"   r[   r%   rS   r\   r]   r    r    r    r!   rX      s    



rX   z._memory_streams )r-   memory_send_streammemory_receive_streamr-   r.   r   boolc                C  sf   z|  |}W n tjy   Y dS w z|s|  W dS || W dS  tjy2   tddw )a  Take data out of the given :class:`MemorySendStream`'s internal buffer,
    and put it into the given :class:`MemoryReceiveStream`'s internal buffer.

    Args:
      memory_send_stream (MemorySendStream): The stream to get data from.
      memory_receive_stream (MemoryReceiveStream): The stream to put data into.
      max_bytes (int or None): The maximum amount of data to transfer in this
          call, or None to transfer all available data.

    Returns:
      True if it successfully transferred some data, or False if there was no
      data to transfer.

    This is used to implement :func:`memory_stream_one_way_pair` and
    :func:`memory_stream_pair`; see the latter's docstring for an example
    of how you might use it yourself.

    FzMemoryReceiveStream was closedNT)rV   r   r9   r]   r\   r*   BrokenResourceError)r_   r`   r-   r(   r    r    r!   memory_stream_pump$  s   
rc   ,tuple[MemorySendStream, MemoryReceiveStream]c                    s>   t  t dfdd d fdd} | _ _fS )	uQ  Create a connected, pure-Python, unidirectional stream with infinite
    buffering and flexible configuration options.

    You can think of this as being a no-operating-system-involved
    Trio-streamsified version of :func:`os.pipe` (except that :func:`os.pipe`
    returns the streams in the wrong order – we follow the superior convention
    that data flows from left to right).

    Returns:
      A tuple (:class:`MemorySendStream`, :class:`MemoryReceiveStream`), where
      the :class:`MemorySendStream` has its hooks set up so that it calls
      :func:`memory_stream_pump` from its
      :attr:`~MemorySendStream.send_all_hook` and
      :attr:`~MemorySendStream.close_hook`.

    The end result is that data automatically flows from the
    :class:`MemorySendStream` to the :class:`MemoryReceiveStream`. But you're
    also free to rearrange things however you like. For example, you can
    temporarily set the :attr:`~MemorySendStream.send_all_hook` to None if you
    want to simulate a stall in data transmission. Or see
    :func:`memory_stream_pair` for a more elaborate example.

    r   r   c                     s   t   d S r&   )rc   r    )recv_streamsend_streamr    r!   $pump_from_send_stream_to_recv_streame     zHmemory_stream_one_way_pair.<locals>.pump_from_send_stream_to_recv_streamc                     s      d S r&   r    r    )rg   r    r!   *async_pump_from_send_stream_to_recv_streami  s   
zNmemory_stream_one_way_pair.<locals>.async_pump_from_send_stream_to_recv_streamNr>   )rD   rX   rE   rH   )ri   r    )rg   re   rf   r!   memory_stream_one_way_pairJ  s   rj   one_way_pair0Callable[[], tuple[SendStreamT, ReceiveStreamT]]]tuple[StapledStream[SendStreamT, ReceiveStreamT], StapledStream[SendStreamT, ReceiveStreamT]]c                 C  s0   |  \}}|  \}}t ||}t ||}||fS r&   r
   )rk   
pipe1_send
pipe1_recv
pipe2_send
pipe2_recvstream1stream2r    r    r!   _make_stapled_pairq  s
   



rt   qtuple[StapledStream[MemorySendStream, MemoryReceiveStream], StapledStream[MemorySendStream, MemoryReceiveStream]]c                   C     t tS )a  Create a connected, pure-Python, bidirectional stream with infinite
    buffering and flexible configuration options.

    This is a convenience function that creates two one-way streams using
    :func:`memory_stream_one_way_pair`, and then uses
    :class:`~trio.StapledStream` to combine them into a single bidirectional
    stream.

    This is like a no-operating-system-involved, Trio-streamsified version of
    :func:`socket.socketpair`.

    Returns:
      A pair of :class:`~trio.StapledStream` objects that are connected so
      that data automatically flows from one to the other in both directions.

    After creating a stream pair, you can send data back and forth, which is
    enough for simple tests::

       left, right = memory_stream_pair()
       await left.send_all(b"123")
       assert await right.receive_some() == b"123"
       await right.send_all(b"456")
       assert await left.receive_some() == b"456"

    But if you read the docs for :class:`~trio.StapledStream` and
    :func:`memory_stream_one_way_pair`, you'll see that all the pieces
    involved in wiring this up are public APIs, so you can adjust to suit the
    requirements of your tests. For example, here's how to tweak a stream so
    that data flowing from left to right trickles in one byte at a time (but
    data flowing from right to left proceeds at full speed)::

        left, right = memory_stream_pair()
        async def trickle():
            # left is a StapledStream, and left.send_stream is a MemorySendStream
            # right is a StapledStream, and right.recv_stream is a MemoryReceiveStream
            while memory_stream_pump(left.send_stream, right.recv_stream, max_bytes=1):
                # Pause between each byte
                await trio.sleep(1)
        # Normally this send_all_hook calls memory_stream_pump directly without
        # passing in a max_bytes. We replace it with our custom version:
        left.send_stream.send_all_hook = trickle

    And here's a simple test using our modified stream objects::

        async def sender():
            await left.send_all(b"12345")
            await left.send_eof()

        async def receiver():
            async for data in right:
                print(data)

        async with trio.open_nursery() as nursery:
            nursery.start_soon(sender)
            nursery.start_soon(receiver)

    By default, this will print ``b"12345"`` and then immediately exit; with
    our trickle stream it instead sleeps 1 second, then prints ``b"1"``, then
    sleeps 1 second, then prints ``b"2"``, etc.

    Pro-tip: you can insert sleep calls (like in our example above) to
    manipulate the flow of data across tasks... and then use
    :class:`MockClock` and its :attr:`~MockClock.autojump_threshold`
    functionality to keep your test suite running quickly.

    If you want to stress test a protocol implementation, one nice trick is to
    use the :mod:`random` module (preferably with a fixed seed) to move random
    numbers of bytes at a time, and insert random sleeps in between them. You
    can also set up a custom :attr:`~MemoryReceiveStream.receive_some_hook` if
    you want to manipulate things on the receiving side, and not just the
    sending side.

    )rt   rj   r    r    r    r!   memory_stream_pair~  s   Mrw   c                   @  s^   e Zd ZdddZdddZdd	d
ZdddZdddZdddZdddZ	ddddZ
dS ) _LockstepByteQueuer   r   c                 C  s@   t  | _d| _d| _d| _t | _t	d| _
t	d| _d S )NFzanother task is already sendingz!another task is already receiving)r   r   _sender_closed_receiver_closed_receiver_waitingr   r   _waitersr	   r   _send_conflict_detector_receive_conflict_detectorr   r    r    r!   r"     s   

z_LockstepByteQueue.__init__c                 C     | j   d S r&   )r|   r$   r   r    r    r!   _something_happened  rh   z&_LockstepByteQueue._something_happenedfnCallable[[], bool]c                   s>   	 | rn| j s| jrn	| j I d H  qt I d H  d S r&   )ry   rz   r|   r;   r   r<   )r   r   r    r    r!   	_wait_for  s   z_LockstepByteQueue._wait_forc                 C     d| _ |   d S r#   )ry   r   r   r    r    r!   close_sender     z_LockstepByteQueue.close_senderc                 C  r   r#   )rz   r   r   r    r    r!   close_receiver  r   z!_LockstepByteQueue.close_receiverr(   r)   c                   s    j H  jrtj jrtj jrJ   j|7  _     fddI d H   jr3tj jr< jrDtjW d    d S W d    d S 1 sOw   Y  d S )Nc                     s
    j dkS NrN   r   r    r   r    r!   <lambda>      
 z-_LockstepByteQueue.send_all.<locals>.<lambda>)	r}   ry   r   r*   rz   rb   r   r   r   r+   r    r   r!   rM     s$   
"z_LockstepByteQueue.send_allc                   s    j 4  jrtj jrt I d H  	 W d    d S   fddI d H   jr0tjW d    d S 1 s;w   Y  d S )Nc                     s    j S r&   )r{   r    r   r    r!   r     s    zB_LockstepByteQueue.wait_send_all_might_not_block.<locals>.<lambda>)r}   ry   r   r*   rz   r<   r   r   r    r   r!   rO     s   "z0_LockstepByteQueue.wait_send_all_might_not_blockNr-   r.   bytes | bytearrayc              	     s    j f |d urt|}|dk rtd jrtjd _   z 	 fddI d H  W d _nd _w  jr?tj j
r\ j
d | } j
d |=    |W  d    S  jsaJ 	 W d    dS 1 smw   Y  d S )Nr/   r0   Tc                     s
    j dkS r   r   r    r   r    r!   r     r   z1_LockstepByteQueue.receive_some.<locals>.<lambda>FrN   )r~   r1   r2   r3   rz   r   r*   r{   r   r   r   ry   )r   r-   gotr    r   r!   r[     s0   

$z_LockstepByteQueue.receive_somer>   )r   r   r   r   r?   r&   r-   r.   r   r   )rA   rB   rC   r"   r   r   r   r   rM   rO   r[   r    r    r    r!   rx     s    



	


rx   c                   @  s>   e Zd ZdddZdddZdd	d
ZdddZdddZdS )_LockstepSendStreamlbqrx   r   r   c                 C  
   || _ d S r&   _lbqr   r   r    r    r!   r"   2     
z_LockstepSendStream.__init__c                 C  r   r&   )r   r   r   r    r    r!   r%   5  rh   z_LockstepSendStream.closec                      |    t I d H  d S r&   rR   r   r    r    r!   rS   8     z_LockstepSendStream.acloser(   r)   c                   s   | j |I d H  d S r&   )r   rM   r+   r    r    r!   rM   <  s   z_LockstepSendStream.send_allc                   s   | j  I d H  d S r&   )r   rO   r   r    r    r!   rO   ?  s   z1_LockstepSendStream.wait_send_all_might_not_blockNr   rx   r   r   r>   r?   )rA   rB   rC   r"   r%   rS   rM   rO   r    r    r    r!   r   1  s    



r   c                   @  s6   e Zd ZdddZdddZdd	d
ZddddZdS )_LockstepReceiveStreamr   rx   r   r   c                 C  r   r&   r   r   r    r    r!   r"   D  r   z_LockstepReceiveStream.__init__c                 C  r   r&   )r   r   r   r    r    r!   r%   G  rh   z_LockstepReceiveStream.closec                   r   r&   rR   r   r    r    r!   rS   J  r   z_LockstepReceiveStream.acloseNr-   r.   r   c                   s   | j |I d H S r&   )r   r[   r4   r    r    r!   r[   N  s   z#_LockstepReceiveStream.receive_somer   r>   r&   r   )rA   rB   rC   r"   r%   rS   r[   r    r    r    r!   r   C  s
    


r    tuple[SendStream, ReceiveStream]c                  C  s   t  } t| t| fS )a  Create a connected, pure Python, unidirectional stream where data flows
    in lockstep.

    Returns:
      A tuple
      (:class:`~trio.abc.SendStream`, :class:`~trio.abc.ReceiveStream`).

    This stream has *absolutely no* buffering. Each call to
    :meth:`~trio.abc.SendStream.send_all` will block until all the given data
    has been returned by a call to
    :meth:`~trio.abc.ReceiveStream.receive_some`.

    This can be useful for testing flow control mechanisms in an extreme case,
    or for setting up "clogged" streams to use with
    :func:`check_one_way_stream` and friends.

    In addition to fulfilling the :class:`~trio.abc.SendStream` and
    :class:`~trio.abc.ReceiveStream` interfaces, the return objects
    also have a synchronous ``close`` method.

    )rx   r   r   )r   r    r    r!   lockstep_stream_one_way_pairR  s   r   Ytuple[StapledStream[SendStream, ReceiveStream], StapledStream[SendStream, ReceiveStream]]c                   C  rv   )a  Create a connected, pure-Python, bidirectional stream where data flows
    in lockstep.

    Returns:
      A tuple (:class:`~trio.StapledStream`, :class:`~trio.StapledStream`).

    This is a convenience function that creates two one-way streams using
    :func:`lockstep_stream_one_way_pair`, and then uses
    :class:`~trio.StapledStream` to combine them into a single bidirectional
    stream.

    )rt   r   r    r    r    r!   lockstep_stream_pairm  s   r   )r_   rD   r`   rX   r-   r.   r   ra   )r   rd   )rk   rl   r   rm   )r   ru   )r   r   )r   r   )(
__future__r   r1   collections.abcr   r   typingr   r   r^   r   r	   _highlevel_genericr   abcr   r   typing_extensionsr   objectr   __annotations__r   r   r   r   finalrD   rX   rB   replacerc   rj   rt   rw   rx   r   r   r   r   r    r    r    r!   <module>   sD    ?rN	
&
'
U^
