From d2ae5e4d7bedae81f9298dbb8070c7252f8191dd Mon Sep 17 00:00:00 2001 From: phil Date: Tue, 9 Apr 2024 16:16:04 +0530 Subject: [PATCH] Fix/update baskets: - get_file - import - Basket importers must return BasketImportResult Add API points Fix utils delete_df and upsert_df, also making them async friendly Auth: add helper functions to UserRead --- pdm.lock | 41 +++++- pyproject.toml | 3 +- src/gisaf/_version.py | 2 +- src/gisaf/api/admin.py | 60 +++++++- src/gisaf/baskets.py | 158 ++++++++++---------- src/gisaf/importers.py | 5 +- src/gisaf/models/admin.py | 4 +- src/gisaf/models/authentication.py | 10 ++ src/gisaf/utils.py | 222 +++++++++++++++++------------ 9 files changed, 323 insertions(+), 182 deletions(-) diff --git a/pdm.lock b/pdm.lock index 15213b4..daa384b 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,20 @@ groups = ["default", "dev", "mqtt"] strategy = ["cross_platform"] lock_version = "4.4.1" -content_hash = "sha256:0da68c7fed8db7a12e36002b8d6194c1651f9653fc7fbf7797c553774c9dbf32" +content_hash = "sha256:581ae31055bb26abe5b7bf7cab172337258913d8960892dbd206e00421b309b7" + +[[package]] +name = "aiofile" +version = "3.8.8" +requires_python = ">=3.7, <4" +summary = "Asynchronous file operations." +dependencies = [ + "caio~=0.9.0", +] +files = [ + {file = "aiofile-3.8.8-py3-none-any.whl", hash = "sha256:41e8845cce055779cd77713d949a339deb012eab605b857765e8f8e52a5ed811"}, + {file = "aiofile-3.8.8.tar.gz", hash = "sha256:41f3dc40bd730459d58610476e82e5efb2f84ae6e9fa088a9545385d838b8a43"}, +] [[package]] name = "aiomqtt" @@ -20,6 +33,20 @@ files = [ {file = "aiomqtt-2.0.1.tar.gz", hash = "sha256:60f6451c8ab7235cfb392b1b0cab398e9bc6040f4b140628c0615371abcde15f"}, ] +[[package]] +name = "aiopath" +version = "0.6.11" +requires_python = ">=3.10" +summary = "📁 Async pathlib for Python" +dependencies = [ + "aiofile<4,>=3.5.0", + "anyio<4,>=3.2.0", +] +files = [ + {file = "aiopath-0.6.11-py2.py3-none-any.whl", hash = "sha256:7b1f1aa3acb422050908ac3c4755b5e43f625111be003f1bfc7dc2193027c45d"}, + {file = "aiopath-0.6.11.tar.gz", hash = "sha256:2f0d4d9195281612c6508cbfa12ac3184c31540d13b9e6215a325897da59decd"}, +] + [[package]] name = "aiosqlite" version = "0.20.0" @@ -175,6 +202,18 @@ files = [ {file = "bcrypt-4.0.1.tar.gz", hash = "sha256:27d375903ac8261cfe4047f6709d16f7d18d39b1ec92aaf72af989552a650ebd"}, ] +[[package]] +name = "caio" +version = "0.9.13" +requires_python = ">=3.7, <4" +summary = "Asynchronous file IO for Linux MacOS or Windows." 
+files = [ + {file = "caio-0.9.13-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:789f8b55f4a2b46be14361df3ac8d14b6c8f0a3730badd70cb1b7778fcdc7039"}, + {file = "caio-0.9.13-cp311-cp311-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a914684bad2a757cf013ae88d785d81659a3add1885bad60cd20bfbd3068bd5a"}, + {file = "caio-0.9.13-py3-none-any.whl", hash = "sha256:582cbfc6e203d1dedf662ba972a94db6e744fe0b6bb9e02922b0f86803006fc9"}, + {file = "caio-0.9.13.tar.gz", hash = "sha256:26f1e08a442bef4526a66142ea4e325e22dca8f040800aecb3caf8fae0589e98"}, +] + [[package]] name = "certifi" version = "2023.7.22" diff --git a/pyproject.toml b/pyproject.toml index a6d6897..dc1cdf3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ dependencies = [ "pydantic-settings>=2.0.3", "pyshp>=2.3.1", "python-jose[cryptography]>=3.3.0", - "python-multipart>=0.0.6", + "python-multipart>=0.0.9", "pyyaml>=6.0.1", "redis>=5.0.1", "sqlalchemy[asyncio]>=2.0.23", @@ -29,6 +29,7 @@ dependencies = [ "psycopg>=3.1.18", "plotly>=5.20.0", "matplotlib>=3.8.3", + "aiopath>=0.6.11", ] requires-python = ">=3.11,<4" readme = "README.md" diff --git a/src/gisaf/_version.py b/src/gisaf/_version.py index 0597d15..699f4f2 100644 --- a/src/gisaf/_version.py +++ b/src/gisaf/_version.py @@ -1 +1 @@ -__version__: str = '2023.4.dev62+g08c53cf.d20240405' \ No newline at end of file +__version__: str = '2023.4.dev63+g52e1d21.d20240408' \ No newline at end of file diff --git a/src/gisaf/api/admin.py b/src/gisaf/api/admin.py index 9e94b66..213c573 100644 --- a/src/gisaf/api/admin.py +++ b/src/gisaf/api/admin.py @@ -1,9 +1,11 @@ import logging -from fastapi import Depends, APIRouter, HTTPException, status, responses +from fastapi import (Depends, APIRouter, HTTPException, status, responses, + UploadFile) +from fastapi.responses import FileResponse -from gisaf.models.admin import AdminBasket, BasketNameOnly -from gisaf.models.authentication import User +from gisaf.models.admin import AdminBasket, BasketImportResult, BasketNameOnly +from gisaf.models.authentication import User, UserRead from gisaf.security import get_current_active_user from gisaf.admin import manager @@ -40,3 +42,55 @@ async def get_basket( ## TODO: Fix projects # projects=getattr(basket, 'projects', None) ) + +@api.post('/basket/upload/{name}') +async def upload_basket_file( + name: str, + file: UploadFile, + user: UserRead = Depends(get_current_active_user), + ): + try: + basket = manager.baskets[name] + except KeyError: + raise HTTPException(status.HTTP_404_NOT_FOUND, f'No basket named {name}') + fileItem = await basket.add_files(file, user) + return fileItem + +@api.get('/basket/download/{name}/{file_id}/{file_name}') +async def download_basket_file( + name: str, + file_id: int, + file_name: str, + user: User = Depends(get_current_active_user), + ) -> FileResponse: + try: + basket = manager.baskets[name] + except KeyError: + raise HTTPException(status.HTTP_404_NOT_FOUND, f'No basket named {name}') + if basket.role: + if not user.has_role(basket.role): + raise HTTPException(status.HTTP_401_UNAUTHORIZED) + file_record = await basket.get_file(file_id) + if file_record is None: + raise HTTPException(status.HTTP_404_NOT_FOUND, f'No import file id {file_id}') + return FileResponse(file_record.path) + +@api.get('/basket/import/{basket}/{file_id}') +async def import_basket_file( + basket: str, + file_id: int, + dryRun: bool = False, + user: User = Depends(get_current_active_user), + ) -> BasketImportResult: + if 
not (user and user.has_role('reviewer')): + raise HTTPException(status.HTTP_401_UNAUTHORIZED) + basket_ = manager.baskets[basket] + file_import = await basket_.get_file(file_id) + if file_import is None: + raise HTTPException(status.HTTP_404_NOT_FOUND, f'No import file id {file_id}') + try: + result = await basket_.import_file(file_import, dryRun) + ## FIXME: shouldn't it be AdminImportError? + except ImportError as err: + raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, err) + return result \ No newline at end of file diff --git a/src/gisaf/baskets.py b/src/gisaf/baskets.py index 349a300..f273193 100644 --- a/src/gisaf/baskets.py +++ b/src/gisaf/baskets.py @@ -1,4 +1,5 @@ from pathlib import Path +from aiopath import AsyncPath from collections import defaultdict from json import loads from datetime import datetime @@ -10,13 +11,15 @@ from typing import ClassVar # from aiohttp.web import HTTPUnauthorized, HTTPForbidden from sqlmodel import select +from sqlalchemy.orm import joinedload, QueryableAttribute +from sqlalchemy.exc import NoResultFound +from fastapi import UploadFile from gisaf.config import conf -from gisaf.utils import ToMigrate from gisaf.database import db_session from gisaf.importers import RawSurveyImporter, GeoDataImporter, LineWorkImporter, ImportError from gisaf.models.admin import FileImport, AdminBasketFile, BasketImportResult -from gisaf.models.authentication import User +from gisaf.models.authentication import UserRead from gisaf.models.survey import Surveyor, Equipment from gisaf.models.project import Project @@ -48,7 +51,7 @@ class Basket: self.importer = self.importer_class() self.importer.basket = self - async def allowed_for(self, user: User): + async def allowed_for(self, user: UserRead): """ Return False if the basket is protected by a role Request: aiohttp.Request instance @@ -101,23 +104,37 @@ class Basket: # else: # return df - async def get_file(self, id): - df = await FileImport.get_df( - where=FileImport.id==id, - with_related=True, - ) - df.rename(columns={ - 'gisaf_admin_project_name': 'project', - 'gisaf_survey_surveyor_name': 'surveyor', - 'gisaf_survey_equipment_name': 'equipment', - }, inplace=True) - df['dir'] = df.dir.fillna('.') - ## Replace path with Path from pathlib - df['path'] = df.apply(lambda fi: self.base_dir/fi['dir']/fi['name'], axis=1) - file = df.iloc[0] - ## Override file.name, which otherwise is the index of the item (hack) - file.name = file['name'] - return file + async def get_file(self, file_id: int) -> FileImport | None: + async with db_session() as session: + query = select(FileImport).where(FileImport.id==file_id).options( + joinedload(FileImport.project), + joinedload(FileImport.surveyor), + joinedload(FileImport.equipment), + ) + res = await session.exec(query) + try: + file = res.one() + except NoResultFound: + return None + else: + return file + + # df = await FileImport.get_df( + # where=FileImport.id==id, + # with_related=True, + # ) + # df.rename(columns={ + # 'gisaf_admin_project_name': 'project', + # 'gisaf_survey_surveyor_name': 'surveyor', + # 'gisaf_survey_equipment_name': 'equipment', + # }, inplace=True) + # df['dir'] = df.dir.fillna('.') + # ## Replace path with Path from pathlib + # df['path'] = df.apply(lambda fi: self.base_dir/fi['dir']/fi['name'], axis=1) + # file = df.iloc[0] + # ## Override file.name, which otherwise is the index of the item (hack) + # file.name = file['name'] + # return file async def delete_file(self, id): file = await FileImport.get(id) @@ -129,7 +146,7 @@ class Basket: 
path.unlink() await file.delete() - async def import_file(self, file_import, dry_run=True, return_data_info=False, **kwargs): + async def import_file(self, file_import: FileImport, dry_run=True, **kwargs) -> BasketImportResult: """ Import the file by calling the basket's importer's do_import. Time stamp the FileImport. @@ -139,44 +156,42 @@ class Basket: return BasketImportResult( message=f'No import defined/required for {self.name} basket' ) + result: BasketImportResult try: - import_result = await self.importer.do_import(file_import, dry_run=dry_run, **kwargs) + result = await self.importer.do_import(file_import, dry_run=dry_run, **kwargs) except ImportError as err: - raise + raise err except Exception as err: logger.exception(err) - raise ImportError(f'Unexpected import error: {err}') + raise ImportError(f'Unexpected import error (details in the Gisaf logs): {err}') - if isinstance(import_result, BasketImportResult): - result = import_result - else: - if import_result: - if isinstance(import_result, (tuple, list)): - assert len(import_result) >= 2, \ - 'do_import should return message or (message, details)' - result = BasketImportResult( - message=import_result[0], - details=import_result[1], - ) - if len(import_result) > 2: - data = import_result[2] - else: - result = BasketImportResult(message=import_result) - else: - result = BasketImportResult(message='Import successful.') + if not isinstance(result, BasketImportResult): + raise ImportError('Import error: the importer did not return a BasketImportResult') + # if import_result: + # if isinstance(import_result, (tuple, list)): + # assert len(import_result) >= 2, \ + # 'do_import should return message or (message, details)' + # result = BasketImportResult( + # message=import_result[0], + # details=import_result[1], + # ) + # if len(import_result) > 2: + # data = import_result[2] + # else: + # result = BasketImportResult(message=import_result) + # else: + # result = BasketImportResult(message='Import successful.') if dry_run: result.time = file_import.time - else: - if not result.time: - result.time = datetime.now() + if not dry_run: ## Save time stamp - await (await FileImport.get(file_import.id)).update(time=result.time).apply() - if return_data_info: - return result, data - else: - return result + async with db_session() as session: + file_import.time = result.time + session.add(file_import) + await session.commit() + return result - async def add_files(self, reader, request): + async def add_files(self, file: UploadFile, user: UserRead): """ File upload to basket. Typically called through an http POST view handler. @@ -184,37 +199,26 @@ class Basket: Note that the return dict has eventually numpy types and needs NumpyEncoder to be json.dump'ed. """ - raise ToMigrate("basket add_files reader was aiohttp's MultipartReader") ## TODO: multiple items ## TODO: check if file already exists - ## First part is the file - part = await reader.next() - assert part.name == 'file' - file_name = part.filename - + # assert part.name == 'file' ## Save file on filesystem - size = 0 - path = Path(self.base_dir) / file_name - + path = AsyncPath(self.base_dir) / file.filename ## Eventually create the directory - path.parent.mkdir(parents=True, exist_ok=True) - with path.open('wb') as f: - while True: - chunk = await part.read_chunk() # 8192 bytes by default. 
- if not chunk: - break - size += len(chunk) - f.write(chunk) + await path.parent.mkdir(parents=True, exist_ok=True) + async with path.open('wb') as f: + ## No way to use async to stream the file content to write it? + await f.write(await file.read()) ## Read other parts - parts = defaultdict(None) - while True: - part = await reader.next() - if not part: - break - value = (await part.read()).decode() - if value != 'null': - parts[part.name] = value + # parts = defaultdict(None) + # while True: + # part = await reader.next() + # if not part: + # break + # value = (await part.read()).decode() + # if value != 'null': + # parts[part.name] = value ## Find ids of project, surveyor, equipment if 'project' in parts: @@ -238,7 +242,7 @@ class Basket: else: store_type_name = None fileImportRecord = await FileImport( - name=file_name, + name=file.file_name, dir=parts.get('dir', '.'), basket=self.name, project_id=project_id, @@ -251,7 +255,7 @@ class Basket: admin_basket_file = AdminBasketFile( id=fileImportRecord.id, - name=file_name, + name=file.file_name, status=parts.get('status'), store=store_type_name, project=parts.get('project'), diff --git a/src/gisaf/importers.py b/src/gisaf/importers.py index e2b24d6..e5c83a7 100644 --- a/src/gisaf/importers.py +++ b/src/gisaf/importers.py @@ -15,8 +15,7 @@ from sqlalchemy.sql.schema import Column from gisaf.config import conf #from .models.admin import FileImport -from gisaf.models.admin import FeatureImportData -# from gisaf.models.graphql import BasketImportResult +from gisaf.models.admin import FeatureImportData, BasketImportResult from gisaf.models.raw_survey import RawSurveyModel from gisaf.models.survey import Surveyor, Accuracy, Equipment, AccuracyEquimentSurveyorMapping from gisaf.models.tags import Tags @@ -42,7 +41,7 @@ class Importer: """ basket = None - async def do_import(self, file_record, dry_run=False, **kwargs): + async def do_import(self, file_record, dry_run=False, **kwargs) -> BasketImportResult: """ Return: a BasketImportResult instance, or a message string, or a tuple or a list like (message, details for user feedback). 
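With this change, Importer.do_import is expected to return a BasketImportResult directly (Basket.import_file now raises ImportError otherwise). A minimal conforming importer might look like the sketch below; the class name, the row counting and the details payload are illustrative assumptions, not code from this repository:

from gisaf.importers import Importer
from gisaf.models.admin import BasketImportResult


class CSVWaypointImporter(Importer):  # hypothetical importer, for illustration only
    async def do_import(self, file_record, dry_run=False, **kwargs) -> BasketImportResult:
        # Read and validate the uploaded file referenced by file_record here,
        # then persist the data unless dry_run is set.
        rows = 0  # would be the number of rows actually processed
        if dry_run:
            return BasketImportResult(message='Dry run: nothing written',
                                      details={'rows': rows})
        return BasketImportResult(message='Import successful',
                                  details={'rows': rows})
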
diff --git a/src/gisaf/models/admin.py b/src/gisaf/models/admin.py index d6bfd38..e74b5b5 100644 --- a/src/gisaf/models/admin.py +++ b/src/gisaf/models/admin.py @@ -157,9 +157,9 @@ class Basket(BasketNameOnly): class BasketImportResult(BaseModel): - time: datetime + time: datetime = Field(default_factory=datetime.now) message: str - details: str + details: dict[str, str | int | float | bool] | None = None class AdminBasketFile(BaseModel): id: int diff --git a/src/gisaf/models/authentication.py b/src/gisaf/models/authentication.py index 13c3be6..b40d580 100644 --- a/src/gisaf/models/authentication.py +++ b/src/gisaf/models/authentication.py @@ -86,6 +86,16 @@ class UserRead(UserBase): email: str | None # type: ignore roles: list[RoleReadNoUsers] = [] + def can_view(self, model) -> bool: + role = getattr(model, 'viewable_role', None) + if role: + return self.has_role(role) + else: + return True + + def has_role(self, role: str) -> bool: + return role in (role.name for role in self.roles) + # class ACL(BaseModel): # user_id: int diff --git a/src/gisaf/utils.py b/src/gisaf/utils.py index 3273e83..525efaf 100644 --- a/src/gisaf/utils.py +++ b/src/gisaf/utils.py @@ -11,11 +11,10 @@ from numpy import ndarray import pandas as pd from sqlalchemy.dialects.postgresql import insert -from sqlalchemy.sql.expression import delete - -# from graphene import ObjectType +from sqlmodel import SQLModel, delete from gisaf.config import conf +from gisaf.database import db_session class ToMigrate(Exception): pass @@ -213,107 +212,142 @@ def atimeit(func): return helper -async def delete_df(df, model): +async def delete_df(df: pd.DataFrame, model: SQLModel): """ Delete all data in the model's table in the database that matches data in the pandas dataframe. """ - table = model.__table__ + if len(df) == 0: + return ids = df.reset_index()['id'].values - delete_stmt = delete(table).where(model.id.in_(ids)) - async with db.bind.raw_pool.acquire() as conn: - async with conn.transaction(): - await conn.execute(str(delete_stmt), *ids) + statement = delete(model).where(model.id.in_(ids)) + async with db_session() as session: + await session.exec(statement) + await session.commit() -async def upsert_df(df, model): - """ - Insert or update all data in the model's table in the database - that's present in the pandas dataframe. - Use postgres insert ... on conflict update... - with a series of inserts with with one row at a time. - For GeoDataFrame: the "geometry" column (df._geometry_column_name) is not honnored - (yet). It's the caller's responsibility to have a proper column name - (typically "geom" in Gisaf models) with a EWKT or EWKB representation of the geometry. - """ - ## See: https://stackoverflow.com/questions/33307250/postgresql-on-conflict-in-sqlalchemy +# async def upsert_df(df, model): +# """ +# Insert or update all data in the model's table in the database +# that's present in the pandas dataframe. +# Use postgres insert ... on conflict update... +# with a series of inserts with with one row at a time. +# For GeoDataFrame: the "geometry" column (df._geometry_column_name) is not honnored +# (yet). It's the caller's responsibility to have a proper column name +# (typically "geom" in Gisaf models) with a EWKT or EWKB representation of the geometry. 
+# """ +# ## See: https://stackoverflow.com/questions/33307250/postgresql-on-conflict-in-sqlalchemy +# if len(df) == 0: +# return df + +# table = model.__table__ + +# ## Generate the 'upsert' statement, using fake values but defining columns +# columns = {c.name for c in table.columns} +# values = {col: None for col in df.columns if col in columns} +# insrt_stmnt = insert(table).inline().values(values).returning(table.primary_key.columns) +# df_columns = set(df.columns) +# do_update_stmt = insrt_stmnt.on_conflict_do_update( +# constraint=table.primary_key, +# set_={ +# k.name: getattr(insrt_stmnt.excluded, k.name) +# for k in insrt_stmnt.excluded +# if k.name in df_columns and +# k.name not in [c.name for c in table.primary_key.columns] +# } +# ) +# ## Filter and reorder the df columns +# ## in order to match the order of columns in the insert statement +# df = df[[col for col in do_update_stmt.compile().positiontup +# if col in df_columns]].copy() + +# def convert_to_object(value): +# """ +# Quick (but slow) and dirty: clean up values (nan, nat) for inserting to postgres via asyncpg +# """ +# if isinstance(value, float) and isnan(value): +# return None +# elif pd.isna(value): +# return None +# else: +# return value + +# # def encode_geometry(geometry): +# # if not hasattr(geometry, '__geo_interface__'): +# # raise TypeError('{g} does not conform to ' +# # 'the geo interface'.format(g=geometry)) +# # shape = shapely.geometry.asShape(geometry) +# # return shapely.wkb.dumps(shape) + +# # def decode_geometry(wkb): +# # return shapely.wkb.loads(wkb) + +# ## pks: list of dicts of primary keys +# pks = {pk.name: [] for pk in table.primary_key.columns} +# async with db.bind.raw_pool.acquire() as conn: +# ## Set standard encoder for HSTORE, geometry +# await conn.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore') + +# #await conn.set_type_codec( +# # 'geometry', # also works for 'geography' +# # encoder=encode_geometry, +# # decoder=decode_geometry, +# # format='binary', +# #) +# #await conn.set_type_codec( +# # 'json', +# # encoder=json.dumps, +# # decoder=json.loads, +# # schema='pg_catalog' +# #) +# ## For a sequence of inserts: +# insrt_stmnt_single = await conn.prepare(str(do_update_stmt)) +# async with conn.transaction(): +# for row in df.itertuples(index=False): +# converted_row = [convert_to_object(v) for v in row] +# returned = await insrt_stmnt_single.fetch(*converted_row) +# for returned_single in returned: +# for pk, value in returned_single.items(): +# pks[pk].append(value) +# ## Return a copy of the original df, with actual DB columns, data and the primary keys +# for pk, values in pks.items(): +# df[pk] = values +# return df + +def postgres_upsert(table, conn, keys, data_iter): + # See https://stackoverflow.com/questions/61366664/how-to-upsert-pandas-dataframe-to-postgresql-table + # Comment by @HopefullyThisHelps + data = [dict(zip(keys, row)) for row in data_iter] + insert_statement = insert(table.table).values(data) + upsert_statement = insert_statement.on_conflict_do_update( + constraint=f"{table.table.name}_pkey", + set_={c.key: c for c in insert_statement.excluded}, + ) + conn.execute(upsert_statement) + +async def upsert_df(df: pd.DataFrame, model: SQLModel, chunksize: int = 1000): if len(df) == 0: return df - - table = model.__table__ - - ## Generate the 'upsert' statement, using fake values but defining columns - columns = {c.name for c in table.columns} - values = {col: None for col in df.columns if col in columns} - insrt_stmnt = insert(table, inline=True, 
values=values, returning=table.primary_key.columns) - df_columns = set(df.columns) - do_update_stmt = insrt_stmnt.on_conflict_do_update( - constraint=table.primary_key, - set_={ - k.name: getattr(insrt_stmnt.excluded, k.name) - for k in insrt_stmnt.excluded - if k.name in df_columns and - k.name not in [c.name for c in table.primary_key.columns] - } - ) - ## Filter and reorder the df columns - ## in order to match the order of columns in the insert statement - df = df[[col for col in do_update_stmt.compile().positiontup - if col in df_columns]].copy() - - def convert_to_object(value): - """ - Quick (but slow) and dirty: clean up values (nan, nat) for inserting to postgres via asyncpg - """ - if isinstance(value, float) and isnan(value): - return None - elif pd.isna(value): - return None - else: - return value - - # def encode_geometry(geometry): - # if not hasattr(geometry, '__geo_interface__'): - # raise TypeError('{g} does not conform to ' - # 'the geo interface'.format(g=geometry)) - # shape = shapely.geometry.asShape(geometry) - # return shapely.wkb.dumps(shape) - - # def decode_geometry(wkb): - # return shapely.wkb.loads(wkb) - - ## pks: list of dicts of primary keys - pks = {pk.name: [] for pk in table.primary_key.columns} - async with db.bind.raw_pool.acquire() as conn: - ## Set standard encoder for HSTORE, geometry - await conn.set_builtin_type_codec('hstore', codec_name='pg_contrib.hstore') - - #await conn.set_type_codec( - # 'geometry', # also works for 'geography' - # encoder=encode_geometry, - # decoder=decode_geometry, - # format='binary', - #) - #await conn.set_type_codec( - # 'json', - # encoder=json.dumps, - # decoder=json.loads, - # schema='pg_catalog' - #) - ## For a sequence of inserts: - insrt_stmnt_single = await conn.prepare(str(do_update_stmt)) - async with conn.transaction(): - for row in df.itertuples(index=False): - converted_row = [convert_to_object(v) for v in row] - returned = await insrt_stmnt_single.fetch(*converted_row) - for returned_single in returned: - for pk, value in returned_single.items(): - pks[pk].append(value) - ## Return a copy of the original df, with actual DB columns, data and the primary keys - for pk, values in pks.items(): - df[pk] = values - return df + from functools import partial + import concurrent.futures + import asyncio + loop = asyncio.get_running_loop() + with concurrent.futures.ProcessPoolExecutor() as pool: + await loop.run_in_executor( + pool, + partial( + df.to_sql, + model.__tablename__, + conf.db.get_pg_url(), # Cannot use sync_engine in run_in_executor + # because it's not pickable + schema=model.__table__.schema, # type: ignore + if_exists="append", + index=False, + method=postgres_upsert, + chunksize=chunksize, + ), + ) #async def upsert_df(df, model):
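
For context, both reworked helpers are plain awaitables: delete_df issues a bulk DELETE through the async session, and upsert_df hands the dataframe to pandas.DataFrame.to_sql with the postgres_upsert method above, run in a process pool so the event loop is not blocked. A usage sketch, assuming a hypothetical SQLModel table (Waypoint and its columns are invented for illustration):

import pandas as pd
from sqlmodel import Field, SQLModel

from gisaf.utils import delete_df, upsert_df


class Waypoint(SQLModel, table=True):  # hypothetical table, for illustration only
    id: int | None = Field(default=None, primary_key=True)
    name: str
    elevation: float


async def sync_waypoints(df: pd.DataFrame) -> None:
    # df is expected to carry the table's columns, including 'id'.
    # Insert new rows and update existing ones (matched on the primary key)
    # via INSERT ... ON CONFLICT DO UPDATE, in chunks of 1000 rows by default.
    await upsert_df(df, Waypoint)
    # Remove the rows flagged as obsolete; delete_df matches on the 'id' column.
    await delete_df(df[df['name'].str.startswith('tmp_')], Waypoint)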