o
    ͷ6i[                     @  st  U d dl mZ d dlZd dlmZmZ d dlmZmZ ddl	m
Z
mZ ddlmZ ddlmZmZ eg ee f Zd	ed
< eg ef Zd	ed< ededZededZG dd dZejG dd deZejG dd deZejdde_ejdde_ddd7ddZd8d!d"Zd9d&d'Zd:d)d*Z G d+d, d,Z!G d-d. d.eZ"G d/d0 d0eZ#d;d2d3Z$d<d5d6Z%dS )=    )annotationsN)	AwaitableCallable)	TypeAliasTypeVar   )_core_utilStapledStream)ReceiveStream
SendStreamr   	AsyncHookSyncHookSendStreamT)boundReceiveStreamTc                   @  s`   e Zd ZdddZdddZdddZdddZdddZdddZddddZ	ddddZ
dS )_UnboundedByteQueuereturnNonec                 C  s(   t  | _d| _t | _td| _d S )NFz%another task is already fetching data)		bytearray_data_closedr   
ParkingLot_lotr	   ConflictDetector_fetch_lockself r   a/var/www/hoanhtaovolam_webdjango/env/lib/python3.10/site-packages/trio/testing/_memory_streams.py__init__   s   

z_UnboundedByteQueue.__init__c                 C  s   d| _ | j  d S NT)r   r   
unpark_allr   r   r   r    close$   s   z_UnboundedByteQueue.closec                 C  s   t  | _|   d S N)r   r   r$   r   r   r   r    close_and_wipe(   s   z"_UnboundedByteQueue.close_and_wipedatabytes | bytearray | memoryviewc                 C  s,   | j rtd|  j|7  _| j  d S )Nzvirtual connection closed)r   r   ClosedResourceErrorr   r   r#   r   r'   r   r   r    put,   s   
z_UnboundedByteQueue.put	max_bytes
int | Nonec                 C  s*   |d u rd S t |}|dk rtdd S )N   max_bytes must be >= 1)operatorindex
ValueErrorr   r,   r   r   r    _check_max_bytes2   s   
z$_UnboundedByteQueue._check_max_bytesr   c                 C  sT   | j s| jsJ |d u rt| j}| jr'| jd | }| jd |= |s%J |S t S r%   )r   r   lenr   )r   r,   chunkr   r   r    	_get_impl9   s   
z_UnboundedByteQueue._get_implNc                 C  sP   | j  | | | js| jstj| |W  d    S 1 s!w   Y  d S r%   )r   r4   r   r   r   
WouldBlockr7   r3   r   r   r    
get_nowaitE   s   
$z_UnboundedByteQueue.get_nowaitc                   sl   | j ( | | | js| js| j I d H  nt I d H  | |W  d    S 1 s/w   Y  d S r%   )	r   r4   r   r   r   parkr   
checkpointr7   r3   r   r   r    getL   s   
$z_UnboundedByteQueue.getr   r   r'   r(   r   r   )r,   r-   r   r   r,   r-   r   r   r%   )__name__
__module____qualname__r!   r$   r&   r+   r4   r7   r9   r<   r   r   r   r    r      s    





r   c                   @  sb   e Zd ZdZ			ddd
dZdddZd ddZd ddZd ddZd!d"ddZ	d!d"ddZ
dS )#MemorySendStreama  An in-memory :class:`~trio.abc.SendStream`.

    Args:
      send_all_hook: An async function, or None. Called from
          :meth:`send_all`. Can do whatever you like.
      wait_send_all_might_not_block_hook: An async function, or None. Called
          from :meth:`wait_send_all_might_not_block`. Can do whatever you
          like.
      close_hook: A synchronous function, or None. Called from :meth:`close`
          and :meth:`aclose`. Can do whatever you like.

    .. attribute:: send_all_hook
                   wait_send_all_might_not_block_hook
                   close_hook

       All of these hooks are also exposed as attributes on the object, and
       you can change them at any time.

    Nsend_all_hookAsyncHook | None"wait_send_all_might_not_block_hook
close_hookSyncHook | Noner   r   c                 C  s*   t d| _t | _|| _|| _|| _d S )N!another task is using this stream)r	   r   _conflict_detectorr   	_outgoingrD   rF   rG   )r   rD   rF   rG   r   r   r    r!   l   s   
zMemorySendStream.__init__r'   r(   c                   s~   | j 1 t I dH  t I dH  | j| | jdur-|  I dH  W d   dS W d   dS 1 s8w   Y  dS )z}Places the given data into the object's internal buffer, and then
        calls the :attr:`send_all_hook` (if any).

        N)rJ   r   r;   rK   r+   rD   r*   r   r   r    send_allz   s   
"zMemorySendStream.send_allc                   s~   | j 1 t I dH  t I dH  | jd | jdur-|  I dH  W d   dS W d   dS 1 s8w   Y  dS )znCalls the :attr:`wait_send_all_might_not_block_hook` (if any), and
        then returns immediately.

        N    )rJ   r   r;   rK   r+   rF   r   r   r   r    wait_send_all_might_not_block   s   
"z.MemorySendStream.wait_send_all_might_not_blockc                 C  s$   | j   | jdur|   dS dS )z^Marks this stream as closed, and then calls the :attr:`close_hook`
        (if any).

        N)rK   r$   rG   r   r   r   r    r$      s   

zMemorySendStream.closec                      |    t I dH  dS z!Same as :meth:`close`, but async.Nr$   r   r;   r   r   r   r    aclose      zMemorySendStream.acloser,   r-   r   c                   s   | j |I dH S )a  Retrieves data from the internal buffer, blocking if necessary.

        Args:
          max_bytes (int or None): The maximum amount of data to
              retrieve. None (the default) means to retrieve all the data
              that's present (but still blocks until at least one byte is
              available).

        Returns:
          If this stream has been closed, an empty bytearray. Otherwise, the
          requested data.

        N)rK   r<   r3   r   r   r    get_data   s   zMemorySendStream.get_datac                 C  s   | j |S )zRetrieves data from the internal buffer, but doesn't block.

        See :meth:`get_data` for details.

        Raises:
          trio.WouldBlock: if no data is available to retrieve.

        )rK   r9   r3   r   r   r    get_data_nowait   s   	z MemorySendStream.get_data_nowait)NNN)rD   rE   rF   rE   rG   rH   r   r   r>   r=   r%   r?   )r@   rA   rB   __doc__r!   rL   rN   r$   rR   rT   rU   r   r   r   r    rC   V   s    



rC   c                   @  sT   e Zd ZdZ		ddd	d
ZddddZdddZdddZdddZdddZ	dS ) MemoryReceiveStreama  An in-memory :class:`~trio.abc.ReceiveStream`.

    Args:
      receive_some_hook: An async function, or None. Called from
          :meth:`receive_some`. Can do whatever you like.
      close_hook: A synchronous function, or None. Called from :meth:`close`
          and :meth:`aclose`. Can do whatever you like.

    .. attribute:: receive_some_hook
                   close_hook

       Both hooks are also exposed as attributes on the object, and you can
       change them at any time.

    Nreceive_some_hookrE   rG   rH   r   r   c                 C  s*   t d| _t | _d| _|| _|| _d S )NrI   F)r	   r   rJ   r   	_incomingr   rX   rG   )r   rX   rG   r   r   r    r!      s   
zMemoryReceiveStream.__init__r,   r-   r   c                   s   | j 9 t I dH  t I dH  | jrtj| jdur%|  I dH  | j|I dH }| jr4tj|W  d   S 1 s@w   Y  dS )zCalls the :attr:`receive_some_hook` (if any), and then retrieves
        data from the internal buffer, blocking if necessary.

        N)rJ   r   r;   r   r)   rX   rY   r<   )r   r,   r'   r   r   r    receive_some   s   
$z MemoryReceiveStream.receive_somec                 C  s*   d| _ | j  | jdur|   dS dS )zfDiscards any pending data from the internal buffer, and marks this
        stream as closed.

        TN)r   rY   r&   rG   r   r   r   r    r$      s
   

zMemoryReceiveStream.closec                   rO   rP   rQ   r   r   r   r    rR   	  rS   zMemoryReceiveStream.acloser'   r(   c                 C  s   | j | dS )z.Appends the given data to the internal buffer.N)rY   r+   r*   r   r   r    put_data  s   zMemoryReceiveStream.put_datac                 C  s   | j   dS )z2Adds an end-of-file marker to the internal buffer.N)rY   r$   r   r   r   r    put_eof  s   zMemoryReceiveStream.put_eof)NN)rX   rE   rG   rH   r   r   r%   r?   r=   r>   )
r@   rA   rB   rV   r!   rZ   r$   rR   r[   r\   r   r   r   r    rW      s    



rW   z._memory_streams )r,   memory_send_streammemory_receive_streamr,   r-   r   boolc                C  sf   z|  |}W n tjy   Y dS w z|s|  W dS || W dS  tjy2   tddw )a  Take data out of the given :class:`MemorySendStream`'s internal buffer,
    and put it into the given :class:`MemoryReceiveStream`'s internal buffer.

    Args:
      memory_send_stream (MemorySendStream): The stream to get data from.
      memory_receive_stream (MemoryReceiveStream): The stream to put data into.
      max_bytes (int or None): The maximum amount of data to transfer in this
          call, or None to transfer all available data.

    Returns:
      True if it successfully transferred some data, or False if there was no
      data to transfer.

    This is used to implement :func:`memory_stream_one_way_pair` and
    :func:`memory_stream_pair`; see the latter's docstring for an example
    of how you might use it yourself.

    FzMemoryReceiveStream was closedNT)rU   r   r8   r\   r[   r)   BrokenResourceError)r^   r_   r,   r'   r   r   r    memory_stream_pump   s   
rb   ,tuple[MemorySendStream, MemoryReceiveStream]c                    s>   t  t dfdd d fdd} | _ _fS )	uQ  Create a connected, pure-Python, unidirectional stream with infinite
    buffering and flexible configuration options.

    You can think of this as being a no-operating-system-involved
    Trio-streamsified version of :func:`os.pipe` (except that :func:`os.pipe`
    returns the streams in the wrong order – we follow the superior convention
    that data flows from left to right).

    Returns:
      A tuple (:class:`MemorySendStream`, :class:`MemoryReceiveStream`), where
      the :class:`MemorySendStream` has its hooks set up so that it calls
      :func:`memory_stream_pump` from its
      :attr:`~MemorySendStream.send_all_hook` and
      :attr:`~MemorySendStream.close_hook`.

    The end result is that data automatically flows from the
    :class:`MemorySendStream` to the :class:`MemoryReceiveStream`. But you're
    also free to rearrange things however you like. For example, you can
    temporarily set the :attr:`~MemorySendStream.send_all_hook` to None if you
    want to simulate a stall in data transmission. Or see
    :func:`memory_stream_pair` for a more elaborate example.

    r   r   c                     s   t   d S r%   )rb   r   )recv_streamsend_streamr   r    $pump_from_send_stream_to_recv_streama     zHmemory_stream_one_way_pair.<locals>.pump_from_send_stream_to_recv_streamc                     s      d S r%   r   r   )rf   r   r    *async_pump_from_send_stream_to_recv_streame  s   
zNmemory_stream_one_way_pair.<locals>.async_pump_from_send_stream_to_recv_streamNr=   )rC   rW   rD   rG   )rh   r   )rf   rd   re   r    memory_stream_one_way_pairF  s   ri   one_way_pair0Callable[[], tuple[SendStreamT, ReceiveStreamT]]]tuple[StapledStream[SendStreamT, ReceiveStreamT], StapledStream[SendStreamT, ReceiveStreamT]]c                 C  s0   |  \}}|  \}}t ||}t ||}||fS r%   r
   )rj   
pipe1_send
pipe1_recv
pipe2_send
pipe2_recvstream1stream2r   r   r    _make_stapled_pairm  s
   



rs   qtuple[StapledStream[MemorySendStream, MemoryReceiveStream], StapledStream[MemorySendStream, MemoryReceiveStream]]c                   C     t tS )a  Create a connected, pure-Python, bidirectional stream with infinite
    buffering and flexible configuration options.

    This is a convenience function that creates two one-way streams using
    :func:`memory_stream_one_way_pair`, and then uses
    :class:`~trio.StapledStream` to combine them into a single bidirectional
    stream.

    This is like a no-operating-system-involved, Trio-streamsified version of
    :func:`socket.socketpair`.

    Returns:
      A pair of :class:`~trio.StapledStream` objects that are connected so
      that data automatically flows from one to the other in both directions.

    After creating a stream pair, you can send data back and forth, which is
    enough for simple tests::

       left, right = memory_stream_pair()
       await left.send_all(b"123")
       assert await right.receive_some() == b"123"
       await right.send_all(b"456")
       assert await left.receive_some() == b"456"

    But if you read the docs for :class:`~trio.StapledStream` and
    :func:`memory_stream_one_way_pair`, you'll see that all the pieces
    involved in wiring this up are public APIs, so you can adjust to suit the
    requirements of your tests. For example, here's how to tweak a stream so
    that data flowing from left to right trickles in one byte at a time (but
    data flowing from right to left proceeds at full speed)::

        left, right = memory_stream_pair()
        async def trickle():
            # left is a StapledStream, and left.send_stream is a MemorySendStream
            # right is a StapledStream, and right.recv_stream is a MemoryReceiveStream
            while memory_stream_pump(left.send_stream, right.recv_stream, max_bytes=1):
                # Pause between each byte
                await trio.sleep(1)
        # Normally this send_all_hook calls memory_stream_pump directly without
        # passing in a max_bytes. We replace it with our custom version:
        left.send_stream.send_all_hook = trickle

    And here's a simple test using our modified stream objects::

        async def sender():
            await left.send_all(b"12345")
            await left.send_eof()

        async def receiver():
            async for data in right:
                print(data)

        async with trio.open_nursery() as nursery:
            nursery.start_soon(sender)
            nursery.start_soon(receiver)

    By default, this will print ``b"12345"`` and then immediately exit; with
    our trickle stream it instead sleeps 1 second, then prints ``b"1"``, then
    sleeps 1 second, then prints ``b"2"``, etc.

    Pro-tip: you can insert sleep calls (like in our example above) to
    manipulate the flow of data across tasks... and then use
    :class:`MockClock` and its :attr:`~MockClock.autojump_threshold`
    functionality to keep your test suite running quickly.

    If you want to stress test a protocol implementation, one nice trick is to
    use the :mod:`random` module (preferably with a fixed seed) to move random
    numbers of bytes at a time, and insert random sleeps in between them. You
    can also set up a custom :attr:`~MemoryReceiveStream.receive_some_hook` if
    you want to manipulate things on the receiving side, and not just the
    sending side.

    )rs   ri   r   r   r   r    memory_stream_pairz  s   Mrv   c                   @  s^   e Zd ZdddZdddZdd	d
ZdddZdddZdddZdddZ	ddddZ
dS ) _LockstepByteQueuer   r   c                 C  s@   t  | _d| _d| _d| _t | _t	d| _
t	d| _d S )NFzanother task is already sendingz!another task is already receiving)r   r   _sender_closed_receiver_closed_receiver_waitingr   r   _waitersr	   r   _send_conflict_detector_receive_conflict_detectorr   r   r   r    r!     s   

z_LockstepByteQueue.__init__c                 C     | j   d S r%   )r{   r#   r   r   r   r    _something_happened  rg   z&_LockstepByteQueue._something_happenedfnCallable[[], bool]c                   s>   	 | rn| j s| jrn	| j I d H  qt I d H  d S r%   )rx   ry   r{   r:   r   r;   )r   r   r   r   r    	_wait_for  s   z_LockstepByteQueue._wait_forc                 C     d| _ |   d S r"   )rx   r   r   r   r   r    close_sender     z_LockstepByteQueue.close_senderc                 C  r   r"   )ry   r   r   r   r   r    close_receiver  r   z!_LockstepByteQueue.close_receiverr'   r(   c                   s    j H  jrtj jrtj jrJ   j|7  _     fddI d H   jr3tj jr< jrDtjW d    d S W d    d S 1 sOw   Y  d S )Nc                     s
    j dkS NrM   r   r   r   r   r    <lambda>     
 z-_LockstepByteQueue.send_all.<locals>.<lambda>)	r|   rx   r   r)   ry   ra   r   r   r   r*   r   r   r    rL     s$   
"z_LockstepByteQueue.send_allc                   s    j 4  jrtj jrt I d H  	 W d    d S   fddI d H   jr0tjW d    d S 1 s;w   Y  d S )Nc                     s    j S r%   )rz   r   r   r   r    r   	  s    zB_LockstepByteQueue.wait_send_all_might_not_block.<locals>.<lambda>)r|   rx   r   r)   ry   r;   r   r   r   r   r    rN     s   "z0_LockstepByteQueue.wait_send_all_might_not_blockNr,   r-   bytes | bytearrayc              	     s    j f |d urt|}|dk rtd jrtjd _   z 	 fddI d H  W d _nd _w  jr?tj j
r\ j
d | } j
d |=    |W  d    S  jsaJ 	 W d    dS 1 smw   Y  d S )Nr.   r/   Tc                     s
    j dkS r   r   r   r   r   r    r     r   z1_LockstepByteQueue.receive_some.<locals>.<lambda>FrM   )r}   r0   r1   r2   ry   r   r)   rz   r   r   r   rx   )r   r,   gotr   r   r    rZ     s0   

$z_LockstepByteQueue.receive_somer=   )r   r   r   r   r>   r%   r,   r-   r   r   )r@   rA   rB   r!   r   r   r   r   rL   rN   rZ   r   r   r   r    rw     s    



	


rw   c                   @  s>   e Zd ZdddZdddZdd	d
ZdddZdddZdS )_LockstepSendStreamlbqrw   r   r   c                 C  
   || _ d S r%   _lbqr   r   r   r   r    r!   .     
z_LockstepSendStream.__init__c                 C  r~   r%   )r   r   r   r   r   r    r$   1  rg   z_LockstepSendStream.closec                      |    t I d H  d S r%   rQ   r   r   r   r    rR   4     z_LockstepSendStream.acloser'   r(   c                   s   | j |I d H  d S r%   )r   rL   r*   r   r   r    rL   8  s   z_LockstepSendStream.send_allc                   s   | j  I d H  d S r%   )r   rN   r   r   r   r    rN   ;  s   z1_LockstepSendStream.wait_send_all_might_not_blockNr   rw   r   r   r=   r>   )r@   rA   rB   r!   r$   rR   rL   rN   r   r   r   r    r   -  s    



r   c                   @  s6   e Zd ZdddZdddZdd	d
ZddddZdS )_LockstepReceiveStreamr   rw   r   r   c                 C  r   r%   r   r   r   r   r    r!   @  r   z_LockstepReceiveStream.__init__c                 C  r~   r%   )r   r   r   r   r   r    r$   C  rg   z_LockstepReceiveStream.closec                   r   r%   rQ   r   r   r   r    rR   F  r   z_LockstepReceiveStream.acloseNr,   r-   r   c                   s   | j |I d H S r%   )r   rZ   r3   r   r   r    rZ   J  s   z#_LockstepReceiveStream.receive_somer   r=   r%   r   )r@   rA   rB   r!   r$   rR   rZ   r   r   r   r    r   ?  s
    


r    tuple[SendStream, ReceiveStream]c                  C  s   t  } t| t| fS )a  Create a connected, pure Python, unidirectional stream where data flows
    in lockstep.

    Returns:
      A tuple
      (:class:`~trio.abc.SendStream`, :class:`~trio.abc.ReceiveStream`).

    This stream has *absolutely no* buffering. Each call to
    :meth:`~trio.abc.SendStream.send_all` will block until all the given data
    has been returned by a call to
    :meth:`~trio.abc.ReceiveStream.receive_some`.

    This can be useful for testing flow control mechanisms in an extreme case,
    or for setting up "clogged" streams to use with
    :func:`check_one_way_stream` and friends.

    In addition to fulfilling the :class:`~trio.abc.SendStream` and
    :class:`~trio.abc.ReceiveStream` interfaces, the return objects
    also have a synchronous ``close`` method.

    )rw   r   r   )r   r   r   r    lockstep_stream_one_way_pairN  s   r   Ytuple[StapledStream[SendStream, ReceiveStream], StapledStream[SendStream, ReceiveStream]]c                   C  ru   )a  Create a connected, pure-Python, bidirectional stream where data flows
    in lockstep.

    Returns:
      A tuple (:class:`~trio.StapledStream`, :class:`~trio.StapledStream`).

    This is a convenience function that creates two one-way streams using
    :func:`lockstep_stream_one_way_pair`, and then uses
    :class:`~trio.StapledStream` to combine them into a single bidirectional
    stream.

    )rs   r   r   r   r   r    lockstep_stream_pairi  s   r   )r^   rC   r_   rW   r,   r-   r   r`   )r   rc   )rj   rk   r   rl   )r   rt   )r   r   )r   r   )&
__future__r   r0   collections.abcr   r   typingr   r   r]   r   r	   _highlevel_genericr   abcr   r   objectr   __annotations__r   r   r   r   finalrC   rW   rA   replacerb   ri   rs   rv   rw   r   r   r   r   r   r   r   r    <module>   s@    ?rN	
&
'
U^
