Inital commit
All checks were successful
/ build (push) Successful in 5s

This commit is contained in:
phil 2025-05-28 17:36:06 +02:00
commit 7c129cc81b
11 changed files with 1486 additions and 0 deletions

View file

@ -0,0 +1,10 @@
import importlib.metadata
try:
from dunamai import Version, Style
__version__ = Version.from_git().serialize(style=Style.SemVer, dirty=True)
except ImportError:
# __name__ can be used if the package name is the same
# as the directory. Otherwise, specify it explicitely.
__version__ = importlib.metadata.version(__name__)

View file

@ -0,0 +1,39 @@
from typing import Annotated
import logging
from fastapi import (
FastAPI,
Depends,
HTTPException,
status,
)
from fastapi.responses import FileResponse, HTMLResponse
from pydantic import BaseModel
from sqlalchemy.sql.base import ExecutableOption
from sqlalchemy.exc import NoResultFound
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from hatchet_sdk.runnables.types import EmptyModel
from knoc.settings import settings
from knoc.workflows import all_wfs
from knoc.workflows.utils import ImportReport
from knoc.db import engine, aio_sql_to_df, get_session
from knoc_plugin_template.workflows import SimpleOutput, do_nothing_wf
logger = logging.getLogger("knoc-plugin-template.api")
app = FastAPI()
@app.get("/")
async def template_api_home() -> HTMLResponse:
return HTMLResponse("<p>Plugin template home</p>")
@app.get("/run_wf")
async def template_api_execute_wf() -> dict[str, SimpleOutput]:
result = await do_nothing_wf.aio_run(EmptyModel())
return result

View file

@ -0,0 +1,24 @@
from hatchet_sdk.runnables.types import EmptyModel
import typer
from knoc.settings import settings
from knoc_plugin_template.workflows import do_nothing_wf, SimpleOutput
app = typer.Typer(no_args_is_help=True)
@app.command()
def do_nothing():
"""A CLI command that does very litle"""
typer.echo("Just a template of a CLI command.")
typer.echo(
f"The plugin has a setting, which is: {settings.plugin_template.setting}."
)
@app.command()
def run_wf():
"""A CLI command that runs a workflow"""
typer.echo("Just a template of a CLI command that runs a workflow.")
typer.echo("Requesting the workflow to tun...")
typer.echo(do_nothing_wf.run(EmptyModel()))

View file

@ -0,0 +1,5 @@
from pydantic import BaseModel
class PluginTemplateSettings(BaseModel):
setting: str | None = None

View file

@ -0,0 +1,24 @@
from datetime import datetime
from pydantic import BaseModel
from hatchet_sdk import Context, EmptyModel
from knoc_plugin_template import __version__
from knoc.settings import settings
from knoc.hatchet_client import hatchet
class SimpleOutput(BaseModel):
message: str
do_nothing_wf = hatchet.workflow(
name="Do-nothing-wf",
version=__version__,
)
@do_nothing_wf.task()
async def do_nothing(input: EmptyModel, ctx: Context) -> SimpleOutput:
ctx.log("Just a workflow template")
return SimpleOutput(message=f"Foo v{__version__} at {datetime.now()}")