Fix/update baskets:

- get_file
- import
- Basket importers must return a BasketImportResult

Add API endpoints
Fix utils delete_df and upsert_df, making them async-friendly
Auth: add helper methods to UserRead
phil 2024-04-09 16:16:04 +05:30
parent 52e1d2135b
commit d2ae5e4d7b
9 changed files with 323 additions and 182 deletions

pdm.lock (generated)

@@ -5,7 +5,20 @@
groups = ["default", "dev", "mqtt"]
strategy = ["cross_platform"]
lock_version = "4.4.1"
content_hash = "sha256:0da68c7fed8db7a12e36002b8d6194c1651f9653fc7fbf7797c553774c9dbf32"
content_hash = "sha256:581ae31055bb26abe5b7bf7cab172337258913d8960892dbd206e00421b309b7"
[[package]]
name = "aiofile"
version = "3.8.8"
requires_python = ">=3.7, <4"
summary = "Asynchronous file operations."
dependencies = [
"caio~=0.9.0",
]
files = [
{file = "aiofile-3.8.8-py3-none-any.whl", hash = "sha256:41e8845cce055779cd77713d949a339deb012eab605b857765e8f8e52a5ed811"},
{file = "aiofile-3.8.8.tar.gz", hash = "sha256:41f3dc40bd730459d58610476e82e5efb2f84ae6e9fa088a9545385d838b8a43"},
]
[[package]]
name = "aiomqtt"
@@ -20,6 +33,20 @@ files = [
{file = "aiomqtt-2.0.1.tar.gz", hash = "sha256:60f6451c8ab7235cfb392b1b0cab398e9bc6040f4b140628c0615371abcde15f"},
]
[[package]]
name = "aiopath"
version = "0.6.11"
requires_python = ">=3.10"
summary = "📁 Async pathlib for Python"
dependencies = [
"aiofile<4,>=3.5.0",
"anyio<4,>=3.2.0",
]
files = [
{file = "aiopath-0.6.11-py2.py3-none-any.whl", hash = "sha256:7b1f1aa3acb422050908ac3c4755b5e43f625111be003f1bfc7dc2193027c45d"},
{file = "aiopath-0.6.11.tar.gz", hash = "sha256:2f0d4d9195281612c6508cbfa12ac3184c31540d13b9e6215a325897da59decd"},
]
[[package]]
name = "aiosqlite"
version = "0.20.0"
@@ -175,6 +202,18 @@ files = [
{file = "bcrypt-4.0.1.tar.gz", hash = "sha256:27d375903ac8261cfe4047f6709d16f7d18d39b1ec92aaf72af989552a650ebd"},
]
[[package]]
name = "caio"
version = "0.9.13"
requires_python = ">=3.7, <4"
summary = "Asynchronous file IO for Linux MacOS or Windows."
files = [
{file = "caio-0.9.13-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:789f8b55f4a2b46be14361df3ac8d14b6c8f0a3730badd70cb1b7778fcdc7039"},
{file = "caio-0.9.13-cp311-cp311-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a914684bad2a757cf013ae88d785d81659a3add1885bad60cd20bfbd3068bd5a"},
{file = "caio-0.9.13-py3-none-any.whl", hash = "sha256:582cbfc6e203d1dedf662ba972a94db6e744fe0b6bb9e02922b0f86803006fc9"},
{file = "caio-0.9.13.tar.gz", hash = "sha256:26f1e08a442bef4526a66142ea4e325e22dca8f040800aecb3caf8fae0589e98"},
]
[[package]]
name = "certifi"
version = "2023.7.22"


@@ -18,7 +18,7 @@ dependencies = [
"pydantic-settings>=2.0.3",
"pyshp>=2.3.1",
"python-jose[cryptography]>=3.3.0",
"python-multipart>=0.0.6",
"python-multipart>=0.0.9",
"pyyaml>=6.0.1",
"redis>=5.0.1",
"sqlalchemy[asyncio]>=2.0.23",
@@ -29,6 +29,7 @@ dependencies = [
"psycopg>=3.1.18",
"plotly>=5.20.0",
"matplotlib>=3.8.3",
"aiopath>=0.6.11",
]
requires-python = ">=3.11,<4"
readme = "README.md"


@@ -1 +1 @@
__version__: str = '2023.4.dev62+g08c53cf.d20240405'
__version__: str = '2023.4.dev63+g52e1d21.d20240408'


@@ -1,9 +1,11 @@
import logging
from fastapi import Depends, APIRouter, HTTPException, status, responses
from fastapi import (Depends, APIRouter, HTTPException, status, responses,
UploadFile)
from fastapi.responses import FileResponse
from gisaf.models.admin import AdminBasket, BasketNameOnly
from gisaf.models.authentication import User
from gisaf.models.admin import AdminBasket, BasketImportResult, BasketNameOnly
from gisaf.models.authentication import User, UserRead
from gisaf.security import get_current_active_user
from gisaf.admin import manager
@@ -40,3 +42,55 @@
## TODO: Fix projects
# projects=getattr(basket, 'projects', None)
)
@api.post('/basket/upload/{name}')
async def upload_basket_file(
name: str,
file: UploadFile,
user: UserRead = Depends(get_current_active_user),
):
try:
basket = manager.baskets[name]
except KeyError:
raise HTTPException(status.HTTP_404_NOT_FOUND, f'No basket named {name}')
fileItem = await basket.add_files(file, user)
return fileItem
@api.get('/basket/download/{name}/{file_id}/{file_name}')
async def download_basket_file(
name: str,
file_id: int,
file_name: str,
user: User = Depends(get_current_active_user),
) -> FileResponse:
try:
basket = manager.baskets[name]
except KeyError:
raise HTTPException(status.HTTP_404_NOT_FOUND, f'No basket named {name}')
if basket.role:
if not user.has_role(basket.role):
raise HTTPException(status.HTTP_401_UNAUTHORIZED)
file_record = await basket.get_file(file_id)
if file_record is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f'No import file id {file_id}')
return FileResponse(file_record.path)
@api.get('/basket/import/{basket}/{file_id}')
async def import_basket_file(
basket: str,
file_id: int,
dryRun: bool = False,
user: User = Depends(get_current_active_user),
) -> BasketImportResult:
if not (user and user.has_role('reviewer')):
raise HTTPException(status.HTTP_401_UNAUTHORIZED)
basket_ = manager.baskets[basket]
file_import = await basket_.get_file(file_id)
if file_import is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f'No import file id {file_id}')
try:
result = await basket_.import_file(file_import, dryRun)
## FIXME: shouldn't it be AdminImportError?
except ImportError as err:
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, str(err))
return result
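For reference, the new endpoints can be exercised like this (editor's example; it assumes httpx, a basket named 'survey', the router mounted under /api, and an illustrative token, ids and response shape):

import httpx

client = httpx.Client(
    base_url='http://localhost:8000/api',
    headers={'Authorization': 'Bearer <token>'},
)
## Upload a file into the basket
with open('points.csv', 'rb') as f:
    uploaded = client.post('/basket/upload/survey', files={'file': f}).json()
## Dry-run the import of the stored file
result = client.get(f"/basket/import/survey/{uploaded['id']}",
                    params={'dryRun': True}).json()
print(result['message'])
## Download the stored file back
client.get(f"/basket/download/survey/{uploaded['id']}/points.csv")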


@ -1,4 +1,5 @@
from pathlib import Path
from aiopath import AsyncPath
from collections import defaultdict
from json import loads
from datetime import datetime
@@ -10,13 +11,15 @@ from typing import ClassVar
# from aiohttp.web import HTTPUnauthorized, HTTPForbidden
from sqlmodel import select
from sqlalchemy.orm import joinedload, QueryableAttribute
from sqlalchemy.exc import NoResultFound
from fastapi import UploadFile
from gisaf.config import conf
from gisaf.utils import ToMigrate
from gisaf.database import db_session
from gisaf.importers import RawSurveyImporter, GeoDataImporter, LineWorkImporter, ImportError
from gisaf.models.admin import FileImport, AdminBasketFile, BasketImportResult
from gisaf.models.authentication import User
from gisaf.models.authentication import UserRead
from gisaf.models.survey import Surveyor, Equipment
from gisaf.models.project import Project
@@ -48,7 +51,7 @@ class Basket:
self.importer = self.importer_class()
self.importer.basket = self
async def allowed_for(self, user: User):
async def allowed_for(self, user: UserRead):
"""
Return False if the basket is protected by a role
user: a UserRead instance
@@ -101,23 +104,37 @@ class Basket:
# else:
# return df
async def get_file(self, id):
df = await FileImport.get_df(
where=FileImport.id==id,
with_related=True,
)
df.rename(columns={
'gisaf_admin_project_name': 'project',
'gisaf_survey_surveyor_name': 'surveyor',
'gisaf_survey_equipment_name': 'equipment',
}, inplace=True)
df['dir'] = df.dir.fillna('.')
## Replace path with Path from pathlib
df['path'] = df.apply(lambda fi: self.base_dir/fi['dir']/fi['name'], axis=1)
file = df.iloc[0]
## Override file.name, which otherwise is the index of the item (hack)
file.name = file['name']
return file
async def get_file(self, file_id: int) -> FileImport | None:
async with db_session() as session:
query = select(FileImport).where(FileImport.id==file_id).options(
joinedload(FileImport.project),
joinedload(FileImport.surveyor),
joinedload(FileImport.equipment),
)
res = await session.exec(query)
try:
file = res.one()
except NoResultFound:
return None
else:
return file
# df = await FileImport.get_df(
# where=FileImport.id==id,
# with_related=True,
# )
# df.rename(columns={
# 'gisaf_admin_project_name': 'project',
# 'gisaf_survey_surveyor_name': 'surveyor',
# 'gisaf_survey_equipment_name': 'equipment',
# }, inplace=True)
# df['dir'] = df.dir.fillna('.')
# ## Replace path with Path from pathlib
# df['path'] = df.apply(lambda fi: self.base_dir/fi['dir']/fi['name'], axis=1)
# file = df.iloc[0]
# ## Override file.name, which otherwise is the index of the item (hack)
# file.name = file['name']
# return file
async def delete_file(self, id):
file = await FileImport.get(id)
@@ -129,7 +146,7 @@ class Basket:
path.unlink()
await file.delete()
async def import_file(self, file_import, dry_run=True, return_data_info=False, **kwargs):
async def import_file(self, file_import: FileImport, dry_run=True, **kwargs) -> BasketImportResult:
"""
Import the file by calling the basket's importer's do_import.
Time stamp the FileImport.
@@ -139,44 +156,42 @@
return BasketImportResult(
message=f'No import defined/required for {self.name} basket'
)
result: BasketImportResult
try:
import_result = await self.importer.do_import(file_import, dry_run=dry_run, **kwargs)
result = await self.importer.do_import(file_import, dry_run=dry_run, **kwargs)
except ImportError as err:
raise
raise err
except Exception as err:
logger.exception(err)
raise ImportError(f'Unexpected import error: {err}')
raise ImportError(f'Unexpected import error (details in the Gisaf logs): {err}')
if isinstance(import_result, BasketImportResult):
result = import_result
else:
if import_result:
if isinstance(import_result, (tuple, list)):
assert len(import_result) >= 2, \
'do_import should return message or (message, details)'
result = BasketImportResult(
message=import_result[0],
details=import_result[1],
)
if len(import_result) > 2:
data = import_result[2]
else:
result = BasketImportResult(message=import_result)
else:
result = BasketImportResult(message='Import successful.')
if not isinstance(result, BasketImportResult):
raise ImportError('Import error: the importer did not return a BasketImportResult')
# if import_result:
# if isinstance(import_result, (tuple, list)):
# assert len(import_result) >= 2, \
# 'do_import should return message or (message, details)'
# result = BasketImportResult(
# message=import_result[0],
# details=import_result[1],
# )
# if len(import_result) > 2:
# data = import_result[2]
# else:
# result = BasketImportResult(message=import_result)
# else:
# result = BasketImportResult(message='Import successful.')
if dry_run:
result.time = file_import.time
else:
if not result.time:
result.time = datetime.now()
if not dry_run:
## Save time stamp
await (await FileImport.get(file_import.id)).update(time=result.time).apply()
if return_data_info:
return result, data
else:
return result
async with db_session() as session:
file_import.time = result.time
session.add(file_import)
await session.commit()
return result
async def add_files(self, reader, request):
async def add_files(self, file: UploadFile, user: UserRead):
"""
File upload to basket.
Typically called through an http POST view handler.
@@ -184,37 +199,26 @@ class Basket:
Note that the returned dict may contain numpy types and needs NumpyEncoder
to be json.dump'ed.
"""
raise ToMigrate("basket add_files reader was aiohttp's MultipartReader")
## TODO: multiple items
## TODO: check if file already exists
## First part is the file
part = await reader.next()
assert part.name == 'file'
file_name = part.filename
# assert part.name == 'file'
## Save file on filesystem
size = 0
path = Path(self.base_dir) / file_name
path = AsyncPath(self.base_dir) / file.filename
## Create the directory if needed
path.parent.mkdir(parents=True, exist_ok=True)
with path.open('wb') as f:
while True:
chunk = await part.read_chunk() # 8192 bytes by default.
if not chunk:
break
size += len(chunk)
f.write(chunk)
await path.parent.mkdir(parents=True, exist_ok=True)
async with path.open('wb') as f:
## No way to use async to stream the file content to write it?
await f.write(await file.read())
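## Editor's note (not part of this commit): a chunked loop would answer the
## question above and bound memory use on large uploads, e.g.:
# while chunk := await file.read(1 << 16):  # 64 KiB per chunk
#     await f.write(chunk)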
## Read other parts
parts = defaultdict(None)
while True:
part = await reader.next()
if not part:
break
value = (await part.read()).decode()
if value != 'null':
parts[part.name] = value
# parts = defaultdict(None)
# while True:
# part = await reader.next()
# if not part:
# break
# value = (await part.read()).decode()
# if value != 'null':
# parts[part.name] = value
## Find ids of project, surveyor, equipment
if 'project' in parts:
@@ -238,7 +242,7 @@ class Basket:
else:
store_type_name = None
fileImportRecord = await FileImport(
name=file_name,
name=file.filename,
dir=parts.get('dir', '.'),
basket=self.name,
project_id=project_id,
@@ -251,7 +255,7 @@ class Basket:
admin_basket_file = AdminBasketFile(
id=fileImportRecord.id,
name=file_name,
name=file.filename,
status=parts.get('status'),
store=store_type_name,
project=parts.get('project'),


@@ -15,8 +15,7 @@ from sqlalchemy.sql.schema import Column
from gisaf.config import conf
#from .models.admin import FileImport
from gisaf.models.admin import FeatureImportData
# from gisaf.models.graphql import BasketImportResult
from gisaf.models.admin import FeatureImportData, BasketImportResult
from gisaf.models.raw_survey import RawSurveyModel
from gisaf.models.survey import Surveyor, Accuracy, Equipment, AccuracyEquimentSurveyorMapping
from gisaf.models.tags import Tags
@@ -42,7 +41,7 @@ class Importer:
"""
basket = None
async def do_import(self, file_record, dry_run=False, **kwargs):
async def do_import(self, file_record, dry_run=False, **kwargs) -> BasketImportResult:
"""
Return a BasketImportResult instance for user feedback.

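To illustrate the contract, a minimal conforming importer could look like this (editor's sketch, not part of this commit; the class name and details values are made up):

from gisaf.models.admin import BasketImportResult

class DummyImporter(Importer):
    """Minimal importer honouring the BasketImportResult contract."""
    async def do_import(self, file_record, dry_run=False, **kwargs) -> BasketImportResult:
        if dry_run:
            return BasketImportResult(message='Dry run: nothing imported')
        ## ... actual parsing and database writes would go here ...
        return BasketImportResult(
            message=f'Imported {file_record.name}',
            details={'rows': 0},
        )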

@@ -157,9 +157,9 @@ class Basket(BasketNameOnly):
class BasketImportResult(BaseModel):
time: datetime
time: datetime = Field(default_factory=datetime.now)
message: str
details: str
details: dict[str, str | int | float | bool] | None = None
class AdminBasketFile(BaseModel):
id: int

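With the new defaults in BasketImportResult, time and details can be omitted; both forms below are valid (editor's sketch, values made up):

from gisaf.models.admin import BasketImportResult

## time defaults to datetime.now(), details to None
BasketImportResult(message='Import successful.')

## details is now a dict of scalars rather than a string
BasketImportResult(
    message='Imported 120 points',
    details={'created': 118, 'skipped': 2, 'dry_run': True},
)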

@@ -86,6 +86,16 @@ class UserRead(UserBase):
email: str | None # type: ignore
roles: list[RoleReadNoUsers] = []
def can_view(self, model) -> bool:
role = getattr(model, 'viewable_role', None)
if role:
return self.has_role(role)
else:
return True
def has_role(self, role: str) -> bool:
return role in (r.name for r in self.roles)
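Usage of the new helpers (editor's sketch; the model and role name are made up):

class SurveyLayer:
    viewable_role = 'reviewer'  # models without this attribute are public

def may_expose(user: UserRead) -> bool:
    ## has_role matches against the names of the user's roles
    if not user.has_role('reviewer'):
        return False
    ## can_view falls back to True when no viewable_role is defined
    return user.can_view(SurveyLayer)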
# class ACL(BaseModel):
# user_id: int


@@ -11,11 +11,10 @@ from numpy import ndarray
import pandas as pd
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.sql.expression import delete
# from graphene import ObjectType
from sqlmodel import SQLModel, delete
from gisaf.config import conf
from gisaf.database import db_session
class ToMigrate(Exception):
pass
@@ -213,107 +212,142 @@ def atimeit(func):
return helper
async def delete_df(df, model):
async def delete_df(df: pd.DataFrame, model: SQLModel):
"""
Delete all data in the model's table in the database
that matches data in the pandas dataframe.
"""
table = model.__table__
if len(df) == 0:
return
ids = df.reset_index()['id'].values
delete_stmt = delete(table).where(model.id.in_(ids))
async with db.bind.raw_pool.acquire() as conn:
async with conn.transaction():
await conn.execute(str(delete_stmt), *ids)
statement = delete(model).where(model.id.in_(ids))
async with db_session() as session:
await session.exec(statement)
await session.commit()
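A minimal usage sketch of the reworked delete_df (editor's example; it assumes the helper is importable from gisaf.utils and a model with an integer id primary key):

import pandas as pd
from gisaf.utils import delete_df
from gisaf.models.admin import FileImport

async def remove_file_imports(ids: list[int]) -> None:
    df = pd.DataFrame({'id': ids})
    ## Issues a single DELETE ... WHERE id IN (...) and commits
    await delete_df(df, FileImport)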
async def upsert_df(df, model):
"""
Insert or update all data in the model's table in the database
that's present in the pandas dataframe.
Use postgres insert ... on conflict update...
with a series of inserts, one row at a time.
For GeoDataFrame: the "geometry" column (df._geometry_column_name) is not honored
(yet). It's the caller's responsibility to have a proper column name
(typically "geom" in Gisaf models) with a EWKT or EWKB representation of the geometry.
"""
## See: https://stackoverflow.com/questions/33307250/postgresql-on-conflict-in-sqlalchemy
# async def upsert_df(df, model):
# """
# Insert or update all data in the model's table in the database
# that's present in the pandas dataframe.
# Use postgres insert ... on conflict update...
# with a series of inserts, one row at a time.
# For GeoDataFrame: the "geometry" column (df._geometry_column_name) is not honored
# (yet). It's the caller's responsibility to have a proper column name
# (typically "geom" in Gisaf models) with a EWKT or EWKB representation of the geometry.
# """
# ## See: https://stackoverflow.com/questions/33307250/postgresql-on-conflict-in-sqlalchemy
# if len(df) == 0:
# return df
# table = model.__table__
# ## Generate the 'upsert' statement, using fake values but defining columns
# columns = {c.name for c in table.columns}
# values = {col: None for col in df.columns if col in columns}
# insrt_stmnt = insert(table).inline().values(values).returning(table.primary_key.columns)
# df_columns = set(df.columns)
# do_update_stmt = insrt_stmnt.on_conflict_do_update(
# constraint=table.primary_key,
# set_={
# k.name: getattr(insrt_stmnt.excluded, k.name)
# for k in insrt_stmnt.excluded
# if k.name in df_columns and
# k.name not in [c.name for c in table.primary_key.columns]
# }
# )
# ## Filter and reorder the df columns
# ## in order to match the order of columns in the insert statement
# df = df[[col for col in do_update_stmt.compile().positiontup
# if col in df_columns]].copy()
# def convert_to_object(value):
# """
# Quick (but slow) and dirty: clean up values (nan, nat) for inserting to postgres via asyncpg
# """
# if isinstance(value, float) and isnan(value):
# return None
# elif pd.isna(value):
# return None
# else:
# return value
# # def encode_geometry(geometry):
# # if not hasattr(geometry, '__geo_interface__'):
# # raise TypeError('{g} does not conform to '
# # 'the geo interface'.format(g=geometry))
# # shape = shapely.geometry.asShape(geometry)
# # return shapely.wkb.dumps(shape)
# # def decode_geometry(wkb):
# # return shapely.wkb.loads(wkb)
# ## pks: list of dicts of primary keys
# pks = {pk.name: [] for pk in table.primary_key.columns}
# async with db.bind.raw_pool.acquire() as conn:
# ## Set standard encoder for HSTORE, geometry
# await conn.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore')
# #await conn.set_type_codec(
# # 'geometry', # also works for 'geography'
# # encoder=encode_geometry,
# # decoder=decode_geometry,
# # format='binary',
# #)
# #await conn.set_type_codec(
# # 'json',
# # encoder=json.dumps,
# # decoder=json.loads,
# # schema='pg_catalog'
# #)
# ## For a sequence of inserts:
# insrt_stmnt_single = await conn.prepare(str(do_update_stmt))
# async with conn.transaction():
# for row in df.itertuples(index=False):
# converted_row = [convert_to_object(v) for v in row]
# returned = await insrt_stmnt_single.fetch(*converted_row)
# for returned_single in returned:
# for pk, value in returned_single.items():
# pks[pk].append(value)
# ## Return a copy of the original df, with actual DB columns, data and the primary keys
# for pk, values in pks.items():
# df[pk] = values
# return df
def postgres_upsert(table, conn, keys, data_iter):
# See https://stackoverflow.com/questions/61366664/how-to-upsert-pandas-dataframe-to-postgresql-table
# Comment by @HopefullyThisHelps
data = [dict(zip(keys, row)) for row in data_iter]
insert_statement = insert(table.table).values(data)
upsert_statement = insert_statement.on_conflict_do_update(
constraint=f"{table.table.name}_pkey",
set_={c.key: c for c in insert_statement.excluded},
)
conn.execute(upsert_statement)
async def upsert_df(df: pd.DataFrame, model: SQLModel, chunksize: int = 1000):
if len(df) == 0:
return df
table = model.__table__
## Generate the 'upsert' statement, using fake values but defining columns
columns = {c.name for c in table.columns}
values = {col: None for col in df.columns if col in columns}
insrt_stmnt = insert(table, inline=True, values=values, returning=table.primary_key.columns)
df_columns = set(df.columns)
do_update_stmt = insrt_stmnt.on_conflict_do_update(
constraint=table.primary_key,
set_={
k.name: getattr(insrt_stmnt.excluded, k.name)
for k in insrt_stmnt.excluded
if k.name in df_columns and
k.name not in [c.name for c in table.primary_key.columns]
}
)
## Filter and reorder the df columns
## in order to match the order of columns in the insert statement
df = df[[col for col in do_update_stmt.compile().positiontup
if col in df_columns]].copy()
def convert_to_object(value):
"""
Quick (but slow) and dirty: clean up values (nan, nat) for inserting to postgres via asyncpg
"""
if isinstance(value, float) and isnan(value):
return None
elif pd.isna(value):
return None
else:
return value
# def encode_geometry(geometry):
# if not hasattr(geometry, '__geo_interface__'):
# raise TypeError('{g} does not conform to '
# 'the geo interface'.format(g=geometry))
# shape = shapely.geometry.asShape(geometry)
# return shapely.wkb.dumps(shape)
# def decode_geometry(wkb):
# return shapely.wkb.loads(wkb)
## pks: list of dicts of primary keys
pks = {pk.name: [] for pk in table.primary_key.columns}
async with db.bind.raw_pool.acquire() as conn:
## Set standard encoder for HSTORE, geometry
await conn.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore')
#await conn.set_type_codec(
# 'geometry', # also works for 'geography'
# encoder=encode_geometry,
# decoder=decode_geometry,
# format='binary',
#)
#await conn.set_type_codec(
# 'json',
# encoder=json.dumps,
# decoder=json.loads,
# schema='pg_catalog'
#)
## For a sequence of inserts:
insrt_stmnt_single = await conn.prepare(str(do_update_stmt))
async with conn.transaction():
for row in df.itertuples(index=False):
converted_row = [convert_to_object(v) for v in row]
returned = await insrt_stmnt_single.fetch(*converted_row)
for returned_single in returned:
for pk, value in returned_single.items():
pks[pk].append(value)
## Return a copy of the original df, with actual DB columns, data and the primary keys
for pk, values in pks.items():
df[pk] = values
return df
from functools import partial
import concurrent.futures
import asyncio
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
await loop.run_in_executor(
pool,
partial(
df.to_sql,
model.__tablename__,
conf.db.get_pg_url(), # Cannot use sync_engine in run_in_executor
# because it's not picklable
schema=model.__table__.schema, # type: ignore
if_exists="append",
index=False,
method=postgres_upsert,
chunksize=chunksize,
),
)
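The rewrite funnels everything through pandas' to_sql with postgres_upsert as the insertion method, run in a process pool so the synchronous engine never blocks the event loop; unlike the old asyncpg version, it no longer returns the primary keys of the affected rows. A usage sketch (editor's example; it assumes gisaf.utils as the module path and a dataframe whose columns match the table):

import pandas as pd
from gisaf.utils import upsert_df
from gisaf.models.survey import Equipment

async def sync_equipment(records: list[dict]) -> None:
    df = pd.DataFrame(records)
    ## INSERT ... ON CONFLICT (primary key) DO UPDATE, in chunks of 500 rows
    await upsert_df(df, Equipment, chunksize=500)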
#async def upsert_df(df, model):