Module library.ingest

import os
import zipfile
from functools import wraps
from math import floor

from osgeo import gdal
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeRemainingColumn,
)

from . import base_path
from .config import Config
from .sources import generic_source, postgres_source


class Ingestor:
    def __init__(self):
        self.base_path = base_path

    def compress(self, path: str, *files, inplace: bool = True):
        with zipfile.ZipFile(
            path, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9
        ) as _zip:
            for f in files:
                if os.path.isfile(f):
                    _zip.write(f, os.path.basename(f))
                    if inplace:
                        os.remove(f)
                else:
                    print(f"{f} does not exist!")
        return True

    def write_config(self, path: str, config: str):
        with open(path, "w") as f:
            f.write(config)

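    # Note: `translator` is applied as a decorator within the class body below,
    # so it takes the wrapped format method (not `self`); each format method
    # returns (dstDS, output_format, output_suffix, compress, inplace).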
    def translator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs) -> tuple:
            output_files = []
            # the configuration path is the first positional argument of the decorated method
            path = args[0] if args else kwargs["path"]
            c = Config(path, kwargs.get("version", None))
            dataset, source, destination, _ = c.compute_parsed
            name = dataset["name"]
            version = dataset["version"]
            acl = dataset["acl"]
            (dstDS, output_format, output_suffix, compress, inplace) = func(
                self, *args, **kwargs
            )
            # Compute the output folder for this dataset version
            folder_path = f"{self.base_path}/datasets/{name}/{version}"

            if output_suffix:
                destination_path = f"{folder_path}/{name}.{output_suffix}"
                output_files.append(destination_path)
            else:
                destination_path = None

            # Default dstDS to destination_path if no dstDS is specified
            dstDS = destination_path if not dstDS else dstDS
            srcDS = generic_source(
                path=source["url"]["gdalpath"],
                options=source["options"],
                fields=destination["fields"],
            )

            layerName = srcDS.GetLayer(0).GetName()
            sql = destination.get("sql", None)
            sql = None if not sql else sql.replace("@filename", layerName)

            # Create output folder and output config
            if folder_path and output_suffix:
                os.makedirs(folder_path, exist_ok=True)
                self.write_config(f"{folder_path}/config.json", c.compute_json)
                self.write_config(f"{folder_path}/config.yml", c.compute_yml)
                output_files.append(f"{folder_path}/config.json")
                output_files.append(f"{folder_path}/config.yml")

            # Initiate vector translate
            with Progress(
                SpinnerColumn(spinner_name="earth"),
                TextColumn("[progress.description]{task.description}"),
                BarColumn(bar_width=30),
                TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
                TimeRemainingColumn(),
                transient=True,
            ) as progress:
                task = progress.add_task(
                    f"[green]Ingesting [bold]{destination['name']}[/bold]", total=1000
                )

                def update_progress(complete, message, user_data):
                    # GDAL reports `complete` as a fraction between 0 and 1
                    progress.update(task, completed=floor(complete * 1000))

                gdal.VectorTranslate(
                    dstDS,
                    srcDS,
                    format=output_format,
                    layerCreationOptions=destination["options"],
                    dstSRS=destination["geometry"]["SRS"],
                    srcSRS=source["geometry"]["SRS"],
                    geometryType=destination["geometry"]["type"],
                    layerName=destination["name"],
                    accessMode="overwrite",
                    makeValid=True,
                    # optional settings
                    SQLStatement=sql,
                    SQLDialect="sqlite",
                    callback=update_progress,
                )

            # Compression if needed
            if compress and destination_path:
                if output_format == "ESRI Shapefile":
                    files = [
                        f"{destination_path[:-4]}.{suffix}"
                        for suffix in ["shp", "prj", "shx", "dbf"]
                    ]
                    self.compress(f"{destination_path}.zip", *files, inplace=True)
                    output_files.remove(destination_path)
                    output_files.append(f"{destination_path}.zip")
                else:
                    self.compress(
                        f"{destination_path}.zip", destination_path, inplace=inplace
                    )
                    if inplace:
                        output_files.remove(destination_path)
                    output_files.append(f"{destination_path}.zip")
            return output_files, version, acl

        return wrapper

    @translator
    def postgres(
        self,
        path: str,
        compress: bool = False,
        inplace: bool = False,
        postgres_url: str = None,
        *args,
        **kwargs,
    ):
        """
        https://gdal.org/drivers/vector/pg.html

        This function will take in a configuration then send to a
        postgres database
        path: path of the configuration
        postgres_url: connection string for the destination database
        compress: default to False because no files created when output to "PostgreSQL"
        inplace: default to False because no compress = False by default
        """
        dstDS = postgres_source(postgres_url)
        return dstDS, "PostgreSQL", None, compress, inplace

    @translator
    def csv(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/csv.html

        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        return None, "CSV", "csv", compress, inplace

    @translator
    def pgdump(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/pgdump.html

        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        return None, "PGDump", "sql", compress, inplace

    @translator
    def shapefile(
        self, path: str, compress: bool = True, inplace: bool = True, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/shapefile.html

        path: path of the configuration file
        compress: defaults to True so that the shapefile components [shp, shx, dbf, prj] are bundled
        inplace: defaults to True for ease of transport
        """
        return None, "ESRI Shapefile", "shp", compress, inplace

    @translator
    def geojson(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/geojson.html

        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        return None, "GeoJSON", "geojson", compress, inplace
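
A minimal usage sketch (the configuration path below is hypothetical): each format method takes the path to a dataset configuration and returns the list of files written, the resolved dataset version, and its ACL.

from library.ingest import Ingestor

ingestor = Ingestor()
output_files, version, acl = ingestor.csv("datasets/my_dataset/config.yml")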

Classes

class Ingestor

Methods

def compress(self, path: str, *files, inplace: bool = True)
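compress writes the given files into a DEFLATE-compressed zip archive at path and, when inplace is True, deletes the originals after archiving. A minimal sketch (file names are hypothetical):

ingestor = Ingestor()
# Bundle two files into one archive, keeping the originals on disk
ingestor.compress("output.zip", "data.csv", "metadata.json", inplace=False)
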
def csv(self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs)

https://gdal.org/drivers/vector/csv.html

path: path of the configuration file
compress: True if compression is needed
inplace: True if the compressed file will replace the original output

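A usage sketch (the configuration path is hypothetical); with compress=True the CSV output is zipped, and inplace=True replaces the loose .csv with the archive:

output_files, version, acl = Ingestor().csv(
    "datasets/my_dataset/config.yml", compress=True, inplace=True
)
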
def geojson(self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs)

https://gdal.org/drivers/vector/geojson.html

path: path of the configuration file
compress: True if compression is needed
inplace: True if the compressed file will replace the original output

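A usage sketch with the defaults (hypothetical path); the uncompressed .geojson file is kept alongside the generated config.json and config.yml:

output_files, version, acl = Ingestor().geojson("datasets/my_dataset/config.yml")
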
def pgdump(self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs)

https://gdal.org/drivers/vector/pgdump.html

path: path of the configuration file
compress: True if compression is needed
inplace: True if the compressed file will replace the original output

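A usage sketch (path and version are hypothetical); the version keyword is forwarded through **kwargs to Config, so a specific dataset version can be pinned:

output_files, version, acl = Ingestor().pgdump(
    "datasets/my_dataset/config.yml", version="2024-01-01"
)
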
def postgres(self, path: str, compress: bool = False, inplace: bool = False, postgres_url: str = None, *args, **kwargs)

https://gdal.org/drivers/vector/pg.html

Takes a configuration and loads the dataset into a Postgres database.

path: path of the configuration file
postgres_url: connection string for the destination database
compress: defaults to False because no files are created when writing to PostgreSQL
inplace: defaults to False because compress is False by default

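A usage sketch (connection string and path are hypothetical); the layer is created or overwritten in the target database and no local files are produced:

Ingestor().postgres(
    "datasets/my_dataset/config.yml",
    postgres_url="postgresql://user:password@localhost:5432/db",
)
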
def shapefile(self, path: str, compress: bool = True, inplace: bool = True, *args, **kwargs)

https://gdal.org/drivers/vector/shapefile.html

path: path of the configuration file
compress: defaults to True so that the shapefile components [shp, shx, dbf, prj] are bundled
inplace: defaults to True for ease of transport

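A usage sketch (hypothetical path); by default the shp, shx, dbf, and prj components are zipped into a single archive and the loose files are removed:

output_files, version, acl = Ingestor().shapefile("datasets/my_dataset/config.yml")
# output_files now holds config.json, config.yml, and my_dataset.shp.zip
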
def translator(func)
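translator implements the shared ingest pipeline: it parses the configuration, builds the source dataset, runs gdal.VectorTranslate with a progress bar, and zips the output, while the wrapped method only declares the destination dataset, GDAL driver name, file suffix, and compression flags. A sketch of adding a new output format under that contract (the subclass and method are hypothetical; GPKG is GDAL's GeoPackage driver):

class GeopackageIngestor(Ingestor):
    @Ingestor.translator
    def geopackage(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        # dstDS=None defaults the destination to <folder>/<name>.gpkg
        return None, "GPKG", "gpkg", compress, inplace
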
def write_config(self, path: str, config: str)
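write_config writes a serialized configuration string to path; translator uses it to drop config.json and config.yml next to each output. A trivial sketch (path and payload are hypothetical):

Ingestor().write_config("datasets/my_dataset/config.json", '{"name": "my_dataset"}')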