diff --git a/doc/TODO.md b/doc/TODO.md new file mode 100644 index 0000000..8c7f18d --- /dev/null +++ b/doc/TODO.md @@ -0,0 +1,5 @@ +# TODO for FastAPI/SQLModel migration + +## Special testing + +models.project.Project.auto_import \ No newline at end of file diff --git a/pdm.lock b/pdm.lock index 16c807c..d6b3d33 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev"] strategy = ["cross_platform"] lock_version = "4.4" -content_hash = "sha256:5f2270d2e84e1fc30449bbcb324864ff25807347a639eb8d6dd070c133fdbe13" +content_hash = "sha256:03b37375a71c7e841ead16f1b6813034b4bfde011ebeb2985958dcba75376c47" [[package]] name = "annotated-types" @@ -540,6 +540,34 @@ files = [ {file = "numpy-1.26.0.tar.gz", hash = "sha256:f93fc78fe8bf15afe2b8d6b6499f1c73953169fad1e9a8dd086cdff3190e7fdf"}, ] +[[package]] +name = "orjson" +version = "3.9.10" +requires_python = ">=3.8" +summary = "Fast, correct Python JSON library supporting dataclasses, datetimes, and numpy" +files = [ + {file = "orjson-3.9.10-cp311-cp311-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:cff7570d492bcf4b64cc862a6e2fb77edd5e5748ad715f487628f102815165e9"}, + {file = "orjson-3.9.10-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ed8bc367f725dfc5cabeed1ae079d00369900231fbb5a5280cf0736c30e2adf7"}, + {file = "orjson-3.9.10-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:c812312847867b6335cfb264772f2a7e85b3b502d3a6b0586aa35e1858528ab1"}, + {file = "orjson-3.9.10-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9edd2856611e5050004f4722922b7b1cd6268da34102667bd49d2a2b18bafb81"}, + {file = "orjson-3.9.10-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:674eb520f02422546c40401f4efaf8207b5e29e420c17051cddf6c02783ff5ca"}, + {file = "orjson-3.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1d0dc4310da8b5f6415949bd5ef937e60aeb0eb6b16f95041b5e43e6200821fb"}, + {file = "orjson-3.9.10-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:e99c625b8c95d7741fe057585176b1b8783d46ed4b8932cf98ee145c4facf499"}, + {file = "orjson-3.9.10-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:ec6f18f96b47299c11203edfbdc34e1b69085070d9a3d1f302810cc23ad36bf3"}, + {file = "orjson-3.9.10-cp311-none-win32.whl", hash = "sha256:ce0a29c28dfb8eccd0f16219360530bc3cfdf6bf70ca384dacd36e6c650ef8e8"}, + {file = "orjson-3.9.10-cp311-none-win_amd64.whl", hash = "sha256:cf80b550092cc480a0cbd0750e8189247ff45457e5a023305f7ef1bcec811616"}, + {file = "orjson-3.9.10-cp312-cp312-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:602a8001bdf60e1a7d544be29c82560a7b49319a0b31d62586548835bbe2c862"}, + {file = "orjson-3.9.10-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f295efcd47b6124b01255d1491f9e46f17ef40d3d7eabf7364099e463fb45f0f"}, + {file = "orjson-3.9.10-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:92af0d00091e744587221e79f68d617b432425a7e59328ca4c496f774a356071"}, + {file = "orjson-3.9.10-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c5a02360e73e7208a872bf65a7554c9f15df5fe063dc047f79738998b0506a14"}, + {file = "orjson-3.9.10-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:858379cbb08d84fe7583231077d9a36a1a20eb72f8c9076a45df8b083724ad1d"}, + {file = "orjson-3.9.10-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:666c6fdcaac1f13eb982b649e1c311c08d7097cbda24f32612dae43648d8db8d"}, + {file = "orjson-3.9.10-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:3fb205ab52a2e30354640780ce4587157a9563a68c9beaf52153e1cea9aa0921"}, + {file = "orjson-3.9.10-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:7ec960b1b942ee3c69323b8721df2a3ce28ff40e7ca47873ae35bfafeb4555ca"}, + {file = "orjson-3.9.10-cp312-none-win_amd64.whl", hash = "sha256:3e892621434392199efb54e69edfff9f699f6cc36dd9553c5bf796058b14b20d"}, + {file = "orjson-3.9.10.tar.gz", hash = "sha256:9ebbdbd6a046c304b1845e96fbcc5559cd296b4dfd3ad2509e33c4d9ce07d6a1"}, +] + [[package]] name = "packaging" version = "23.2" @@ -865,6 +893,16 @@ files = [ {file = "pyproj-3.6.1.tar.gz", hash = "sha256:44aa7c704c2b7d8fb3d483bbf75af6cb2350d30a63b144279a09b75fead501bf"}, ] +[[package]] +name = "pyshp" +version = "2.3.1" +requires_python = ">=2.7" +summary = "Pure Python read/write support for ESRI Shapefile format" +files = [ + {file = "pyshp-2.3.1-py2.py3-none-any.whl", hash = "sha256:67024c0ccdc352ba5db777c4e968483782dfa78f8e200672a90d2d30fd8b7b49"}, + {file = "pyshp-2.3.1.tar.gz", hash = "sha256:4caec82fd8dd096feba8217858068bacb2a3b5950f43c048c6dc32a3489d5af1"}, +] + [[package]] name = "python-dateutil" version = "2.8.2" @@ -1076,7 +1114,7 @@ files = [ name = "sqlmodel" version = "0" requires_python = ">=3.7,<4.0" -git = "https://github.com/honglei/sqlmodel.git" +git = "https://github.com/mbsantiago/sqlmodel.git" revision = "3005495a3ec6c8216b31cbd623f91c7bc8ba174f" summary = "SQLModel, SQL databases in Python, designed for simplicity, compatibility, and robustness." dependencies = [ diff --git a/pyproject.toml b/pyproject.toml index 3f59377..165c9ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,8 @@ dependencies = [ "pydantic-settings>=2.0.3", "itsdangerous>=2.1.2", "passlib[bcrypt]>=1.7.4", + "pyshp>=2.3.1", + "orjson>=3.9.10", ] requires-python = ">=3.11" readme = "README.md" @@ -33,7 +35,7 @@ build-backend = "pdm.backend" [project.optional-dependencies] dev = [ "ipdb>=0.13.13", - "sqlmodel @ git+https://github.com/honglei/sqlmodel.git#egg=sqlmodel", + "sqlmodel @ git+https://github.com/mbsantiago/sqlmodel.git#egg=sqlmodel", ] [tool.pdm.version] diff --git a/src/_version.py b/src/_version.py index a96fc95..e107946 100644 --- a/src/_version.py +++ b/src/_version.py @@ -1 +1 @@ -__version__ = '2023.3+d20231113' \ No newline at end of file +__version__ = '2023.4.dev1+g90091e8.d20231118' \ No newline at end of file diff --git a/src/api.py b/src/api.py index a172d91..1282c33 100644 --- a/src/api.py +++ b/src/api.py @@ -3,9 +3,11 @@ from datetime import timedelta from time import time from uuid import uuid1 from typing import Annotated +from contextlib import asynccontextmanager from fastapi import Depends, FastAPI, HTTPException, status, Request from fastapi.security import OAuth2PasswordRequestForm +from fastapi.responses import ORJSONResponse from starlette.middleware.sessions import SessionMiddleware from sqlmodel import select @@ -30,18 +32,21 @@ from .security import ( authenticate_user, get_current_user, create_access_token, ) from .config import conf +from .registry import make_registry logger = logging.getLogger(__name__) -api = FastAPI() -api.add_middleware(SessionMiddleware, secret_key=conf.crypto.secret) + +@asynccontextmanager +async def lifespan(app: FastAPI): + make_registry(app) + yield + +api = FastAPI(lifespan=lifespan) +#api.add_middleware(SessionMiddleware, secret_key=conf.crypto.secret) db_session = 
Annotated[AsyncSession, Depends(get_db_session)] -@api.get("/nothing") -async def get_nothing() -> str: - return '' - @api.get('/bootstrap') async def bootstrap( @@ -90,7 +95,6 @@ async def get_categories( data = await db_session.exec(query) return data.all() - @api.get("/categories_p") async def get_categories_p( db_session: db_session, diff --git a/src/config.py b/src/config.py index f486a9b..865c743 100644 --- a/src/config.py +++ b/src/config.py @@ -152,13 +152,16 @@ class OGCAPI(BaseSettings): metadata: OGCAPIMetadata server: OGCAPIServer +class TileServer(BaseSettings): + BaseDir: str + UseRequestUrl: bool = False + SpriteBaseDir: str + SpriteUrl: str + SpriteBaseUrl: str + openMapTilesKey: str | None = None + class Map(BaseSettings): - tilesBaseDir: str - tilesUseRequestUrl: bool - tilesSpriteBaseDir: str - tilesSpriteUrl: str - tilesSpriteBaseUrl: str - openMapTilesKey: str + tileServer: TileServer | None = None zoom: int pitch: int lat: float diff --git a/src/models/category.py b/src/models/category.py index 74e24fe..2d2d82b 100644 --- a/src/models/category.py +++ b/src/models/category.py @@ -1,5 +1,5 @@ from typing import Any -from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column +from sqlmodel import Field, SQLModel, JSON, TEXT, Column from pydantic import computed_field from .metadata import gisaf_survey diff --git a/src/models/geo_models_base.py b/src/models/geo_models_base.py new file mode 100644 index 0000000..014dacc --- /dev/null +++ b/src/models/geo_models_base.py @@ -0,0 +1,1116 @@ +from pathlib import Path +from typing import Any, ClassVar +from datetime import date, datetime +from collections import OrderedDict +from io import BytesIO +from zipfile import ZipFile +import locale +import logging +from json import dumps + +import numpy as np +import pandas as pd +import geopandas as gpd + +import shapely +import pyproj + +from sqlmodel import SQLModel, Field + +from geoalchemy2.shape import from_shape + +from sqlalchemy import BigInteger, Column, String, func, and_ +from sqlalchemy.sql import sqltypes +from psycopg2.extensions import adapt + +from geoalchemy2.types import Geometry +from geoalchemy2.elements import WKBElement + +from shapely import wkb +from shapely.geometry import mapping +from shapely.ops import transform + +from shapefile import (Writer as ShapeFileWriter, + POINT, POINTZ, + POLYLINE, POLYLINEZ, + POLYGON, POLYGONZ, + ) + +from ..config import conf +from .models_base import Model +from .survey import Equipment, Surveyor, Accuracy +from .misc import Qml +from .category import Category +from .project import Project +from ..utils import upsert_df + +LOCALE_DATE_FORMAT = locale.nl_langinfo(locale.D_FMT) + + +## Coordinates of WGS84 should be in the (y, x) order, but some components (shapely?) +## always work with (x, y). 
Force (y, x) order with always_xy=True +## https://github.com/pyproj4/pyproj/issues/504 +## https://proj.org/faq.html#why-is-the-axis-ordering-in-proj-not-consistent +reproject_func = pyproj.Transformer.from_crs( + conf.geo.srid, conf.geo.srid_for_proj, always_xy=True).transform + + +class NoPoint(Exception): + pass + +logger = logging.getLogger('model_base_base') + + +## Mapping of sql types to shapefile types, for pyshp +## Character, Numbers, Longs, Dates, or Memo +exportable_cols = { + sqltypes.Numeric: 'N', + sqltypes.Integer: 'N', + sqltypes.DateTime: 'D', + sqltypes.Date: 'D', + sqltypes.Time: 'D', + sqltypes.String: 'C', + sqltypes.Unicode: 'C', +} + + +class BaseSurveyModel(SQLModel): + """ + Base mixin class for all layers defined from a category: + - raw survey (RAW_V_*') + - projected ('V_*') + """ + id: int = Field(sa_column=Column(BigInteger()), primary_key=True) + equip_id: int = Field(foreign_key='equipment.id') + srvyr_id: int = Field('surveyor.id') + accur_id: int = Field('accuracy.id') + project_id: int = Field('project.id') + + orig_id: str + date: date + + @classmethod + def dyn_join_with(cls): + return { + 'equipment': Equipment, + 'surveyor': Surveyor, + 'accuracy': Accuracy, + 'project': Project, + } + + #async def get_info(self): + # info = await super(BaseSurveyModel, self).get_info() + # return info + + #async def get_geo_info(self): + # info = await super(BaseSurveyModel, self).get_geo_info() + # return info + + async def get_survey_info(self): + info = await super(BaseSurveyModel, self).get_survey_info() + if self.category: + info['ISO layer name'] = self.iso_layer_name + info['survey category'] = '{} ({})'.format(self.category.description, self.category.name) + if self.project_id: + info['project'] = self.project.name + if self.srvyr_id: + info['surveyor'] = self.surveyor.name + if self.equip_id: + info['survey equipment'] = self.equipment.name + if self.accur_id: + info['survey accuracy'] = self.accuracy.name + if self.date: + info['survey date'] = self.date.strftime(LOCALE_DATE_FORMAT) + if self.orig_id: + info['original id'] = self.orig_id + return info + + @property + def iso_layer_name(self): + """ + The ISO layer name, built on the category and status + """ + return '{category.domain}-{category.group:4s}-{category.minor_group_1:4s}-{category.minor_group_2:4s}-{status:1s}'\ + .format(category=self.category, status=self.status) + + +class SurveyModel(BaseSurveyModel): + """ + Base mixin class for defining final (reprojected) survey data, with a status + """ + status: str = Field(sa_column=Column(String(1))) + + get_gdf_with_related: bool = False + + filtered_columns_on_map: list[str] = [ + 'equip_id', + 'srvyr_id', + 'accur_id', + 'project_id', + 'orig_id', + 'date', + 'gisaf_survey_equipment_id', + 'gisaf_survey_equipment_name', + 'gisaf_survey_surveyor_id', + 'gisaf_survey_surveyor_name', + 'gisaf_survey_accuracy_id', + 'gisaf_survey_accuracy_name', + 'gisaf_survey_accuracy_accuracy', + 'gisaf_admin_project_id', + 'gisaf_admin_project_name', + 'gisaf_admin_project_contact_person', + 'gisaf_admin_project_site', + 'gisaf_admin_project_date_approved', + 'gisaf_admin_project_start_date_planned', + 'gisaf_admin_project_start_date_effective', + 'gisaf_admin_project_end_date_planned', + 'gisaf_admin_project_end_date_effective', + 'gisaf_admin_project_skip_columns', + ] + + async def get_survey_info(self): + info = await super(SurveyModel, self).get_survey_info() + if self.srvyr_id: + info['surveyor'] = self.surveyor.name + if self.equip_id: + info['survey 
equipment'] = self.equipment.name + if self.accur_id: + info['survey accuracy'] = self.accuracy.name + if self.date: + info['survey date'] = self.date.strftime(LOCALE_DATE_FORMAT) + info['status'] = self.status + info['original id'] = self.orig_id + return info + + async def get_feature_properties(self): + return { + 'status': self.status if self.status else 'E' + } + + @property + def caption(self): + return '{self.category.description} [{self.category.group}-{self.category.minor_group_1}] #{self.id:d}'.format(self=self) + + @classmethod + async def get_popup(cls, df): + return cls.category.description + \ + ' [' + cls.category.group + '-' + cls.category.minor_group_1 + \ + '] #' + df.index.astype('U') + + @classmethod + async def get_geojson(cls, simplify_tolerance=0): + + ## Fastest, but the id is in properties and needs the front end (eg Gisaf for mapbox) + ## to move it at the feature level + ## Requires PostGis 3.0+ + sql_query_for_geojson_fast = """ + SELECT json_build_object( + 'type', 'FeatureCollection', + 'features', json_agg(ST_AsGeoJSON(t.*)::json) + )::varchar + FROM ( + SELECT f.geom, + f.id::varchar, + {description} || ' [' || '{model.category.group}' || '-' || '{model.category.minor_group_1}' || '] #' || f.id as popup, + f.status + FROM "{schema}"."{table}" as f + WHERE f.geom is not null + ) AS t; + """ + + ## Slower version (3x), but compliant with RFC 7946 + ## where the id is at the feature level + sql_query_for_geojson_slow = """ + SELECT jsonb_build_object( + 'type', 'FeatureCollection', + 'features', jsonb_agg(features.feature) + )::varchar + FROM ( + SELECT jsonb_build_object( + 'type', 'Feature', + 'id', id, + 'geometry', ST_AsGeoJSON(geom)::jsonb, + 'properties', jsonb_build_object( + 'popup', + {description} || ' [' || '{model.category.group}' || '-' || '{model.category.minor_group_1}' || '] #' || inputs.id::varchar, + 'status', status + ) + ) AS feature + FROM "{schema}"."{table}" AS inputs + ) AS features + """ + + sql_query_for_geojson = sql_query_for_geojson_fast + + async with db.acquire(reuse=False) as conn: + query = sql_query_for_geojson.format( + model=cls, + schema=cls.__table_args__['schema'], + table=cls.__tablename__, + description=adapt(cls.category.description), + ) + result = await conn.scalar(query) + return result + + + def to_row(self): + """ + Get a list of attributes, typically used for exporting in CSV + :return: list of attributes + """ + return [ + self.id, + self.shapely_geom.x, + self.shapely_geom.y, + self.shapely_geom.z, + self.category_name, + self.surveyor, + self.equipment, + self.date.isoformat(), + self.accuracy.name, + self.status, + self.project.name, + self.orig_id + ] + + +class GeoModel(Model): + """ + Base class for all geo models + """ + #__abstract__ = True + description: str = '' + attribution: str | None = None + + can_get_features_as_df: bool = True + """ + can_get_features_as_df indicates that the model is ready to get GeoJson using GeoDataframe + If False, switch back to gino and dict based conversion using get_features_in_bulk_gino + and record.get_feature_as_dict (DEPRECATED) + """ + + cache_enabled: bool = True + """ + cache_enabled indicated that the model is OK with the caching mechanism of geojson stores. + The cache is time-stamped with DB triggers on modification, so it's safe unless the model + fetches other data than from its own table, eg. some status for styling. + See gisaf.redis_tools and geoapi.gj_feature for the implementation details of the cache. 
+ """ + + get_gdf_with_related: bool = False + """ + get_gdf_with_related indicates that get_df (thus, get_geo_df and the geoJson API for + the map online) gets related models (1-n relations, as defined with _join_with and dyn_join_with) + by default. + It can be overridden with the with_related parameter when calling get_df. + """ + + z_index: int = 450 + """ + z-index for the leaflet layer. + Should be between 400 and 500. + """ + + icon: str | None = None + """ + Icon for the model, used for normal web interface (ie. except the map) + """ + + symbol: str | None = None + """ + Icon for the model, used in the map (mapbox) + """ + + style: str = '' + """ + Style for the model, used in the map, etc + """ + + status: str = 'E' + """ + Status (ISO layers definition) of the layer. E -> Existing. + """ + + _join_with: dict[str, Any] = { + } + """ + Fields to join when getching items using get_features. + """ + + hidden: bool = False + """ + This model should be hidden from the menu + """ + + def __str__(self): + return self.caption + + async def get_geo_info(self): + """ + Geographical info + """ + return {} + + async def get_survey_info(self): + """ + Quality info: project, source, accuracy... + """ + return OrderedDict() + + async def get_info(self): + """ + Model specific info + """ + return {} + + @classmethod + def get_join_with(cls): + if hasattr(cls, 'dyn_join_with'): + return cls.dyn_join_with() + else: + return cls._join_with + + #@classmethod + #def get_style(cls): + # """ + # Style for the layer in Leaflet (css). + # :return: + # """ + # return { + # # 'color': "#4B1BDE", + # 'weight': '1' + # } + + @classmethod + def get_options(cls): + """ + Options for the layer in Leaflet. + For example: {'weight': 0, 'fillOpacity': 0.1} + :return: + """ + return { + } + + @property + def caption(self): + """ + Subclass me! + :return: str + """ + return 'Some geometry' + + @classmethod + async def get_popup(cls, df): + return cls.__name__ + ': ' + df.index.astype('U') + + @classmethod + async def get_properties(cls, df): + return {} + + async def get_tags(self): + from gisaf.models.tags import Tags + tags = await Tags.get_df( + where=and_( + Tags.store == self.__class__.get_store_name(), + Tags.ref_id == self.id + ), + with_only_columns=['tags'] + ) + if len(tags) > 0: + return tags.loc[0, 'tags'] + else: + return {} + + @property + def shapely_geom(self): + if not hasattr(self, '_shapely_geom'): + if isinstance(self.geom, WKBElement): + bytes = self.geom.data + if bytes: + self._shapely_geom = wkb.loads(bytes) + else: + self._shapely_geom = None + else: + self._shapely_geom = None + return self._shapely_geom + + def get_bgColor(self): + """ + Get the background color of the element on the map. + Subclass me! 
+ :return: str + """ + return '' + + async def get_feature_as_dict(self, simplify_tolerance=None, reproject=False, css_class_prefix=''): + """ + Get the parameters of this object (feature) + :param css_class_prefix: for leaflet only + :return: + """ + logger.warn(f'{self} use get_feature_as_dict: please DEPRECATE in favor of get_geo_df') + + if hasattr(self, 'get_feature_properties'): + properties = await self.get_feature_properties() + else: + properties = { + 'popup': self.caption + } + + if reproject: + shapely_geom = transform(reproject_func, self.shapely_geom) + else: + shapely_geom = self.shapely_geom + + if simplify_tolerance: + shapely_geom = shapely_geom.simplify(simplify_tolerance / conf.geo['simplify_geom_factor'], + preserve_topology=False) + if shapely_geom.is_empty: + raise NoPoint + + return { + "type": "Feature", + 'id': str(self.id), + "properties": properties, + 'geometry': mapping(shapely_geom) + } + + @classmethod + def get_columns_for_shapefile_writer(cls, writer): + """ + Return the columns and corresponding attribute name + :param writer: + :return: + """ + + ## Define the attributes + columns = cls.__table__.columns + cols = [col.name for col in columns if col.type.__class__ in exportable_cols] + for col in columns: + if col.name in cols: + writer.field(col.name, exportable_cols[col.type.__class__]) + + return cols + + return OrderedDict([(col, cols_to_attr[col]) for col in cols]) + + @classmethod + async def get_shapefile_writer(cls): + """ + Get a shapefile writer with all records loaded. + Subclasses should implement this. + :return: + """ + raise NotImplementedError + + @classmethod + async def get_shapefile_files(cls): + """ + Get an actual shapefile with all records. + :return: zip file + """ + writer = await cls.get_shapefile_writer() + layer_name = cls.__name__ + + ## Save the dbf, shp, shx data + dbf = BytesIO() + writer.saveDbf(dbf) + shp = BytesIO() + writer.saveShp(shp) + shx = BytesIO() + writer.saveShx(shx) + + qml = None + qml_rec = await Qml.get(layer_name) + if qml_rec and qml_rec.qml: + qml = qml_rec.qml + + srid = cls.geom.expression.type.srid + + ## Write the .prj file: projection system reference + ## Try if there's a file + prj_path = (conf.prj_files_dir / str(srid)).with_suffix('.prj') + proj_str = None + if prj_path.is_file(): + with open(prj_path) as proj_file: + proj_str = proj_file.read() + else: + ## Get from the spatial_ref_sys table in the database + async with db.acquire(reuse=False) as connection: + async with connection.transaction() as tx: + proj_rec = await connection.first(f'select srtext from spatial_ref_sys where srid={srid}') + if proj_rec and proj_rec[0]: + proj_str = proj_rec[0] + + return dbf, shp, shx, qml, proj_str + + @classmethod + async def get_shapefile(cls): + layer_name = cls.__name__ + dbf, shp, shx, qml, proj_str = await cls.get_shapefile_files() + + ## Zip and return data + zip_file = BytesIO() + with ZipFile(zip_file, 'w') as zip: + zip.writestr('{}.dbf'.format(layer_name), dbf.getvalue()) + zip.writestr('{}.shp'.format(layer_name), shp.getvalue()) + zip.writestr('{}.shx'.format(layer_name), shx.getvalue()) + if qml: + zip.writestr('{}.qml'.format(layer_name), qml) + if proj_str: + zip.writestr('{}.prj'.format(layer_name), proj_str) + + zip_file.seek(0) + return zip_file + + @classmethod + async def get_mapbox_style(cls): + """ + Get the mapbox style (paint, layout, attribution...) 
+ """ + ## If the model is from survey, it should have a category, which has a style + ## Get from database + style = {} + if hasattr(cls, 'category'): + category = await Category.get_item_by_pk(pk=cls.category.name) + if category: + if category['mapbox_paint'] is not None: + style['paint'] = category['mapbox_paint'] + if category['mapbox_layout'] is not None: + style['layout'] = category['mapbox_layout'] + + else: + category = None + + qml = await Qml.get_item_by_pk(pk=cls.__name__) + if qml: + if qml['mapbox_paint']: + style['paint'] = qml['mapbox_paint'] + if qml['mapbox_layout']: + style['layout'] = qml['mapbox_layout'] + + if cls.attribution is not None: + style['attribution'] = cls.attribution + return style + + + @classmethod + async def get_features_attrs(cls, simplify_tolerance): + """ + Get the attributes and kwargs to pass to get_features for this model + :param simplify_tolerance: float + :return: tuple(dict, dict) + """ + attrs = {"type": "FeatureCollection"} + + logger.warn(f'{cls} use get_features_attrs: please DEPRECATE in favor of get_geo_df') + + ## Get style, with priority order + ## If the model is from survey ("auto_model"), it should have a category, which has a style + category = getattr(cls, 'category', None) + + features_kwargs = {} + + if not issubclass(cls, GeoPointModel): + if hasattr(cls, 'simplify') and cls.simplify: + features_kwargs['simplify_tolerance'] = float(cls.simplify) + else: + features_kwargs['simplify_tolerance'] = simplify_tolerance + features_kwargs['reproject'] = conf.geojson_srid != conf.srid + + ## If the model is from survey, it should have a category, which has a style + ## Get from database + ## TODO: update categories dynamically in the registry + if hasattr(cls, 'category'): + category = await Category.get(cls.category.name) + else: + category = None + + features_kwargs['reproject'] = conf.geojson_srid != conf.srid + + ## Get options, if the model has defined the get_options function + if hasattr(cls, 'get_options'): + attrs["options"] = cls.get_options() + + if hasattr(cls, 'get_popup_dynamic'): + attrs["has_popups"] = True + + return attrs, features_kwargs + + @classmethod + async def get_features_in_bulk_gino(cls, **kwargs): + """ + Get the features, using joins defined by _join_with, or, in priority, get_join_with(). 
+ See Gino's doc: relationship.html (specially loaders) + :param kwargs: + :return: + """ + features = [] + async with db.acquire(reuse=True) as connection: + async with connection.transaction() as tx: + join = cls.get_join_with() + query = cls.load(**join) + #async for record in connection.iterate(query): + for record in await connection.all(query): + if record.geom is not None: + try: + feature = await record.get_feature_as_dict(**kwargs) + except NoPoint: + pass + except Exception as err: + logger.exception(err) + else: + features.append(feature) + + return features + + @classmethod + async def get_geos_df(cls, where=None, crs=None, reproject=False, + filter_columns=False, with_popup=False, + drop_wkb=True, **kwargs): + if not gpd.options.use_pygeos: + logger.warn(f'Using get_geos_df for {cls} but gpd.options.use_pygeos not set') + if not crs: + crs = conf.crs['geojson'] + df = await cls.get_df(where=where, **kwargs) + df.set_index('id', inplace=True) + df.rename(columns={'geom': 'wkb'}, inplace=True) + df['geometry'] = shapely.from_wkb(df['wkb']) + if drop_wkb: + df = df.drop(columns=['wkb']) + return gpd.GeoDataFrame(df, geometry='geometry', crs=crs) + + + @classmethod + async def get_geo_df(cls, where=None, crs=None, reproject=False, + filter_columns=False, with_popup=False, **kwargs): + """ + Return a Pandas dataframe of all records + :param where: where clause for the query (eg. Model.attr=='foo') + :param crs: coordinate system (eg. 'epsg:4326') (priority over the reproject parameter) + :param reproject: should reproject to conf.srid_for_proj + :return: + """ + df = await cls.get_df(where=where, **kwargs) + df.dropna(subset=['geom'], inplace=True) + df.set_index('id', inplace=True) + df.sort_index(inplace=True) + df_clean = df[df.geom != None] + ## Drop coordinates + df_clean.drop(columns=set(df_clean.columns).intersection(['ST_X_1', 'ST_Y_1', 'ST_Z_1']), + inplace=True) + if not crs: + crs = conf.crs['geojson'] + + ## XXX: is it right? Waiting for application.py to remove pygeos support to know more. 
+ if getattr(gpd.options, 'use_pygeos', False): + geometry = shapely.from_wkb(df_clean.geom) + else: + geometry = [wkb.loads(geom) for geom in df_clean.geom] + + gdf = gpd.GeoDataFrame( + df_clean.drop('geom', axis=1), + crs=crs, + geometry=geometry + ) + + if hasattr(cls, 'simplify') and cls.simplify: + #shapely_geom = shapely_geom.simplify(simplify_tolerance / conf.geo['simplify_geom_factor'], + #preserve_topology=False) + gdf['geometry'] = gdf['geometry'].simplify( + float(cls.simplify) / conf.geo['simplify_geom_factor'], + preserve_topology=False) + + if reproject: + gdf.to_crs(crs=conf.crs['for_proj'], inplace=True) + + ## Filter out columns + if filter_columns: + gdf.drop(columns=set(gdf.columns).intersection(cls.filtered_columns_on_map), + inplace=True) + + if with_popup: + gdf['popup'] = await cls.get_popup(gdf) + + return gdf + + @classmethod + def get_attachment_dir(cls): + return f'{cls.__table__.schema}.{cls.__table__.name}' + + @classmethod + def get_attachment_base_dir(cls): + return Path(conf.attachments['base_dir'])/cls.get_attachment_dir() + +class Geom(str): + pass + +class GeoPointModel(GeoModel): + #__abstract__ = True + shapefile_model: ClassVar[int] = POINT + geom: Any = Field(sa_column=Column(Geometry('POINT', srid=conf.geo.srid))) + icon: str | None = None + mapbox_type: str = 'symbol' + base_gis_type: str = 'Point' + symbol: str = '\ue32b' + + @property + def latitude(self): + return self.shapely_geom.y + + @property + def longitude(self): + return self.shapely_geom.x + + @property + def elevation(self): + return self.shapely_geom.z + + @property + def caption(self): + """ + Return user friendly name (used in menu, etc) + :return: + """ + return '{self.__class__.__name__}: {self.id:d}'.format(self=self) + + def get_coords(self): + return (self.shapely_geom.x, self.shapely_geom.y) + + @classmethod + async def get_shapefile_writer(cls): + writer = ShapeFileWriter(cls.shapefile_model) + cols = cls.get_columns_for_shapefile_writer(writer) + query = cls.query.where(cls.geom != None) + + async with db.acquire(reuse=False) as connection: + async with connection.transaction() as tx: + ## Add the objects + async for obj in connection.iterate(query): + writer.point(*obj.get_coords(), shapeType=cls.shapefile_model) + attrs = [] + for attr_name in cols: + attr = getattr(obj, attr_name) + if isinstance(attr, date): + ## Date in a format approriate for shapefile + attrs.append('{:%Y%m%d}'.format(attr)) + else: + attrs.append(attr) + writer.record(*attrs) + + return writer + + async def get_geo_info(self): + info = OrderedDict() + if self.shapely_geom: + info['longitude'] = '{:.6f}'.format(self.shapely_geom.x) + info['latitude'] = '{:.6f}'.format(self.shapely_geom.y) + return info + + +class GeoPointZModel(GeoPointModel): + #__abstract__ = True + geom: Any = Field(sa_column=Column(Geometry('POINTZ', dimension=3, srid=conf.geo.srid))) + shapefile_model: int = POINTZ + + def get_coords(self): + return (self.shapely_geom.x, self.shapely_geom.y, self.shapely_geom.z) + + async def get_geo_info(self): + info = await super(GeoPointZModel, self).get_geo_info() + info['elevation (m)'] = '{:.2f}'.format(self.shapely_geom.z) + return info + + +class GeoPointMModel(GeoPointZModel): + #__abstract__ = True + shapefile_model: int = POINTZ + geom: Any = Field(sa_column=Column(Geometry('POINTZ', dimension=3, srid=conf.geo.srid))) + + +class GeoLineModel(GeoModel): + #__abstract__ = True + shapefile_model: int = POLYLINE + geom: Any = Field(sa_column=Column(Geometry('LINESTRING', 
srid=conf.geo.srid))) + mapbox_type: str = 'line' + base_gis_type: str = 'Line' + + @property + def length(self): + return transform(reproject_func, self.shapely_geom).length + + @property + def caption(self): + """ + Return user friendly name (used in menu, etc) + :return: + """ + return '{self.__class__.__name__}: {self.id:d}'.format(self=self) + + @classmethod + async def get_shapefile_writer(cls): + writer = ShapeFileWriter(cls.shapefile_model) + cols = cls.get_columns_for_shapefile_writer(writer) + query = cls.query.where(cls.geom != None) + + async with db.acquire(reuse=False) as connection: + async with connection.transaction() as tx: + ## Add the objects + async for obj in connection.iterate(query): + try: + writer.line(parts=[[point for point in obj.get_points()]]) + except NoPoint: + pass + attrs = [] + for attr_name in cols: + attr = getattr(obj, attr_name) + if isinstance(attr, date): + ## Date in a format approriate for shapefile + attrs.append('{:%Y%m%d}'.format(attr)) + else: + attrs.append(attr) + writer.record(*attrs) + + return writer + + def get_points(self): + """ + Get a list of points + :return: + """ + if isinstance(self.geom, None.__class__): + raise NoPoint + points = wkb.loads(self.geom.data) + return zip(points.coords.xy[0], points.coords.xy[1]) + + async def get_geo_info(self): + info = OrderedDict() + bounds = self.shapely_geom.bounds + info['longitude'] = '{:.6f} - {:.6f}'.format(bounds[0], bounds[2]) + info['latitude'] = '{:.6f} - {:.6f}'.format(bounds[1], bounds[3]) + info['length (m)'] = '{self.length:.2f}'.format(self=self) + return info + + +class GeoLineModelZ(GeoLineModel): + #__abstract__ = True + shapefile_model: int = POLYLINEZ + geom: Any = Field(sa_column=Column(Geometry('LINESTRINGZ', dimension=3, srid=conf.geo.srid))) + + async def get_geo_info(self): + info = await super(GeoLineModelZ, self).get_geo_info() + elevations = [cc[2] for cc in self.shapely_geom.coords] + elev_min = min(elevations) + elev_max = max(elevations) + if elev_min == elev_max: + info['elevation (m)'] = '{:.2f}'.format(elev_min) + else: + info['elevation (m)'] = '{:.2f} - {:.2f}'.format(elev_min, elev_max) + return info + + +class GeoPolygonModel(GeoModel): + #__abstract__ = True + shapefile_model: int = POLYGON + geom: Any = Field(sa_column=Column(Geometry('POLYGON', srid=conf.geo.srid))) + mapbox_type: str = 'fill' + base_gis_type: str = 'Polygon' + + @property + def area(self): + return transform(reproject_func, self.shapely_geom).area + + @property + def length(self): + return transform(reproject_func, self.shapely_geom).length + + @property + def caption(self): + """ + Return user friendly name (used in menu, etc) + :return: + """ + return '{self.__class__.__name__}: {self.id:d}'.format(self=self) + + @classmethod + async def get_shapefile_writer(cls): + writer = ShapeFileWriter(cls.shapefile_model) + cols = cls.get_columns_for_shapefile_writer(writer) + query = cls.query.where(cls.geom != None) + + async with db.acquire(reuse=False) as connection: + async with connection.transaction() as tx: + ## Add the objects + async for obj in connection.iterate(query): + try: + writer.poly(parts=[[point for point in obj.get_points()]]) + except NoPoint: + pass + attrs = [] + for attr_name in cols: + attr = getattr(obj, attr_name) + if isinstance(attr, date): + ## Date in a format approriate for shapefile + attrs.append('{:%Y%m%d}'.format(attr)) + else: + attrs.append(attr) + writer.record(*attrs) + + return writer + + def get_points(self): + """ + Get a list of points + :return: + 
""" + if isinstance(self.geom, None.__class__): + raise NoPoint + points = wkb.loads(self.geom.data) + return zip(points.exterior.coords.xy[0], points.exterior.coords.xy[1]) + + async def get_geo_info(self): + info = OrderedDict() + area = self.area + boundam@ + self.shapely_geom.bounds + info['longitude'] = '{:.6f} - {:.6f}'.format(bounds[0], bounds[2]) + info['latitude'] = '{:.6f} - {:.6f}'.format(bounds[1], bounds[3]) + info['length (m)'] = '{:.2f}'.format(self.length) + info['area (sq. m)'] = '{:.1f} sq. m'.format(area) + info['area (ha)'] = '{:.1f} ha'.format(area / 10000) + info['area (acre)'] = '{:.1f} acres'.format(area / 4046.85643005078874) + return info + + +class GeoPolygonModelZ(GeoPolygonModel): + #__abstract__ = True + shapefile_model: int = POLYGONZ + geom: Any = Field(sa_column=Column(Geometry('POLYGONZ', dimension=3, srid=conf.geo.srid))) + + async def get_geo_info(self): + info = await super(GeoPolygonModelZ, self).get_geo_info() + if hasattr(self.shapely_geom, 'exterior'): + coords = self.shapely_geom.exterior.coords + else: + coords = self.shapely_geom.coords + elevations = [coord[2] for coord in coords] + elev_min = min(elevations) + elev_max = max(elevations) + if elev_min == elev_max: + info['elevation (m)'] = '{:.2f}'.format(elev_min) + else: + info['elevation (m)'] = '{:.2f} - {:.2f}'.format(elev_min, elev_max) + return info + + +class GeoPointSurveyModel(SurveyModel, GeoPointMModel): + #__abstract__ = True + + ## raw_model is set in category_models_maker.make_category_models + raw_model: Any = None + + +class LineWorkSurveyModel(SurveyModel): + __abstract__ = True + + ## raw_model is set in category_models_maker.make_category_models + raw_model: Any = None + + def match_raw_points(self): + reprojected_geom = transform(reproject_func, self.shapely_geom) + reprojected_geom_geoalchemy = from_shape(reprojected_geom, conf.raw_survey_srid) + raw_survey_points_project = self.raw_model.query.filter(self.raw_model.project_id==self.project_id) + query = raw_survey_points_project.filter( + func.ST_Distance(reprojected_geom_geoalchemy, self.raw_model.geom) < conf.epsilon + ) + return query.all() + + +class GeoLineSurveyModel(LineWorkSurveyModel, GeoLineModelZ): + __abstract__ = True + + +class GeoPolygonSurveyModel(LineWorkSurveyModel, GeoPolygonModelZ): + __abstract__ = True + + +class RawSurveyBaseModel(BaseSurveyModel, GeoPointMModel): + """ + Abstract base class for category based raw survey point models + """ + __abstract__ = True + geom: Any = Field(sa_column=Column(Geometry('POINTZ', dimension=3, srid=conf.geo.raw_survey.srid))) + status: str = Field(sa_column=Column(String(1))) + + ## store_name is set in category_models_maker.make_category_models + store_name: str | None = None + + @classmethod + async def get_geo_df(cls, *args, **kwargs): + return await super().get_geo_df(crs=conf.raw_survey['spatial_sys_ref'], *args, **kwargs) + + +class PlottableModel(Model): + """ + Model that defines a value for each record, therefore that can be used + in plots (eg. bar graphs). 
+ Subclasses might want to define: + * a unit (default: m) + * a resampling method, to be applied to all values (unless values is a dict, see below) + * attributes with numerical types, and: + * a tuple called values that defines what are these columns + to be used (the first one being the default) + * OR an ordereed dict of value => resampling method + """ + __abstract__ = True + + float_format: str = '%.1f' + values: dict[Any, Any] = {} + + @classmethod + async def get_as_dataframe(cls, model_id=None, where=None, **kwargs): + """ + Get a dataframe for the data. + It's quite generic, so subclasses might want to subclass this. + """ + if where is None: + if model_id is None: + where_ = None + else: + where_ = cls.ref_id == model_id + else: + if model_id is None: + where_ = where + else: + where_ = and_(cls.ref_id == model_id, where) + + if where_ is not None: + df = await cls.get_df(where=where_, **kwargs) + else: + df = await cls.get_df(**kwargs) + + return df + + +class TimePlottableModel(PlottableModel): + __abstract__ = True + + time: datetime + + @classmethod + async def get_as_dataframe(cls, model_id=None, with_only_columns=None, **kwargs): + """ + Get the data as a time-indexed dataframe + """ + if with_only_columns == None: + with_only_columns = [val['name'] for val in cls.values] + if 'time' not in with_only_columns: + with_only_columns.insert(0, 'time') + + df = await super().get_as_dataframe(model_id=model_id, + with_only_columns=with_only_columns, **kwargs) + + ## Set time as index + df.set_index('time', drop=True, inplace=True) + df.sort_index(inplace=True) + return df diff --git a/src/models/map_bases.py b/src/models/map_bases.py new file mode 100644 index 0000000..8f20d14 --- /dev/null +++ b/src/models/map_bases.py @@ -0,0 +1,66 @@ +from typing import Any + +from sqlmodel import Field, String, JSON, Column + +from .models_base import Model +from .metadata import gisaf_map + + +class BaseStyle(Model): + metadata = gisaf_map + __tablename__ = 'map_base_style' + + class Admin: + menu = 'Other' + flask_admin_model_view = 'MapBaseStyleModelView' + + id: int = Field(primary_key=True) + name: str + style: dict[str, Any] | None = Field(sa_column=Column(JSON(none_as_null=True))) + mbtiles: str = Field(sa_column=Column(String(50))) + static_tiles_url: str + enabled: bool = True + + def __repr__(self): + return ''.format(self=self) + + +class BaseMap(Model): + metadata = gisaf_map + __tablename__ = 'base_map' + + class Admin: + menu = 'Other' + + id: int = Field(primary_key=True) + name: str + + def __repr__(self): + return ''.format(self=self) + + def __str__(self): + return self.name + + +class BaseMapLayer(Model): + metadata = gisaf_map + __tablename__ = 'base_map_layer' + + class Admin: + menu = 'Other' + + id: int = Field(primary_key=True) + base_map_id: int = Field(foreign_key='base_map.id', index=True) + store: str = Field(sa_column=Column(String(100))) + + @classmethod + def dyn_join_with(cls): + return { + 'base_map': BaseMap, + } + + def __repr__(self): + return f"" + + def __str__(self): + return f"{self.store or '':s}" \ No newline at end of file diff --git a/src/models/metadata.py b/src/models/metadata.py index 38474ab..7829c11 100644 --- a/src/models/metadata.py +++ b/src/models/metadata.py @@ -2,4 +2,5 @@ from sqlmodel import MetaData gisaf = MetaData(schema='gisaf') gisaf_survey = MetaData(schema='gisaf_survey') -gisaf_admin= MetaData(schema='gisaf_admin') \ No newline at end of file +gisaf_admin= MetaData(schema='gisaf_admin') +gisaf_map= MetaData(schema='gisaf_map') \ 
No newline at end of file diff --git a/src/models/misc.py b/src/models/misc.py new file mode 100644 index 0000000..1c8c6f9 --- /dev/null +++ b/src/models/misc.py @@ -0,0 +1,35 @@ +import logging +from typing import Any + +from sqlmodel import Field, JSON, Column + +from .models_base import Model +from .metadata import gisaf_map + +logger = logging.getLogger(__name__) + + +class NotADataframeError(Exception): + pass + + +class Qml(Model): + """ + Model for storing qml (QGis style) + """ + metadata = gisaf_map + + class Admin: + menu = 'Other' + flask_admin_model_view = 'QmlModelView' + + model_name: str = Field(default=None, primary_key=True) + qml: str + attr: str + style: str + mapbox_paint: dict[str, Any] | None = Field(sa_column=Column(JSON(none_as_null=True))) + mapbox_layout: dict[str, Any] | None = Field(sa_column=Column(JSON(none_as_null=True))) + + def __repr__(self): + return ''.format(self=self) + diff --git a/src/models/models_base.py b/src/models/models_base.py new file mode 100644 index 0000000..17dcbf2 --- /dev/null +++ b/src/models/models_base.py @@ -0,0 +1,118 @@ +from typing import Any +import logging + +from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column +from pydantic import computed_field +import numpy as np +import pandas as pd +import geopandas as gpd +import shapely +from sqlalchemy.sql import sqltypes +from geoalchemy2.types import Geometry + +pandas_cast_map = { + sqltypes.Integer: 'Int64', + sqltypes.Float: 'float64', +} + +logger = logging.getLogger('model_base_base') + +class Model(SQLModel): + """ + Base mixin class for models that can be converted to a Pandas dataframe with get_df + """ + + class Meta: + filtered_columns_on_map: list[str] = [] + + @classmethod + def get_store_name(cls): + return "{}.{}".format(cls.__table_args__['schema'], cls.__tablename__) + + @classmethod + def get_table_name_prefix(cls): + return "{}_{}".format(cls.__table_args__['schema'], cls.__tablename__) + + @classmethod + async def get_df(cls, where=None, + with_related=None, recursive=True, + cast=True, + with_only_columns=None, + geom_as_ewkt=False, + **kwargs): + """ + Return a Pandas dataframe of all records + Optional arguments: + * an SQLAlchemy where clause + * with_related: automatically get data from related columns, following the foreign keys in the model definitions + * cast: automatically transform various data in their best python types (eg. with date, time...) + * with_only_columns: fetch only these columns (list of column names) + * geom_as_ewkt: convert geometry columns to EWKB (handy for eg. using upsert_df) + :return: + """ + query = cls.query + + if with_related is not False: + if with_related or getattr(cls, 'get_gdf_with_related', False): + joins = get_join_with(cls, recursive) + model_loader = cls.load(**joins) + query = _get_query_with_table_names(model_loader) + + if where is not None: + query.append_whereclause(where) + + if with_only_columns: + query = query.with_only_columns([getattr(cls, colname) for colname in with_only_columns]) + + ## Got idea from https://github.com/MagicStack/asyncpg/issues/173. 
+ async with query.bind.raw_pool.acquire() as conn: + ## Convert hstore fields to dict + await conn.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore') + + compiled = query.compile() + stmt = await conn.prepare(compiled.string) + columns = [a.name for a in stmt.get_attributes()] + data = await stmt.fetch(*[compiled.params.get(param) for param in compiled.positiontup]) + df = pd.DataFrame(data, columns=columns) + + ## Convert primary key columns to Int64: + ## allows NaN, fixing type convertion to float with merge + for pk in [c.name for c in cls.__table__.primary_key.columns]: + if pk in df.columns and df[pk].dtype=='int64': + df[pk] = df[pk].astype('Int64') + + if cast: + ## Cast the type for known types (datetime, ...) + for column_name in df.columns: + col = getattr(query.columns, column_name, None) + if col is None: + logger.debug(f'Cannot get column {column_name} in query for model {cls.__name__}') + continue + column_type = getattr(query.columns, column_name).type + ## XXX: Needs refinment, eg. nullable -> Int64 ... + if column_type.__class__ in pandas_cast_map: + df[column_name] = df[column_name].astype(pandas_cast_map[column_type.__class__]) + elif isinstance(column_type, (sqltypes.Date, sqltypes.DateTime)): + ## Dates, times + df[column_name] = pd.to_datetime(df[column_name]) + #elif isinstance(column_type, (sqltypes.Integer, sqltypes.Float)): + # ## Numeric + # df[column_name] = pd.to_numeric(df[column_name], errors='coerce') + ## XXX: keeping this note about that is about "char" SQL type, but the fix of #9694 makes it unnessary + #elif isinstance(column_type, sqltypes.CHAR) or (isinstance(column_type, sqltypes.String) and column_type.length == 1): + # ## Workaround for bytes being used for string of length 1 (not sure - why???) 
+ # df[column_name] = df[column_name].str.decode('utf-8') + + ## Rename the columns, removing the schema_table prefix for the columns in that model + prefix = cls.get_table_name_prefix() + prefix_length = len(prefix) + 1 + rename_map = {colname: colname[prefix_length:] for colname in df.columns if colname.startswith(prefix)} + df.rename(columns=rename_map, inplace=True) + + ## Eventually convert geometry columns to EWKB + if geom_as_ewkt: + geometry_columns = [col.name for col in cls.__table__.columns if isinstance(col.type, Geometry)] + for column in geometry_columns: + df[column] = shapely.to_wkb(shapely.from_wkb(df.geom), hex=True, include_srid=True) + + return df diff --git a/src/models/project.py b/src/models/project.py new file mode 100644 index 0000000..6e46e9d --- /dev/null +++ b/src/models/project.py @@ -0,0 +1,225 @@ +from datetime import datetime +from csv import writer +from collections import defaultdict +from io import BytesIO, StringIO + +from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column +import pyproj +from shapely.geometry import Point + +from ..config import conf +from .models_base import Model +from .metadata import gisaf_admin + +class Project(Model): + metadata = gisaf_admin + + class Admin: + menu = 'Other' + flask_admin_model_view = 'ProjectModelView' + + id: int = Field(default=None, primary_key=True) + name: str + contact_person: str + site: str + date_approved: datetime + start_date_planned: datetime + start_date_effective: datetime + end_date_planned: datetime + end_date_effective: datetime + + def __str__(self): + return '{self.name:s}'.format(self=self) + + def __repr__(self): + return ''.format(self=self) + + async def auto_import(self, registry): + """ + Import the points of the given project to the GIS DB + in their appropriate models.raw_survey_models + :return: dict of result (stats) + """ + from .category import Category + from .models_base import GeoPointSurveyModel + + result = defaultdict(int) + + categories = {cat.table_name: cat for cat in await Category.query.gino.all()} + + ## Define projections + survey_proj = pyproj.Proj(**conf.raw_survey['spatial_sys_ref']) + target_proj = pyproj.Proj(f'epsg:{conf.srid:d}') + def reproject(x, y, z): + return pyproj.transform(survey_proj, target_proj, x, y, z) + + ## TODO: Gino session + for survey_model_name, raw_survey_model in registry.raw_survey_models.items(): + category = categories[survey_model_name] + if not category.auto_import: + continue + survey_model = registry.geom_auto.get(survey_model_name) + if not survey_model: + continue + if not issubclass(survey_model, GeoPointSurveyModel): + continue + raw_survey_items = await raw_survey_model.query.where(raw_survey_model.project_id == self.id).gino.all() + for item in raw_survey_items: + if not item: + continue + new_point = survey_model( + id=item.id, + date=item.date, + accur_id=item.accur_id, + equip_id=item.equip_id, + srvyr_id=item.srvyr_id, + orig_id=item.orig_id, + status=item.status, + project_id=self.id, + ) + geom = Point(*reproject(item.easting, item.northing, item.elevation)) + new_point.geom = 'SRID={:d};{:s}'.format(conf.srid, geom.wkb_hex) + ## TODO: merge with Gino + #session.merge(new_point) + result[survey_model_name] += 1 + #session.commit() + return result + + + async def download_raw_survey_data(self): + from .raw_survey import RawSurveyModel + ## FIXME: old query style + breakpoint() + raw_survey_items = await RawSurveyModel.query.where(RawSurveyModel.project_id == self.id).gino.all() + csv_file = 
StringIO() + csv_writer = writer(csv_file) + for item in raw_survey_items: + csv_writer.writerow(item.to_row()) + now = '{:%Y-%m-%d_%H:%M}'.format(datetime.now()) + + ## XXX: not tested (aiohttp) + #return send_file(BytesIO(bytes(csv_file.getvalue(), 'utf-8')), + # attachment_filename='{:s}-{:s}.csv'.format(self.name, now), + # mimetype='text/csv', + # as_attachment=True) + headers = { + 'Content-Disposition': 'attachment; filename="{}"'.format('{:s}-{:s}.csv'.format(self.name, now)) + } + return web.Response( + status=200, + headers=headers, + content_type='text/csv', + body=BytesIO(bytes(csv_file.getvalue(), 'utf-8')) + ) + + async def download_reconciled_raw_survey_data(self, registry): + csv_file = StringIO() + csv_writer = writer(csv_file) + for model_name, model in registry.raw_survey_models.items(): + survey_items = await model.query.where(model.project_id == self.id).gino.all() + for item in survey_items: + csv_writer.writerow(item.to_row()) + now = '{:%Y-%m-%d_%H:%M}'.format(datetime.now()) + + ## XXX: not tested (aiohttp) + #return send_file(BytesIO(bytes(csv_file.getvalue(), 'utf-8')), + # attachment_filename='{:s}-{:s}-reconciled.csv'.format(self.name, now), + # mimetype='text/csv', + # as_attachment=True) + headers = { + 'Content-Disposition': 'attachment; filename="{}"'.format('{:s}-{:s}.csv'.format(self.name, now)) + } + return web.Response( + status=200, + headers=headers, + content_type='text/csv', + body=BytesIO(bytes(csv_file.getvalue(), 'utf-8')) + ) + + async def reconcile(self, registry): + from gisaf.models.reconcile import Reconciliation + result = {} + all_reconciliations = await Reconciliation.query.gino.all() + point_ids_to_reconcile = {p.id: registry.raw_survey_models[p.target] + for p in all_reconciliations + if p.target in registry.raw_survey_models} + + result['bad target'] = set([p.target for p in all_reconciliations + if p.target not in registry.raw_survey_models]) + result['from'] = defaultdict(int) + result['to'] = defaultdict(int) + result['unchanged'] = defaultdict(int) + + ## TODO: Gino session + for model_name, model in registry.raw_survey_models.items(): + points_to_reconcile = await model.query.\ + where(model.project_id==self.id).\ + where(model.id.in_(point_ids_to_reconcile.keys())).gino.all() + for point in points_to_reconcile: + new_model = point_ids_to_reconcile[point.id] + if new_model == model: + result['unchanged'][model] += 1 + continue + new_point = new_model( + id=point.id, + accur_id=point.accur_id, + srvyr_id=point.accur_id, + project_id=point.project_id, + status=point.status, + orig_id=point.orig_id, + equip_id=point.equip_id, + geom=point.geom, + date=point.date + ) + ## TODO: Gino add and delete + #session.add(new_point) + #session.delete(point) + result['from'][point.__class__] += 1 + result['to'][new_point.__class__] += 1 + return result + + + + # def download_raw_survey_data(self, session=None): + # from gisaf.models.raw_survey_models import RawSurvey + # from gisaf.registry import registry + # if not session: + # session = db.session + # raw_survey_items = session.query(RawSurvey).filter(RawSurvey.project_id == self.id).all() + # csv_file = StringIO() + # csv_writer = writer(csv_file) + # + # SURVEY_PROJ = pyproj.Proj(**conf.raw_survey['spatial_sys_ref']) + # TARGET_PROJ = pyproj.Proj(init='epsg:{:d}'.format(conf.srid)) + # + # def reproject(x, y, z): + # return pyproj.transform(SURVEY_PROJ, TARGET_PROJ, x, y, z) + # + # for item in raw_survey_items: + # csv_writer.writerow(item.to_row()) + # + # ## Add import of points, incl. 
reprojection, to registry.raw_survey_models: + # new_coords = reproject(item.easting, item.northing, item.elevation) + # geom = Point(*new_coords) + # ## TODO: from here + # model = registry.raw_survey_models + # new_point = model( + # id=item.id, + # category=item.category, + # date=item.date, + # accur_id=item.accur_id, + # equip_id=item.equip_id, + # srvyr_id=item.srvyr_id, + # orig_id=item.original_id, + # ) + # new_point.geom = 'SRID={:d};{:s}'.format(conf.srid, geom.wkb_hex) + # session.merge(new_point) + # result[item.category_info] += 1 + # + # now = '{:%Y-%m-%d_%H:%M}'.format(datetime.now()) + # + # return send_file(BytesIO(bytes(csv_file.getvalue(), 'utf-8')), + # attachment_filename='{:s}-{:s}.csv'.format(self.name, now), + # mimetype='text/csv', + # as_attachment=True) + diff --git a/src/models/raw_survey.py b/src/models/raw_survey.py new file mode 100644 index 0000000..0c32edc --- /dev/null +++ b/src/models/raw_survey.py @@ -0,0 +1,93 @@ +from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column + +from .models_base import Model +from .models_base import GeoPointMModel, BaseSurveyModel +from .project import Project +from .category import Category +from .metadata import gisaf_survey + +class RawSurveyModel(BaseSurveyModel, GeoPointMModel): + metadata = gisaf_survey + __tablename__ = 'raw_survey' + + id: int = Field(default=None, primary_key=True) + project_id = db.Column(db.Integer, db.ForeignKey(Project.id)) + category = db.Column(db.String, db.ForeignKey(Category.name)) + in_menu = False + + @classmethod + def dyn_join_with(cls): + return { + 'project': Project.on(cls.project_id == Project.id), + 'category_info': Category.on(cls.category == Category.name), + } + + #id = db.Column(db.BigInteger, primary_key=True) + ## XXX: Can remove the rest since it's is in the GeoPointSurveyModel class? + #geom = db.Column(Geometry('POINTZ', srid=conf.raw_survey_srid)) + #date = db.Column(db.Date) + #orig_id = db.Column(db.String) + #status = db.Column(db.String(1)) + + def __str__(self): + return 'Raw Survey point id {:d}'.format(self.id) + + def to_row(self): + """ + Get a list of attributes, typically used for exporting in CSV + :return: list of attributes + """ + return [ + self.id, + self.easting, + self.northing, + self.elevation, + self.category, + self.surveyor, + self.equipment, + self.date.isoformat(), + self.accuracy.name, + self.category_info.status, + self.project.name, + self.orig_id + ] + + def auto_import(self, session, model=None, status=None): + """ + Automatically feed the raw_geom get_raw_survey_model_mapping + :return: + """ + if model is None: + # XXX: move as module import? 
+ from gisaf.registry import registry + model = registry.get_raw_survey_model_mapping().get(self.category) + + new_point = model( + id=self.id, + geom=self.geom, + date=self.date, + project_id=self.project_id, + equip_id=self.equip_id, + srvyr_id=self.srvyr_id, + accur_id=self.accur_id, + orig_id=self.orig_id, + status=status, + ) + session.merge(new_point) + + +class OriginRawPoint(Model): + """ + Store information of the raw survey point used in the line work for each line and polygon shape + Filled when importing shapefiles + """ + __tablename__ = 'origin_raw_point' + __table_args__ = {'schema' : 'gisaf_survey'} + + id = db.Column(db.Integer, primary_key=True) + shape_table = db.Column(db.String, index=True) + shape_id = db.Column(db.Integer, index=True) + raw_point_id = db.Column(db.BigInteger) + + def __repr__(self): + return ''.format(self=self) \ No newline at end of file diff --git a/src/models/reconcile.py b/src/models/reconcile.py new file mode 100644 index 0000000..f40c355 --- /dev/null +++ b/src/models/reconcile.py @@ -0,0 +1,40 @@ +from datetime import datetime +from sqlalchemy import BigInteger +from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column, String + +from .models_base import Model +from .metadata import gisaf_admin + + +class Reconciliation(Model): + metadata = gisaf_admin + + class Admin: + menu = 'Other' + flask_admin_model_view = 'ReconciliationModelView' + + id: int = Field(primary_key=True, sa_column=Column(BigInteger, autoincrement=False)) + target: str = Field(sa_column=Column(String(50))) + source: str = Field(sa_column=Column(String(50))) + + +class StatusChange(Model): + metadata = gisaf_admin + __tablename__ = 'status_change' + + id: int = Field(BigInteger, primary_key=True, sa_column=Column(autoincrement=False)) + store: str = Field(sa_column=Column(String(50))) + ref_id: int = Field(sa_column=Column(BigInteger())) + original: str = Field(sa_column=Column(String(1))) + new: str = Field(sa_column=Column(String(1))) + time: datetime + + +class FeatureDeletion(Model): + metadata = gisaf_admin + __tablename__ = 'feature_deletion' + + id: int = Field(BigInteger, primary_key=True, sa_column=Column(autoincrement=False)) + store: str = Field(sa_column=Column(String(50))) + ref_id: int = Field(sa_column=Column(BigInteger())) + time: datetime \ No newline at end of file diff --git a/src/models/survey.py b/src/models/survey.py new file mode 100644 index 0000000..8fc55b3 --- /dev/null +++ b/src/models/survey.py @@ -0,0 +1,84 @@ +from enum import Enum + +from sqlmodel import Field, SQLModel + +from .models_base import Model +from .metadata import gisaf_survey + + +class Accuracy(Model): + metadata = gisaf_survey + + class Admin: + menu = 'Other' + flask_admin_model_view = 'MyModelViewWithPrimaryKey' + + id: int = Field(default=None, primary_key=True) + name: str + accuracy: float + + def __str__(self): + return f'{self.name} {self.accuracy}' + + def __repr__(self): + return f'' + + +class Surveyor(Model): + metadata = gisaf_survey + + class Admin: + menu = 'Other' + flask_admin_model_view = 'MyModelViewWithPrimaryKey' + + id: int = Field(default=None, primary_key=True) + name: str + + def __str__(self): + return self.name + + def __repr__(self): + return f'' + + +class Equipment(Model): + metadata = gisaf_survey + + class Admin: + menu = 'Other' + flask_admin_model_view = 'MyModelViewWithPrimaryKey' + + id: int = Field(default=None, primary_key=True) + name: str + + def __str__(self): + return self.name + + def __repr__(self): + return f'' + 
+class GeometryType(str, Enum):
+    point = 'Point'
+    line_work = 'Line_work'
+
+class AccuracyEquimentSurveyorMapping(Model):
+    metadata = gisaf_survey
+    __tablename__ = 'accuracy_equiment_surveyor_mapping'
+
+    class Admin:
+        menu = 'Other'
+
+    id: int = Field(default=None, primary_key=True)
+    surveyor_id: int = Field(foreign_key='surveyor.id', index=True)
+    equipment_id: int = Field(foreign_key='equipment.id', index=True)
+    geometry_type: GeometryType = Field(default=GeometryType.point, index=True)
+    accuracy_id: int = Field(foreign_key='accuracy.id')
+
+    @classmethod
+    def dyn_join_with(cls):
+        return {
+            'surveyor': Surveyor,
+            'equipment': Equipment,
+            'accuracy': Accuracy,
+        }
+
diff --git a/src/models/tags.py b/src/models/tags.py
index f051f0b..7138078 100644
--- a/src/models/tags.py
+++ b/src/models/tags.py
@@ -1,9 +1,45 @@
 from typing import Any
+from sqlalchemy import BigInteger
+from sqlalchemy.ext.mutable import MutableDict
+from sqlalchemy.dialects.postgresql import HSTORE
 from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column
 from pydantic import computed_field
 from .metadata import gisaf
-from .models_base import GeoPointModel
+from .geo_models_base import GeoPointModel

 class Tags(GeoPointModel, table=True):
-    metadata = gisaf
\ No newline at end of file
+    metadata = gisaf
+    hidden: bool = True
+
+    class Admin:
+        menu = 'Other'
+        flask_admin_model_view = 'TagModelView'
+
+    id: int | None = Field(primary_key=True)
+    store: str = Field(index=True)
+    ## index must be set on the Column when sa_column is given
+    ref_id: int = Field(sa_column=Column(BigInteger, index=True))
+    tags: dict = Field(sa_column=Column(MutableDict.as_mutable(HSTORE)))
+
+    def __str__(self):
+        return '{self.store:s} {self.ref_id}: {self.tags}'.format(self=self)
+
+    def __repr__(self):
+        return '<Tags {self.store:s} {self.ref_id}: {self.tags}>'.format(self=self)
+
+
+class TagKey(SQLModel, table=True):
+    metadata = gisaf
+    ## CREATE TABLE gisaf.tagkey (key VARCHAR(255) primary key);
+
+    class Admin:
+        menu = 'Other'
+        flask_admin_model_view = 'TagKeyModelView'
+
+    id: str | None = Field(primary_key=True)
+
+    def __str__(self):
+        return self.id
+
+    def __repr__(self):
+        return '<TagKey {self.id}>'.format(self=self)
\ No newline at end of file
diff --git a/src/registry.py b/src/registry.py
new file mode 100644
index 0000000..9bb8172
--- /dev/null
+++ b/src/registry.py
@@ -0,0 +1,627 @@
+"""
+Define the models for the ORM
+"""
+import logging
+import importlib
+import pkgutil
+from collections import OrderedDict, defaultdict
+from importlib.metadata import entry_points
+from typing import List
+
+from sqlalchemy import inspect
+
+import numpy as np
+import pandas as pd
+
+from .config import conf
+from .models import misc, category, project, reconcile, map_bases, tags
+#from .models.graphql import GeomGroup, GeomModel
+from .models.geo_models_base import (
+    PlottableModel,
+    GeoModel,
+    RawSurveyBaseModel,
+    LineWorkSurveyModel,
+    GeoPointSurveyModel,
+    GeoLineSurveyModel,
+    GeoPolygonSurveyModel,
+)
+from .utils import ToMigrate
+
+registry = None
+
+logger = logging.getLogger('Gisaf registry')
+
+
+def import_submodules(package, recursive=True):
+    """ Import all submodules of a module, recursively, including subpackages
+
+    :param package: package (name or actual module)
+    :type package: str | module
+    :param recursive: scan package recursively
+    :rtype: dict[str, types.ModuleType]
+    """
+    if isinstance(package, str):
+        package = importlib.import_module(package)
+    results = {}
+    for loader, name, is_pkg in pkgutil.walk_packages(package.__path__):
+        full_name = package.__name__ + '.' + name
+        results[full_name] = importlib.import_module(full_name)
+        if recursive and is_pkg:
+            results.update(import_submodules(full_name))
+    return results
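+
+# For instance, import_submodules('gisaf.models') returns a dict like
+# {'gisaf.models.misc': <module>, 'gisaf.models.tags': <module>, ...}
+# (module names illustrative).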
+
+
+class ModelRegistry:
+    """
+    Collect, categorize, and initialize the SQLAlchemy data models.
+    Maintains registries for all kinds of model types, eg. geom, data, values...
+    Provides tools to get the models from their names, table names, etc.
+    """
+    def __init__(self, raw_survey_models=None, survey_models=None):
+        """
+        Get geo models
+        :return: None
+        """
+        self.geom_custom = {}
+        self.geom_custom_store = {}
+        self.values = {}
+        self.other = {}
+        self.misc = {}
+        self.raw_survey_models = raw_survey_models or {}
+        self.geom_auto = survey_models or {}
+
+    def scan_entry_points(self, name):
+        """
+        Get the entry points in gisaf_extras.models, and return their models
+        :return: dict of name: models
+        """
+        named_objects = {}
+        for entry_point in entry_points().select(group=name):
+            try:
+                named_objects.update({entry_point.name: entry_point.load()})
+            except ModuleNotFoundError as err:
+                logger.warning(err)
+        return named_objects
+
+    def add_model(self, model):
+        """
+        Add the model
+        :return: Model type (one of {'GeoModel', 'PlottableModel', 'Other model'})
+        """
+        table_name = model.get_store_name()
+        if issubclass(model, GeoModel) and not issubclass(model, RawSurveyBaseModel) and not model.hidden:
+            self.geom_custom[table_name] = model
+            return 'GeoModel'
+        elif issubclass(model, PlottableModel):
+            self.values[table_name] = model
+            return 'PlottableModel'
+        else:
+            self.other[table_name] = model
+            return 'Other model'
+
+    def add_store(self, store):
+        self.geom_custom_store[store.name] = store
+
+    def scan(self):
+        """
+        Scan all models defined explicitly (not the survey ones, which are defined by categories),
+        and store them for reference.
+        :return:
+        """
+        from gisaf import models
+
+        ## Scan the models defined in modules
+        for module_name, module in import_submodules(models).items():
+            for name in dir(module):
+                obj = getattr(module, name)
+                if hasattr(obj, '__module__') and obj.__module__.startswith(module.__name__)\
+                        and hasattr(obj, '__tablename__'):
+                    model_type = self.add_model(obj)
+                    logger.debug(f'Model {obj.get_store_name()} added in the registry from gisaf source tree as {model_type}')
+
+        ## Scan the models defined in plugins (setuptools' entry points)
+        for module_name, model in self.scan_entry_points(name='gisaf_extras.models').items():
+            model_type = self.add_model(model)
+            logger.debug(f'Model {model.get_store_name()} added in the registry from {module_name} entry point as {model_type}')
+
+        for module_name, store in self.scan_entry_points(name='gisaf_extras.stores').items():
+            self.add_store(store)
+            logger.debug(f'Store {store} added in the registry from {module_name} gisaf_extras.stores entry point')
+
+        ## Add misc models
+        for module in misc, category, project, reconcile, map_bases, tags:
+            for name in dir(module):
+                obj = getattr(module, name)
+                if hasattr(obj, '__module__') and hasattr(obj, '__tablename__'):
+                    self.misc[name] = obj
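+
+    ## A plugin can expose its models and stores through setuptools entry
+    ## points; a hypothetical pyproject.toml snippet (names illustrative):
+    ##
+    ##     [project.entry-points."gisaf_extras.models"]
+    ##     my_trees = "my_plugin.models:Tree"
+    ##
+    ##     [project.entry-points."gisaf_extras.stores"]
+    ##     my_store = "my_plugin.stores:store"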
+
+    async def build(self):
+        """
+        Build the registry: organize all models in a common reference point.
+        This should be executed after the discovery of survey models (categories)
+        and the scan of custom/module defined models.
+        """
+        ## Combine all geom models (auto and custom)
+        self.geom = {**self.geom_auto, **self.geom_custom}
+
+        await self.make_stores()
+
+        ## Some lists of tables, by usage
+        ## XXX: Gino doesn't set __tablename__ and __table__ (or engine not started?),
+        ## so hack the table names of auto_geom
+        #self.geom_tables = [model.__tablename__
+        self.geom_tables = [getattr(model, "__tablename__", None)
+                            for model in sorted(list(self.geom.values()),
+                                                key=lambda a: a.z_index)]
+
+        values_tables = [model.__tablename__ for model in self.values.values()]
+        other_tables = [model.__tablename__ for model in self.other.values()]
+
+        self.data_tables = values_tables + other_tables
+
+        ## Build a dict for quick access to the values from a model
+        self.values_for_model = {}
+        for model_value in self.values.values():
+            for constraint in inspect(model_value).foreign_key_constraints:
+                model = self.get_geom_model_from_table_name(constraint.referred_table.name)
+                self.values_for_model[model] = model_value
+        self.make_menu()
+
+    def make_menu(self):
+        """
+        Build the Admin menu
+        :return:
+        """
+        self.menu = defaultdict(list)
+        for name, model in self.stores.model.items():
+            if hasattr(model, 'Admin'):
+                self.menu[model.Admin.menu].append(model)
+
+    def get_raw_survey_model_mapping(self):
+        """
+        Get a mapping of category_name -> model for categories
+        :return: dict of name -> model (class)
+        """
+        ## TODO: add option to pass a single item
+        ## Local imports, avoiding cyclic dependencies
+        ## FIXME: Gino
+        from .models.category import Category
+        from .database import db
+        categories = db.session.query(Category)
+        return {category.name: self.raw_survey_models[category.table_name]
+                for category in categories
+                if self.raw_survey_models.get(category.table_name)}
+
+    async def get_model_id_params(self, model, id):
+        """
+        Return the parameters for this item (table name, id), displayed in the info pane
+        """
+        if not model:
+            return {}
+        item = await model.load(**model.get_join_with()).query.where(model.id==id).gino.first()
+        if not item:
+            return {}
+        resp = {}
+        resp['itemName'] = item.caption
+        resp['geoInfoItems'] = await item.get_geo_info()
+        resp['surveyInfoItems'] = await item.get_survey_info()
+        resp['infoItems'] = await item.get_info()
+        resp['tags'] = await item.get_tags()
+        if hasattr(item, 'get_categorized_info'):
+            resp['categorized_info_items'] = await item.get_categorized_info()
+        if hasattr(item, 'get_graph'):
+            resp['graph'] = item.get_graph()
+        if hasattr(item, 'Attachments'):
+            if hasattr(item.Attachments, 'files'):
+                resp['files'] = await item.Attachments.files(item)
+            if hasattr(item.Attachments, 'images'):
+                resp['images'] = await item.Attachments.images(item)
+        if hasattr(item, 'get_external_record_url'):
+            resp['externalRecordUrl'] = item.get_external_record_url()
+        return resp
+
+    def get_geom_model_from_table_name(self, table_name):
+        """
+        Utility func to get a geom model from a table name
+        :param table_name: str
+        :return: model or None
+        """
+        for model in self.geom.values():
+            if model.__tablename__ == table_name:
+                return model
+
+    def get_other_model_from_table_name(self, table_name):
+        """
+        Utility func to get a non-geom model from a table name
+        :param table_name: str
+        :return: model or None
+        """
+        for model in self.other.values():
+            if model.__tablename__ == table_name:
+                return model
+        for model in self.values.values():
+            if model.__tablename__ == table_name:
+                return model
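+
+    ## e.g. registry.get_geom_model_from_table_name('v_tree') returns the geom
+    ## model whose __tablename__ is 'v_tree', or None (table name illustrative).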
+
+    async def make_stores(self):
+        """
+        Make registry for primary groups, categories and survey stores, using Pandas dataframes.
+        Used in GraphQL queries.
+        """
+        ## Utility functions used with the apply method (dataframes)
+        def fill_columns_from_custom_models(row):
+            return (
+                row.model.__namespace__['__qualname__'],  ## Name of the class - hacky
+                row.model.description,
+                row.model.__table__.schema
+            )
+
+        def fill_columns_from_custom_stores(row):
+            return (
+                row.model.description,
+                row.model.description,
+                None  ## Schema
+            )
+
+        def get_store_name(category):
+            fragments = ['V', category.group, category.minor_group_1]
+            if category.minor_group_2 != '----':
+                fragments.append(category.minor_group_2)
+            return '.'.join([
+                conf.survey['schema'],
+                '_'.join(fragments)
+            ])
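+        ## For example, a category in group 'A' with minor_group_1 'B' and
+        ## minor_group_2 '----' maps to a store named '<schema>.V_A_B'
+        ## (fragment values illustrative).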
+
+        self.categories = await category.Category.get_df()
+        self.categories['title'] = self.categories.long_name.fillna(self.categories.description)
+
+        self.categories['store'] = self.categories.apply(get_store_name, axis=1)
+
+        self.categories['count'] = pd.Series(dtype=pd.Int64Dtype())
+        self.categories.set_index('name', inplace=True)
+
+        df_models = pd.DataFrame(self.geom.items(), columns=['store', 'model']).set_index('store')
+
+        self.categories = self.categories.merge(df_models, left_on='store', right_index=True)
+        self.categories['custom'] = False
+        self.categories['is_db'] = True
+        self.categories['name_letter'] = self.categories.index.str.slice(0, 1)
+        self.categories['name_number'] = self.categories.index.str.slice(1).astype('int64')
+        self.categories.sort_values(['name_letter', 'name_number'], inplace=True)
+
+        ## Set in the categories dataframe some useful properties, from the model class
+        ## Maybe at some point it makes sense to get away from class-based definitions
+        if len(self.categories) > 0:
+            self.categories['store_name'] = self.categories.apply(
+                lambda row: row.model.get_store_name(),
+                axis=1
+            )
+            self.categories['raw_model_store_name'] = self.categories.apply(
+                lambda row: row.model.raw_model.store_name,
+                axis=1
+            )
+            self.categories['is_line_work'] = self.categories.apply(
+                lambda row: issubclass(row.model, LineWorkSurveyModel),
+                axis=1
+            )
+            ## Add the raw survey models
+            self.categories['raw_survey_model'] = self.categories.apply(
+                lambda row: self.raw_survey_models[row.raw_model_store_name],
+                axis=1
+            )
+        else:
+            self.categories['store_name'] = None
+            self.categories['raw_model_store_name'] = None
+            self.categories['is_line_work'] = None
+            self.categories['raw_survey_model'] = None
+
+        ## Custom models (Misc)
+        self.custom_models = pd.DataFrame(
+            self.geom_custom.items(),
+            columns=['store', 'model']
+        ).set_index('store')
+        self.custom_models['group'] = 'Misc'
+        self.custom_models['custom'] = True
+        self.custom_models['is_db'] = True
+        self.custom_models['raw_model_store_name'] = ''
+        self.custom_models['in_menu'] = self.custom_models.apply(
+            lambda row: getattr(row.model, 'in_menu', True),
+            axis=1
+        )
+        self.custom_models = self.custom_models.loc[self.custom_models.in_menu]
+
+        if len(self.custom_models) > 0:
+            self.custom_models['long_name'],\
+            self.custom_models['custom_description'],\
+            self.custom_models['db_schema'],\
+                = zip(*self.custom_models.apply(fill_columns_from_custom_models, axis=1))
+            ## Try to give a meaningful description, eg. including the source (db_schema)
+            self.custom_models['description'] = self.custom_models['custom_description'].fillna(self.custom_models['long_name'] + '-' + self.custom_models['db_schema'])
+            self.custom_models['title'] = self.custom_models['long_name']
+
+        ## Custom stores (Community)
+        self.custom_stores = pd.DataFrame(
+            self.geom_custom_store.items(),
+            columns=['store', 'model']
+        ).set_index('store')
+        self.custom_stores['group'] = 'Community'
+        self.custom_stores['custom'] = True
+        self.custom_stores['is_db'] = False
+        if len(self.custom_stores) == 0:
+            self.custom_stores['in_menu'] = False
+        else:
+            self.custom_stores['in_menu'] = self.custom_stores.apply(
+                lambda row: getattr(row.model, 'in_menu', True),
+                axis=1
+            )
+        self.custom_stores = self.custom_stores.loc[self.custom_stores.in_menu]
+
+        if len(self.custom_stores) > 0:
+            self.custom_stores['long_name'],\
+            self.custom_stores['description'],\
+            self.custom_stores['db_schema'],\
+                = zip(*self.custom_stores.apply(fill_columns_from_custom_stores, axis=1))
+            self.custom_stores['title'] = self.custom_stores['long_name']
+
+        ## Combine Misc (custom) and survey (auto) stores
+        ## Retain only one status per category (defaultStatus, 'E'/existing by default)
+        self.stores = pd.concat([
+            self.categories[self.categories.status==conf.map['defaultStatus'][0]].reset_index().set_index('store').sort_values('title'),
+            self.custom_models,
+            self.custom_stores
+        ]).drop(columns=['store_name'])
+
+        ## Set in the stores dataframe some useful properties, from the model class
+        ## Maybe at some point it makes sense to get away from class-based definitions
+        def fill_columns_from_model(row):
+            return (
+                row.model.mapbox_type or None,
+                row.model.icon,
+                row.model.symbol,
+                row.model.base_gis_type,
+                row.model.z_index,
+            )
+
+        self.stores['mapbox_type_default'],\
+        self.stores['icon'],\
+        self.stores['symbol'],\
+        self.stores['base_gis_type'],\
+        self.stores['z_index']\
+            = zip(*self.stores.apply(fill_columns_from_model, axis=1))
+
+        self.stores['mapbox_type_custom'] = self.stores['mapbox_type_custom'].replace('', np.nan)
+        self.stores['mapbox_type'] = self.stores['mapbox_type_custom'].fillna(
+            self.stores['mapbox_type_default']
+        )
+
+        self.stores['viewable_role'] = self.stores.apply(
+            lambda row: getattr(row.model, 'viewable_role', None),
+            axis=1,
+        )
+        self.stores['viewable_role'].replace('', None, inplace=True)
+
+        def make_model_gql_object_type(row):
+            raise ToMigrate('make_model_gql_object_type')
+            # return GeomModel(
+            #     name=row.long_name or row.description,
+            #     category=row.name,
+            #     description=row.description,
+            #     store=row.name,
+            #     rawSurveyStore=row.raw_model_store_name,
+            #     #style=row.style,
+            #     zIndex=row.z_index,
+            #     custom=row.custom,
+            #     count=None,
+            #     group=row.group,
+            #     type=row.mapbox_type,
+            #     icon=row.icon,
+            #     symbol=row.symbol,
+            #     gisType=row.base_gis_type,
+            #     viewableRole=row.viewable_role
+            # )
+
+        self.stores['gql_object_type'] = self.stores.apply(make_model_gql_object_type, axis=1)
+        self.stores['is_live'] = False
+
+        ## Layer groups: Misc, survey's primary groups, Live
+        self.primary_groups = await category.CategoryGroup.get_df()
+        self.primary_groups.sort_values('name', inplace=True)
+        self.primary_groups['title'] = self.primary_groups['long_name']
+
+        ## Add Misc and Live
+        self.primary_groups.loc[-1] = (
+            'Misc',
+            False,
+            'Misc and old layers (not coming from our survey; they will be organized, '
+            'eventually, as the surveys get more complete)',
+            'Misc',
+        )
+        self.primary_groups.index = self.primary_groups.index + 1
+
+        self.primary_groups.loc[len(self.primary_groups)] = (
+            'Live',
+            False,
+            'Layers from data processing, sensors, etc., updated automatically',
+            'Live',
+        )
+
+        self.primary_groups.loc[len(self.primary_groups)] = (
+            'Community',
+            False,
+            'Layers from the community',
+            'Community',
+        )
+
+        self.primary_groups.sort_index(inplace=True)
+
+        def make_group(group):
+            ## FIXME: GeomGroup (from models.graphql) is pending migration,
+            ## like make_model_gql_object_type above
+            raise ToMigrate('make_group')
+            # return GeomGroup(
+            #     name=group['name'],
+            #     title=group['title'],
+            #     description=group['long_name']
+            # )
+
+        self.primary_groups['gql_object_type'] = self.primary_groups.apply(make_group, axis=1)
+
+    async def get_stores(self, db):
+        """
+        Get information about the available stores
+        """
+        query = "SELECT schemaname, relname, n_live_tup FROM pg_stat_user_tables"
+        async with db.acquire(reuse=False) as connection:
+            rows = await connection.all(query)
+        all_tables_count = pd.DataFrame(rows, columns=['schema', 'table', 'count'])
+        all_tables_count['store'] = all_tables_count['schema'] + '.' + all_tables_count['table']
+        all_tables_count.set_index(['store'], inplace=True)
+
+        ## TODO: a DB VACUUM can be triggered if all counts are 0?
+
+        ## Update the count in registry's stores
+        self.stores.loc[:, 'count'] = all_tables_count['count']
+        ## FIXME: count for custom stores
+        store_df = self.stores.loc[self.stores['count'] != 0]
+        def set_count(row):
+            row.gql_object_type.count = row['count']
+
+        store_df[store_df.is_db].apply(set_count, axis=1)
+
+        return store_df.gql_object_type.to_list()
+
+    #def update_live_layers(self, live_models: List[GeomModel]):
+    #    raise ToMigrate('make_model_gql_object_type')
+    def update_live_layers(self, live_models):
+        """
+        Update the live layers in the registry, using the provided list of GeomModel
+        """
+        ## Remove existing live layers
+        self.stores.drop(self.stores[self.stores.is_live].index, inplace=True)
+
+        ## Add the provided live layers
+        ## Ideally, this should be vectorized
+        for model in live_models:
+            self.stores.loc[model.store] = {
+                'description': model.description,
+                'group': model.group,
+                'name': model.name,
+                'gql_object_type': model,
+                'is_live': True,
+                'is_db': False,
+                'custom': True,
+            }
+
+
+category_model_mapper = {
+    'Point': GeoPointSurveyModel,
+    'Line': GeoLineSurveyModel,
+    'Polygon': GeoPolygonSurveyModel,
+}
+
+
+async def make_category_models(raw_survey_models, geom_models):
+    """
+    Make geom models from the category model, and update raw_survey_models and geom_models.
+    Important notes:
+    - the db must be bound before running this function
+    - the db must be rebound after running this function,
+      so that the models created are actually bound to the db connection
+    :return:
+    """
+    from .models.category import Category, CategoryGroup
+    ## XXX: Using Gino!
+    categories = await Category.load(group_info=CategoryGroup).order_by(Category.long_name).gino.all()
+    for category in categories:
+        ## Several statuses can coexist for the same model, so
+        ## consider only the ones with the 'E' (existing) status.
+        ## The other statuses are defined only for import (?)
+        if getattr(category, 'status', 'E') != 'E':
+            continue
+
+        ## Python magic here! Create classes using type(name, bases, dict)
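+        ## For instance, type('V_A_B', (GeoPointSurveyModel,), {'__tablename__': 'V_A_B'})
+        ## is equivalent to: class V_A_B(GeoPointSurveyModel): __tablename__ = 'V_A_B'
+        ## (class name illustrative)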
+        try:
+            store_name = '{}.RAW_{}'.format(conf.survey['schema_raw'], category.table_name)
+            raw_survey_models[store_name] = type(
+                category.raw_survey_table_name,
+                (RawSurveyBaseModel, ), {
+                    '__tablename__': category.raw_survey_table_name,
+                    '__table_args__': {
+                        'schema': conf.survey['schema_raw'],
+                        'extend_existing': True
+                    },
+                    'category': category,
+                    'group': category.group_info,
+                    'viewable_role': category.viewable_role,
+                    'store_name': store_name,
+                    'icon': ''
+                })
+        except Exception as err:
+            logger.warning(err)
+        else:
+            logger.debug('Discovered {:s}'.format(category.raw_survey_table_name))
+
+        model_class = category_model_mapper.get(category.model_type)
+        try:
+            if model_class:
+                schema = conf.survey['schema']
+                store_name = f'{schema}.{category.table_name}'
+                raw_survey_store_name = f"{conf.survey['schema_raw']}.RAW_{category.table_name}"
+                geom_models[store_name] = type(
+                    category.table_name,
+                    (model_class, ), {
+                        '__tablename__': category.table_name,
+                        '__table_args__': {
+                            'schema': schema,
+                            'extend_existing': True
+                        },
+                        'category': category,
+                        'group': category.group_info,
+                        'raw_model': raw_survey_models.get(raw_survey_store_name),
+                        'viewable_role': category.viewable_role,
+                        'symbol': category.symbol,
+                        'icon': f'{schema}-{category.table_name}'
+                    })
+        except Exception as err:
+            logger.warning(err)
+        else:
+            logger.debug('Discovered {:s}'.format(category.table_name))
+
+    logger.info('Discovered {:d} models'.format(len(categories)))
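+
+# Note: the dynamically built models are keyed by store names of the form
+# '<schema_raw>.RAW_<table>' (raw survey points) and '<schema>.<table>'
+# (interpreted shapes), as computed above.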
+
+
+async def make_registry(app):
+    """
+    Make (or refresh) the registry of models.
+    :return:
+    """
+    global registry
+    registry = ModelRegistry(app['raw_survey_models'], app['survey_models'])
+    registry.scan()
+    await registry.build()
+    app['registry'] = registry
+    ## If ogcapi is in app (i.e. not with the scheduler):
+    ## now that the models are refreshed, tell the ogcapi to (re)build
+    if 'ogcapi' in app:
+        await app['ogcapi'].build()
+
+
+## Below, some unused code, maybe to be used later for displaying layers in a tree structure
+
+## Some magic for making a tree from enumerables,
+## https://gist.github.com/hrldcpr/2012250
+#Tree = lambda: defaultdict(Tree)
+#
+#
+#def add(t, path):
+#    for node in path:
+#        t = t[node]
+#
+#
+#dicts = lambda t: {k: dicts(t[k]) for k in t}
+#
+#
+#def get_geom_models_tree():
+#    tree = Tree()
+#    for model in models.geom_custom:
+#        full_name = model.__module__[len('gisaf.models')+1:]
+#        add(tree, full_name.split('.'))
+#        add(tree, full_name.split('.') + [model])
+#    return dicts(tree)
diff --git a/src/security.py b/src/security.py
index 95cd929..5e952b8 100644
--- a/src/security.py
+++ b/src/security.py
@@ -1,7 +1,5 @@
 from datetime import datetime, timedelta
 import logging
-from typing import Annotated
-#from passlib.context import CryptContext

 from fastapi import Depends, HTTPException, status
 from fastapi.security import OAuth2PasswordBearer
diff --git a/src/utils.py b/src/utils.py
new file mode 100644
index 0000000..b093fe3
--- /dev/null
+++ b/src/utils.py
@@ -0,0 +1,400 @@
+import logging
+import asyncio
+from functools import wraps
+from json import dumps, JSONEncoder
+from math import isnan
+from time import time
+import datetime
+import pyproj
+
+from numpy import ndarray
+import pandas as pd
+
+from sqlalchemy.dialects.postgresql import insert
+from sqlalchemy.sql.expression import delete
+
+# from graphene import ObjectType
+
+from .config import conf
+
+class ToMigrate(Exception):
+    pass
+
+SHAPELY_TYPE_TO_MAPBOX_TYPE = {
+    'Point': 'symbol',
+    'LineString': 'line',
+    'Polygon': 'fill',
+    'MultiPolygon': 'fill',
+}
+
+DEFAULT_MAPBOX_LAYOUT = {
+    'symbol': {
+        'text-line-height': 1,
+        'text-padding': 0,
+        'text-allow-overlap': True,
+        'text-field': '\ue32b',
+        'icon-optional': True,
+        'text-font': ['GisafSymbols'],
+        'text-size': 24,
+    }
+}
+
+DEFAULT_MAPBOX_PAINT = {
+    'symbol': {
+        'text-translate-anchor': 'viewport',
+        'text-color': '#000000',
+    },
+    'line': {
+        'line-color': 'red',
+        'line-opacity': 0.70,
+        'line-width': 2,
+        'line-blur': 0.5,
+    },
+    'fill': {
+        'fill-color': 'blue',
+        'fill-opacity': 0.50,
+    }
+}
+
+MAPBOX_COLOR_ATTRIBUTE_NAME = {
+    'symbol': 'text-color',
+    'line': 'line-color',
+    'fill': 'fill-color',
+}
+
+MAPBOX_OPACITY_ATTRIBUTE_NAME = {
+    'symbol': 'text-opacity',
+    'line': 'line-opacity',
+    'fill': 'fill-opacity',
+}
+
+gisTypeSymbolMap = {
+    'Point': '\ue32b',
+    'Line': '\ue32c',
+    'Polygon': '\ue32d',
+    'MultiPolygon': '\ue32d',
+}
+
+
+# survey_to_db_project_func = pyproj.Transformer.from_crs(
+#     conf.geo.raw_survey.spatial_sys_ref,
+#     conf.geo.srid,
+#     always_xy=True
+# ).transform
+
+
+class NumpyEncoder(JSONEncoder):
+    """
+    Encoder that can serialize numpy arrays and datetime objects
+    """
+    def default(self, obj):
+        if isinstance(obj, datetime.datetime):
+            return obj.isoformat()
+        if isinstance(obj, datetime.date):
+            return obj.isoformat()
+        if isinstance(obj, datetime.timedelta):
+            return (datetime.datetime.min + obj).time().isoformat()
+        if isinstance(obj, ndarray):
+            #return obj.tolist()
+            ## TODO: convert NaT to None
+            return [None if isinstance(rr, float) and isnan(rr) else rr for rr in obj]
+        if isinstance(obj, float) and isnan(obj):
+            return None
+        if isinstance(obj, bytes):
+            return obj.decode()
+        return JSONEncoder.default(self, obj)
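+
+## Usage sketch (illustrative): dumps serializes dates and numpy arrays
+## that the standard JSONEncoder rejects, e.g.:
+##     dumps({'date': datetime.date.today()}, cls=NumpyEncoder)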
+
+
+# class GraphQlObjectTypeEncoder(JSONEncoder):
+#     """
+#     Encoder that can serialize basic Graphene ObjectTypes
+#     """
+#     def default(self, obj):
+#         if isinstance(obj, datetime.datetime):
+#             return obj.isoformat()
+#         if isinstance(obj, datetime.date):
+#             return obj.isoformat()
+#         if isinstance(obj, ObjectType):
+#             return obj.__dict__
+
+
+# def json_response(data, body=None, status=200,
+#                   reason=None, headers=None, content_type='application/json',
+#                   check_circular=True, **kwargs):
+#     text = dumps(data, cls=NumpyEncoder, separators=(',', ':'), check_circular=check_circular)
+#     return web.Response(text=text, body=body, status=status, reason=reason,
+#                         headers=headers, content_type=content_type, **kwargs)
+
+
+def get_join_with(cls, recursive=True):
+    """
+    Helper function for loading related tables with a Gino loader (left outer join).
+    Should work recursively...
+    Eg:
+        cls.load(**get_join_with(cls)).query.gino.all()
+    :param cls:
+    :return:
+    """
+    if hasattr(cls, 'dyn_join_with'):
+        joins = cls.dyn_join_with()
+    else:
+        joins = {}
+    if hasattr(cls, '_join_with'):
+        joins.update(cls._join_with)
+    if not recursive:
+        return joins
+    recursive_joins = {}
+    for name, join in joins.items():
+        more_joins = get_join_with(join)
+        if more_joins:
+            aliased = {name: join.alias() for name, join in more_joins.items()}
+            recursive_joins[name] = join.load(**aliased)
+        else:
+            recursive_joins[name] = join
+    return recursive_joins
+
+def get_joined_query(cls):
+    """
+    Helper function to get a query from a model with all the related tables loaded
+    :param cls:
+    :return:
+    """
+    return cls.load(**get_join_with(cls)).query
+
+
+def timeit(f):
+    """
+    Decorator for timing *non async* methods (development tool for performance analysis)
+    """
+    @wraps(f)
+    def wrap(*args, **kw):
+        ts = time()
+        result = f(*args, **kw)
+        te = time()
+        logging.debug('func:{} args:{}, {} took: {:2.4f} sec'.format(f.__name__, args, kw, te-ts))
+        return result
+    return wrap
+
+
+def atimeit(func):
+    """
+    Decorator for timing *async* methods (development tool for performance analysis)
+    """
+    async def process(func, *args, **params):
+        if asyncio.iscoroutinefunction(func):
+            #logging.debug('this function is a coroutine: {}'.format(func.__name__))
+            return await func(*args, **params)
+        else:
+            #logging.debug('this is not a coroutine')
+            return func(*args, **params)
+
+    async def helper(*args, **params):
+        #logging.debug('{}.time'.format(func.__name__))
+        start = time()
+        result = await process(func, *args, **params)
+
+        # Test normal function route...
+        # result = await process(lambda *a, **p: print(*a, **p), *args, **params)
+
+        logging.debug("{} {}".format(func.__name__, time() - start))
+        return result
+
+    return helper
+
+
+async def delete_df(df, model):
+    """
+    Delete all data in the model's table in the database
+    that matches data in the pandas dataframe.
+    """
+    from .database import db  ## local import, avoiding cyclic dependencies
+    table = model.__table__
+    ids = df.reset_index()['id'].values
+    delete_stmt = delete(table).where(model.id.in_(ids))
+    async with db.bind.raw_pool.acquire() as conn:
+        async with conn.transaction():
+            await conn.execute(str(delete_stmt), *ids)
+
+
+async def upsert_df(df, model):
+    """
+    Insert or update all data in the model's table in the database
+    that's present in the pandas dataframe.
+    Use postgres "insert ... on conflict do update ...",
+    as a series of inserts, one row at a time.
+    For GeoDataFrame: the "geometry" column (df._geometry_column_name) is not honored
+    (yet). It's the caller's responsibility to have a proper column name
+    (typically "geom" in Gisaf models) with an EWKT or EWKB representation of the geometry.
+    """
+    ## See: https://stackoverflow.com/questions/33307250/postgresql-on-conflict-in-sqlalchemy
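+    ## Roughly, the statement generated below looks like (illustrative):
+    ##     INSERT INTO t (id, col) VALUES ($1, $2)
+    ##     ON CONFLICT (pk) DO UPDATE SET col = EXCLUDED.col
+    ##     RETURNING id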
+
+    if len(df) == 0:
+        return df
+
+    from .database import db  ## local import, avoiding cyclic dependencies
+
+    table = model.__table__
+
+    ## Generate the 'upsert' statement, using fake values but defining columns
+    columns = {c.name for c in table.columns}
+    values = {col: None for col in df.columns if col in columns}
+    insrt_stmnt = insert(table, inline=True, values=values, returning=table.primary_key.columns)
+    df_columns = set(df.columns)
+    do_update_stmt = insrt_stmnt.on_conflict_do_update(
+        constraint=table.primary_key,
+        set_={
+            k.name: getattr(insrt_stmnt.excluded, k.name)
+            for k in insrt_stmnt.excluded
+            if k.name in df_columns and
+               k.name not in [c.name for c in table.primary_key.columns]
+        }
+    )
+    ## Filter and reorder the df columns,
+    ## in order to match the order of columns in the insert statement
+    df = df[[col for col in do_update_stmt.compile().positiontup
+             if col in df_columns]].copy()
+
+    def convert_to_object(value):
+        """
+        Quick (but slow) and dirty: clean up values (NaN, NaT) for inserting to postgres via asyncpg
+        """
+        if isinstance(value, float) and isnan(value):
+            return None
+        elif pd.isna(value):
+            return None
+        else:
+            return value
+
+    # def encode_geometry(geometry):
+    #     if not hasattr(geometry, '__geo_interface__'):
+    #         raise TypeError('{g} does not conform to '
+    #                         'the geo interface'.format(g=geometry))
+    #     shape = shapely.geometry.asShape(geometry)
+    #     return shapely.wkb.dumps(shape)
+
+    # def decode_geometry(wkb):
+    #     return shapely.wkb.loads(wkb)
+
+    ## pks: dict of lists of primary key values
+    pks = {pk.name: [] for pk in table.primary_key.columns}
+    async with db.bind.raw_pool.acquire() as conn:
+        ## Set standard encoder for HSTORE, geometry
+        await conn.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore')
+
+        #await conn.set_type_codec(
+        #    'geometry',  # also works for 'geography'
+        #    encoder=encode_geometry,
+        #    decoder=decode_geometry,
+        #    format='binary',
+        #)
+        #await conn.set_type_codec(
+        #    'json',
+        #    encoder=json.dumps,
+        #    decoder=json.loads,
+        #    schema='pg_catalog'
+        #)
+        ## For a sequence of inserts:
+        insrt_stmnt_single = await conn.prepare(str(do_update_stmt))
+        async with conn.transaction():
+            for row in df.itertuples(index=False):
+                converted_row = [convert_to_object(v) for v in row]
+                returned = await insrt_stmnt_single.fetch(*converted_row)
+                for returned_single in returned:
+                    for pk, value in returned_single.items():
+                        pks[pk].append(value)
+    ## Return a copy of the original df, with actual DB columns, data and the primary keys
+    for pk, values in pks.items():
+        df[pk] = values
+    return df
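+
+## Usage sketch (model and dataframe contents illustrative):
+##     df = pd.DataFrame({'id': [1, 2], 'name': ['a', 'b']})
+##     df = await upsert_df(df, MyModel)  ## returns df with the actual primary keys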
+
+
+#async def upsert_df(df, model):
+#    """
+#    Experiment with a port of pandas.io.sql for asyncpg: sql_async
+#    """
+#    from .sql_async import SQLDatabase, SQLTable
+#
+#    table = model.__table__
+#
+#    async with db.bind.raw_pool.acquire() as conn:
+#        sql_db = SQLDatabase(conn)
+#        result = await sql_db.to_sql(df, table.name, if_exists='replace', index=False)
+#        return f'{len(df)} records imported (create or update)'
+
+
+#async def upsert_df_bulk(df, model):
+#    """
+#    Insert or update all data in the pandas dataframe to the model's table in the database.
+#    Use postgres "insert ... on conflict do update ...",
+#    in a bulk insert with all data in one request.
+#    """
+#    raise NotImplementedError('Needs fix, use upsert_df instead')
+#    ## See: https://stackoverflow.com/questions/33307250/postgresql-on-conflict-in-sqlalchemy
+#    insrt_vals = df.to_dict(orient='records')
+#
+#    insrt_stmnt = insert(model.__table__).values(insrt_vals)
+#    do_update_stmt = insrt_stmnt.on_conflict_do_update(
+#        constraint=model.__table__.primary_key,
+#        set_={
+#            k.name: getattr(insrt_stmnt.excluded, k.name)
+#            for k in insrt_stmnt.excluded
+#            if k.name not in [c.name for c in model.__table__.primary_key.columns]
+#        }
+#    )
+#    async with db.bind.raw_pool.acquire() as conn:
+#        ## For a sequence of inserts:
+#        insrt_stmnt_single = await conn.prepare(str(insert(model.__table__)))
+#        async with conn.transaction():
+#            ## TODO: flatten the insrt_vals so that they match the request's $n placeholders
+#            await conn.execute(do_update_stmt, insrt_vals)
+
+#def encode_geometry(geometry):
+#    if not hasattr(geometry, '__geo_interface__'):
+#        raise TypeError('{g} does not conform to '
+#                        'the geo interface'.format(g=geometry))
+#    shape = shapely.geometry.asShape(geometry)
+#    geos.lgeos.GEOSSetSRID(shape._geom, conf.raw_survey['srid'])
+#    return shapely.wkb.dumps(shape, include_srid=True)
+
+#def decode_geometry(wkb):
+#    return shapely.wkb.loads(wkb)
+
+## XXX: dev notes
+## What's the best way to save a dataframe to the DB?
+## 1/ df.to_sql might have been an easy solution, but it doesn't support async operations
+#
+## 2/ Experiment with COPY (copy_records_to_table, see below): it doesn't update records.
+#async with db.bind.raw_pool.acquire() as conn:
+#    await conn.set_type_codec(
+#        'geometry',  # also works for 'geography'
+#        encoder=encode_geometry,
+#        decoder=decode_geometry,
+#        format='binary',
+#    )
+#    async with conn.transaction():
+#        ## See https://github.com/MagicStack/asyncpg/issues/245
+#        s = await conn.copy_records_to_table(
+#            model.__table__.name,
+#            schema_name=model.__table__.schema,
+#            records=[tuple(x) for x in gdf_for_db.values],
+#            columns=list(gdf_for_db.columns),
+#            timeout=10
+#        )
+#
+## 3/ SQLAlchemy/asyncpg multiple inserts, then updates
+### Build SQL statements
+#insert = db.insert(model.__table__).compile()
+#update = db.update(model.__table__).compile()
+### Reorder the columns of the dataframe
+#gdf_for_db = gdf_for_db[insert.positiontup]
+### Find the records whose id is already present in the DB, and segregate the df
+#existing_records = await model.get_df(with_only_columns=['id'])
+#gdf_insert = gdf_for_db[~gdf_for_db.id.isin(existing_records.id)]
+#gdf_update = gdf_for_db[gdf_for_db.id.isin(existing_records.id)]
+#async with db.bind.raw_pool.acquire() as conn:
+#    await conn.executemany(insert.string, [tuple(x) for x in gdf_insert.values])
+#    await conn.executemany(update.string, [tuple(x) for x in gdf_update.values])
+##
+## 4/ Fall back to Gino. Bad luck, there's no equivalent to "merge", so the strategy is:
+##    - get all record ids in the DB
+##    - build the set of records that needs update, and the other that needs insert
+##    - do these operations (possibly in bulk)
+#
+## 5/ Make a utility lib for other use cases...