Python Basic Runtime
Introduction
The Python Basic Runtime allows you to develop functions in Python that can then be used in a KFlow Workflow. These functions are encapsulated in an image which is then run in a Pod by the KFlow Runtime, either in batch mode or stream mode, depending on the configuration of the workflow.
In order to use the Python Basic Runtime you need to:
- Get the Python dependency for IDEs.
- Derive the `ProcessFunction` class.
- Assign it to a `BasicRuntime` object and call the `BasicRuntime#run` method.
- Create a Docker image which runs your main Python file.
- Use the image in a KFlow Workflow.
A demonstration Python project is provided with KFlow, and excerpts from it are used throughout this document.
I - Get the Python dependency for IDEs
The following has to be done only once. The first step is not necessary with the demo Python project, because `kfbasicpy.tar.gz` is already provided in the `dist` folder.
# extract the dependencies from the runtime image
cat > Dockerfile.kf-basicpy-dist <<EOF
FROM kflow-basic-python-runtime:dev AS runtime
FROM scratch AS dependencies
COPY --from=runtime /dist/kfbasicpy.tar.gz .
EOF
docker buildx build --output=dist --target=dependencies . -f Dockerfile.kf-basicpy-dist
rm Dockerfile.kf-basicpy-dist
# make APIs visible to IDEs
## install essential
pip install dist/kfbasicpy.tar.gz
## install with ner dependencies
pip install "dist/kfbasicpy.tar.gz[ner]"
## install with ner and extract dependencies
pip install "dist/kfbasicpy.tar.gz[ner,extract]"
II - Derive the class ProcessFunction
In order to create a ProcessFunction you need to derive the class with the same name. When doing so you'll have a few methods to implement:
- `open(self, ctx: OpenContext)`: called once when the application is starting; use it to create clients, for example. The `OpenContext` has three functions:
  - `get_meta`: returns the configuration provided to your task in the key `meta` of the Pipeline.
  - `upload(self: Self, file_name: str, bucket: str, key: str, **kwargs: str) -> bool`: uploads a document to S3 (including user metadata if needed).
  - `download(self: Self, bucket: str, key: str) -> DownloadResult`: downloads a file from S3 (including its user metadata, if any) and returns its local path and metadata.
- `process(self, event_dict: dict[str, Any], ctx: Context) -> dict[str, Any] | list[dict[str, Any]]`: the processing method, applied to each piece of data received.
  - `event_dict` is the dictionary corresponding to the input data.
  - The `Context` gives you:
    - the local path of the file that was auto-downloaded, via `get_downloader_local_object_url`;
    - the user metadata of the file that was auto-downloaded, via `get_property`;
    - a method to request manual acknowledgement of inputs, and an associated method `flush(self, *paths: str)` to delete the local files provided.
  - If you want to send data to an output, just `return` your data as a list of dictionaries or a single dictionary. The keys should be those of the output schema specified in the workflow configuration.
- `close()`: called when the application is stopping; use it to close any connection or client.
The demo project contains a `ProcessCSVFunction` class that reads each CSV file provided as input (`autoDownload` must be set to `true` in the `globalConfig` of the task using this function; see the `autoDownload` documentation) and outputs only the first two columns, renamed "a" and "b".
import logging
from typing import Any
from kfbasicpy.function.context import Context
from kfbasicpy.function.function import ProcessFunction
from kfbasicpy.function.open_context import OpenContext
from kfbasicpy.runtime.basic import BasicRuntime
import csv
logger = logging.getLogger(__name__)
class ProcessCSVFunction(ProcessFunction):
def open(self, ctx: OpenContext) -> None:
"""called once on startup"""
logger.info("opened")
def process(self, event_dict: dict[str, Any], ctx: Context) -> dict[str, Any] | list[dict[str, Any]]:
"""process an input event
- The received dict abides to the schema defined in the previous task
- The output dict should abides to the schema defined in either the
- process function task
- or the next function receiving this process function result as input.
in this example, the dict expects to contain as output two fields named
url(type=string) and size(type=long)
"""
logger.info(f"event = {event_dict}, context = {ctx}")
fileToProcessLocalPath = ctx.get_downloader_local_object_url()
eventList: list[dict[str, Any]] = list()
with open(fileToProcessLocalPath, newline='') as csvFile:
reader = csv.reader(csvFile, delimiter=',')
for row in reader:
eventList.append({"a": row[0], "b": row[1]}) # obviously, some error handling should be added here
return eventList
Use the following Avro schema in your specification:
{
"namespace": "myns",
"type": "record",
"name": "myname",
"fields": [
{"name": "a", "type": "string"},
{"name": "b", "type": "string"}
]
}
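As a quick sanity check, you can verify that the rows produced by `process` expose exactly the field names declared in this schema. The helper below is a stdlib-only sketch (it is not part of kfbasicpy) and compares field names only, not Avro types:

```python
import json

# the Avro record schema from the section above
AVRO_SCHEMA = """
{
  "namespace": "myns",
  "type": "record",
  "name": "myname",
  "fields": [
    {"name": "a", "type": "string"},
    {"name": "b", "type": "string"}
  ]
}
"""

def record_matches_schema(record: dict, schema_json: str) -> bool:
    """True when the record has exactly the field names declared in the schema."""
    schema = json.loads(schema_json)
    expected = {field["name"] for field in schema["fields"]}
    return set(record) == expected

print(record_matches_schema({"a": "x", "b": "y"}, AVRO_SCHEMA))   # True
print(record_matches_schema({"a": "x", "url": "y"}, AVRO_SCHEMA)) # False
```

A mismatch here usually shows up later as a schema error in the workflow, so catching it in a unit test is cheaper.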
III - Assign your function to a runtime
This example shows how to set the logging level with an environment variable. You could also use an environment variable to select the function to run.
from kfbasicpy.runtime.basic import BasicRuntime
from kfbasicpy.function.forward_function import ForwardProcessFunction
from kfbasicpy.function.print_function import PrintProcessFunction
from kfbasicpy.function.txtextract_function import TxtExtractFunction
from os import getenv
from function.process_csv import ProcessCSVFunction
if __name__ == "__main__":
import logging
import sys
logging.basicConfig(
format='[%(name)s]: (%(asctime)s) => %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p',
level=logging.getLevelName(getenv("KFLOW_RUNTIME_LOG_LEVEL", "INFO")),
stream=sys.stdout
)
runtime = BasicRuntime()
runtime.add_process_function(ProcessCSVFunction())
runtime.run()
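The environment-variable function selection mentioned above can be sketched as follows. Note that the `KFLOW_FUNCTION` variable name and the stand-in classes are hypothetical, not part of kfbasicpy:

```python
import os

class ForwardFunction:  # stand-in for a real ProcessFunction subclass
    pass

class CsvFunction:      # stand-in for ProcessCSVFunction
    pass

# hypothetical registry mapping names to ProcessFunction classes
FUNCTIONS = {"forward": ForwardFunction, "csv": CsvFunction}

def select_function(env_var: str = "KFLOW_FUNCTION", default: str = "forward") -> type:
    """Pick the function class named by an environment variable."""
    name = os.getenv(env_var, default)
    if name not in FUNCTIONS:
        raise SystemExit(f"unknown function {name!r}, expected one of {sorted(FUNCTIONS)}")
    return FUNCTIONS[name]

# in main.py you would then do: runtime.add_process_function(select_function()())
print(select_function().__name__)
```

Failing fast on an unknown name keeps misconfigured Pods from starting with the wrong function silently.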
Notice that you can add multiple `ProcessFunction`s. Each will be called with the same input, and their results will be concatenated in an unspecified order.
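This fan-out behaviour can be pictured with plain callables; the following is a sketch of the semantics, not the runtime's actual implementation:

```python
from typing import Any, Callable

def run_all(functions: list[Callable], event: dict[str, Any]) -> list[dict[str, Any]]:
    """Apply every registered function to the same event and concatenate the
    results, mimicking how one input fans out to several ProcessFunctions.
    The order across functions is unspecified in the real runtime."""
    out: list[dict[str, Any]] = []
    for fn in functions:
        result = fn(event)
        # a function may return a single dict or a list of dicts
        out.extend(result if isinstance(result, list) else [result])
    return out

double = lambda e: {"value": e["value"] * 2}
split = lambda e: [{"value": e["value"]}, {"value": -e["value"]}]
print(run_all([double, split], {"value": 3}))  # [{'value': 6}, {'value': 3}, {'value': -3}]
```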
IV - Create a Docker image
You need to create a Docker image that will run your Python code.
FROM kflow-basic-python-runtime:dev
# to install ner,extract additional dependencies
RUN pip install "kfbasicpy.tar.gz[ner,extract]"
# install your dependencies
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
# include your python code
COPY src src
ENV KFLOW_RUNTIME_LOG_LEVEL=INFO
# Add additional environment variables if needed, for instance to select
# the default function to run, e.g. (placeholder variable name):
# ENV MY_FUNCTION_SELECTOR=forward
ENTRYPOINT ["python", "-u", "src/main.py"]
Then build the image and push it to your registry (the minikube registry in the demo setup):
docker image build -t myregistry.example.com/project/process-function-python:1.0 .
docker image push myregistry.example.com/project/process-function-python:1.0
V - Use the image in a KFlow Workflow
The following workflow reads CSV files in the docs bucket, processes them with the above ProcessCSVFunction, and writes the result to the testjson bucket. Notice the image used in the demo task.
parallelism: 1
globalConfig:
store:
autoDownload: false
addr: minio.kosmos.svc.cluster.local:9000
user: minioadmin
password: minioadmin
mode: stream
dag:
- id: list-s3-bucket
type: fs-source
format: list
addr: minio.kosmos.svc.cluster.local:9000
user: minioadmin
password: minioadmin
topics: s3a://docs
meta:
checkpoint: true
schema: |
{
"type": "record",
"name": "s3list",
"namespace": "tech.athea.kosmos",
"fields" : [
{"name": "url", "type": "string"},
{"name": "lastModifiedUnixMilli", "type": "long"},
{"name": "bucket", "type": "string"},
{"name": "key", "type": "string"}
]
}
out:
- demo
- id: demo
type: raw
image: myregistry.example.com/project/process-function-python:1.0
imagePullPolicy: IfNotPresent
globalConfig:
store:
autoDownload: true # should not be set globally else s3-sink will expect an input schema like the output schema of list-s3-bucket
schema: |
{
"type": "record",
"name": "necessaryButNotUsed",
"namespace": "alsoNecessaryButNotUsed",
"fields" : [
{"name": "a", "type": "string"},
{"name": "b", "type": "string"}
]
}
meta:
container:
env:
- name: KFLOW_RUNTIME_LOG_LEVEL
value: DEBUG
- name: PYTHONUNBUFFERED
value: "1"
out:
- s3
- id: s3
type: s3-sink
image: myregistry.example.com/basicjavaruntime:latest
imagePullPolicy: Never
addr: minio.kosmos.svc.cluster.local:9000
user: minioadmin
password: minioadmin
# bucket to write into
topics: s3a://testjson
meta:
bulk.enabled: true
bulk.flush.timer: 1000
bulk.flush.size: 1000
container:
env:
- name: KFLOW_RUNTIME_LOG_LEVEL
value: TRACE
schema: |
{
"type": "record",
"name": "necessaryButNotUsed",
"namespace": "alsoNecessaryButNotUsed",
"fields" : [
{"name": "a", "type": "string"},
{"name": "b", "type": "string"}
]
}
Advanced Configurations
Python inlining
For simple computations, the default Runtime allows you to inline the process function in the topology. Here is an example:
- id: name-of-function
type: raw
image: hosted-registry.technique.artemis/basicpythonruntime:1.5.1
imagePullPolicy: IfNotPresent
schema: |
{
"type": "record",
"name": "fakevalidator",
"namespace": "tech.athea",
"fields": [
{ "name": "rasterPath", "type": "string" },
{ "name": "detector", "type": "string" },
{ "name": "geoFence", "type": "string" },
{ "name": "traceId", "type": "string" },
{ "name": "additionalDataPath", "type": "string" },
{ "name": "acquisitionTimeStamp", "type": "long" },
{ "name": "ingestionTimeStamp", "type": "long" },
{ "name": "processingStartTimeStamp", "type": "long" }
]
}
meta:
name: process
code: |-
      from typing import Any
      from kfbasicpy.function.context import Context
      def process(ctx: Context, event: dict[str, Any]) -> dict[str, Any]:
from uuid import uuid4
from datetime import datetime, timedelta
record = {
"rasterPath": f"{event['key']}",
"detector": "aircraft",
"geoFence": "",
"traceId": f"{uuid4()}",
"additionalDataPath": "",
"acquisitionTimeStamp": int((datetime.now() - timedelta(days=10)).timestamp() * 1000),
"ingestionTimeStamp": int((datetime.now() - timedelta(hours=2)).timestamp() * 1000),
"processingStartTimeStamp": int((datetime.now() - timedelta(minutes=30)).timestamp() * 1000),
}
print(record)
return record
out:
- name-of-output-function
Note that `meta` uses the attribute `code`, and that the attribute `name` is the name of the function to call. Unless there is a good reason not to, it is recommended to name the function `process`.
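Stripped of the runtime and YAML wrapping, the inlined function above boils down to the following standalone Python (same record fields; the `Context` argument is omitted here since the sketch does not use it):

```python
from datetime import datetime, timedelta
from typing import Any
from uuid import uuid4

def process(event: dict[str, Any]) -> dict[str, Any]:
    """Build the demo record from an incoming event (standalone sketch)."""
    now = datetime.now()
    return {
        "rasterPath": event["key"],
        "detector": "aircraft",
        "geoFence": "",
        "traceId": str(uuid4()),
        "additionalDataPath": "",
        # timestamps are backdated relative to now, as in the inlined example
        "acquisitionTimeStamp": int((now - timedelta(days=10)).timestamp() * 1000),
        "ingestionTimeStamp": int((now - timedelta(hours=2)).timestamp() * 1000),
        "processingStartTimeStamp": int((now - timedelta(minutes=30)).timestamp() * 1000),
    }

record = process({"key": "s3a://docs/raster.tif"})
print(record["rasterPath"])
```

Testing the function this way, outside the workflow, is faster than editing the inlined YAML and redeploying.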
GPU configuration
> configure nvidia on host
https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html
> troubleshoot
Failed to initialize NVML: Driver/library version mismatch
NVML library version: 535.183
==> this is due to the host system auto-updating libcu.so, which generates a version conflict at the OS level. Rebooting the host system fixes the issue.
==> to prevent this from happening, disable auto updates of the nvidia drivers, then reboot the system:
export VERSION=535
sudo apt-mark hold nvidia-dkms-${VERSION}
sudo apt-mark hold nvidia-driver-${VERSION}
sudo apt-mark hold nvidia-utils-${VERSION}
In your Dockerfile, replace the FROM line with the following:
FROM kflow-basic-python-runtime.gpu:dev