from pathlib import Path
from collections import defaultdict
from json import loads
from datetime import datetime
import logging
from typing import ClassVar

# from aiohttp_security import check_permission
# from aiohttp.multipart import MultipartReader
# from aiohttp.web import HTTPUnauthorized, HTTPForbidden

from gisaf.config import conf
from gisaf.models.admin import FileImport
# from gisaf.models.graphql import AdminBasketFile, BasketImportResult
from gisaf.models.survey import Surveyor, Accuracy, Equipment, AccuracyEquimentSurveyorMapping
from gisaf.models.project import Project
from gisaf.importers import RawSurveyImporter, GeoDataImporter, LineWorkImporter, ImportError
from gisaf.utils import ToMigrate

logger = logging.getLogger(__name__)

upload_fields_available = ['store', 'status', 'project', 'surveyor', 'equipment']


class Basket:
    """
    Base class for all baskets.

    Plugin modules can import and subclass it; the admin Manager sets the
    _custom_module attribute as a reference to that module.
    The basket displays only the columns (of FileImport) defined in the class,
    plus additional fields for uploads.
    A basket can have a role: in that case, it is completely hidden from users
    who don't have that role.
    """
    name: ClassVar[str]
    importer_class = None
    _custom_module = None
    columns: list[str] = ['name', 'time', 'import', 'delete']
    upload_fields: list[str] = []
    role = None

    def __init__(self):
        self.base_dir = Path(conf.admin.basket.base_dir) / self.name
        if self.importer_class:
            self.importer = self.importer_class()
            self.importer.basket = self

    async def allowed_for(self, request):
        """
        Return False if the basket is protected by a role that the user doesn't have.
        request: aiohttp.Request instance
        """
        if not self.role:
            return True
        else:
            ## check_permission, HTTPUnauthorized and HTTPForbidden come from the
            ## aiohttp imports commented out above (pending migration)
            try:
                await check_permission(request, self.role)
            except (HTTPUnauthorized, HTTPForbidden):
                return False
            else:
                return True

    async def get_files(self, convert_path=False):
        """
        Get a dataframe of the FileImport items in the basket.
        """
        where = FileImport.basket==self.name
        ## First, get the list of files in the base_dir, then associate each with its FileImport instance
        df = await FileImport.get_df(
            where=where,
            with_related=True
            #with_only_columns=['id', 'path', 'time', 'status', 'table'],
        )
        df.rename(columns={
            'gisaf_admin_project_name': 'project',
            'gisaf_survey_surveyor_name': 'surveyor',
            'gisaf_survey_equipment_name': 'equipment',
        }, inplace=True)
        ## Sanity check
        df.dropna(subset=['name'], inplace=True)
        df['dir'] = df.dir.fillna('.')
        df.reset_index(drop=True, inplace=True)

        ## TODO: once the old admin is completely retired and everything is clean, remove the code below and just:
        # return df

        ## While compatibility with the old admin is required and we want to be sure nothing gets destroyed:
        ## get files on the file system
        if len(df) == 0:
            return df
        if convert_path:
            df['path'] = df.apply(lambda fi: Path(fi['dir'])/fi['name'], axis=1)
            #if check_fs:
            #    files = set(self.base_dir.glob('**/*'))
            #    df['exists'] = df.apply(lambda fi: self.base_dir/fi['dir']/fi['name'] in files, axis=1)
        return df

    async def get_file(self, id):
        df = await FileImport.get_df(
            where=FileImport.id==id,
            with_related=True,
        )
        df.rename(columns={
            'gisaf_admin_project_name': 'project',
            'gisaf_survey_surveyor_name': 'surveyor',
            'gisaf_survey_equipment_name': 'equipment',
        }, inplace=True)
        df['dir'] = df.dir.fillna('.')
        ## Replace path with a Path from pathlib
        df['path'] = df.apply(lambda fi: self.base_dir/fi['dir']/fi['name'], axis=1)
        file = df.iloc[0]
        ## Override file.name, which otherwise is the index of the item (hack)
        file.name = file['name']
        return file

    async def delete_file(self, id):
        file = await FileImport.get(id)
        if file.dir:
            path = self.base_dir/file.dir/file.name
        else:
            path = self.base_dir/file.name
        if path.exists():
            path.unlink()
        await file.delete()

    async def import_file(self, file_import, dry_run=True, return_data_info=False, **kwargs):
        """
        Import the file by calling the basket importer's do_import.
        Time stamp the FileImport.
        Return a BasketImportResult ObjectType.
        """
        if not hasattr(self, 'importer'):
            return BasketImportResult(
                message=f'No import defined/required for {self.name} basket'
            )
        try:
            import_result = await self.importer.do_import(file_import, dry_run=dry_run, **kwargs)
        except ImportError:
            raise
        except Exception as err:
            logger.exception(err)
            raise ImportError(f'Unexpected import error: {err}')

        ## data is only filled when do_import returns a (message, details, data) tuple
        data = None
        if isinstance(import_result, BasketImportResult):
            result = import_result
        else:
            if import_result:
                if isinstance(import_result, (tuple, list)):
                    assert len(import_result) >= 2, \
                        'do_import should return message or (message, details)'
                    result = BasketImportResult(
                        message=import_result[0],
                        details=import_result[1],
                    )
                    if len(import_result) > 2:
                        data = import_result[2]
                else:
                    result = BasketImportResult(message=import_result)
            else:
                result = BasketImportResult(message='Import successful.')
        if dry_run:
            result.time = file_import.time
        else:
            if not result.time:
                result.time = datetime.now()
            ## Save time stamp
            await (await FileImport.get(file_import.id)).update(time=result.time).apply()
        if return_data_info:
            return result, data
        else:
            return result

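    ## For illustration: import_file above accepts several return shapes from a custom
    ## importer's do_import. A hypothetical importer could, for instance, return just a
    ## message and a details mapping:
    ##     async def do_import(self, file_import, dry_run=False, **kwargs):
    ##         ...
    ##         return f'Imported {nb_features} features', {'store': file_import.store}
    ## and import_file wraps it into a BasketImportResult.
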
    async def add_files(self, reader, request):
        """
        File upload to basket.
        Typically called through an http POST view handler.
        Save the file, save the FileImport record, return a dict of information.
        Note that the returned dict may contain numpy types, which need NumpyEncoder
        to be json.dump'ed.
        """
        raise ToMigrate("basket add_files reader was aiohttp's MultipartReader")
        ## TODO: multiple items
        ## TODO: check if file already exists
        ## First part is the file
        part = await reader.next()
        assert part.name == 'file'
        file_name = part.filename

        ## Save the file on the filesystem
        size = 0
        path = Path(self.base_dir) / file_name

        ## Create the directory if needed
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open('wb') as f:
            while True:
                chunk = await part.read_chunk()  # 8192 bytes by default.
                if not chunk:
                    break
                size += len(chunk)
                f.write(chunk)

        ## Read the other parts
        parts = defaultdict(None)
        while True:
            part = await reader.next()
            if not part:
                break
            value = (await part.read()).decode()
            if value != 'null':
                parts[part.name] = value

        ## Find the ids of project, surveyor, equipment
        if 'project' in parts:
            project_id = (await Project.query.where(Project.name==parts.get('project')).gino.first()).id
        else:
            project_id = None
        if 'surveyor' in parts:
            surveyor_id = (await Surveyor.query.where(Surveyor.name==parts.get('surveyor')).gino.first()).id
        else:
            surveyor_id = None
        if 'equipment' in parts:
            equipment_id = (await Equipment.query.where(Equipment.name==parts.get('equipment')).gino.first()).id
        else:
            equipment_id = None

        ## Save the FileImport record
        store_keys = [store_key for store_key in parts.keys()
                      if store_key.startswith('store')]
        if len(store_keys) == 1:
            store_type_name = parts[store_keys[0]]
        else:
            store_type_name = None
        fileImportRecord = await FileImport(
            name=file_name,
            dir=parts.get('dir', '.'),
            basket=self.name,
            project_id=project_id,
            surveyor_id=surveyor_id,
            equipment_id=equipment_id,
            store=store_type_name,
            status=parts.get('status', None),
        ).create()
        fileImportRecord.path = self.base_dir/fileImportRecord.dir/fileImportRecord.name

        admin_basket_file = AdminBasketFile(
            id=fileImportRecord.id,
            name=file_name,
            status=parts.get('status'),
            store=store_type_name,
            project=parts.get('project'),
            surveyor=parts.get('surveyor'),
            equipment=parts.get('equipment'),
        )

        ## Import right away if requested
        import_result = None
        if loads(parts['autoImport']):
            ## Get the record from the DB with Pandas, for compatibility with import_file
            file_import_record = await self.get_file(fileImportRecord.id)
            try:
                await check_permission(request, 'reviewer')
            except HTTPUnauthorized:
                basket_import_result = BasketImportResult(
                    message="Cannot import: only a reviewer can do that"
                )
            else:
                dry_run = parts.get('dry_run', False)
                try:
                    basket_import_result = await self.import_file(file_import_record, dry_run)
                except ImportError as err:
                    basket_import_result = BasketImportResult(
                        message=f'Error: {err.args[0]}'
                    )

            admin_basket_file.import_result = basket_import_result

        return admin_basket_file


class MiscGeomBasket(Basket):
    name = 'Misc geo file'
    importer_class = GeoDataImporter
    columns = ['name', 'time', 'status', 'store', 'import', 'delete']
    upload_fields = ['store_misc', 'status']


class LineWorkBasket(Basket):
    name = 'Line work'
    importer_class = LineWorkImporter
    columns = ['name', 'time', 'status', 'store', 'project', 'import', 'delete']
    upload_fields = ['store_line_work', 'project', 'status']


class SurveyBasket(Basket):
    name = 'Survey'
    importer_class = RawSurveyImporter
    columns = ['name', 'time', 'project', 'surveyor', 'equipment', 'import', 'delete']
    upload_fields = ['project', 'surveyor', 'equipment']


standard_baskets = (
    SurveyBasket(),
    LineWorkBasket(),
    MiscGeomBasket(),
)
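
## For illustration only: following the pattern of the standard baskets above, a plugin
## module could define its own basket by subclassing Basket. The name, columns, upload
## fields and role below are hypothetical examples, not part of gisaf itself.
#
# class PluginBasket(Basket):
#     name = 'Plugin data'
#     importer_class = GeoDataImporter
#     columns = ['name', 'time', 'status', 'store', 'import', 'delete']
#     upload_fields = ['store_misc', 'status']
#     role = 'reviewer'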