gisaf-backend/src/gisaf/importers.py


import datetime
import logging
import re
from typing import Any
from collections import defaultdict, OrderedDict
from math import isnan
from mimetypes import guess_type
from zipfile import ZipFile, BadZipFile
import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.wkb import dumps as dumps_wkb
from sqlalchemy import and_
from sqlalchemy.sql.schema import Column
from gisaf.config import conf
#from .models.admin import FileImport
from gisaf.models.admin import FeatureImportData, BasketImportResult
from gisaf.models.raw_survey import RawSurveyModel
from gisaf.models.survey import Surveyor, Accuracy, Equipment, AccuracyEquimentSurveyorMapping
from gisaf.models.tags import Tags
from gisaf.redis_tools import store as redis_store
from gisaf.registry import registry
from gisaf.utils import (
delete_df, upsert_df,
SHAPELY_TYPE_TO_MAPBOX_TYPE, DEFAULT_MAPBOX_LAYOUT,
MAPBOX_COLOR_ATTRIBUTE_NAME, MAPBOX_OPACITY_ATTRIBUTE_NAME,
DEFAULT_MAPBOX_PAINT, gisTypeSymbolMap
)
logger = logging.getLogger(__name__)
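## NB: the exception class below shadows the builtin ImportError;
## within this module, ImportError refers to this class.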
class ImportError(Exception):
pass
class Importer:
"""
Base class for all importers.
    The main process is executed by do_import(file_record).
    Subclasses should define read_file, get_model and import_df.
"""
basket: Any = None # type hint: baskets.Basket
async def do_import(self, file_record, dry_run=False, **kwargs) -> BasketImportResult:
"""
        Return a BasketImportResult instance; subclasses may instead return
        a message string, or a tuple/list like (message, details) for user feedback.
"""
df = await self.read_file(file_record)
model = self.get_model(file_record)
return await self.import_df(df, model, file_record, dry_run=dry_run, **kwargs)
async def read_file(self, file_record):
"""
Subclasses should define the operation for reading the file into a DataFrame.
"""
raise NotImplementedError('Nothing defined for reading the file')
async def import_df(self, df, model, file_record, dry_run=False, **kwargs):
"""
Subclasses should define the operation for importing the DataFrame (save to DB).
"""
raise NotImplementedError('Nothing defined for importing the file')
    def get_model(self, file_record):
"""
Subclasses should define the operation for returning the model for the file.
"""
raise NotImplementedError('Nothing defined for identifying the model')
def preprocess(self, df):
"""
Hook for pre-processing the dataframe read from the file.
"""
pass
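## For illustration only (not part of the codebase): a minimal subclass sketch
## showing the methods a concrete importer is expected to provide. The names
## CSVImporter and SomeModel below are hypothetical.
#
# class CSVImporter(Importer):
#     def get_model(self, file_record):
#         return SomeModel
#     async def read_file(self, file_record):
#         return pd.read_csv(file_record.path)
#     async def import_df(self, df, model, file_record, dry_run=False, **kwargs):
#         if not dry_run:
#             await upsert_df(df, model)
#         return BasketImportResult(message='Import successful',
#                                   details={'Rows in file': len(df)})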
class RawSurveyImporter(Importer):
"""
Importer for raw survey points
"""
def get_model(self, file_record):
return RawSurveyModel
async def read_file(self, file_record):
try:
df = pd.read_csv(file_record.path, header=None)
except pd.errors.ParserError:
raise ImportError('File format error (cannot parse)')
## Take only the first 5 columns
if len(df.columns) < 5:
raise ImportError('File format error (at least 5 columns needed)')
df = df[range(5)]
df.dropna(inplace=True)
df.columns = ['orig_id', 'easting', 'northing', 'elevation', 'category']
## Strip extra characters
df['category'] = df['category'].str.strip()
df['orig_id'] = df.orig_id.astype(str)
## Create a Geodataframe
gdf = gpd.GeoDataFrame(
df,
geometry=gpd.points_from_xy(df.easting, df.northing, df.elevation),
crs=conf.raw_survey['spatial_sys_ref']
)
return gdf
async def import_df(self, gdf, model, file_record, dry_run=False,
remove_misplaced=False, **kwargs):
user_feedback = OrderedDict()
user_feedback['Points in file'] = len(gdf)
## Date from the file name
## TODO: should be added to FileImport (or other metadata)
        fname_search = re.match(r'^(\S+)-(\d\d\d\d)-(\d\d)-(\d\d).*$', file_record['name'])
if not fname_search:
            raise ImportError('The file name is not OK '
                              '(format should be: "PPP-DESCRIPTION-YYYY-MM-DD", '
                              'PPP being the project name, DESCRIPTION is optional and discarded)')
date = datetime.date(day=int(fname_search.group(4)),
month=int(fname_search.group(3)),
year=int(fname_search.group(2)))
## TODO: Check if date == file_record.date
accuracies = await AccuracyEquimentSurveyorMapping.get_df(with_related=True)
## Cleanup
accuracies.rename(columns={
'gisaf_survey_surveyor_id': 'surveyor_id',
'gisaf_survey_surveyor_name': 'surveyor',
'gisaf_survey_equipment_id': 'equipment_id',
'gisaf_survey_equipment_name': 'equipment',
'gisaf_survey_accuracy_id': 'accuracy_id',
'gisaf_survey_accuracy_name': 'accuracy_name',
'gisaf_survey_accuracy_accuracy': 'accuracy',
'geometry__3': 'type',
}, inplace=True)
accuracies.drop(columns=['surveyor__1', 'equipment_2', 'accuracy__4'], inplace=True)
accuracy = accuracies.loc[
(accuracies.surveyor==file_record.surveyor)
& (accuracies.equipment==file_record.equipment)
& (accuracies.type=='Point')
]
if len(accuracy) == 0:
raise ImportError(f'No accuracy defined for surveyor {file_record.surveyor} '\
f'and equipment {file_record.equipment}')
elif len(accuracy) > 1:
raise ImportError(f'More than 1 accuracy defined for surveyor {file_record.surveyor} '\
f'and equipment {file_record.equipment}')
accuracy = accuracy.iloc[0]
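        ## Each point gets a deterministic id derived from (date, project, surveyor,
        ## equipment, row index in the file) via point_hash_function (see bottom of
        ## this module), so re-importing the same file should upsert the same rows
        ## rather than create duplicates.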
def _apply_point_hash_function(rec):
return point_hash_function(
date,
file_record.project_id,
file_record.surveyor_id,
file_record.equipment_id,
rec.name + 1 ## Index in the file
)
gdf['id'] = gdf.apply(_apply_point_hash_function, axis=1)
## Add information to all records in the gdf
gdf['date'] = pd.to_datetime(date)
gdf['project_id'] = file_record.project_id
gdf['srvyr_id'] = file_record.surveyor_id
gdf['equip_id'] = file_record.equipment_id
gdf['accur_id'] = accuracy.accuracy_id
df_raw_survey = await model.get_df(
where=and_(
model.date==date,
model.project_id==file_record.project_id,
model.srvyr_id==file_record.surveyor_id,
model.equip_id==file_record.equipment_id,
)
)
user_feedback['Existing raw_survey points'] = len(df_raw_survey)
## Import to raw_survey_data table
## PostGis specific: add SRID
gdf['geom'] = gdf['geometry'].apply(lambda g: dumps_wkb(g, srid=conf.geo.raw_survey.srid, hex=True))
if not dry_run:
await upsert_df(gdf, model)
## Keep track of points without known categories for user feedback
df_unknown_categories = gdf[~gdf.category.isin(registry.categories.index)]
unknown_categories = df_unknown_categories[['category', 'orig_id']].groupby('category').count()
for row in unknown_categories.itertuples():
user_feedback[f'Category {row[0]} (unknown)'] = row[1]
gdf = gdf.merge(registry.categories, left_on='category', right_index=True)
## Import to raw survey models tables
for category_name, category_gdf in gdf.groupby('category'):
category = registry.categories.loc[category_name]
user_feedback[f"Category {category.name} ({category.store_name})"] = len(category_gdf)
if not dry_run:
await upsert_df(category_gdf, category.raw_survey_model)
## Auto import: import from raw survey models tables to geometry tables
gdf['geom'] = gdf.to_crs(conf.srid).geometry.apply(lambda g: dumps_wkb(g, srid=conf.srid, hex=True))
for category_name, category_gdf in gdf.groupby('category'):
category = registry.categories.loc[category_name]
if category.auto_import and category.gis_type == 'Point':
if not dry_run:
await upsert_df(category_gdf, category.model)
if remove_misplaced:
gdf_db = await category.model.get_geo_df()
gdf_db.reset_index(inplace=True)
## Finding by id does not work since points might
## have been placed in a different category
#matches = gdf_db.loc[gdf_db['id'].isin(category_gdf.id)]
matches = gdf_db.loc[
gdf_db.orig_id.isin(category_gdf.orig_id)
& (gdf_db.srvyr_id == file_record.surveyor_id)
& (gdf_db.equip_id == file_record.equipment_id)
& (gdf_db.date.dt.date == date)
]
matches_merged = matches.merge(
category_gdf.to_crs(conf.srid),
on='orig_id',
suffixes=('', '_new')
)
misplaced = matches_merged.loc[
matches_merged['geometry'] != matches_merged['geometry_new']
]
if len(misplaced) > 0:
await delete_df(misplaced, category.model)
## Filter matches with orig_id and different geometry
pass
user_feedback['Points imported'] = len(gdf)
return BasketImportResult(
message=f"Import successful{' (dry run)' if dry_run else ''}",
details=user_feedback,
)
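## Illustrative usage sketch (hypothetical `file_record`, assumed to carry at least
## path, name, project_id, surveyor, surveyor_id, equipment and equipment_id):
#
# importer = RawSurveyImporter()
# result = await importer.do_import(file_record, dry_run=True, remove_misplaced=False)
# print(result.message, result.details)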
class GeoDataImporter(Importer):
"""
Base class for geo data importers
"""
def get_model(self, file_record):
return registry.geom.get(file_record.store)
async def read_file(self, file_record):
## TODO: guess_type supports pathlib.Path from Python 3.8, so it should be:
#if guess_type(file_record.path)[0] == 'application/zip':
## For Python 3.7 compatibility:
if guess_type(str(file_record.path.absolute()))[0] == 'application/zip':
return gpd.read_file(f'zip://{file_record.path}')
else:
return gpd.read_file(file_record.path)
async def get_metadata_from_raw_points(self, gdf, model, file_record):
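        """
        Hook for subclasses (e.g. LineWorkImporter) to derive surveyor, equipment
        and accuracy metadata from the raw survey points; no-op by default.
        """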
pass
async def import_df(self, gdf_file_input, model, file_record, dry_run=False, live_publish_results=False):
## XXX: Removed qml processing (Mapbox)
#qml_file_ = glob.glob(path.join(tmpdir, '*.qml'))
#if qml_file_:
# process_qml(qml_file_[0], model)
new_or_updated_items = {}
now = datetime.datetime.now()
bad_geometry_types = {}
user_feedback = OrderedDict()
to_keep = []
        ## If the model has a project (e.g. all survey models, see BaseSurveyModel),
        ## get only the features with the project given in the FileImport record
if hasattr(model, 'project_id'):
if np.isnan(file_record.project_id):
raise ImportError('No project defined for that file')
query = model.project_id==file_record.project_id
else:
query = None
if file_record.status and isinstance(model.status, Column):
query = and_(query, model.status==file_record.status)
## Explode the input (in case of multi-geometries)
gdf_file = gdf_file_input.explode(ignore_index=True)
gdf_db = await model.get_geo_df(query, with_related=False)
gdf_db.reset_index(inplace=True)
## Cleanup input: drop empty geometries and duplicates
nb_all = len(gdf_file)
gdf_file.dropna(subset=['geometry'], inplace=True)
gdf_file = gdf_file[~gdf_file.geometry.is_empty]
nb_empty_geom = nb_all - len(gdf_file)
gdf_file.drop_duplicates(inplace=True)
nb_duplicate = nb_all - nb_empty_geom - len(gdf_file)
## Reproject
gdf_file = gdf_file.copy().to_crs(conf.crs['db'])
synonyms = getattr(model, 'synonyms', {'ID': 'id'})
## Apply synonyms
for k, v in synonyms.items():
if k in gdf_file.columns:
gdf_file[v] = gdf_file[k]
if 'orig_id' in gdf_file.columns:
gdf_file.drop(columns='orig_id', inplace=True)
## Make sure the types of the columns of gdf_file and gdf_db match
for col_name, col_type in gdf_db.dtypes.items():
if col_name in gdf_file.columns:
try:
coerce = gdf_file[col_name].dtypes != col_type
except TypeError:
## Workaround for strange bug in Pandas: data type not understood
coerce = gdf_file[col_name].dtypes.name != col_type.name
if coerce:
gdf_file[col_name] = gdf_file[col_name].astype(col_type)
## Add status
if 'status' not in gdf_file.columns:
gdf_file['status'] = file_record.status
## TODO: define exactly the meaning of the 'id' attribute in shapefiles,
## it gets quite confusing.
        ## Drop the id, as it might conflict with the one in the DB
## For now in this import process only, rename eventual 'id' to 'orig_id'
if 'id' not in gdf_file.columns:
gdf_file['id'] = None
gdf_file.rename(columns={'id': 'orig_id'}, inplace=True)
gdf_file['orig_id'] = gdf_file['orig_id'].astype(str).astype(object)
## Get geometries as pygeos for gdf_file and gdf_db
gdf_file['ewkb'] = gdf_file.geometry.apply(lambda g: dumps_wkb(g, srid=conf.srid, hex=True))
gdf_db['ewkb'] = gdf_db.geometry.apply(lambda g: dumps_wkb(g, srid=conf.srid, hex=True))
self.preprocess(gdf_file)
await self.get_metadata_from_raw_points(gdf_file, model, file_record)
## Diffs: using attribute and spatial merges, see https://geopandas.org/mergingdata.html
        ## Attribute diff: identify identical rows,
        ## so they won't be touched and don't need a spatial join
df_identical = gdf_db.merge(gdf_file, how='inner')
## Need an id
if 'id' not in gdf_file.columns:
gdf_file['id'] = gdf_file.index
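        ## The outer merge on the ewkb key (hex-encoded geometry) classifies every feature:
        ## 'both' = geometry exists in DB and file (attributes may need updating),
        ## 'left_only' = only in DB (to delete), 'right_only' = only in file (to insert).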
gdf_diff = gdf_db.merge(gdf_file, on='ewkb', how='outer',
indicator=True, suffixes=('', '__file'))
to_update = gdf_diff.loc[gdf_diff._merge=='both']
## Filter out the identical rows from to_update: update non-geometry attributes,
## keeping the id
to_update = to_update.loc[~to_update.id.isin(df_identical.id)]
to_delete = gdf_diff.loc[gdf_diff._merge=='left_only']
## TODO?: try with geometric operations to determine nearest geometry
## from the file's to the DB's.
## See:
# - _binary_predicate in geopandas.array, which returns True/False,
# when it could return the id of the matching geometry
## Meanwhile, just wait for sjoin to be implemented:
# - https://github.com/geopandas/geopandas/pull/1271
#almost_equals = to_delete.geom_almost_equals(gdf_db.geometry)
#equals = to_delete.geom_equals(gdf_db.geometry)
#equals_exact = to_delete.geom_equals_exact(gdf_db.geometry, tolerance=0.1)
to_insert = gdf_diff.loc[gdf_diff._merge=='right_only']
## Take column names from the gdf_file (*__file) and rename to the gdf_db.
## Geometry is a bit of an exception.
to_insert_columns = [col for col in to_insert.columns
if col.endswith('__file') and col != 'geometry__file']
to_insert = to_insert.drop(columns={col[:-6] for col in to_insert_columns})\
.rename(columns={col: col[:-6] for col in to_insert_columns})\
.merge(gdf_diff[['orig_id', 'ewkb']], left_index=True, right_index=True)\
.drop(columns=['id'])
to_update_columns = [col for col in to_update.columns
if col.endswith('__file')
and col not in ('geometry__file', 'id__file')]
to_update = to_update.drop(columns={col[:-6] for col in to_update_columns})\
.rename(columns={col: col[:-6] for col in to_update_columns})
to_insert['geometry'] = to_insert['geometry__file']
to_insert.rename(columns={'orig_id_x': 'orig_id', 'ewkb_x': 'ewkb'}, inplace=True)
to_insert.drop(columns=['orig_id_y', 'ewkb_y'], inplace=True)
## Add project to all records in the new records
to_insert['project_id'] = file_record.project_id
## Write to DB
if not dry_run:
for _gdf in [to_insert, to_update]:
_gdf['geom'] = _gdf.geometry.apply(
lambda g: dumps_wkb(g, srid=conf.srid, hex=True)
)
if len(to_insert) > 0:
inserted = await upsert_df(to_insert, model)
to_insert['id'] = inserted['id']
await upsert_df(to_update, model)
await delete_df(to_delete, model)
## Add feature_import_data
feature_import_data = await FeatureImportData.get_df(
where=FeatureImportData.store==file_record.store
)
fid_updated = feature_import_data[['id', 'feature_id']]\
.merge(to_update, left_on='feature_id', right_on='id')
fid_updated['file_path'] = str(file_record.path)
fid_updated['store'] = file_record.store
fid_updated['origin'] = 'shapefile' ## TODO: gpkg
fid_updated['time'] = now
fid_updated['file_md5'] = file_record.md5
fid_updated.drop(columns=['orig_id'], inplace=True)
fid_updated.rename(columns={'orig_id__file': 'orig_id'}, inplace=True)
## Write to DB
await upsert_df(fid_updated, FeatureImportData)
## Same for to_insert
if len(to_insert) > 0:
fid_inserted = pd.DataFrame(columns=fid_updated.columns)
fid_inserted['feature_id'] = inserted['id']
## FIXME: fid_inserted['orig_id']
#fid_inserted['orig_id'] = inserted['orig_id']
fid_inserted['store'] = file_record.store
fid_inserted['file_path'] = str(file_record.path)
fid_inserted['origin'] = 'shapefile' ## TODO: gpkg
fid_inserted['time'] = now
fid_inserted['file_md5'] = file_record.md5
await upsert_df(fid_inserted, FeatureImportData)
## Find and update tags
if len(to_insert) > 0:
tags = await Tags.get_geo_df(where=Tags.store==file_record.store)
if len(tags) > 0:
matching_tags = gpd.sjoin(to_insert, tags.reset_index())
new_ref_ids = tags.merge(matching_tags, left_on='ref_id',
right_on='ref_id')[['ref_id', 'id_left']]
new_tags = tags.merge(new_ref_ids, on='ref_id')
new_tags.drop(columns='ref_id', inplace=True)
new_tags.rename(columns={'id_left': 'ref_id'}, inplace=True)
await upsert_df(new_tags, Tags)
## Publish on Gisaf live
gis_type = gdf_file.gis_type.iloc[0]
mapbox_type = SHAPELY_TYPE_TO_MAPBOX_TYPE.get(gis_type, None)
mapbox_paint = DEFAULT_MAPBOX_PAINT.get(mapbox_type, {}).copy()
mapbox_layout = DEFAULT_MAPBOX_LAYOUT.get(mapbox_type, {}).copy()
symbol = gisTypeSymbolMap.get(gis_type, '\ue02e')
live_layers = {
'insert': {
'data': to_insert,
'color': 'blue',
'opacity': 0.3,
},
'update': {
'data': to_update,
'color': 'green',
'opacity': 0.3,
},
'delete': {
'data': to_delete,
'color': 'red',
'opacity': 0.3,
},
}
if live_publish_results:
for name, defn in live_layers.items():
mapbox_paint[MAPBOX_COLOR_ATTRIBUTE_NAME[mapbox_type]] = defn['color']
mapbox_paint[MAPBOX_OPACITY_ATTRIBUTE_NAME[mapbox_type]] = defn['opacity']
await redis_store.publish_gdf(
f'Last import: {name}',
defn['data'],
status='*',
mapbox_paint=mapbox_paint,
mapbox_layout=mapbox_layout,
viewable_role='reviewer',
)
user_feedback['Total in file'] = nb_all
if nb_empty_geom > 0:
user_feedback['Empty single geoms in file'] = nb_empty_geom
if nb_duplicate > 0:
user_feedback['Duplicates in file'] = nb_duplicate
user_feedback['Total in database'] = len(gdf_db)
user_feedback['Identical (no change)'] = len(df_identical)
user_feedback['Update attributes'] = len(to_update)
user_feedback['Insert'] = len(to_insert)
user_feedback['Delete'] = len(to_delete)
#summary = len(gdf_db) + len(to_insert) - len(to_delete) - len(gdf_file)
#user_feedback['Summary (+/-)'] = f'{summary:d}'
return BasketImportResult(
time=now,
message=f"Import successful{' (dry run)' if dry_run else ''}",
details=user_feedback,
)
class LineWorkImporter(GeoDataImporter):
"""
Importer for line work geo data files (from survey)
"""
async def get_metadata_from_raw_points(self, gdf, model, file_record):
"""
Find raw points in the shape and set equip_id, srvyr_id, accur_id
"""
raw_points = await model.raw_model.get_geo_df(where=model.raw_model.project_id==file_record.project_id)
raw_points.drop(columns='orig_id', inplace=True)
raw_points.to_crs(conf.crs['db'], inplace=True)
        ## Take a small buffer around the points to allow for small errors
        ## from reprojection, inaccurate line work, etc.
buf_raw_points = raw_points.copy()
buf_raw_points['geometry'] = raw_points.buffer(0.0000005, resolution=3)
        ## Drop columns which interfere with metadata coming from raw points
## In future, the importer might detect and use such metadata in the input file
gdf.drop(
columns=set(gdf.columns).intersection(
{'date', 'accur_id', 'srvyr_id', 'equip_id', 'project_id'}
),
inplace=True
)
sjoin = gpd.sjoin(gdf, buf_raw_points, op='intersects', how='left')
sjoin.drop(columns=['accur_id', 'status_left', 'status_right', 'ewkb'], inplace=True)
sjoin.rename(columns={'index_right': 'raw_point_id'}, inplace=True)
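        ## The left join leaves NaN where a feature intersects no raw point,
        ## hence the nullable Int64 dtype below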
sjoin['raw_point_id'] = sjoin['raw_point_id'].astype('Int64')
## For surveyor info
## Get the last surveyor, equipment (by date)
## Sort raw points by date, maintaining the index (raw points)
cols_surveyor_info = ['srvyr_id', 'equip_id', 'date']
sjoin.set_index([sjoin.index, 'date'], inplace=True)
sjoin.sort_index(ascending=False, inplace=True)
sjoin.reset_index(inplace=True)
sjoin.set_index('level_0', inplace=True)
sjoin.index.name = 'feature_id'
gdf[cols_surveyor_info] = sjoin.groupby(sjoin.index).head(1)[cols_surveyor_info]
## Fix issue when all dates are NaT (reset_index converts that column to float,
## see https://github.com/pandas-dev/pandas/issues/30517)
gdf.date.fillna(pd.NaT, inplace=True)
## A bit the same for accuracies: take the lowest one of the raw points for each feature
accuracies = await Accuracy.get_df()
accuracies.set_index('id', inplace=True)
accuracy_mappings = await AccuracyEquimentSurveyorMapping.get_df(
where=AccuracyEquimentSurveyorMapping.geometry_type == 'Line_work'
)
accuracy_mappings.drop(columns='geometry_type', inplace=True)
sjoin = sjoin.reset_index().merge(
accuracy_mappings,
left_on=['srvyr_id', 'equip_id'],
right_on=['surveyor_id', 'equipment_id']
)
sjoin = sjoin.merge(accuracies[['accuracy']], left_on='accuracy_id', right_index=True)
sjoin.set_index(['feature_id', 'accuracy'], inplace=True)
sjoin.sort_index(ascending=False, inplace=True)
sjoin.reset_index(inplace=True)
sjoin.set_index('feature_id', inplace=True)
gdf['accur_id'] = sjoin.groupby(sjoin.index).head(1)['accuracy_id']
hash_max = 2**32
hash_date_min = datetime.date(year=1900, month=1, day=1).toordinal()
def point_hash_function(date, project_id, surveyor_id, equipment_id, i):
"""
    Generate a reasonably unique, positive integer hash from the given arguments.
    We could just use hash() % 2**32, which would be quite reasonable, but hard to reverse.
    A simple reversible hash would ideally be a number generated
    with human readability in mind, like:
return int('{:02d}{:03d}{:03d}{:01d}{:d}'.format(
project_id % hash_project_max,
date.toordinal() - hash_date_min,
surveyor_id % hash_surveyor_max,
equipment_id % hash_equipment_max,
i
)) % 2**32
    But I don't think it's reliable enough, and it would get quite confusing as soon as
    one of the parameters overflows its limits.
    Following https://stackoverflow.com/questions/4273466/reversible-hash-function (comment 7),
    one could try a reversible function with some mathematical basis.
Choose 2 big primes from https://www.bigprimes.net/archive/prime/14000000/ :
32416190039 and 32416190071
num = '{:03d}{:05d}{:03d}{:02d}{:05d}'.format(
project_id % hash_project_max,
date.toordinal() - hash_date_min,
surveyor_id % hash_surveyor_max,
equipment_id % hash_equipment_max,
i
)
    The reverse hash function, for the record, if we ever use a reversible hash based on coprimes,
    would look like, maybe:
        return (h_ * 32416190071) % 2**32
        # Then we would return the inverse of the format function in point_hash_function, easy ;)
    Not reliable enough either, so: plain hash() with no smart reverse; reversing requires brute force.
"""
h_ = hash((
project_id,
date.toordinal() - hash_date_min,
surveyor_id,
equipment_id,
i
))
return h_ % hash_max
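## For the record, a brute-force "reverse" of point_hash_function would have to
## enumerate candidate inputs and compare hashes; an illustrative sketch only
## (the candidate ranges are hypothetical and would come from the database in practice):
#
# def find_point_source(h, dates, project_ids, surveyor_ids, equipment_ids, max_index=10000):
#     for date in dates:
#         for project_id in project_ids:
#             for surveyor_id in surveyor_ids:
#                 for equipment_id in equipment_ids:
#                     for i in range(1, max_index + 1):
#                         if point_hash_function(date, project_id, surveyor_id,
#                                                equipment_id, i) == h:
#                             return date, project_id, surveyor_id, equipment_id, i
#     return None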