o
    /iF"                     @   s   d Z ddlZddlZddlmZmZmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ eeZG dd	 d	ZG d
d deZdS )z2Asynchronous bi-directional streaming RPC helpers.    N)CallableOptionalUnion)aio)
exceptions)BidiRpcBase)Messagec                
   @   sV   e Zd ZdZ	ddejdeeee	g ef f  ddfddZ
defdd	Zd
d ZdS )_AsyncRequestQueueGeneratora  _AsyncRequestQueueGenerator is a helper class for sending asynchronous
      requests to a gRPC stream from a Queue.

    This generator takes asynchronous requests off a given `asyncio.Queue` and
    yields them to gRPC.

    It's useful when you have an indeterminate, indefinite, or otherwise
    open-ended set of requests to send through a request-streaming (or
    bidirectional) RPC.

    Example::

        requests = _AsyncRequestQueueGenerator(q)
        call = await stub.StreamingRequest(requests)
        requests.call = call

        async for response in call:
            print(response)
            await q.put(...)

    Args:
        queue (asyncio.Queue): The request queue.
        initial_request (Union[ProtobufMessage,
                Callable[[], ProtobufMessage]]): The initial request to
            yield. This is done independently of the request queue to allow for
            easily restarting streams that require some initial configuration
            request.
    Nqueueinitial_requestreturnc                 C   s   || _ || _d | _d S N)_queue_initial_requestcall)selfr
   r    r   N/var/www/passon-env/lib/python3.10/site-packages/google/api_core/bidi_async.py__init__>   s   
z$_AsyncRequestQueueGenerator.__init__c                 C   s   | j du p
| j   S )z5Returns true if the call is not set or not completed.Nr   doner   r   r   r   
_is_activeI   s   z&_AsyncRequestQueueGenerator._is_activec                 C  s   | j d urt| j r|   V  n| j V  	 | j I d H }|d u r)td d S |  s=| j|I d H  td d S |V  q)NTz"Cleanly exiting request generator.zEInactive call, replacing item on queue and exiting request generator.)r   callabler   get_LOGGERdebugr   put)r   itemr   r   r   	__aiter__Q   s$   


z%_AsyncRequestQueueGenerator.__aiter__r   )__name__
__module____qualname____doc__asyncioQueuer   r   ProtobufMessager   r   boolr   r   r   r   r   r   r	       s    "
r	   c                   @   sf   e Zd ZdZdejfddZdddZddd	Zd
e	ddfddZ
de	fddZedefddZdS )AsyncBidiRpca:  A helper for consuming a async bi-directional streaming RPC.

    This maps gRPC's built-in interface which uses a request iterator and a
    response iterator into a socket-like :func:`send` and :func:`recv`. This
    is a more useful pattern for long-running or asymmetric streams (streams
    where there is not a direct correlation between the requests and
    responses).

    Example::

        initial_request = example_pb2.StreamingRpcRequest(
            setting='example')
        rpc = AsyncBidiRpc(
            stub.StreamingRpc,
            initial_request=initial_request,
            metadata=[('name', 'value')]
        )

        await rpc.open()

        while rpc.is_active:
            print(await rpc.recv())
            await rpc.send(example_pb2.StreamingRpcRequest(
                data='example'))

        await rpc.close()

    This does *not* retry the stream on errors.

    Args:
        start_rpc (grpc.aio.StreamStreamMultiCallable): The gRPC method used to
            start the RPC.
        initial_request (Union[ProtobufMessage,
                Callable[[], ProtobufMessage]]): The initial request to
            yield. This is useful if an initial request is needed to start the
            stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
    r   c                 C   s   t  S )zCreate a queue for requests.)r$   r%   r   r   r   r   _create_queue   s   zAsyncBidiRpc._create_queueNc              
      s   | j rtdt| j| jd}z| j|| jdI dH }W n tjy1 } z| 	|j
  d}~ww ||_t|drB|j| j	 n|| j	 || _|| _dS )zOpens the stream.z#Cannot open an already open stream.)r   )metadataN_wrapped)	is_active
ValueErrorr	   _request_queuer   
_start_rpc_rpc_metadatar   GoogleAPICallError_on_call_doneresponser   hasattrr+   add_done_callback_request_generator)r   request_generatorr   excr   r   r   open   s&   

zAsyncBidiRpc.openc                    s>   | j dur| j   | jdI dH  d| _d| _g | _dS )zCloses the stream.N)r   cancelr.   r   r6   r   
_callbacksr   r   r   r   close   s   


zAsyncBidiRpc.closerequestc                    sH   | j du r
td| j  s| j|I dH  dS | j  I dH  dS )zQueue a message to be sent on the stream.

        If the underlying RPC has been closed, this will raise.

        Args:
            request (ProtobufMessage): The request to send.
        Nz8Cannot send on an RPC stream that has never been opened.)r   r-   r   r.   r   read)r   r=   r   r   r   send   s   

zAsyncBidiRpc.sendc                    s$   | j du r
td| j  I dH S )zWait for a message to be returned from the stream.

        If the underlying RPC has been closed, this will raise.

        Returns:
            ProtobufMessage: The received message.
        Nz8Cannot recv on an RPC stream that has never been opened.)r   r-   r>   r   r   r   r   recv   s   
zAsyncBidiRpc.recvc                 C   s   | j duo
| j   S )z0Whether the stream is currently open and active.Nr   r   r   r   r   r,      s   zAsyncBidiRpc.is_active)r   N)r    r!   r"   r#   r$   r%   r)   r9   r<   r&   r?   r@   propertyr'   r,   r   r   r   r   r(   |   s    (

r(   )r#   r$   loggingtypingr   r   r   grpcr   google.api_corer   google.api_core.bidi_baser   google.protobuf.messager   r&   	getLoggerr    r   r	   r(   r   r   r   r   <module>   s   
\