gisaf-backend/src/gisaf/models/geo_models_base.py

1177 lines
40 KiB
Python

from pathlib import Path
from typing import Any, ClassVar, Annotated
from datetime import date, datetime
from collections import OrderedDict
from io import BytesIO
from zipfile import ZipFile
from functools import cached_property
import locale
import logging
import geopandas as gpd # type: ignore
import shapely # type: ignore
import pyproj
from pydantic import BaseModel
from sqlmodel import select, Field
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy import BigInteger, String, func, and_, text
from sqlalchemy.sql import sqltypes
from sqlalchemy.orm import declared_attr
from psycopg2.extensions import adapt
from geoalchemy2.shape import from_shape
from geoalchemy2.types import Geometry, WKBElement
from shapely import wkb, from_wkb
from shapely.geometry import mapping # type: ignore
from shapely.ops import transform # type: ignore
from shapefile import ( # type: ignore
Writer as ShapeFileWriter,
POINT, POINTZ,
POLYLINE, POLYLINEZ,
POLYGON, POLYGONZ,
)
from gisaf.database import db_session
from gisaf.config import conf
from gisaf.models.map_bases import MaplibreStyle
from gisaf.models.models_base import Model
from gisaf.models.metadata import gisaf_survey, gisaf_admin, survey, raw_survey
from gisaf.models.misc import Qml
from gisaf.models.category import Category
from gisaf.models.info_item import Tag, InfoItem
# from gisaf.models.survey import Equipment, Surveyor, Accuracy
# from gisaf.models.project import Project
LOCALE_DATE_FORMAT = locale.nl_langinfo(locale.D_FMT)
## Coordinates of WGS84 should be in the (y, x) order, but some components (shapely?)
## always work with (x, y). Force (y, x) order with always_xy=True
## https://github.com/pyproj4/pyproj/issues/504
## https://proj.org/faq.html#why-is-the-axis-ordering-in-proj-not-consistent
reproject_func = pyproj.Transformer.from_crs(
conf.geo.srid, conf.geo.srid_for_proj, always_xy=True).transform
class NoPoint(Exception):
pass
logger = logging.getLogger('model_base_base')
## Mapping of sql types to shapefile types, for pyshp
## Character, Numbers, Longs, Dates, or Memo
exportable_cols = {
sqltypes.Numeric: 'N',
sqltypes.Integer: 'N',
sqltypes.DateTime: 'D',
sqltypes.Date: 'D',
sqltypes.Time: 'D',
sqltypes.String: 'C',
sqltypes.Unicode: 'C',
}
class BaseSurveyModel(BaseModel):
"""
Base mixin class for all layers defined from a category:
- raw survey (RAW_V_*')
- projected ('V_*')
"""
id: int | None = Field(sa_type=BigInteger, primary_key=True, default=None)
equip_id: int = Field(foreign_key=gisaf_survey.table('equipment.id'))
srvyr_id: int = Field(foreign_key=gisaf_survey.table('surveyor.id'))
accur_id: int = Field(foreign_key=gisaf_survey.table('accuracy.id'))
project_id: int = Field(foreign_key=gisaf_admin.table('project.id'))
orig_id: str
date: date
@classmethod
def selectinload(cls):
return [
cls.equipment, # type: ignore
cls.surveyor, # type: ignore
cls.accuracy, # type: ignore
cls.project, # type: ignore
]
# @classmethod
# def dyn_join_with(cls):
# return {
# 'equipment': Equipment,
# 'surveyor': Surveyor,
# 'accuracy': Accuracy,
# 'project': Project,
# }
#async def get_info(self):
# info = await super(BaseSurveyModel, self).get_info()
# return info
#async def get_geo_info(self):
# info = await super(BaseSurveyModel, self).get_geo_info()
# return info
async def get_survey_info(self) -> list[InfoItem]:
info = await super(BaseSurveyModel, self).get_survey_info()
if self.category:
info.append(InfoItem(key='ISO layer name',
value=self.iso_layer_name))
info.append(InfoItem(key='survey category',
value=self.category.name))
info.append(InfoItem(key='survey category description',
value=self.category.description))
if self.project_id:
info.append(InfoItem(key='project',
value=self.project.name))
if self.srvyr_id:
info.append(InfoItem(key='surveyor',
value=self.surveyor.name))
if self.equip_id:
info.append(InfoItem(key='survey equipment',
value=self.equipment.name))
if self.accur_id:
info.append(InfoItem(key='survey accuracy',
value=self.accuracy.name))
if self.date:
info.append(InfoItem(key='survey date',
value=self.date.strftime(LOCALE_DATE_FORMAT)))
if self.orig_id:
info.append(InfoItem(key='original id',
value=self.orig_id))
return info
@property
def iso_layer_name(self) -> str:
"""
The ISO layer name, built on the category and status
"""
return '{category.domain}-{category.group:4s}-{category.minor_group_1:4s}-{category.minor_group_2:4s}-{status:1s}'\
.format(category=self.category, status=self.status)
class SurveyModel(BaseSurveyModel):
"""
Base mixin class for defining final (reprojected) survey data, with a status
"""
__table_args__ = survey.table_args
# status: str = Field(sa_type=String(1))
get_gdf_with_related: ClassVar[bool] = False
category_name: ClassVar[str]
filtered_columns_on_map: ClassVar[list[str]] = [
'equip_id',
'srvyr_id',
'accur_id',
'project_id',
'orig_id',
'date',
'gisaf_survey_equipment_id',
'gisaf_survey_equipment_name',
'gisaf_survey_surveyor_id',
'gisaf_survey_surveyor_name',
'gisaf_survey_accuracy_id',
'gisaf_survey_accuracy_name',
'gisaf_survey_accuracy_accuracy',
'gisaf_admin_project_id',
'gisaf_admin_project_name',
'gisaf_admin_project_contact_person',
'gisaf_admin_project_site',
'gisaf_admin_project_date_approved',
'gisaf_admin_project_start_date_planned',
'gisaf_admin_project_start_date_effective',
'gisaf_admin_project_end_date_planned',
'gisaf_admin_project_end_date_effective',
'gisaf_admin_project_skip_columns',
]
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__ # type: ignore
async def get_survey_info(self) -> list[InfoItem]:
info = await super(SurveyModel, self).get_survey_info()
if self.srvyr_id:
info.append(InfoItem(key='surveyor', value=self.surveyor.name))
if self.equip_id:
info.append(InfoItem(key='survey equipment', value=self.equipment.name))
if self.accur_id:
info.append(InfoItem(key='survey accuracy', value=self.accuracy.name))
if self.date:
info.append(InfoItem(key='survey date', value=self.date.strftime(LOCALE_DATE_FORMAT)))
info.append(InfoItem(key='status', value=self.status))
info.append(InfoItem(key='original id', value=self.orig_id))
return info
async def get_feature_properties(self):
return {
'status': self.status if self.status else 'E'
}
@property
def caption(self):
return '{self.category.description} [{self.category.name}: {self.category.group}-{self.category.minor_group_1}] #{self.id:d}'.format(self=self)
@classmethod
async def get_popup(cls, df):
return cls.category.description + \
' [' + cls.category.group + '-' + cls.category.minor_group_1 + \
'] #' + df.index.astype('U')
@classmethod
async def get_geojson(cls,
simplify_tolerance=0, preserve_topology=False):
from gisaf.registry import registry
## Fastest, but the id is in properties and needs the front end (eg Gisaf for mapbox)
## to move it at the feature level
## Requires PostGis 3.0+
sql_query_for_geojson_fast = """
SELECT json_build_object(
'type', 'FeatureCollection',
'features', json_agg(ST_AsGeoJSON(t.*)::json)
)::varchar
FROM (
SELECT f.geom,
f.id::varchar,
{description} || ' [' || '{category.group}' || '-' || '{category.minor_group_1}' || '] #' || f.id as popup,
f.status
FROM "{schema}"."{table}" as f
WHERE f.geom is not null
) AS t;
"""
## Slower version (3x), but compliant with RFC 7946
## where the id is at the feature level
sql_query_for_geojson_slow = """
SELECT jsonb_build_object(
'type', 'FeatureCollection',
'features', jsonb_agg(features.feature)
)::varchar
FROM (
SELECT jsonb_build_object(
'type', 'Feature',
'id', id,
'geometry', ST_AsGeoJSON(geom)::jsonb,
'properties', jsonb_build_object(
'popup',
{description} || ' [' || '{category.group}' || '-' || '{category.minor_group_1}' || '] #' || inputs.id::varchar,
'status', status
)
) AS feature
FROM "{schema}"."{table}" AS inputs
) AS features
"""
sql_query_for_geojson = sql_query_for_geojson_fast
category = registry.categories.loc[cls.category_name]
session: AsyncSession
async with db_session() as session:
query = sql_query_for_geojson.format(
model=cls,
category=category,
schema=cls.__table__.schema,
#table=cls.__tablename__,
# FIXME: should be __tablename__, but see SQLModel.__tablename__ which use lower(__name__)
table=cls.__name__,
description=adapt(category.description),
)
result = await session.exec(text(query))
return result.first()[0]
def to_row(self):
"""
Get a list of attributes, typically used for exporting in CSV
:return: list of attributes
"""
return [
self.id,
self.shapely_geom.x,
self.shapely_geom.y,
self.shapely_geom.z,
self.category_name,
self.surveyor,
self.equipment,
self.date.isoformat(),
self.accuracy.name,
self.status,
self.project.name,
self.orig_id
]
class GeoModelNoStatus(Model):
"""
Base class for all geo models
"""
id: int | None = Field(default=None, primary_key=True)
description: ClassVar[str] = ''
attribution: ClassVar[str | None] = None
can_get_features_as_df: ClassVar[bool] = True
"""
can_get_features_as_df indicates that the model is ready to get GeoJson using GeoDataframe
If False, switch back to gino and dict based conversion using get_features_in_bulk_gino
and record.get_feature_as_dict (DEPRECATED)
"""
cache_enabled: ClassVar[bool] = True
"""
cache_enabled indicated that the model is OK with the caching mechanism of geojson stores.
The cache is time-stamped with DB triggers on modification, so it's safe unless the model
fetches other data than from its own table, eg. some status for styling.
See gisaf.redis_tools and geoapi.gj_feature for the implementation details of the cache.
"""
get_gdf_with_related: ClassVar[bool] = False
"""
get_gdf_with_related indicates that get_df (thus, get_geo_df and the geoJson API for
the map online) gets related models (1-n relations, as defined with selectinload)
by default.
It can be overridden with the with_related parameter when calling get_df.
"""
z_index: ClassVar[int] = 450 # Field(450, alias='zIndex')
"""
z-index for the leaflet layer.
Should be between 400 and 500.
"""
icon: ClassVar[str | None] = None
"""
Icon for the model, used for normal web interface (ie. except the map)
"""
symbol: ClassVar[str | None] = None
"""
Icon for the model, used in the map (mapbox)
"""
style: ClassVar[str] = ''
"""
Style for the model, used in the map, etc
"""
_join_with: ClassVar[dict[str, Any]] = {
}
"""
Fields to join when getching items using get_features.
"""
hidden: ClassVar[bool] = False
"""
This model should be hidden from the menu
"""
def __str__(self):
return self.caption
async def get_geo_info(self) -> list[InfoItem]:
"""
Geographical info
"""
return []
async def get_survey_info(self) -> list[InfoItem]:
"""
Quality info: project, source, accuracy...
"""
return []
async def get_info(self) -> list[InfoItem]:
"""
Model specific info
"""
return []
# @classmethod
# def get_join_with(cls):
# if hasattr(cls, 'dyn_join_with'):
# return cls.dyn_join_with()
# else:
# return cls._join_with
#@classmethod
#def get_style(cls):
# """
# Style for the layer in Leaflet (css).
# :return:
# """
# return {
# # 'color': "#4B1BDE",
# 'weight': '1'
# }
@classmethod
def get_options(cls):
"""
Options for the layer in Leaflet.
For example: {'weight': 0, 'fillOpacity': 0.1}
:return:
"""
return {
}
@property
def caption(self):
"""
Subclass me!
:return: str
"""
return 'Some geometry'
@classmethod
async def get_popup(cls, df):
return cls.__name__ + ': ' + df.index.astype('U')
@classmethod
async def get_properties(cls, df):
return {}
async def get_tags(self) -> list[Tag]:
from gisaf.models.tags import Tags
async with db_session() as session:
query = select(Tags.tags).where(Tags.store == self.get_store_name(),
Tags.ref_id == self.id)
data = await session.exec(query)
tags = data.one_or_none()
if tags is not None:
return [Tag(key=key, value=value)
for key, value in tags.items()]
else:
return []
@cached_property
def shapely_geom(self):
return from_wkb(self.geom.data)
# @property
# def shapely_geom(self):
# if not hasattr(self, '_shapely_geom'):
# if isinstance(self.geom, WKBElement):
# bytes = self.geom.data
# if bytes:
# self._shapely_geom = wkb.loads(bytes)
# else:
# self._shapely_geom = None
# else:
# self._shapely_geom = None
# return self._shapely_geom
def get_bgColor(self):
"""
Get the background color of the element on the map.
Subclass me!
:return: str
"""
return ''
async def get_feature_as_dict(self, simplify_tolerance=None,
reproject=False, css_class_prefix=''):
"""
Get the parameters of this object (feature)
:param css_class_prefix: for leaflet only
:return:
"""
logger.warn(f'{self} use get_feature_as_dict: please DEPRECATE in favor of get_geo_df')
if hasattr(self, 'get_feature_properties'):
properties = await self.get_feature_properties()
else:
properties = {
'popup': self.caption
}
if reproject:
shapely_geom = transform(reproject_func, self.shapely_geom)
else:
shapely_geom = self.shapely_geom
if simplify_tolerance:
shapely_geom = shapely_geom.simplify(
simplify_tolerance / conf.geo.simplify_geom_factor,
preserve_topology=conf.geo.simplify_preserve_topology)
if shapely_geom.is_empty:
raise NoPoint
return {
"type": "Feature",
'id': str(self.id),
"properties": properties,
'geometry': mapping(shapely_geom)
}
@classmethod
def get_columns_for_shapefile_writer(cls, writer):
"""
Return the columns and corresponding attribute name
:param writer:
:return:
"""
## Define the attributes
columns = cls.__table__.columns
cols = [col.name for col in columns if col.type.__class__ in exportable_cols]
for col in columns:
if col.name in cols:
writer.field(col.name, exportable_cols[col.type.__class__])
return cols
return OrderedDict([(col, cols_to_attr[col]) for col in cols])
@classmethod
async def get_shapefile_writer(cls):
"""
Get a shapefile writer with all records loaded.
Subclasses should implement this.
:return:
"""
raise NotImplementedError
@classmethod
async def get_shapefile_files(cls):
"""
Get an actual shapefile with all records.
:return: zip file
"""
writer = await cls.get_shapefile_writer()
layer_name = cls.__name__
## Save the dbf, shp, shx data
dbf = BytesIO()
writer.saveDbf(dbf)
shp = BytesIO()
writer.saveShp(shp)
shx = BytesIO()
writer.saveShx(shx)
qml = None
qml_rec = await Qml.get(layer_name)
if qml_rec and qml_rec.qml:
qml = qml_rec.qml
srid = cls.geom.expression.type.srid
## Write the .prj file: projection system reference
## Try if there's a file
prj_path = (conf.prj_files_dir / str(srid)).with_suffix('.prj')
proj_str = None
if prj_path.is_file():
with open(prj_path) as proj_file:
proj_str = proj_file.read()
else:
## Get from the spatial_ref_sys table in the database
async with db.acquire(reuse=False) as connection:
async with connection.transaction() as tx:
proj_rec = await connection.first(f'select srtext from spatial_ref_sys where srid={srid}')
if proj_rec and proj_rec[0]:
proj_str = proj_rec[0]
return dbf, shp, shx, qml, proj_str
@classmethod
async def get_shapefile(cls):
layer_name = cls.__name__
dbf, shp, shx, qml, proj_str = await cls.get_shapefile_files()
## Zip and return data
zip_file = BytesIO()
with ZipFile(zip_file, 'w') as zip:
zip.writestr('{}.dbf'.format(layer_name), dbf.getvalue())
zip.writestr('{}.shp'.format(layer_name), shp.getvalue())
zip.writestr('{}.shx'.format(layer_name), shx.getvalue())
if qml:
zip.writestr('{}.qml'.format(layer_name), qml)
if proj_str:
zip.writestr('{}.prj'.format(layer_name), proj_str)
zip_file.seek(0)
return zip_file
@classmethod
async def get_maplibre_style(cls) -> MaplibreStyle | None:
"""
Get the mapbox style (paint, layout, attribution...)
"""
## If the model is from survey, it should have a category, which has a style
## Get from database
style: MaplibreStyle | None
async with db_session() as session:
if hasattr(cls, 'category'):
category = await session.get(Category, cls.get_store_name())
if category:
style = MaplibreStyle(
layout=category.mapbox_layout,
paint=category.mapbox_paint,
)
else:
style = None
else:
qml = await session.get(Qml, cls.__name__)
if qml:
style = MaplibreStyle(
layout=qml.mapbox_layout,
paint=qml.mapbox_paint,
attribution=qml.attr,
)
else:
style = None
return style
@classmethod
async def get_features_attrs(cls, simplify_tolerance):
"""
Get the attributes and kwargs to pass to get_features for this model
:param simplify_tolerance: float
:return: tuple(dict, dict)
"""
attrs = {"type": "FeatureCollection"}
logger.warn(f'{cls} use get_features_attrs: please DEPRECATE in favor of get_geo_df')
## Get style, with priority order
## If the model is from survey ("auto_model"), it should have a category, which has a style
category = getattr(cls, 'category', None)
features_kwargs = {}
if not issubclass(cls, GeoPointModel):
if hasattr(cls, 'simplify') and cls.simplify:
features_kwargs['simplify_tolerance'] = float(cls.simplify)
else:
features_kwargs['simplify_tolerance'] = simplify_tolerance
features_kwargs['reproject'] = conf.geojson_srid != conf.srid
## If the model is from survey, it should have a category, which has a style
## Get from database
## TODO: update categories dynamically in the registry
if hasattr(cls, 'category'):
category = await Category.get(cls.category.name)
else:
category = None
features_kwargs['reproject'] = conf.geojson_srid != conf.srid
## Get options, if the model has defined the get_options function
if hasattr(cls, 'get_options'):
attrs["options"] = cls.get_options()
if hasattr(cls, 'get_popup_dynamic'):
attrs["has_popups"] = True
return attrs, features_kwargs
@classmethod
async def get_features_in_bulk_gino(cls, **kwargs):
"""
Get the features, using joins defined by _join_with, or, in priority, get_join_with().
See Gino's doc: relationship.html (specially loaders)
:param kwargs:
:return:
"""
features = []
async with db.acquire(reuse=True) as connection:
async with connection.transaction() as tx:
join = cls.get_join_with()
query = cls.load(**join)
#async for record in connection.iterate(query):
for record in await connection.all(query):
if record.geom is not None:
try:
feature = await record.get_feature_as_dict(**kwargs)
except NoPoint:
pass
except Exception as err:
logger.exception(err)
else:
features.append(feature)
return features
@classmethod
async def get_geos_df(cls, where=None, crs=None, reproject=False,
filter_columns=False, with_popup=False,
drop_wkb=True, **kwargs):
if not gpd.options.use_pygeos:
logger.warn(f'Using get_geos_df for {cls} but gpd.options.use_pygeos not set')
if not crs:
crs = conf.crs.geojson
df = await cls.get_df(where=where, **kwargs)
df.set_index('id', inplace=True)
df.rename(columns={'geom': 'wkb'}, inplace=True)
df['geometry'] = shapely.from_wkb(df['wkb'])
if drop_wkb:
df = df.drop(columns=['wkb'])
return gpd.GeoDataFrame(df, geometry='geometry', crs=crs)
@classmethod
async def get_geo_df(cls, where=None, crs=None, reproject=False,
filter_columns=False, with_popup=False,
**kwargs) -> gpd.GeoDataFrame:
"""
Return a GeoPandas GeoDataFrame of all records
:param where: where clause for the query (eg. Model.attr=='foo')
:param crs: coordinate system (eg. 'epsg:4326') (priority over the reproject parameter)
:param reproject: should reproject to conf.srid_for_proj
:return:
"""
return await cls.get_gdf(where=where, **kwargs)
# df = await cls.get_df(where=where, **kwargs)
# df.dropna(subset=['geom'], inplace=True)
# df.set_index('id', inplace=True)
# df.sort_index(inplace=True)
# df_clean = df[~df.geom.isna()]
# ## Drop coordinates
# df_clean.drop(columns=set(df_clean.columns).intersection(['ST_X_1', 'ST_Y_1', 'ST_Z_1']),
# inplace=True)
# if not crs:
# crs = conf.crs.geojson
# ## XXX: is it right? Waiting for application.py to remove pygeos support to know more.
# # if getattr(gpd.options, 'use_pygeos', False):
# # geometry = shapely.from_wkb(df_clean.geom)
# # else:
# # geometry = [wkb.loads(geom) for geom in df_clean.geom]
# gdf = gpd.GeoDataFrame(
# df_clean.drop('geom', axis=1),
# crs=crs,
# geometry=geometry
# )
# if hasattr(cls, 'simplify') and cls.simplify:
# #shapely_geom = shapely_geom.simplify(
# simplify_tolerance / conf.geo.simplify_geom_factor,
# preserve_topology=conf.geo.simplify_preserve_topology)
# gdf['geometry'] = gdf['geometry'].simplify(
# float(cls.simplify) / conf.geo.simplify_geom_factor,
# preserve_topology=conf.geo.simplify_preserve_topology)
# if reproject:
# gdf.to_crs(crs=conf.crs.for_proj, inplace=True)
# ## Filter out columns
# if filter_columns:
# gdf.drop(columns=set(gdf.columns).intersection(cls.filtered_columns_on_map),
# inplace=True)
# if with_popup:
# gdf['popup'] = await cls.get_popup(gdf)
# return gdf
@classmethod
def get_attachment_dir(cls):
return cls.__table__.fullname
@classmethod
def get_attachment_base_dir(cls):
return Path(conf.attachments.base_dir) / cls.get_attachment_dir()
class GeoModel(GeoModelNoStatus):
status: ClassVar[str] = 'E'
"""
Status (ISO layers definition) of the layer. E -> Existing.
"""
class LiveGeoModel(GeoModel):
store: ClassVar[str]
group: ClassVar[str] ='Live'
custom: ClassVar[bool] = True
is_live: ClassVar[bool] = True
is_db: ClassVar[bool] = False
# class Geom(str):
# pass
class GeoPointModelNoStatus(GeoModelNoStatus):
shapefile_model: ClassVar[int] = POINT
## geometry typing, see https://stackoverflow.com/questions/77333100/geoalchemy2-geometry-schema-for-pydantic-fastapi
geom: Annotated[str, WKBElement] = Field(sa_type=Geometry('POINT', srid=conf.geo.srid))
icon: ClassVar[str | None] = None
mapbox_type: ClassVar[str] = 'symbol'
base_gis_type: ClassVar[str] = 'Point'
symbol: ClassVar[str] = '\ue32b'
@property
def latitude(self):
return self.shapely_geom.y
@property
def longitude(self):
return self.shapely_geom.x
@property
def elevation(self):
return self.shapely_geom.z
@property
def caption(self):
"""
Return user friendly name (used in menu, etc)
:return:
"""
return '{self.__class__.__name__}: {self.id:d}'.format(self=self)
def get_coords(self):
return (self.shapely_geom.x, self.shapely_geom.y)
@classmethod
async def get_shapefile_writer(cls):
writer = ShapeFileWriter(cls.shapefile_model)
cols = cls.get_columns_for_shapefile_writer(writer)
query = cls.query.where(cls.geom != None)
async with db.acquire(reuse=False) as connection:
async with connection.transaction() as tx:
## Add the objects
async for obj in connection.iterate(query):
writer.point(*obj.get_coords(), shapeType=cls.shapefile_model)
attrs = []
for attr_name in cols:
attr = getattr(obj, attr_name)
if isinstance(attr, date):
## Date in a format approriate for shapefile
attrs.append('{:%Y%m%d}'.format(attr))
else:
attrs.append(attr)
writer.record(*attrs)
return writer
async def get_geo_info(self) -> list[InfoItem]:
g = shapely.from_wkb(self.geom.data) # type: ignore
return [
InfoItem(key='longitude', value=g.x), # type: ignore
InfoItem(key='latitude', value=g.y), # type: ignore
]
class GeoPointModel(GeoPointModelNoStatus, GeoModel):
...
class GeoPointZModel(GeoPointModel):
geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('POINTZ', dimension=3, srid=conf.geo.srid))
shapefile_model: ClassVar[int] = POINTZ
def get_coords(self):
return (self.shapely_geom.x, self.shapely_geom.y, self.shapely_geom.z)
async def get_geo_info(self) -> list[InfoItem]:
info = await super(GeoPointZModel, self).get_geo_info()
info.append(
InfoItem(key='elevation (m)', value='{:.2f}'.format(self.shapely_geom.z))
)
return info
class GeoPointMModel(GeoPointZModel):
shapefile_model: ClassVar[int] = POINTZ
geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('POINTZ', dimension=3, srid=conf.geo.srid))
class GeoLineModel(GeoModel):
shapefile_model: ClassVar[int] = POLYLINE
geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('LINESTRING', srid=conf.geo.srid))
mapbox_type: ClassVar[str] = 'line'
base_gis_type: ClassVar[str] = 'Line'
@property
def length(self):
return transform(reproject_func, self.shapely_geom).length
@property
def caption(self):
"""
Return user friendly name (used in menu, etc)
:return:
"""
return '{self.__class__.__name__}: {self.id:d}'.format(self=self)
@classmethod
async def get_shapefile_writer(cls):
writer = ShapeFileWriter(cls.shapefile_model)
cols = cls.get_columns_for_shapefile_writer(writer)
query = cls.query.where(cls.geom != None)
async with db.acquire(reuse=False) as connection:
async with connection.transaction() as tx:
## Add the objects
async for obj in connection.iterate(query):
try:
writer.line(parts=[[point for point in obj.get_points()]])
except NoPoint:
pass
attrs = []
for attr_name in cols:
attr = getattr(obj, attr_name)
if isinstance(attr, date):
## Date in a format approriate for shapefile
attrs.append('{:%Y%m%d}'.format(attr))
else:
attrs.append(attr)
writer.record(*attrs)
return writer
def get_points(self):
"""
Get a list of points
:return:
"""
if isinstance(self.geom, None.__class__):
raise NoPoint
points = wkb.loads(self.geom.data)
return zip(points.coords.xy[0], points.coords.xy[1])
async def get_geo_info(self) -> list[InfoItem]:
bounds = self.shapely_geom.bounds
return [
InfoItem(key='longitude', value='{:.6f} - {:.6f}'.format(bounds[0], bounds[2])),
InfoItem(key='latitude', value='{:.6f} - {:.6f}'.format(bounds[1], bounds[3])),
InfoItem(key='length (m)', value='{self.length:.2f}'.format(self=self))
]
class GeoLineModelZ(GeoLineModel):
shapefile_model: ClassVar[int] = POLYLINEZ
geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('LINESTRINGZ', dimension=3, srid=conf.geo.srid))
async def get_geo_info(self) -> list[InfoItem]:
info = await super(GeoLineModelZ, self).get_geo_info()
elevations = [cc[2] for cc in self.shapely_geom.coords]
elev_min = min(elevations)
elev_max = max(elevations)
if elev_min == elev_max:
info.append(InfoItem(key='elevation (m)',
value='{:.2f}'.format(elev_min)))
else:
info.append(InfoItem(key='elevation (m)',
value='{:.2f} - {:.2f}'.format(elev_min, elev_max)))
return info
class GeoPolygonModel(GeoModel):
shapefile_model: ClassVar[int] = POLYGON
geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('POLYGON', srid=conf.geo.srid))
mapbox_type: ClassVar[str] = 'fill'
base_gis_type: ClassVar[str] = 'Polygon'
@property
def area(self):
return transform(reproject_func, self.shapely_geom).area
@property
def length(self):
return transform(reproject_func, self.shapely_geom).length
@property
def caption(self):
"""
Return user friendly name (used in menu, etc)
:return:
"""
return '{self.__class__.__name__}: {self.id:d}'.format(self=self)
@classmethod
async def get_shapefile_writer(cls):
writer = ShapeFileWriter(cls.shapefile_model)
cols = cls.get_columns_for_shapefile_writer(writer)
query = cls.query.where(cls.geom != None)
async with db.acquire(reuse=False) as connection:
async with connection.transaction() as tx:
## Add the objects
async for obj in connection.iterate(query):
try:
writer.poly(parts=[[point for point in obj.get_points()]])
except NoPoint:
pass
attrs = []
for attr_name in cols:
attr = getattr(obj, attr_name)
if isinstance(attr, date):
## Date in a format approriate for shapefile
attrs.append('{:%Y%m%d}'.format(attr))
else:
attrs.append(attr)
writer.record(*attrs)
return writer
def get_points(self):
"""
Get a list of points
:return:
"""
if isinstance(self.geom, None.__class__):
raise NoPoint
points = wkb.loads(self.geom.data)
return zip(points.exterior.coords.xy[0], points.exterior.coords.xy[1])
async def get_geo_info(self) -> list[InfoItem]:
info = []
area = self.area
bounds = self.shapely_geom.bounds
info.append(InfoItem(key='longitude',
value='{:.6f} - {:.6f}'.format(bounds[0], bounds[2])))
info.append(InfoItem(key='latitude',
value='{:.6f} - {:.6f}'.format(bounds[1], bounds[3])))
info.append(InfoItem(key='length (m)',
value='{:.2f}'.format(self.length)))
info.append(InfoItem(key='area (sq. m)',
value='{:.1f} sq. m'.format(area)))
info.append(InfoItem(key='area (ha)',
value='{:.1f} ha'.format(area / 10000)))
info.append(InfoItem(key='area (acre)',
value='{:.1f} acres'.format(area / 4046.85643005078874)))
return info
class GeoPolygonModelZ(GeoPolygonModel):
shapefile_model: ClassVar[int] = POLYGONZ
geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('POLYGONZ', dimension=3, srid=conf.geo.srid))
async def get_geo_info(self) -> list[InfoItem]:
info = await super(GeoPolygonModelZ, self).get_geo_info()
if hasattr(self.shapely_geom, 'exterior'):
coords = self.shapely_geom.exterior.coords
else:
coords = self.shapely_geom.coords
elevations = [coord[2] for coord in coords]
elev_min = min(elevations)
elev_max = max(elevations)
if elev_min == elev_max:
info.append(InfoItem(key='elevation (m)',
value='{:.2f}'.format(elev_min)))
else:
info.append(InfoItem(key='elevation (m)',
value='{:.2f} - {:.2f}'.format(elev_min, elev_max)))
return info
class GeoPointSurveyModel(SurveyModel, GeoPointMModel):
## raw_model is set in category_models_maker.make_category_models
raw_model: ClassVar['RawSurveyBaseModel']
class LineWorkSurveyModel(SurveyModel):
## raw_model is set in category_models_maker.make_category_models
raw_model: ClassVar['RawSurveyBaseModel']
def match_raw_points(self):
reprojected_geom = transform(reproject_func, self.shapely_geom)
reprojected_geom_geoalchemy = from_shape(reprojected_geom, conf.raw_survey_srid)
raw_survey_points_project = self.raw_model.query.filter(
self.raw_model.project_id==self.project_id)
query = raw_survey_points_project.filter(
func.ST_Distance(reprojected_geom_geoalchemy, self.raw_model.geom) < conf.epsilon
)
return query.all()
class GeoLineSurveyModel(LineWorkSurveyModel, GeoLineModelZ):
pass
class GeoPolygonSurveyModel(LineWorkSurveyModel, GeoPolygonModelZ):
pass
class RawSurveyBaseModel(BaseSurveyModel, GeoPointModelNoStatus):
"""
Abstract base class for category based raw survey point models
"""
__table_args__ = raw_survey.table_args
geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('POINTZ',
dimension=3,
srid=conf.geo.raw_survey.srid),
) # type: ignore
status: str = Field(sa_type=String(1)) # type: ignore
## store_name is set in category_models_maker.make_category_models
store_name: ClassVar[str | None] = None
@classmethod
async def get_geo_df(cls, *args, **kwargs):
return await super().get_geo_df(crs=conf.raw_survey.spatial_sys_ref,
*args, **kwargs)
class PlottableModel(Model):
"""
Model that defines a value for each record, therefore that can be used
in plots (eg. bar graphs).
Subclasses might want to define:
* a unit (default: m)
* a resampling method, to be applied to all values (unless values is a dict, see below)
* attributes with numerical types, and:
* a tuple called values that defines what are these columns
to be used (the first one being the default)
* OR an ordereed dict of value => resampling method
"""
float_format: ClassVar[str] = '%.1f'
values: ClassVar[list[dict[str, str]]] = []
@classmethod
async def get_as_dataframe(cls, model_id=None, where=None, **kwargs):
"""
Get a dataframe for the data.
It's quite generic, so subclasses might want to subclass this.
"""
if where is None:
if model_id is None:
where_ = None
else:
where_ = cls.ref_id == model_id
else:
if model_id is None:
where_ = where
else:
where_ = and_(cls.ref_id == model_id, where)
if where_ is not None:
df = await cls.get_df(where=where_, **kwargs)
else:
df = await cls.get_df(**kwargs)
return df
class TimePlottableModel(PlottableModel):
time: datetime
@classmethod
async def get_as_dataframe(cls, model_id=None, with_only_columns=None, **kwargs):
"""
Get the data as a time-indexed dataframe
"""
if with_only_columns == None:
with_only_columns = [val['name'] for val in cls.values]
if 'time' not in with_only_columns:
with_only_columns.insert(0, 'time')
df = await super().get_as_dataframe(model_id=model_id,
with_only_columns=with_only_columns,
**kwargs)
## Set time as index
df.set_index('time', drop=True, inplace=True)
df.sort_index(inplace=True)
return df