o
    h9                     @   s   U d Z ddlZddlmZmZmZmZmZmZ ddlZddlm	Z	 ddl
Zg dZee ed< ee	 Zee	ef Zeegeee e	f f ZG dd dZG d	d
 d
ZdddZdedee fddZdee fddZdS )zManipulation of micro-batches.    N)AnyCallableListUnioncastSequence)Tensor)NoChunkBatchcheckscattergather__all__c                   @   s*   e Zd ZdZdefddZedd ZdS )r	   a  
    Wrapper for a Tensor in :meth:`Pipe.forward` indicating that the tensor
    should not be chunked on the batch dimension and instead be replicated
    as-is across all micro-batches. This is useful for tensors which might
    not have any 'batch' semantics for the model.
    inpc                 C   s"   t |std| || _d S )Nz+NoChunk only supported for tensors, found: )torch	is_tensor	TypeError_tensor)selfr    r   `/var/www/html/ai/venv/lib/python3.10/site-packages/torch/distributed/pipeline/sync/microbatch.py__init__   s   

zNoChunk.__init__c                 C      | j S N)r   r   r   r   r   tensor#   s   zNoChunk.tensorN)__name__
__module____qualname____doc__r   r   propertyr   r   r   r   r   r	      s
    r	   c                   @   s  e Zd ZdZdeee ef ddfddZe	defddZ
e	d	d
 Zdd Zdd Zdedd fddZdefddZdd ZdefddZdefddZejdededdfddZejdededdfddZdeeef ddfddZdeddfd d!Zdeddfd"d#ZdS )$r
   zC
    An abstraction representing a microbatch in the pipeline.
    valuesreturnNc                 C   sD   || _ t|| _| jstdd | j D s td| j  d S d S )Nc                 s       | ]}t |V  qd S r   r   r   ).0valuer   r   r   	<genexpr>3       z!Batch.__init__.<locals>.<genexpr>zNo tensors found in batch: )_valuesr   r   atomicanyr   )r   r!   r   r   r   r   -   s   zBatch.__init__c                 C   s   | j stdtt| jS )z Retrieves the underlying tensor.znot atomic batch)r*   AttributeErrorr   r   r)   r   r   r   r   r   6   s   zBatch.tensorc                 C   r   )z-Retrieves the underlying values for the batch)r)   r   r   r   r   r!   =   s   zBatch.valuesc                 C   s8   | j rdS t| jD ]\}}t|r|  S q
td)z<
        Retrieves the index of first tensor found.
        r   zNo tensor found!)r*   	enumerater)   r   r   r   )r   ir&   r   r   r   find_tensor_idxB   s   
zBatch.find_tensor_idxc                 C   s2   | j r| jjS | jD ]}t|r|j  S q
dS )z;
        Retrieves the device for this microbatch.
        N)r*   r)   devicer   r   )r   r&   r   r   r   
get_deviceN   s   


zBatch.get_devicefunctionc                 C   s"   | j r
t|| jS t|| j S )zbCalls a function on the microbatch. It also wraps
        the output with :class:`Batch`.
        )r*   r
   r)   )r   r2   r   r   r   callY   s   z
Batch.callc                 C   s   d| j d| jdS )NzBatch[atomic=z]()r*   r)   r   r   r   r   __repr__b   s   zBatch.__repr__c                 c   s$    | j r
| jV  d S | jE d H  d S r   r5   r   r   r   r   __iter__e   s   zBatch.__iter__c                 C   s   | j rdS t| jS )N   )r*   lenr)   r   r   r   r   __len__k   s   zBatch.__len__indexc                 C   s&   | j s| j| S |dkrtd| jS )Nr    atomic batch allows index 0 onlyr*   r)   
IndexError)r   r;   r   r   r   __getitem__n   s
   
zBatch.__getitem__r&   c                 C      d S r   r   r   r;   r&   r   r   r   __setitem__x      zBatch.__setitem__c                 C   r@   r   r   rA   r   r   r   rB   |   rC   c                 C   s*   t |tr| || d S | || d S r   )
isinstanceint_setitem_by_index_setitem_by_slicerA   r   r   r   rB      s   
c                 C   sP   | j s|}| jd | |f | j|d d   | _d S |dkr#td|| _d S )Nr8   r   r<   r=   )r   r;   r&   r.   r   r   r   rF      s   (
zBatch._setitem_by_indexc                 C   sf   |j |j  u r|j  u rd u std td| js"|| _d S t|dkr,td|d | _d S )Nzonly slice [:] supportedr8   z5atomic batch cannot be replaced with multiple tensorsr   )startstopstepNotImplementedErrorr*   r)   r9   r>   rA   r   r   r   rG      s   "zBatch._setitem_by_slice)r   r   r   r   r   r   r   r   r   r    r   r!   r/   r1   Functionr3   strr6   r7   rE   r:   r?   typingoverloadrB   sliceTensorsrF   rG   r   r   r   r   r
   (   s*    	
	
r
   r"   c                    sB   t dd |D std| t  fdd|D rtddS )z
    Checks whether the input contains at least one tensor and each tensor is
    on the same device as the first partition.

    Raises:
        ValueError: input does not contain at least one tensor

    c                 s   r#   r   r$   r%   inputr   r   r   r'      r(   zcheck.<locals>.<genexpr>z inputs do not have any tensors: c                 3   s$    | ]}t |o|j kV  qd S r   )r   r   r0   rR   first_devicer   r   r'      s   " z>All inputs should be on the same device as the first partitionN)r+   r   
ValueError)rU   inputsr   rT   r   r      s
   
r   chunksc                 G   s  t |dkrt|d trdd |d | D S dd t| D }d}|D ]S}t|r]|| }|dkrH|t |krHtd| dt | t |}t|D ]\}}|| 	| qPq&t| D ]}t|t
rq|| 	|j qa|| 	| qaq&|d	| }d
d |D S )z7Splits an input mini-batch into multiple micro-batches.r8   r   c                 S      g | ]}t |qS r   r
   r%   xr   r   r   
<listcomp>       zscatter.<locals>.<listcomp>c                 S   s   g | ]}g qS r   r   )r%   _r   r   r   r]      s    z6Found different number of chunks produced for inputs:  and Nc                 S   rY   r   rZ   r[   r   r   r   r]      r^   )r9   rD   r   chunkranger   r   RuntimeErrorr-   appendr	   r   )rX   rW   batches
num_chunksrS   tensorsr.   r   r   r   r   r      s(   


r   outputsc              	   C   s   | d j rtdd | D }t|}|S g }tt| d D ]E}t| d | }g }| D ]}|t|| krEtd| dt||  |||  q-t	| d | r_|t| q|| qt|}|S )z4Concatenates output micro-batches into a mini-batch.r   c                 s   s    | ]}|j V  qd S r   )r   )r%   br   r   r   r'      s    zgather.<locals>.<genexpr>z2Types for microbatch outputs do not match, found: ra   )
r*   tupler   catrc   r9   typer   re   r   )ri   rh   output
output_bufr.   output_typecurrent_outputsbatchr   r   r   r      s"   

r   )r"   N)r   rN   r   r   r   r   r   r   r   r   torch.cuda.commr   rM   __annotations__rQ   TensorOrTensorsrL   r	   r
   r   rE   r   r   r   r   r   r   <module>   s    
w#