o
    h%                  
   @   s  U d Z ddlmZ ddlmZ ddlmZmZmZm	Z	m
Z
mZmZmZmZ ddlZddlmZmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZ ddlmZ ddlmZ ddl m!Z!m"Z" ddl#m$Z$m%Z%m&Z& ddl'm(Z(m)Z) dgZ*ee+ e,d< ee Z-eee-f Z.e
ee/ e/ef Z0eree	d  Z1ee
e2ee
def e0df f  Z3neZ1eZ3dededdfddZ4dede$de$ddfddZ5dede$de$ddfddZ6d e7d!e7deee
e7e7f   fd"d#Z8G d$d dZ9dS )%z!The pipeline parallelism of Pipe.    )Queue)TracebackType)	TYPE_CHECKINGIterableListOptionalTupleTypeUnioncastSequenceN)Tensornn)record_function   )Checkpointing)CopyWait)forkjoin)Batch)
SkipLayout)SkipTrackerThroughPotalsuse_skip_tracker)AbstractStreamcurrent_stream
use_device)Taskcreate_workersPipeline__all__r   	fork_fromjoin_toreturnc                 C   s:   |   }|  }t| | \| |< }t|| |||< d S N)find_tensor_idxr   r   )r!   r"   fork_from_idxjoin_to_idxphony r)   ^/var/www/html/ai/venv/lib/python3.10/site-packages/torch/distributed/pipeline/sync/pipeline.py_depend+   s   r+   batchprev_streamnext_streamc                 C   :   t j||g| R  | d d < tdd | D | d d < d S )Nc                 S   *   g | ]}t |r| s| n|qS r)   torch	is_tensoris_floating_pointdetach.0xr)   r)   r*   
<listcomp>6      * z_copy.<locals>.<listcomp>)r   applytupler,   r-   r.   r)   r)   r*   _copy3      r>   c                 C   r/   )Nc                 S   r0   r)   r1   r6   r)   r)   r*   r9   <   r:   z_wait.<locals>.<listcomp>)r   r;   r<   r=   r)   r)   r*   _wait9   r?   r@   mnc                 #   sN    t | | d D ]  fddt td  |  dtd  |D V  q	dS )z)Generates schedules for each clock cycle.r   c                    s   g | ]} | |fqS r)   r)   )r7   jkr)   r*   r9   O   s    z!_clock_cycles.<locals>.<listcomp>r   N)rangemaxmin)rA   rB   r)   rD   r*   _clock_cycles?   s   4rI   c                   @   s   e Zd ZdZdeej deej deee	  de
deddfd	d
Zdee ddfddZdee deeeef  dee ddfddZdee deeeef  dee ddfddZdS )r   z"The pipeline parallelism for Pipe.
partitionsdevicescopy_streamsskip_layoutcheckpoint_stopr#   Nc                 C   s2   || _ || _|| _|| _|| _t|\| _| _d S r$   )rJ   rK   rL   rM   rN   r   	in_queues
out_queues)selfrJ   rK   rL   rM   rN   r)   r)   r*   __init__U   s   zPipeline.__init__batchesc                    sd   | j }| j}| j t|}t|} fdd|D }t||D ]}| ||| | ||| qdS )zURuns pipeline parallelism.

        It modifies the given batches in place.

        c                    s   g | ]}t  qS r)   )r   )r7   _rM   r)   r*   r9   q       z Pipeline.run.<locals>.<listcomp>N)rJ   rK   rM   lenrI   fencecompute)rQ   rS   rJ   rK   rA   rB   skip_trackersscheduler)   rU   r*   rund   s   zPipeline.runr[   rZ   c              	   C   s   | j }| j}|D ]O\}}|dkr|dkrt||d  ||  || | }||D ]\}	}
}||	 | }|| || |||
| q*|dkrW||d  | }t|| || qdS )zWCopies micro-batches after computation for the previous
        micro-batches.
        r   r   N)rL   rM   r+   copy_policycopyr>   )rQ   rS   r[   rZ   rL   rM   irC   r.   prev_jnsnamer-   r)   r)   r*   rX   w   s   zPipeline.fencec                 C   s  | j }| j}| j}| j}| j d jsd}t|}dd |D }	d}
|D ]y\}}|| }|| }|dkr?t||| | |	|  ||k }|rp||| ||ddtjdt	dt
d	t
d
tf
dd}t||}t|	| |j|jd}~~n$|||| ||fdtdtjdt	dt
d	t
d
tfdd}t|	| |dd}~| j| | q#|D ]V\}}| j|  \}}|
durq|stt|}
qttttf |\}}||d krt||	| || |  t||  || W d   n1 sw   Y  |||< q|
dur|
d |
d |
d dS )z0Runs tasks with synchronization to copy streams.r   c                 S   s   g | ]}t |qS r)   )r   )r7   dr)   r)   r*   r9      rV   z$Pipeline.compute.<locals>.<listcomp>N)	partitionskip_trackerchunk_idpart_idrd   re   rf   rg   r#   c              	   W   st   t |, td||f  | | W  d    W  d    S 1 s#w   Y  W d    d S 1 s3w   Y  d S Nzchunk%d-part%d)r   r   )rd   re   rf   rg   inputsr)   r)   r*   function   s   Rz"Pipeline.compute.<locals>.function)rY   finalizer,   c              	   S   sv   t |- td||f  | |W  d    W  d    S 1 s$w   Y  W d    d S 1 s4w   Y  d S rh   )r   r   call)r,   rd   re   rf   rg   r)   r)   r*   rY      s   Rz!Pipeline.compute.<locals>.computer      )rJ   rK   rL   rN   trainingrW   r@   r   Moduler   intTensorOrTensorsr   r   
checkpoint	recomputer   rO   putrP   getr   ExcInfor   r   rk   with_traceback)rQ   rS   r[   rZ   rJ   rK   rL   rN   rB   streamsexc_infor_   rC   r,   rd   rr   rj   chktaskrY   okpayloadr)   r)   r*   rY      s   







zPipeline.compute)__name__
__module____qualname____doc__r   r   
Sequentialr2   devicer   r   rp   rR   r   r\   r   r   rX   rY   r)   r)   r)   r*   r   R   sD    


):r   queuer   typesr   typingr   r   r   r   r   r	   r
   r   r   r2   r   r   torch.autograd.profilerr   rr   r   r^   r   r   
dependencyr   r   
microbatchr   skip.layoutr   skip.trackerr   r   streamr   r   r   workerr   r   r    str__annotations__Tensorsrq   BaseExceptionrv   InQueueboolOutQueuer+   r>   r@   rp   rI   r   r)   r)   r)   r*   <module>   s:   ,$&