76 lines
		
	
	
		
			2.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			76 lines
		
	
	
		
			2.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
 | 
						|
from enum import Enum
 | 
						|
import hashlib
 | 
						|
import pathlib
 | 
						|
import tarfile
 | 
						|
import zipfile
 | 
						|
import urllib.request
 | 
						|
from SCons.Script import *
 | 
						|
 | 
						|
Import('env')
 | 
						|
 | 
						|
class ArchiveType(Enum):
    """Archive formats this script knows how to download and unpack."""

    # Gzip-compressed tarball ('.tar.gz').
    TAR_GZ = 0
    # Zip archive ('.zip').
    ZIP = 1
 | 
						|
 | 
						|
def _detect_archive_type(url: str) -> ArchiveType:
    """Infer the archive format from the (case-insensitive) end of ``url``.

    Returns ``ArchiveType.TAR_GZ`` for URLs ending in ``.tar.gz`` and
    ``ArchiveType.ZIP`` for URLs ending in ``.zip``.

    Raises:
        Exception: if the URL ends in neither suffix.
    """
    lowered = url.lower()
    # Checked in order; the first matching suffix wins.
    known_suffixes = (
        ('.tar.gz', ArchiveType.TAR_GZ),
        ('.zip', ArchiveType.ZIP),
    )
    for suffix, kind in known_suffixes:
        if lowered.endswith(suffix):
            return kind
    raise Exception('could not detect archive type from URL')
 | 
						|
 | 
						|
def _archive_type_ext(archive_type: ArchiveType) -> str:
    """Return the file extension (without a leading dot) for ``archive_type``.

    Raises:
        Exception: if ``archive_type`` is not a known ``ArchiveType`` member.
    """
    extensions = (
        (ArchiveType.TAR_GZ, 'tar.gz'),
        (ArchiveType.ZIP, 'zip'),
    )
    for kind, extension in extensions:
        if archive_type == kind:
            return extension
    raise Exception('invalid archive type')
 | 
						|
 | 
						|
def _download_file(url: str, path: pathlib.Path) -> None:
    """Download ``url`` to ``path`` unless ``path`` already exists.

    The data is first written to a sibling ``<name>.tmp`` file and only moved
    to ``path`` once the transfer has finished, so an interrupted download
    never leaves a truncated file under the final name.

    Args:
        url: Location to fetch; anything ``urllib.request.urlretrieve`` accepts.
        path: Final destination of the downloaded file.
    """
    if path.exists():
        return

    # The cache directory may not exist on a fresh checkout; without this the
    # download fails with FileNotFoundError when opening the temporary file.
    path.parent.mkdir(parents=True, exist_ok=True)

    dl_path = path.with_suffix(f'{path.suffix}.tmp')
    # Discard any partial file left behind by a previously aborted download.
    dl_path.unlink(missing_ok=True)

    print(f'Downloading {url} to {dl_path}...')
    urllib.request.urlretrieve(url, dl_path)
    # replace() overwrites atomically where the platform allows it, so a
    # concurrent download of the same file does not make this step fail.
    dl_path.replace(path)
 | 
						|
 | 
						|
def _extract_file(path: pathlib.Path, output_dir: str, archive_type: ArchiveType, skip_folders: int) -> None:
    """Unpack the archive at ``path`` into ``output_dir``.

    Args:
        path: Archive file to extract.
        output_dir: Directory to extract into (a ``str`` or path-like object).
        archive_type: Format of the archive.
        skip_folders: Number of leading path components to strip from every
            member name. Members that have no more than this many components
            (for example the top-level folder itself) are not extracted.

    Raises:
        Exception: if ``archive_type`` is not a supported ``ArchiveType``.
    """
    if archive_type == ArchiveType.TAR_GZ:
        # The context manager closes the archive even if extraction fails.
        with tarfile.open(str(path)) as archive:
            if skip_folders != 0:
                def _strip_leading_dirs(member: tarfile.TarInfo, dest_path: str):
                    """Drop the first ``skip_folders`` components; None skips the member."""
                    name_parts = member.name.split('/')
                    if len(name_parts) <= skip_folders:
                        return None
                    return member.replace(name='/'.join(name_parts[skip_folders:]))

                # NOTE(review): a custom extraction filter replaces tarfile's
                # default filter for this archive, so no built-in safety
                # filtering is applied on this path. Confirm that is intended
                # for the archives being downloaded.
                archive.extraction_filter = _strip_leading_dirs
            archive.extractall(output_dir)
    elif archive_type == ArchiveType.ZIP:
        # zipfile has no module-level open(); archives are opened via ZipFile.
        with zipfile.ZipFile(str(path)) as archive:
            if skip_folders == 0:
                archive.extractall(output_dir)
            else:
                for info in archive.infolist():
                    # Directory entries end in '/', which would otherwise
                    # produce a trailing empty component.
                    name_parts = info.filename.rstrip('/').split('/')
                    if len(name_parts) <= skip_folders:
                        continue
                    stripped = '/'.join(name_parts[skip_folders:])
                    if info.is_dir():
                        stripped += '/'
                    # extract() derives the target path from ``filename``; the
                    # archive data is still located via the original header.
                    info.filename = stripped
                    archive.extract(info, output_dir)
    else:
        raise Exception('invalid archive type')
 | 
						|
 | 
						|
def _download_and_extract(env: Environment, repo_name: str, url: str, skip_folders: int = 0) -> dict:
    """Fetch the archive at ``url`` and unpack it once for ``repo_name``.

    The archive is stored in ``env['DOWNLOAD_DIR']`` under a name derived from
    a SHAKE-128 digest of the URL, and extracted into
    ``env['CLONE_DIR']/download/<repo_name>``. A ``.spp_extracted`` stamp file
    in that directory marks a finished extraction; when it is present, both
    the download and the extraction are skipped.

    Args:
        env: Environment providing ``DOWNLOAD_DIR`` and ``CLONE_DIR``.
        repo_name: Name of the sub-directory to extract into.
        url: Archive URL; its suffix determines the archive format.
        skip_folders: Leading path components to strip while extracting.

    Returns:
        A dict whose ``'extracted_root'`` entry is the extraction directory
        as a string.
    """
    kind = _detect_archive_type(url)
    extension = _archive_type_ext(kind)

    download_dir = env['DOWNLOAD_DIR']
    url_digest = hashlib.shake_128(url.encode("utf-8")).hexdigest(6)
    archive_path = pathlib.Path(download_dir, f'{url_digest}.{extension}')

    extract_root = pathlib.Path(env['CLONE_DIR'], 'download', repo_name)
    marker = pathlib.Path(extract_root, '.spp_extracted')

    needs_extraction = not marker.exists()
    if needs_extraction:
        _download_file(url, archive_path)
        _extract_file(archive_path, extract_root, kind, skip_folders)
        # Written last, so an interrupted run is retried next time.
        marker.touch()

    result = {}
    result['extracted_root'] = str(extract_root)
    return result
 | 
						|
 | 
						|
# Attach the helper to the environment under the name 'DownloadAndExtract',
# then hand 'env' back to the calling SConscript.
env.AddMethod(_download_and_extract, 'DownloadAndExtract')
Return('env')