diff --git a/src/gisaf/_version.py b/src/gisaf/_version.py index a9c16b1..cf8e203 100644 --- a/src/gisaf/_version.py +++ b/src/gisaf/_version.py @@ -1 +1 @@ -__version__: str = '0.1.dev70+g53c2e35.d20240422' \ No newline at end of file +__version__: str = '0.1.dev74+gd3fa462.d20240430' \ No newline at end of file diff --git a/src/gisaf/ipynb_tools.py b/src/gisaf/ipynb_tools.py index 8efd0d6..358e29f 100644 --- a/src/gisaf/ipynb_tools.py +++ b/src/gisaf/ipynb_tools.py @@ -9,7 +9,7 @@ from urllib.error import URLError from datetime import datetime from io import BytesIO from pickle import dump, HIGHEST_PROTOCOL -# from aiohttp import ClientSession, MultipartWriter +from aiohttp import ClientSession, MultipartWriter import pandas as pd import geopandas as gpd @@ -18,13 +18,16 @@ from geoalchemy2 import WKTElement # from geoalchemy2.shape import from_shape from sqlalchemy import create_engine +from sqlmodel import select # from shapely import wkb from gisaf.config import conf +from gisaf.database import db_session from gisaf.redis_tools import store as redis_store from gisaf.live import live_server from gisaf.registry import registry +from gisaf.models.dashboard import Widget, DashboardPage, DashboardPageSection ## For base maps: contextily try: @@ -35,99 +38,64 @@ except ImportError: logger = logging.getLogger('Gisaf tools') -class Notebook: +async def remove_live_layer(channel): """ - Proof of concept? Gisaf could control notebook execution. + Remove the channel from Gisaf Live """ - def __init__(self, path: str): - self.path = path + async with ClientSession() as session: + async with session.get('{}://{}:{}/api/remove-live/{}'.format( + conf.gisaf_live.scheme, + conf.gisaf_live.hostname, + conf.gisaf_live.port, + channel + )) as resp: + return await resp.text() - -class Gisaf: +async def to_live_layer(gdf, channel, mapbox_paint=None, mapbox_layout=None, properties=None): """ - Gisaf tool for ipython/Jupyter notebooks + Send a geodataframe to a gisaf server with an HTTP POST request for live map display """ - def __init__(self): - # self.db = db - self.conf = conf - self.store = redis_store - self.live_server = live_server - if ctx: - ## Contextily newer version deprecated ctx.sources - self.basemaps = ctx.providers - else: - self.basemaps = None + with BytesIO() as buf: + dump(gdf, buf, protocol=HIGHEST_PROTOCOL) + buf.seek(0) - async def setup(self, with_mqtt=False): - await self.store.create_connections() - if with_mqtt: - logger.warning('Gisaf live_server does not support with_mqtt anymore: ignoring') - try: - await self.live_server.setup() - except Exception as err: - logger.warn(f'Cannot setup live_server: {err}') - logger.exception(err) - - async def make_models(self, **kwargs): - """ - Populate the model registry. - By default, all models will be added, including the those defined in categories (full registry). - Set with_categories=False to skip them and speed up the registry initialization. - :return: - """ - await registry.make_registry() - if 'with_categories' in kwargs: - logger.warning(f'{self.__class__}.make_models() does not support argument with_categories anymore') - self.registry = registry - ## TODO: Compatibility: mark "models" deprecated, replaced by "registry" - # self.models = registry - - def get_layer_list(self): - """ - Get a list of the names of all layers (ie. models with a geometry). - See get_all_geo for fetching data for a layer. - :return: list of strings - """ - return self.registry.geom.keys() - - async def get_query(self, query): - """ - Return a dataframe for the query - """ - async with query.bind.raw_pool.acquire() as conn: - compiled = query.compile() - columns = [a.name for a in compiled.statement.columns] - stmt = await conn.prepare(compiled.string) - data = await stmt.fetch(*[compiled.params.get(param) for param in compiled.positiontup]) - return pd.DataFrame(data, columns=columns) - - async def get_all(self, model, **kwargs): - """ - Return a dataframe with all records for the model - """ - return await self.get_query(model.query) - - async def set_dashboard(self, name, group, - notebook=None, - description=None, - html=None, - plot=None, - df=None, - attached=None, - expanded_panes=None, - sections=None): - """ - Add or update a dashboard page in Gisaf - :param name: name of the dashboard page - :param group: name of the group (level directory) - :param notebook: name of the notebook, to be registered for future use - :param description: - :param attached: a matplotlib/pyplot plot, etc - :param sections: a list of DashboardPageSection - :return: - """ - from gisaf.models.dashboard import DashboardPage, DashboardPageSection + async with ClientSession() as session: + with MultipartWriter('mixed') as mpwriter: + mpwriter.append(buf) + if mapbox_paint != None: + mpwriter.append_json(mapbox_paint, {'name': 'mapbox_paint'}) + if mapbox_layout != None: + mpwriter.append_json(mapbox_layout, {'name': 'mapbox_layout'}) + if properties != None: + mpwriter.append_json(properties, {'name': 'properties'}) + async with session.post('{}://{}:{}/api/live/{}'.format( + conf.gisaf_live.scheme, + conf.gisaf_live.hostname, + conf.gisaf_live.port, + channel, + ), data=mpwriter) as resp: + return await resp.text() +async def set_dashboard(name, group, + notebook=None, + description=None, + html=None, + plot=None, + df=None, + attached=None, + expanded_panes=None, + sections=None): + """ + Add or update a dashboard page in Gisaf + :param name: name of the dashboard page + :param group: name of the group (level directory) + :param notebook: name of the notebook, to be registered for future use + :param description: + :param attached: a matplotlib/pyplot plot, etc + :param sections: a list of DashboardPageSection + :return: + """ + async with db_session() as session: expanded_panes = expanded_panes or [] sections = sections or [] now = datetime.now() @@ -150,9 +118,12 @@ class Gisaf: else: plot_blob = None - page = await DashboardPage.query.where((DashboardPage.name==name) & (DashboardPage.group==group)).gino.first() - if not page: + request = select(DashboardPage).where((DashboardPage.name==name) & (DashboardPage.group==group)) + res = await session.exec(request) + page: DashboardPage | None = res.one_or_none() + if page is None: page = DashboardPage( + id=None, name=name, group=group, description=description, @@ -161,6 +132,7 @@ class Gisaf: df=df_blob, plot=plot_blob, html=html, + source_id=None, # TODO: DashoardPage source expanded_panes=','.join(expanded_panes) ) if attached: @@ -169,16 +141,14 @@ class Gisaf: else: if attached: page.attachment = page.save_attachment(attached) - await page.update( - description=description, - notebook=notebook, - html=html, - attachment=page.attachment, - time=now, - df=df_blob, - plot=plot_blob, - expanded_panes=','.join(expanded_panes) - ).apply() + page.description=description + page.notebook=notebook + page.html=html + page.attachment=page.attachment + page.time=now + page.df=df_blob + page.plot=plot_blob + page.expanded_panes=','.join(expanded_panes) for section in sections: #print(section) @@ -186,172 +156,222 @@ class Gisaf: ## Replace section.plot (matplotlib plot or figure) ## by the name of the rendered pic inthe filesystem section.plot = section.save_plot(section.plot) - section_record = await DashboardPageSection.query.where( + query = select(DashboardPageSection).where( (DashboardPageSection.dashboard_page_id==page.id) & (DashboardPageSection.name==section.name) - ).gino.first() - if not section_record: + ) + res = await session.exec(query) + section_record = res.one_or_none() + if section_record is None: section.dashboard_page_id = page.id - await section.create() + section.add(section) else: logger.warn('TODO: set_dashboard section update') logger.warn('TODO: set_dashboard section remove') + await session.commit() - async def set_widget(self, name, title, subtitle, content, notebook=None): - """ - Create a web widget, that is served by /embed/. - """ - from gisaf.models.dashboard import Widget - now = datetime.now() - widget = await Widget.query.where(Widget.name==name).gino.first() +async def set_widget(name, title, subtitle, content, notebook=None): + """ + Create a web widget, that is served by /embed/. + """ + now = datetime.now() + async with db_session() as session: + query = select(Widget).where(Widget.name==name) + res = await session.exec(query) + widget = res.one_or_none() kwargs = dict( - title=title, - subtitle=subtitle, - content=content, - notebook=notebook, - time=now, ) - if widget: - await widget.update(**kwargs).apply() + if widget is None: + widget = Widget( + name=name, + title=title, + subtitle=subtitle, + content=content, + notebook=notebook, + time=now + ) else: - await Widget(name=name, **kwargs).create() + widget.title=title + widget.subtitle=subtitle + widget.content=content + widget.notebook=notebook + widget.time=now + await session.commit() - async def to_live_layer(self, gdf, channel, mapbox_paint=None, mapbox_layout=None, properties=None): - """ - Send a geodataframe to a gisaf server with an HTTP POST request for live map display - """ - with BytesIO() as buf: - dump(gdf, buf, protocol=HIGHEST_PROTOCOL) - buf.seek(0) - async with ClientSession() as session: - with MultipartWriter('mixed') as mpwriter: - mpwriter.append(buf) - if mapbox_paint != None: - mpwriter.append_json(mapbox_paint, {'name': 'mapbox_paint'}) - if mapbox_layout != None: - mpwriter.append_json(mapbox_layout, {'name': 'mapbox_layout'}) - if properties != None: - mpwriter.append_json(properties, {'name': 'properties'}) - async with session.post('{}://{}:{}/api/live/{}'.format( - self.conf.gisaf_live['scheme'], - self.conf.gisaf_live['hostname'], - self.conf.gisaf_live['port'], - channel, - ), data=mpwriter) as resp: - return await resp.text() - async def remove_live_layer(self, channel): - """ - Remove the channel from Gisaf Live - """ - async with ClientSession() as session: - async with session.get('{}://{}:{}/api/remove-live/{}'.format( - self.conf.gisaf_live['scheme'], - self.conf.gisaf_live['hostname'], - self.conf.gisaf_live['port'], - channel - )) as resp: - return await resp.text() +## Below: old stuf, to delete - def to_layer(self, gdf: gpd.GeoDataFrame, model, project_id=None, - skip_columns=None, replace_all=True, - chunksize=100): - """ - Save the geodataframe gdf to the Gisaf model, using pandas' to_sql dataframes' method. - Note that it's NOT an async call. Explanations: - * to_sql doesn't seems to work with gino/asyncpg - * using Gisaf models is few magnitude orders slower - (the async code using this technique is left commented out, for reference) - """ - if skip_columns == None: - skip_columns = [] +# def to_layer(self, gdf: gpd.GeoDataFrame, model, project_id=None, +# skip_columns=None, replace_all=True, +# chunksize=100): +# """ +# Save the geodataframe gdf to the Gisaf model, using pandas' to_sql dataframes' method. +# Note that it's NOT an async call. Explanations: +# * to_sql doesn't seems to work with gino/asyncpg +# * using Gisaf models is few magnitude orders slower +# (the async code using this technique is left commented out, for reference) +# """ +# if skip_columns == None: +# skip_columns = [] - ## Filter empty geometries, and reproject - _gdf: gpd.GeoDataFrame = gdf[~gdf.geometry.is_empty].to_crs(self.conf.crs['geojson']) +# ## Filter empty geometries, and reproject +# _gdf: gpd.GeoDataFrame = gdf[~gdf.geometry.is_empty].to_crs(conf.crs.geojson) - ## Remove the empty geometries - _gdf.dropna(inplace=True, subset=['geometry']) - #_gdf['geom'] = _gdf.geom1.apply(lambda geom: from_shape(geom, srid=self.conf.srid)) +# ## Remove the empty geometries +# _gdf.dropna(inplace=True, subset=['geometry']) +# #_gdf['geom'] = _gdf.geom1.apply(lambda geom: from_shape(geom, srid=conf.geo.srid)) - for col in skip_columns: - if col in _gdf.columns: - _gdf.drop(columns=[col], inplace=True) +# for col in skip_columns: +# if col in _gdf.columns: +# _gdf.drop(columns=[col], inplace=True) - _gdf['geom'] = _gdf['geometry'].apply(lambda geom: WKTElement(geom.wkt, srid=self.conf.srid)) - _gdf.drop(columns=['geometry'], inplace=True) +# _gdf['geom'] = _gdf['geometry'].apply(lambda geom: WKTElement(geom.wkt, srid=conf.geo.srid)) +# _gdf.drop(columns=['geometry'], inplace=True) - engine = create_engine(self.conf.db['uri'], echo=False) +# engine = create_engine(conf.db.get_sqla_url(), echo=False) - ## Drop existing - if replace_all: - engine.execute('DELETE FROM "{}"'.format(model.__table__.fullname)) - else: - raise NotImplementedError('ipynb_tools.Gisaf.to_layer does not support updates yet') +# ## Drop existing +# if replace_all: +# engine.execute('DELETE FROM "{}"'.format(model.__table__.fullname)) +# else: +# raise NotImplementedError('ipynb_tools.Gisaf.to_layer does not support updates yet') - ## See https://stackoverflow.com/questions/38361336/write-geodataframe-into-sql-database - # Use 'dtype' to specify column's type - _gdf.to_sql( - name=model.__tablename__, - con=engine, - schema=model.__table_args__['schema'], - if_exists='append', - index=False, - dtype={ - 'geom': model.geom.type, - }, - method='multi', - chunksize=chunksize, - ) +# ## See https://stackoverflow.com/questions/38361336/write-geodataframe-into-sql-database +# # Use 'dtype' to specify column's type +# _gdf.to_sql( +# name=model.__tablename__, +# con=engine, +# schema=model.__table_args__['schema'], +# if_exists='append', +# index=False, +# dtype={ +# 'geom': model.geom.type, +# }, +# method='multi', +# chunksize=chunksize, +# ) - #async with self.db.transaction() as tx: - # if replace_all: - # await model.delete.gino.status() - # else: - # raise NotImplementedError('ipynb_tools.Gisaf.to_layer does not support updates yet') - # if not skip_columns: - # skip_columns = ['x', 'y', 'z', 'coords'] + #async with self.db.transaction() as tx: + # if replace_all: + # await model.delete.gino.status() + # else: + # raise NotImplementedError('ipynb_tools.Gisaf.to_layer does not support updates yet') + # if not skip_columns: + # skip_columns = ['x', 'y', 'z', 'coords'] - # ## Reproject - # ggdf = gdf.to_crs(self.conf.crs['geojson']) + # ## Reproject + # ggdf = gdf.to_crs(self.conf.crs['geojson']) - # ## Remove the empty geometries - # ggdf.dropna(inplace=True) - # #ggdf['geom'] = ggdf.geom1.apply(lambda geom: from_shape(geom, srid=self.conf.srid)) + # ## Remove the empty geometries + # ggdf.dropna(inplace=True) + # #ggdf['geom'] = ggdf.geom1.apply(lambda geom: from_shape(geom, srid=self.conf.srid)) - # for col in skip_columns: - # if col in ggdf.columns: - # ggdf.drop(columns=[col], inplace=True) + # for col in skip_columns: + # if col in ggdf.columns: + # ggdf.drop(columns=[col], inplace=True) - # #ggdf.set_geometry('geom', inplace=True) + # #ggdf.set_geometry('geom', inplace=True) - # if project_id: - # ggdf['project_id'] = project_id - # ## XXX: index? - # gdf_dict = ggdf.to_dict(orient='records') + # if project_id: + # ggdf['project_id'] = project_id + # ## XXX: index? + # gdf_dict = ggdf.to_dict(orient='records') - # gdf_dict_2 = [] - # for row in gdf_dict: - # geometry = row.pop('geometry') - # if not geometry.is_empty: - # row['geom'] = str(from_shape(geometry, srid=self.conf.srid)) - # gdf_dict_2.append(row) + # gdf_dict_2 = [] + # for row in gdf_dict: + # geometry = row.pop('geometry') + # if not geometry.is_empty: + # row['geom'] = str(from_shape(geometry, srid=self.conf.srid)) + # gdf_dict_2.append(row) - # result = await model.insert().gino.all(*gdf_dict_2) + # result = await model.insert().gino.all(*gdf_dict_2) - # return + # return - # for row in gdf_dict: - # if 'id' in row: - # ## TODO: Existing id: can use merge - # ex_item = await model.get(item['id']) - # await ex_item.update(**row) - # else: - # geometry = row.pop('geometry') - # if not geometry.is_empty: - # feature = model(**row) - # feature.geom = from_shape(geometry, srid=self.conf.srid) - # await feature.create() - # #db.session.commit() + # for row in gdf_dict: + # if 'id' in row: + # ## TODO: Existing id: can use merge + # ex_item = await model.get(item['id']) + # await ex_item.update(**row) + # else: + # geometry = row.pop('geometry') + # if not geometry.is_empty: + # feature = model(**row) + # feature.geom = from_shape(geometry, srid=self.conf.srid) + # await feature.create() + # #db.session.commit() -gisaf = Gisaf() \ No newline at end of file +# class Notebook: +# """ +# Proof of concept? Gisaf could control notebook execution. +# """ +# def __init__(self, path: str): +# self.path = path + + +# class Gisaf: +# """ +# Gisaf tool for ipython/Jupyter notebooks +# """ +# def __init__(self): +# # self.db = db +# self.conf = conf +# self.store = redis_store +# self.live_server = live_server +# if ctx: +# ## Contextily newer version deprecated ctx.sources +# self.basemaps = ctx.providers +# else: +# self.basemaps = None + +# async def setup(self, with_mqtt=False): +# await self.store.create_connections() +# if with_mqtt: +# logger.warning('Gisaf live_server does not support with_mqtt anymore: ignoring') +# try: +# await self.live_server.setup() +# except Exception as err: +# logger.warn(f'Cannot setup live_server: {err}') +# logger.exception(err) + +# async def make_models(self, **kwargs): +# """ +# Populate the model registry. +# By default, all models will be added, including the those defined in categories (full registry). +# Set with_categories=False to skip them and speed up the registry initialization. +# :return: +# """ +# await registry.make_registry() +# if 'with_categories' in kwargs: +# logger.warning(f'{self.__class__}.make_models() does not support argument with_categories anymore') +# self.registry = registry +# ## TODO: Compatibility: mark "models" deprecated, replaced by "registry" +# # self.models = registry + +# def get_layer_list(self): +# """ +# Get a list of the names of all layers (ie. models with a geometry). +# See get_all_geo for fetching data for a layer. +# :return: list of strings +# """ +# return self.registry.geom.keys() + +# async def get_query(self, query): +# """ +# Return a dataframe for the query +# """ +# async with query.bind.raw_pool.acquire() as conn: +# compiled = query.compile() +# columns = [a.name for a in compiled.statement.columns] +# stmt = await conn.prepare(compiled.string) +# data = await stmt.fetch(*[compiled.params.get(param) for param in compiled.positiontup]) +# return pd.DataFrame(data, columns=columns) + +# async def get_all(self, model, **kwargs): +# """ +# Return a dataframe with all records for the model +# """ +# return await self.get_query(model.query) + +# gisaf = Gisaf() \ No newline at end of file diff --git a/src/gisaf/models/dashboard.py b/src/gisaf/models/dashboard.py index 5a3f350..4e6bc88 100644 --- a/src/gisaf/models/dashboard.py +++ b/src/gisaf/models/dashboard.py @@ -31,16 +31,16 @@ class DashboardPageSource(Model, table=True): name: str -class DashboardPageCommon: +class DashboardPageCommon(Model): """ Base class for DashboardPage and DashboardPageSection, where some methods are common, eg. attachments """ name: str - df: bytes - plot: bytes + df: bytes | None = None + plot: bytes | None = None #plot: dict[str, Any] | None = Field(sa_type=JSON(none_as_null=True)) # type: ignore - attachment: str | None + attachment: str | None = None html: str | None = None def ensure_dir_exists(self): @@ -139,7 +139,7 @@ class DashboardPageMetaData(BaseModel): viewable_role: str | None = None -class DashboardPage(Model, DashboardPageCommon, DashboardPageMetaData, table=True): +class DashboardPage(DashboardPageCommon, DashboardPageMetaData, table=True): __tablename__ = 'dashboard_page' # type: ignore __table_args__ = gisaf.table_args @@ -202,7 +202,7 @@ class DashboardPage(Model, DashboardPageCommon, DashboardPageMetaData, table=Tru logger.debug('Notebook: no base_url in gisaf config') -class DashboardPageSection(Model, DashboardPageCommon, table=True): +class DashboardPageSection(DashboardPageCommon, table=True): __tablename__ = 'dashboard_page_section' # type: ignore __table_args__ = gisaf.table_args @@ -280,7 +280,7 @@ class Widget(Model, table=True): subtitle: str content: str time: datetime - notebook: str + notebook: str | None = None class Admin: menu = 'Dashboard' diff --git a/src/gisaf/models/project.py b/src/gisaf/models/project.py index a2673af..1210596 100644 --- a/src/gisaf/models/project.py +++ b/src/gisaf/models/project.py @@ -179,7 +179,6 @@ class Project(Model, table=True): return result - # def download_raw_survey_data(self, session=None): # from gisaf.models.raw_survey_models import RawSurvey # from gisaf.registry import registry diff --git a/src/gisaf/models/raw_survey.py b/src/gisaf/models/raw_survey.py index 9e167f0..c794d88 100644 --- a/src/gisaf/models/raw_survey.py +++ b/src/gisaf/models/raw_survey.py @@ -1,24 +1,31 @@ -from typing import ClassVar -from sqlmodel import Field, BigInteger +from typing import Annotated, ClassVar +from geoalchemy2 import Geometry, WKBElement +from sqlmodel import Field, BigInteger, Relationship +from gisaf.config import conf from gisaf.models.models_base import Model from gisaf.models.geo_models_base import GeoPointMModel, BaseSurveyModel from gisaf.models.project import Project from gisaf.models.category import Category -from gisaf.models.metadata import gisaf_survey +from gisaf.models.metadata import gisaf_survey, gisaf_admin -class RawSurveyModel(BaseSurveyModel, GeoPointMModel): +class RawSurveyModel(BaseSurveyModel, GeoPointMModel, table=True): __table_args__ = gisaf_survey.table_args __tablename__ = 'raw_survey' hidden: ClassVar[bool] = True + geom: Annotated[str, WKBElement] = Field( + sa_type=Geometry('POINTZ', dimension=3, srid=conf.geo.raw_survey.srid)) id: int | None = Field(default=None, primary_key=True) - project_id: int | None = Field(foreign_key='project.id') - category: str = Field(foreign_key='category.name') - in_menu: bool = False - # Subclasses must include: - # project: Project = Relationship() - # category_info: Project = Relationship() + project_id: int | None = Field(foreign_key=gisaf_admin.table('project.id')) + category: str = Field(foreign_key=gisaf_survey.table('category.name')) + #in_menu: bool = False + project: Project = Relationship() + category_info: Category = Relationship() + + ## XXX: Unused - calls to get_gdf have to provide this + ## if the CRS is not standard, maybe due to an update of shapely? + _crs = conf.geo.raw_survey.spatial_sys_ref @classmethod def selectinload(cls): diff --git a/src/gisaf/redis_tools.py b/src/gisaf/redis_tools.py index 8725f38..d9be0ac 100644 --- a/src/gisaf/redis_tools.py +++ b/src/gisaf/redis_tools.py @@ -243,7 +243,9 @@ class Store: await self.redis.set(self.get_layer_def_channel(store_name), layer_def_data) ## Update the layers/stores registry - await self.get_live_layer_defs() + ## XXX: Commentinhg out the update of live layers: + ## This should be triggerred from a redis listener + #await self.get_live_layer_defs() return geojson