Module library.s3
import logging
import os
from pathlib import Path

import boto3
from botocore.exceptions import ClientError, ParamValidationError
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)

from . import pp
class S3:
    def __init__(
        self,
        aws_access_key_id: str,
        aws_secret_access_key: str,
        aws_s3_endpoint: str,
        aws_s3_bucket: str,
    ):
        self.client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            endpoint_url=aws_s3_endpoint,
        )
        self.bucket = aws_s3_bucket
    def upload_file(self, name: str, version: str, path: str, acl: str = "public-read"):
        """
        Upload a local file to the bucket under the key {name}/{version}/{name}{suffix}.

        https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html
        """
        suffix = Path(path).suffix
        key = f"{name}/{version}/{name}{suffix}"
        response = self.put(path, key, acl)
        return response
    def put(
        self, path: str, key: str, acl: str = "public-read", metadata: dict = {}
    ) -> dict:
        """
        Upload a local file to the bucket at the given key, showing a
        progress bar while the transfer runs.
        """
        with Progress(
            SpinnerColumn(spinner_name="earth"),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(bar_width=30),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TimeRemainingColumn(),
            transient=True,
        ) as progress:
            size = os.stat(path).st_size
            task = progress.add_task(
                f"[green]Uploading [bold]{os.path.basename(path)}[/bold]", total=size
            )

            def update_progress(complete):
                progress.update(task, completed=complete)

            try:
                # boto3's S3.Client.upload_file returns None on success, so
                # response is None here; an empty dict signals failure below
                response = self.client.upload_file(
                    path,
                    self.bucket,
                    key,
                    ExtraArgs={"ACL": acl, "Metadata": metadata},
                    Callback=update_progress,
                )
            except ClientError as e:
                logging.error(e)
                return {}
            return response
    def exists(self, key: str):
        """
        Return True if the given key exists in the bucket.
        """
        try:
            self.client.head_object(Bucket=self.bucket, Key=key)
            return True
        except ClientError:
            return False
    def ls(self, prefix: str, detail: bool = False) -> list:
        """
        List keys in the bucket under the given prefix. With detail=True,
        return the full Contents entries instead of just the keys.
        """
        response = self.client.list_objects(Bucket=self.bucket, Prefix=prefix)
        if "Contents" in response:
            contents = response["Contents"]
            if detail:
                return contents
            return [content["Key"] for content in contents]
        return []
    # https://s3fs.readthedocs.io/en/latest/api.html?highlight=listdir#s3fs.core.S3FileSystem.info
    def info(self, key: str) -> dict:
        """
        Get header info for a given file
        """
        response = self.client.head_object(Bucket=self.bucket, Key=key)
        # Lowercase the custom metadata keys for compatibility with both
        # DigitalOcean and MinIO conventions; default to {} so objects
        # without custom metadata don't raise an AttributeError
        meta_lower = {k.lower(): v for k, v in response.get("Metadata", {}).items()}
        response.update({"Metadata": meta_lower})
        return response
    def cp(
        self,
        source_key: str,
        dest_key: str,
        acl: str = "public-read",
        metadata: dict = {},
        info: bool = False,
    ) -> dict:
        """
        Copy a file to a new path within the same bucket

        Parameters
        ----------
        source_key: path within the bucket of the file to copy
        dest_key: new path for the copy
        acl: acl for the newly created file
        metadata: dictionary to save as custom s3 metadata
        info: if true, return info for the file in its new location
        """
        try:
            response = self.client.copy_object(
                Bucket=self.bucket,
                Key=dest_key,
                CopySource={"Bucket": self.bucket, "Key": source_key},
                ACL=acl,
                Metadata=metadata,
            )
            if info:
                return self.info(key=dest_key)
            return response
        except ParamValidationError as e:
            raise ValueError(f"Copy {source_key} -> {dest_key} failed: {e}") from e
    def rm(self, *keys) -> dict:
        """
        Remove one or more files within the bucket

        Sample usage:
            s3.rm('path/to/file')
            s3.rm('file1', 'file2', 'file3')
            s3.rm(*['file1', 'file2', 'file3'])
        """
        objects = [{"Key": k} for k in keys]
        response = self.client.delete_objects(
            Bucket=self.bucket, Delete={"Objects": objects, "Quiet": False}
        )
        return response
    def mv(
        self,
        source_key: str,
        dest_key: str,
        acl: str = "public-read",
        metadata: dict = {},
        info: bool = False,
    ):
        """
        Move a file to a new path within the same bucket.

        Calls cp, then rm.

        Parameters
        ----------
        source_key: path within the bucket of the file to move
        dest_key: new path for the copy
        acl: acl for the newly created file
        metadata: dictionary to save as custom s3 metadata
        info: if true, get info for file in its new location
        """
        self.cp(source_key=source_key, dest_key=dest_key, acl=acl, metadata=metadata)
        response = self.rm(source_key)
        if info:
            return self.info(key=dest_key)
        return response
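A minimal usage sketch. The endpoint URL, bucket name, credential environment variables, and file paths below are illustrative assumptions, not values from this module:

    import os
    from library.s3 import S3

    s3 = S3(
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        aws_s3_endpoint="https://nyc3.digitaloceanspaces.com",  # hypothetical endpoint
        aws_s3_bucket="my-bucket",                              # hypothetical bucket
    )
    s3.upload_file(name="mypkg", version="1.0.0", path="./dist/mypkg.tar.gz")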
Classes
class S3 (aws_access_key_id: str, aws_secret_access_key: str, aws_s3_endpoint: str, aws_s3_bucket: str)
Methods
def cp(self, source_key: str, dest_key: str, acl: str = 'public-read', metadata: dict = {}, info: bool = False) -> dict
Copy a file to a new path within the same bucket.

Parameters

source_key : path within the bucket of the file to copy
dest_key : new path for the copy
acl : acl for the newly created file
metadata : dictionary to save as custom s3 metadata
info : if true, return info for the file in its new location
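A sketch of typical use, with hypothetical keys and metadata:

    # Copy within the bucket and fetch the new object's head info
    new_info = s3.cp(
        source_key="mypkg/1.0.0/mypkg.tar.gz",
        dest_key="mypkg/latest/mypkg.tar.gz",
        metadata={"release": "stable"},
        info=True,
    )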
def exists(self, key: str)
Return True if the given key exists in the bucket.
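For example, with a hypothetical key and local path:

    # Skip re-uploading an artifact that is already present
    if not s3.exists("mypkg/1.0.0/mypkg.tar.gz"):
        s3.put("./dist/mypkg.tar.gz", "mypkg/1.0.0/mypkg.tar.gz")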
def info(self, key: str) -> dict
Get header info for a given file. Custom metadata keys are lowercased for compatibility with both DigitalOcean and MinIO conventions.
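For example (the key is hypothetical; ContentLength and Metadata are standard head_object response fields):

    head = s3.info("mypkg/1.0.0/mypkg.tar.gz")
    # Custom metadata keys come back lowercased
    print(head["ContentLength"], head["Metadata"].get("release"))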
def ls(self, prefix: str, detail: bool = False) -> list
List keys in the bucket under the given prefix. With detail=True, return the full Contents entries instead of just the keys.
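For example, with a hypothetical prefix:

    keys = s3.ls("mypkg/1.0.0/")                  # ["mypkg/1.0.0/mypkg.tar.gz", ...]
    entries = s3.ls("mypkg/1.0.0/", detail=True)  # dicts with Key, Size, LastModified, ...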
def mv(self, source_key: str, dest_key: str, acl: str = 'public-read', metadata: dict = {}, info: bool = False)
Move a file to a new path within the same bucket. Calls cp, then rm.

Parameters

source_key : path within the bucket of the file to move
dest_key : new path for the copy
acl : acl for the newly created file
metadata : dictionary to save as custom s3 metadata
info : if true, get info for the file in its new location
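A sketch with hypothetical keys:

    # Rename within the bucket and return the head info of the moved object
    moved = s3.mv(
        source_key="mypkg/1.0.0/mypkg.tar.gz",
        dest_key="archive/mypkg-1.0.0.tar.gz",
        info=True,
    )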
def put(self, path: str, key: str, acl: str = 'public-read', metadata: dict = {}) -> dict
Upload a local file to the bucket at the given key, showing a progress bar. Returns an empty dict if the upload fails with a ClientError.
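For example, with hypothetical paths and metadata:

    s3.put(
        "./dist/mypkg.tar.gz",       # local file
        "mypkg/1.0.0/mypkg.tar.gz",  # destination key
        acl="private",
        metadata={"release": "stable"},
    )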
def rm(self, *keys) -> dict
Remove one or more files within the bucket.

Sample usage:
    s3.rm('path/to/file')
    s3.rm('file1', 'file2', 'file3')
    s3.rm(*['file1', 'file2', 'file3'])
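One way to clear a whole prefix (hypothetical here) is to combine ls with rm. Note that the underlying DeleteObjects API accepts at most 1000 keys per request, so very large listings would need to be chunked:

    stale = s3.ls("mypkg/0.9.0/")
    if stale:
        s3.rm(*stale)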
def upload_file(self, name: str, version: str, path: str, acl: str = 'public-read')
Upload a local file to the bucket under the key {name}/{version}/{name}{suffix}, where the suffix is taken from the file's extension. See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html
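For example, this stores ./dist/mypkg.tar.gz (a hypothetical path) under the derived key mypkg/1.0.0/mypkg.tar.gz:

    s3.upload_file(name="mypkg", version="1.0.0", path="./dist/mypkg.tar.gz")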