from pathlib import Path from typing import Any, ClassVar, Annotated from datetime import date, datetime from collections import OrderedDict from io import BytesIO from zipfile import ZipFile from functools import cached_property import locale import logging import geopandas as gpd # type: ignore import shapely # type: ignore import pyproj from pydantic import BaseModel from sqlmodel import select, Field from sqlmodel.ext.asyncio.session import AsyncSession from sqlalchemy import BigInteger, String, func, and_, text from sqlalchemy.sql import sqltypes from sqlalchemy.orm import declared_attr from geoalchemy2.shape import from_shape from geoalchemy2.types import Geometry, WKBElement from shapely import wkb, from_wkb from shapely.geometry import mapping # type: ignore from shapely.ops import transform # type: ignore from shapefile import ( # type: ignore Writer as ShapeFileWriter, POINT, POINTZ, POLYLINE, POLYLINEZ, POLYGON, POLYGONZ, ) from gisaf.database import db_session from gisaf.config import conf from gisaf.models.map_bases import MaplibreStyle from gisaf.models.models_base import Model from gisaf.models.metadata import gisaf_survey, gisaf_admin, survey, raw_survey from gisaf.models.misc import Qml from gisaf.models.category import Category from gisaf.models.info_item import Tag, InfoItem # from gisaf.models.survey import Equipment, Surveyor, Accuracy # from gisaf.models.project import Project LOCALE_DATE_FORMAT = locale.nl_langinfo(locale.D_FMT) ## Coordinates of WGS84 should be in the (y, x) order, but some components (shapely?) ## always work with (x, y). Force (y, x) order with always_xy=True ## https://github.com/pyproj4/pyproj/issues/504 ## https://proj.org/faq.html#why-is-the-axis-ordering-in-proj-not-consistent reproject_func = pyproj.Transformer.from_crs( conf.geo.srid, conf.geo.srid_for_proj, always_xy=True).transform class NoPoint(Exception): pass logger = logging.getLogger('model_base_base') ## Mapping of sql types to shapefile types, for pyshp ## Character, Numbers, Longs, Dates, or Memo exportable_cols = { sqltypes.Numeric: 'N', sqltypes.Integer: 'N', sqltypes.DateTime: 'D', sqltypes.Date: 'D', sqltypes.Time: 'D', sqltypes.String: 'C', sqltypes.Unicode: 'C', } class BaseSurveyModel(BaseModel): """ Base mixin class for all layers defined from a category: - raw survey (RAW_V_*') - projected ('V_*') """ id: int | None = Field(sa_type=BigInteger, primary_key=True, default=None) equip_id: int = Field(foreign_key=gisaf_survey.table('equipment.id')) srvyr_id: int = Field(foreign_key=gisaf_survey.table('surveyor.id')) accur_id: int = Field(foreign_key=gisaf_survey.table('accuracy.id')) project_id: int = Field(foreign_key=gisaf_admin.table('project.id')) orig_id: str date: date @classmethod def selectinload(cls): return [ cls.equipment, # type: ignore cls.surveyor, # type: ignore cls.accuracy, # type: ignore cls.project, # type: ignore ] # @classmethod # def dyn_join_with(cls): # return { # 'equipment': Equipment, # 'surveyor': Surveyor, # 'accuracy': Accuracy, # 'project': Project, # } #async def get_info(self): # info = await super(BaseSurveyModel, self).get_info() # return info #async def get_geo_info(self): # info = await super(BaseSurveyModel, self).get_geo_info() # return info async def get_survey_info(self) -> list[InfoItem]: info = await super(BaseSurveyModel, self).get_survey_info() if self.category: info.append(InfoItem(key='ISO layer name', value=self.iso_layer_name)) info.append(InfoItem(key='survey category', value=self.category.name)) info.append(InfoItem(key='survey category description', value=self.category.description)) if self.project_id: info.append(InfoItem(key='project', value=self.project.name)) if self.srvyr_id: info.append(InfoItem(key='surveyor', value=self.surveyor.name)) if self.equip_id: info.append(InfoItem(key='survey equipment', value=self.equipment.name)) if self.accur_id: info.append(InfoItem(key='survey accuracy', value=self.accuracy.name)) if self.date: info.append(InfoItem(key='survey date', value=self.date.strftime(LOCALE_DATE_FORMAT))) if self.orig_id: info.append(InfoItem(key='original id', value=self.orig_id)) return info @property def iso_layer_name(self) -> str: """ The ISO layer name, built on the category and status """ return '{category.domain}-{category.group:4s}-{category.minor_group_1:4s}-{category.minor_group_2:4s}-{status:1s}'\ .format(category=self.category, status=self.status) class SurveyModel(BaseSurveyModel): """ Base mixin class for defining final (reprojected) survey data, with a status """ __table_args__ = survey.table_args # status: str = Field(sa_type=String(1)) get_gdf_with_related: ClassVar[bool] = False category_name: ClassVar[str] filtered_columns_on_map: ClassVar[list[str]] = [ 'equip_id', 'srvyr_id', 'accur_id', 'project_id', 'orig_id', 'date', 'gisaf_survey_equipment_id', 'gisaf_survey_equipment_name', 'gisaf_survey_surveyor_id', 'gisaf_survey_surveyor_name', 'gisaf_survey_accuracy_id', 'gisaf_survey_accuracy_name', 'gisaf_survey_accuracy_accuracy', 'gisaf_admin_project_id', 'gisaf_admin_project_name', 'gisaf_admin_project_contact_person', 'gisaf_admin_project_site', 'gisaf_admin_project_date_approved', 'gisaf_admin_project_start_date_planned', 'gisaf_admin_project_start_date_effective', 'gisaf_admin_project_end_date_planned', 'gisaf_admin_project_end_date_effective', 'gisaf_admin_project_skip_columns', ] @declared_attr def __tablename__(cls) -> str: return cls.__name__ # type: ignore async def get_survey_info(self) -> list[InfoItem]: info = await super(SurveyModel, self).get_survey_info() if self.srvyr_id: info.append(InfoItem(key='surveyor', value=self.surveyor.name)) if self.equip_id: info.append(InfoItem(key='survey equipment', value=self.equipment.name)) if self.accur_id: info.append(InfoItem(key='survey accuracy', value=self.accuracy.name)) if self.date: info.append(InfoItem(key='survey date', value=self.date.strftime(LOCALE_DATE_FORMAT))) info.append(InfoItem(key='status', value=self.status)) info.append(InfoItem(key='original id', value=self.orig_id)) return info async def get_feature_properties(self): return { 'status': self.status if self.status else 'E' } @property def caption(self) -> str: return f'{self.category.description} [{self.category.name}: {self.category.group}-{self.category.minor_group_1}] #{self.id:d}'.format(self=self) @classmethod async def get_popup(cls, df): return cls.category.description + \ ' [' + cls.category.group + '-' + cls.category.minor_group_1 + \ '] #' + df.index.astype('U') @classmethod async def get_geojson(cls, simplify_tolerance=0, preserve_topology=False): from gisaf.registry import registry ## Fastest, but the id is in properties and needs the front end (eg Gisaf for mapbox) ## to move it at the feature level ## Requires PostGis 3.0+ sql_query_for_geojson_fast = """ SELECT json_build_object( 'type', 'FeatureCollection', 'features', json_agg(ST_AsGeoJSON(t.*)::json) )::varchar FROM ( SELECT f.geom, f.id::varchar, '{description}' || ' [' || '{category.group}' || '-' || '{category.minor_group_1}' || '] #' || f.id as popup, f.status FROM "{schema}"."{table}" as f WHERE f.geom is not null ) AS t; """ ## Slower version (3x), but compliant with RFC 7946 ## where the id is at the feature level sql_query_for_geojson_slow = """ SELECT jsonb_build_object( 'type', 'FeatureCollection', 'features', jsonb_agg(features.feature) )::varchar FROM ( SELECT jsonb_build_object( 'type', 'Feature', 'id', id, 'geometry', ST_AsGeoJSON(geom)::jsonb, 'properties', jsonb_build_object( 'popup', '{description}' || ' [' || '{category.group}' || '-' || '{category.minor_group_1}' || '] #' || inputs.id::varchar, 'status', status ) ) AS feature FROM "{schema}"."{table}" AS inputs ) AS features """ sql_query_for_geojson = sql_query_for_geojson_fast category = registry.categories.loc[cls.category_name] session: AsyncSession async with db_session() as session: query = sql_query_for_geojson.format( model=cls, category=category, schema=cls.__table__.schema, #table=cls.__tablename__, # FIXME: should be __tablename__, but see SQLModel.__tablename__ which use lower(__name__) table=cls.__name__, description=category.description, ) result = await session.exec(text(query)) return result.first()[0] def to_row(self): """ Get a list of attributes, typically used for exporting in CSV :return: list of attributes """ return [ self.id, self.shapely_geom.x, self.shapely_geom.y, self.shapely_geom.z, self.category_name, self.surveyor, self.equipment, self.date.isoformat(), self.accuracy.name, self.status, self.project.name, self.orig_id ] class GeoModelNoStatus(Model): """ Base class for all geo models """ id: int | None = Field(default=None, primary_key=True) description: ClassVar[str] = '' attribution: ClassVar[str | None] = None can_get_features_as_df: ClassVar[bool] = True """ can_get_features_as_df indicates that the model is ready to get GeoJson using GeoDataframe If False, switch back to gino and dict based conversion using get_features_in_bulk_gino and record.get_feature_as_dict (DEPRECATED) """ cache_enabled: ClassVar[bool] = True """ cache_enabled indicated that the model is OK with the caching mechanism of geojson stores. The cache is time-stamped with DB triggers on modification, so it's safe unless the model fetches other data than from its own table, eg. some status for styling. See gisaf.redis_tools and geoapi.gj_feature for the implementation details of the cache. """ get_gdf_with_related: ClassVar[bool] = False """ get_gdf_with_related indicates that get_df (thus, get_geo_df and the geoJson API for the map online) gets related models (1-n relations, as defined with selectinload) by default. It can be overridden with the with_related parameter when calling get_df. """ z_index: ClassVar[int] = 450 # Field(450, alias='zIndex') """ z-index for the leaflet layer. Should be between 400 and 500. """ icon: ClassVar[str | None] = None """ Icon for the model, used for normal web interface (ie. except the map) """ symbol: ClassVar[str | None] = None """ Icon for the model, used in the map (mapbox) """ style: ClassVar[str] = '' """ Style for the model, used in the map, etc """ _join_with: ClassVar[dict[str, Any]] = { } """ Fields to join when getching items using get_features. """ hidden: ClassVar[bool] = False """ This model should be hidden from the menu """ def __str__(self): return self.caption async def get_geo_info(self) -> list[InfoItem]: """ Geographical info """ return [] async def get_survey_info(self) -> list[InfoItem]: """ Quality info: project, source, accuracy... """ return [] async def get_info(self) -> list[InfoItem]: """ Model specific info """ return [] # @classmethod # def get_join_with(cls): # if hasattr(cls, 'dyn_join_with'): # return cls.dyn_join_with() # else: # return cls._join_with #@classmethod #def get_style(cls): # """ # Style for the layer in Leaflet (css). # :return: # """ # return { # # 'color': "#4B1BDE", # 'weight': '1' # } @classmethod def get_options(cls): """ Options for the layer in Leaflet. For example: {'weight': 0, 'fillOpacity': 0.1} :return: """ return { } @property def caption(self) -> str: """ Subclass me! :return: str """ return 'Some geometry' @classmethod async def get_popup(cls, df): return cls.__name__ + ': ' + df.index.astype('U') @classmethod async def get_properties(cls, df): return {} async def get_tags(self) -> list[Tag]: from gisaf.models.tags import Tags async with db_session() as session: query = select(Tags.tags).where(Tags.store == self.get_store_name(), Tags.ref_id == self.id) data = await session.exec(query) tags = data.one_or_none() if tags is not None: return [Tag(key=key, value=value) for key, value in tags.items()] else: return [] @cached_property def shapely_geom(self): return from_wkb(self.geom.data) # @property # def shapely_geom(self): # if not hasattr(self, '_shapely_geom'): # if isinstance(self.geom, WKBElement): # bytes = self.geom.data # if bytes: # self._shapely_geom = wkb.loads(bytes) # else: # self._shapely_geom = None # else: # self._shapely_geom = None # return self._shapely_geom def get_bgColor(self): """ Get the background color of the element on the map. Subclass me! :return: str """ return '' async def get_feature_as_dict(self, simplify_tolerance=None, reproject=False, css_class_prefix=''): """ Get the parameters of this object (feature) :param css_class_prefix: for leaflet only :return: """ logger.warn(f'{self} use get_feature_as_dict: please DEPRECATE in favor of get_geo_df') if hasattr(self, 'get_feature_properties'): properties = await self.get_feature_properties() else: properties = { 'popup': self.caption } if reproject: shapely_geom = transform(reproject_func, self.shapely_geom) else: shapely_geom = self.shapely_geom if simplify_tolerance: shapely_geom = shapely_geom.simplify( simplify_tolerance / conf.geo.simplify_geom_factor, preserve_topology=conf.geo.simplify_preserve_topology) if shapely_geom.is_empty: raise NoPoint return { "type": "Feature", 'id': str(self.id), "properties": properties, 'geometry': mapping(shapely_geom) } @classmethod def get_columns_for_shapefile_writer(cls, writer): """ Return the columns and corresponding attribute name :param writer: :return: """ ## Define the attributes columns = cls.__table__.columns cols = [col.name for col in columns if col.type.__class__ in exportable_cols] for col in columns: if col.name in cols: writer.field(col.name, exportable_cols[col.type.__class__]) return cols return OrderedDict([(col, cols_to_attr[col]) for col in cols]) @classmethod async def get_shapefile_writer(cls): """ Get a shapefile writer with all records loaded. Subclasses should implement this. :return: """ raise NotImplementedError @classmethod async def get_shapefile_files(cls): """ Get an actual shapefile with all records. :return: zip file """ writer = await cls.get_shapefile_writer() layer_name = cls.__name__ ## Save the dbf, shp, shx data dbf = BytesIO() writer.saveDbf(dbf) shp = BytesIO() writer.saveShp(shp) shx = BytesIO() writer.saveShx(shx) qml = None qml_rec = await Qml.get(layer_name) if qml_rec and qml_rec.qml: qml = qml_rec.qml srid = cls.geom.expression.type.srid ## Write the .prj file: projection system reference ## Try if there's a file prj_path = (conf.prj_files_dir / str(srid)).with_suffix('.prj') proj_str = None if prj_path.is_file(): with open(prj_path) as proj_file: proj_str = proj_file.read() else: ## Get from the spatial_ref_sys table in the database async with db.acquire(reuse=False) as connection: async with connection.transaction() as tx: proj_rec = await connection.first(f'select srtext from spatial_ref_sys where srid={srid}') if proj_rec and proj_rec[0]: proj_str = proj_rec[0] return dbf, shp, shx, qml, proj_str @classmethod async def get_shapefile(cls): layer_name = cls.__name__ dbf, shp, shx, qml, proj_str = await cls.get_shapefile_files() ## Zip and return data zip_file = BytesIO() with ZipFile(zip_file, 'w') as zip: zip.writestr('{}.dbf'.format(layer_name), dbf.getvalue()) zip.writestr('{}.shp'.format(layer_name), shp.getvalue()) zip.writestr('{}.shx'.format(layer_name), shx.getvalue()) if qml: zip.writestr('{}.qml'.format(layer_name), qml) if proj_str: zip.writestr('{}.prj'.format(layer_name), proj_str) zip_file.seek(0) return zip_file @classmethod async def get_maplibre_style(cls) -> MaplibreStyle | None: """ Get the mapbox style (paint, layout, attribution...) """ ## If the model is from survey, it should have a category, which has a style ## Get from database style: MaplibreStyle | None async with db_session() as session: if hasattr(cls, 'category'): category = await session.get(Category, cls.get_store_name()) if category: style = MaplibreStyle( layout=category.mapbox_layout, paint=category.mapbox_paint, ) else: style = None else: qml = await session.get(Qml, cls.__name__) if qml: style = MaplibreStyle( layout=qml.mapbox_layout, paint=qml.mapbox_paint, attribution=qml.attr, ) else: style = None return style @classmethod async def get_features_attrs(cls, simplify_tolerance): """ Get the attributes and kwargs to pass to get_features for this model :param simplify_tolerance: float :return: tuple(dict, dict) """ attrs = {"type": "FeatureCollection"} logger.warn(f'{cls} use get_features_attrs: please DEPRECATE in favor of get_geo_df') ## Get style, with priority order ## If the model is from survey ("auto_model"), it should have a category, which has a style category = getattr(cls, 'category', None) features_kwargs = {} if not issubclass(cls, GeoPointModel): if hasattr(cls, 'simplify') and cls.simplify: features_kwargs['simplify_tolerance'] = float(cls.simplify) else: features_kwargs['simplify_tolerance'] = simplify_tolerance features_kwargs['reproject'] = conf.geojson_srid != conf.srid ## If the model is from survey, it should have a category, which has a style ## Get from database ## TODO: update categories dynamically in the registry if hasattr(cls, 'category'): category = await Category.get(cls.category.name) else: category = None features_kwargs['reproject'] = conf.geojson_srid != conf.srid ## Get options, if the model has defined the get_options function if hasattr(cls, 'get_options'): attrs["options"] = cls.get_options() if hasattr(cls, 'get_popup_dynamic'): attrs["has_popups"] = True return attrs, features_kwargs @classmethod async def get_features_in_bulk_gino(cls, **kwargs): """ Get the features, using joins defined by _join_with, or, in priority, get_join_with(). See Gino's doc: relationship.html (specially loaders) :param kwargs: :return: """ features = [] async with db.acquire(reuse=True) as connection: async with connection.transaction() as tx: join = cls.get_join_with() query = cls.load(**join) #async for record in connection.iterate(query): for record in await connection.all(query): if record.geom is not None: try: feature = await record.get_feature_as_dict(**kwargs) except NoPoint: pass except Exception as err: logger.exception(err) else: features.append(feature) return features @classmethod async def get_geos_df(cls, where=None, crs=None, reproject=False, filter_columns=False, with_popup=False, drop_wkb=True, **kwargs): if not gpd.options.use_pygeos: logger.warn(f'Using get_geos_df for {cls} but gpd.options.use_pygeos not set') if not crs: crs = conf.crs.geojson df = await cls.get_df(where=where, **kwargs) df.set_index('id', inplace=True) df.rename(columns={'geom': 'wkb'}, inplace=True) df['geometry'] = shapely.from_wkb(df['wkb']) if drop_wkb: df = df.drop(columns=['wkb']) return gpd.GeoDataFrame(df, geometry='geometry', crs=crs) @classmethod async def get_geo_df(cls, where=None, crs=None, reproject=False, filter_columns=False, with_popup=False, **kwargs) -> gpd.GeoDataFrame: """ Return a GeoPandas GeoDataFrame of all records :param where: where clause for the query (eg. Model.attr=='foo') :param crs: coordinate system (eg. 'epsg:4326') (priority over the reproject parameter) :param reproject: should reproject to conf.srid_for_proj :return: """ gdf = await cls.get_gdf(where=where, **kwargs) # df.dropna(subset=['geom'], inplace=True) # df.set_index('id', inplace=True) # df.sort_index(inplace=True) # df_clean = df[~df.geom.isna()] # ## Drop coordinates # df_clean.drop(columns=set(df_clean.columns).intersection(['ST_X_1', 'ST_Y_1', 'ST_Z_1']), # inplace=True) # if not crs: # crs = conf.crs.geojson # ## XXX: is it right? Waiting for application.py to remove pygeos support to know more. # # if getattr(gpd.options, 'use_pygeos', False): # # geometry = shapely.from_wkb(df_clean.geom) # # else: # # geometry = [wkb.loads(geom) for geom in df_clean.geom] # gdf = gpd.GeoDataFrame( # df_clean.drop('geom', axis=1), # crs=crs, # geometry=geometry # ) # if hasattr(cls, 'simplify') and cls.simplify: # #shapely_geom = shapely_geom.simplify( # simplify_tolerance / conf.geo.simplify_geom_factor, # preserve_topology=conf.geo.simplify_preserve_topology) # gdf['geometry'] = gdf['geometry'].simplify( # float(cls.simplify) / conf.geo.simplify_geom_factor, # preserve_topology=conf.geo.simplify_preserve_topology) if reproject: gdf.to_crs(crs=conf.crs.for_proj, inplace=True) # ## Filter out columns if filter_columns: gdf.drop(columns=set(gdf.columns).intersection(cls.filtered_columns_on_map), inplace=True) # if with_popup: # gdf['popup'] = await cls.get_popup(gdf) return gdf @classmethod def get_attachment_dir(cls): return cls.__table__.fullname @classmethod def get_attachment_base_dir(cls): return Path(conf.attachments.base_dir) / cls.get_attachment_dir() class GeoModel(GeoModelNoStatus): status: ClassVar[str] = 'E' """ Status (ISO layers definition) of the layer. E -> Existing. """ class LiveGeoModel(GeoModel): store: ClassVar[str] group: ClassVar[str] ='Live' custom: ClassVar[bool] = True is_live: ClassVar[bool] = True is_db: ClassVar[bool] = False # class Geom(str): # pass class GeoPointModelNoStatus(GeoModelNoStatus): shapefile_model: ClassVar[int] = POINT ## geometry typing, see https://stackoverflow.com/questions/77333100/geoalchemy2-geometry-schema-for-pydantic-fastapi geom: Annotated[str, WKBElement] = Field(sa_type=Geometry('POINT', srid=conf.geo.srid)) icon: ClassVar[str | None] = None mapbox_type: ClassVar[str] = 'symbol' base_gis_type: ClassVar[str] = 'Point' symbol: ClassVar[str] = '\ue32b' @property def latitude(self): return self.shapely_geom.y @property def longitude(self): return self.shapely_geom.x @property def elevation(self): return self.shapely_geom.z @property def caption(self) -> str: """ Return user friendly name (used in menu, etc) :return: """ return f'{self.__class__.__name__}: {self.id:d}' def get_coords(self): return (self.shapely_geom.x, self.shapely_geom.y) @classmethod async def get_shapefile_writer(cls): writer = ShapeFileWriter(cls.shapefile_model) cols = cls.get_columns_for_shapefile_writer(writer) query = cls.query.where(cls.geom != None) async with db.acquire(reuse=False) as connection: async with connection.transaction() as tx: ## Add the objects async for obj in connection.iterate(query): writer.point(*obj.get_coords(), shapeType=cls.shapefile_model) attrs = [] for attr_name in cols: attr = getattr(obj, attr_name) if isinstance(attr, date): ## Date in a format approriate for shapefile attrs.append('{:%Y%m%d}'.format(attr)) else: attrs.append(attr) writer.record(*attrs) return writer async def get_geo_info(self) -> list[InfoItem]: g = shapely.from_wkb(self.geom.data) # type: ignore return [ InfoItem(key='longitude', value=g.x), # type: ignore InfoItem(key='latitude', value=g.y), # type: ignore ] class GeoPointModel(GeoPointModelNoStatus, GeoModel): ... class GeoPointZModel(GeoPointModel): geom: Annotated[str, WKBElement] = Field( sa_type=Geometry('POINTZ', dimension=3, srid=conf.geo.srid)) shapefile_model: ClassVar[int] = POINTZ def get_coords(self): return (self.shapely_geom.x, self.shapely_geom.y, self.shapely_geom.z) async def get_geo_info(self) -> list[InfoItem]: info = await super(GeoPointZModel, self).get_geo_info() info.append( InfoItem(key='elevation (m)', value='{:.2f}'.format(self.shapely_geom.z)) ) return info class GeoPointMModel(GeoPointZModel): shapefile_model: ClassVar[int] = POINTZ geom: Annotated[str, WKBElement] = Field( sa_type=Geometry('POINTZ', dimension=3, srid=conf.geo.srid)) class GeoLineModel(GeoModel): shapefile_model: ClassVar[int] = POLYLINE geom: Annotated[str, WKBElement] = Field( sa_type=Geometry('LINESTRING', srid=conf.geo.srid)) mapbox_type: ClassVar[str] = 'line' base_gis_type: ClassVar[str] = 'Line' @property def length(self): return transform(reproject_func, self.shapely_geom).length @property def caption(self) -> str: """ Return user friendly name (used in menu, etc) :return: """ return f'{self.__class__.__name__}: {self.id:d}' @classmethod async def get_shapefile_writer(cls): writer = ShapeFileWriter(cls.shapefile_model) cols = cls.get_columns_for_shapefile_writer(writer) query = cls.query.where(cls.geom != None) async with db.acquire(reuse=False) as connection: async with connection.transaction() as tx: ## Add the objects async for obj in connection.iterate(query): try: writer.line(parts=[[point for point in obj.get_points()]]) except NoPoint: pass attrs = [] for attr_name in cols: attr = getattr(obj, attr_name) if isinstance(attr, date): ## Date in a format approriate for shapefile attrs.append('{:%Y%m%d}'.format(attr)) else: attrs.append(attr) writer.record(*attrs) return writer def get_points(self): """ Get a list of points :return: """ if isinstance(self.geom, None.__class__): raise NoPoint points = wkb.loads(self.geom.data) return zip(points.coords.xy[0], points.coords.xy[1]) async def get_geo_info(self) -> list[InfoItem]: bounds = self.shapely_geom.bounds return [ InfoItem(key='longitude', value='{:.6f} - {:.6f}'.format(bounds[0], bounds[2])), InfoItem(key='latitude', value='{:.6f} - {:.6f}'.format(bounds[1], bounds[3])), InfoItem(key='length (m)', value='{self.length:.2f}'.format(self=self)) ] class GeoLineModelZ(GeoLineModel): shapefile_model: ClassVar[int] = POLYLINEZ geom: Annotated[str, WKBElement] = Field( sa_type=Geometry('LINESTRINGZ', dimension=3, srid=conf.geo.srid)) async def get_geo_info(self) -> list[InfoItem]: info = await super(GeoLineModelZ, self).get_geo_info() elevations = [cc[2] for cc in self.shapely_geom.coords] elev_min = min(elevations) elev_max = max(elevations) if elev_min == elev_max: info.append(InfoItem(key='elevation (m)', value='{:.2f}'.format(elev_min))) else: info.append(InfoItem(key='elevation (m)', value='{:.2f} - {:.2f}'.format(elev_min, elev_max))) return info class GeoPolygonModel(GeoModel): shapefile_model: ClassVar[int] = POLYGON geom: Annotated[str, WKBElement] = Field( sa_type=Geometry('POLYGON', srid=conf.geo.srid)) mapbox_type: ClassVar[str] = 'fill' base_gis_type: ClassVar[str] = 'Polygon' @property def area(self): return transform(reproject_func, self.shapely_geom).area @property def length(self): return transform(reproject_func, self.shapely_geom).length @property def caption(self) -> str: """ Return user friendly name (used in menu, etc) :return: """ return f'{self.__class__.__name__}: {self.id:d}' @classmethod async def get_shapefile_writer(cls): writer = ShapeFileWriter(cls.shapefile_model) cols = cls.get_columns_for_shapefile_writer(writer) query = cls.query.where(cls.geom != None) async with db.acquire(reuse=False) as connection: async with connection.transaction() as tx: ## Add the objects async for obj in connection.iterate(query): try: writer.poly(parts=[[point for point in obj.get_points()]]) except NoPoint: pass attrs = [] for attr_name in cols: attr = getattr(obj, attr_name) if isinstance(attr, date): ## Date in a format approriate for shapefile attrs.append('{:%Y%m%d}'.format(attr)) else: attrs.append(attr) writer.record(*attrs) return writer def get_points(self): """ Get a list of points :return: """ if isinstance(self.geom, None.__class__): raise NoPoint points = wkb.loads(self.geom.data) return zip(points.exterior.coords.xy[0], points.exterior.coords.xy[1]) async def get_geo_info(self) -> list[InfoItem]: info = [] area = self.area bounds = self.shapely_geom.bounds info.append(InfoItem(key='longitude', value='{:.6f} - {:.6f}'.format(bounds[0], bounds[2]))) info.append(InfoItem(key='latitude', value='{:.6f} - {:.6f}'.format(bounds[1], bounds[3]))) info.append(InfoItem(key='length (m)', value='{:.2f}'.format(self.length))) info.append(InfoItem(key='area (sq. m)', value='{:.1f} sq. m'.format(area))) info.append(InfoItem(key='area (ha)', value='{:.1f} ha'.format(area / 10000))) info.append(InfoItem(key='area (acre)', value='{:.1f} acres'.format(area / 4046.85643005078874))) return info class GeoPolygonModelZ(GeoPolygonModel): shapefile_model: ClassVar[int] = POLYGONZ geom: Annotated[str, WKBElement] = Field( sa_type=Geometry('POLYGONZ', dimension=3, srid=conf.geo.srid)) async def get_geo_info(self) -> list[InfoItem]: info = await super(GeoPolygonModelZ, self).get_geo_info() if hasattr(self.shapely_geom, 'exterior'): coords = self.shapely_geom.exterior.coords else: coords = self.shapely_geom.coords elevations = [coord[2] for coord in coords] elev_min = min(elevations) elev_max = max(elevations) if elev_min == elev_max: info.append(InfoItem(key='elevation (m)', value='{:.2f}'.format(elev_min))) else: info.append(InfoItem(key='elevation (m)', value='{:.2f} - {:.2f}'.format(elev_min, elev_max))) return info class GeoPointSurveyModel(SurveyModel, GeoPointMModel): ## raw_model is set in category_models_maker.make_category_models raw_model: ClassVar['RawSurveyBaseModel'] class LineWorkSurveyModel(SurveyModel): ## raw_model is set in category_models_maker.make_category_models raw_model: ClassVar['RawSurveyBaseModel'] def match_raw_points(self): reprojected_geom = transform(reproject_func, self.shapely_geom) reprojected_geom_geoalchemy = from_shape(reprojected_geom, conf.raw_survey_srid) raw_survey_points_project = self.raw_model.query.filter( self.raw_model.project_id==self.project_id) query = raw_survey_points_project.filter( func.ST_Distance(reprojected_geom_geoalchemy, self.raw_model.geom) < conf.epsilon ) return query.all() class GeoLineSurveyModel(LineWorkSurveyModel, GeoLineModelZ): pass class GeoPolygonSurveyModel(LineWorkSurveyModel, GeoPolygonModelZ): pass class RawSurveyBaseModel(BaseSurveyModel, GeoPointModelNoStatus): """ Abstract base class for category based raw survey point models """ __table_args__ = raw_survey.table_args geom: Annotated[str, WKBElement] = Field( sa_type=Geometry('POINTZ', dimension=3, srid=conf.geo.raw_survey.srid), ) # type: ignore status: str = Field(sa_type=String(1)) # type: ignore ## store_name is set in category_models_maker.make_category_models store_name: ClassVar[str | None] = None @classmethod async def get_geo_df(cls, *args, **kwargs): return await super().get_geo_df(crs=conf.raw_survey.spatial_sys_ref, *args, **kwargs) class PlottableModel(Model): """ Model that defines a value for each record, therefore that can be used in plots (eg. bar graphs). Subclasses might want to define: * a unit (default: m) * a resampling method, to be applied to all values (unless values is a dict, see below) * attributes with numerical types, and: * a tuple called values that defines what are these columns to be used (the first one being the default) * OR an ordereed dict of value => resampling method """ float_format: ClassVar[str] = '%.1f' values: ClassVar[list[dict[str, str]]] = [] @classmethod async def get_as_dataframe(cls, item, where=None, **kwargs): """ Get a dataframe for the data. It's quite generic, so subclasses might want to subclass this. """ where_ = cls.ref_id == item.id if where is not None: where_ = and_(where_, where) return await cls.get_df(where=where_, **kwargs) class TimePlottableModel(PlottableModel): time: datetime @classmethod async def get_as_dataframe(cls, item, with_only_columns=None, **kwargs): """ Get the data as a time-indexed dataframe """ if with_only_columns is None: with_only_columns = [val['name'] for val in cls.values] if 'time' not in with_only_columns: with_only_columns.insert(0, 'time') df = await super().get_as_dataframe(item=item, with_only_columns=with_only_columns, **kwargs) ## Set time as index df.set_index('time', drop=True, inplace=True) df.sort_index(inplace=True) return df