o
    hD=                  
   @   s  d dl mZmZ d dlZd dlZd dlZd dlmZ d dlZd dlZd dl	Z	d dl
Z
d dlmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ dd	lmZmZ dd
lmZmZmZ ddlmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z% ddl&m'Z' d dl(m)Z) d dl*m+Z+ ddgZ,eG dd dZ-eG dd dZ.dZ/dejdejfddZ0de$defddZ1G dd deZ2G dd  d e2Z3G d!d" d"e2Z4de$de5fd#d$Z6d%ee$ deee$  fd&d'Z7d(d) Z8d*ej9d+ej9d,e"d-e5d.e:f
d/d0Z;G d1d deZ<G d2d deZ=dS )3    )ABCabstractmethodN)	dataclass)ListUnionDictcast)Tensor)Future)Path   )MetadataMetadataIndex)StorageReaderStorageWriterWriteResult)LoadItemTypeLoadPlannerLoadPlanSavePlanSavePlannerReadItem	WriteItemWriteItemType)_create_file_view)narrow_tensor_by_index)_get_device_moduleFileSystemWriterFileSystemReaderc                   @   s*   e Zd ZU dZeed< eed< eed< dS )_StorageInfoz,
    This is the per entry storage info
    relative_pathoffsetlengthN)__name__
__module____qualname____doc__str__annotations__int r*   r*   ]/var/www/html/ai/venv/lib/python3.10/site-packages/torch/distributed/checkpoint/filesystem.pyr   2   s
   
 r   c                   @   s   e Zd ZU eed< dS )_StoragePrefixprefixN)r#   r$   r%   r'   r(   r*   r*   r*   r+   r,   =   s   
 r,   z.distcptensorreturnc                 C   s,   |    } |   |  kr|  } | S N)detachcpu_typed_storage_sizenumelclone)r.   r*   r*   r+   _trimE   s   r7   itemc                 C   s   t | j||dS )N)indexsize_in_bytesstorage_data)r   r9   )r8   r:   r;   r*   r*   r+   _result_from_write_itemL   s   r<   c                   @   s0   e Zd Zedd Zedd Zedd ZdS )_TensorLoaderc                 C      d S r0   r*   selfsizeobjr*   r*   r+   addU      z_TensorLoader.addc                 C   r>   r0   r*   r@   r*   r*   r+   start_loadingY   rD   z_TensorLoader.start_loadingc                 C   r>   r0   r*   rE   r*   r*   r+   values]   rD   z_TensorLoader.valuesN)r#   r$   r%   r   rC   rF   rG   r*   r*   r*   r+   r=   T   s    

r=   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
_SerialCpuLoaderc                 C   s   || _ g | _d S r0   )resolve_funitems)r@   rI   r*   r*   r+   __init__c   s   
z_SerialCpuLoader.__init__c                 C   s   | j ||f d S r0   )rJ   appendr?   r*   r*   r+   rC   g   s   z_SerialCpuLoader.addc                 C   r>   r0   r*   rE   r*   r*   r+   rF   j      z_SerialCpuLoader.start_loadingc                 c   sR    | j D ]"\}}| | }| }|  | kr!| }||fV  qd S r0   )rJ   rI   r1   r2   storagerA   r5   r6   r@   _rB   r.   r*   r*   r+   rG   m   s   z_SerialCpuLoader.valuesN)r#   r$   r%   rK   rC   rF   rG   r*   r*   r*   r+   rH   b   s
    rH   c                   @   sR   e Zd ZdddZedd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd ZdS )_OverlappingCpuLoaderN@B c                 C   s   || _ g | _|| _d| _t | _d| _d| _|r|j	nt
dj| _	t| j	| _|p/| j | _| j| j krD| j| j  d S d S )Nr   Fcuda)rI   rJ   inflight_threshholdin_flight_datacollectionsdequecurrent_itemsidxstarteddevice_typetorchdevicetyper   device_modulecurrent_streamstreamwait_stream)r@   rI   ra   rT   r*   r*   r+   rK   z   s   
z_OverlappingCpuLoader.__init__c                 C   s   | j t| jkS r0   )rY   lenrJ   rE   r*   r*   r+   _done   s   z_OverlappingCpuLoader._donec                 C   sl   g }| j | jkr| j  | j | jkr4| j }|  j |d  |d   8  _ || | j | jks|S Nr   )	rU   rT   ra   synchronizerX   popleftr5   element_sizerL   )r@   drainedvalr*   r*   r+   _drain   s   

"
z_OverlappingCpuLoader._drainc                 C   s(  | j | j | jsr| j| jk rz| j| j \}}|  jd7  _| | }|j	j
| jkr6|jddd}n|j	t	dkrL|  | krL| }| j||f |  j| |  7  _| js| j| jk sW d    d S W d    d S W d    d S W d    d S 1 sw   Y  d S )Nr   r2   T)r]   non_blocking)r_   ra   rd   rU   rT   rJ   rY   rI   r1   r]   r^   r[   tor\   rN   rA   r5   r6   rX   rL   rh   rO   r*   r*   r+   _refill   s8   "z_OverlappingCpuLoader._refillc                 C   s(   | j sJ t| jdkr| j  | jS re   )rd   rc   rX   ra   rf   rE   r*   r*   r+   _finish   s   

z_OverlappingCpuLoader._finishc                 C   s"   | j rtd| j||f d S )Nz&cannot add items after loading started)rZ   RuntimeErrorrJ   rL   r?   r*   r*   r+   rC      s   z_OverlappingCpuLoader.addc                 C   s.   | j rd S d| _ | jjdd d |   d S )NTc                 S      | d S re   r*   xr*   r*   r+   <lambda>       z5_OverlappingCpuLoader.start_loading.<locals>.<lambda>key)rZ   rJ   sortrn   rE   r*   r*   r+   rF      s
   z#_OverlappingCpuLoader.start_loadingc                 c   sB    |    | js|  }|   |E d H  | jr|  E d H  d S r0   )rF   rd   rk   rn   ro   )r@   ri   r*   r*   r+   rG      s   
z_OverlappingCpuLoader.values)NrR   )r#   r$   r%   rK   propertyrd   rk   rn   ro   rC   rF   rG   r*   r*   r*   r+   rQ   y   s    


rQ   c                 C   sB   d}| j d us	J | j jD ]}||9 }q| j jj}|tj| S Nr   )tensor_datarA   
propertiesdtyper\   _utils_element_size)r8   rA   sr}   r*   r*   r+   
_item_size   s   

r   rJ   c           	      C   s   | dkr|gS dd |D }dd |D }dd t | D }dd t | D }|jtdd t|D ]\}}|||   | q2|D ]}tt|d	d
 dd }|| | ||  t|7  < qB|S )Nr   c                 S      g | ]
}|j tjkr|qS r*   r^   r   BYTE_IO.0wir*   r*   r+   
<listcomp>       z+_split_by_size_and_type.<locals>.<listcomp>c                 S      g | ]
}|j tjkr|qS r*   r   r   r*   r*   r+   r      r   c                 S   s   g | ]}g qS r*   r*   r   rP   r*   r*   r+   r          c                 S   s   g | ]}d qS )r   r*   r   r*   r*   r+   r      r   T)rw   reversec                 S   rq   rz   r*   rr   r*   r*   r+   rt      ru   z)_split_by_size_and_type.<locals>.<lambda>rv   r   )rangerx   r   	enumeraterL   min)	binsrJ   bytes_wtensor_wbucketsbucket_sizesir   rY   r*   r*   r+   _split_by_size_and_type   s   r   c                 C   s   |   }|jtjkrt|tjsJ | |  nt|t	j
s"J |jt	dks,J t	||  |   | }t||t|||S )Nr2   )tellr^   r   r   
isinstanceioBytesIOwrite	getbufferr\   r	   r]   saver<   r   )ra   data
write_itemstorage_keyr!   r"   r*   r*   r+   _write_item   s   r   
file_queueresult_queueplannerrT   	use_fsyncc              	      sB  z	 |   \}}}tj r|dkrt fdd|d}nt fdd}dd |D }	|	D ]
}
|t|
|
 q.|  d	d |D }g }t	|d
=}|D ]}
 
|
}|t|||
| qN| D ]\}}
|jsmJ |t|||
| qd|rt|  W d    n1 sw   Y  || q tjy   Y d S w )NTr   c                    
     | S r0   resolve_datarr   r   r*   r+   rt        
 z)_write_files_from_queue.<locals>.<lambda>)rT   c                    r   r0   r   rr   r   r*   r+   rt     r   c                 S   r   r*   r   r   r*   r*   r+   r         z+_write_files_from_queue.<locals>.<listcomp>c                 S   r   r*   r   r   r*   r*   r+   r     r   wb)
get_nowaitr\   rS   is_availablerQ   rH   rC   r   rF   openr   rL   r   rG   is_cpuosfsyncfilenoputqueueEmpty)r   r   r   rT   r   	file_namer   write_itemsloaderr   r   r   write_resultsra   r   r.   r*   r   r+   _write_files_from_queue   sR   




*r   c                       s   e Zd ZdZ				ddeeejf dedede	d	e	d
df fddZ
ded
dfddZded
efddZdee d
ee fddZdeded
eee  fddZdedeee  d
dfddZ  ZS )r   aa  
    Basic implementation of StorageWriter using file IO.

    This implementation makes the following assumptions and simplifications:

    * The checkpoint path is an empty or non-existing directory.
    * File creation is atomic

    The checkpoint consist of one file per write request plus
    a `.metadata` file with the serialized metadata.

    Tr   逖 pathsingle_file_per_rank
sync_filesthread_countper_thread_copy_aheadr/   Nc                    s0   t    t|| _|| _|| _|| _|| _dS )a  
        Initialize the writer pointing to `path`

        Args:
            path: directory where the checkpoint will be written to.
            single_file_per_rank: Produce one file per rank instead of one file per tensor/blob. Default to True.
            sync_files : force files to be synced to permanent storage. Default to True.
            thread_count: Number of IO threads to use to write. Default to 1.
            per_thread_copy_ahead: How many bytes to copy from the GPU ahead of saving then. Default 10Mb.

        N. B. If sync_files is disabled, there's no guarantee that the checkpoint will be consistent in the case of a failure.
        N)superrK   r   r   r   r   r   r   )r@   r   r   r   r   r   	__class__r*   r+   rK   D  s   


zFileSystemWriter.__init__is_coordinatorc                 C   r>   r0   r*   )r@   r   r*   r*   r+   set_up_storage_writer_  rM   z&FileSystemWriter.set_up_storage_writerplanc                 C   s   | j jddd |S )NT)parentsexist_ok)r   mkdirr@   r   r*   r*   r+   prepare_local_planb  s   z#FileSystemWriter.prepare_local_planglobal_planc                 C   s   dd t |D }|S )Nc                 S   s*   g | ]\}}t j|td | ddqS )__rP   r;   )dataclassesreplacer,   )r   r   r   r*   r*   r+   r   i  s    z8FileSystemWriter.prepare_global_plan.<locals>.<listcomp>)r   )r@   r   	new_plansr*   r*   r+   prepare_global_planf  s   z$FileSystemWriter.prepare_global_planr   c                    s6  |j d  fdd}t }| jr,t| j|jD ]}| }|| j| ||f qn|jD ]}| }|| j| ||gf q/t }g }	t	d| jD ]}
t
jt|||| j| jfd}|  |	| qMt|||| j| jd |	D ]}|  qtg }z	 || 7 }q tjy   	 t }|| | Y S w )Nr   c                     s   j    t }  d7  | S rz   )r-   DEFAULT_SUFFIX)r   
file_countstorage_planr*   r+   gen_filew  s   z-FileSystemWriter.write_data.<locals>.gen_filer   )targetargs)r   r   r   rT   r   )r;   r   Queuer   r   r   rJ   r   r   r   	threadingThreadr   r   r   startrL   joinr   r   r
   
set_result)r@   r   r   r   r   bucketr   r8   r   threadsrP   tresfutr*   r   r+   
write_datao  s`   



zFileSystemWriter.write_datametadataresultsc                 C   s   t  }|D ]}|dd |D  q||_| jd d}t|| t|	  W d    n1 s5w   Y  | jd 
| jd  d S )Nc                 S   s   i | ]}|j |jqS r*   )r9   r;   )r   wrr*   r*   r+   
<dictcomp>  s    z+FileSystemWriter.finish.<locals>.<dictcomp>z.metadata.tmpr   	.metadata)dictupdater;   r   r   pickledumpr   r   r   rename)r@   r   r   
storage_mdwr_listmetadata_filer*   r*   r+   finish  s   zFileSystemWriter.finish)TTr   r   )r#   r$   r%   r&   r   r'   r   PathLikeboolr)   rK   r   r   r   r   r   r   r
   r   r   r   r   __classcell__r*   r*   r   r+   r   6  sP    
	

A
c                       s   e Zd Zdeeejf ddf fddZdefddZ	d	e
d
eded fddZdefddZdededdfddZd	e
de
fddZdee
 dee
 fddZ  ZS )r   r   r/   Nc                    s    t    t|| _t | _d S r0   )r   rK   r   r   r   r;   )r@   r   r   r*   r+   rK     s   

zFileSystemReader.__init__sinfoc                 C   s   t ||j|jS r0   )r   r!   r"   )r@   filer   r*   r*   r+   _slice_file  s   zFileSystemReader._slice_filer   r   c                 C   s`  t  }|jD ]}| j|j }|j}||g | q| D ]\}}| j| dr}	|D ]g}
| j|
j }| 	|	|}|
j
tjkrWt||j}|d ||
| q.tttj|dd}t||
j|
j}||
 }| | ksJ d|
j d|  d|  || ||
| q.W d    n1 sw   Y  qt }| d  |S )Nrbr   r2   )map_locationzreq z mismatch sizes z vs )!r   rJ   r;   storage_indexr    
setdefaultrL   r   r   r   r^   r   r   r   r   readr"   seek
load_bytesr   r	   r\   loadr   storage_offsetslengthsresolve_tensorr1   rA   copy_commit_tensorr
   r   )r@   r   r   per_file	read_itemitem_mdr   r    reqsr   req
file_slicebytesr.   target_tensorr   r*   r*   r+   	read_data  s@   




zFileSystemReader.read_datac                 C   s>   | j d d}t|W  d    S 1 sw   Y  d S )Nr   r   )r   r   r   r  )r@   r   r*   r*   r+   read_metadata  s   $zFileSystemReader.read_metadatar   r   c                 C   s   |j | _ | j d usJ d S r0   r   )r@   r   r   r*   r*   r+   set_up_storage_reader  s   z&FileSystemReader.set_up_storage_readerc                 C      |S r0   r*   r   r*   r*   r+   r     rM   z#FileSystemReader.prepare_local_planr   c                 C   r  r0   r*   )r@   r   r*   r*   r+   r     s   z$FileSystemReader.prepare_global_plan)r#   r$   r%   r   r'   r   r   rK   r   r   r   r   r
   r  r   r  r   r  r   r   r   r   r*   r*   r   r+   r     s     &)>abcr   r   r   r   rV   r   r   r   r   r   typingr   r   r   r   r\   r	   torch.futuresr
   pathlibr   r   r   r   rN   r   r   r   r   r   r   r   r   r   r   r   r   utilsr   torch.distributed._shard._utilsr   torch._utilsr   __all__r   r,   r   r7   r<   r=   rH   rQ   r)   r   r   r   r   r   r   r   r   r*   r*   r*   r+   <module>   st    (

Q


6 	