Module library.archive

Expand source code
import os
from pathlib import Path

from . import (
    aws_access_key_id,
    aws_s3_bucket,
    aws_s3_endpoint,
    aws_secret_access_key,
    base_path,
    pp,
)
from .ingest import Ingestor
from .s3 import S3


class Archive:
    """Run an `Ingestor` export for a dataset and optionally publish the results to S3."""

    def __init__(self):
        """
        Initialize the Archive class instance.

        Builds an `Ingestor` and an `S3` client configured from the package-level
        settings `aws_access_key_id`, `aws_secret_access_key`, `aws_s3_endpoint`
        and `aws_s3_bucket`.
        """
        self.ingestor = Ingestor()
        self.s3 = S3(
            aws_access_key_id, aws_secret_access_key, aws_s3_endpoint, aws_s3_bucket
        )

    def __call__(
        self,
        path: str = None,
        output_format: str = "pgdump",
        push: bool = False,
        clean: bool = False,
        latest: bool = False,
        name: str = None,
        *args,
        **kwargs,
    ):
        """
        The `__call__` method allows a user to call a class instance with parameters.

        Parameters
        ----------
        path: path to the configuration file
        output_format: name of the `Ingestor` method to run, e.g. `'pgdump'` (default),
            `'csv'`, `'geojson'`, `'shapefile'`, `'postgres'`
        push: if `True` then push the output files to s3
        clean: if `True`, the local output files are removed after processing
        latest: if `True` (and `push` is `True`) then also tag this version as `latest`

        Optional Parameters
        ----------
        name: name of the dataset, if you would like to use templates already included in this package
        compress: if compression is needed, this is passed into the `ingestor`
        inplace: if compressed zip file will replace the original file, this is passed into the `ingestor`
        postgres_url: Please specify if `output_format=='postgres'`
        version: specify version if using a custom version name

        Raises
        ----------
        ValueError: if neither `path` nor `name` is given, or if `name` has no
            bundled template and no `path` was supplied

        Sample Usage
        ----------
        ### A flat file example:
        ```python
        from library.archive import Archive
        a = Archive()
        a(
            "path/to/config.yml",
            output_format="csv", push=True, clean=True,
            latest=True, compress=True,
        )
        ```
        ### A postgres example:
        > for postgres, `push`, `clean`, `latest` and `compress` all default to `False`
        ```python
        postgres_url='postgresql://user:password@host/db'
        a(
            "path/to/config.yml",
            output_format="postgres",
            postgres_url=postgres_url,
        )
        ```
        """
        path = self._resolve_config_path(path, name)
        # The dataset name used for the s3 `latest` prefix is always the config file stem.
        name = os.path.basename(path).split(".")[0]

        # Get ingestor by format
        ingestor_of_format = getattr(self.ingestor, output_format)

        # Initiate ingestion
        output_files, version, acl = ingestor_of_format(path, *args, **kwargs)

        updated_latest = False
        for _file in output_files:
            if push:
                key = _file.replace(base_path + "/", "")
                self.s3.put(_file, key, acl, metadata={"version": version})
                if latest:
                    # Mirror the file into the latest directory. The version metadata
                    # records which version each file in latest belongs to.
                    self.s3.put(
                        _file,
                        key.replace(version, "latest"),
                        acl,
                        metadata={"version": version},
                    )
                    updated_latest = True

            if clean:
                os.remove(_file)

        # Prune once, after every file of this version is in latest. Skip pruning
        # when nothing was uploaded, so that an empty run cannot wipe latest.
        if updated_latest:
            self._prune_latest(name, version)

    @staticmethod
    def _resolve_config_path(path, name):
        """Return the bundled template for `name` if it exists, otherwise `path`."""
        if not (path or name):
            raise ValueError(
                "Please specify either name of the dataset or path to the config file"
            )
        if name:
            template = f"{Path(__file__).parent}/templates/{name}.yml"
            if os.path.isfile(template):
                return template
        if not path:
            raise ValueError(
                f"No bundled template found for dataset {name!r}; "
                "please specify the path to the config file"
            )
        return path

    def _prune_latest(self, name, version):
        """Remove keys under `datasets/{name}/latest` whose `version` metadata differs from `version`."""
        stale_keys = [
            k
            for k in self.s3.ls(f"datasets/{name}/latest")
            if self.s3.info(k)["Metadata"].get("version", "") != version
        ]
        if stale_keys:
            self.s3.rm(*stale_keys)

Classes

class Archive

Initialize the Archive class instance

Expand source code
class Archive:
    """Run an `Ingestor` export for a dataset and optionally publish the results to S3."""

    def __init__(self):
        """
        Initialize the Archive class instance.

        Builds an `Ingestor` and an `S3` client configured from the package-level
        settings `aws_access_key_id`, `aws_secret_access_key`, `aws_s3_endpoint`
        and `aws_s3_bucket`.
        """
        self.ingestor = Ingestor()
        self.s3 = S3(
            aws_access_key_id, aws_secret_access_key, aws_s3_endpoint, aws_s3_bucket
        )

    def __call__(
        self,
        path: str = None,
        output_format: str = "pgdump",
        push: bool = False,
        clean: bool = False,
        latest: bool = False,
        name: str = None,
        *args,
        **kwargs,
    ):
        """
        The `__call__` method allows a user to call a class instance with parameters.

        Parameters
        ----------
        path: path to the configuration file
        output_format: name of the `Ingestor` method to run, e.g. `'pgdump'` (default),
            `'csv'`, `'geojson'`, `'shapefile'`, `'postgres'`
        push: if `True` then push the output files to s3
        clean: if `True`, the local output files are removed after processing
        latest: if `True` (and `push` is `True`) then also tag this version as `latest`

        Optional Parameters
        ----------
        name: name of the dataset, if you would like to use templates already included in this package
        compress: if compression is needed, this is passed into the `ingestor`
        inplace: if compressed zip file will replace the original file, this is passed into the `ingestor`
        postgres_url: Please specify if `output_format=='postgres'`
        version: specify version if using a custom version name

        Raises
        ----------
        ValueError: if neither `path` nor `name` is given, or if `name` has no
            bundled template and no `path` was supplied

        Sample Usage
        ----------
        ### A flat file example:
        ```python
        from library.archive import Archive
        a = Archive()
        a(
            "path/to/config.yml",
            output_format="csv", push=True, clean=True,
            latest=True, compress=True,
        )
        ```
        ### A postgres example:
        > for postgres, `push`, `clean`, `latest` and `compress` all default to `False`
        ```python
        postgres_url='postgresql://user:password@host/db'
        a(
            "path/to/config.yml",
            output_format="postgres",
            postgres_url=postgres_url,
        )
        ```
        """
        path = self._resolve_config_path(path, name)
        # The dataset name used for the s3 `latest` prefix is always the config file stem.
        name = os.path.basename(path).split(".")[0]

        # Get ingestor by format
        ingestor_of_format = getattr(self.ingestor, output_format)

        # Initiate ingestion
        output_files, version, acl = ingestor_of_format(path, *args, **kwargs)

        updated_latest = False
        for _file in output_files:
            if push:
                key = _file.replace(base_path + "/", "")
                self.s3.put(_file, key, acl, metadata={"version": version})
                if latest:
                    # Mirror the file into the latest directory. The version metadata
                    # records which version each file in latest belongs to.
                    self.s3.put(
                        _file,
                        key.replace(version, "latest"),
                        acl,
                        metadata={"version": version},
                    )
                    updated_latest = True

            if clean:
                os.remove(_file)

        # Prune once, after every file of this version is in latest. Skip pruning
        # when nothing was uploaded, so that an empty run cannot wipe latest.
        if updated_latest:
            self._prune_latest(name, version)

    @staticmethod
    def _resolve_config_path(path, name):
        """Return the bundled template for `name` if it exists, otherwise `path`."""
        if not (path or name):
            raise ValueError(
                "Please specify either name of the dataset or path to the config file"
            )
        if name:
            template = f"{Path(__file__).parent}/templates/{name}.yml"
            if os.path.isfile(template):
                return template
        if not path:
            raise ValueError(
                f"No bundled template found for dataset {name!r}; "
                "please specify the path to the config file"
            )
        return path

    def _prune_latest(self, name, version):
        """Remove keys under `datasets/{name}/latest` whose `version` metadata differs from `version`."""
        stale_keys = [
            k
            for k in self.s3.ls(f"datasets/{name}/latest")
            if self.s3.info(k)["Metadata"].get("version", "") != version
        ]
        if stale_keys:
            self.s3.rm(*stale_keys)