# gisaf-backend/src/gisaf/registry.py

"""
Define the models for the ORM
"""
import logging
import importlib
import pkgutil
from collections import defaultdict
from importlib.metadata import entry_points
from typing import Any, ClassVar
from pydantic import create_model
from sqlalchemy import inspect, text
from sqlalchemy.orm import selectinload
from sqlmodel import select
import numpy as np
import pandas as pd
from .config import conf
from .models import (misc, category as category_module,
project, reconcile, map_bases, tags)
from .models.geo_models_base import (
LiveGeoModel,
PlottableModel,
GeoModel,
RawSurveyBaseModel,
LineWorkSurveyModel,
GeoPointSurveyModel,
GeoLineSurveyModel,
GeoPolygonSurveyModel,
)
from .utils import ToMigrate
from .models.category import Category, CategoryGroup
from .database import db_session
from .models.metadata import survey, raw_survey
logger = logging.getLogger(__name__)
## Map a Category.model_type value to the survey geometry base class used
## when generating the per-category SQLModel classes (see make_category_models)
category_model_mapper = {
    'Point': GeoPointSurveyModel,
    'Line': GeoLineSurveyModel,
    'Polygon': GeoPolygonSurveyModel,
}
class NotInRegistry(Exception):
    """Raised for objects expected in the registry but not present."""
    pass
def import_submodules(package, recursive=True):
    """Import every submodule of a package, optionally descending into subpackages.

    :param package: package (name or actual module)
    :type package: str | module
    :param recursive: also recurse into nested packages
    :rtype: dict[str, types.ModuleType]
    """
    if isinstance(package, str):
        package = importlib.import_module(package)
    modules = {}
    prefix = package.__name__ + '.'
    for _finder, short_name, is_package in pkgutil.walk_packages(package.__path__):
        qualified = prefix + short_name
        modules[qualified] = importlib.import_module(qualified)
        if recursive and is_package:
            modules.update(import_submodules(qualified))
    return modules
class ModelRegistry:
    """
    Collect, categorize, and initialize the SQLAlchemy data models.
    Maintains registries for all kind of model types, eg. geom, data, values...
    Provides tools to get the models from their names, table names, etc.
    """
    ## Combined dataframe of all stores (categories, custom models,
    ## custom stores, live layers); built by make_stores()
    stores: pd.DataFrame

    def __init__(self):
        """
        Initialize the (empty) containers of the registry; they are filled
        later by make_registry() / make_category_models() / scan().
        :return: None
        """
        ## Explicitly defined GeoModel subclasses (modules and plugins)
        self.geom_custom = {}
        ## Non-database custom stores (eg. from plugin entry points)
        self.geom_custom_store = {}
        ## Live (runtime-defined) layers, see update_live_layers()
        self.geom_live: dict[str, LiveGeoModel] = {}
        ## Definitions of the live layers (normally fed by the redis store)
        self.geom_live_defs: dict[str, dict[str, Any]] = {}
        ## PlottableModel subclasses (value/measurement tables)
        self.values = {}
        ## Models that are neither geo nor plottable
        self.other = {}
        ## Models from the misc modules (category, project, tags...)
        self.misc = {}
        ## Per-category raw survey models (raw_survey schema)
        self.raw_survey_models = {}
        ## Per-category final geometry models (survey schema)
        self.survey_models = {}
async def make_registry(self, app=None):
    """
    Build (or refresh) the registry of models: discover the category
    based survey models, scan the explicitly defined ones, then organize
    everything into the common stores.
    :param app: optional application; its 'ogcapi' extra is rebuilt
    :return: None
    """
    logger.debug('make_registry')
    await self.make_category_models()
    self.scan()
    await self.build()
    ## When ogcapi is in app (i.e. not with the scheduler), tell it to
    ## (re)build now that the models are refreshed
    if app and 'ogcapi' in app.extra:
        await app.extra['ogcapi'].build()
async def make_category_models(self):
    """
    Make geom models from the category model
    and update raw_survey_models and survey_models
    Important notes:
    - the db must be bound before running this function
    - the db must be rebound after running this function,
      so that the models created are actually bound to the db connection
    :return: None
    """
    logger.debug('make_category_models')
    async with db_session() as session:
        ## Eager-load the category group to avoid lazy loads afterwards
        query = select(Category).order_by(Category.long_name).options(selectinload(Category.category_group))
        data = await session.exec(query)
        categories: list[Category] = data.all()
        for category in categories:
            ## Several statuses can coexist for the same model, so
            ## consider only the ones with the 'E' (existing) status
            ## The other statuses are defined only for import (?)
            if getattr(category, 'status', 'E') != 'E':
                continue
            ## Use pydantic create_model, supported by SQLModel
            ## See https://github.com/tiangolo/sqlmodel/issues/377
            store_name = f'{survey.schema}.{category.table_name}'
            raw_store_name = f'{raw_survey.schema}.RAW_{category.table_name}'
            raw_survey_field_definitions = {
                ## FIXME: RawSurveyBaseModel.category should be a Category, not category.name
                'category_name': (ClassVar[str], category.name),
                ## FIXME: Same for RawSurveyBaseModel.group
                'group_name': (ClassVar[str], category.category_group.name),
                'viewable_role': (ClassVar[str], category.viewable_role),
                'store_name': (ClassVar[str], raw_store_name),
                # 'icon': (str, ''),
            }
            ## Raw survey points
            try:
                self.raw_survey_models[store_name] = create_model(
                    __base__=RawSurveyBaseModel,
                    __model_name=category.raw_survey_table_name,
                    __cls_kwargs__={
                        'table': True,
                        '__tablename__': category.raw_survey_table_name,
                    },
                    **raw_survey_field_definitions
                )
            except Exception as err:
                ## NOTE(review): the error is logged twice (with and
                ## without traceback) -- intentional?
                logger.exception(err)
                logger.warning(err)
            else:
                logger.debug('Discovered {:s}'.format(category.raw_survey_table_name))
            ## Pick the geometry base class from the category's model type
            ## (None for unknown types: the final model is then skipped)
            model_class = category_model_mapper.get(category.model_type)
            ## Final geometries
            try:
                if model_class:
                    survey_field_definitions = {
                        'category_name': (ClassVar[str], category.name),
                        'group_name': (ClassVar[str], category.category_group.name),
                        'raw_store_name': (ClassVar[str], raw_store_name),
                        'viewable_role': (ClassVar[str], category.viewable_role),
                        'symbol': (ClassVar[str], category.symbol),
                        #'raw_model': (str, self.raw_survey_models.get(raw_store_name)),
                        # 'icon': (str, f'{survey.schema}-{category.table_name}'),
                    }
                    self.survey_models[store_name] = create_model(
                        __base__= model_class,
                        __model_name=category.table_name,
                        __cls_kwargs__={
                            'table': True,
                            '__tablename__': category.table_name,
                        },
                        **survey_field_definitions,
                    )
            except Exception as err:
                logger.warning(err)
            else:
                logger.debug('Discovered {:s}'.format(category.table_name))
    logger.info('Discovered {:d} models'.format(len(categories)))
def scan(self):
    """
    Scan all models defined explicitely (not the survey ones,
    which are defined by categories), and store them for reference.
    """
    logger.debug('scan')
    from . import models # nocheck
    ## Scan the models defined in modules
    for module_name, module in import_submodules(models).items():
        ## Skip the base-model modules (they define the abstract bases).
        ## Fixed: the previous exact match on 'src.gisaf.models.*' could
        ## not match modules named '<package>.models.*' when the package
        ## is imported as 'gisaf'; match on the suffix instead so both
        ## layouts are skipped.
        if module_name.endswith(('.models.geo_models_base',
                                 '.models.models_base')):
            continue
        for name in dir(module):
            obj = getattr(module, name)
            ## Keep only table models defined in this very module (not
            ## re-imported from elsewhere) that expose a store name
            if hasattr(obj, '__module__') and obj.__module__.startswith(module.__name__)\
                    and hasattr(obj, '__tablename__') and hasattr(obj, 'get_store_name'):
                model_type = self.add_model(obj)
                logger.debug(f'Model {obj.get_store_name()} added in the registry from gisaf source tree as {model_type}')
    ## Scan the models defined in plugins (setuptools' entry points)
    for module_name, model in self.scan_entry_points(name='gisaf_extras.models').items():
        model_type = self.add_model(model)
        logger.debug(f'Model {model.get_store_name()} added in the registry from {module_name} entry point as {model_type}')
    for module_name, store in self.scan_entry_points(name='gisaf_extras.stores').items():
        self.add_store(store)
        logger.debug(f'Store {store} added in the registry from {module_name} gisaf_extras.stores entry point')
    ## Add misc models
    for module in misc, category_module, project, reconcile, map_bases, tags:
        for name in dir(module):
            obj = getattr(module, name)
            if hasattr(obj, '__module__') and hasattr(obj, '__tablename__'):
                self.misc[name] = obj
async def build(self):
    """
    Build the registry: organize all models in a common reference point.
    This should be executed after the discovery of survey models
    (categories) and the scan of custom/module defined models.
    :return: None
    """
    logger.debug('build')
    ## Combine all geom models (auto and custom)
    self.geom = {**self.survey_models, **self.geom_custom}
    await self.make_stores()
    ## Some lists of tables, by usage
    values_tables = [model.__tablename__ for model in self.values.values()]
    other_tables = [model.__tablename__ for model in self.other.values()]
    self.data_tables = values_tables + other_tables
    ## Build a dict for quick access to the values from a model
    ## Fixed: logger.warn is a deprecated alias of logger.warning
    logger.warning(ToMigrate('get_geom_model_from_table_name, only used for values_for_model'))
    self.values_for_model = {}
    for model_value in self.values.values():
        ## Resolve the geom model each value table references, through
        ## its foreign key constraints
        for constraint in inspect(model_value).foreign_key_constraints:
            model = self.get_geom_model_from_table_name(constraint.referred_table.name)
            self.values_for_model[model] = model_value
    self.make_menu()
def scan_entry_points(self, name):
"""
Get the entry points in gisaf_extras.models, and return their models
:return: dict of name: models
"""
named_objects = {}
for entry_point in entry_points().select(group=name):
try:
named_objects.update({entry_point.name: entry_point.load()})
except ModuleNotFoundError as err:
logger.warning(err)
return named_objects
def add_model(self, model):
    """
    Register a model class in the registry matching its type.
    :return: Model type (one of {'GeoModel', 'PlottableModel', 'Other model'})
    """
    store_name = model.get_store_name()
    ## Visible geo models go to geom_custom; raw survey and hidden
    ## models fall through to the other registries
    is_visible_geo = (
        issubclass(model, GeoModel)
        and not issubclass(model, RawSurveyBaseModel)
        and not model.hidden
    )
    if is_visible_geo:
        self.geom_custom[store_name] = model
        return 'GeoModel'
    if issubclass(model, PlottableModel):
        self.values[store_name] = model
        return 'PlottableModel'
    self.other[store_name] = model
    return 'Other model'
def add_store(self, store):
    """Register a custom (non-database) store under its name."""
    store_name = store.name
    self.geom_custom_store[store_name] = store
def make_menu(self):
    """
    Build the Admin menu: group the stores' model classes that define an
    Admin inner class, keyed by their Admin.menu attribute.
    :return: None
    """
    menu = defaultdict(list)
    for store_name, model in self.stores.model.items():
        if hasattr(model, 'Admin'):
            menu[model.Admin.menu].append(model)
    self.menu = menu
# def get_raw_survey_model_mapping(self):
# """
# Get a mapping of category_name -> model for categories
# :return: dict of name -> model (class)
# """
# ## TODO: add option to pass a single item
# ## Local imports, avoiding cyclic dependencies
# ## FIXME: Gino
# categories = db.session.query(Category)
# return {category.name: self.raw_survey_models[category.table_name]
# for category in categories
# if self.raw_survey_models.get(category.table_name)}
async def get_model_id_params(self, model, id):
    """
    Return the parameters for this item (table name, id), displayed in info pane
    :param model: geo model class (may be None/falsy, yielding {})
    :param id: primary key value of the item
    :return: dict of info entries for the client (empty if not found)
    """
    if not model:
        return {}
    ## NOTE(review): legacy Gino-style query API (.query / .gino.first);
    ## presumably broken since the SQLModel migration -- confirm
    item = await model.load(**model.get_join_with()).query.where(model.id==id).gino.first()
    if not item:
        return {}
    resp = {}
    resp['itemName'] = item.caption
    resp['geoInfoItems'] = await item.get_geo_info()
    resp['surveyInfoItems'] = await item.get_survey_info()
    resp['infoItems'] = await item.get_info()
    resp['tags'] = await item.get_tags()
    ## Optional capabilities, depending on the item's class
    if hasattr(item, 'get_categorized_info'):
        resp['categorized_info_items'] = await item.get_categorized_info()
    if hasattr(item, 'get_graph'):
        resp['graph'] = item.get_graph()
    if hasattr(item, 'Attachments'):
        if hasattr(item.Attachments, 'files'):
            resp['files'] = await item.Attachments.files(item)
        if hasattr(item.Attachments, 'images'):
            resp['images'] = await item.Attachments.images(item)
    if hasattr(item, 'get_external_record_url'):
        resp['externalRecordUrl'] = item.get_external_record_url()
    return resp
def get_geom_model_from_table_name(self, table_name):
    """
    Utility func to get a geom model from a table name
    :param table_name: str
    :return: model or None
    """
    return next(
        (model for model in self.geom.values()
         if model.__tablename__ == table_name),
        None,
    )
def get_other_model_from_table_name(self, table_name):
    """
    Utility func to get a non-geom model from a table name.
    Looks first in the 'other' models, then in the values models.
    :param table_name: str
    :return: model or None
    """
    ## Fixed: search this instance's registries instead of the module
    ## level 'registry' singleton, consistent with the sibling
    ## get_geom_model_from_table_name (and safe for non-singleton use)
    for model in self.other.values():
        if model.__tablename__ == table_name:
            return model
    for model in self.values.values():
        if model.__tablename__ == table_name:
            return model
async def make_stores(self):
    """
    Make registry for primary groups, categories and survey stores using Pandas dataframes.
    Used in GraphQl queries.
    """
    ## Utility functions used with apply method (dataframes)
    def fill_columns_from_custom_models(row):
        ## (long_name, description, db_schema) taken from the model class
        return (
            ## FIXME: Like: 'AVESHTEquipment'
            row.model.__namespace__['__qualname__'], ## Name of the class - hacky
            row.model.description,
            ## FIXME: Like: 'other_aves'
            row.model.__table__.schema
        )
    def fill_columns_from_custom_stores(row):
        ## (long_name, description, db_schema) for a non-db custom store
        return (
            row.model.description,
            row.model.description,
            None ## Schema
        )
    def get_store_name(category):
        ## Store name like '<survey schema>.V_<group>_<minor1>[_<minor2>]'
        fragments = ['V', category.group, category.minor_group_1]
        if category.minor_group_2 != '----':
            fragments.append(category.minor_group_2)
        return '.'.join([
            survey.schema,
            '_'.join(fragments)
        ])
    self.categories = await Category.get_df()
    self.categories['title'] = self.categories.long_name.fillna(self.categories.description)
    self.categories['store'] = self.categories.apply(get_store_name, axis=1)
    ## Nullable integer column; the counts are filled later by
    ## update_stores_counts()
    self.categories['count'] = pd.Series(dtype=pd.Int64Dtype())
    self.categories.set_index('name', inplace=True)
    df_models = pd.DataFrame(self.geom.items(),
                             columns=['store', 'model']
                             ).set_index('store')
    df_raw_models = pd.DataFrame(self.raw_survey_models.items(),
                                 columns=('store', 'raw_model')
                                 ).set_index('store')
    ## Attach the generated model classes to the categories (inner merges:
    ## categories without a discovered model are dropped)
    self.categories = self.categories.merge(df_models, left_on='store', right_index=True)
    self.categories = self.categories.merge(df_raw_models, left_on='store', right_index=True)
    self.categories['custom'] = False
    self.categories['is_db'] = True
    self.categories.sort_index(inplace=True)
    # self.categories['name_letter'] = self.categories.index.str.slice(0, 1)
    # self.categories['name_number'] = self.categories.index.str.slice(1).astype('int64')
    # self.categories.sort_values(['name_letter', 'name_number'], inplace=True)
    ## Set in the stores dataframe some useful properties, from the model class
    ## Maybe at some point it makes sense to get away from class-based definitions
    if len(self.categories) > 0:
        ## XXX: redundant self.categories['store_name'] with self.categories['store']
        #self.categories['store_name'] = self.categories.apply(
        #    lambda row: row.model.get_store_name(),
        #    axis=1
        #)
        #self.categories['raw_model_store_name'] = self.categories.apply(
        #    lambda row: row.raw_model.store_name,
        #    axis=1
        #)
        self.categories['is_line_work'] = self.categories.apply(
            lambda row: issubclass(row.model, LineWorkSurveyModel),
            axis=1
        )
    else:
        ## Keep the columns present even with no category at all
        self.categories['store_name'] = None
        self.categories['raw_model_store_name'] = None
        self.categories['is_line_work'] = None
        self.categories['raw_survey_model'] = None
    ## Custom models (Misc)
    self.custom_models = pd.DataFrame(
        self.geom_custom.items(),
        columns=['store', 'model']
    ).set_index('store')
    self.custom_models['group'] = 'Misc'
    self.custom_models['custom'] = True
    self.custom_models['is_db'] = True
    self.custom_models['raw_model_store_name'] = ''
    self.custom_models['in_menu'] = self.custom_models.apply(
        lambda row: getattr(row.model, 'in_menu', True),
        axis=1
    )
    ## Keep only the models flagged for the menu
    self.custom_models = self.custom_models.loc[self.custom_models.in_menu]
    self.custom_models['auto_import'] = False
    self.custom_models['is_line_work'] = False
    if len(self.custom_models) > 0:
        self.custom_models['long_name'],\
        self.custom_models['custom_description'],\
        self.custom_models['db_schema'],\
            = zip(*self.custom_models.apply(fill_columns_from_custom_models, axis=1))
        ## Try to give a meaningful description, eg. including the source (db_schema)
        self.custom_models['description'] = self.custom_models['custom_description'].fillna(self.custom_models['long_name'] + '-' + self.custom_models['db_schema'])
        self.custom_models['title'] = self.custom_models['long_name']
    ## Custom stores (Community)
    self.custom_stores = pd.DataFrame(
        self.geom_custom_store.items(),
        columns=['store', 'model']
    ).set_index('store')
    self.custom_stores['group'] = 'Community'
    self.custom_stores['custom'] = True
    self.custom_stores['is_db'] = False
    if len(self.custom_stores) == 0:
        ## Create the column explicitly when empty (apply would not)
        self.custom_stores['in_menu'] = False
    else:
        self.custom_stores['in_menu'] = self.custom_stores.apply(
            lambda row: getattr(row.model, 'in_menu', True),
            axis=1
        )
    self.custom_stores = self.custom_stores.loc[self.custom_stores.in_menu]
    self.custom_stores['auto_import'] = False
    self.custom_stores['is_line_work'] = False
    if len(self.custom_stores) > 0:
        self.custom_stores['long_name'],\
        self.custom_stores['description'],\
        self.custom_stores['db_schema'],\
            = zip(*self.custom_stores.apply(fill_columns_from_custom_stores, axis=1))
        self.custom_stores['title'] = self.custom_stores['long_name']
    ## Combine Misc (custom) and survey (auto) stores
    ## Retain only one status per category (defaultStatus, 'E'/existing by default)
    self.stores = pd.concat([
        self.categories[self.categories.status==conf.map.defaultStatus[0]].reset_index().set_index('store').sort_values('title'),
        self.custom_models,
        self.custom_stores
    ])#.drop(columns=['store_name'])
    self.stores['in_menu'] = self.stores['in_menu'].astype(bool)
    ## Set in the stores dataframe some useful properties, from the model class
    ## Maybe at some point it makes sense to get away from class-based definitions
    def fill_columns_from_model(row):
        ## (mapbox_type_default, base_gis_type, z_index, attribution)
        ## taken from the model class attributes
        return (
            # row.model.icon,
            # row.model.symbol,
            row.model.mapbox_type, # or None,
            row.model.base_gis_type,
            row.model.z_index,
            row.model.attribution,
        )
    # self.stores['icon'],\
    # self.stores['symbol'],\
    self.stores['mapbox_type_default'], \
    self.stores['base_gis_type'], \
    self.stores['z_index'], \
    self.stores['attribution'] \
        = zip(*self.stores.apply(fill_columns_from_model, axis=1))
    #self.stores['mapbox_type_custom'] = self.stores['mapbox_type_custom'].replace('', np.nan).fillna(np.nan)
    ## A per-category custom mapbox type wins over the class default
    self.stores['mapbox_type'] = self.stores['mapbox_type_custom'].fillna(
        self.stores['mapbox_type_default']
    )
    self.stores['viewable_role'] = self.stores.apply(
        lambda row: getattr(row.model, 'viewable_role', None),
        axis=1,
    )
    self.stores['viewable_role'].replace('', None, inplace=True)
    #self.stores['gql_object_type'] = self.stores.apply(make_model_gql_object_type, axis=1)
    self.stores['is_live'] = False
    self.stores['description'].fillna('', inplace=True)
    ## Layer groups: Misc, survey's primary groups, Live
    self.primary_groups = await CategoryGroup.get_df()
    self.primary_groups.sort_values('name', inplace=True)
    self.primary_groups['title'] = self.primary_groups['long_name']
    ## Add Misc and Live
    ## NOTE(review): the tuples below assume the CategoryGroup dataframe
    ## has exactly these 4 columns -- confirm against CategoryGroup.get_df
    self.primary_groups.loc[-1] = (
        'Misc',
        False,
        'Misc and old layers (not coming from our survey; they will be organized, '
        'eventually as the surveys get more complete)',
        'Misc',
    )
    ## Shift the index so the Misc row (-1) becomes 0 and stays first
    self.primary_groups.index = self.primary_groups.index + 1
    self.primary_groups.loc[len(self.primary_groups)] = (
        'Live',
        False,
        'Layers from data processing, sensors, etc, and are updated automatically',
        'Live',
    )
    self.primary_groups.loc[len(self.primary_groups)] = (
        'Community',
        False,
        'Layers from community',
        'Community',
    )
    self.primary_groups.sort_index(inplace=True)
    #def make_group(group):
    #    return GeomGroup(
    #        name=group['name'],
    #        title=group['title'],
    #        description=group['long_name']
    #    )
    #self.primary_groups['gql_object_type'] = self.primary_groups.apply(make_group, axis=1)
    await self.update_stores_counts()
async def get_stores(self):
    """
    Get information about the available stores
    """
    ## Deliberately disabled: this endpoint backed the old graphql layer.
    ## Note it raises DeprecationWarning as an exception, not as a warning.
    raise DeprecationWarning('get_stores was for graphql')
async def update_stores_counts(self):
    """
    Update the counts of the stores from the DB
    """
    ## pg_stat_user_tables provides cheap (approximate) live tuple counts,
    ## avoiding one COUNT(*) query per table
    query = "SELECT schemaname, relname, n_live_tup FROM pg_stat_user_tables"
    # async with db.acquire(reuse=False) as connection:
    async with db_session() as session:
        rows = await session.exec(text(query))
        all_tables_count = pd.DataFrame(rows, columns=['schema', 'table', 'count'])
        ## Store key is '<schema>.<table>', matching the stores index
        all_tables_count['store'] = all_tables_count['schema'] + '.' + all_tables_count['table']
        all_tables_count.set_index(['store'], inplace=True)
        ## TODO: a DB VACUUM can be triggered if all counts are 0?
        ## Update the count in registry's stores (aligned on the index)
        self.stores.loc[:, 'count'] = all_tables_count['count']
    # ## FIXME: count for custom stores
    # store_df = self.stores.loc[(self.stores['count'] != 0) | (self.stores['is_live'])]
    # def set_count(row):
    #     row.gql_object_type.count = row['count']
    # store_df[store_df.is_db].apply(set_count, axis=1)
    # return store_df.gql_object_type.to_list()
def update_live_layers(self):
    """
    Update the live layers, using the list of model definitions found in
    self.geom_live_defs, which is normally updated by the redis store
    :return: None
    """
    ## Remove existing live layers
    self.geom_live = {}
    self.stores.drop(self.stores[self.stores.is_live == True].index,  # noqa: E712
                     inplace=True)
    df_live = pd.DataFrame.from_dict(self.geom_live_defs.values(),
                                     orient='columns'
                                     ).set_index('store')
    ## Adjust column names
    ## and add columns, to make sure pandas dtypes are not changed when the
    ## dataframes are concat
    ## TODO: standardize names across the whole workflow,
    ## then remove the rename below:
    df_live.rename(
        columns={
            'live': 'is_live',
            'zIndex': 'z_index',
            'gisType': 'model_type',
            'type': 'mapbox_type',
            'viewableRole': 'viewable_role',
        }, inplace=True
    )
    ## Add columns
    df_live['auto_import'] = False
    df_live['base_gis_type'] = df_live['model_type']
    df_live['custom'] = False
    df_live['group'] = ''
    df_live['in_menu'] = True
    df_live['is_db'] = False
    df_live['is_line_work'] = False
    df_live['long_name'] = df_live['name']
    df_live['mapbox_type_custom'] = df_live['mapbox_type']
    df_live['minor_group_1'] = ''
    df_live['minor_group_2'] = ''
    df_live['status'] = 'E'
    df_live['style'] = None
    df_live['title'] = df_live['name']
    ## Fixed: operate on this instance (self) rather than the module level
    ## 'registry' singleton, consistent with the rest of the class
    self.stores = pd.concat([self.stores, df_live])
    for store, model_info in self.geom_live_defs.items():
        ## Add provided live layers in the stores df
        # Create the pydantic model
        # NOTE: Unused at this point, but might be useful
        field_definitions = {
            k: (ClassVar[v.__class__], v)
            for k, v in model_info.items()
        }
        self.geom_live[store] = create_model(
            __model_name=store,
            __base__=LiveGeoModel,
            **field_definitions
        )
# Accessible as global
## Module-level singleton instance, shared across the application
registry: ModelRegistry = ModelRegistry()
## Below, some unused code, maybe to be used later for displaying layers in a tree structure
## Some magic for making a tree from enumarables,
## https://gist.github.com/hrldcpr/2012250
#Tree = lambda: defaultdict(Tree)
#
#
#def add(t, path):
# for node in path:
# t = t[node]
#
#
#dicts = lambda t: {k: dicts(t[k]) for k in t}
#
#
#def get_geom_models_tree():
# tree = Tree()
# for model in models.geom_custom:
# full_name = model.__module__[len('gisaf.models')+1:]
# add(tree, full_name.split('.'))
# add(tree, full_name.split('.') + [model])
# return dicts(tree)