# -*- coding: utf-8 -*-
"""
S3-Only Backend for Versioned Artifact Management
This module provides a comprehensive S3-based backend for storing and managing
versioned artifacts with alias support, enabling sophisticated deployment patterns
like blue/green deployments, canary releases, and rollbacks.
What is S3-Only Backend?
------------------------------------------------------------------------------
The S3-Only Backend is a storage system that uses Amazon S3 as the sole backend
for artifact versioning and alias management. Unlike hybrid approaches that combine
S3 with DynamoDB, this backend stores all metadata and version information directly
in S3 using a structured folder hierarchy and JSON files.
S3 Folder Structure
------------------------------------------------------------------------------
The backend organizes artifacts in S3 using the following hierarchical structure:
.. code-block:: javascript
s3://{bucket}/{s3_prefix}/ # <--- this is a Repository
├── {artifact_name_1}/ # <--- this is an Artifact
│ ├── versions/ # <--- contains all versions of the artifact
│ │ ├── 000000_LATEST{suffix} # Latest version (always first)
│ │ ├── 999999_000001{suffix} # Version 1 (reverse sorted)
│ │ ├── 999998_000002{suffix} # Version 2
│ │ ├── 999997_000003{suffix} # Version 3
│ │ └── ...
│ └── aliases/ # <--- contains alias JSON files
│ ├── prod.json # Production alias
│ ├── staging.json # Staging alias
│ └── ...
├── {artifact_name_2}/
│ ├── versions/
│ └── aliases/
└── ...
Key Structure Components:
------------------------------------------------------------------------------
- **Root Directory**: ``s3://{bucket}/{s3_prefix}/`` (default: "versioned-artifacts")
- **Artifact Directories**: Each artifact gets its own folder named after the artifact
- **Versions Subdirectory**: Contains all versions of the artifact binary files
- **Aliases Subdirectory**: Contains JSON files mapping aliases to specific versions
Filename Encoding System
------------------------------------------------------------------------------
The backend uses a sophisticated filename encoding system that leverages S3's
alphabetical sorting to ensure proper version ordering:
Version Filename Format
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- **LATEST version**: ``000000_LATEST{suffix}`` (always appears first in listings)
- **Numeric versions**: ``{reverse_sort_key}_{zero_padded_version}{suffix}``
Examples
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Version ``LATEST`` → ``000000_LATEST.zip``
- Version ``1`` → ``999999_000001.zip``
- Version ``2`` → ``999998_000002.zip``
- Version ``999999`` → ``000001_999999.zip``
The encoding ensures that:
1. LATEST always appears first in S3 listings
2. Newer versions appear before older versions
3. Efficient pagination and sorting without requiring metadata queries
Alias Implementation
------------------------------------------------------------------------------
Aliases are implemented as JSON files stored in the ``aliases/`` subdirectory of
each artifact. Each alias file contains comprehensive metadata about version
mappings and supports advanced deployment patterns.
Alias Structure
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Each alias JSON file contains::
{
"name": "artifact_name",
"alias": "prod",
"version": "5",
"secondary_version": "4",
"secondary_version_weight": 20,
"version_s3uri": "s3://bucket/path/to/version/5",
"secondary_version_s3uri": "s3://bucket/path/to/version/4",
"update_at": "2024-01-01T12:00:00Z"
}
Alias Features:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1. **Primary Version**: The main version that the alias points to
2. **Secondary Version** (optional): Enables traffic splitting for gradual rollouts
3. **Weight-based Routing**: ``secondary_version_weight`` determines percentage of
traffic routed to the secondary version (0-100)
4. **Immutable References**: Aliases contain full S3 URIs to specific artifact versions
5. **Update Tracking**: Timestamps track when aliases were last modified
Deployment Patterns Enabled
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- **Blue/Green Deployments**: Switch aliases between versions instantly
- **Canary Releases**: Gradually increase traffic to new versions using weights
- **Rollbacks**: Instantly revert to previous versions by updating alias
- **A/B Testing**: Route percentage of traffic to different versions
Core Classes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The module provides several key classes:
- :class:`Repository`: Main interface for artifact and alias operations
- :class:`Artifact`: Represents a versioned artifact with metadata
- :class:`Alias`: Represents an alias mapping with traffic splitting support
Usage Examples
------------------------------------------------------------------------------
**Repository setup**::
repo = Repository(
aws_region="us-east-1",
s3_bucket="my-artifacts-bucket",
suffix=".zip"
)
repo.bootstrap(bsm)
**Artifact management**::
# Upload and publish versions
repo.put_artifact(bsm, name="myapp", content=binary_data)
repo.publish_artifact_version(bsm, name="myapp") # Creates version 1
# Create alias for production
repo.put_alias(bsm, name="myapp", alias="prod", version=1)
# Canary deployment with traffic splitting
repo.put_alias(
bsm, name="myapp", alias="prod",
version=1, secondary_version=2, secondary_version_weight=10
)
The S3-Only Backend provides a robust, scalable solution for artifact management
with native support for modern deployment practices.
"""
import typing as T
import json
import random
import dataclasses
from datetime import datetime, timedelta
from functools import cached_property
from boto_session_manager import BotoSesManager
from s3pathlib import S3Path
from func_args.api import OPT
from . import exc
from .constants import (
S3_PREFIX,
LATEST_VERSION,
VERSION_ZFILL,
METADATA_KEY_ARTIFACT_SHA256,
)
from .utils import get_utc_now, encode_version
from .vendor.hashes import hashes
[docs]
def encode_filename(version: T.Optional[T.Union[int, str]]) -> str:
"""
Encode version into S3-compatible filename for proper chronological sorting.
Creates filenames that leverage S3's alphabetical object listing to ensure
versions appear in reverse chronological order (newest first). Uses a clever
reverse-sorting algorithm where higher version numbers get lower alphabetical
prefixes, ensuring LATEST always appears first.
The encoding scheme uses the formula:
``{(10^6 - version_number):06d}_{version_number:06d}``
:param version: Version number to encode into filename
:returns: Encoded filename suitable for S3 storage
Examples:
Filename encoding for chronological sorting::
encode_filename(None) # "000000_LATEST"
encode_filename("LATEST") # "000000_LATEST"
encode_filename(1) # "999999_000001"
encode_filename(2) # "999998_000002"
encode_filename(999999) # "000001_999999"
When listed alphabetically in S3, files appear as:
- 000000_LATEST (newest development)
- 000001_999999 (version 999999)
- 999998_000002 (version 2)
- 999999_000001 (version 1, oldest)
"""
# First normalize the version input
version = encode_version(version)
# Special case: LATEST gets prefix of all zeros to appear first
if version == LATEST_VERSION:
return "{}_{}".format(
VERSION_ZFILL * "0", # "000000"
LATEST_VERSION, # "LATEST"
)
else:
# Numeric versions: reverse sort using (10^6 - version_num) as prefix
# This ensures higher version numbers get lower prefixes for reverse chronological order
return "{}_{}".format(
str(10**VERSION_ZFILL - int(version)).zfill(
VERSION_ZFILL
), # Reverse sort prefix
version.zfill(VERSION_ZFILL), # Zero-padded version number
)
[docs]
def decode_filename(version: str) -> str:
"""
Decode S3 filename back to original version string.
Reverses the encoding performed by :func:`encode_filename` to extract
the original version number from the S3-compatible filename format.
:param version: Encoded filename to decode
:returns: Original version string
Examples:
Filename decoding::
decode_filename("000000_LATEST") # "LATEST"
decode_filename("000001_999999") # "999999"
decode_filename("999998_000002") # "2"
decode_filename("999999_000001") # "1"
.. seealso::
:func:`encode_filename` for the encoding process
"""
# Check if filename starts with all zeros (indicates LATEST version)
if version.startswith(VERSION_ZFILL * "0"): # Starts with "000000"
return LATEST_VERSION
else:
# Extract version number from second part after underscore and remove padding
return str(int(version.split("_")[1]))
[docs]
def validate_alias_name(alias: str):
"""
Validate alias name according to naming convention rules.
Ensures alias names follow the required naming patterns to prevent
conflicts with internal version identifiers and maintain consistency
across the artifact management system.
:param alias: Alias name to validate
:raises ValueError: If alias name violates naming conventions
Validation Rules:
- Cannot be "LATEST" (reserved for version identifier)
- Cannot contain hyphens (conflicts with internal encoding)
- Must start with alphabetic character
Examples:
Valid alias names::
validate_alias_name("prod") # Valid
validate_alias_name("staging") # Valid
validate_alias_name("v1_0") # Valid
Invalid alias names::
validate_alias_name("LATEST") # Raises ValueError
validate_alias_name("prod-v1") # Raises ValueError
validate_alias_name("1_prod") # Raises ValueError
"""
if alias == LATEST_VERSION:
raise ValueError(f"alias name cannot be {LATEST_VERSION!r}.")
if "-" in alias:
raise ValueError("alias name cannot contain '-'.")
if alias[0].isalpha() is False:
raise ValueError("alias name must start with a alpha letter.")
[docs]
@dataclasses.dataclass
class Base:
"""
Base class providing common serialization functionality for data classes.
Provides standard dictionary conversion methods used by artifact and alias
data classes for JSON serialization and deserialization operations.
"""
[docs]
@classmethod
def from_dict(cls, dct: dict):
"""
Create instance from dictionary representation.
:param dct: Dictionary containing field values for the data class
:returns: New instance of the data class
"""
return cls(**dct)
[docs]
def to_dict(self) -> dict:
"""
Convert instance to dictionary representation.
:returns: Dictionary containing all field values
"""
return dataclasses.asdict(self)
[docs]
@dataclasses.dataclass
class Artifact(Base):
"""
Immutable artifact version with metadata and content access.
Represents a specific version of an artifact stored in S3, containing
all necessary metadata for version identification, content integrity
verification, and direct access to the stored binary data.
:param name: Artifact identifier unique within the repository
:param version: Version string ("LATEST" or numeric like "1", "2", etc.)
:param update_at: ISO format UTC timestamp when artifact was last modified
:param s3uri: Complete S3 URI pointing to the artifact binary
:param sha256: SHA256 hash for content integrity verification
**Examples**:
Artifact representation::
{
"name": "myapp",
"version": "3",
"update_at": "2024-01-01T12:00:00Z",
"s3uri": "s3://bucket/artifacts/myapp/versions/999997_000003.zip",
"sha256": "abc123...def789"
}
"""
name: str
version: str
update_at: str
s3uri: str
sha256: str
@property
def update_datetime(self) -> datetime:
"""
Parse update timestamp into timezone-aware datetime object.
:returns: Parsed datetime with UTC timezone information
"""
return datetime.fromisoformat(self.update_at)
@property
def s3path(self) -> S3Path:
"""
Create S3Path object for direct S3 operations.
:returns: S3Path instance for reading, copying, or inspecting the artifact
"""
return S3Path(self.s3uri)
[docs]
def get_content(self, bsm: BotoSesManager) -> bytes:
"""
Download and return the complete artifact binary content.
:param bsm: AWS session manager for S3 access
:returns: Complete binary content of the artifact
:raises ClientError: If artifact no longer exists in S3
"""
return self.s3path.read_bytes(bsm=bsm)
[docs]
@dataclasses.dataclass
class Alias(Base):
"""
Named reference to artifact versions with traffic splitting support.
Provides stable references to specific artifact versions that enable
sophisticated deployment patterns including blue/green deployments,
canary releases, and gradual rollouts with weighted traffic distribution.
:param name: Artifact identifier that this alias references
:param alias: Human-readable alias name (cannot contain hyphens)
:param update_at: ISO format UTC timestamp when alias was last modified
:param version: Primary artifact version this alias points to
:param secondary_version: Optional secondary version for traffic splitting
:param secondary_version_weight: Percentage (0-99) of traffic routed to secondary version
:param version_s3uri: Complete S3 URI of the primary artifact version
:param secondary_version_s3uri: Complete S3 URI of the secondary artifact version
**Examples**:
Simple alias pointing to single version::
{
"name": "myapp",
"alias": "prod",
"version": "5",
"secondary_version": null,
"secondary_version_weight": null,
"version_s3uri": "s3://bucket/artifacts/myapp/versions/999995_000005.zip",
"secondary_version_s3uri": null,
"update_at": "2024-01-01T12:00:00Z"
}
Canary deployment with 20% traffic to new version::
{
"name": "myapp",
"alias": "prod",
"version": "5",
"secondary_version": "6",
"secondary_version_weight": 20,
"version_s3uri": "s3://bucket/artifacts/myapp/versions/999995_000005.zip",
"secondary_version_s3uri": "s3://bucket/artifacts/myapp/versions/999994_000006.zip",
"update_at": "2024-01-01T15:30:00Z"
}
"""
name: str
alias: str
update_at: str
version: str
secondary_version: T.Optional[str]
secondary_version_weight: T.Optional[int]
version_s3uri: str
secondary_version_s3uri: T.Optional[str]
@property
def update_datetime(self) -> datetime:
"""
Parse update timestamp into timezone-aware datetime object.
:returns: Parsed datetime with UTC timezone information
"""
return datetime.fromisoformat(self.update_at)
@property
def s3path_version(self) -> S3Path:
"""
Create S3Path object for the primary artifact version.
:returns: S3Path instance for the main version referenced by this alias
"""
return S3Path(self.version_s3uri)
[docs]
def get_version_content(self, bsm: BotoSesManager) -> bytes:
"""
Download and return the primary artifact version content.
:param bsm: AWS session manager for S3 access
:returns: Complete binary content of the primary artifact version
:raises ClientError: If primary artifact no longer exists in S3
"""
return self.s3path_version.read_bytes(bsm=bsm)
@property
def s3path_secondary_version(self) -> S3Path:
"""
Create S3Path object for the secondary artifact version.
:returns: S3Path instance for the secondary version used in traffic splitting
:raises AttributeError: If secondary_version_s3uri is None
"""
return S3Path(self.secondary_version_s3uri)
[docs]
def get_secondary_version_content(self, bsm: BotoSesManager) -> bytes:
"""
Download and return the secondary artifact version content.
:param bsm: AWS session manager for S3 access
:returns: Complete binary content of the secondary artifact version
:raises ClientError: If secondary artifact no longer exists in S3
:raises AttributeError: If secondary_version_s3uri is None
"""
return self.s3path_secondary_version.read_bytes(bsm=bsm)
@cached_property
def _version_weight(self) -> int:
"""
Calculate primary version weight for traffic distribution.
:returns: Percentage weight for primary version (0-100)
"""
if self.secondary_version_weight is None:
return 100
else:
return 100 - self.secondary_version_weight
[docs]
def random_artifact(self) -> str:
"""
Select artifact version based on weighted random distribution.
Implements traffic splitting by randomly selecting between primary
and secondary versions based on the configured weight distribution.
Used for canary deployments and gradual rollouts.
:returns: S3 URI of the selected artifact version
Examples:
Single version alias always returns primary::
alias.random_artifact() # Always returns version_s3uri
Weighted distribution (80% primary, 20% secondary)::
# Returns version_s3uri ~80% of the time
# Returns secondary_version_s3uri ~20% of the time
selected_uri = alias.random_artifact()
"""
# Generate random number between 1-100 for traffic distribution
# _version_weight is the percentage that should go to primary version
if random.randint(1, 100) <= self._version_weight:
# Random number falls within primary version's allocation
return self.version_s3uri
else:
# Random number falls within secondary version's allocation
return self.secondary_version_s3uri
[docs]
@dataclasses.dataclass
class Repository:
"""
S3-based artifact repository with comprehensive version and alias management.
Provides a complete artifact management system using S3 as the storage backend.
Supports versioned artifacts, stable aliases, traffic splitting for deployments,
and automated lifecycle management with intelligent filename encoding for
optimal performance.
The repository organizes artifacts in a hierarchical S3 structure that enables
efficient listing, versioning, and alias management without requiring external
databases or metadata stores.
:param aws_region: AWS region where the S3 bucket resides
:param s3_bucket: S3 bucket name for artifact storage
:param s3_prefix: S3 key prefix (folder path) for organizing artifacts
:param suffix: File extension to append to all artifact files
**Examples**:
Repository initialization::
repo = Repository(
aws_region="us-east-1",
s3_bucket="company-artifacts",
s3_prefix="applications",
suffix=".zip"
)
Complete workflow::
# Setup and upload
repo.bootstrap(bsm)
repo.put_artifact(bsm, "myapp", binary_data)
# Version management
v1 = repo.publish_artifact_version(bsm, "myapp") # Creates version 1
repo.put_artifact(bsm, "myapp", new_binary_data)
v2 = repo.publish_artifact_version(bsm, "myapp") # Creates version 2
# Alias and deployment patterns
repo.put_alias(bsm, "myapp", "prod", version=1) # Blue/green
repo.put_alias( # Canary
bsm, "myapp", "prod", version=1,
secondary_version=2, secondary_version_weight=10
)
.. note::
All artifacts are stored with SHA256 content hashing for integrity
verification and automatic deduplication during publishing.
"""
aws_region: str = dataclasses.field()
s3_bucket: str = dataclasses.field()
s3_prefix: str = dataclasses.field(default=S3_PREFIX)
suffix: str = dataclasses.field(default="")
@property
def s3dir_artifact_store(self) -> S3Path:
"""
Get the root S3 directory for the entire artifact repository.
:returns: S3Path pointing to the repository root directory
Example:
Repository root structure::
s3://my-bucket/artifacts/ # s3dir_artifact_store
├── app1/
├── app2/
└── service-x/
"""
return S3Path(self.s3_bucket).joinpath(self.s3_prefix).to_dir()
def _get_artifact_s3dir(self, name: str) -> S3Path:
"""
Get the S3 directory for a specific artifact.
:param name: Artifact name
:returns: S3Path pointing to the artifact's directory
Example:
Artifact directory structure::
s3://my-bucket/artifacts/myapp/ # _get_artifact_s3dir("myapp")
├── versions/
└── aliases/
"""
return self.s3dir_artifact_store.joinpath(name).to_dir()
def _encode_basename(self, version: T.Optional[T.Union[int, str]] = None) -> str:
return f"{encode_filename(version)}{self.suffix}"
def _decode_basename(self, basename: str) -> str:
"""
Remove file suffix from basename to get encoded version string.
:param basename: Complete filename including suffix
:returns: Encoded version string without suffix
"""
basename = basename[::-1]
suffix = self.suffix[::-1]
basename = basename.replace(suffix, "", 1)
return basename[::-1]
def _get_artifact_s3path(
self,
name: str,
version: T.Optional[T.Union[int, str]] = None,
) -> S3Path:
"""
Get the complete S3 path for a specific artifact version.
:param name: Artifact name
:param version: Version number (defaults to LATEST)
:returns: S3Path pointing to the specific artifact version file
Example:
Version file paths::
s3://bucket/artifacts/myapp/versions/000000_LATEST.zip
s3://bucket/artifacts/myapp/versions/999999_000001.zip
s3://bucket/artifacts/myapp/versions/999998_000002.zip
"""
return self._get_artifact_s3dir(name=name).joinpath(
"versions",
self._encode_basename(version),
)
def _get_alias_s3path(self, name: str, alias: str) -> S3Path:
"""
Get the S3 path for a specific alias JSON file.
:param name: Artifact name
:param alias: Alias name
:returns: S3Path pointing to the alias JSON configuration file
Example:
Alias file paths::
s3://bucket/artifacts/myapp/aliases/prod.json
s3://bucket/artifacts/myapp/aliases/staging.json
s3://bucket/artifacts/myapp/aliases/canary.json
"""
return self._get_artifact_s3dir(name=name).joinpath(
"aliases",
f"{alias}.json",
)
[docs]
def bootstrap(
self,
bsm: BotoSesManager,
):
"""
Initialize the S3 backend by creating the bucket if it doesn't exist.
Ensures the specified S3 bucket exists and is accessible for artifact
storage. Creates the bucket with appropriate regional configuration
if it doesn't already exist.
:param bsm: AWS session manager for S3 operations
:raises ClientError: If bucket creation fails due to permissions or conflicts
.. note::
This method is idempotent - safe to call multiple times.
"""
try:
bsm.s3_client.head_bucket(Bucket=self.s3_bucket)
except Exception as e: # pragma: no cover
if "Not Found" in str(e):
kwargs = dict(Bucket=self.s3_bucket)
if self.aws_region != "us-east-1":
kwargs["CreateBucketConfiguration"] = dict(
LocationConstraint=self.aws_region
)
bsm.s3_client.create_bucket(**kwargs)
else:
raise e
def _get_artifact_object(
self,
bsm: BotoSesManager,
name: str,
version: T.Optional[T.Union[int, str]] = None,
) -> Artifact:
try:
s3path = self._get_artifact_s3path(name=name, version=version)
s3path.head_object(bsm=bsm)
return Artifact(
name=name,
version=encode_version(version),
update_at=s3path.last_modified_at.isoformat(),
s3uri=s3path.uri,
sha256=s3path.metadata[METADATA_KEY_ARTIFACT_SHA256],
)
except Exception as e: # pragma: no cover
error_msg = str(e)
if "Not Found" in error_msg or "does not exist" in error_msg:
raise exc.ArtifactNotFoundError(
f"Cannot find artifact: artifact name = {name!r}, version = {version!r}"
)
else:
raise e
def _list_artifact_versions_s3path(
self,
bsm: BotoSesManager,
name: str,
limit: T.Optional[int] = None,
) -> T.List[S3Path]: # pragma: no cover
"""
List the s3path of artifact versions of this artifact. Perform additional
check to make sure the s3 dir structure are not contaminated.
"""
s3dir_artifact = self._get_artifact_s3dir(name=name).joinpath("versions")
if limit is None:
s3path_list = s3dir_artifact.iter_objects(bsm=bsm).all()
else:
s3path_list = s3dir_artifact.iter_objects(bsm=bsm, limit=limit).all()
n = len(s3path_list)
if n >= 1:
if (
decode_filename(self._decode_basename(s3path_list[0].basename))
!= LATEST_VERSION
):
raise exc.ArtifactS3BackendError(
f"S3 folder {s3dir_artifact.uri} for artifact {name!r} is contaminated! "
f"The first s3 object is not the LATEST version {s3path_list[0].uri}"
)
if n >= 2:
for s3path in s3path_list[1:]:
if (
decode_filename(self._decode_basename(s3path.basename)).isdigit()
is False
):
raise exc.ArtifactS3BackendError(
f"S3 folder {s3dir_artifact.uri} for artifact {name!r} is contaminated! "
f"Found non-numeric version {s3path.uri!r}"
)
return s3path_list
[docs]
def get_latest_published_artifact_version_number(
self,
bsm: BotoSesManager,
name: str,
) -> int:
"""
Get the highest published version number for an artifact.
Returns the numeric version of the most recently published artifact.
Returns 0 if only the LATEST version exists (no published versions yet).
:param bsm: AWS session manager for S3 operations
:param name: Artifact name to check
:returns: Highest published version number, or 0 if none published
Examples:
Version progression::
repo.put_artifact(bsm, "myapp", data) # Creates LATEST
repo.get_latest_published_version(bsm, "myapp") # Returns 0
repo.publish_artifact_version(bsm, "myapp") # Creates version 1
repo.get_latest_published_version(bsm, "myapp") # Returns 1
repo.put_artifact(bsm, "myapp", new_data) # Updates LATEST
repo.publish_artifact_version(bsm, "myapp") # Creates version 2
repo.get_latest_published_version(bsm, "myapp") # Returns 2
"""
s3path_list = self._list_artifact_versions_s3path(
bsm=bsm,
name=name,
limit=2,
)
if len(s3path_list) in [0, 1]:
return 0
else:
return int(decode_filename(self._decode_basename(s3path_list[1].basename)))
def _get_alias_object(
self,
bsm: BotoSesManager,
name: str,
alias: str,
) -> Alias:
try:
s3path = self._get_alias_s3path(name=name, alias=alias)
s3path.head_object(bsm=bsm)
alias = Alias.from_dict(json.loads(s3path.read_text(bsm=bsm)))
alias.update_at = s3path.last_modified_at.isoformat()
return alias
except Exception as e:
error_msg = str(e)
if "Not Found" in error_msg or "does not exist" in error_msg:
raise exc.AliasNotFoundError(
f"Cannot find alias: artifact name = {name!r}, alias = {alias!r}"
)
else: # pragma: no cover
raise e
# --------------------------------------------------------------------------
# Artifact
# --------------------------------------------------------------------------
[docs]
def put_artifact(
self,
bsm: BotoSesManager,
name: str,
content: bytes,
content_type: str = OPT,
metadata: T.Dict[str, str] = OPT,
tags: T.Dict[str, str] = OPT,
) -> Artifact:
"""
Upload or update the LATEST version of an artifact.
Creates or updates the mutable LATEST version with new content. If the
content is identical to the existing LATEST version (based on SHA256),
no upload occurs and the existing artifact metadata is returned.
This is typically the first step in the artifact workflow, followed by
:meth:`publish_artifact_version` to create immutable numbered versions.
:param bsm: AWS session manager for S3 operations
:param name: Unique artifact identifier within the repository
:param content: Binary artifact data to store
:param content_type: MIME type for S3 object (auto-detected if not provided)
:param metadata: Additional S3 object metadata key-value pairs
:param tags: S3 object tags for categorization and billing
:returns: Artifact object with metadata for the LATEST version
Examples:
Basic artifact upload::
with open("app.zip", "rb") as f:
artifact = repo.put_artifact(
bsm=bsm,
name="myapp",
content=f.read()
)
With metadata and content type::
artifact = repo.put_artifact(
bsm=bsm,
name="myapp",
content=binary_data,
content_type="application/zip",
metadata={"build_id": "12345", "commit": "abc123"},
tags={"environment": "production", "team": "backend"}
)
.. note::
Content deduplication is automatic - identical content won't be
re-uploaded, making this operation safe for CI/CD pipelines.
"""
# Calculate SHA256 hash for content integrity checking and deduplication
artifact_sha256 = hashes.of_bytes(content)
# Get S3 path for LATEST version (always encoded as 000000_LATEST{suffix})
s3path = self._get_artifact_s3path(name=name, version=LATEST_VERSION)
# Optimization: Skip upload if content hasn't changed (deduplication)
if s3path.exists(bsm=bsm):
# Compare SHA256 hash stored in S3 metadata with new content hash
if s3path.metadata[METADATA_KEY_ARTIFACT_SHA256] == artifact_sha256:
# Content is identical, return existing artifact info without uploading
return Artifact(
name=name,
version=LATEST_VERSION,
update_at=s3path.last_modified_at.isoformat(),
s3uri=s3path.uri,
sha256=artifact_sha256,
)
# Content has changed or this is a new artifact, prepare for upload
# Build metadata dictionary with required fields
final_metadata = dict(
artifact_name=name, # Store artifact name for reference
artifact_sha256=artifact_sha256, # Store hash for future deduplication
)
# Merge user-provided metadata if any
if metadata is not OPT:
final_metadata.update(metadata)
# Upload new content to S3 LATEST version with metadata and tags
s3path.write_bytes(
content,
metadata=final_metadata,
content_type=content_type,
tags=tags,
bsm=bsm,
)
# Refresh object metadata to get accurate post-upload timestamp
s3path.head_object(bsm=bsm)
# Return artifact object with updated information
return Artifact(
name=name,
version=LATEST_VERSION,
update_at=s3path.last_modified_at.isoformat(),
s3uri=s3path.uri,
sha256=artifact_sha256,
)
[docs]
def get_artifact_version(
self,
bsm: BotoSesManager,
name: str,
version: T.Optional[T.Union[int, str]] = None,
) -> Artifact:
"""
Retrieve metadata for a specific artifact version.
Returns comprehensive information about an artifact version including
S3 location, content hash, and modification timestamps without downloading
the actual binary content.
:param bsm: AWS session manager for S3 operations
:param name: Artifact name to retrieve
:param version: Specific version to get (defaults to LATEST)
:returns: Artifact object with complete metadata
:raises ArtifactNotFoundError: If the specified artifact or version doesn't exist
Examples:
Get latest version::
artifact = repo.get_artifact_version(bsm, "myapp")
print(f"Latest SHA256: {artifact.sha256}")
Get specific version::
v2 = repo.get_artifact_version(bsm, "myapp", version=2)
print(f"Version 2 S3 URI: {v2.s3uri}")
print(f"Updated: {v2.update_datetime}")
"""
return self._get_artifact_object(
bsm=bsm,
name=name,
version=version,
)
[docs]
def list_artifact_versions(
self,
bsm: BotoSesManager,
name: str,
) -> T.List[Artifact]:
"""
List all versions of an artifact in reverse chronological order.
Returns all available versions including the mutable LATEST version
and all published immutable versions. The list is ordered with LATEST
first, followed by numbered versions from newest to oldest.
:param bsm: AWS session manager for S3 operations
:param name: Artifact name to list versions for
:returns: List of Artifact objects sorted by version (newest first)
:raises ArtifactNotFoundError: If the artifact doesn't exist
Examples:
List all versions::
versions = repo.list_artifact_versions(bsm, "myapp")
for artifact in versions:
print(f"Version {artifact.version}: {artifact.update_at}")
Output example::
Version LATEST: 2024-01-01T15:30:00Z
Version 3: 2024-01-01T14:00:00Z
Version 2: 2024-01-01T12:00:00Z
Version 1: 2024-01-01T10:00:00Z
"""
artifact_list = list()
for s3path in self._list_artifact_versions_s3path(
bsm=bsm,
name=name,
):
s3path.head_object(bsm=bsm)
artifact = Artifact(
name=name,
version=decode_filename(self._decode_basename(s3path.basename)),
update_at=s3path.last_modified_at.isoformat(),
s3uri=s3path.uri,
sha256=s3path.metadata[METADATA_KEY_ARTIFACT_SHA256],
)
artifact_list.append(artifact)
return artifact_list
[docs]
def publish_artifact_version(
self,
bsm: BotoSesManager,
name: str,
) -> Artifact:
"""
Create an immutable numbered version from the current LATEST artifact.
Copies the current LATEST version to create a new numbered version that
cannot be modified. If the LATEST content is identical to the most recent
published version (based on SHA256), no new version is created and the
existing version is returned.
This enables immutable deployments while maintaining a mutable development
version in LATEST. Version numbers are automatically incremented.
:param bsm: AWS session manager for S3 operations
:param name: Artifact name to publish a version for
:returns: Artifact object representing the published version
:raises ArtifactNotFoundError: If no LATEST version exists
Examples:
Publishing workflow::
# Upload new content
repo.put_artifact(bsm, "myapp", new_content)
# Create immutable version
v1 = repo.publish_artifact_version(bsm, "myapp")
print(f"Published version {v1.version}") # "Published version 1"
# Update and publish again
repo.put_artifact(bsm, "myapp", newer_content)
v2 = repo.publish_artifact_version(bsm, "myapp")
print(f"Published version {v2.version}") # "Published version 2"
No-op when content unchanged::
# If LATEST hasn't changed since last publish
same_version = repo.publish_artifact_version(bsm, "myapp")
# Returns existing version, no new version created
.. note::
Published versions are immutable and cannot be modified. Use
:meth:`delete_artifact_version` to remove unwanted versions.
"""
# Get up to 2 most recent artifact versions to check state
# Returns list sorted by S3 alphabetical order: [LATEST, most_recent_version]
s3path_list = self._list_artifact_versions_s3path(
bsm=bsm,
name=name,
limit=2, # Only need LATEST + 1 previous version for comparison
)
n = len(s3path_list)
# Ensure LATEST version exists before we can publish
if n == 0:
raise exc.ArtifactNotFoundError(
f"artifact {name!r} not found! you must put artifact first!"
)
# First item is always LATEST due to filename encoding (000000_LATEST)
s3path_latest = s3path_list[0]
# Case 1: Only LATEST exists, create first numbered version (version 1)
if n == 1:
new_version = "1"
# Generate S3 path for new version 1 (will be encoded as 999999_000001)
s3path_new = self._get_artifact_s3path(name=name, version=new_version)
# Copy LATEST content to version 1 with all metadata intact
s3path_latest.copy_to(s3path_new, bsm=bsm)
# Refresh metadata after copy to get accurate timestamps
s3path_new.head_object(bsm=bsm)
return Artifact(
name=name,
version=new_version,
update_at=s3path_new.last_modified_at.isoformat(),
s3uri=s3path_new.uri,
sha256=s3path_new.metadata[METADATA_KEY_ARTIFACT_SHA256],
)
# Case 2: Both LATEST and previous version exist, need to compare content
else:
# Second item is the most recent numbered version
s3path_previous = s3path_list[1]
# Extract version number from encoded filename (e.g., 999998_000002 -> "2")
previous_version = decode_filename(
self._decode_basename(s3path_previous.basename)
)
# Calculate next version number by incrementing
new_version = str(int(previous_version) + 1)
# Content deduplication: check if LATEST is identical to previous version
# ETags are MD5 hashes that change when content changes
if s3path_previous.etag == s3path_latest.etag:
# Content is identical, return existing version instead of creating duplicate
s3path_previous.head_object(bsm=bsm)
return Artifact(
name=name,
version=previous_version, # Return existing version, not new one
update_at=s3path_previous.last_modified_at.isoformat(),
s3uri=s3path_previous.uri,
sha256=s3path_previous.metadata[METADATA_KEY_ARTIFACT_SHA256],
)
else:
# Content has changed, create new numbered version
s3path_new = self._get_artifact_s3path(name=name, version=new_version)
# Copy LATEST to new version with incremented number
s3path_latest.copy_to(s3path_new, bsm=bsm)
# Refresh metadata to get post-copy timestamps
s3path_new.head_object(bsm=bsm)
return Artifact(
name=name,
version=new_version,
update_at=s3path_new.last_modified_at.isoformat(),
s3uri=s3path_new.uri,
sha256=s3path_new.metadata[METADATA_KEY_ARTIFACT_SHA256],
)
[docs]
def delete_artifact_version(
self,
bsm: BotoSesManager,
name: str,
version: T.Union[int, str],
):
"""
Permanently delete a specific artifact version from S3.
Removes the specified version file from S3 storage. The LATEST version
cannot be deleted using this method - use :meth:`purge_artifact` to
remove all versions including LATEST.
This operation is irreversible and should be used carefully, especially
if the version is referenced by active aliases.
:param bsm: AWS session manager for S3 operations
:param name: Artifact name
:param version: Specific version number to delete (cannot be "LATEST")
:raises ArtifactS3BackendError: If attempting to delete the LATEST version
Examples:
Delete specific versions::
# Delete old version no longer needed
repo.delete_artifact_version(bsm, "myapp", version=1)
# This will raise an error
repo.delete_artifact_version(bsm, "myapp", version="LATEST")
.. warning::
Deleting a version that is actively referenced by aliases may cause
deployment issues. Check alias references before deletion.
"""
if encode_version(version) == LATEST_VERSION:
raise exc.ArtifactS3BackendError(
"You cannot delete the LATEST artifact version!"
)
self._get_artifact_s3path(name=name, version=version).delete(bsm=bsm)
[docs]
def list_artifact_names(
self,
bsm: BotoSesManager,
) -> T.List[str]:
"""
List all artifact names stored in this repository.
Scans the repository root directory to find all artifacts that have
been created. Returns just the artifact names without version information.
:param bsm: AWS session manager for S3 operations
:returns: Sorted list of unique artifact names
Examples:
Discover all artifacts::\
names = repo.list_artifact_names(bsm)
print(f"Found artifacts: {names}")
# Output: ["api-service", "frontend", "worker-service"]
Use with other operations::\
for name in repo.list_artifact_names(bsm):
latest = repo.get_artifact_version(bsm, name)
print(f"{name}: {latest.version} ({latest.update_at})")
"""
names = list()
for p in self.s3dir_artifact_store.iterdir(bsm=bsm):
if p.is_dir():
names.append(p.basename)
return names
# ------------------------------------------------------------------------------
# Alias
# ------------------------------------------------------------------------------
[docs]
def put_alias(
self,
bsm: BotoSesManager,
name: str,
alias: str,
version: T.Optional[T.Union[int, str]] = None,
secondary_version: T.Optional[T.Union[int, str]] = None,
secondary_version_weight: T.Optional[int] = None,
) -> Alias:
"""
Create or update an alias pointing to artifact versions with traffic splitting.
Creates stable references to specific artifact versions that enable sophisticated
deployment patterns. Supports optional traffic splitting between a primary and
secondary version for gradual rollouts and canary deployments.
:param bsm: AWS session manager for S3 operations
:param name: Artifact name to create alias for
:param alias: Human-readable alias name (cannot contain hyphens)
:param version: Primary version to point to (defaults to LATEST)
:param secondary_version: Optional secondary version for traffic splitting
:param secondary_version_weight: Percentage (0-99) of traffic routed to secondary
:returns: Alias object with complete configuration
:raises ValueError: If alias name contains hyphens or weight is invalid
:raises TypeError: If secondary_version_weight is not an integer
:raises ArtifactNotFoundError: If specified versions don't exist
Examples:
Simple alias (blue/green deployment)::
# Point prod to version 5
alias = repo.put_alias(bsm, "myapp", "prod", version=5)
Canary deployment with traffic splitting::
# 80% traffic to v5, 20% to v6
alias = repo.put_alias(
bsm, "myapp", "prod",
version=5,
secondary_version=6,
secondary_version_weight=20
)
Point to latest development::
# Alias points to mutable LATEST version
alias = repo.put_alias(bsm, "myapp", "dev")
.. note::
Aliases are immediately active after creation. Ensure versions
are thoroughly tested before updating production aliases.
"""
# Step 1: Validate alias naming conventions
# Hyphens conflict with our internal encoding scheme
if "-" in alias: # pragma: no cover
raise ValueError("alias cannot have hyphen")
# Step 2: Normalize version inputs to standardized format
# Converts None/"LATEST"/1/"000001" to consistent strings ("LATEST"/"1")
version = encode_version(version)
# Step 3: Validate traffic splitting configuration if secondary version provided
if secondary_version is not None:
# Normalize secondary version the same way
secondary_version = encode_version(secondary_version)
# Ensure weight is proper integer type for traffic percentage
if not isinstance(secondary_version_weight, int):
raise TypeError("secondary_version_weight must be int")
# Weight must be 0-99 (percentage to route to secondary, 100-weight goes to primary)
if not (0 <= secondary_version_weight < 100):
raise ValueError("secondary_version_weight must be 0 <= x < 100")
# Prevent same version for primary and secondary (would be meaningless)
if version == secondary_version:
raise ValueError(
f"version {version!r} and secondary_version {secondary_version!r} "
f"cannot be the same!"
)
# Step 4: Verify primary version exists in S3 before creating alias
version_s3path = self._get_artifact_s3path(name=name, version=version)
version_s3uri = version_s3path.uri # Store URI for alias JSON
if version_s3path.exists(bsm=bsm) is False:
raise exc.ArtifactNotFoundError(
f"Cannot put alias to artifact name = {name!r}, version = {version}"
)
# Step 5: Verify secondary version exists if traffic splitting is configured
if secondary_version is not None:
secondary_version_s3path = self._get_artifact_s3path(
name=name,
version=secondary_version,
)
secondary_version_s3uri = (
secondary_version_s3path.uri
) # Store URI for alias JSON
if secondary_version_s3path.exists(bsm=bsm) is False:
raise exc.ArtifactNotFoundError(
f"Cannot put alias to artifact name = {name!r}, version = {secondary_version}"
)
else:
# No traffic splitting, secondary URI remains None
secondary_version_s3uri = None
# Step 6: Create alias data structure with all configuration
alias_obj = Alias(
name=name,
alias=alias,
version=version,
update_at="unknown", # Will be updated after S3 write with actual timestamp
secondary_version=secondary_version,
secondary_version_weight=secondary_version_weight,
version_s3uri=version_s3uri, # Full S3 URI to primary version
secondary_version_s3uri=secondary_version_s3uri, # Full S3 URI to secondary (or None)
)
# Step 7: Write alias configuration to S3 as JSON file
# Path: s3://bucket/prefix/artifact_name/aliases/alias_name.json
alias_s3path = self._get_alias_s3path(name=name, alias=alias)
# Convert alias object to JSON and write to S3
alias_s3path.write_text(json.dumps(alias_obj.to_dict()), bsm=bsm)
# Step 8: Get accurate timestamp from S3 after write operation
alias_s3path.head_object(bsm=bsm)
# Update alias object with real S3 modification timestamp
alias_obj.update_at = alias_s3path.last_modified_at.isoformat()
return alias_obj
[docs]
def get_alias(
self,
bsm: BotoSesManager,
name: str,
alias: str,
) -> Alias:
"""
Retrieve detailed information about a specific alias.
Returns complete alias configuration including version references,
traffic splitting settings, and update timestamps.
:param bsm: AWS session manager for S3 operations
:param name: Artifact name
:param alias: Alias name to retrieve
:returns: Alias object with complete configuration
:raises AliasNotFoundError: If the specified alias doesn't exist
Examples:
Get alias configuration::
alias = repo.get_alias(bsm, "myapp", "prod")
print(f"Primary version: {alias.version}")
if alias.secondary_version:
print(f"Secondary: {alias.secondary_version} ({alias.secondary_version_weight}%)")
Use for deployment decisions::
alias = repo.get_alias(bsm, "myapp", "prod")
selected_uri = alias.random_artifact() # Traffic splitting
artifact_data = S3Path(selected_uri).read_bytes(bsm)
"""
return self._get_alias_object(bsm=bsm, name=name, alias=alias)
[docs]
def list_aliases(
self,
bsm: BotoSesManager,
name: str,
) -> T.List[Alias]:
"""
List all aliases configured for a specific artifact.
Returns all alias configurations for the given artifact, including
their version references and traffic splitting settings.
:param bsm: AWS session manager for S3 operations
:param name: Artifact name to list aliases for
:returns: List of Alias objects for the artifact
Examples:
List all aliases::
aliases = repo.list_aliases(bsm, "myapp")
for alias in aliases:
print(f"{alias.alias}: v{alias.version}")
if alias.secondary_version:
print(f" └─ Canary: v{alias.secondary_version} ({alias.secondary_version_weight}%)")
Output example::
prod: v5
└─ Canary: v6 (20%)
staging: vLATEST
dev: vLATEST
"""
s3dir = self._get_artifact_s3dir(name=name).joinpath("aliases")
alias_list = list()
for s3path in s3dir.iter_objects(bsm=bsm):
alias = Alias.from_dict(json.loads(s3path.read_text(bsm=bsm)))
alias.update_at = s3path.last_modified_at.isoformat()
alias_list.append(alias)
return alias_list
[docs]
def delete_alias(
self,
bsm: BotoSesManager,
name: str,
alias: str,
):
"""
Permanently delete an alias from the repository.
Removes the alias configuration file from S3. This operation is
irreversible and will immediately stop routing traffic to the
versions referenced by this alias.
:param bsm: AWS session manager for S3 operations
:param name: Artifact name
:param alias: Alias name to delete
Examples:
Remove obsolete aliases::
# Remove old staging alias
repo.delete_alias(bsm, "myapp", "staging_v2")
Cleanup after blue/green deployment::
# After successful deployment, remove canary
repo.put_alias(bsm, "myapp", "prod", version=6) # Full traffic to v6
repo.delete_alias(bsm, "myapp", "canary") # Remove canary alias
.. warning::
Deleting an alias that is actively used by applications will cause
immediate failures. Ensure no systems depend on the alias before deletion.
"""
self._get_alias_s3path(name=name, alias=alias).delete(bsm=bsm)
[docs]
def purge_artifact_versions(
self,
bsm: BotoSesManager,
name: str,
keep_last_n: int = 10,
purge_older_than_secs: int = 90 * 24 * 60 * 60,
) -> T.Tuple[datetime, T.List[Artifact]]:
"""
Automatically clean up old artifact versions based on retention policies.
Removes artifact versions that don't meet the retention criteria, helping
manage storage costs and maintain repository hygiene. The LATEST version
is always preserved regardless of age or count.
A version is kept if it meets ANY of these criteria:
- Is within the last ``keep_last_n`` versions (by creation time)
- Is newer than ``purge_older_than_secs`` seconds
- Is the LATEST version (always preserved)
:param bsm: AWS session manager for S3 operations
:param name: Artifact name to clean up
:param keep_last_n: Minimum number of recent versions to retain
:param purge_older_than_secs: Maximum age in seconds (default: 90 days)
:returns: Tuple of (purge_timestamp, list_of_deleted_artifacts)
Examples:
Conservative cleanup (keep 10 versions, 90 days)::
purge_time, deleted = repo.purge_artifact_versions(bsm, "myapp")
print(f"Deleted {len(deleted)} old versions")
Aggressive cleanup (keep 3 versions, 7 days)::
purge_time, deleted = repo.purge_artifact_versions(
bsm, "myapp",
keep_last_n=3,
purge_older_than_secs=7*24*60*60
)
Dry run analysis::
_, would_delete = repo.purge_artifact_versions(
bsm, "myapp", keep_last_n=5, purge_older_than_secs=30*24*60*60
)
print(f"Would delete: {[v.version for v in would_delete]}")
.. note::
This operation cannot be undone. Consider running with restrictive
parameters first to verify which versions would be deleted.
"""
# Get all versions sorted chronologically (LATEST first, then newest to oldest)
artifact_list = self.list_artifact_versions(bsm=bsm, name=name)
# Record purge operation timestamp for consistent time reference
purge_time = get_utc_now()
# Calculate cutoff datetime: versions older than this will be eligible for deletion
expire = purge_time - timedelta(seconds=purge_older_than_secs)
deleted_artifact_list = list()
# Skip first keep_last_n+1 versions (LATEST + keep_last_n numbered versions)
# This ensures we always keep the minimum required versions
for artifact in artifact_list[keep_last_n + 1 :]:
# Only delete if version is older than the age threshold
# This implements the "AND" logic: must be both outside count AND older than age limit
if artifact.update_datetime < expire:
# Delete the artifact version from S3
self.delete_artifact_version(
bsm=bsm,
name=name,
version=artifact.version,
)
# Track what was deleted for reporting
deleted_artifact_list.append(artifact)
return purge_time, deleted_artifact_list
[docs]
def purge_artifact(
self,
bsm: BotoSesManager,
name: str,
):
"""
Completely remove an artifact and all its associated data.
Permanently deletes all versions, aliases, and metadata for the specified
artifact. This is equivalent to removing the entire artifact directory
from the repository.
:param bsm: AWS session manager for S3 operations
:param name: Artifact name to completely remove
Examples:
Remove obsolete artifact::
# Remove an artifact that's no longer maintained
repo.purge_artifact(bsm, "legacy-service")
Clean up test artifacts::
# Remove temporary test artifacts
for test_name in ["test-app-1", "test-app-2"]:
repo.purge_artifact(bsm, test_name)
.. danger::
This operation is IRREVERSIBLE. All versions, aliases, and metadata
for this artifact will be permanently lost. Ensure no systems depend
on this artifact before deletion.
"""
self._get_artifact_s3dir(name=name).delete(bsm=bsm)
[docs]
def purge_all(self, bsm: BotoSesManager):
"""
Completely destroy the entire artifact repository.
Removes all artifacts, versions, aliases, and metadata from the repository.
This effectively resets the repository to an empty state while preserving
the S3 bucket itself.
:param bsm: AWS session manager for S3 operations
Examples:
Reset repository for testing::
# Clean slate for integration tests
repo.purge_all(bsm)
repo.bootstrap(bsm) # Reinitialize
Decommission repository::
# Remove all data before deleting repository
repo.purge_all(bsm)
.. danger::
This operation is IRREVERSIBLE and will destroy ALL artifacts,
versions, and aliases in the repository. Only use for testing
or when completely decommissioning the repository.
"""
self.s3dir_artifact_store.delete(bsm=bsm)