feature-info: migrate to pydantic, fix live

This commit is contained in:
phil 2024-01-06 12:29:48 +05:30
parent 71cb491617
commit e3ed311390
5 changed files with 203 additions and 172 deletions

View file

@ -23,11 +23,11 @@ from gisaf.security import (
authenticate_user, get_current_user, create_access_token, authenticate_user, get_current_user, create_access_token,
) )
from gisaf.registry import registry, NotInRegistry from gisaf.registry import registry, NotInRegistry
from gisaf.redis_tools import store as redis_store
from gisaf.custom_store_base import BaseStore from gisaf.custom_store_base import BaseStore
from gisaf.models.to_migrate import ( from gisaf.models.to_migrate import (
FeatureInfo, InfoItem, Attachment, InfoCategory FeatureInfo, InfoItem, Attachment, InfoCategory
) )
from gisaf.live_utils import get_live_feature_info
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -129,113 +129,16 @@ async def get_feature_info(
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
store_record = registry.stores.loc[store] store_record = registry.stores.loc[store]
model = store_record.model model = store_record.model
if store_record.is_live: if store_record.is_live:
item = await redis_store.get_feature_info(store, id) feature_info = await get_live_feature_info(store, id)
geom = item.geometry
## Reproject to default coordinate system (WGS84),
## XXX: only for shapely geometries
if not isinstance(geom, pygeos.Geometry):
geom_reprojected = transform(reproject_func, geom)
geoInfoItems = OrderedDict()
if isinstance(geom, Point):
geoInfoItems['longitude'] = f'{geom.x:.6f}'
geoInfoItems['latitude'] = f'{geom.y:.6f}'
if geom.has_z:
geoInfoItems['elevation (m)'] = f'{geom.z:.6f}'
elif isinstance(geom, (LineString, MultiLineString)):
bounds = geom.bounds
geoInfoItems['longitude'] = f'{bounds[0]:.6f} - {bounds[2]:.6f}'
geoInfoItems['latitude'] = f'{bounds[1]:.6f} - {bounds[3]:.6f}'
geoInfoItems['length (m)'] = f'{geom_reprojected.length:.2f}'
## TODO: elevation for MultiLineString
if geom.has_z and not isinstance(geom, MultiLineString):
elevations = [cc[2] for cc in geom.coords]
elev_min = min(elevations)
elev_max = max(elevations)
if elev_min == elev_max:
geoInfoItems['elevation (m)'] = f'{elev_min:.2f}'
else:
geoInfoItems['elevation (m)'] = f'{elev_min:.2f} - {elev_max:.2f}'
elif isinstance(geom, (Polygon, MultiPolygon)):
area = geom_reprojected.area
bounds = geom.bounds
geoInfoItems['longitude'] = f'{bounds[0]:.6f} - {bounds[2]:.6f}'
geoInfoItems['latitude'] = f'{bounds[1]:.6f} - {bounds[3]:.6f}'
geoInfoItems['area (sq. m)'] = f'{area:.1f} sq. m'
geoInfoItems['area (ha)'] = f'{area / 10000:.1f} ha'
geoInfoItems['area (acre)'] = f'{area / 4046.85643005078874:.1f} acres'
## TODO: elevation for MultiPolygon
if geom.has_z and not isinstance(geom, MultiPolygon):
if hasattr(geom, 'exterior'):
coords = geom.exterior.coords
else:
coords = geom.coords
elevations = [coord[2] for coord in coords]
elev_min = min(elevations)
elev_max = max(elevations)
if elev_min == elev_max:
geoInfoItems['elevation (m)'] = f'{elev_min:.2f}'
else:
geoInfoItems['elevation (m)'] = f'{elev_min:.2f} - {elev_max:.2f}'
feature_info_dict = {
'itemName': item.get('popup', f'Live: {store} #{id}'),
'geoInfoItems': geoInfoItems,
'surveyInfoItems': {
'Note': 'Live layers do not have survey info',
},
'infoItems': dict(item.drop(set(item.keys()).intersection(('geometry', 'popup')))),
'tags': {},
}
elif issubclass(model, BaseStore): elif issubclass(model, BaseStore):
feature_info_dict = await model.get_item_params(id) feature_info = await model.get_item_params(id)
else: else:
## Not a live layer ## A layer in the database
try: try:
feature_info_dict = await registry.get_model_id_params(model, int(id)) feature_info = await registry.get_model_id_params(model, int(id))
except NotInRegistry: except NotInRegistry:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
feature_info = FeatureInfo(
id=id,
itemName=feature_info_dict['itemName'],
geoInfoItems=[InfoItem(key=k, value=v)
for k, v in feature_info_dict['geoInfoItems'].items()],
surveyInfoItems=[InfoItem(key=k, value=v)
for k, v in feature_info_dict['surveyInfoItems'].items()],
infoItems=[InfoItem(key=k, value=v)
for k, v in feature_info_dict['infoItems'].items()],
tags=[InfoItem(key=k, value=v)
for k, v in feature_info_dict['tags'].items()],
graph=feature_info_dict.get('graph'),
)
if 'files' in feature_info_dict and feature_info_dict['files'] is not None:
feature_info.files = [
Attachment(name=k, path=v)
for k, v in feature_info_dict['files'].items()
]
else:
feature_info.files = []
if 'images' in feature_info_dict and feature_info_dict['images'] is not None:
feature_info.images = [
Attachment(name=k, path=v)
for k, v in feature_info_dict['images'].items()
]
else:
feature_info.images = []
if 'categorized_info_items' in feature_info_dict and feature_info_dict['categorized_info_items'] != None:
feature_info.categorizedInfoItems = [
InfoCategory(
name=name,
infoItems=[
InfoItem(key=k, value=v)
for k, v in info_items.items()
]
)
for name, info_items in feature_info_dict['categorized_info_items'].items()
]
feature_info.externalRecordUrl = feature_info_dict.get('externalRecordUrl')
return feature_info return feature_info
# @api.get("/user-role") # @api.get("/user-role")

84
src/gisaf/live_utils.py Normal file
View file

@ -0,0 +1,84 @@
from shapely.ops import transform # type: ignore
from shapely.geometry import (
Point, LineString, MultiLineString, Polygon, MultiPolygon)
from gisaf.redis_tools import store as redis_store
from gisaf.models.geo_models_base import reproject_func
from gisaf.models.to_migrate import (
FeatureInfo, InfoItem, Attachment, InfoCategory
)
async def get_live_feature_info(store: str, id: str) -> FeatureInfo:
item = await redis_store.get_feature_info(store, id)
geom = item.geometry
## Reproject to projected coordinate system
geom_reprojected = transform(reproject_func, geom)
geoInfoItems: list[InfoItem] = []
if isinstance(geom, Point):
geoInfoItems.append(InfoItem(key='longitude',
value=f'{geom.x:.6f}'))
geoInfoItems.append(InfoItem(key='latitude',
value=f'{geom.y:.6f}'))
if geom.has_z:
geoInfoItems.append(InfoItem(key='elevation (m)',
value=f'{geom.z:.6f}'))
elif isinstance(geom, (LineString, MultiLineString)):
bounds = geom.bounds
geoInfoItems.append(InfoItem(key='longitude',
value=f'{bounds[0]:.6f} - {bounds[2]:.6f}'))
geoInfoItems.append(InfoItem(key='latitude',
value=f'{bounds[1]:.6f} - {bounds[3]:.6f}'))
geoInfoItems.append(InfoItem(key='length (m)',
value=f'{geom_reprojected.length:.2f}'))
## TODO: elevation for MultiLineString
if geom.has_z and not isinstance(geom, MultiLineString):
elevations = [cc[2] for cc in geom.coords]
elev_min = min(elevations)
elev_max = max(elevations)
if elev_min == elev_max:
geoInfoItems.append(InfoItem(key='elevation (m)',
value=f'{elev_min:.2f}'))
else:
geoInfoItems.append(InfoItem(key='elevation (m)',
value=f'{elev_min:.2f} - {elev_max:.2f}'))
elif isinstance(geom, (Polygon, MultiPolygon)):
area = geom_reprojected.area
bounds = geom.bounds
geoInfoItems.append(InfoItem(key='longitude',
value=f'{bounds[0]:.6f} - {bounds[2]:.6f}'))
geoInfoItems.append(InfoItem(key='latitude',
value=f'{bounds[1]:.6f} - {bounds[3]:.6f}'))
geoInfoItems.append(InfoItem(key='area (sq. m)',
value=f'{area:.1f} sq. m'))
geoInfoItems.append(InfoItem(key='area (ha)',
value=f'{area / 10000:.1f} ha'))
geoInfoItems.append(InfoItem(key='area (acre)',
value=f'{area / 4046.85643005078874:.1f} acres'))
## TODO: elevation for MultiPolygon
if geom.has_z and not isinstance(geom, MultiPolygon):
if hasattr(geom, 'exterior'):
coords = geom.exterior.coords
else:
coords = geom.coords
elevations = [coord[2] for coord in coords]
elev_min = min(elevations)
elev_max = max(elevations)
if elev_min == elev_max:
geoInfoItems.append(InfoItem(key='elevation (m)',
value=f'{elev_min:.2f}'))
else:
geoInfoItems.append(InfoItem(key='elevation (m)',
value=f'{elev_min:.2f} - {elev_max:.2f}'))
return FeatureInfo(
id=id,
itemName=item.get('popup', f'Live: {store} #{id}'),
geoInfoItems=geoInfoItems,
surveyInfoItems=[InfoItem(key='Note',
value='Live layers do not have survey info')],
infoItems=[
InfoItem(key=key, value=value)
for key, value in item.items()
if key not in ('geometry', 'popup')
]
)

View file

@ -39,6 +39,7 @@ from gisaf.models.models_base import Model
from gisaf.models.metadata import gisaf_survey, gisaf_admin, survey from gisaf.models.metadata import gisaf_survey, gisaf_admin, survey
from gisaf.models.misc import Qml from gisaf.models.misc import Qml
from gisaf.models.category import Category from gisaf.models.category import Category
from gisaf.models.to_migrate import InfoItem
# from gisaf.models.survey import Equipment, Surveyor, Accuracy # from gisaf.models.survey import Equipment, Surveyor, Accuracy
# from gisaf.models.project import Project # from gisaf.models.project import Project
@ -112,27 +113,35 @@ class BaseSurveyModel(BaseModel):
# info = await super(BaseSurveyModel, self).get_geo_info() # info = await super(BaseSurveyModel, self).get_geo_info()
# return info # return info
async def get_survey_info(self): async def get_survey_info(self) -> list[InfoItem]:
info = await super(BaseSurveyModel, self).get_survey_info() info = await super(BaseSurveyModel, self).get_survey_info()
if self.category: if self.category:
info['ISO layer name'] = self.iso_layer_name info.append(InfoItem(key='ISO layer name',
info['survey category'] = '{} ({})'.format(self.category.description, self.category.name) value=self.iso_layer_name))
info.append(InfoItem(key='survey category',
value=f'{self.category.description} ({self.category.name})'))
if self.project_id: if self.project_id:
info['project'] = self.project.name info.append(InfoItem(key='project',
value=self.project.name))
if self.srvyr_id: if self.srvyr_id:
info['surveyor'] = self.surveyor.name info.append(InfoItem(key='surveyor',
value=self.surveyor.name))
if self.equip_id: if self.equip_id:
info['survey equipment'] = self.equipment.name info.append(InfoItem(key='survey equipment',
value=self.equipment.name))
if self.accur_id: if self.accur_id:
info['survey accuracy'] = self.accuracy.name info.append(InfoItem(key='survey accuracy',
value=self.accuracy.name))
if self.date: if self.date:
info['survey date'] = self.date.strftime(LOCALE_DATE_FORMAT) info.append(InfoItem(key='survey date',
value=self.date.strftime(LOCALE_DATE_FORMAT)))
if self.orig_id: if self.orig_id:
info['original id'] = self.orig_id info.append(InfoItem(key='original id',
value=self.orig_id))
return info return info
@property @property
def iso_layer_name(self): def iso_layer_name(self) -> str:
""" """
The ISO layer name, built on the category and status The ISO layer name, built on the category and status
""" """
@ -358,17 +367,17 @@ class GeoModelNoStatus(Model):
def __str__(self): def __str__(self):
return self.caption return self.caption
async def get_geo_info(self): async def get_geo_info(self) -> list[InfoItem]:
""" """
Geographical info Geographical info
""" """
return {} return []
async def get_survey_info(self): async def get_survey_info(self) -> list[InfoItem]:
""" """
Quality info: project, source, accuracy... Quality info: project, source, accuracy...
""" """
return OrderedDict() return []
async def get_info(self) -> dict[str, str]: async def get_info(self) -> dict[str, str]:
""" """
@ -420,13 +429,18 @@ class GeoModelNoStatus(Model):
async def get_properties(cls, df): async def get_properties(cls, df):
return {} return {}
async def get_tags(self): async def get_tags(self) -> list[InfoItem]:
from gisaf.models.tags import Tags from gisaf.models.tags import Tags
async with db_session() as session: async with db_session() as session:
query = select(Tags.tags).where(Tags.store == self.get_store_name(), query = select(Tags.tags).where(Tags.store == self.get_store_name(),
Tags.ref_id == self.id) Tags.ref_id == self.id)
tags = await session.exec(query) data = await session.exec(query)
return tags.one_or_none() or {} tags = data.one_or_none()
if tags is not None:
return [InfoItem(key=key, value=value)
for key, value in tags.items()]
else:
return []
@cached_property @cached_property
def shapely_geom(self): def shapely_geom(self):
@ -453,7 +467,8 @@ class GeoModelNoStatus(Model):
""" """
return '' return ''
async def get_feature_as_dict(self, simplify_tolerance=None, reproject=False, css_class_prefix=''): async def get_feature_as_dict(self, simplify_tolerance=None,
reproject=False, css_class_prefix=''):
""" """
Get the parameters of this object (feature) Get the parameters of this object (feature)
:param css_class_prefix: for leaflet only :param css_class_prefix: for leaflet only
@ -694,7 +709,8 @@ class GeoModelNoStatus(Model):
@classmethod @classmethod
async def get_geo_df(cls, where=None, crs=None, reproject=False, async def get_geo_df(cls, where=None, crs=None, reproject=False,
filter_columns=False, with_popup=False, **kwargs) -> gpd.GeoDataFrame: filter_columns=False, with_popup=False,
**kwargs) -> gpd.GeoDataFrame:
""" """
Return a GeoPandas GeoDataFrame of all records Return a GeoPandas GeoDataFrame of all records
:param where: where clause for the query (eg. Model.attr=='foo') :param where: where clause for the query (eg. Model.attr=='foo')
@ -827,38 +843,42 @@ class GeoPointModelNoStatus(GeoModelNoStatus):
return writer return writer
async def get_geo_info(self): async def get_geo_info(self) -> list[InfoItem]:
info = OrderedDict() return [
if self.shapely_geom: InfoItem(key='longitude', value='{:.6f}.format(self.geom.x)'),
info['longitude'] = '{:.6f}'.format(self.shapely_geom.x) InfoItem(key='latitude', value='{:.6f}.format(self.geom.y)'),
info['latitude'] = '{:.6f}'.format(self.shapely_geom.y) ]
return info
class GeoPointModel(GeoPointModelNoStatus, GeoModel): class GeoPointModel(GeoPointModelNoStatus, GeoModel):
... ...
class GeoPointZModel(GeoPointModel): class GeoPointZModel(GeoPointModel):
geom: Annotated[str, WKBElement] = Field(sa_type=Geometry('POINTZ', dimension=3, srid=conf.geo.srid)) geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('POINTZ', dimension=3, srid=conf.geo.srid))
shapefile_model: ClassVar[int] = POINTZ shapefile_model: ClassVar[int] = POINTZ
def get_coords(self): def get_coords(self):
return (self.shapely_geom.x, self.shapely_geom.y, self.shapely_geom.z) return (self.shapely_geom.x, self.shapely_geom.y, self.shapely_geom.z)
async def get_geo_info(self): async def get_geo_info(self) -> list[InfoItem]:
info = await super(GeoPointZModel, self).get_geo_info() info = await super(GeoPointZModel, self).get_geo_info()
info['elevation (m)'] = '{:.2f}'.format(self.shapely_geom.z) info.append(
InfoItem(key='elevation (m)', value='{:.2f}'.format(self.shapely_geom.z))
)
return info return info
class GeoPointMModel(GeoPointZModel): class GeoPointMModel(GeoPointZModel):
shapefile_model: ClassVar[int] = POINTZ shapefile_model: ClassVar[int] = POINTZ
geom: Annotated[str, WKBElement] = Field(sa_type=Geometry('POINTZ', dimension=3, srid=conf.geo.srid)) geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('POINTZ', dimension=3, srid=conf.geo.srid))
class GeoLineModel(GeoModel): class GeoLineModel(GeoModel):
shapefile_model: ClassVar[int] = POLYLINE shapefile_model: ClassVar[int] = POLYLINE
geom: Annotated[str, WKBElement] = Field(sa_type=Geometry('LINESTRING', srid=conf.geo.srid)) geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('LINESTRING', srid=conf.geo.srid))
mapbox_type: ClassVar[str] = 'line' mapbox_type: ClassVar[str] = 'line'
base_gis_type: ClassVar[str] = 'Line' base_gis_type: ClassVar[str] = 'Line'
@ -910,34 +930,38 @@ class GeoLineModel(GeoModel):
points = wkb.loads(self.geom.data) points = wkb.loads(self.geom.data)
return zip(points.coords.xy[0], points.coords.xy[1]) return zip(points.coords.xy[0], points.coords.xy[1])
async def get_geo_info(self): async def get_geo_info(self) -> list[InfoItem]:
info = OrderedDict()
bounds = self.shapely_geom.bounds bounds = self.shapely_geom.bounds
info['longitude'] = '{:.6f} - {:.6f}'.format(bounds[0], bounds[2]) return [
info['latitude'] = '{:.6f} - {:.6f}'.format(bounds[1], bounds[3]) InfoItem(key='longitude', value='{:.6f} - {:.6f}'.format(bounds[0], bounds[2])),
info['length (m)'] = '{self.length:.2f}'.format(self=self) InfoItem(key='latitude', value='{:.6f} - {:.6f}'.format(bounds[1], bounds[3])),
return info InfoItem(key='length (m)', value='{self.length:.2f}'.format(self=self))
]
class GeoLineModelZ(GeoLineModel): class GeoLineModelZ(GeoLineModel):
shapefile_model: ClassVar[int] = POLYLINEZ shapefile_model: ClassVar[int] = POLYLINEZ
geom: Annotated[str, WKBElement] = Field(sa_type=Geometry('LINESTRINGZ', dimension=3, srid=conf.geo.srid)) geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('LINESTRINGZ', dimension=3, srid=conf.geo.srid))
async def get_geo_info(self): async def get_geo_info(self) -> list[InfoItem]:
info = await super(GeoLineModelZ, self).get_geo_info() info = await super(GeoLineModelZ, self).get_geo_info()
elevations = [cc[2] for cc in self.shapely_geom.coords] elevations = [cc[2] for cc in self.shapely_geom.coords]
elev_min = min(elevations) elev_min = min(elevations)
elev_max = max(elevations) elev_max = max(elevations)
if elev_min == elev_max: if elev_min == elev_max:
info['elevation (m)'] = '{:.2f}'.format(elev_min) info.append(InfoItem(key='elevation (m)',
value='{:.2f}'.format(elev_min)))
else: else:
info['elevation (m)'] = '{:.2f} - {:.2f}'.format(elev_min, elev_max) info.append(InfoItem(key='elevation (m)',
value='{:.2f} - {:.2f}'.format(elev_min, elev_max)))
return info return info
class GeoPolygonModel(GeoModel): class GeoPolygonModel(GeoModel):
shapefile_model: ClassVar[int] = POLYGON shapefile_model: ClassVar[int] = POLYGON
geom: Annotated[str, WKBElement] = Field(sa_type=Geometry('POLYGON', srid=conf.geo.srid)) geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('POLYGON', srid=conf.geo.srid))
mapbox_type: ClassVar[str] = 'fill' mapbox_type: ClassVar[str] = 'fill'
base_gis_type: ClassVar[str] = 'Polygon' base_gis_type: ClassVar[str] = 'Polygon'
@ -993,24 +1017,31 @@ class GeoPolygonModel(GeoModel):
points = wkb.loads(self.geom.data) points = wkb.loads(self.geom.data)
return zip(points.exterior.coords.xy[0], points.exterior.coords.xy[1]) return zip(points.exterior.coords.xy[0], points.exterior.coords.xy[1])
async def get_geo_info(self): async def get_geo_info(self) -> list[InfoItem]:
info = OrderedDict() info = []
area = self.area area = self.area
bounds = self.shapely_geom.bounds bounds = self.shapely_geom.bounds
info['longitude'] = '{:.6f} - {:.6f}'.format(bounds[0], bounds[2]) info.append(InfoItem(key='longitude',
info['latitude'] = '{:.6f} - {:.6f}'.format(bounds[1], bounds[3]) value='{:.6f} - {:.6f}'.format(bounds[0], bounds[2])))
info['length (m)'] = '{:.2f}'.format(self.length) info.append(InfoItem(key='latitude',
info['area (sq. m)'] = '{:.1f} sq. m'.format(area) value='{:.6f} - {:.6f}'.format(bounds[1], bounds[3])))
info['area (ha)'] = '{:.1f} ha'.format(area / 10000) info.append(InfoItem(key='length (m)',
info['area (acre)'] = '{:.1f} acres'.format(area / 4046.85643005078874) value='{:.2f}'.format(self.length)))
info.append(InfoItem(key='area (sq. m)',
value='{:.1f} sq. m'.format(area)))
info.append(InfoItem(key='area (ha)',
value='{:.1f} ha'.format(area / 10000)))
info.append(InfoItem(key='area (acre)',
value='{:.1f} acres'.format(area / 4046.85643005078874)))
return info return info
class GeoPolygonModelZ(GeoPolygonModel): class GeoPolygonModelZ(GeoPolygonModel):
shapefile_model: ClassVar[int] = POLYGONZ shapefile_model: ClassVar[int] = POLYGONZ
geom: Annotated[str, WKBElement] = Field(sa_type=Geometry('POLYGONZ', dimension=3, srid=conf.geo.srid)) geom: Annotated[str, WKBElement] = Field(
sa_type=Geometry('POLYGONZ', dimension=3, srid=conf.geo.srid))
async def get_geo_info(self): async def get_geo_info(self) -> list[InfoItem]:
info = await super(GeoPolygonModelZ, self).get_geo_info() info = await super(GeoPolygonModelZ, self).get_geo_info()
if hasattr(self.shapely_geom, 'exterior'): if hasattr(self.shapely_geom, 'exterior'):
coords = self.shapely_geom.exterior.coords coords = self.shapely_geom.exterior.coords
@ -1020,9 +1051,11 @@ class GeoPolygonModelZ(GeoPolygonModel):
elev_min = min(elevations) elev_min = min(elevations)
elev_max = max(elevations) elev_max = max(elevations)
if elev_min == elev_max: if elev_min == elev_max:
info['elevation (m)'] = '{:.2f}'.format(elev_min) info.append(InfoItem(key='elevation (m)',
value='{:.2f}'.format(elev_min)))
else: else:
info['elevation (m)'] = '{:.2f} - {:.2f}'.format(elev_min, elev_max) info.append(InfoItem(key='elevation (m)',
value='{:.2f} - {:.2f}'.format(elev_min, elev_max)))
return info return info
@ -1038,7 +1071,8 @@ class LineWorkSurveyModel(SurveyModel):
def match_raw_points(self): def match_raw_points(self):
reprojected_geom = transform(reproject_func, self.shapely_geom) reprojected_geom = transform(reproject_func, self.shapely_geom)
reprojected_geom_geoalchemy = from_shape(reprojected_geom, conf.raw_survey_srid) reprojected_geom_geoalchemy = from_shape(reprojected_geom, conf.raw_survey_srid)
raw_survey_points_project = self.raw_model.query.filter(self.raw_model.project_id==self.project_id) raw_survey_points_project = self.raw_model.query.filter(
self.raw_model.project_id==self.project_id)
query = raw_survey_points_project.filter( query = raw_survey_points_project.filter(
func.ST_Distance(reprojected_geom_geoalchemy, self.raw_model.geom) < conf.epsilon func.ST_Distance(reprojected_geom_geoalchemy, self.raw_model.geom) < conf.epsilon
) )

View file

@ -28,7 +28,7 @@ class DataProvider(BaseModel):
class InfoItem(BaseModel): class InfoItem(BaseModel):
key: str key: str
value: str value: str | float | int
class InfoCategory(BaseModel): class InfoCategory(BaseModel):

View file

@ -37,6 +37,7 @@ from gisaf.models.category import Category, CategoryGroup
from gisaf.database import db_session from gisaf.database import db_session
from gisaf import models from gisaf import models
from gisaf.models.metadata import gisaf_survey, raw_survey, survey from gisaf.models.metadata import gisaf_survey, raw_survey, survey
from gisaf.models.to_migrate import FeatureInfo, InfoCategory
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -334,12 +335,12 @@ class ModelRegistry:
# for category in categories # for category in categories
# if self.raw_survey_models.get(category.table_name)} # if self.raw_survey_models.get(category.table_name)}
async def get_model_id_params(self, model, id): async def get_model_id_params(self, model: SQLModel, id: int) -> FeatureInfo:
""" """
Return the parameters for this item (table name, id), displayed in info pane Return the parameters for this item (table name, id), displayed in info pane
""" """
if not model: if not model:
return {} return
async with db_session() as session: async with db_session() as session:
query = select(model).where(model.id == id).options( query = select(model).where(model.id == id).options(
*(joinedload(jt) for jt in model.selectinload())) *(joinedload(jt) for jt in model.selectinload()))
@ -351,25 +352,34 @@ class ModelRegistry:
# item = await model.load(**model.get_join_with()).query.where(model.id==id).gino.first() # item = await model.load(**model.get_join_with()).query.where(model.id==id).gino.first()
if not item: if not item:
return {} return
resp = {}
resp['itemName'] = item.caption files, images = [], []
resp['geoInfoItems'] = await item.get_geo_info() externalRecordUrl, graph, categorizedInfoItems = (None, ) * 3
resp['surveyInfoItems'] = await item.get_survey_info()
resp['infoItems'] = await item.get_info()
resp['tags'] = await item.get_tags()
if hasattr(item, 'get_categorized_info'): if hasattr(item, 'get_categorized_info'):
resp['categorized_info_items'] = await item.get_categorized_info() categorizedInfoItems = await item.get_categorized_info()
if hasattr(item, 'get_graph'): if hasattr(item, 'get_graph'):
resp['graph'] = item.get_graph() graph = item.get_graph()
if hasattr(item, 'Attachments'): if hasattr(item, 'Attachments'):
if hasattr(item.Attachments, 'files'): if hasattr(item.Attachments, 'files'):
resp['files'] = await item.Attachments.files(item) files = await item.Attachments.files(item)
if hasattr(item.Attachments, 'images'): if hasattr(item.Attachments, 'images'):
resp['images'] = await item.Attachments.images(item) images = await item.Attachments.images(item)
if hasattr(item, 'get_external_record_url'): if hasattr(item, 'get_external_record_url'):
resp['externalRecordUrl'] = item.get_external_record_url() externalRecordUrl = item.get_external_record_url()
return resp
return FeatureInfo(
id=str(item.id),
itemName=item.caption,
geoInfoItems=await item.get_geo_info(),
infoItems=await item.get_survey_info(),
tags=await item.get_tags(),
categorizedInfoItems=categorizedInfoItems,
graph=graph,
files=files,
images=images,
externalRecordUrl=externalRecordUrl,
)
async def make_stores(self): async def make_stores(self):
""" """