from pathlib import Path
from aiopath import AsyncPath
import logging
from typing import ClassVar, Type
from hashlib import md5

# from aiohttp_security import check_permission
# from aiohttp.multipart import MultipartReader
# from aiohttp.web import HTTPUnauthorized, HTTPForbidden

from sqlmodel import select
from sqlalchemy.orm import joinedload, QueryableAttribute
from sqlalchemy.exc import NoResultFound
from fastapi import UploadFile

from gisaf.config import conf
from gisaf.database import db_session
from gisaf.importers import (
    Importer,
    RawSurveyImporter,
    GeoDataImporter,
    LineWorkImporter,
    ImportError,
)
from gisaf.models.admin import FileImport, BasketImportResult
from gisaf.models.authentication import UserRead
from gisaf.models.survey import Surveyor, Equipment
from gisaf.models.project import Project

logger = logging.getLogger(__name__)

upload_fields_available = ["store", "status", "project", "surveyor", "equipment"]


class Basket:
    """
    Base class for all baskets.
    Plugin modules can import and subclass it (the admin Manager sets
    the _custom_module attribute for reference to that module); a commented
    sketch of a plugin basket is given at the end of this module.
    The basket displays only the columns (of FileImport) defined in the class,
    plus additional fields for uploads.
    The basket can have a role; in that case, it is completely hidden
    from users who don't have that role.
    """

    name: ClassVar[str]
    importer_class: Type[Importer] | None = None
    importer: Importer
    _custom_module: str | None = None
    columns: list[str] = ["name", "time", "import", "delete"]
    upload_fields: list[str] = []
    role: str | None = None

    def __init__(self):
        self.base_dir = Path(conf.admin.basket.base_dir) / self.name
        if self.importer_class is not None:
            self.importer = self.importer_class()
            self.importer.basket = self

    async def allowed_for(self, user: UserRead) -> bool:
        """
        Return True if the basket is not protected by a role,
        or if the given user has that role; return False otherwise.
        """
        if not self.role:
            return True
        if user is not None and user.has_role(self.role):
            return True
        else:
            return False

    async def get_files(self) -> list[FileImport]:
        async with db_session() as session:
            data = await session.exec(
                select(FileImport).where(FileImport.basket == self.name)
            )
            return data.all()  # type: ignore

    # async def get_files_df(self, convert_path=False):
    #     """
    #     Get a dataframe of FileImport items in the basket.
# """ # where = FileImport.basket==self.name # ## First, get the list of files in the base_dir, then associate with the FileImport instance # df = await FileImport.get_df( # where=where, # with_related=True # #with_only_columns=['id', 'path', 'time', 'status', 'table'], # ) # df.rename(columns={ # 'gisaf_admin_project_name': 'project', # 'gisaf_survey_surveyor_name': 'surveyor', # 'gisaf_survey_equipment_name': 'equipment', # }, inplace=True) # ## Sanity check # df.dropna(subset=['name'], inplace=True) # df['dir'] = df.dir.fillna('.') # df.reset_index(drop=True, inplace=True) # ## TODO: After the old admin is completely off and all is clean and nice, remove below and just: # # return df # ## Until the compatibility with old admin is required and we're sure nothing is destroyed: # ## Get files on the file system # if len(df) == 0: # return df # if convert_path: # df['path'] = df.apply(lambda fi: Path(fi['dir'])/fi['name'], axis=1) # #if check_fs: # # files = set(self.base_dir.glob('**/*')) # # df['exists'] = df.apply(lambda fi: self.base_dir/fi['dir']/fi['name'] in files, axis=1) # else: # return df async def get_file(self, file_id: int) -> FileImport | None: async with db_session() as session: query = ( select(FileImport) .where(FileImport.id == file_id) .options( joinedload(FileImport.project), # type: ignore joinedload(FileImport.surveyor), # type: ignore joinedload(FileImport.equipment), # type: ignore ) ) res = await session.exec(query) try: file = res.one() except NoResultFound: return None else: return file # df = await FileImport.get_df( # where=FileImport.id==id, # with_related=True, # ) # df.rename(columns={ # 'gisaf_admin_project_name': 'project', # 'gisaf_survey_surveyor_name': 'surveyor', # 'gisaf_survey_equipment_name': 'equipment', # }, inplace=True) # df['dir'] = df.dir.fillna('.') # ## Replace path with Path from pathlib # df['path'] = df.apply(lambda fi: self.base_dir/fi['dir']/fi['name'], axis=1) # file = df.iloc[0] # ## Override file.name, which otherwise is the index of the item (hack) # file.name = file['name'] # return file async def delete_file(self, file: FileImport): if file.dir: path = self.base_dir / file.dir / file.name else: path = self.base_dir / file.name if path.exists(): path.unlink() async with db_session() as session: await session.delete(file) await session.commit() async def import_file( self, file_import: FileImport, dry_run=True, **kwargs ) -> BasketImportResult: """ Import the file by calling the basket's importer's do_import. Time stamp the FileImport. 
        Return a BasketImportResult.
        """
        if not hasattr(self, "importer"):
            return BasketImportResult(
                message=f"No import defined/required for {self.name} basket"
            )
        result: BasketImportResult
        try:
            result = await self.importer.do_import(
                file_import, dry_run=dry_run, **kwargs
            )
        except ImportError as err:
            raise err
        except Exception as err:
            logger.exception(err)
            raise ImportError(
                f"Unexpected import error (details in the Gisaf logs): {err}"
            )
        if not isinstance(result, BasketImportResult):
            raise ImportError(
                "Import error: the importer did not return a BasketImportResult"
            )
        # if import_result:
        #     if isinstance(import_result, (tuple, list)):
        #         assert len(import_result) >= 2, \
        #             'do_import should return message or (message, details)'
        #         result = BasketImportResult(
        #             message=import_result[0],
        #             details=import_result[1],
        #         )
        #         if len(import_result) > 2:
        #             data = import_result[2]
        #     else:
        #         result = BasketImportResult(message=import_result)
        # else:
        #     result = BasketImportResult(message='Import successful.')
        if dry_run:
            result.time = file_import.time
        else:
            ## Save the time stamp
            async with db_session() as session:
                file_import.time = result.time
                session.add(file_import)
                await session.commit()
        return result

    async def add_file(
        self,
        file: UploadFile,
        user: UserRead,
        auto_import: bool = False,
        dry_run: bool = False,
        project_id: int | None = None,
        surveyor_id: int | None = None,
        equipment_id: int | None = None,
    ) -> BasketImportResult:
        """
        File upload to basket.
        Typically called through an http POST view handler (a commented sketch
        of such a handler is given at the end of this module).
        Save the file, save the FileImport record, and return a BasketImportResult.
        Note that the result may contain numpy types and may need NumpyEncoder
        to be json.dump'ed.
        """
        ## TODO: multiple items
        ## TODO: check if file already exists
        # assert part.name == 'file'

        ## Save the file on the file system
        if file.filename is None:
            raise ImportError("No file name")
        path = AsyncPath(self.base_dir) / file.filename
        ## Create the directory if needed
        await path.parent.mkdir(parents=True, exist_ok=True)
        file_content = await file.read()
        md5sum = md5(file_content).hexdigest()
        async with path.open("wb") as f:
            ## No way to use async to stream the file content to write it?
            await f.write(file_content)

        async with db_session() as session:
            fileImportRecord = FileImport(
                name=file.filename,
                md5=md5sum,
                dir=str(self.base_dir),
                basket=self.name,
                project_id=project_id,
                surveyor_id=surveyor_id,
                equipment_id=equipment_id,
                # store=store_type_name,
                # status=parts.get('status', None),
            )
            # fileImportRecord.path = self.base_dir / fileImportRecord.dir / fileImportRecord.name
            session.add(fileImportRecord)
            await session.commit()
            await session.refresh(fileImportRecord)
        if fileImportRecord.id is None:
            raise ImportError("Cannot save (no fileImportRecord.id)")

        ## Import right away if requested (only reviewers are allowed to)
        basket_import_result = BasketImportResult(fileImport=fileImportRecord)
        if auto_import:
            if user.has_role("reviewer"):
                ## Get the record from the DB, for compatibility with import_file
                file_import_record = await self.get_file(fileImportRecord.id)
                if file_import_record is None:
                    basket_import_result.message = "Cannot import: file not found"
                else:
                    try:
                        basket_import_result = await self.import_file(
                            file_import_record, dry_run
                        )
                    except ImportError as err:
                        basket_import_result.message = f"Error: {err.args[0]}"
            else:
                basket_import_result.message = (
                    "Cannot import: only a reviewer can do that"
                )
        return basket_import_result


class MiscGeomBasket(Basket):
    name = "Misc geo file"
    importer_class = GeoDataImporter
    columns = ["name", "time", "status", "store", "import", "delete"]
    upload_fields = ["store_misc", "status"]


class LineWorkBasket(Basket):
    name = "Line work"
    importer_class = LineWorkImporter
    columns = ["name", "time", "status", "store", "project", "import", "delete"]
    upload_fields = ["store_line_work", "project", "status"]


class SurveyBasket(Basket):
    name = "Survey"
    importer_class = RawSurveyImporter
    columns = ["name", "time", "project", "surveyor", "equipment", "import", "delete"]
    upload_fields = ["project", "surveyor", "equipment"]


standard_baskets = (
    SurveyBasket(),
    LineWorkBasket(),
    MiscGeomBasket(),
)
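

## As described in the Basket class docstring, plugin modules can subclass Basket.
## A minimal commented sketch follows, assuming a plugin wants a basket restricted
## by role; the class name, basket name and role value are hypothetical.
#
# class RestrictedGeoBasket(Basket):
#     name = "Restricted geo file"
#     importer_class = GeoDataImporter
#     columns = ["name", "time", "status", "store", "import", "delete"]
#     upload_fields = ["store_misc", "status"]
#     role = "reviewer"  # hidden from users without this role (see allowed_for)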
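
## A minimal commented sketch of an http POST view handler calling add_file,
## assuming a FastAPI router and a get_current_user dependency (both hypothetical,
## not defined in this module).
#
# from fastapi import APIRouter, Depends, HTTPException
#
# router = APIRouter()
#
# @router.post("/basket/{basket_name}/upload")
# async def upload_to_basket(
#     basket_name: str,
#     file: UploadFile,
#     user: UserRead = Depends(get_current_user),  # hypothetical auth dependency
# ) -> BasketImportResult:
#     basket = next((b for b in standard_baskets if b.name == basket_name), None)
#     if basket is None or not await basket.allowed_for(user):
#         raise HTTPException(status_code=403)
#     return await basket.add_file(file, user, auto_import=True)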