Basic registry, with survey stores
Move to standard src/ dir
versions: sqlmodel official, pydantic v2
etc...
This commit is contained in:
phil 2023-12-13 01:25:00 +05:30
parent 5494f6085f
commit 049b8c9927
31 changed files with 670 additions and 526 deletions

View file

View file

@ -0,0 +1,59 @@
from sqlmodel import Field, SQLModel, MetaData, Relationship
from .metadata import gisaf_admin
class UserRoleLink(SQLModel, table=True):
metadata = gisaf_admin
__tablename__: str = 'roles_users'
user_id: int | None = Field(
default=None, foreign_key="user.id", primary_key=True
)
role_id: int | None = Field(
default=None, foreign_key="role.id", primary_key=True
)
class UserBase(SQLModel):
username: str
email: str
class User(UserBase, table=True):
metadata = gisaf_admin
id: int | None = Field(default=None, primary_key=True)
roles: list["Role"] = Relationship(back_populates="users",
link_model=UserRoleLink)
password: str | None = None
class RoleBase(SQLModel):
name: str = Field(unique=True)
class RoleWithDescription(RoleBase):
description: str | None
class Role(RoleWithDescription, table=True):
metadata = gisaf_admin
id: int | None = Field(default=None, primary_key=True)
users: list[User] = Relationship(back_populates="roles",
link_model=UserRoleLink)
class UserReadNoRoles(UserBase):
id: int
email: str | None
class RoleRead(RoleBase):
id: int
users: list[UserReadNoRoles] = []
class RoleReadNoUsers(RoleBase):
id: int
class UserRead(UserBase):
id: int
email: str | None
roles: list[RoleReadNoUsers] = []

View file

@ -0,0 +1,18 @@
from pydantic import BaseModel
from ..config import conf, Map, Measures, Geo
from .authentication import UserRead
class Proj(BaseModel):
srid: str
srid_for_proj: str
class BootstrapData(BaseModel):
version: str = conf.version
title: str = conf.gisaf.title
windowTitle: str = conf.gisaf.windowTitle
map: Map = conf.map
geo: Geo = conf.geo
measures: Measures = conf.measures
redirect: str = conf.gisaf.redirect
user: UserRead | None = None

View file

@ -0,0 +1,121 @@
from typing import Any, ClassVar
from pydantic import computed_field, ConfigDict
from sqlmodel import Field, Relationship, SQLModel, JSON, TEXT, Column, select
from .metadata import gisaf_survey
from ..database import db_session, pandas_query
mapbox_type_mapping = {
'Point': 'symbol',
'Line': 'line',
'Polygon': 'fill',
}
class BaseModel(SQLModel):
@classmethod
async def get_df(cls):
async with db_session() as session:
query = select(cls)
return await session.run_sync(pandas_query, query)
class CategoryGroup(BaseModel, table=True):
metadata = gisaf_survey
__tablename__ = 'category_group'
name: str | None = Field(min_length=4, max_length=4,
default=None, primary_key=True)
major: str
long_name: str
categories: list['Category'] = Relationship(back_populates='category_group')
class Admin:
menu = 'Other'
flask_admin_model_view = 'CategoryGroupModelView'
class CategoryModelType(BaseModel, table=True):
metadata = gisaf_survey
__tablename__ = 'category_model_type'
name: str = Field(default=None, primary_key=True)
class Admin:
menu = 'Other'
flask_admin_model_view = 'MyModelViewWithPrimaryKey'
class CategoryBase(BaseModel):
model_config = ConfigDict(protected_namespaces=())
class Admin:
menu = 'Other'
flask_admin_model_view = 'CategoryModelView'
name: str | None = Field(default=None, primary_key=True)
domain: ClassVar[str] = 'V'
description: str | None
group: str = Field(min_length=4, max_length=4,
foreign_key="category_group.name", index=True)
minor_group_1: str = Field(min_length=4, max_length=4, default='----')
minor_group_2: str = Field(min_length=4, max_length=4, default='----')
status: str = Field(min_length=1, max_length=1)
custom: bool | None
auto_import: bool = True
model_type: str = Field(max_length=50,
foreign_key='category_model_type.name',
default='Point')
long_name: str | None = Field(max_length=50)
style: str | None = Field(sa_type=TEXT)
symbol: str | None = Field(max_length=1)
mapbox_type_custom: str | None = Field(max_length=32)
mapbox_paint: dict[str, Any] | None = Field(sa_type=JSON(none_as_null=True))
mapbox_layout: dict[str, Any] | None = Field(sa_type=JSON(none_as_null=True))
viewable_role: str | None
extra: dict[str, Any] | None = Field(sa_type=JSON(none_as_null=True))
@computed_field
@property
def layer_name(self) -> str:
"""
ISO compliant layer name (see ISO 13567)
:return: str
"""
return '{self.domain}-{self.group:4s}-{self.minor_group_1:4s}-{self.minor_group_2:4s}-{self.status:1s}'.format(self=self)
@computed_field
@property
def table_name(self) -> str:
"""
Table name
:return:
"""
if self.minor_group_2 == '----':
return '{self.domain}_{self.group:4s}_{self.minor_group_1:4s}'.format(self=self)
else:
return '{self.domain}_{self.group:4s}_{self.minor_group_1:4s}_{self.minor_group_2:4s}'.format(self=self)
@computed_field
@property
def raw_survey_table_name(self) -> str:
"""
Table name
:return:
"""
if self.minor_group_2 == '----':
return 'RAW_{self.domain}_{self.group:4s}_{self.minor_group_1:4s}'.format(self=self)
else:
return 'RAW_{self.domain}_{self.group:4s}_{self.minor_group_1:4s}_{self.minor_group_2:4s}'.format(self=self)
@computed_field
@property
def mapbox_type(self) -> str:
return self.mapbox_type_custom or mapbox_type_mapping[self.model_type]
class Category(CategoryBase, table=True):
metadata = gisaf_survey
name: str = Field(default=None, primary_key=True)
category_group: CategoryGroup = Relationship(back_populates="categories")
class CategoryRead(CategoryBase):
name: str

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,66 @@
from typing import Any
from sqlmodel import Field, String, JSON, Column
from .models_base import Model
from .metadata import gisaf_map
class BaseStyle(Model):
metadata = gisaf_map
__tablename__ = 'map_base_style'
class Admin:
menu = 'Other'
flask_admin_model_view = 'MapBaseStyleModelView'
id: int = Field(primary_key=True)
name: str
style: dict[str, Any] | None = Field(sa_type=JSON(none_as_null=True))
mbtiles: str = Field(sa_type=String(50))
static_tiles_url: str
enabled: bool = True
def __repr__(self):
return '<models.BaseStyle {self.name:s}>'.format(self=self)
class BaseMap(Model):
metadata = gisaf_map
__tablename__ = 'base_map'
class Admin:
menu = 'Other'
id: int = Field(primary_key=True)
name: str
def __repr__(self):
return '<models.BaseMap {self.name:s}>'.format(self=self)
def __str__(self):
return self.name
class BaseMapLayer(Model):
metadata = gisaf_map
__tablename__ = 'base_map_layer'
class Admin:
menu = 'Other'
id: int = Field(primary_key=True)
base_map_id: int = Field(foreign_key='base_map.id', index=True)
store: str = Field(sa_type=String(100))
@classmethod
def dyn_join_with(cls):
return {
'base_map': BaseMap,
}
def __repr__(self):
return f"<models.BaseMapLayer {self.store or '':s}>"
def __str__(self):
return f"{self.store or '':s}"

View file

@ -0,0 +1,10 @@
from sqlmodel import MetaData
from ..config import conf
gisaf = MetaData(schema='gisaf')
gisaf_survey = MetaData(schema='gisaf_survey')
gisaf_admin = MetaData(schema='gisaf_admin')
gisaf_map = MetaData(schema='gisaf_map')
raw_survey = MetaData(schema=conf.survey.db_schema_raw)
survey = MetaData(schema=conf.survey.db_schema)

37
src/gisaf/models/misc.py Normal file
View file

@ -0,0 +1,37 @@
import logging
from typing import Any
from pydantic import ConfigDict
from sqlmodel import Field, JSON, Column
from .models_base import Model
from .metadata import gisaf_map
logger = logging.getLogger(__name__)
class NotADataframeError(Exception):
pass
class Qml(Model):
"""
Model for storing qml (QGis style)
"""
model_config = ConfigDict(protected_namespaces=())
metadata = gisaf_map
class Admin:
menu = 'Other'
flask_admin_model_view = 'QmlModelView'
model_name: str = Field(default=None, primary_key=True)
qml: str
attr: str
style: str
mapbox_paint: dict[str, Any] | None = Field(sa_type=JSON(none_as_null=True))
mapbox_layout: dict[str, Any] | None = Field(sa_type=JSON(none_as_null=True))
def __repr__(self):
return '<models.Qml {self.model_name:s}>'.format(self=self)

View file

@ -0,0 +1,118 @@
from typing import Any
import logging
from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column
from pydantic import computed_field
import numpy as np
import pandas as pd
import geopandas as gpd
import shapely
from sqlalchemy.sql import sqltypes
from geoalchemy2.types import Geometry
pandas_cast_map = {
sqltypes.Integer: 'Int64',
sqltypes.Float: 'float64',
}
logger = logging.getLogger('model_base_base')
class Model(SQLModel):
"""
Base mixin class for models that can be converted to a Pandas dataframe with get_df
"""
class Meta:
filtered_columns_on_map: list[str] = []
@classmethod
def get_store_name(cls):
return "{}.{}".format(cls.metadata.schema, cls.__tablename__)
@classmethod
def get_table_name_prefix(cls):
return "{}_{}".format(cls.metadata.schema, cls.__tablename__)
@classmethod
async def get_df(cls, where=None,
with_related=None, recursive=True,
cast=True,
with_only_columns=None,
geom_as_ewkt=False,
**kwargs):
"""
Return a Pandas dataframe of all records
Optional arguments:
* an SQLAlchemy where clause
* with_related: automatically get data from related columns, following the foreign keys in the model definitions
* cast: automatically transform various data in their best python types (eg. with date, time...)
* with_only_columns: fetch only these columns (list of column names)
* geom_as_ewkt: convert geometry columns to EWKB (handy for eg. using upsert_df)
:return:
"""
query = cls.query
if with_related is not False:
if with_related or getattr(cls, 'get_gdf_with_related', False):
joins = get_join_with(cls, recursive)
model_loader = cls.load(**joins)
query = _get_query_with_table_names(model_loader)
if where is not None:
query.append_whereclause(where)
if with_only_columns:
query = query.with_only_columns([getattr(cls, colname) for colname in with_only_columns])
## Got idea from https://github.com/MagicStack/asyncpg/issues/173.
async with query.bind.raw_pool.acquire() as conn:
## Convert hstore fields to dict
await conn.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore')
compiled = query.compile()
stmt = await conn.prepare(compiled.string)
columns = [a.name for a in stmt.get_attributes()]
data = await stmt.fetch(*[compiled.params.get(param) for param in compiled.positiontup])
df = pd.DataFrame(data, columns=columns)
## Convert primary key columns to Int64:
## allows NaN, fixing type convertion to float with merge
for pk in [c.name for c in cls.__table__.primary_key.columns]:
if pk in df.columns and df[pk].dtype=='int64':
df[pk] = df[pk].astype('Int64')
if cast:
## Cast the type for known types (datetime, ...)
for column_name in df.columns:
col = getattr(query.columns, column_name, None)
if col is None:
logger.debug(f'Cannot get column {column_name} in query for model {cls.__name__}')
continue
column_type = getattr(query.columns, column_name).type
## XXX: Needs refinment, eg. nullable -> Int64 ...
if column_type.__class__ in pandas_cast_map:
df[column_name] = df[column_name].astype(pandas_cast_map[column_type.__class__])
elif isinstance(column_type, (sqltypes.Date, sqltypes.DateTime)):
## Dates, times
df[column_name] = pd.to_datetime(df[column_name])
#elif isinstance(column_type, (sqltypes.Integer, sqltypes.Float)):
# ## Numeric
# df[column_name] = pd.to_numeric(df[column_name], errors='coerce')
## XXX: keeping this note about that is about "char" SQL type, but the fix of #9694 makes it unnessary
#elif isinstance(column_type, sqltypes.CHAR) or (isinstance(column_type, sqltypes.String) and column_type.length == 1):
# ## Workaround for bytes being used for string of length 1 (not sure - why???)
# df[column_name] = df[column_name].str.decode('utf-8')
## Rename the columns, removing the schema_table prefix for the columns in that model
prefix = cls.get_table_name_prefix()
prefix_length = len(prefix) + 1
rename_map = {colname: colname[prefix_length:] for colname in df.columns if colname.startswith(prefix)}
df.rename(columns=rename_map, inplace=True)
## Eventually convert geometry columns to EWKB
if geom_as_ewkt:
geometry_columns = [col.name for col in cls.__table__.columns if isinstance(col.type, Geometry)]
for column in geometry_columns:
df[column] = shapely.to_wkb(shapely.from_wkb(df.geom), hex=True, include_srid=True)
return df

225
src/gisaf/models/project.py Normal file
View file

@ -0,0 +1,225 @@
from datetime import datetime
from csv import writer
from collections import defaultdict
from io import BytesIO, StringIO
from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column
import pyproj
from shapely.geometry import Point
from ..config import conf
from .models_base import Model
from .metadata import gisaf_admin
class Project(Model):
metadata = gisaf_admin
class Admin:
menu = 'Other'
flask_admin_model_view = 'ProjectModelView'
id: int = Field(default=None, primary_key=True)
name: str
contact_person: str
site: str
date_approved: datetime
start_date_planned: datetime
start_date_effective: datetime
end_date_planned: datetime
end_date_effective: datetime
def __str__(self):
return '{self.name:s}'.format(self=self)
def __repr__(self):
return '<models.Project {self.name:s}>'.format(self=self)
async def auto_import(self, registry):
"""
Import the points of the given project to the GIS DB
in their appropriate models.raw_survey_models
:return: dict of result (stats)
"""
from .category import Category
from .models_base import GeoPointSurveyModel
result = defaultdict(int)
categories = {cat.table_name: cat for cat in await Category.query.gino.all()}
## Define projections
survey_proj = pyproj.Proj(**conf.raw_survey['spatial_sys_ref'])
target_proj = pyproj.Proj(f'epsg:{conf.srid:d}')
def reproject(x, y, z):
return pyproj.transform(survey_proj, target_proj, x, y, z)
## TODO: Gino session
for survey_model_name, raw_survey_model in registry.raw_survey_models.items():
category = categories[survey_model_name]
if not category.auto_import:
continue
survey_model = registry.geom_auto.get(survey_model_name)
if not survey_model:
continue
if not issubclass(survey_model, GeoPointSurveyModel):
continue
raw_survey_items = await raw_survey_model.query.where(raw_survey_model.project_id == self.id).gino.all()
for item in raw_survey_items:
if not item:
continue
new_point = survey_model(
id=item.id,
date=item.date,
accur_id=item.accur_id,
equip_id=item.equip_id,
srvyr_id=item.srvyr_id,
orig_id=item.orig_id,
status=item.status,
project_id=self.id,
)
geom = Point(*reproject(item.easting, item.northing, item.elevation))
new_point.geom = 'SRID={:d};{:s}'.format(conf.srid, geom.wkb_hex)
## TODO: merge with Gino
#session.merge(new_point)
result[survey_model_name] += 1
#session.commit()
return result
async def download_raw_survey_data(self):
from .raw_survey import RawSurveyModel
## FIXME: old query style
breakpoint()
raw_survey_items = await RawSurveyModel.query.where(RawSurveyModel.project_id == self.id).gino.all()
csv_file = StringIO()
csv_writer = writer(csv_file)
for item in raw_survey_items:
csv_writer.writerow(item.to_row())
now = '{:%Y-%m-%d_%H:%M}'.format(datetime.now())
## XXX: not tested (aiohttp)
#return send_file(BytesIO(bytes(csv_file.getvalue(), 'utf-8')),
# attachment_filename='{:s}-{:s}.csv'.format(self.name, now),
# mimetype='text/csv',
# as_attachment=True)
headers = {
'Content-Disposition': 'attachment; filename="{}"'.format('{:s}-{:s}.csv'.format(self.name, now))
}
return web.Response(
status=200,
headers=headers,
content_type='text/csv',
body=BytesIO(bytes(csv_file.getvalue(), 'utf-8'))
)
async def download_reconciled_raw_survey_data(self, registry):
csv_file = StringIO()
csv_writer = writer(csv_file)
for model_name, model in registry.raw_survey_models.items():
survey_items = await model.query.where(model.project_id == self.id).gino.all()
for item in survey_items:
csv_writer.writerow(item.to_row())
now = '{:%Y-%m-%d_%H:%M}'.format(datetime.now())
## XXX: not tested (aiohttp)
#return send_file(BytesIO(bytes(csv_file.getvalue(), 'utf-8')),
# attachment_filename='{:s}-{:s}-reconciled.csv'.format(self.name, now),
# mimetype='text/csv',
# as_attachment=True)
headers = {
'Content-Disposition': 'attachment; filename="{}"'.format('{:s}-{:s}.csv'.format(self.name, now))
}
return web.Response(
status=200,
headers=headers,
content_type='text/csv',
body=BytesIO(bytes(csv_file.getvalue(), 'utf-8'))
)
async def reconcile(self, registry):
from gisaf.models.reconcile import Reconciliation
result = {}
all_reconciliations = await Reconciliation.query.gino.all()
point_ids_to_reconcile = {p.id: registry.raw_survey_models[p.target]
for p in all_reconciliations
if p.target in registry.raw_survey_models}
result['bad target'] = set([p.target for p in all_reconciliations
if p.target not in registry.raw_survey_models])
result['from'] = defaultdict(int)
result['to'] = defaultdict(int)
result['unchanged'] = defaultdict(int)
## TODO: Gino session
for model_name, model in registry.raw_survey_models.items():
points_to_reconcile = await model.query.\
where(model.project_id==self.id).\
where(model.id.in_(point_ids_to_reconcile.keys())).gino.all()
for point in points_to_reconcile:
new_model = point_ids_to_reconcile[point.id]
if new_model == model:
result['unchanged'][model] += 1
continue
new_point = new_model(
id=point.id,
accur_id=point.accur_id,
srvyr_id=point.accur_id,
project_id=point.project_id,
status=point.status,
orig_id=point.orig_id,
equip_id=point.equip_id,
geom=point.geom,
date=point.date
)
## TODO: Gino add and delete
#session.add(new_point)
#session.delete(point)
result['from'][point.__class__] += 1
result['to'][new_point.__class__] += 1
return result
# def download_raw_survey_data(self, session=None):
# from gisaf.models.raw_survey_models import RawSurvey
# from gisaf.registry import registry
# if not session:
# session = db.session
# raw_survey_items = session.query(RawSurvey).filter(RawSurvey.project_id == self.id).all()
# csv_file = StringIO()
# csv_writer = writer(csv_file)
#
# SURVEY_PROJ = pyproj.Proj(**conf.raw_survey['spatial_sys_ref'])
# TARGET_PROJ = pyproj.Proj(init='epsg:{:d}'.format(conf.srid))
#
# def reproject(x, y, z):
# return pyproj.transform(SURVEY_PROJ, TARGET_PROJ, x, y, z)
#
# for item in raw_survey_items:
# csv_writer.writerow(item.to_row())
#
# ## Add import of points, incl. reprojection, to registry.raw_survey_models:
# new_coords = reproject(item.easting, item.northing, item.elevation)
# geom = Point(*new_coords)
# ## TODO: from here
# model = registry.raw_survey_models
# new_point = model(
# id=item.id,
# category=item.category,
# date=item.date,
# accur_id=item.accur_id,
# equip_id=item.equip_id,
# srvyr_id=item.srvyr_id,
# orig_id=item.original_id,
# )
# new_point.geom = 'SRID={:d};{:s}'.format(conf.srid, geom.wkb_hex)
# session.merge(new_point)
# result[item.category_info] += 1
#
# now = '{:%Y-%m-%d_%H:%M}'.format(datetime.now())
#
# return send_file(BytesIO(bytes(csv_file.getvalue(), 'utf-8')),
# attachment_filename='{:s}-{:s}.csv'.format(self.name, now),
# mimetype='text/csv',
# as_attachment=True)

View file

@ -0,0 +1,97 @@
from typing import ClassVar
from sqlmodel import Field, BigInteger
from .models_base import Model
from .geo_models_base import GeoPointMModel, BaseSurveyModel
from .project import Project
from .category import Category
from .metadata import gisaf_survey
class RawSurveyModel(BaseSurveyModel, GeoPointMModel):
metadata = gisaf_survey
__tablename__ = 'raw_survey'
hidden: ClassVar[bool] = True
id: int = Field(default=None, primary_key=True)
project_id: int | None = Field(foreign_key='project.id')
category: str = Field(foreign_key='category.name')
in_menu: bool = False
@classmethod
def dyn_join_with(cls):
return {
'project': Project.on(cls.project_id == Project.id),
'category_info': Category.on(cls.category == Category.name),
}
#id = db.Column(db.BigInteger, primary_key=True)
## XXX: Can remove the rest since it's is in the GeoPointSurveyModel class?
#geom = db.Column(Geometry('POINTZ', srid=conf.raw_survey_srid))
#date = db.Column(db.Date)
#orig_id = db.Column(db.String)
#status = db.Column(db.String(1))
def __str__(self):
return 'Raw Survey point id {:d}'.format(self.id)
def to_row(self):
"""
Get a list of attributes, typically used for exporting in CSV
:return: list of attributes
"""
return [
self.id,
self.easting,
self.northing,
self.elevation,
self.category,
self.surveyor,
self.equipment,
self.date.isoformat(),
self.accuracy.name,
self.category_info.status,
self.project.name,
self.orig_id
]
def auto_import(self, session, model=None, status=None):
"""
Automatically feed the raw_geom get_raw_survey_model_mapping
:return:
"""
if model is None:
# XXX: move as module import?
from gisaf.registry import registry
model = registry.get_raw_survey_model_mapping().get(self.category)
new_point = model(
id=self.id,
geom=self.geom,
date=self.date,
project_id=self.project_id,
equip_id=self.equip_id,
srvyr_id=self.srvyr_id,
accur_id=self.accur_id,
orig_id=self.orig_id,
status=status,
)
session.merge(new_point)
class OriginRawPoint(Model):
"""
Store information of the raw survey point used in the line work
for each line and polygon shape
Filled when importing shapefiles
"""
metadata = gisaf_survey
__tablename__ = 'origin_raw_point'
id: int = Field(default=None, primary_key=True)
shape_table: str = Field(index=True)
shape_id: int = Field(index=True)
raw_point_id: int = Field(sa_type=BigInteger())
def __repr__(self):
return f'<models.OriginRawPoint {self.id:d} {self.shape_table:s} ' \
f'{self.shape_id:d} {self.raw_point_id:d}>'

View file

@ -0,0 +1,43 @@
from datetime import datetime
from sqlalchemy import BigInteger
from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column, String
from .models_base import Model
from .metadata import gisaf_admin
class Reconciliation(Model):
metadata = gisaf_admin
class Admin:
menu = 'Other'
flask_admin_model_view = 'ReconciliationModelView'
id: int = Field(primary_key=True, sa_type=BigInteger,
sa_column_kwargs={'autoincrement': False})
target: str = Field(sa_type=String(50))
source: str = Field(sa_type=String(50))
class StatusChange(Model):
metadata = gisaf_admin
__tablename__ = 'status_change'
id: int = Field(primary_key=True, sa_type=BigInteger,
sa_column_kwargs={'autoincrement': False})
store: str = Field(sa_type=String(50))
ref_id: int = Field(sa_type=BigInteger())
original: str = Field(sa_type=String(1))
new: str = Field(sa_type=String(1))
time: datetime
class FeatureDeletion(Model):
metadata = gisaf_admin
__tablename__ = 'feature_deletion'
id: int = Field(BigInteger, primary_key=True,
sa_column_kwargs={'autoincrement': False})
store: str = Field(sa_type=String(50))
ref_id: int = Field(sa_type=BigInteger())
time: datetime

43
src/gisaf/models/store.py Normal file
View file

@ -0,0 +1,43 @@
from typing import Any
from pydantic import BaseModel
from .geo_models_base import GeoModel, RawSurveyBaseModel, GeoPointSurveyModel
class MapLibreStyle(BaseModel):
...
class Store(BaseModel):
auto_import: bool
base_gis_type: str
count: int
custom: bool
description: str
#extra: dict[str, Any] | None
group: str
#icon: str
in_menu: bool
is_db: bool
is_line_work: bool
is_live: bool
long_name: str | None
#mapbox_layout: dict[str, Any] | None
#mapbox_paint: dict[str, Any] | None
#mapbox_type: str
mapbox_type_custom: str | None
#mapbox_type_default: str
minor_group_1: str
minor_group_2: str
#model: GeoModel
model_type: str
name: str
#name_letter: str
#name_number: int
#raw_model: GeoPointSurveyModel
#raw_model_store_name: str
status: str
store: str
style: str | None
symbol: str | None
title: str
viewable_role: str | None
z_index: int

View file

@ -0,0 +1,84 @@
from enum import Enum
from sqlmodel import Field, SQLModel
from .models_base import Model
from .metadata import gisaf_survey
class Accuracy(Model):
metadata = gisaf_survey
class Admin:
menu = 'Other'
flask_admin_model_view = 'MyModelViewWithPrimaryKey'
id: int = Field(default=None, primary_key=True)
name: str
accuracy: float
def __str__(self):
return f'{self.name} {self.accuracy}'
def __repr__(self):
return f'<models.Accuracy {self.name}>'
class Surveyor(Model):
metadata = gisaf_survey
class Admin:
menu = 'Other'
flask_admin_model_view = 'MyModelViewWithPrimaryKey'
id: int = Field(default=None, primary_key=True)
name: str
def __str__(self):
return self.name
def __repr__(self):
return f'<models.Surveyor {self.name}>'
class Equipment(Model):
metadata = gisaf_survey
class Admin:
menu = 'Other'
flask_admin_model_view = 'MyModelViewWithPrimaryKey'
id: int = Field(default=None, primary_key=True)
name: str
def __str__(self):
return self.name
def __repr__(self):
return f'<models.Equipment {self.name}>'
class GeometryType(str, Enum):
point = 'Point'
line_work = 'Line_work'
class AccuracyEquimentSurveyorMapping(Model):
metadata = gisaf_survey
__tablename__ = 'accuracy_equiment_surveyor_mapping'
class Admin:
menu = 'Other'
id: int = Field(default=None, primary_key=True)
surveyor_id: int = Field(foreign_key='surveyor.id', index=True)
equipment_id: int = Field(foreign_key='equipment.id', index=True)
geometry_type: GeometryType = Field(default='Point', index=True)
accuracy_id: int = Field(foreign_key='accuracy.id')
@classmethod
def dyn_join_with(cls):
return {
'surveyor': Surveyor,
'equipment': Equipment,
'accuracy': Accuracy,
}

45
src/gisaf/models/tags.py Normal file
View file

@ -0,0 +1,45 @@
from typing import Any, ClassVar
from sqlalchemy import BigInteger
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.dialects.postgresql import HSTORE
from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column
from pydantic import computed_field
from .metadata import gisaf
from .geo_models_base import GeoPointModel
class Tags(GeoPointModel, table=True):
metadata = gisaf
hidden: ClassVar[bool] = True
class Admin:
menu = 'Other'
flask_admin_model_view = 'TagModelView'
id: int | None = Field(primary_key=True)
store: str = Field(index=True)
ref_id: int = Field(index=True, sa_type=BigInteger)
tags: dict = Field(sa_type=MutableDict.as_mutable(HSTORE))
def __str__(self):
return '{self.store:s} {self.ref_id}: {self.tags}'.format(self=self)
def __repr__(self):
return '<models.Tag {self.store:s} {self.ref_id}: {self.tags}>'.format(self=self)
class TagKey(SQLModel, table=True):
metadata = gisaf
## CREATE TABLE gisaf.tagkey (key VARCHAR(255) primary key);
class Admin:
menu = 'Other'
flask_admin_model_view = 'TagKeyModelView'
id: str | None = Field(primary_key=True)
def __str__(self):
return self.key
def __repr__(self):
return '<models.TagKey {self.key}>'.format(self=self)