Module library.ingest
import os
import zipfile
from functools import wraps
from math import floor
from osgeo import gdal
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeRemainingColumn,
)
from . import base_path
from .config import Config
from .sources import generic_source, postgres_source
class Ingestor:
    def __init__(self):
        self.base_path = base_path
    def compress(self, path: str, *files, inplace: bool = True):
        with zipfile.ZipFile(
            path, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9
        ) as _zip:
            for f in files:
                if os.path.isfile(f):
                    _zip.write(f, os.path.basename(f))
                    if inplace:
                        os.remove(f)
                else:
                    print(f"{f} does not exist!")
        return True
    def write_config(self, path: str, config: str):
        with open(path, "w") as f:
            f.write(config)
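    # Decorator defined inside the class body and applied at class-definition
    # time, so it receives each undecorated format method directly (no self).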
    def translator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs) -> tuple:
            output_files = []
            path = args[0]
            c = Config(path, kwargs.get("version", None))
            dataset, source, destination, _ = c.compute_parsed
            name = dataset["name"]
            version = dataset["version"]
            acl = dataset["acl"]
            (dstDS, output_format, output_suffix, compress, inplace) = func(
                self, *args, **kwargs
            )
            # initiate source and destination datasets
            folder_path = f"{self.base_path}/datasets/{name}/{version}"
            if output_suffix:
                destination_path = f"{folder_path}/{name}.{output_suffix}"
                output_files.append(destination_path)
            else:
                destination_path = None
            # Default dstDS is destination_path if no dstDS is specified
            dstDS = destination_path if not dstDS else dstDS
            srcDS = generic_source(
                path=source["url"]["gdalpath"],
                options=source["options"],
                fields=destination["fields"],
            )
            layerName = srcDS.GetLayer(0).GetName()
            sql = destination.get("sql", None)
            sql = None if not sql else sql.replace("@filename", layerName)
            # Create output folder and output config
            if folder_path and output_suffix:
                os.makedirs(folder_path, exist_ok=True)
                self.write_config(f"{folder_path}/config.json", c.compute_json)
                self.write_config(f"{folder_path}/config.yml", c.compute_yml)
                output_files.append(f"{folder_path}/config.json")
                output_files.append(f"{folder_path}/config.yml")
            # Initiate vector translate
            with Progress(
                SpinnerColumn(spinner_name="earth"),
                TextColumn("[progress.description]{task.description}"),
                BarColumn(bar_width=30),
                TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
                TimeRemainingColumn(),
                transient=True,
            ) as progress:
                task = progress.add_task(
                    f"[green]Ingesting [bold]{destination['name']}[/bold]", total=1000
                )
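                # GDAL progress callback: `complete` is a fraction in [0, 1],
                # scaled here to the task's total of 1000 ticks; the message
                # and user-data arguments are unused.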
                def update_progress(complete, message, unknown):
                    progress.update(task, completed=floor(complete * 1000))
                gdal.VectorTranslate(
                    dstDS,
                    srcDS,
                    format=output_format,
                    layerCreationOptions=destination["options"],
                    dstSRS=destination["geometry"]["SRS"],
                    srcSRS=source["geometry"]["SRS"],
                    geometryType=destination["geometry"]["type"],
                    layerName=destination["name"],
                    accessMode="overwrite",
                    makeValid=True,
                    # optional settings
                    SQLStatement=sql,
                    SQLDialect="sqlite",
                    callback=update_progress,
                )
            # Compression if needed
            if compress and destination_path:
                if output_format == "ESRI Shapefile":
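                    # A shapefile is several sidecar files sharing one
                    # basename, so zip all four components together.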
                    files = [
                        f"{destination_path[:-4]}.{suffix}"
                        for suffix in ["shp", "prj", "shx", "dbf"]
                    ]
                    self.compress(f"{destination_path}.zip", *files, inplace=True)
                    output_files.remove(destination_path)
                    output_files.append(f"{destination_path}.zip")
                else:
                    self.compress(
                        f"{destination_path}.zip", destination_path, inplace=inplace
                    )
                    if inplace:
                        output_files.remove(destination_path)
                    output_files.append(f"{destination_path}.zip")
            return output_files, version, acl
        return wrapper
    @translator
    def postgres(
        self,
        path: str,
        compress: bool = False,
        inplace: bool = False,
        postgres_url: str = None,
        *args,
        **kwargs,
    ):
        """
        https://gdal.org/drivers/vector/pg.html
        This function will take in a configuration then send to a
        postgres database
        path: path of the configuration
        postgres_url: connection string for the destination database
        compress: default to False because no files created when output to "PostgreSQL"
        inplace: default to False because no compress = False by default
        """
        dstDS = postgres_source(postgres_url)
        return dstDS, "PostgreSQL", None, compress, inplace
    @translator
    def csv(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/csv.html
        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        return None, "CSV", "csv", compress, inplace
    @translator
    def pgdump(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/pgdump.html
        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        return None, "PGDump", "sql", compress, inplace
    @translator
    def shapefile(
        self, path: str, compress: bool = True, inplace: bool = True, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/shapefile.html
        path: path of the configuration file
        compress: defaults to True so that the .shp, .shx, .dbf and .prj files are bundled
        inplace: defaults to True for ease of transport
        """
        return None, "ESRI Shapefile", "shp", compress, inplace
    @translator
    def geojson(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/geojson.html
        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        return None, "GeoJSON", "geojson", compress, inplaceClasses
- class Ingestor
Methods
- def compress(self, path: str, *files, inplace: bool = True)
Writes each of the given files into a ZIP archive at path (DEFLATE, compression level 9). When inplace is True, each original file is deleted after it is archived; files that do not exist are reported and skipped.
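A minimal usage sketch (the file paths are hypothetical):

    from library.ingest import Ingestor

    ingestor = Ingestor()
    # Bundle two existing files into one archive; pass inplace=False to
    # keep the originals alongside the zip (the default removes them).
    ingestor.compress(
        "output/bundle.zip",
        "output/data.csv",
        "output/config.json",
        inplace=False,
    )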
- def csv(self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs)
https://gdal.org/drivers/vector/csv.html
path: path of the configuration file
compress: True if compression is needed
inplace: True if the compressed file will replace the original output
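A usage sketch, assuming a hypothetical configuration file at templates/dataset.yml; geojson and pgdump are invoked the same way:

    from library.ingest import Ingestor

    ingestor = Ingestor()
    # Translate the configured source to CSV; the decorated method returns
    # the files written, the resolved version, and the dataset ACL.
    output_files, version, acl = ingestor.csv(
        "templates/dataset.yml", compress=True, inplace=True
    )
    # output_files holds the zipped CSV plus config.json and config.yml.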
- def geojson(self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs)
https://gdal.org/drivers/vector/geojson.html
path: path of the configuration file
compress: True if compression is needed
inplace: True if the compressed file will replace the original output
- def pgdump(self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs)
https://gdal.org/drivers/vector/pgdump.html
path: path of the configuration file
compress: True if compression is needed
inplace: True if the compressed file will replace the original output
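The PGDump driver writes a SQL file rather than touching a live database. A sketch of generating the dump and replaying it with psql, assuming a hypothetical config path and connection string:

    from library.ingest import Ingestor

    output_files, version, acl = Ingestor().pgdump("templates/dataset.yml")
    # The .sql dump can be replayed into any Postgres instance, e.g.:
    #   psql "postgresql://user:pass@localhost:5432/db" -f <name>.sql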
- def postgres(self, path: str, compress: bool = False, inplace: bool = False, postgres_url: str = None, *args, **kwargs)
https://gdal.org/drivers/vector/pg.html
Takes a configuration and loads the dataset into a postgres database.
path: path of the configuration file
postgres_url: connection string for the destination database
compress: defaults to False because no files are created when writing to PostgreSQL
inplace: defaults to False because compress is False by default
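A usage sketch, assuming a hypothetical config path and a connection string in the DATABASE_URL environment variable; since nothing is written to disk, output_files comes back empty:

    import os
    from library.ingest import Ingestor

    ingestor = Ingestor()
    # Load the configured dataset straight into Postgres; the table name
    # comes from the destination block of the configuration.
    output_files, version, acl = ingestor.postgres(
        "templates/dataset.yml",
        postgres_url=os.environ["DATABASE_URL"],  # hypothetical env var
    )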
- def shapefile(self, path: str, compress: bool = True, inplace: bool = True, *args, **kwargs)
https://gdal.org/drivers/vector/shapefile.html
path: path of the configuration file
compress: defaults to True so that the .shp, .shx, .dbf and .prj files are bundled
inplace: defaults to True for ease of transport
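A usage sketch with a hypothetical config path; with the defaults, the four shapefile components are zipped into a single archive and the loose files are removed:

    from library.ingest import Ingestor

    output_files, version, acl = Ingestor().shapefile("templates/dataset.yml")
    # output_files then contains <name>.shp.zip (holding the .shp, .prj,
    # .shx and .dbf members) plus config.json and config.yml.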
- def translator(func)
Decorator that wraps each format-specific method with the shared ingestion pipeline: it parses the configuration, opens the source dataset, runs gdal.VectorTranslate with a progress bar, writes config.json and config.yml next to the output, optionally compresses the result, and returns (output_files, version, acl).
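Because each format method only returns (dstDS, output_format, output_suffix, compress, inplace), supporting a new output format is a small change. A sketch of a hypothetical FlatGeobuf method (GDAL ships a "FlatGeobuf" driver), not part of the module:

    @translator
    def flatgeobuf(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/flatgeobuf.html
        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        # No explicit dstDS: the wrapper defaults it to the computed
        # destination path with the .fgb suffix.
        return None, "FlatGeobuf", "fgb", compress, inplace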
- def write_config(self, path: str, config: str)
Writes the rendered configuration string to the given path.
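For reference, these are the fields the translator wrapper reads from the parsed configuration, written out as Python literals; the key names are taken from the code above, while every value is hypothetical (the actual schema lives in library.config.Config):

    dataset = {"name": "my_dataset", "version": "2024-01-01", "acl": "public-read"}
    source = {
        "url": {"gdalpath": "/vsizip/tmp/my_dataset.zip/my_dataset.csv"},
        "options": ["X_POSSIBLE_NAMES=longitude", "Y_POSSIBLE_NAMES=latitude"],
        "geometry": {"SRS": "EPSG:4326"},
    }
    destination = {
        "name": "my_dataset",
        "fields": [],
        "options": ["GEOMETRY=AS_WKT"],
        "geometry": {"SRS": "EPSG:4326", "type": "POINT"},
        # "@filename" is replaced with the source layer name at runtime
        "sql": "SELECT * FROM @filename",
    }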