o
    hx0                  	   @   s  d dl Z d dlmZmZmZmZmZmZmZ d dl	m
Z
 d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlm  mZ d dlmZmZmZmZmZm Z  d d	l!m"Z"m#Z# d d
l$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0m1Z1 d dl2m3Z3 ee4eeee5  ee5 f f Z6dgZ7d+de5de4de4fddZ8	d,deej9 defddZ:dej;de<fddZ=d+dedee5 de4dej;fd d!Z>d"edee6eej9 f fd#d$Z?G d%d& d&e)Z@d'ed(e4d)ejAdefd*dZBdS )-    N)DictListOptionalSequenceTupleUnioncast)LoadPlan)ShardedTensor)TensorProperties)Shard)ChunkShardingSpec)BytesStorageMetadataMetadataMetadataIndexSTATE_DICT_TYPETensorStorageMetadataChunkStorageMetadata) create_read_items_for_chunk_list_create_read_items)_remote_device)DTensor)DefaultLoadPlanner)_shard_tensor)unflatten_state_dict)_element_wise_add_element_wise_sub_normalize_device_info)_get_device_module!load_sharded_optimizer_state_dictcudaglobal_rankdevice_typereturnc                 C   s2   |dkrdS t |}| rt|| |  S dS )Ncpu)r   is_availabler   device_count)r!   r"   device_module r(   \/var/www/html/ai/venv/lib/python3.10/site-packages/torch/distributed/checkpoint/optimizer.py_gen_rank_device7   s   r*   pgc                    sl   t j j d u rfddtt  D }n fddt  D }tdtt	t
ttf  |dS )Nc                    s"   g | ]}d | dt |  qS rank:/)r*   .0idx)pg_device_typer(   r)   
<listcomp>E   s    z(_create_colwise_spec.<locals>.<listcomp>c              
      s*   g | ]}d | dt t | qS r,   )r*   distget_global_rankr/   r+   r2   r(   r)   r3   J   s    r   dim
placements)r4   distributed_c10d_get_pg_default_devicetyperangeget_world_sizesizer   r   r   r   r   str)r+   r9   r(   r6   r)   _create_colwise_spec@   s   


rA   valc                 C   s   t | tu r.t|  dkrdS t |  d jtu rdS t |  d jtu r,tddS t | tu rFt | jtu sBt | jtu rFtddS )Nr   FTz2Cannot handle DTensor nested insided ShardedTensorzCannot handle nested DTensor)r<   r
   lenlocal_shardstensorr   
ValueError_local_tensor)rB   r(   r(   r)   _is_nested_tensorT   s   rH   propsr?   c              
   C   s.   t j|| j| j| j| jtt jt|	 dS )N)r?   dtypelayoutrequires_grad
pin_memorydevice)
torchemptyrJ   rK   rL   rM   r   rN   r   current_device)rI   r?   r"   r(   r(   r)   _alloc_tensorf   s   rR   
state_dictc                 C   s   i }d}|   D ]9\}}d| f||< t|rAt| dks$J dt|ts-J d| d }|jj|jj	f||< |j
j}q||fS )a5  
    We have to load the right TP slice of the optimizer state.
    This is not easy since the per-tensor slicing can't be inferred from checkpoint metadata.
    We take advantage of the model state_dict producing a sliced ST to figure out what we need to load.
    This is pretty fragile and it might be easier for FSDP to compute this info for us.
    Returns a dictionary where keys are the same of the state_dict and the value is a tuple of
    (offset, size) for the current rank TP slice.
    N.B. The state_dict *MUST* come from FSDP.sharded_state_dict.
    N   z%Cannot handle ST with multiple shardsz$Can only handle nested ShardedTensorr   )itemsr?   rH   rC   rD   
isinstancer
   metadatashard_offsetsshard_sizesrE   _process_group)rS   specsdp_pgkeyvalueshardr(   r(   r)   _get_state_dict_2d_layoutq   s,   r`   c                       sz   e Zd ZU eeef ed< eed< eed< deee	e
 f ddf fddZdefd	d
Zdedejf fddZ  ZS )_ReaderWithOffsettranslationrS   rW   fqn_to_offsetr#   Nc                    s*   t    || _ti | _i | _i | _d S N)super__init__rc   r   rW   rS   rb   )selfrc   	__class__r(   r)   rf      s
   


z_ReaderWithOffset.__init__c                 C   s   g }i | _ | j D ]\}}| jj| }t|ts"|t|||7 }q
|| jvr0|t|||7 }q
| j| }t	|
 dks?J |
 d }ttt|jj|t|jjdg}t|tt||}|D ]"}	|	jjd usnJ t|	jj|}
tj|	jt|
d}|| j |	j< qd||7 }q
t|S )NrT   r   )offsetssizes)offset)rb   rS   rU   rW   state_dict_metadatarV   r
   r   rc   rC   rD   r   rO   Sizer   rX   rY   r   r   r   
dest_indexrl   r   dataclassesreplacer	   )rg   requestsfqnobjmdrl   original_shardlocal_chunksreqsrioriginal_offsetoriginal_indexr(   r(   r)   create_local_plan   sH   



z#_ReaderWithOffset.create_local_planindexc                    s   t  | j||S rd   )re   lookup_tensorrb   get)rg   r}   rh   r(   r)   r~      s   z_ReaderWithOffset.lookup_tensor)__name__
__module____qualname__r   r   __annotations__r   r   r@   r   intrf   r	   r|   rO   Tensorr~   __classcell__r(   r(   rh   r)   ra      s   
 " .ra   model_state_dictoptimizer_keystorage_readerc              	   C   s  |  }t| \}}tj|j}t|}|du r?g }tt D ]}	t	||	|
  }
|d|	 d|
  q!td|d}nt|}i }i }|j D ]\}}|j| }|d |kr\qLt|trfd||< qL|j dkrxt|j|j|||< qL|du rtt|j|j||||< qL|d }||d|jfd }|t||j}g }t|}|jD ]}tt|j ! |krq|t"t|j|j#||d	 qt$j%|||d
}||v r|| d durtt&t' || d ||< |||< qLt(j)|||durt*|ndd t+||j}|S )a  
    Loads a state_dict in conjunction with FSDP sharded optimizer state.
    This is the current recommended way to checkpoint FSDP.
    >>> # xdoctest: +SKIP
    >>> import torch.distributed.checkpoint as dist_cp
    >>> # Save
    >>> model: torch.nn.Model
    >>> optim_params = model.parameters()
    >>> optim = torch.optim.SGD(optim_params, lr=0.01)
    >>> # Save
    >>> with FSDP.state_dict_type(model, StateDictType.SHARDED_STATE_DICT):
    >>>     state_dict = {
    >>>         "optimizer": FSDP.optim_state_dict(model, optim),
    >>>         "model": model.state_dict()
    >>>     }
    >>>     dist_cp.save_state_dict(
    >>>         state_dict=optim_state,
    >>>         storage_writer=dist_cp.FileSystemWriter("checkpoint"),
    >>>         planner=dist_cp.DefaultSavePlanner(),
    >>>     )
    >>>
    >>> # Load
    >>> with FSDP.state_dict_type(model_tp, StateDictType.SHARDED_STATE_DICT):
    >>>     model_state_dict = model_tp.state_dict()
    >>>     checkpoint = {
    >>>         "model": model_state_dict
    >>>     }
    >>>     dist_cp.load_state_dict(
    >>>         state_dict=checkpoint,
    >>>         storage_reader=dist_cp.FileSystemReader(checkpoint_file),
    >>>         planner=dist_cp.DefaultLoadPlanner(),
    >>>     )
    >>>     model.load_state_dict(checkpoint["model_state"])
    >>>
    >>>     optim_state = dist_cp.load_sharded_optimizer_state_dict(
    >>>         model_state_dict,
    >>>         optimizer_key="optimizer",
    >>>         storage_reader=dist_cp.FileSystemReader("checkpoint"),
    >>>     )
    >>>
    >>>     flattened_osd = FSDP.optim_state_dict_to_load(
    >>>        model, optim, optim_state["optimizer"]
    >>>     )
    >>>
    >>>     optim.load_state_dict(flattened_osd)
    Nr-   r.   r   r7   z
<bytes_io>rT      )rE   rW   )process_group)rS   r   planner),read_metadatar`   r4   r:   r;   r<   r   r=   r>   r   r&   appendr   rA   rm   rU   planner_datarV   r   r?   numelrR   
propertiesr   r   build_metadatarO   rn   get_rankshards_metadatar   r   	placementrankr   rY   r
   +_init_from_local_shards_and_global_metadatar   r   dist_cpload_state_dictra   r   )r   r   r   rW   layout_specsr\   dp_pg_device_typer'   r9   idevice_infosharding_specrS   rc   r]   r^   key_pathspec_key
alloc_sizest_mdrD   current_rankshard_mdstr(   r(   r)   r      s|   3





	
)r    rd   )Crp   typingr   r   r   r   r   r   r   $torch.distributed.checkpoint.plannerr	   rO   torch.distributeddistributedr4   +torch.distributed._shard.sharded_tensor.apir
   0torch.distributed._shard.sharded_tensor.metadatar   -torch.distributed._shard.sharded_tensor.shardr   :torch.distributed._shard.sharding_spec.chunk_sharding_specr   torch.distributed.checkpoint
checkpointr   %torch.distributed.checkpoint.metadatar   r   r   r   r   r   ,torch.distributed.checkpoint.planner_helpersr   r   torch.distributed.remote_devicer   torch.distributed._tensorr   ,torch.distributed.checkpoint.default_plannerr   torch.distributed._shard.apir   )torch.distributed.checkpoint._nested_dictr   "torch.distributed.checkpoint.utilsr   r   r   torch._utilsr   r@   r   STATE_DICT_2D_LAYOUT__all__r*   ProcessGrouprA   r   boolrH   rR   r`   ra   StorageReaderr   r(   r(   r(   r)   <module>   s\   $  

"
$>