diff --git a/src/gisaf/_version.py b/src/gisaf/_version.py
index 66fc9d4..189656d 100644
--- a/src/gisaf/_version.py
+++ b/src/gisaf/_version.py
@@ -1 +1 @@
-__version__ = '2023.4.dev11+g5c9d82f.d20231223'
\ No newline at end of file
+__version__ = '2023.4.dev12+gfda845c.d20231223'
\ No newline at end of file
diff --git a/src/gisaf/admin.py b/src/gisaf/admin.py
new file mode 100644
index 0000000..2d94742
--- /dev/null
+++ b/src/gisaf/admin.py
@@ -0,0 +1,113 @@
+from asyncio import create_task
+from importlib.metadata import entry_points
+import logging
+
+from gisaf.live import live_server
+from gisaf.redis_tools import Store
+
+logger = logging.getLogger('Gisaf admin manager')
+
+class AdminManager:
+    """
+    Application-wide manager of the admin (baskets).
+    One instance only, handled by Gisaf's process.
+    """
+    store: Store
+    async def setup_admin(self, app):
+        """
+        Create the default baskets, then scan and create baskets
+        from the Python entry points.
+        Runs at startup.
+        """
+        self.app = app
+        self.store = app['store']
+
+        ## Standard baskets
+        from gisaf.baskets import Basket, standard_baskets
+        self.baskets = {
+            basket.name: basket
+            for basket in standard_baskets
+        }
+
+        for entry_point in entry_points().select(group='gisaf_extras.baskets'):
+            try:
+                basket_class = entry_point.load()
+            except ModuleNotFoundError as err:
+                logger.warning(err)
+                continue
+            if issubclass(basket_class, Basket):
+                ## Get name, validity check
+                if basket_class.name is None:
+                    name = entry_point.name
+                else:
+                    name = basket_class.name
+                if name in self.baskets:
+                    logger.warning(f'Skip basket {name} in {entry_point.module}: name already defined')
+                    continue
+                ## Instantiate
+                basket = basket_class()
+                basket._custom_module = entry_point.name
+                ## Check base_dir, create it if needed
+                if not basket.base_dir.exists():
+                    try:
+                        basket.base_dir.mkdir()
+                    except Exception as err:
+                        logger.warning(f'Skip basket {name} in {entry_point.module}: '
+                                       f'cannot create directory for basket {name} at {basket.base_dir}')
+                        continue
+                    else:
+                        logger.info(f'Created directory for basket {name} at {basket.base_dir}')
+                ## Add to register
+                self.baskets[name] = basket
+                logger.info(f'Added Basket {entry_point.name} from {entry_point.module}')
+
+        ## Give the baskets a reference to the application
+        for basket in self.baskets.values():
+            basket.app = app
+
+        ## Subscribe to admin redis channels
+        self.pub_categories = self.store.redis.pubsub()
+        self.pub_scheduler = self.store.redis.pubsub()
+        await self.pub_categories.psubscribe('admin:categories:update')
+        task1 = create_task(self._listen_to_redis_categories())
+        await self.pub_scheduler.psubscribe('admin:scheduler:json')
+        task2 = create_task(self._listen_to_redis_scheduler())
+
+        app['admin'] = self
+
+    async def baskets_for_role(self, request):
+        return {
+            name: basket for name, basket in self.baskets.items()
+            if await basket.allowed_for(request)
+        }
+
+    async def _listen_to_redis_categories(self):
+        """
+        Subscribe to the redis channel for category updates ("admin:categories:update")
+        """
+        async for msg in self.pub_categories.listen():
+            if msg['type'] == 'pmessage':
+                ## XXX: Why isn't the name retrieved?
+                #client = await self.app['store'].pub.client_getname()
+                client = self.app['store'].uuid
+
+                ## !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+                ## FIXME: pubsub admin:categories:update
+                ## !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
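The entry-point scan in setup_admin above is what lets external packages contribute their own baskets. As a minimal sketch (the package name "myplugin", the basket and its role/store values are hypothetical; the entry point group and the Basket class attributes are the ones used above and in gisaf.baskets):

# pyproject.toml of the hypothetical plugin package:
#
#   [project.entry-points."gisaf_extras.baskets"]
#   drone_imagery = "myplugin.baskets:DroneImageryBasket"

# myplugin/baskets.py
from gisaf.baskets import Basket
from gisaf.importers import GeoDataImporter

class DroneImageryBasket(Basket):
    name = 'Drone imagery'            # when None, the admin falls back to the entry point name
    importer_class = GeoDataImporter  # omit for an upload-only basket
    columns = ['name', 'time', 'status', 'store', 'import', 'delete']
    upload_fields = ['store_misc', 'status']
    role = 'reviewer'                 # hides the basket from users without this role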
+ + ## Skip for the process which sent this message actually updated its registry + #breakpoint() + if client != msg['data'].decode(): + from gisaf.database import make_auto_models + await make_auto_models(self.app) + + async def _listen_to_redis_scheduler(self): + """ + Subscribe the redis sub channel for scheduler jobs ("admin:scheduler:json") + """ + async for msg in self.pub_scheduler.listen(): + if msg['type'] == 'pmessage': + await live_server._send_to_ws_clients(msg['channel'].decode(), msg['data'].decode()) + + +manager = AdminManager() \ No newline at end of file diff --git a/src/gisaf/api.py b/src/gisaf/api.py index bbc78bf..86bccc8 100644 --- a/src/gisaf/api.py +++ b/src/gisaf/api.py @@ -8,20 +8,20 @@ from fastapi.security import OAuth2PasswordRequestForm from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession -from .models.authentication import ( +from gisaf.models.authentication import ( User, UserRead, Role, RoleRead, ) -from .models.category import Category, CategoryRead -from .config import conf -from .models.bootstrap import BootstrapData -from .models.store import Store -from .database import get_db_session, pandas_query, fastapi_db_session as db_session -from .security import ( +from gisaf.models.category import Category, CategoryRead +from gisaf.config import conf +from gisaf.models.bootstrap import BootstrapData +from gisaf.models.store import Store +from gisaf.database import pandas_query, fastapi_db_session as db_session +from gisaf.security import ( Token, authenticate_user, get_current_user, create_access_token, ) -from .registry import registry +from gisaf.registry import registry logger = logging.getLogger(__name__) diff --git a/src/gisaf/application.py b/src/gisaf/application.py index 3e6861a..5d97e3e 100644 --- a/src/gisaf/application.py +++ b/src/gisaf/application.py @@ -3,12 +3,12 @@ from contextlib import asynccontextmanager from fastapi import FastAPI, responses -from .api import api -from .geoapi import api as geoapi -from .config import conf -from .registry import registry -from .redis_tools import setup_redis, setup_redis_cache, shutdown_redis -from .live import setup_live +from gisaf.api import api +from gisaf.geoapi import api as geoapi +from gisaf.config import conf +from gisaf.registry import registry +from gisaf.redis_tools import setup_redis, setup_redis_cache, shutdown_redis +from gisaf.live import setup_live logging.basicConfig(level=conf.gisaf.debugLevel) logger = logging.getLogger(__name__) diff --git a/src/gisaf/baskets.py b/src/gisaf/baskets.py new file mode 100644 index 0000000..e15776a --- /dev/null +++ b/src/gisaf/baskets.py @@ -0,0 +1,307 @@ +from pathlib import Path +from collections import defaultdict +from json import loads +from datetime import datetime +import logging +from typing import ClassVar + +# from aiohttp_security import check_permission +# from aiohttp.multipart import MultipartReader +# from aiohttp.web import HTTPUnauthorized, HTTPForbidden + +from gisaf.config import conf +from gisaf.models.admin import FileImport +# from gisaf.models.graphql import AdminBasketFile, BasketImportResult +from gisaf.models.survey import Surveyor, Accuracy, Equipment, AccuracyEquimentSurveyorMapping +from gisaf.models.project import Project +from gisaf.importers import RawSurveyImporter, GeoDataImporter, LineWorkImporter, ImportError +from gisaf.utils import ToMigrate + +logger = logging.getLogger(__name__) + +upload_fields_available = ['store', 'status', 'project', 'surveyor', 'equipment'] + + +class Basket: + """ 
+ Base class for all baskets. + Plugin modules can import and subclass (the admin Manager sets the _custom_module + attribute for reference to that module). + The basket displays only columns (of FileImport) defined in the class, + and additional fields for uploads. + The basket can have a role. In that case, it will be completely hidden from users + who don't have that role. + """ + name: ClassVar[str] + importer_class = None + _custom_module = None + columns: list[str] = ['name', 'time', 'import', 'delete'] + upload_fields: list[str] = [] + role = None + + def __init__(self): + self.base_dir = Path(conf.admin.basket.base_dir) / self.name + if self.importer_class: + self.importer = self.importer_class() + self.importer.basket = self + + async def allowed_for(self, request): + """ + Return False if the basket is protected by a role + Request: aiohttp.Request instance + """ + if not self.role: + return True + else: + try: + await check_permission(request, self.role) + except (HTTPUnauthorized, HTTPForbidden): + return False + else: + return True + + async def get_files(self, convert_path=False): + """ + Get a dataframe of FileImport items in the basket. + """ + where = FileImport.basket==self.name + ## First, get the list of files in the base_dir, then associate with the FileImport instance + df = await FileImport.get_df( + where=where, + with_related=True + #with_only_columns=['id', 'path', 'time', 'status', 'table'], + ) + df.rename(columns={ + 'gisaf_admin_project_name': 'project', + 'gisaf_survey_surveyor_name': 'surveyor', + 'gisaf_survey_equipment_name': 'equipment', + }, inplace=True) + ## Sanity check + df.dropna(subset=['name'], inplace=True) + df['dir'] = df.dir.fillna('.') + df.reset_index(drop=True, inplace=True) + + ## TODO: After the old admin is completely off and all is clean and nice, remove below and just: + # return df + + ## Until the compatibility with old admin is required and we're sure nothing is destroyed: + ## Get files on the file system + if len(df) == 0: + return df + if convert_path: + df['path'] = df.apply(lambda fi: Path(fi['dir'])/fi['name'], axis=1) + #if check_fs: + # files = set(self.base_dir.glob('**/*')) + # df['exists'] = df.apply(lambda fi: self.base_dir/fi['dir']/fi['name'] in files, axis=1) + else: + return df + + async def get_file(self, id): + df = await FileImport.get_df( + where=FileImport.id==id, + with_related=True, + ) + df.rename(columns={ + 'gisaf_admin_project_name': 'project', + 'gisaf_survey_surveyor_name': 'surveyor', + 'gisaf_survey_equipment_name': 'equipment', + }, inplace=True) + df['dir'] = df.dir.fillna('.') + ## Replace path with Path from pathlib + df['path'] = df.apply(lambda fi: self.base_dir/fi['dir']/fi['name'], axis=1) + file = df.iloc[0] + ## Override file.name, which otherwise is the index of the item (hack) + file.name = file['name'] + return file + + async def delete_file(self, id): + file = await FileImport.get(id) + if file.dir: + path = self.base_dir/file.dir/file.name + else: + path = self.base_dir/file.name + if path.exists(): + path.unlink() + await file.delete() + + async def import_file(self, file_import, dry_run=True, return_data_info=False, **kwargs): + """ + Import the file by calling the basket's importer's do_import. + Time stamp the FileImport. 
+ Return a BasketImportResult ObjectType + """ + if not hasattr(self, 'importer'): + return BasketImportResult( + message=f'No import defined/required for {self.name} basket' + ) + try: + import_result = await self.importer.do_import(file_import, dry_run=dry_run, **kwargs) + except ImportError as err: + raise + except Exception as err: + logger.exception(err) + raise ImportError(f'Unexpected import error: {err}') + + if isinstance(import_result, BasketImportResult): + result = import_result + else: + if import_result: + if isinstance(import_result, (tuple, list)): + assert len(import_result) >= 2, \ + 'do_import should return message or (message, details)' + result = BasketImportResult( + message=import_result[0], + details=import_result[1], + ) + if len(import_result) > 2: + data = import_result[2] + else: + result = BasketImportResult(message=import_result) + else: + result = BasketImportResult(message='Import successful.') + if dry_run: + result.time = file_import.time + else: + if not result.time: + result.time = datetime.now() + ## Save time stamp + await (await FileImport.get(file_import.id)).update(time=result.time).apply() + if return_data_info: + return result, data + else: + return result + + async def add_files(self, reader, request): + """ + File upload to basket. + Typically called through an http POST view handler. + Save the file, save the FileImport record, return dict of information. + Note that the return dict has eventually numpy types and needs NumpyEncoder + to be json.dump'ed. + """ + raise ToMigrate("basket add_files reader was aiohttp's MultipartReader") + ## TODO: multiple items + ## TODO: check if file already exists + ## First part is the file + part = await reader.next() + assert part.name == 'file' + file_name = part.filename + + ## Save file on filesystem + size = 0 + path = Path(self.base_dir) / file_name + + ## Eventually create the directory + path.parent.mkdir(parents=True, exist_ok=True) + with path.open('wb') as f: + while True: + chunk = await part.read_chunk() # 8192 bytes by default. 
+ if not chunk: + break + size += len(chunk) + f.write(chunk) + + ## Read other parts + parts = defaultdict(None) + while True: + part = await reader.next() + if not part: + break + value = (await part.read()).decode() + if value != 'null': + parts[part.name] = value + + ## Find ids of project, surveyor, equipment + if 'project' in parts: + project_id = (await Project.query.where(Project.name==parts.get('project')).gino.first()).id + else: + project_id = None + if 'surveyor' in parts: + surveyor_id = (await Surveyor.query.where(Surveyor.name==parts.get('surveyor')).gino.first()).id + else: + surveyor_id = None + if 'equipment' in parts: + equipment_id = (await Equipment.query.where(Equipment.name==parts.get('equipment')).gino.first()).id + else: + equipment_id = None + + ## Save FileImport record + store_keys = [store_key for store_key in parts.keys() + if store_key.startswith('store')] + if len(store_keys) == 1: + store_type_name = parts[store_keys[0]] + else: + store_type_name = None + fileImportRecord = await FileImport( + name=file_name, + dir=parts.get('dir', '.'), + basket=self.name, + project_id=project_id, + surveyor_id=surveyor_id, + equipment_id=equipment_id, + store=store_type_name, + status=parts.get('status', None), + ).create() + fileImportRecord.path = self.base_dir/fileImportRecord.dir/fileImportRecord.name + + admin_basket_file = AdminBasketFile( + id=fileImportRecord.id, + name=file_name, + status=parts.get('status'), + store=store_type_name, + project=parts.get('project'), + surveyor=parts.get('surveyor'), + equipment=parts.get('equipment'), + ) + + ## Eventually do import + import_result = None + if loads(parts['autoImport']): + ## Get the record from DB with Pandas, for compatibility with import_file + file_import_record = await self.get_file(fileImportRecord.id) + try: + await check_permission(request, 'reviewer') + except HTTPUnauthorized as err: + basket_import_result = BasketImportResult( + message="Cannot import: only a reviewer can do that" + ) + else: + dry_run = parts.get('dry_run', False) + try: + basket_import_result = await self.import_file(file_import_record, dry_run) + except ImportError as err: + basket_import_result = BasketImportResult( + message=f'Error: {err.args[0]}' + ) + + admin_basket_file.import_result = basket_import_result + + return admin_basket_file + + +class MiscGeomBasket(Basket): + name = 'Misc geo file' + importer_class = GeoDataImporter + columns = ['name', 'time', 'status', 'store', 'import', 'delete'] + upload_fields = ['store_misc', 'status'] + + +class LineWorkBasket(Basket): + name = 'Line work' + importer_class = LineWorkImporter + columns = ['name', 'time', 'status', 'store', 'project', 'import', 'delete'] + upload_fields = ['store_line_work', 'project', 'status'] + + +class SurveyBasket(Basket): + name = 'Survey' + importer_class = RawSurveyImporter + columns = ['name', 'time', 'project', 'surveyor', 'equipment', 'import', 'delete'] + upload_fields = ['project', 'surveyor', 'equipment'] + + +standard_baskets = ( + SurveyBasket(), + LineWorkBasket(), + MiscGeomBasket(), +) diff --git a/src/gisaf/config.py b/src/gisaf/config.py index 83e0499..4e697c8 100644 --- a/src/gisaf/config.py +++ b/src/gisaf/config.py @@ -10,7 +10,7 @@ from pydantic import ConfigDict from pydantic.v1.utils import deep_update from yaml import safe_load -from ._version import __version__ +from gisaf._version import __version__ #from sqlalchemy.ext.asyncio.engine import AsyncEngine #from sqlalchemy.orm.session import sessionmaker diff --git 
a/src/gisaf/database.py b/src/gisaf/database.py index d6e05f6..9767af6 100644 --- a/src/gisaf/database.py +++ b/src/gisaf/database.py @@ -7,7 +7,7 @@ from sqlmodel.ext.asyncio.session import AsyncSession from fastapi import Depends import pandas as pd -from .config import conf +from gisaf.config import conf engine = create_async_engine( conf.db.get_sqla_url(), diff --git a/src/gisaf/geoapi.py b/src/gisaf/geoapi.py index 48fb14f..12927e4 100644 --- a/src/gisaf/geoapi.py +++ b/src/gisaf/geoapi.py @@ -10,9 +10,9 @@ from asyncio import CancelledError from fastapi import (FastAPI, HTTPException, Response, Header, WebSocket, WebSocketDisconnect, status, responses) -from .redis_tools import store as redis_store -from .live import live_server -from .registry import registry +from gisaf.redis_tools import store as redis_store +from gisaf.live import live_server +from gisaf.registry import registry logger = logging.getLogger(__name__) diff --git a/src/gisaf/importers.py b/src/gisaf/importers.py new file mode 100644 index 0000000..697ba2e --- /dev/null +++ b/src/gisaf/importers.py @@ -0,0 +1,628 @@ +import datetime +import logging +import re +from collections import defaultdict, OrderedDict +from math import isnan +from mimetypes import guess_type +from zipfile import ZipFile, BadZipFile + +import geopandas as gpd +import numpy as np +import pandas as pd +from shapely.wkb import dumps as dumps_wkb +from sqlalchemy import and_ +from sqlalchemy.sql.schema import Column + +from gisaf.config import conf +#from .models.admin import FileImport +from gisaf.models.admin import FeatureImportData +# from gisaf.models.graphql import BasketImportResult +from gisaf.models.raw_survey import RawSurveyModel +from gisaf.models.survey import Surveyor, Accuracy, Equipment, AccuracyEquimentSurveyorMapping +from gisaf.models.tags import Tags +from gisaf.redis_tools import store as redis_store +from gisaf.registry import registry +from gisaf.utils import ( + delete_df, upsert_df, + SHAPELY_TYPE_TO_MAPBOX_TYPE, DEFAULT_MAPBOX_LAYOUT, + MAPBOX_COLOR_ATTRIBUTE_NAME, MAPBOX_OPACITY_ATTRIBUTE_NAME, + DEFAULT_MAPBOX_PAINT, gisTypeSymbolMap +) + +logger = logging.getLogger(__name__) + +class ImportError(Exception): + pass + +class Importer: + """ + Base class for all importers. + The main process is executed by do_import(file) + Subclasses should define read_file and process_df. + """ + basket = None + + async def do_import(self, file_record, dry_run=False, **kwargs): + """ + Return: a BasketImportResult instance, or a message string, + or a tuple or a list like (message, details for user feedback). + """ + df = await self.read_file(file_record) + model = self.get_model(file_record) + return await self.import_df(df, model, file_record, dry_run=dry_run, **kwargs) + + async def read_file(self, file_record): + """ + Subclasses should define the operation for reading the file into a DataFrame. + """ + raise NotImplementedError('Nothing defined for reading the file') + + async def import_df(self, df, model, file_record, dry_run=False, **kwargs): + """ + Subclasses should define the operation for importing the DataFrame (save to DB). + """ + raise NotImplementedError('Nothing defined for importing the file') + + async def get_model(self, file_record): + """ + Subclasses should define the operation for returning the model for the file. + """ + raise NotImplementedError('Nothing defined for identifying the model') + + def preprocess(self, df): + """ + Hook for pre-processing the dataframe read from the file. 
+ """ + pass + + +class RawSurveyImporter(Importer): + """ + Importer for raw survey points + """ + def get_model(self, file_record): + return RawSurveyModel + + async def read_file(self, file_record): + try: + df = pd.read_csv(file_record.path, header=None) + except pd.errors.ParserError: + raise ImportError('File format error (cannot parse)') + + ## Take only the first 5 columns + if len(df.columns) < 5: + raise ImportError('File format error (at least 5 columns needed)') + df = df[range(5)] + df.dropna(inplace=True) + df.columns = ['orig_id', 'easting', 'northing', 'elevation', 'category'] + ## Strip extra characters + df['category'] = df['category'].str.strip() + df['orig_id'] = df.orig_id.astype(str) + + ## Create a Geodataframe + gdf = gpd.GeoDataFrame( + df, + geometry=gpd.points_from_xy(df.easting, df.northing, df.elevation), + crs=conf.raw_survey['spatial_sys_ref'] + ) + return gdf + + async def import_df(self, gdf, model, file_record, dry_run=False, + remove_misplaced=False, **kwargs): + user_feedback = OrderedDict() + user_feedback['Points in file'] = len(gdf) + + ## Date from the file name + ## TODO: should be added to FileImport (or other metadata) + fname_search = re.match('^(\S+)-(\d\d\d\d)-(\d\d)-(\d\d).*$', file_record['name']) + if not fname_search: + raise ImportError('The file name is not OK ' + '(format should be: "PPP-DESCRIPTION-YYYY-MM-DD", ' + 'PPP being the project name, DESCRITION is optional and discarded)') + date = datetime.date(day=int(fname_search.group(4)), + month=int(fname_search.group(3)), + year=int(fname_search.group(2))) + + ## TODO: Check if date == file_record.date + + accuracies = await AccuracyEquimentSurveyorMapping.get_df(with_related=True) + + ## Cleanup + accuracies.rename(columns={ + 'gisaf_survey_surveyor_id': 'surveyor_id', + 'gisaf_survey_surveyor_name': 'surveyor', + 'gisaf_survey_equipment_id': 'equipment_id', + 'gisaf_survey_equipment_name': 'equipment', + 'gisaf_survey_accuracy_id': 'accuracy_id', + 'gisaf_survey_accuracy_name': 'accuracy_name', + 'gisaf_survey_accuracy_accuracy': 'accuracy', + 'geometry__3': 'type', + }, inplace=True) + accuracies.drop(columns=['surveyor__1', 'equipment_2', 'accuracy__4'], inplace=True) + accuracy = accuracies.loc[ + (accuracies.surveyor==file_record.surveyor) + & (accuracies.equipment==file_record.equipment) + & (accuracies.type=='Point') + ] + if len(accuracy) == 0: + raise ImportError(f'No accuracy defined for surveyor {file_record.surveyor} '\ + f'and equipment {file_record.equipment}') + elif len(accuracy) > 1: + raise ImportError(f'More than 1 accuracy defined for surveyor {file_record.surveyor} '\ + f'and equipment {file_record.equipment}') + accuracy = accuracy.iloc[0] + + def _apply_point_hash_function(rec): + return point_hash_function( + date, + file_record.project_id, + file_record.surveyor_id, + file_record.equipment_id, + rec.name + 1 ## Index in the file + ) + + gdf['id'] = gdf.apply(_apply_point_hash_function, axis=1) + + ## Add information to all records in the gdf + gdf['date'] = pd.to_datetime(date) + gdf['project_id'] = file_record.project_id + gdf['srvyr_id'] = file_record.surveyor_id + gdf['equip_id'] = file_record.equipment_id + gdf['accur_id'] = accuracy.accuracy_id + + df_raw_survey = await model.get_df( + where=and_( + model.date==date, + model.project_id==file_record.project_id, + model.srvyr_id==file_record.surveyor_id, + model.equip_id==file_record.equipment_id, + ) + ) + user_feedback['Existing raw_survey points'] = len(df_raw_survey) + + ## Import to raw_survey_data 
table + ## PostGis specific: add SRID + gdf['geom'] = gdf['geometry'].apply(lambda g: dumps_wkb(g, srid=conf.raw_survey['srid'], hex=True)) + if not dry_run: + await upsert_df(gdf, model) + + ## Keep track of points without known categories for user feedback + df_unknown_categories = gdf[~gdf.category.isin(registry.categories.index)] + unknown_categories = df_unknown_categories[['category', 'orig_id']].groupby('category').count() + for row in unknown_categories.itertuples(): + user_feedback[f'Category {row[0]} (unknown)'] = row[1] + + gdf = gdf.merge(registry.categories, left_on='category', right_index=True) + + ## Import to raw survey models tables + for category_name, category_gdf in gdf.groupby('category'): + category = registry.categories.loc[category_name] + user_feedback[f"Category {category.name} ({category.store_name})"] = len(category_gdf) + if not dry_run: + await upsert_df(category_gdf, category.raw_survey_model) + + ## Auto import: import from raw survey models tables to geometry tables + gdf['geom'] = gdf.to_crs(conf.srid).geometry.apply(lambda g: dumps_wkb(g, srid=conf.srid, hex=True)) + for category_name, category_gdf in gdf.groupby('category'): + category = registry.categories.loc[category_name] + if category.auto_import and category.model_type == 'Point': + if not dry_run: + await upsert_df(category_gdf, category.model) + + if remove_misplaced: + gdf_db = await category.model.get_geo_df() + gdf_db.reset_index(inplace=True) + ## Finding by id does not work since points might + ## have been placed in a different category + #matches = gdf_db.loc[gdf_db['id'].isin(category_gdf.id)] + matches = gdf_db.loc[ + gdf_db.orig_id.isin(category_gdf.orig_id) + & (gdf_db.srvyr_id == file_record.surveyor_id) + & (gdf_db.equip_id == file_record.equipment_id) + & (gdf_db.date.dt.date == date) + ] + matches_merged = matches.merge( + category_gdf.to_crs(conf.srid), + on='orig_id', + suffixes=('', '_new') + ) + misplaced = matches_merged.loc[ + matches_merged['geometry'] != matches_merged['geometry_new'] + ] + if len(misplaced) > 0: + await delete_df(misplaced, category.model) + ## Filter matches with orig_id and different geometry + pass + + user_feedback['Points imported'] = len(gdf) + + return BasketImportResult( + message=f"Import successful{' (dry run)' if dry_run else ''}", + details=user_feedback, + ) + + +class GeoDataImporter(Importer): + """ + Base class for geo data importers + """ + def get_model(self, file_record): + return registry.geom.get(file_record.store) + + async def read_file(self, file_record): + ## TODO: guess_type supports pathlib.Path from Python 3.8, so it should be: + #if guess_type(file_record.path)[0] == 'application/zip': + ## For Python 3.7 compatibility: + if guess_type(str(file_record.path.absolute()))[0] == 'application/zip': + return gpd.read_file(f'zip://{file_record.path}') + else: + return gpd.read_file(file_record.path) + + async def get_metadata_from_raw_points(self, gdf, model, file_record): + pass + + async def import_df(self, gdf_file_input, model, file_record, dry_run=False, live_publish_results=False): + ## XXX: Removed qml processing (Mapbox) + #qml_file_ = glob.glob(path.join(tmpdir, '*.qml')) + #if qml_file_: + # process_qml(qml_file_[0], model) + + new_or_updated_items = {} + now = datetime.datetime.now() + bad_geometry_types = {} + user_feedback = OrderedDict() + to_keep = [] + + ## If the model has a project (eg. 
all survey models, see BaseSurveyModel), + ## get only the features with the project given in the fileImport object + if hasattr(model, 'project_id'): + if np.isnan(file_record.project_id): + raise ImportError('No project defined for that file') + query = model.project_id==file_record.project_id + else: + query = None + + if file_record.status and isinstance(model.status, Column): + query = and_(query, model.status==file_record.status) + + ## Explode the input (in case of multi-geometries) + gdf_file = gdf_file_input.explode(ignore_index=True) + + gdf_db = await model.get_geo_df(query, with_related=False) + gdf_db.reset_index(inplace=True) + + ## Cleanup input: drop empty geometries and duplicates + nb_all = len(gdf_file) + gdf_file.dropna(subset=['geometry'], inplace=True) + gdf_file = gdf_file[~gdf_file.geometry.is_empty] + nb_empty_geom = nb_all - len(gdf_file) + gdf_file.drop_duplicates(inplace=True) + nb_duplicate = nb_all - nb_empty_geom - len(gdf_file) + + ## Reproject + gdf_file = gdf_file.copy().to_crs(conf.crs['db']) + + synonyms = getattr(model, 'synonyms', {'ID': 'id'}) + ## Apply synonyms + for k, v in synonyms.items(): + if k in gdf_file.columns: + gdf_file[v] = gdf_file[k] + + if 'orig_id' in gdf_file.columns: + gdf_file.drop(columns='orig_id', inplace=True) + + ## Make sure the types of the columns of gdf_file and gdf_db match + for col_name, col_type in gdf_db.dtypes.items(): + if col_name in gdf_file.columns: + try: + coerce = gdf_file[col_name].dtypes != col_type + except TypeError: + ## Workaround for strange bug in Pandas: data type not understood + coerce = gdf_file[col_name].dtypes.name != col_type.name + if coerce: + gdf_file[col_name] = gdf_file[col_name].astype(col_type) + + ## Add status + if 'status' not in gdf_file.columns: + gdf_file['status'] = file_record.status + + ## TODO: define exactly the meaning of the 'id' attribute in shapefiles, + ## it gets quite confusing. + ## Drop the id, as it migt conflict with the one in the DB + ## For now in this import process only, rename eventual 'id' to 'orig_id' + if 'id' not in gdf_file.columns: + gdf_file['id'] = None + gdf_file.rename(columns={'id': 'orig_id'}, inplace=True) + gdf_file['orig_id'] = gdf_file['orig_id'].astype(str).astype(object) + + ## Get geometries as pygeos for gdf_file and gdf_db + gdf_file['ewkb'] = gdf_file.geometry.apply(lambda g: dumps_wkb(g, srid=conf.srid, hex=True)) + gdf_db['ewkb'] = gdf_db.geometry.apply(lambda g: dumps_wkb(g, srid=conf.srid, hex=True)) + + self.preprocess(gdf_file) + + await self.get_metadata_from_raw_points(gdf_file, model, file_record) + + ## Diffs: using attribute and spatial merges, see https://geopandas.org/mergingdata.html + ## Attribute diff: identify identical rows, + ## so wont be touched and don't need spatial join + df_identical = gdf_db.merge(gdf_file, how='inner') + + ## Need an id + if 'id' not in gdf_file.columns: + gdf_file['id'] = gdf_file.index + + gdf_diff = gdf_db.merge(gdf_file, on='ewkb', how='outer', + indicator=True, suffixes=('', '__file')) + + to_update = gdf_diff.loc[gdf_diff._merge=='both'] + ## Filter out the identical rows from to_update: update non-geometry attributes, + ## keeping the id + to_update = to_update.loc[~to_update.id.isin(df_identical.id)] + + to_delete = gdf_diff.loc[gdf_diff._merge=='left_only'] + + ## TODO?: try with geometric operations to determine nearest geometry + ## from the file's to the DB's. 
+ ## See: + # - _binary_predicate in geopandas.array, which returns True/False, + # when it could return the id of the matching geometry + ## Meanwhile, just wait for sjoin to be implemented: + # - https://github.com/geopandas/geopandas/pull/1271 + #almost_equals = to_delete.geom_almost_equals(gdf_db.geometry) + #equals = to_delete.geom_equals(gdf_db.geometry) + #equals_exact = to_delete.geom_equals_exact(gdf_db.geometry, tolerance=0.1) + + to_insert = gdf_diff.loc[gdf_diff._merge=='right_only'] + + ## Take column names from the gdf_file (*__file) and rename to the gdf_db. + ## Geometry is a bit of an exception. + to_insert_columns = [col for col in to_insert.columns + if col.endswith('__file') and col != 'geometry__file'] + to_insert = to_insert.drop(columns={col[:-6] for col in to_insert_columns})\ + .rename(columns={col: col[:-6] for col in to_insert_columns})\ + .merge(gdf_diff[['orig_id', 'ewkb']], left_index=True, right_index=True)\ + .drop(columns=['id']) + + to_update_columns = [col for col in to_update.columns + if col.endswith('__file') + and col not in ('geometry__file', 'id__file')] + to_update = to_update.drop(columns={col[:-6] for col in to_update_columns})\ + .rename(columns={col: col[:-6] for col in to_update_columns}) + + to_insert['geometry'] = to_insert['geometry__file'] + to_insert.rename(columns={'orig_id_x': 'orig_id', 'ewkb_x': 'ewkb'}, inplace=True) + to_insert.drop(columns=['orig_id_y', 'ewkb_y'], inplace=True) + + ## Add project to all records in the new records + to_insert['project_id'] = file_record.project_id + + ## Write to DB + if not dry_run: + for _gdf in [to_insert, to_update]: + _gdf['geom'] = _gdf.geometry.apply( + lambda g: dumps_wkb(g, srid=conf.srid, hex=True) + ) + + if len(to_insert) > 0: + inserted = await upsert_df(to_insert, model) + to_insert['id'] = inserted['id'] + await upsert_df(to_update, model) + await delete_df(to_delete, model) + + ## Add feature_import_data + feature_import_data = await FeatureImportData.get_df( + where=FeatureImportData.store==file_record.store + ) + fid_updated = feature_import_data[['id', 'feature_id']]\ + .merge(to_update, left_on='feature_id', right_on='id') + fid_updated['file_path'] = str(file_record.path) + fid_updated['store'] = file_record.store + fid_updated['origin'] = 'shapefile' ## TODO: gpkg + fid_updated['time'] = now + fid_updated['file_md5'] = file_record.md5 + fid_updated.drop(columns=['orig_id'], inplace=True) + fid_updated.rename(columns={'orig_id__file': 'orig_id'}, inplace=True) + ## Write to DB + await upsert_df(fid_updated, FeatureImportData) + ## Same for to_insert + if len(to_insert) > 0: + fid_inserted = pd.DataFrame(columns=fid_updated.columns) + fid_inserted['feature_id'] = inserted['id'] + ## FIXME: fid_inserted['orig_id'] + #fid_inserted['orig_id'] = inserted['orig_id'] + fid_inserted['store'] = file_record.store + fid_inserted['file_path'] = str(file_record.path) + fid_inserted['origin'] = 'shapefile' ## TODO: gpkg + fid_inserted['time'] = now + fid_inserted['file_md5'] = file_record.md5 + await upsert_df(fid_inserted, FeatureImportData) + + ## Find and update tags + if len(to_insert) > 0: + tags = await Tags.get_geo_df(where=Tags.store==file_record.store) + if len(tags) > 0: + matching_tags = gpd.sjoin(to_insert, tags.reset_index()) + new_ref_ids = tags.merge(matching_tags, left_on='ref_id', + right_on='ref_id')[['ref_id', 'id_left']] + new_tags = tags.merge(new_ref_ids, on='ref_id') + new_tags.drop(columns='ref_id', inplace=True) + new_tags.rename(columns={'id_left': 'ref_id'}, 
inplace=True) + await upsert_df(new_tags, Tags) + + ## Publish on Gisaf live + gis_type = gdf_file.geom_type.iloc[0] + mapbox_type = SHAPELY_TYPE_TO_MAPBOX_TYPE.get(gis_type, None) + mapbox_paint = DEFAULT_MAPBOX_PAINT.get(mapbox_type, {}).copy() + mapbox_layout = DEFAULT_MAPBOX_LAYOUT.get(mapbox_type, {}).copy() + symbol = gisTypeSymbolMap.get(gis_type, '\ue02e') + + live_layers = { + 'insert': { + 'data': to_insert, + 'color': 'blue', + 'opacity': 0.3, + }, + 'update': { + 'data': to_update, + 'color': 'green', + 'opacity': 0.3, + }, + 'delete': { + 'data': to_delete, + 'color': 'red', + 'opacity': 0.3, + }, + } + if live_publish_results: + for name, defn in live_layers.items(): + mapbox_paint[MAPBOX_COLOR_ATTRIBUTE_NAME[mapbox_type]] = defn['color'] + mapbox_paint[MAPBOX_OPACITY_ATTRIBUTE_NAME[mapbox_type]] = defn['opacity'] + await redis_store.publish_gdf( + f'Last import: {name}', + defn['data'], + status='*', + mapbox_paint=mapbox_paint, + mapbox_layout=mapbox_layout, + viewable_role='reviewer', + ) + + user_feedback['Total in file'] = nb_all + if nb_empty_geom > 0: + user_feedback['Empty single geoms in file'] = nb_empty_geom + if nb_duplicate > 0: + user_feedback['Duplicates in file'] = nb_duplicate + user_feedback['Total in database'] = len(gdf_db) + user_feedback['Identical (no change)'] = len(df_identical) + user_feedback['Update attributes'] = len(to_update) + user_feedback['Insert'] = len(to_insert) + user_feedback['Delete'] = len(to_delete) + #summary = len(gdf_db) + len(to_insert) - len(to_delete) - len(gdf_file) + #user_feedback['Summary (+/-)'] = f'{summary:d}' + + return BasketImportResult( + time=now, + message=f"Import successful{' (dry run)' if dry_run else ''}", + details=user_feedback, + ) + + +class LineWorkImporter(GeoDataImporter): + """ + Importer for line work geo data files (from survey) + """ + async def get_metadata_from_raw_points(self, gdf, model, file_record): + """ + Find raw points in the shape and set equip_id, srvyr_id, accur_id + """ + raw_points = await model.raw_model.get_geo_df(where=model.raw_model.project_id==file_record.project_id) + raw_points.drop(columns='orig_id', inplace=True) + raw_points.to_crs(conf.crs['db'], inplace=True) + + ## Take a small buffer around the points + ## due to approximations due to reprojection, inaccurate line work... 
+        buf_raw_points = raw_points.copy()
+        buf_raw_points['geometry'] = raw_points.buffer(0.0000005, resolution=3)
+
+        ## Drop columns which interfere with metadata coming from raw points
+        ## In future, the importer might detect and use such metadata in the input file
+        gdf.drop(
+            columns=set(gdf.columns).intersection(
+                {'date', 'accur_id', 'srvyr_id', 'equip_id', 'project_id'}
+            ),
+            inplace=True
+        )
+
+        sjoin = gpd.sjoin(gdf, buf_raw_points, op='intersects', how='left')
+        sjoin.drop(columns=['accur_id', 'status_left', 'status_right', 'ewkb'], inplace=True)
+        sjoin.rename(columns={'index_right': 'raw_point_id'}, inplace=True)
+        sjoin['raw_point_id'] = sjoin['raw_point_id'].astype('Int64')
+
+        ## For surveyor info
+        ## Get the last surveyor, equipment (by date)
+        ## Sort raw points by date, maintaining the index (raw points)
+        cols_surveyor_info = ['srvyr_id', 'equip_id', 'date']
+        sjoin.set_index([sjoin.index, 'date'], inplace=True)
+        sjoin.sort_index(ascending=False, inplace=True)
+        sjoin.reset_index(inplace=True)
+        sjoin.set_index('level_0', inplace=True)
+        sjoin.index.name = 'feature_id'
+
+        gdf[cols_surveyor_info] = sjoin.groupby(sjoin.index).head(1)[cols_surveyor_info]
+        ## Fix issue when all dates are NaT (reset_index converts that column to float,
+        ## see https://github.com/pandas-dev/pandas/issues/30517)
+        gdf.date.fillna(pd.NaT, inplace=True)
+
+        ## Similarly for accuracies: take the lowest accuracy of the raw points for each feature
+        accuracies = await Accuracy.get_df()
+        accuracies.set_index('id', inplace=True)
+
+        accuracy_mappings = await AccuracyEquimentSurveyorMapping.get_df(
+            where=AccuracyEquimentSurveyorMapping.geometry_type == 'Line_work'
+        )
+        accuracy_mappings.drop(columns='geometry_type', inplace=True)
+
+        sjoin = sjoin.reset_index().merge(
+            accuracy_mappings,
+            left_on=['srvyr_id', 'equip_id'],
+            right_on=['surveyor_id', 'equipment_id']
+        )
+        sjoin = sjoin.merge(accuracies[['accuracy']], left_on='accuracy_id', right_index=True)
+
+        sjoin.set_index(['feature_id', 'accuracy'], inplace=True)
+        sjoin.sort_index(ascending=False, inplace=True)
+        sjoin.reset_index(inplace=True)
+        sjoin.set_index('feature_id', inplace=True)
+
+        gdf['accur_id'] = sjoin.groupby(sjoin.index).head(1)['accuracy_id']
+
+
+hash_max = 2**32
+hash_date_min = datetime.date(year=1900, month=1, day=1).toordinal()
+
+def point_hash_function(date, project_id, surveyor_id, equipment_id, i):
+    """
+    A function which takes the arguments and generates a "unique", positive
+    integer hash, reasonably speaking.
+    We could just use hash() % 2**32; that would be quite reasonable, but hard to reverse.
+    A simple reversible hash would ideally be a number generated
+    with human readability in mind, like:
+        return int('{:02d}{:03d}{:03d}{:01d}{:d}'.format(
+            project_id % hash_project_max,
+            date.toordinal() - hash_date_min,
+            surveyor_id % hash_surveyor_max,
+            equipment_id % hash_equipment_max,
+            i
+        )) % 2**32
+    But I don't think it's reliable enough, and it would get quite confusing as soon as
+    one of the parameters overflows its limits.
+
+    Using https://stackoverflow.com/questions/4273466/reversible-hash-function, comment 7,
+    try a reversible function with some mathematical base.
+ + Choose 2 big primes from https://www.bigprimes.net/archive/prime/14000000/ : + 32416190039 and 32416190071 + + num = '{:03d}{:05d}{:03d}{:02d}{:05d}'.format( + project_id % hash_project_max, + date.toordinal() - hash_date_min, + surveyor_id % hash_surveyor_max, + equipment_id % hash_equipment_max, + i + ) + + The reverse hash function, for the record, if we ever use a reversable hash based on coprimes, + would look like, maybe: + return (h_ * 32416190071) % 2**32 + # Then we would return the inverse of the format function in point_hash_function, easy ;) + + No good, so hash() and no smart reverse: have to use brute force to reverse. + """ + h_ = hash(( + project_id, + date.toordinal() - hash_date_min, + surveyor_id, + equipment_id, + i + )) + return h_ % hash_max diff --git a/src/gisaf/ipynb_tools.py b/src/gisaf/ipynb_tools.py index 467975f..c456aee 100644 --- a/src/gisaf/ipynb_tools.py +++ b/src/gisaf/ipynb_tools.py @@ -21,10 +21,10 @@ from sqlalchemy import create_engine # from shapely import wkb -from .config import conf -from .redis_tools import store as redis_store -from .live import live_server -from .registry import registry +from gisaf.config import conf +from gisaf.redis_tools import store as redis_store +from gisaf.live import live_server +from gisaf.registry import registry ## For base maps: contextily try: @@ -285,7 +285,7 @@ class Gisaf: ## Drop existing if replace_all: - engine.execute('DELETE FROM "{}"."{}"'.format(model.__table_args__['schema'], model.__tablename__)) + engine.execute('DELETE FROM "{}"."{}"'.format(model.metadata.schema, model.__tablename__)) else: raise NotImplementedError('ipynb_tools.Gisaf.to_layer does not support updates yet') diff --git a/src/gisaf/live.py b/src/gisaf/live.py index 79c84f9..bc69984 100644 --- a/src/gisaf/live.py +++ b/src/gisaf/live.py @@ -5,7 +5,7 @@ from collections import defaultdict from fastapi import FastAPI, WebSocket, WebSocketDisconnect # from .config import conf -from .redis_tools import store +from gisaf.redis_tools import store logger = logging.getLogger(__name__) diff --git a/src/gisaf/models/admin.py b/src/gisaf/models/admin.py new file mode 100644 index 0000000..dec058e --- /dev/null +++ b/src/gisaf/models/admin.py @@ -0,0 +1,129 @@ +import re +from datetime import datetime + +from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column +import pandas as pd + +# from graphene import ObjectType, Int, String, DateTime, List + +from gisaf.models.models_base import Model +from gisaf.models.survey import Surveyor, Equipment +from gisaf.models.project import Project + + +re_file_import_record_date_expr = '^(\S+)-(\d\d\d\d)-(\d\d)-(\d\d).*$' +re_file_import_record_date_combined_expr = '^\S+-(\d\d\d\d-\d\d-\d\d).*$' +re_file_import_record_date = re.compile(re_file_import_record_date_expr) + + +class BadSurveyFileName(Exception): + pass + + +def get_file_import_date(record): + """ + Utility function that returns the date of survey from the file name, + if it matches the convention for CSV survey files. + Return exception otherwise. + """ + fname_search = re_file_import_record_date.search(record['name']) + if not fname_search: + raise BadSurveyFileName( + 'The file name is not OK ' + '(format should be: "PPP-DESCRIPTION-YYYY-MM-DD", ' + 'PPP being the project name, DESCRITION is optional and discarded)' + ) + return datetime.date(day=int(fname_search.group(4)), + month=int(fname_search.group(3)), + year=int(fname_search.group(2))) + + +class FileImport(Model): + """ + Files to import or imported in the DB. 
+ Give either url or path. + """ + __tablename__ = 'file_import' + __table_args__ = {'schema' : 'gisaf_admin'} + + id: int | None = Field(default=None, primary_key=True) + url: str + ## TODO: Deprecate FileImport.path, in favour of dir + name + path: str + dir: str + name: str + md5: str + time: datetime + comment: str + status: str + store: str + basket: str + project_id: int = Field(foreign_key='project.id') + # ALTER TABLE gisaf_admin.file_import add column project_id INT REFERENCES gisaf_admin.project; + surveyor_id: int = Field(foreign_key='surveyor.id') + # ALTER TABLE gisaf_admin.file_import add column surveyor_id INT REFERENCES gisaf_survey.surveyor; + equipment_id: int = Field(foreign_key='equipment.id') + # ALTER TABLE gisaf_admin.file_import add column equipment_id INT REFERENCES gisaf_survey.equipment; + + def __str__(self): + return f'{self.path:s} for project id {self.project_id}' + + def __repr__(self): + return f'' + + @classmethod + def dyn_join_with(cls): + return { + 'project': Project, + 'surveyor': Surveyor, + 'equipment': Equipment, + } + + def set_import_time(self): + self.time = datetime.now() + db.session.commit() + + @classmethod + async def get_df(cls, *args, **kwargs): + """ + Add a column 'date' based on the file name + """ + df = await super().get_df(*args, **kwargs) + dates = df['name'].str.extract(re_file_import_record_date_combined_expr) + df['date'] = pd.to_datetime(dates[0], format='%Y-%m-%d') + return df + + + #def get_parent_dir(self): + # split_path = self.path.split(os_path.sep) + # if len(split_path) == 1: + # return None + # else: + # return split_path[-2] + + #def get_absolute_path(self): + # return os_path.join(conf.shapefiles_import_path, self.path) + + #def exists_in_file_system(self): + # """ + # Check if the file exists + # :return: + # """ + # return os_path.exists(self.get_absolute_path()) + + +class FeatureImportData(Model): + """ + Keep track of imported data, typically from shapefiles + """ + __tablename__ = 'feature_import_data' + __table_args__ = {'schema' : 'gisaf_admin'} + + id: int | None = Field(default=None, primary_key=True) + store: str = Field(index=True) + feature_id: int = Field(index=True) + orig_id: int + time: datetime + origin: str + file_path: str + file_md5: str diff --git a/src/gisaf/models/authentication.py b/src/gisaf/models/authentication.py index 39c7047..62d7875 100644 --- a/src/gisaf/models/authentication.py +++ b/src/gisaf/models/authentication.py @@ -1,10 +1,10 @@ from sqlmodel import Field, SQLModel, MetaData, Relationship -from .metadata import gisaf_admin +from gisaf.models.metadata import gisaf_admin class UserRoleLink(SQLModel, table=True): metadata = gisaf_admin - __tablename__: str = 'roles_users' + __tablename__ = 'roles_users' user_id: int | None = Field( default=None, foreign_key="user.id", primary_key=True ) diff --git a/src/gisaf/models/bootstrap.py b/src/gisaf/models/bootstrap.py index cbfc79b..9a48671 100644 --- a/src/gisaf/models/bootstrap.py +++ b/src/gisaf/models/bootstrap.py @@ -1,6 +1,7 @@ from pydantic import BaseModel -from ..config import conf, Map, Measures, Geo -from .authentication import UserRead + +from gisaf.config import conf, Map, Measures, Geo +from gisaf.models.authentication import UserRead class Proj(BaseModel): srid: str diff --git a/src/gisaf/models/category.py b/src/gisaf/models/category.py index c99a03b..242c564 100644 --- a/src/gisaf/models/category.py +++ b/src/gisaf/models/category.py @@ -1,10 +1,11 @@ from typing import Any, ClassVar +from sqlalchemy import String from 
pydantic import computed_field, ConfigDict -from sqlmodel import Field, Relationship, SQLModel, JSON, TEXT, Column, select +from sqlmodel import Field, Relationship, SQLModel, JSON, TEXT, select -from .metadata import gisaf_survey -from ..database import db_session, pandas_query +from gisaf.models.metadata import gisaf_survey +from gisaf.database import db_session, pandas_query mapbox_type_mapping = { 'Point': 'symbol', @@ -23,8 +24,7 @@ class BaseModel(SQLModel): class CategoryGroup(BaseModel, table=True): metadata = gisaf_survey __tablename__ = 'category_group' - name: str | None = Field(min_length=4, max_length=4, - default=None, primary_key=True) + name: str | None = Field(sa_type=String(4), default=None, primary_key=True) major: str long_name: str categories: list['Category'] = Relationship(back_populates='category_group') @@ -37,7 +37,7 @@ class CategoryGroup(BaseModel, table=True): class CategoryModelType(BaseModel, table=True): metadata = gisaf_survey __tablename__ = 'category_model_type' - name: str = Field(default=None, primary_key=True) + name: str | None = Field(default=None, primary_key=True) class Admin: menu = 'Other' @@ -45,7 +45,7 @@ class CategoryModelType(BaseModel, table=True): class CategoryBase(BaseModel): - model_config = ConfigDict(protected_namespaces=()) + model_config = ConfigDict(protected_namespaces=()) # type: ignore class Admin: menu = 'Other' flask_admin_model_view = 'CategoryModelView' @@ -113,7 +113,7 @@ class CategoryBase(BaseModel): class Category(CategoryBase, table=True): metadata = gisaf_survey - name: str = Field(default=None, primary_key=True) + name: str | None = Field(default=None, primary_key=True) category_group: CategoryGroup = Relationship(back_populates="categories") diff --git a/src/gisaf/models/geo_models_base.py b/src/gisaf/models/geo_models_base.py index ddf2a4d..e72cce0 100644 --- a/src/gisaf/models/geo_models_base.py +++ b/src/gisaf/models/geo_models_base.py @@ -6,13 +6,10 @@ from io import BytesIO from zipfile import ZipFile import locale import logging -from json import dumps -import numpy as np -import pandas as pd -import geopandas as gpd +import geopandas as gpd # type: ignore -import shapely +import shapely # type: ignore import pyproj from sqlmodel import SQLModel, Field @@ -29,24 +26,24 @@ from geoalchemy2.types import Geometry, WKBElement from shapely import wkb from shapely.geometry import mapping -from shapely.ops import transform +from shapely.ops import transform # type: ignore -from shapefile import (Writer as ShapeFileWriter, - POINT, POINTZ, - POLYLINE, POLYLINEZ, - POLYGON, POLYGONZ, - ) +from shapefile import ( + Writer as ShapeFileWriter, # type: ignore + POINT, POINTZ, + POLYLINE, POLYLINEZ, + POLYGON, POLYGONZ, +) -from ..database import db_session -from ..config import conf -from .models_base import Model -from ..models.metadata import survey, raw_survey -from ..utils import upsert_df -from .survey import Equipment, Surveyor, Accuracy -from .misc import Qml -from .category import Category -from .project import Project +from gisaf.database import db_session +from gisaf.config import conf +from gisaf.models.models_base import Model +from gisaf.models.metadata import survey, raw_survey +from gisaf.models.survey import Equipment, Surveyor, Accuracy +from gisaf.models.misc import Qml +from gisaf.models.category import Category +from gisaf.models.project import Project LOCALE_DATE_FORMAT = locale.nl_langinfo(locale.D_FMT) @@ -84,7 +81,7 @@ class BaseSurveyModel(BaseModel): - raw survey (RAW_V_*') - projected ('V_*') """ 
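The change below recurs across most models in this PR: primary keys move from `int` to `int | None` with `default=None`. This follows the usual SQLModel idiom for database-generated keys; a minimal sketch (the Example model is hypothetical):

from sqlmodel import Field, SQLModel

class Example(SQLModel, table=True):
    # None on the Python side until the database assigns a value on INSERT
    id: int | None = Field(default=None, primary_key=True)
    name: str

item = Example(name='new row')   # valid to instantiate without an id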
- id: int = Field(sa_type=BigInteger, primary_key=True, default=None) + id: int | None = Field(sa_type=BigInteger, primary_key=True, default=None) equip_id: int = Field(foreign_key='equipment.id') srvyr_id: int = Field('surveyor.id') accur_id: int = Field('accuracy.id') @@ -368,7 +365,7 @@ class GeoModel(Model): """ return OrderedDict() - async def get_info(self): + async def get_info(self) -> dict[str, str]: """ Model specific info """ @@ -1088,7 +1085,7 @@ class PlottableModel(Model): #__abstract__ = True float_format: ClassVar[str] = '%.1f' - values: dict[Any, Any] = {} + values: ClassVar[list[dict[str, str]]] = [] @classmethod async def get_as_dataframe(cls, model_id=None, where=None, **kwargs): diff --git a/src/gisaf/models/map_bases.py b/src/gisaf/models/map_bases.py index 0a046f6..8ce84bc 100644 --- a/src/gisaf/models/map_bases.py +++ b/src/gisaf/models/map_bases.py @@ -2,8 +2,8 @@ from typing import Any from sqlmodel import Field, String, JSON, Column -from .models_base import Model -from .metadata import gisaf_map +from gisaf.models.models_base import Model +from gisaf.models.metadata import gisaf_map class BaseStyle(Model): @@ -14,7 +14,7 @@ class BaseStyle(Model): menu = 'Other' flask_admin_model_view = 'MapBaseStyleModelView' - id: int = Field(primary_key=True) + id: int | None = Field(primary_key=True, default=None) name: str style: dict[str, Any] | None = Field(sa_type=JSON(none_as_null=True)) mbtiles: str = Field(sa_type=String(50)) @@ -32,7 +32,7 @@ class BaseMap(Model): class Admin: menu = 'Other' - id: int = Field(primary_key=True) + id: int | None = Field(primary_key=True, default=None) name: str def __repr__(self): @@ -49,7 +49,7 @@ class BaseMapLayer(Model): class Admin: menu = 'Other' - id: int = Field(primary_key=True) + id: int | None = Field(primary_key=True, default=None) base_map_id: int = Field(foreign_key='base_map.id', index=True) store: str = Field(sa_type=String(100)) diff --git a/src/gisaf/models/metadata.py b/src/gisaf/models/metadata.py index 7563e97..a87ac1f 100644 --- a/src/gisaf/models/metadata.py +++ b/src/gisaf/models/metadata.py @@ -1,6 +1,6 @@ from sqlmodel import MetaData -from ..config import conf +from gisaf.config import conf gisaf = MetaData(schema='gisaf') gisaf_survey = MetaData(schema='gisaf_survey') diff --git a/src/gisaf/models/misc.py b/src/gisaf/models/misc.py index 0e40c69..b4161d9 100644 --- a/src/gisaf/models/misc.py +++ b/src/gisaf/models/misc.py @@ -4,8 +4,8 @@ from pydantic import ConfigDict from sqlmodel import Field, JSON, Column -from .models_base import Model -from .metadata import gisaf_map +from gisaf.models.models_base import Model +from gisaf.models.metadata import gisaf_map logger = logging.getLogger(__name__) @@ -25,7 +25,7 @@ class Qml(Model): menu = 'Other' flask_admin_model_view = 'QmlModelView' - model_name: str = Field(default=None, primary_key=True) + model_name: str | None = Field(default=None, primary_key=True) qml: str attr: str style: str diff --git a/src/gisaf/models/project.py b/src/gisaf/models/project.py index 6e46e9d..d099ea3 100644 --- a/src/gisaf/models/project.py +++ b/src/gisaf/models/project.py @@ -7,9 +7,9 @@ from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column import pyproj from shapely.geometry import Point -from ..config import conf -from .models_base import Model -from .metadata import gisaf_admin +from gisaf.config import conf +from gisaf.models.models_base import Model +from gisaf.models.metadata import gisaf_admin class Project(Model): metadata = gisaf_admin @@ 
-18,7 +18,7 @@ class Project(Model): menu = 'Other' flask_admin_model_view = 'ProjectModelView' - id: int = Field(default=None, primary_key=True) + id: int | None = Field(default=None, primary_key=True) name: str contact_person: str site: str diff --git a/src/gisaf/models/raw_survey.py b/src/gisaf/models/raw_survey.py index a26b9e6..d1905c1 100644 --- a/src/gisaf/models/raw_survey.py +++ b/src/gisaf/models/raw_survey.py @@ -1,18 +1,18 @@ from typing import ClassVar from sqlmodel import Field, BigInteger -from .models_base import Model -from .geo_models_base import GeoPointMModel, BaseSurveyModel -from .project import Project -from .category import Category -from .metadata import gisaf_survey +from gisaf.models.models_base import Model +from gisaf.models.geo_models_base import GeoPointMModel, BaseSurveyModel +from gisaf.models.project import Project +from gisaf.models.category import Category +from gisaf.models.metadata import gisaf_survey class RawSurveyModel(BaseSurveyModel, GeoPointMModel): metadata = gisaf_survey __tablename__ = 'raw_survey' hidden: ClassVar[bool] = True - id: int = Field(default=None, primary_key=True) + id: int | None = Field(default=None, primary_key=True) project_id: int | None = Field(foreign_key='project.id') category: str = Field(foreign_key='category.name') in_menu: bool = False @@ -87,7 +87,7 @@ class OriginRawPoint(Model): metadata = gisaf_survey __tablename__ = 'origin_raw_point' - id: int = Field(default=None, primary_key=True) + id: int | None = Field(default=None, primary_key=True) shape_table: str = Field(index=True) shape_id: int = Field(index=True) raw_point_id: int = Field(sa_type=BigInteger()) diff --git a/src/gisaf/models/reconcile.py b/src/gisaf/models/reconcile.py index 3724238..08db7b8 100644 --- a/src/gisaf/models/reconcile.py +++ b/src/gisaf/models/reconcile.py @@ -1,9 +1,10 @@ from datetime import datetime -from sqlalchemy import BigInteger -from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column, String -from .models_base import Model -from .metadata import gisaf_admin +from sqlalchemy import BigInteger, String +from sqlmodel import Field + +from gisaf.models.models_base import Model +from gisaf.models.metadata import gisaf_admin class Reconciliation(Model): diff --git a/src/gisaf/models/store.py b/src/gisaf/models/store.py index 5ee621f..f33d913 100644 --- a/src/gisaf/models/store.py +++ b/src/gisaf/models/store.py @@ -1,6 +1,6 @@ from typing import Any from pydantic import BaseModel -from .geo_models_base import GeoModel, RawSurveyBaseModel, GeoPointSurveyModel +from gisaf.models.geo_models_base import GeoModel, RawSurveyBaseModel, GeoPointSurveyModel class MapLibreStyle(BaseModel): diff --git a/src/gisaf/models/survey.py b/src/gisaf/models/survey.py index 8fc55b3..501b93f 100644 --- a/src/gisaf/models/survey.py +++ b/src/gisaf/models/survey.py @@ -1,9 +1,9 @@ from enum import Enum -from sqlmodel import Field, SQLModel +from sqlmodel import Field -from .models_base import Model -from .metadata import gisaf_survey +from gisaf.models.models_base import Model +from gisaf.models.metadata import gisaf_survey class Accuracy(Model): @@ -13,7 +13,7 @@ class Accuracy(Model): menu = 'Other' flask_admin_model_view = 'MyModelViewWithPrimaryKey' - id: int = Field(default=None, primary_key=True) + id: int | None = Field(default=None, primary_key=True) name: str accuracy: float @@ -31,7 +31,7 @@ class Surveyor(Model): menu = 'Other' flask_admin_model_view = 'MyModelViewWithPrimaryKey' - id: int = Field(default=None, 
primary_key=True) + id: int | None = Field(default=None, primary_key=True) name: str def __str__(self): @@ -48,7 +48,7 @@ class Equipment(Model): menu = 'Other' flask_admin_model_view = 'MyModelViewWithPrimaryKey' - id: int = Field(default=None, primary_key=True) + id: int | None = Field(default=None, primary_key=True) name: str def __str__(self): @@ -68,7 +68,7 @@ class AccuracyEquimentSurveyorMapping(Model): class Admin: menu = 'Other' - id: int = Field(default=None, primary_key=True) + id: int | None= Field(default=None, primary_key=True) surveyor_id: int = Field(foreign_key='surveyor.id', index=True) equipment_id: int = Field(foreign_key='equipment.id', index=True) geometry_type: GeometryType = Field(default='Point', index=True) diff --git a/src/gisaf/models/tags.py b/src/gisaf/models/tags.py index 9c8d5e6..5ee87c1 100644 --- a/src/gisaf/models/tags.py +++ b/src/gisaf/models/tags.py @@ -5,8 +5,8 @@ from sqlalchemy.dialects.postgresql import HSTORE from sqlmodel import Field, SQLModel, MetaData, JSON, TEXT, Relationship, Column from pydantic import computed_field -from .metadata import gisaf -from .geo_models_base import GeoPointModel +from gisaf.models.metadata import gisaf +from gisaf.models.geo_models_base import GeoPointModel class Tags(GeoPointModel, table=True): metadata = gisaf @@ -16,7 +16,7 @@ class Tags(GeoPointModel, table=True): menu = 'Other' flask_admin_model_view = 'TagModelView' - id: int | None = Field(primary_key=True) + id: int | None = Field(primary_key=True, default=None) store: str = Field(index=True) ref_id: int = Field(index=True, sa_type=BigInteger) tags: dict = Field(sa_type=MutableDict.as_mutable(HSTORE)) @@ -36,7 +36,7 @@ class TagKey(SQLModel, table=True): menu = 'Other' flask_admin_model_view = 'TagKeyModelView' - id: str | None = Field(primary_key=True) + id: str | None = Field(default=None, primary_key=True) def __str__(self): return self.key diff --git a/src/gisaf/models/to_migrate.py b/src/gisaf/models/to_migrate.py new file mode 100644 index 0000000..13c34ac --- /dev/null +++ b/src/gisaf/models/to_migrate.py @@ -0,0 +1,18 @@ +from pydantic import BaseModel + +class ActionResult(BaseModel): + message: str + +class ActionResults(BaseModel): + name: str + message: str + actionResults: list[ActionResult] + +class FormField(BaseModel): + name: str + type: str + +class ModelAction(BaseModel): + name: str + icon: str + formFields: list[FormField] diff --git a/src/gisaf/plugins.py b/src/gisaf/plugins.py new file mode 100644 index 0000000..00660a3 --- /dev/null +++ b/src/gisaf/plugins.py @@ -0,0 +1,560 @@ +import logging +import re +from collections import defaultdict +from importlib.metadata import entry_points +from itertools import chain +from datetime import datetime + +# from aiohttp.web_exceptions import HTTPUnauthorized +# from aiohttp_security import check_permission + +from sqlalchemy import or_, and_ +# from geoalchemy2.shape import to_shape, from_shape +# from graphene import ObjectType, String, List, Boolean, Field, Float, InputObjectType + +import pandas as pd +import shapely + +from gisaf.config import conf +from gisaf.models.tags import Tags as TagsModel +from gisaf.utils import upsert_df +from gisaf.models.reconcile import StatusChange +from gisaf.models.to_migrate import ( + ActionResults, +) +# from gisaf.models.graphql import ( +# Action, +# ActionAction, +# ActionParam, +# ActionParamInput, +# ActionResult, +# ActionResults, +# ActionsResults, +# ActionsStore, +# Downloader, +# FormField, +# Tag, +# TagAction, +# TagActions, +# TagKeyList, 
+# TaggedFeature, +# TaggedLayer, +# TagsStore, +# TagsStores, +# ) + +## GraphQL object types +## TODO: move to models.graphql + + +logger = logging.getLogger('Gisaf plugin manager') + + +class NoSuchAction(Exception): + pass + + +class ActionPlugin: + """ + Base class for all action plugins. + """ + def __init__(self, name, stores=None, stores_by_re=None, + icon='build', roles=None, params=None, + form_fields=None): + self.name = name + self.stores = stores or [] + self.stores_by_re = stores_by_re or [] + self.roles = roles or [] + self.params = params or {} + self.icon = icon + self.form_fields = form_fields or [] + + +class TagPlugin: + """ + Base class for tag plugins. + Tags can have a "domain". The tag's key is actually of the form 'domain:key'. + Keys may be regular expressions. + See Link (below) for a very basic example. + """ + + def __init__(self, key='', domain='', + stores=None, stores_by_re=None, roles=None, + save=True, link=None, action=None): + ## self._tags: instantiated tags + self.key = key + self.domain = domain + self.stores = stores or [] + self.stores_by_re = stores_by_re or [] + self.roles = roles or [] + #self.keys = {tag.key: tag(self) for tag in self.tags} + self.save = save + self.link = link + self.action = action + + @property + def full_key(self): + if self.domain: + return f'{self.domain}:{self.key}' + else: + return self.key + + +class DownloadPlugin: + """ + Base class for all download plugins. + """ + def __init__(self, name, stores=None, stores_by_re=None, + icon='cloud_download', roles=None, + form_fields=None): + self.name = name + self.stores = stores or [] + self.stores_by_re = stores_by_re or [] + self.roles = roles or [] + self.icon = icon + + async def execute(self, model, item, request): + raise NotImplementedError(f'Missing execute in downloader {self.name}') + + +class DownloadCSVPlugin(DownloadPlugin): + async def execute(self, model, item, request): + from gisaf.registry import registry + values_model = registry.values_for_model.get(model) + df = await values_model.get_as_dataframe(model_id=item.id) + csv = df.to_csv(date_format='%d/%m/%Y %H:%M', float_format=values_model.float_format) + return { + 'file_name': '{:s}.csv'.format(item.caption), + 'content_type': 'text/csv', + 'content': csv + } + + +class PluginManager: + """ + Application wide manager of the plugins. + One instance only, handled by Gisaf's process. + """ + def setup(self, app): + self.app = app + for entry_point in entry_points().select(group='gisaf_extras.context'): + try: + context = entry_point.load() + except ModuleNotFoundError as err: + logger.warning(err) + continue + self.app.cleanup_ctx.append(context) + logger.info(f'Added context for {entry_point.name}') + + async def scan_plugins(self, app): + """ + Scan tag and action plugins from the Python entry points. + Get all references of the tags defined in modules and build a registry of: + keys, domains, action names and executors, etc. + Runs at startup; re-running it later has not been tested.
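+        For illustration only (module and object names are hypothetical), an extension
+        typically instantiates its plugins at import time and exposes each object through
+        an entry point of the matching group ('gisaf_extras.tags', 'gisaf_extras.actions'
+        or 'gisaf_extras.downloaders'), eg.:
+
+            ## my_extension/plugins.py (hypothetical)
+            doc_link = TagPlugin(key='link', domain='doc', stores_by_re=[''])
+            csv_download = DownloadCSVPlugin('Export values as CSV', stores_by_re=[''])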
+ """ + self.tags_plugins = {} + self.tags_models = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + self.tags_domains = defaultdict(list) + + #self.actions_plugins = {} + self.actions_stores = defaultdict(lambda: defaultdict(list)) + self.executors = defaultdict(list) + self.downloaders = defaultdict(list) + self.downloaders_stores = defaultdict(list) + self.actions_names = defaultdict(list) + + registered_models = app['registry'].geom + registered_stores = registered_models.keys() + + for entry_point in entry_points().select(group='gisaf_extras.tags'): + try: + tag = entry_point.load() + except ModuleNotFoundError as err: + logger.warning(err) + continue + ## Keys, domains + self.tags_domains[tag.domain].append(tag) + stores = tag.stores + + ## Stores to which the tags apply + for _tag in tag.stores_by_re: + _re = re.compile(_tag) + for store in registered_stores: + if _re.match(store) and store not in stores: + stores.append(store) + + for store in stores: + model = registered_models.get(store) + if not model: + logger.warn(f'Tag plugin {entry_point.name}: skip model {store}' + ', which is not found in registry') + continue + + ## Actions + ## For graphql queries + self.tags_models[model][tag.domain][tag.key].append( + TagAction( + #plugin=plugin.__class__.__name__, + roles=tag.roles, + link=tag.link, + action=tag.action, + save=tag.save, + ) + ) + logger.info(f'Added tags plugin {entry_point.name} for {len(stores)} stores') + + for entry_point in entry_points().select(group='gisaf_extras.actions'): + try: + action = entry_point.load() + except ModuleNotFoundError as err: + logger.warning(err) + continue + self.executors[action.name].append(action) + #self.actions_names[action.name].append(plugin) + stores = action.stores + for _action in action.stores_by_re: + _re = re.compile(_action) + for store in registered_stores: + if _re.match(store) and store not in stores: + stores.append(store) + for store in stores: + model = registered_models.get(store) + if not model: + logger.warn(f'Action plugin {entry_point.name}: skip model {store}' + ', which is not found in registry') + continue + self.actions_stores[store][action.name].append( + ActionAction( + action=action, + roles=action.roles, + #plugin=plugin.__class__.__name__, + name=action.name, + params=action.params, + icon=action.icon, + formFields=action.form_fields + ) + ) + logger.info(f'Added action plugin {entry_point.name}') + + for entry_point in entry_points().select(group='gisaf_extras.downloaders'): + try: + downloader = entry_point.load() + except ModuleNotFoundError as err: + logger.warning(err) + continue + self.downloaders[downloader.name].append(downloader) + stores = downloader.stores + for _downloader in downloader.stores_by_re: + _re = re.compile(_downloader) + for store in registered_stores: + if _re.match(store) and store not in stores: + stores.append(store) + for store in stores: + model = registered_models.get(store) + if not model: + logger.warn(f'Downloader plugin {entry_point.name}: skip model {store}' + ', which is not found in registry') + continue + self.downloaders_stores[store].append( + Downloader( + downloader=downloader, + roles=downloader.roles, + name=downloader.name, + icon=downloader.icon, + ) + ) + logger.info(f'Added downloader plugin {entry_point.name}') + + self.tagsStores = TagsStores( + stores=[ + TagsStore( + store=model.get_store_name(), + tagActions=[ + TagActions(domain=domain, key=key, actions=actions) + for domain, tag_keys in tag_domains.items() + for key, actions in 
tag_keys.items() + ] + ) + for model, tag_domains in self.tags_models.items() + ] + ) + + self.actionsStores = [ + ActionsStore( + store=store, + actions=[ + Action( + name=name, + roles=[rr for rr in set( + [r for r in chain.from_iterable( + [aa._roles for aa in action_actions] + )] + )], + params=[rr for rr in set( + [r for r in chain.from_iterable( + [aa._params for aa in action_actions] + )] + )], + ) + for name, action_actions in actions.items() + ] + ) + for store, actions in self.actions_stores.items() + ] + + app['plugins'] = self + + async def do_tag_action(self, request, store, id, plugin_name, value, action): + logger.warning('FIXME: in do_tag_action: self.tags_plugins is never populated!') + ## FIXME: self.tags_plugins is never populated! + plugin = self.tags_plugins[plugin_name] + ## Check permission: roles + ## XXX: no way to check several roles? + if len(plugin.tag.roles) > 0: + allowed = False + for role in plugin.tag.roles: + try: + await check_permission(request, role) + except HTTPUnauthorized as err: + pass + else: + allowed = True + if not allowed: + raise Exception('Not authorized') + ## Execute + result = await plugin.tag.execute(request, store, int(id), value) + return result + + async def execute_tag_mutation(self, features, key, value): + """ + Execute tag action on the features (store and ids) + """ + results = [] + if ':' in key: + domain, key = key.split(':', 1) + else: + domain = '' + try: + plugins = self.tags_domains[domain] + except KeyError: + logger.warn(f"No tag domain '{domain}' defined by any plugin") + return + for plugin in plugins: + if key != plugin.key: + continue + features_for_action = {store: ids + for store, ids in features.items() + if store in plugin.stores} + if features_for_action: + if plugin.save: + await self.save_tags(features_for_action, domain, key, value) + #results.append(await action(features_for_action, key, value)) + return ', '.join(results) + + async def execute_action(self, request, features, name, params, form_fields): + """ + Execute the plugin action by calling the executor's execute function. + It is up to the plugin action to check for security, using eg: + + from aiohttp_security import check_permission + ... + await check_permission(request, 'role') + """ + results = [] + try: + plugins = self.actions_names[name] + except KeyError: + raise NoSuchAction + for executor in self.executors[name]: + ## TODO: get features from DB? 
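+            ## Role check below: an executor with no declared roles is open to
+            ## anonymous users; otherwise at least one of its roles must pass
+            ## check_permission for this request.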
+ + ## Check permission + if executor.roles: + authorized = False + for role in executor.roles: + try: + await check_permission(request, role) + except HTTPUnauthorized as err: + pass + else: + authorized = True + break + else: + ## No roles: OK for anonymous + authorized = True + + if authorized: + result = await executor.execute( + request, features, params, + **{field['name']: field for field in form_fields} + ) + result.name = name + results.append(result) + else: + raise HTTPUnauthorized + + return ActionsResults( + actionResults=results + ) + + #for store, ids in all_features.items(): + # actions = self.actions_stores[store] + # for action in actions[name]: + # action._action.execute(ids, params) + # pass + #for plugin in plugins: + # for action in action_plugin.actions: + # for executor in action_plugin.actions: + # result = executor.execute(features, params) + # pass + + async def save_tags(self, features, domain, key, value): + """ + Standard tagging - should be called explicitely by plugin actions + """ + filter = [ + and_(TagsModel.store == store, TagsModel.ref_id.in_(ids)) + for store, ids in features.items() + ] + tags_df = await TagsModel.get_df(where=or_(*filter), geom_as_ewkt=True) + tags_df['tags'] = tags_df.tags.astype('object') + + for store, ids in features.items(): + model = self.app['registry'].stores.loc[store, 'model'] + feature_df = await model.get_df(where=model.id.in_(ids)) + feature_df['centroid'] = shapely.centroid(shapely.from_wkb(feature_df['geom'])) + tagged_feature_df = tags_df[tags_df.store==store].merge( + feature_df[['id', 'centroid']], + left_on='ref_id', right_on='id', + suffixes=('', '_feature'), + how='outer', indicator=True, + ) + + if domain: + full_key = ':'.join([domain, key]) + else: + full_key = key + ## Fill missing values (no existing tags) + ## Insert without pk (id) for new tags records + new_tagged_feature_df = tagged_feature_df[tagged_feature_df['_merge']=='right_only'].drop(columns='id') + new_tagged_feature_df['tags'] = [{full_key: value}] * len(new_tagged_feature_df) + new_tagged_feature_df['ref_id'] = new_tagged_feature_df['id_feature'] + new_tagged_feature_df['store'] = store + new_tagged_feature_df['geom'] = shapely.to_wkb(new_tagged_feature_df['centroid'], hex=True, include_srid=True) + + ## Existing tags: add key: value + existing_tagged_feature_df = tagged_feature_df[tagged_feature_df['_merge']!='right_only'] + for row in existing_tagged_feature_df.itertuples(): + row.tags[full_key] = value + + ## Save + await upsert_df(new_tagged_feature_df, TagsModel) + await upsert_df(existing_tagged_feature_df, TagsModel) + + + +"""Generic tag plugin for creating a link with a simple URL""" +link_tag = TagPlugin( + key='link', + stores_by_re=[''], ## Match everything + link='{value}', +) + + +class ChangeFeatureStatus(ActionPlugin): + async def execute(self, request, all_features, params, **kwargs): + results = ActionResults(actionResults=[]) + registry = request.app['registry'] + status = kwargs['Status']['value'] + for store, ids in all_features.items(): + model = registry.stores.loc[store, 'model'] + raw_model = registry.stores.loc[store, 'raw_survey_model'] + + features = await model.get_df(where=model.id.in_(ids), geom_as_ewkt=True) + raw_features = await raw_model.get_df(where=raw_model.id.in_(ids), geom_as_ewkt=True) + + ## Find the features with a status to actually change + features = features.loc[features.status!=status] + features['original_status'] = features['status'] + raw_features = raw_features.loc[raw_features.status!=status] 
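+            ## Apply the new status to the survey features and their raw survey
+            ## counterparts, upsert both dataframes, and log each change in StatusChange.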
+ + features['status'] = status + raw_features['status'] = status + await upsert_df(features, model) + await upsert_df(raw_features, raw_model) + + ## Store the status change record + for row in features.itertuples(index=False): + await StatusChange.create( + store=store, + ref_id=row.id, + original=row.original_status, + new=status, + time=datetime.now() + ) + results.actionResults.append(ActionResult(message=f'Changed status of {len(features)} {store}(s)')) + + return results + + +async def create_tags(features, keys, values): + """ + Create the tags (keys: values) for the features. + Return a list of dataframes + """ + from gisaf.registry import registry + result = [] + for store, ids in features.items(): + model = registry.stores.loc[store, 'model'] + tags_df = await TagsModel.get_df( + where=and_(TagsModel.store == store, TagsModel.ref_id.in_(ids)), + geom_as_ewkt=True + ) + feature_df = await model.get_df(where=model.id.in_(ids)) + feature_df['centroid'] = shapely.centroid(shapely.from_wkb(feature_df['geom'])) + tagged_feature_df = tags_df[tags_df.store==store].merge( + feature_df[['id', 'centroid']], + left_on='ref_id', right_on='id', + suffixes=('', '_feature'), + how='outer', indicator=True, + ) + + ## Fill missing values (no existing tags) + ## Insert without pk (id) for new tags records + key_values = dict(zip(keys, values)) + new_tagged_feature_df = tagged_feature_df[tagged_feature_df['_merge']=='right_only'].drop(columns='id') + new_tagged_feature_df['tags'] = [key_values] * len(new_tagged_feature_df) + new_tagged_feature_df['ref_id'] = new_tagged_feature_df['id_feature'] + new_tagged_feature_df['store'] = store + new_tagged_feature_df['geom'] = shapely.to_wkb(new_tagged_feature_df['centroid'], hex=True, include_srid=True) + + ## Existing tags: add key: value + existing_tagged_feature_df = tagged_feature_df[tagged_feature_df['_merge']!='right_only'] + for row in existing_tagged_feature_df.itertuples(): + for key, value in key_values.items(): + row.tags[key] = value + + ## Save + new_tags_df = await upsert_df(new_tagged_feature_df, TagsModel) + await upsert_df(existing_tagged_feature_df, TagsModel) + result.append(pd.concat([existing_tagged_feature_df, new_tags_df])) + + return pd.concat(result) + +from gisaf.utils import ToMigrate +logger.warning(ToMigrate('plugins.change_feature_status (graphql)')) +change_feature_status = ChangeFeatureStatus( + name='Change status', + stores_by_re=[f"{conf.survey.db_schema}"], + roles=['Change status'], + form_fields=[ + # FormField( + # name='Status', + # type='string' + # ) + ] +) + + +manager = PluginManager() diff --git a/src/gisaf/py.typed b/src/gisaf/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/gisaf/reactor.py b/src/gisaf/reactor.py new file mode 100755 index 0000000..38918bc --- /dev/null +++ b/src/gisaf/reactor.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python +""" +Gisaf reactor, deals with message processing (mqtt, etc) +""" +import asyncio +import re +import sys +import logging +from typing import Any +from importlib.metadata import entry_points +from collections import OrderedDict + +from aiomqtt import Client, Message + +from gisaf.ipynb_tools import gisaf +from gisaf.config import conf + +logger = logging.getLogger('gisaf.reactor') + + +class Reactor: + def __init__(self): + self.processors = {} + + async def setup(self, exclude_processor_names=None): + if exclude_processor_names == None: + exclude_processor_names = [] + for entry_point in entry_points().select(group='gisaf_message_processors'): + 
logger.debug( + f'Processing message processor module {entry_point.name}' + ) + try: + message_processor_class = entry_point.load() + except Exception as err: + logger.error(f'Skip message processor module ' \ + f'{entry_point.name}: {err}') + continue + try: + message_processor = message_processor_class() + except Exception as err: + logger.error(f'Skip message processor module ' \ + f'{entry_point.name} (cannot instantiate): {err}') + continue + if not message_processor.enabled: + continue + message_processor.name = entry_point.name + + ## Skip processors excluded by the command line arguments + if message_processor.name in exclude_processor_names: + continue + + await message_processor.setup() + self.add_processor(message_processor) + logger.info(f'Added message processor "{entry_point.name}"') + + def get_available_processors(self): + return [ + entry_point.name + for entry_point in entry_points().select(group='gisaf_message_processors') + ] + + def add_processor(self, processor): + try: + processor.topic_re = re.compile(processor.topic) + except Exception as err: + logger.warning(f'Cannot treat topic "{processor.topic}" of '\ + f'"{processor.name}" as a regular expression: {err}') + processor.topic_re = None + self.processors[processor.name] = processor + + async def process_unfiltered_messages(self, messages): + async for message in messages: + await self.process_unfiltered_message(message) + + async def process_unfiltered_message(self, message): + ## Log + if len(message.payload)>50: + msg = message.payload[:50].decode() + else: + msg = message.payload.decode() + logger.debug( + f'Got unfiltered message on "{message.topic}" '\ + f'({len(message.payload)} bytes): {msg}' + ) + tasks = OrderedDict() + for name, processor in self.processors.items(): + if processor.topic == message.topic: + match = True + else: + if processor.topic_re: + match = processor.topic_re.fullmatch(message.topic) + else: + match = False + if match: + tasks[processor.name] = processor.process(message) + results = await asyncio.gather(*tasks.values(), return_exceptions=True) + for task_number, task in enumerate(tasks.items()): + result = results[task_number] + if isinstance(result, Exception): + logger.warning(f'Error executing task "{task[0]}" ' \ + f'for topic "{message.topic}": {result}') + + +class MessageProcessorBaseClass: + """ + Base class for all message processors. + Subclasses can set the attribute "context" + (an async context processor) in setup(): + all the contexts will be used when the reactor runs.
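+    For illustration only (class, topic and logger names are hypothetical), a
+    minimal processor could be:
+
+        class RainGaugeProcessor(MessageProcessorBaseClass):
+            topic = 'gisaf/sensor/rain_gauge'
+            async def process(self, message) -> None:
+                logger.info(f'Rain gauge: {message.payload.decode()}')
+
+    and would be exposed through a 'gisaf_message_processors' entry point.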
+ """ + enabled: bool = True + topic: str = '' + context: Any = None + + async def setup(self, *args, **kwargs) -> None: + pass + + async def process(self, message) -> None: + pass + + +async def cancel_tasks(tasks): + for task in tasks: + if task.done(): + continue + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + +async def main(list=None, exclude_processor_names=None) -> None: + if list: + reactor = Reactor() + jobs = reactor.get_available_processors() + print(' '.join(jobs)) + sys.exit(0) + await gisaf.setup() + await gisaf.make_models() + reactor = Reactor() + await reactor.setup(exclude_processor_names=exclude_processor_names) + + async with Client( + hostname=conf.gisaf_live.mqtt.broker, + port=conf.gisaf_live.mqtt.port + ) as client: + async with client.messages() as messages: + for name, processor in reactor.processors.items(): + await client.subscribe(processor.topic) + message: Message + async for message in messages: + for name, processor in reactor.processors.items(): + if message.topic.matches(processor.topic): + try: + await processor.process(message) + except Exception as err: + logger.warning( + 'Error while processing message ' + f'{message.topic} by {processor.name}. ' + 'See below for the full trace.' + ) + logger.exception(err) + + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('--debug', '-d', action='store_true', + help='Print debug messages') + parser.add_argument('--exclude-processor-name', '-e', action='append', + help='Do not run the processor with a given name') + parser.add_argument('--list', '-l', action='store_true', + help='Only list available processors and exit') + + args = parser.parse_args() + if args.debug: + logging.root.setLevel(logging.DEBUG) + logger.setLevel(logging.DEBUG) + else: + logging.root.setLevel(logging.INFO) + logger.setLevel(logging.INFO) + logging.getLogger('websockets').setLevel(logging.WARNING) + logging.getLogger('engineio.client').setLevel(logging.WARNING) + logging.getLogger('socketio.client').setLevel(logging.WARNING) + logging.getLogger('aiosasl').setLevel(logging.WARNING) + + asyncio.run( + main( + list=args.list, + exclude_processor_names=args.exclude_processor_name, + ) + ) diff --git a/src/gisaf/redis_tools.py b/src/gisaf/redis_tools.py index aa421e8..3b31427 100644 --- a/src/gisaf/redis_tools.py +++ b/src/gisaf/redis_tools.py @@ -1,4 +1,3 @@ -from typing import ClassVar from uuid import uuid1 from io import BytesIO from asyncio import create_task @@ -15,14 +14,14 @@ from asyncpg.exceptions import UndefinedTableError, InterfaceError from sqlalchemy import text from redis import asyncio as aioredis -from .config import conf +from gisaf.config import conf # from gisaf.models.live import LiveModel -from .utils import (SHAPELY_TYPE_TO_MAPBOX_TYPE, DEFAULT_MAPBOX_LAYOUT, +from gisaf.utils import (SHAPELY_TYPE_TO_MAPBOX_TYPE, DEFAULT_MAPBOX_LAYOUT, DEFAULT_MAPBOX_PAINT, gisTypeSymbolMap) -from .registry import registry +from gisaf.registry import registry #from .models.geom import GeomGroup, GeomModel -from .models.geo_models_base import LiveGeoModel -from .database import db_session +from gisaf.models.geo_models_base import LiveGeoModel +from gisaf.database import db_session logger = logging.getLogger(__name__) diff --git a/src/gisaf/registry.py b/src/gisaf/registry.py index 98d3042..fca7bbd 100644 --- a/src/gisaf/registry.py +++ b/src/gisaf/registry.py @@ -9,15 +9,16 @@ from importlib.metadata import entry_points from typing 
import Any, ClassVar from pydantic import create_model -from sqlalchemy import inspect, text +from pydantic_core import PydanticUndefined +from sqlalchemy import text from sqlalchemy.orm import selectinload from sqlmodel import SQLModel, select import pandas as pd -from .config import conf -from .models import (misc, category as category_module, +from gisaf.config import conf +from gisaf.models import (misc, category as category_module, project, reconcile, map_bases, tags) -from .models.geo_models_base import ( +from gisaf.models.geo_models_base import ( LiveGeoModel, PlottableModel, GeoModel, @@ -28,11 +29,11 @@ from .models.geo_models_base import ( GeoLineSurveyModel, GeoPolygonSurveyModel, ) -from .utils import ToMigrate -from .models.category import Category, CategoryGroup -from .database import db_session -from . import models -from .models.metadata import survey, raw_survey +from gisaf.utils import ToMigrate +from gisaf.models.category import Category, CategoryGroup +from gisaf.database import db_session +from gisaf import models +from gisaf.models.metadata import survey, raw_survey logger = logging.getLogger(__name__) @@ -71,6 +72,7 @@ class ModelRegistry: Provides tools to get the models from their names, table names, etc. """ stores: pd.DataFrame + categories: pd.DataFrame values: dict[str, PlottableModel] geom: dict[str, GeoModel] geom_live: dict[str, LiveGeoModel] @@ -97,7 +99,7 @@ class ModelRegistry: self.raw_survey_models = {} self.survey_models = {} - async def make_registry(self): + async def make_registry(self) -> None: """ Make (or refresh) the registry of models. :return: @@ -110,7 +112,7 @@ class ModelRegistry: ## Now that the models are refreshed, tells the ogcapi to (re)build #await app.extra['ogcapi'].build() - async def make_category_models(self): + async def make_category_models(self) -> None: """ Make geom models from the category model and update raw_survey_models and survey_models @@ -191,7 +193,7 @@ class ModelRegistry: logger.info('Discovered {:d} models'.format(len(categories))) - def scan(self): + def scan(self) -> None: """ Scan all models defined explicitely (not the survey ones, which are defined by categories), and store them for reference. @@ -228,7 +230,7 @@ class ModelRegistry: if hasattr(obj, '__module__') and hasattr(obj, '__tablename__'): self.misc[name] = obj - async def build(self): + async def build(self) -> None: """ Build the registry: organize all models in a common reference point. 
This should be executed after the discovery of surey models (categories) @@ -252,15 +254,24 @@ class ModelRegistry: other_tables = [model.__tablename__ for model in self.other.values()] self.data_tables = values_tables + other_tables + self.populate_values_for_model() + self.make_menu() - ## Build a dict for quick access to the values from a model - logger.warn(ToMigrate('get_geom_model_from_table_name, only used for values_for_model')) + def populate_values_for_model(self): + ''' + Build a dict for quick access to the values from a model + ''' + logger.warning(ToMigrate('populate_values_for_model')) + return self.values_for_model = {} for model_value in self.values.values(): - for constraint in inspect(model_value).foreign_key_constraints: - model = self.get_geom_model_from_table_name(constraint.referred_table.name) - self.values_for_model[model] = model_value - self.make_menu() + for field in model_value.model_fields.values(): + foreign_key = getattr(field, 'foreign_key', False) + if foreign_key and foreign_key is not PydanticUndefined: + breakpoint() + model = ... + self.values_for_model[model] = model_value + def scan_entry_points(self, name): """ @@ -275,7 +286,7 @@ class ModelRegistry: logger.warning(err) return named_objects - def add_model(self, model): + def add_model(self, model) -> str: """ Add the model :return: Model type (one of {'GeoModel', 'PlottableModel', 'Other model'}) @@ -293,7 +304,7 @@ class ModelRegistry: self.other[table_name] = model return 'Other model' - def add_store(self, store): + def add_store(self, store) -> None: self.geom_custom_store[store.name] = store def make_menu(self): @@ -347,16 +358,6 @@ class ModelRegistry: resp['externalRecordUrl'] = item.get_external_record_url() return resp - def get_geom_model_from_table_name(self, table_name): - """ - Utility func to get a geom model from a table name - :param table_name: str - :return: model or None - """ - for model in self.geom.values(): - if model.__tablename__ == table_name: - return model - def get_other_model_from_table_name(self, table_name): """ Utility func to get a non-geom model from a table name @@ -376,23 +377,21 @@ class ModelRegistry: Used in GraphQl queries. 
""" ## Utility functions used with apply method (dataframes) - def fill_columns_from_custom_models(row): + def fill_columns_from_custom_models(row) -> tuple[str, str, str]: return ( - ## FIXME: Like: 'AVESHTEquipment' - row.model.__namespace__['__qualname__'], ## Name of the class - hacky + row.model.__name__, row.model.description, - ## FIXME: Like: 'other_aves' - row.model.__table__.schema + row.model.metadata.schema ) - def fill_columns_from_custom_stores(row): + def fill_columns_from_custom_stores(row) -> tuple[str, str, None]: return ( row.model.description, row.model.description, None ## Schema ) - def get_store_name(category): + def get_store_name(category) -> str: fragments = ['V', category.group, category.minor_group_1] if category.minor_group_2 != '----': fragments.append(category.minor_group_2) diff --git a/src/gisaf/scheduler.py b/src/gisaf/scheduler.py index 46bc4e8..58a3d88 100755 --- a/src/gisaf/scheduler.py +++ b/src/gisaf/scheduler.py @@ -21,7 +21,7 @@ from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.date import DateTrigger -from .ipynb_tools import Gisaf +from gisaf.ipynb_tools import Gisaf formatter = logging.Formatter( "%(asctime)s:%(levelname)s:%(name)s:%(message)s", diff --git a/src/gisaf/scheduler_application.py b/src/gisaf/scheduler_application.py index d25c16b..6295d57 100755 --- a/src/gisaf/scheduler_application.py +++ b/src/gisaf/scheduler_application.py @@ -10,12 +10,12 @@ from starlette.routing import Mount from fastapi.middleware.cors import CORSMiddleware from apscheduler.schedulers.asyncio import AsyncIOScheduler -from .config import conf -from .ipynb_tools import gisaf -from .registry import registry -from .redis_tools import setup_redis, shutdown_redis -from .scheduler import GSFastAPI, js, startup, Settings -from .scheduler_web import app as sched_app +from gisaf.config import conf +from gisaf.ipynb_tools import gisaf +from gisaf.registry import registry +from gisaf.redis_tools import setup_redis, shutdown_redis +from gisaf.scheduler import GSFastAPI, js, startup, Settings +from gisaf.scheduler_web import app as sched_app formatter = logging.Formatter( diff --git a/src/gisaf/security.py b/src/gisaf/security.py index 5e952b8..eeacfc8 100644 --- a/src/gisaf/security.py +++ b/src/gisaf/security.py @@ -12,9 +12,9 @@ from jose import JWTError, jwt, ExpiredSignatureError from sqlalchemy import select from sqlalchemy.orm import selectinload -from .config import conf -from .database import db_session -from .models.authentication import User, UserRead +from gisaf.config import conf +from gisaf.database import db_session +from gisaf.models.authentication import User, UserRead logger = logging.getLogger(__name__) diff --git a/src/gisaf/utils.py b/src/gisaf/utils.py index b093fe3..edf65ec 100644 --- a/src/gisaf/utils.py +++ b/src/gisaf/utils.py @@ -15,7 +15,7 @@ from sqlalchemy.sql.expression import delete # from graphene import ObjectType -from .config import conf +from gisaf.config import conf class ToMigrate(Exception): pass