import logging import mimetypes from pathlib import Path from datetime import timedelta import tarfile from typing import Optional from base64 import standard_b64decode import re from typing import Tuple from json import loads from uuid import UUID from fastapi import (FastAPI, Response, HTTPException, File, UploadFile, Request, Form, responses, Depends, status) from fastapi.staticfiles import StaticFiles from fastapi.security import OAuth2PasswordRequestForm from sqlmodel import select from sqlalchemy import or_ import geopandas as gpd # type: ignore import pandas as pd import aiofiles import aiofiles.os from PIL import Image from treetrail.utils import (get_attachment_poi_root, get_attachment_root, get_attachment_trail_root, get_attachment_tree_root, mkdir) from treetrail.security import ( Token, authenticate_user, create_access_token, get_current_active_user, get_current_user, get_current_roles, ) from treetrail.database import fastapi_db_session as db_session from treetrail.models import (BaseMapStyles, User, Role, Bootstrap, MapStyle, Tree, Trail, TreeTrail, POI, UserWithRoles, Zone, VersionedComponent) from treetrail.config import conf, get_cache_dir, __version__ from treetrail.plantekey import get_local_details from treetrail.tiles import registry as tilesRegistry logger = logging.getLogger(__name__) api_app = FastAPI( debug=False, title=conf.app.title, version=conf.version, # lifespan=lifespan, default_response_class=responses.ORJSONResponse, ) re_findmimetype = re.compile('^data:(\S+);') # type: ignore attachment_types: dict[str, type[Tree] | type[Trail] | type[POI]] = { 'tree': Tree, 'trail': Trail, 'poi': POI } attachment_thumbnailable_fields = { 'photo' } thumbnail_size = (200, 200) @api_app.get('/bootstrap') async def get_bootstrap( user: UserWithRoles = Depends(get_current_user) ) -> Bootstrap: # XXX: hide password - issue zith SQLModel return Bootstrap( server=VersionedComponent(version=__version__), client=VersionedComponent(version=__version__), app=conf.app, user=user, map=conf.map, baseMapStyles=BaseMapStyles( embedded=list(tilesRegistry.mbtiles.keys()), external=conf.mapStyles, ), ) @api_app.post("/token", response_model=Token) async def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends() ): user = await authenticate_user(form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta( minutes=conf.security.access_token_expire_minutes) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires) return {"access_token": access_token, "token_type": "bearer"} @api_app.post("/upload/{type}/{field}/{id}") async def upload(request: Request, type: str, field: str, id: str, db_session: db_session, file: UploadFile = File(...), user: User = Depends(get_current_active_user) ): if type not in attachment_types: raise HTTPException(status_code=status.HTTP_417_EXPECTATION_FAILED, detail=f"No such type: {type}") model = attachment_types[type] if field not in model.model_fields: raise HTTPException(status_code=status.HTTP_417_EXPECTATION_FAILED, detail=f"No such field for {type}: {field}") base_dir = get_attachment_root(type) / id if not base_dir.is_dir(): await aiofiles.os.mkdir(base_dir) filename = base_dir / file.filename # type: ignore if field in attachment_thumbnailable_fields: try: # TODO: async save image = Image.open(file.file) image.thumbnail(thumbnail_size) image.save(filename) logger.info(f'Saved thumbnail {filename}') except Exception as error: logger.warning('Cannot create thumbnail for ' + f'{type} {field} {id} ({filename}): {error}') else: async with aiofiles.open(filename, 'wb') as f: await f.write(file.file.read()) logger.info(f'Saved file {filename}') rec = await db_session.get(model, int(id)) if rec is None: raise HTTPException(status_code=status.HTTP_417_EXPECTATION_FAILED, detail=f'No such {type} id {id}') setattr(rec, field, file.filename) await db_session.commit() return { "message": "Successfully uploaded", "filename": file.filename, } @api_app.get("/makeAttachmentsTarFile") async def makeAttachmentsTarFile( db_session: db_session, current_user: User = Depends(get_current_active_user) ): """ Create a tar file with all photos, used to feed clients' caches for offline use """ logger.info('Generating thumbnails and tar file') tarfile_path = get_cache_dir() / 'attachments.tar' with tarfile.open(str(tarfile_path), 'w') as tar: for type, model in attachment_types.items(): data = await db_session.exec(select(model.id, model.photo)) # recs: list[Tree | Trail | POI] recs = data.all() for rec in recs: photo: str = rec.photo # type: ignore id: str = rec.id # type: ignore if photo: file = get_attachment_root(type) / str(id) / photo if file.is_file(): tar.add(file) logger.info(f'Generation of thumbnails and tar file ({tarfile_path}) finished') return { "message": "Successfully made attachments tar file", } @api_app.get("/logout") def logout(response: Response): response.delete_cookie(key='token') return response @api_app.get('/trail') async def get_trails( roles: list[Role] = Depends(get_current_roles), ): """ Get all trails """ gdf = await Trail.get_gdf( where=or_(Trail.viewable_role_id.in_([role.name for role in roles]), # type: ignore Trail.viewable_role_id == None)) # type: ignore # noqa: E711 if len(gdf) == 0: gdf.set_geometry([], inplace=True) # Get only file name of the photo URL else: photos_path_df = gdf['photo'].str.rpartition('/') # type: ignore if 2 in photos_path_df.columns: gdf['photo'] = photos_path_df[2] gdf['create_date'] = gdf['create_date'].astype(str) # type: ignore return Response(content=gdf.to_json(), media_type="application/json") # type: ignore @api_app.get('/trail/details') async def get_trail_all_details( db_session: db_session, ): """ Get details of all trails """ trails = await db_session.exec(select( Trail.id, Trail.name, Trail.description, Trail.photo, )) df = pd.DataFrame(trails.all()) # Get only file name of the photo URL photos_path_df = df['photo'].str.rpartition('/') if 2 in photos_path_df.columns: df['photo'] = photos_path_df[2] return Response(content=df.to_json(orient='records'), media_type="application/json") @api_app.get('/tree-trail') async def get_tree_trail( db_session: db_session, ) -> list[TreeTrail]: """ Get all relations between trees and trails. Note that these are not checked for permissions, as there's no really valuable information. """ data = await db_session.exec(select(TreeTrail)) return data.all() # type: ignore @api_app.get('/tree') async def get_trees( roles: list[Role] = Depends(get_current_roles), ): """ Get all trees """ gdf = await Tree.get_gdf( where=or_(Tree.viewable_role_id.in_([role.name for role in roles]), # type: ignore Tree.viewable_role_id == None)) # type: ignore # noqa: E711 if len(gdf) > 0: gdf['plantekey_id'] = gdf['plantekey_id'].fillna('') tree_trail_details = await get_local_details() if len(tree_trail_details) > 0: gdf = gdf.merge(tree_trail_details, left_on='plantekey_id', right_index=True, how='left') gdf['symbol'].fillna('\uE034', inplace=True) else: gdf['symbol'] = '\uE034' else: gdf.set_geometry([], inplace=True) # Get only file name of the photo URL if len(gdf) > 0: photos_path_df = gdf['photo'].str.rpartition('/') # type: ignore if 2 in photos_path_df.columns: gdf['photo'] = photos_path_df[2] ## TODO: format create_date in proper json gdf['create_date'] = gdf['create_date'].astype(str) # type: ignore gdf['id'] = gdf.index.astype(str) # type: ignore return Response(content=gdf.to_json(), media_type="application/json") def get_attachment_path(uuid, extension, feature_type, feature_id) -> Tuple[str, Path]: root_storage_path = Path(conf.storage.root_attachment_path) full_name = str(uuid) + extension dir: Path = root_storage_path / feature_type / str(feature_id) dir.mkdir(parents=True, exist_ok=True) return full_name, dir / full_name @api_app.post('/tree') async def addTree( request: Request, db_session: db_session, user: User = Depends(get_current_active_user), plantekey_id: str = Form(), picture: Optional[str] = Form(None), trail_ids: str | None = Form(None), lng: str = Form(), lat: str = Form(), uuid1: Optional[str] = Form(None), details: str | None = Form(None) ): tree = Tree(**Tree.get_tree_insert_params( plantekey_id, lng, lat, user.username, loads(details) if details else {}, )) if trail_ids is not None: for trail_id in trail_ids.split(','): tree_trail = TreeTrail( tree_id=tree.id, trail_id=int(trail_id) ) db_session.add(tree_trail) ## Save files resp:dict[str, UUID | str | None] = {'id': tree.id} if picture is not None: re_mimetype = re_findmimetype.search(picture) if re_mimetype: mimetype: str = re_mimetype.group(1) picture_file, full_path = get_attachment_path( uuid1, mimetypes.guess_extension(mimetype), 'tree', tree.id) with open(full_path, 'wb') as file_: ## Feels i'm missing something as it's quite ugly: # print(full_path) decoded = standard_b64decode(picture[picture.find(',')+1:]) file_.write(decoded) resp['picture'] = picture_file tree.photo = picture_file else: logger.warning('Bad picture data: cannot find mimetype') db_session.add(tree) await db_session.commit() return resp @api_app.get('/poi') async def get_pois( db_session: db_session, roles: list[Role] = Depends(get_current_roles), ) -> list[POI]: """ Get all POI """ gdf = await POI.get_gdf() # type: ignore if len(gdf) > 0: gdf.set_index('id', inplace=True) gdf.set_geometry(gpd.GeoSeries.from_wkb(gdf.wkb), inplace=True) gdf.drop('wkb', axis=1, inplace=True) gdf['symbol'] = '\uE001' else: gdf.set_geometry([], inplace=True) gdf['id'] = gdf.index.astype('str') # Also remove create_date, not really required and would need to be # propared to be serialized gdf.drop(columns='create_date', inplace=True) return Response(content=gdf.to_json(), media_type="application/json") # type: ignore @api_app.get('/zone') async def get_zones( db_session: db_session, roles: list[Role] = Depends(get_current_roles), ) -> list[Zone]: """ Get all Zones """ gdf = await Zone.get_gdf( where=or_(Zone.viewable_role_id.in_([role.name for role in roles]), # type: ignore Zone.viewable_role_id == None)) # type: ignore # noqa: E711 # Sort by area, a simple workaround for selecting smaller areas on the map gdf['area'] = gdf.area gdf.sort_values('area', ascending=False, inplace=True) gdf.drop(columns='area', inplace=True) # Also remove create_date, not really required and would need to be # propared to be serialized gdf.drop(columns='create_date', inplace=True) return Response(content=gdf.to_json(), media_type="application/json") # type: ignore @api_app.get('/style') async def get_styles( db_session: db_session, ) -> list[MapStyle]: """ Get all Styles """ data = await db_session.exec(select(MapStyle)) return data.all() # type: ignore @api_app.put("/trail/photo/{id}/{file_name}") async def upload_trail_photo(request: Request, db_session: db_session, id: str, file_name: str, file: UploadFile | None = None): """ This was tested with QGis, provided the properties for the trail layer have been defined correctly. This includes: in "Attributes Form", field "photo", "Widget Type" is set as WebDav storage, with store URL set correcly with a URL like: * 'http://localhost:4200/v1/trail/photo/' || "id" || '/' || file_name(@selected_file_path) * 'https://treetrail.avcsr.org/v1/trail/' || "id" || '/' || file_name(@selected_file_path) ## XXX: probably broken info as paths have changed """ # noqa: E501 base_dir = get_attachment_trail_root() / id if not base_dir.is_dir(): await aiofiles.os.mkdir(base_dir) if not file: contents = await request.body() # WebDAV if len(contents) > 0: # Save the file async with aiofiles.open(base_dir / file_name, 'wb') as f: await f.write(contents) # Update the trail record # With QGis this gets overwritten when it is saved trail = await db_session.get(Trail, int(id)) if trail is None: raise HTTPException(status_code=status.HTTP_417_EXPECTATION_FAILED, detail=f'No such trail id {id}') trail.photo = file_name await db_session.commit() else: return {"message": "No file found in the request"} else: # Multipart form - not tested try: contents = file.file.read() async with aiofiles.open(base_dir, 'wb') as f: await f.write(contents) except Exception: return {"message": "There was an error uploading the file"} finally: file.file.close() return {"message": f"Successfully uploaded {file.filename} for id {id}"} @api_app.put("/tree/photo/{id}/{file_name}") async def upload_tree_photo(request: Request, db_session: db_session, id: str, file_name: str, file: UploadFile | None = None): """ This was tested with QGis, provided the properties for the tree layer have been defined correctly. This includes: in "Attributes Form", field "photo", "Widget Type" is set as WebDav storage, with store URL set correcly with a URL like: * 'http://localhost:4200/v1/tree/photo/' || "id" || '/' || file_name(@selected_file_path) * 'https://treetrail.avcsr.org/v1/tree/' || "id" || '/' || file_name(@selected_file_path) ## XXX: probably broken info as paths have changed """ # noqa: E501 base_dir = get_attachment_tree_root() / id if not base_dir.is_dir(): await aiofiles.os.mkdir(base_dir) if not file: contents = await request.body() # WebDAV if len(contents) > 0: # Save the file async with aiofiles.open(base_dir / file_name, 'wb') as f: await f.write(contents) # Update the tree record # With QGis this gets overwritten when it is saved tree = await db_session.get(Tree, int(id)) if tree is None: raise HTTPException(status_code=status.HTTP_417_EXPECTATION_FAILED, detail=f'No such tree id {id}') tree.photo = file_name await db_session.commit() else: return {'message': 'No file found in the request'} else: # Multipart form - not tested try: contents = file.file.read() async with aiofiles.open(base_dir, 'wb') as f: await f.write(contents) except Exception: return {"message": "There was an error uploading the file"} finally: file.file.close() return {"message": f"Successfully uploaded {file.filename} for id {id}"} # => Below => # Serve the images # The URLs are better served by a reverse proxy front-end, like Nginx api_app.mount('/tree', StaticFiles(directory=mkdir(get_attachment_tree_root())), name='tree_attachments') api_app.mount('/trail', StaticFiles(directory=mkdir(get_attachment_trail_root())), name='trail_attachments') api_app.mount('/poi', StaticFiles(directory=mkdir(get_attachment_poi_root())), name='poi_attachments')