o
    h4                     @   s   d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	m
Z
mZ d dlZd dlmZ g dZG dd dZG dd	 d	eZG d
d de	ZG dd dZdS )    N)ABCabstractmethod)TracebackType)AnyList
NamedTupleOptionalType)JoinHookJoinableJoinc                   @   s,   e Zd ZdZd	ddZdeddfddZdS )
r
   a  
    This defines a join hook, which provides two entry points in the join
    context manager: a main hook, which is called repeatedly while there exists
    a non-joined process, and a post-hook, which is called once all processes
    have joined.

    To implement a join hook for the generic join context manager, define a
    class that inherits from :class:`JoinHook` and override ``main_hook()`` and
    ``post_hook()`` as appropriate.
    returnNc                 C      dS )z
        This hook is called repeatedly while there exists a non-joined process
        to shadow collective communications in one training iteration (i.e. in
        one forward pass, backward pass, and optimizer step).
        N selfr   r   W/var/www/html/ai/venv/lib/python3.10/site-packages/torch/distributed/algorithms/join.py	main_hook   s   zJoinHook.main_hookis_last_joinerc                 C   r   )a\  
        This hook is called after all processes have joined. It is passed an
        additional ``bool`` argument ``is_last_joiner``, which indicates if the
        rank is one of the last to join.

        Arguments:
            is_last_joiner (bool): ``True`` if the rank is one of the last to
                join; ``False`` otherwise.
        Nr   )r   r   r   r   r   	post_hook   s   
zJoinHook.post_hookr   N)__name__
__module____qualname____doc__r   boolr   r   r   r   r   r
      s    

r
   c                       sd   e Zd ZdZe fddZedefddZeede	j
fddZeedefd	d
Z  ZS )r   aZ  
    This defines an abstract base class for joinable classes. A joinable class
    (inheriting from :class:`Joinable`) should implement :meth:`join_hook`,
    which returns a :class:`JoinHook` instance, in addition to
    :meth:`join_device` and :meth:`join_process_group` that return device and
    process group information, respectively.
    c                    s   t    t | _d S N)super__init___JoinConfigconstruct_disabled_join_config_join_configr   	__class__r   r   r   3   s   
zJoinable.__init__r   c                 K   r   )a  
        Returns a :class:`JoinHook` instance for the given :class:`Joinable`.

        Arguments:
            kwargs (dict): a :class:`dict` containing any keyword arguments
                to modify the behavior of the join hook at run time; all
                :class:`Joinable` instances sharing the same join context
                manager are forwarded the same value for ``kwargs``.
        Nr   )r   kwargsr   r   r   	join_hook8   s   zJoinable.join_hookc                 C   r   )z
        Returns the device from which to perform collective communications
        needed by the join context manager implementation itself.
        Nr   r   r   r   r   join_deviceE      zJoinable.join_devicec                 C   r   )z
        Returns the process group for the collective communications needed by
        the join context manager itself.
        Nr   r   r   r   r   join_process_groupN   r'   zJoinable.join_process_group)r   r   r   r   r   r   r
   r%   propertytorchdevicer&   r   r(   __classcell__r   r   r"   r   r   +   s    r   c                   @   s6   e Zd ZU dZeed< eed< eed< edd ZdS )r   zr
    This includes all fields needed from a :class:`Joinable` instance for the
    join context manager side.
    enablethrow_on_early_terminationis_first_joinablec                   C   s   t ddddS )z
        Returns a :class:`_JoinConfig` instance indicating that join-related
        logic should be disabled, e.g. if the caller is not in a join context
        manager.
        Fr-   r.   r/   )r   r   r   r   r   r    a   s
   z*_JoinConfig.construct_disabled_join_configN)r   r   r   r   r   __annotations__staticmethodr    r   r   r   r   r   X   s   
 r   c                   @   s   e Zd ZdZ		ddee dedefddZdddZdddZ	dd Z
deee  dee dee fddZdd Zdd ZedefddZd
S )r   a
  
    This class defines the generic join context manager, which allows custom
    hooks to be called after a process joins. These hooks should shadow the
    collective communications of non-joined processes to prevent hanging and
    erroring and to ensure algorithmic correctness. Refer to :class:`JoinHook`
    for details about the hook definition.

    .. warning::
        The context manager requires each participating :class:`Joinable` to
        call the method :meth:`notify_join_context()` before its own per-
        iteration collective communications to ensure correctness.

    .. warning::
        The context manager requires that all ``process_group`` attributes in
        the :class:`JoinHook` objects are the same. If there are multiple
        :class:`JoinHook` objects, then the ``device`` of the first is used.
        The process group and device information is used for checking for non-
        joined processes and for notifying processes to throw an exception if
        ``throw_on_early_termination`` is enabled, both of which using an all-
        reduce.

    Arguments:
        joinables (List[Joinable]): a list of the participating
            :class:`Joinable` s; their hooks are iterated over in the given
            order.

        enable (bool): a flag enabling uneven input detection; setting to
            ``False`` disables the context manager's functionality and should
            only be set when the user knows the inputs will not be uneven
            (default: ``True``).

        throw_on_early_termination (bool): a flag controlling whether to throw an
            exception upon detecting uneven inputs (default: ``False``).

    Example::

        >>> import os
        >>> import torch
        >>> import torch.distributed as dist
        >>> import torch.multiprocessing as mp
        >>> # xdoctest: +SKIP
        >>> import torch.nn.parallel.DistributedDataParallel as DDP
        >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO
        >>> from torch.distributed.algorithms.join import Join
        >>>
        >>> # On each spawned worker
        >>> def worker(rank):
        >>>     dist.init_process_group("nccl", rank=rank, world_size=2)
        >>>     model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
        >>>     optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
        >>>     # Rank 1 gets one more input than rank 0
        >>>     inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
        >>>     with Join([model, optim]):
        >>>         for input in inputs:
        >>>             loss = model(input).sum()
        >>>             loss.backward()
        >>>             optim.step()
        >>>     # All ranks reach here without hanging/erroring
    TF	joinablesr-   r.   c                    sP   t |dkr
td|| _ fdd| jD | _|| _|| _|   |   d S )Nr   z7The join context manager requires at least one joinablec                    s   g | ]
}|j d i  qS )r   )r%   ).0joinabler$   r   r   
<listcomp>   s    z!Join.__init__.<locals>.<listcomp>)len
ValueError
_joinables_join_hooks_enable_throw_on_early_termination_set_joinable_configs_extract_dist_info)r   r3   r-   r.   r$   r   r6   r   r      s   zJoin.__init__r   Nc                 C   s>   t | jdks	J d}| jD ]}t| j| j|d|_d}qdS )zX
        Sets the :class:`_JoinConfig` of each participating :class:`Joinable`.
        r   Tr0   FN)r8   r:   r   r<   r=   r!   )r   r/   r5   r   r   r   r>      s   
zJoin._set_joinable_configsc                 C   sb   d}d}| j D ]}|du r|j}n	||jkrtd|du r!|j}q|| _t| j| _|| _dS )a  
        Extracts the process group and device information from the joinables.
        If there are multiple joinables, then the context manager uses the
        first specified device.

        Preconditions:
            ``self._joinables`` is not ``None`` and is non-empty.

        Raises:
            ValueError
                If there are multiple conflicting ``process_group`` attributes
                among the ``Joinable`` objects.
        Nz7Using join context manager with multiple process groups)	r:   r(   r9   r&   _process_groupdistget_rank_rank_device)r   process_groupr+   r5   r   r   r   r?      s   


zJoin._extract_dist_infoc                 C   s   d S r   r   r   r   r   r   	__enter__   s   zJoin.__enter__typevalue	tracebackc           
   	   C   s   | j r|rdS d}d}d}d}td |sN||kr*td| d| j d	| d
 |  }|dkr5d}n| jr<|   | jD ]}	|		  q?d}|d7 }|r| jD ]}	|	
| qQdS )z
        Repeatedly runs the main hooks until all processes join; then, runs
        the post-hooks.

        Raises:
            RuntimeError
                If ``throw_on_early_termination=True``.
        NFTr   i  oncez+Detected uneven input skew of greater than z. This means that rank z has at least zz fewer inputs than other currently-active ranks. This level of skew could lead to performance degradation during training.   )r<   warningssimplefilterwarnrC   _get_num_nonjoined_procsr=   _notify_procs_to_terminater;   r   r   )
r   rG   rH   rI   all_procs_joinedr   iWARN_THRESHOLDnum_nonjoined_procsr%   r   r   r   __exit__   s>   

	


zJoin.__exit__c                 C   s(   t jd| jd}tj|| jd | S )z|
        Returns the number of non-joined processes by shadowing an all-reduce
        in the non-joined processes.
        rK   r+   group)r*   zerosrD   rA   
all_reducer@   item)r   rT   r   r   r   rO     s   zJoin._get_num_nonjoined_procsc                 C   s2   t jd| jd}tj|| jd td| j d)z
        Schedules an all-reduce to notify non-joined processes to terminate
        and raises a ``RuntimeError`` indicating that the current process has
        exhausted its inputs.
        rK   rV   rW   zRank z exhausted all inputs.)r*   onesrD   rA   rZ   r@   RuntimeErrorrC   )r   r\   r   r   r   rP   &  s   zJoin._notify_procs_to_terminater5   c                 C   s   t | dsJ dt|  d| j}|jr|jsdS | j}| j}tjd|d}t	j
||dd}|jrJtjd|d}t	j
||d	 | }|rJtd
|S )aO  
        Notifies the join context manager that the calling process has not yet
        joined; then, if ``throw_on_early_termination=True``, checks if uneven
        inputs have been detected (i.e. if one process has already joined) and
        throws an exception if so.

        This method should be called from a :class:`Joinable` object before
        its per-iteration collective communications. For example, this should
        be called at the beginning of the forward pass in
        :class:`DistributedDataParallel`.

        Only the first :class:`Joinable` object passed into the context
        manager performs the collective communications in this method, and
        for the others, this method is vacuous.

        Arguments:
            joinable (Joinable): the :class:`Joinable` object calling this
                method.

        Returns:
            An async work handle for the all-reduce meant to notify the context
            manager that the process has not yet joined if ``joinable`` is the
            first one passed into the context manager; ``None`` otherwise.
        r!   zCheck that the z/ constructor calls the ``Joinable`` constructorNrK   rV   T)rX   async_oprW   zLDetected at least one rank that exhausted inputs. Throwing across all ranks.)hasattrrG   r!   r/   r-   r&   r(   r*   r\   rA   rZ   r.   rY   r[   r]   )r5   join_configr+   rE   r\   workrY   should_throwr   r   r   notify_join_context0  s&   zJoin.notify_join_context)TFr   )r   r   r   r   r   r   r   r   r>   r?   rF   r   r	   BaseExceptionr   rU   rO   rP   r2   rc   r   r   r   r   r   p   s2    >




5	
r   )rL   abcr   r   typesr   typingr   r   r   r   r	   r*   torch.distributeddistributedrA   __all__r
   r   r   r   r   r   r   r   <module>   s     -