Module library.ingest
import os
import zipfile
from functools import wraps
from math import floor

from osgeo import gdal
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeRemainingColumn,
)

from . import base_path
from .config import Config
from .sources import generic_source, postgres_source

class Ingestor:
    def __init__(self):
        self.base_path = base_path

    def compress(self, path: str, *files, inplace: bool = True):
        """Bundle the given files into a ZIP archive at path, deleting the
        originals when inplace is True."""
        with zipfile.ZipFile(
            path, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9
        ) as _zip:
            for f in files:
                if os.path.isfile(f):
                    _zip.write(f, os.path.basename(f))
                    if inplace:
                        os.remove(f)
                else:
                    print(f"{f} does not exist!")
        return True

    def write_config(self, path: str, config: str):
        """Write a serialized configuration string to path."""
        with open(path, "w") as f:
            f.write(config)

    def translator(func):
        """Shared wrapper for the format-specific methods below. The wrapped
        method only picks (dstDS, output_format, output_suffix, compress,
        inplace); this wrapper resolves the configuration, runs
        gdal.VectorTranslate and handles compression, then returns
        (output_files, version, acl)."""

        @wraps(func)
        def wrapper(self, *args, **kwargs) -> tuple:
            output_files = []
            path = args[0]
            c = Config(path, kwargs.get("version", None))
            dataset, source, destination, _ = c.compute_parsed
            name = dataset["name"]
            version = dataset["version"]
            acl = dataset["acl"]
            (dstDS, output_format, output_suffix, compress, inplace) = func(
                self, *args, **kwargs
            )

            # Initiate source and destination datasets
            folder_path = f"{self.base_path}/datasets/{name}/{version}"
            if output_suffix:
                destination_path = f"{folder_path}/{name}.{output_suffix}"
                output_files.append(destination_path)
            else:
                destination_path = None

            # Default dstDS to destination_path if no dstDS is specified
            dstDS = destination_path if not dstDS else dstDS
            srcDS = generic_source(
                path=source["url"]["gdalpath"],
                options=source["options"],
                fields=destination["fields"],
            )
            layerName = srcDS.GetLayer(0).GetName()
            sql = destination.get("sql", None)
            sql = None if not sql else sql.replace("@filename", layerName)

            # Create the output folder and write the resolved config into it
            if folder_path and output_suffix:
                os.makedirs(folder_path, exist_ok=True)
                self.write_config(f"{folder_path}/config.json", c.compute_json)
                self.write_config(f"{folder_path}/config.yml", c.compute_yml)
                output_files.append(f"{folder_path}/config.json")
                output_files.append(f"{folder_path}/config.yml")

            # Initiate vector translate, reporting progress via rich
            with Progress(
                SpinnerColumn(spinner_name="earth"),
                TextColumn("[progress.description]{task.description}"),
                BarColumn(bar_width=30),
                TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
                TimeRemainingColumn(),
                transient=True,
            ) as progress:
                task = progress.add_task(
                    f"[green]Ingesting [bold]{destination['name']}[/bold]", total=1000
                )

                def update_progress(complete, message, unknown):
                    # GDAL reports completion as a float in [0, 1]
                    progress.update(task, completed=floor(complete * 1000))
                    return 1  # non-zero tells GDAL to keep going

                gdal.VectorTranslate(
                    dstDS,
                    srcDS,
                    format=output_format,
                    layerCreationOptions=destination["options"],
                    dstSRS=destination["geometry"]["SRS"],
                    srcSRS=source["geometry"]["SRS"],
                    geometryType=destination["geometry"]["type"],
                    layerName=destination["name"],
                    accessMode="overwrite",
                    makeValid=True,
                    # optional settings
                    SQLStatement=sql,
                    SQLDialect="sqlite",
                    callback=update_progress,
                )

            # Compression if needed
            if compress and destination_path:
                if output_format == "ESRI Shapefile":
                    # A shapefile is a bundle of sidecar files; zip them together
                    files = [
                        f"{destination_path[:-4]}.{suffix}"
                        for suffix in ["shp", "prj", "shx", "dbf"]
                    ]
                    self.compress(f"{destination_path}.zip", *files, inplace=True)
                    output_files.remove(destination_path)
                    output_files.append(f"{destination_path}.zip")
                else:
                    self.compress(
                        f"{destination_path}.zip", destination_path, inplace=inplace
                    )
                    if inplace:
                        output_files.remove(destination_path)
                    output_files.append(f"{destination_path}.zip")
            return output_files, version, acl

        return wrapper

    @translator
    def postgres(
        self,
        path: str,
        compress: bool = False,
        inplace: bool = False,
        postgres_url: str = None,
        *args,
        **kwargs,
    ):
        """
        https://gdal.org/drivers/vector/pg.html
        Takes a configuration and loads the dataset into a postgres database.
        path: path of the configuration file
        postgres_url: connection string for the destination database
        compress: defaults to False because no files are created when the
            output format is "PostgreSQL"
        inplace: defaults to False because compress is False by default
        """
        dstDS = postgres_source(postgres_url)
        return dstDS, "PostgreSQL", None, compress, inplace

    @translator
    def csv(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/csv.html
        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        return None, "CSV", "csv", compress, inplace

    @translator
    def pgdump(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/pgdump.html
        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        return None, "PGDump", "sql", compress, inplace

    @translator
    def shapefile(
        self, path: str, compress: bool = True, inplace: bool = True, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/shapefile.html
        path: path of the configuration file
        compress: defaults to True so that [shp, shx, dbf, prj] are bundled
        inplace: defaults to True for ease of transport
        """
        return None, "ESRI Shapefile", "shp", compress, inplace

    @translator
    def geojson(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/geojson.html
        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        return None, "GeoJSON", "geojson", compress, inplace
Classes
class Ingestor
Methods
def compress(self, path: str, *files, inplace: bool = True)
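A minimal usage sketch (file names hypothetical): bundle two existing files into one archive while keeping the originals on disk.

ingestor = Ingestor()
ingestor.compress(
    "exports/boundaries.zip",    # archive to create
    "exports/boundaries.csv",
    "exports/config.json",
    inplace=False,               # keep the source files after zipping
)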
def csv(self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs)
https://gdal.org/drivers/vector/csv.html
path: path of the configuration file
compress: True if compression is needed
inplace: True if the compressed file will replace the original output
def geojson(self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs)
https://gdal.org/drivers/vector/geojson.html
path: path of the configuration file
compress: True if compression is needed
inplace: True if the compressed file will replace the original output
def pgdump(self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs)
https://gdal.org/drivers/vector/pgdump.html
path: path of the configuration file
compress: True if compression is needed
inplace: True if the compressed file will replace the original output
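The PGDump driver emits a plain SQL script (CREATE TABLE plus COPY statements), so the resulting .sql file can be replayed into a PostgreSQL database later, for example with psql.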
def postgres(self, path: str, compress: bool = False, inplace: bool = False, postgres_url: str = None, *args, **kwargs)
https://gdal.org/drivers/vector/pg.html
Takes a configuration and loads the dataset into a postgres database.
path: path of the configuration file
postgres_url: connection string for the destination database
compress: defaults to False because no files are created when the output format is "PostgreSQL"
inplace: defaults to False because compress is False by default
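Note that postgres is the only method that supplies its own dstDS (the database handle from postgres_source) together with a None output_suffix, so the translator wrapper writes nothing to disk and the returned output_files list is empty.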
def shapefile(self, path: str, compress: bool = True, inplace: bool = True, *args, **kwargs)
https://gdal.org/drivers/vector/shapefile.html
path: path of the configuration file
compress: defaults to True so that [shp, shx, dbf, prj] are bundled
inplace: defaults to True for ease of transport
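With the defaults above, the four sidecar files are zipped and deleted, so the returned file list holds only the config copies and the archive. A sketch (template path hypothetical):

files, version, acl = Ingestor().shapefile("path/to/template.yml")
# files == [
#     ".../datasets/<name>/<version>/config.json",
#     ".../datasets/<name>/<version>/config.yml",
#     ".../datasets/<name>/<version>/<name>.shp.zip",
# ]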
def translator(func)
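Because each public method only returns a (dstDS, output_format, output_suffix, compress, inplace) tuple and leaves the real work to this decorator, supporting a new output format is a one-method change. A hypothetical sketch (FlatGeobuf is a real GDAL vector driver, but this method is not part of the module):

@translator
def flatgeobuf(
    self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
):
    """
    https://gdal.org/drivers/vector/flatgeobuf.html
    path: path of the configuration file
    compress: True if compression is needed
    inplace: True if the compressed file will replace the original output
    """
    return None, "FlatGeobuf", "fgb", compress, inplace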
def write_config(self, path: str, config: str)