# Basic Workflow Function Types

## Data sources

### fs-source
This type lists the objects of an S3 bucket and outputs the result as a DataFrame.
```yaml
- id: read
  type: fs-source
  format: list # (1)!
  addr: 192.168.100.237:31318 # (4)!
  user: minioadmin
  password: password
  topics: s3a://bucket/dir/subdir # (3)!
  meta:
    checkpoint: true # (2)!
  out:
    - process
```
1. The `format` attribute must be present and set to `list`.
2. Can optionally be set to `true` (`false` by default) to use a checkpoint that keeps track of the files which have already been listed.
3. Bucket whose content is listed.
4. Address, user, and password of the MinIO S3 server.
Each object found is output to a NATS subject as an Avro record with the following schema:
```json
{
  "type": "record",
  "name": "s3list",
  "namespace": "tech.athea.kosmos",
  "fields" : [
    {"name": "url", "type": "string"}, // (1)!
    {"name": "lastModifiedUnixMilli", "type": "long"},
    {"name": "bucket", "type": "string"},
    {"name": "key", "type": "string"}
  ]
}
```
1. The `url` is output in the format `s3a://bucket/key`.
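The `url`/`bucket`/`key` relationship can be sketched with a small helper. This is a hypothetical function, not part of KFlow, assuming only the `s3a://bucket/key` format stated above:

```python
# Hypothetical helper (not a KFlow API): recover the bucket and key fields
# of an s3list record from its url, assuming the s3a://bucket/key format.
def parse_s3a_url(url: str) -> tuple[str, str]:
    prefix = "s3a://"
    if not url.startswith(prefix):
        raise ValueError(f"not an s3a url: {url}")
    bucket, _, key = url[len(prefix):].partition("/")
    return bucket, key
```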
### raw-source
This type launches the given image in a Kubernetes Deployment, with the intention that this image generates events. The python Runtime provides a way to generate events. The following YAML illustrates a typical usage of this type.
```yaml
- id: generate-some-data
  type: raw-source
  image: registry.artemis.public/basic-python:dev # (9)!
  addr: host.docker.internal:9000
  partitionBy: key # (1)!
  schema: | # (2)!
    {
      "type": "record",
      "name": "raw-source",
      "namespace": "tech.athea.kosmos",
      "fields" : [
        {"name": "url", "type": "string"},
        {"name": "lastModifiedUnixMilli", "type": "long"},
        {"name": "bucket", "type": "string"},
        {"name": "key", "type": "string"}
      ]
    }
  meta: # (3)!
    rate: 1000 # (12)!
    nats.conn.options: # (11)!
      flush_timeout: 10
      connect_timeout: 1000
      allow_reconnect: true
    nats.js.options: # (10)!
      timeout: 10
    name: data_generator # (13)!
    code: |
      from typing import Any

      def data_generator() -> list[dict[str, Any]]:
          from time import time
          from faker import Faker
          faker = Faker()
          event = [{
              "url": "http://host.docker.internal:9000",
              "lastModifiedUnixMilli": round(time() * 1000),
              "bucket": "mybucket",
              "key": f"{''.join(faker.name().split())}.csv",
          }, {
              "url": "http://host.docker.internal:9000",
              "lastModifiedUnixMilli": round(time() * 1000),
              "bucket": "mybucket",
              "key": f"{''.join(faker.name().split())}.csv",
          }]
          # DO SOMETHING IF NEEDED:
          # e.g. generate a random file and upload it to S3 based on the published event
          return event
    schema: |
      {
        "type": "record",
        "name": "s3list",
        "namespace": "tech.athea.kosmos",
        "fields" : [
          {"name": "url", "type": "string"},
          {"name": "lastModifiedUnixMilli", "type": "long"},
          {"name": "bucket", "type": "string"},
          {"name": "key", "type": "string"}
        ]
      }
    container:
      cpuRequest: 200m # (4)!
      memoryRequest: 100M # (5)!
      cpuLimit: 500m # (6)!
      memoryLimit: 500M # (7)!
      env:
        - KFLOW_EVENT_SIMULATOR: "true" # (8)!
  out:
    - raw-sink # (14)!
```
1. This attribute is optional. If given, the container is expected to output Avro records partitioned by the given key.
2. Avro schema of the output data.
3. `meta` and each of its attributes are optional. If a value is given, it replaces the default values for container creation. Values use the syntax expected by Kubernetes.
4. 250m by default.
5. 64Mi by default.
6. No limit by default.
7. No limit by default.
8. Environment variable telling the python Runtime to use the event generator.
9. Image to run. The `imagePullPolicy` comes from the global config.
10. This attribute can also be set with the `nats_js_options` environment variable containing serialized JSON. Its allowable values are the same as the `js_opts` option of the `NatsSink.build` python method.
11. This attribute can also be set with the `nats_conn_options` environment variable containing serialized JSON. Its allowable values are the same as the `conn_opts` option of the `NatsSink.build` python method.
12. `rate` is the minimum delay in milliseconds between two generated events. The default value is 5000.
13. Name of the function to call in the python code. In this example, the function code is given in the `code` attribute.
14. When using raw-source with the python Runtime, the output of this task should be a sink task, so the pipeline has only two tasks.
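Before deploying, the generator function given in `code` can be sanity-checked locally by asserting that every event carries exactly the fields declared in the Avro schema. The check itself is our suggestion, not a KFlow feature; the `faker` dependency is replaced with a fixed key so the sketch is self-contained:

```python
import json
from time import time
from typing import Any

# Schema copied from the raw-source example above.
SCHEMA = json.loads("""
{
  "type": "record",
  "name": "raw-source",
  "namespace": "tech.athea.kosmos",
  "fields": [
    {"name": "url", "type": "string"},
    {"name": "lastModifiedUnixMilli", "type": "long"},
    {"name": "bucket", "type": "string"},
    {"name": "key", "type": "string"}
  ]
}
""")

def data_generator() -> list[dict[str, Any]]:
    # Simplified variant of the generator above, without the faker dependency.
    return [{
        "url": "http://host.docker.internal:9000",
        "lastModifiedUnixMilli": round(time() * 1000),
        "bucket": "mybucket",
        "key": "SomeName.csv",
    }]

# Every generated event must carry exactly the declared fields.
expected_fields = {f["name"] for f in SCHEMA["fields"]}
for event in data_generator():
    assert set(event) == expected_fields, set(event) ^ expected_fields
```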
### nats-source
This type is used internally by KFlow. It executes the image given in the global config with the execution parameters given in `meta`. The image should read from a NATS stream and output Avro records.
```yaml
- id: read
  type: nats-source
  addr: 192.168.100.237:32657
  topics: kflow-global.error
  groupId: mygroup # (9)!
  schema: | # (7)!
    {
      "type": "record",
      "name": "raw-source",
      "namespace": "tech.athea.kosmos",
      "fields" : [
        {"name": "url", "type": "string"},
        {"name": "lastModifiedUnixMilli", "type": "long"},
        {"name": "bucket", "type": "string"},
        {"name": "key", "type": "string"}
      ]
    }
  meta: # (3)!
    max.poll.interval.ms: 10 # (8)!
    container:
      cpuRequest: 200m # (4)!
      memoryRequest: 100M # (5)!
      cpuLimit: 500m # (6)!
      memoryLimit: 500M # (2)!
      env:
        - ENV_VARIABLE: "value" # (1)!
  out:
    - process
```
1. Environment variables to pass to the container.
2. No limit by default.
3. `meta` and each of its attributes are optional. If a value is given, it replaces the default values for container creation. Values use the syntax expected by Kubernetes.
4. 250m by default.
5. 64Mi by default.
6. No limit by default.
7. Avro schema of the output data.
8. Maximum number of records to read in a single poll. The default is 10 for the Java Runtime and 120000 for the Python Runtime.
9. Group ID of the NATS source.
## Data transformation

### raw
The raw function type executes a container with the given image. The container should read Avro records and output Avro records. To ease development, use the Java or Python runtime images, which abstract away the Avro serialization and the internal NATS configuration.
```yaml
- id: process
  type: raw
  image: kast-registry.kast.wip:5000/mycontainer:latest # (1)!
  imagePullPolicy: Always # (2)!
  schema: | # (3)!
    {
      "type": "record",
      "name": "raw-output",
      "namespace": "tech.athea.kosmos",
      "fields" : [
        {"name": "url", "type": "string"},
        {"name": "lastModifiedUnixMilli", "type": "long"},
        {"name": "bucket", "type": "string"},
        {"name": "key", "type": "string"}
      ]
    }
  partitionBy: key # (11)!
  globalConfig: # (10)!
    store:
      addr: xx
      user: yy
      password: zz
    sideStream:
      enabled: true # (12)!
      addr: xx
      errorTopics: "topic1,topic2"
    metrics:
      enabled: true
      port: 9002
  meta: # (4)!
    container:
      cpuRequest: 200m # (5)!
      memoryRequest: 100M # (6)!
      cpuLimit: 500m # (7)!
      memoryLimit: 500M # (8)!
      env:
        - ENV_VARIABLE: "value" # (9)!
  out:
    - write
```
1. Image to use, from the registry.
2. The `imagePullPolicy` field specifies the policy for pulling the container image. The default value is `IfNotPresent`.
3. Avro schema of the output data.
4. `meta` and each of its attributes are optional. If a value is given, it replaces the default values for container creation. Values use the syntax expected by Kubernetes.
5. 250m by default.
6. 64Mi by default.
7. No limit by default.
8. No limit by default.
9. Environment variables to pass to the container. The special environment variable `KFLOW_FUNC` with value `print` can be used with the java runtime image to print the content of the record.
10. Use this to override the global configuration for the store, sideStream, and metrics.
11. If a `partitionBy` key is given, the container is deployed as a StatefulSet with the number of replicas given by the number of partitions.
12. If `sideStream` is enabled, the container will be able to send error messages to the NATS server given in the `sideStream` attribute of the global configuration.
If you want to implement several processing functions in the same container, it is advisable to use information coming from the environment to select which function to run. See the Java Runtime or Python Runtime for more information.
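A minimal sketch of that pattern follows. The function names and the use of `KFLOW_FUNC` as the selector are illustrative assumptions; check the Java/Python Runtime documentation for the exact contract:

```python
import os

# Two toy processing functions; a real container would transform Avro records.
def to_upper(record: dict) -> dict:
    return {**record, "key": record["key"].upper()}

def add_prefix(record: dict) -> dict:
    return {**record, "key": "out/" + record["key"]}

FUNCS = {"to_upper": to_upper, "add_prefix": add_prefix}

# Select the function from the environment, e.g. via the task's env attribute.
process = FUNCS[os.environ.get("KFLOW_FUNC", "to_upper")]
```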
## Data sinks

### nats-sink
Used internally by KFlow. Writes data to a NATS stream. The data is written as binary-serialized Avro records.
```yaml
- id: write
  type: nats-sink
  addr: 192.168.100.237:32658 # (1)!
  topics: topic # (3)!
  partitionBy: key # (5)!
  partition: 3 # (4)!
  schema: | # (2)!
    {
      "type": "record",
      "name": "s3-sink",
      "namespace": "tech.athea.kosmos",
      "fields" : [
        {"name": "key", "type": "string"},
        {"name": "b", "type": "long"}
      ]
    }
```
1. Address of the NATS server.
2. Avro schema of the data. If not specified, the Avro schema of the parent task is used.
3. The plural `topics` suggests that multiple topics can be used. This is not the case in the current version of KFlow: only one topic can be used.
4. If `partitionBy` is specified, this value gives the number of sub-topics where data is written. The value of the key is hashed, and the hash modulo `partition` determines the sub-topic name: "-kpart" followed by a number.
5. Optional. Use it if you want to partition data along the values of this key, which should be present in the schema.
### s3-sink
Writes data to an S3 bucket. The data is written to a file as a JSON serialization of the Avro records (one record per line).
```yaml
- id: write
  type: s3-sink
  addr: 192.168.100.237:32658 # (1)!
  user: minioadmin
  password: minioadmin
  topics: s3a://bucketname # (2)!
  schema: | # (3)!
    {
      "type": "record",
      "name": "s3-sink",
      "namespace": "tech.athea.kosmos",
      "fields" : [
        {"name": "a", "type": "string"},
        {"name": "b", "type": "long"}
      ]
    }
```
1. Address, user, and password of the MinIO S3 server where data will be written.
2. Name of the bucket where data will be written. Output files are named `part-hostname-uuid.json`, where `hostname` is the host name of the Kubernetes node and `uuid` is a unique identifier.
3. Avro schema of the data. If not specified, the Avro schema of the parent task is used.
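The file naming scheme from note 2 can be sketched like this (a hypothetical reconstruction, not KFlow code):

```python
import socket
import uuid

def output_filename() -> str:
    # part-<hostname>-<uuid>.json, as described above.
    return f"part-{socket.gethostname()}-{uuid.uuid4()}.json"
```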
### postgresql-sink, sql-sink
Writes data to a PostgreSQL table. With the Java runtime, each Avro record field becomes a column in the table. With the Python runtime, the record should have a schema with `bucket` and `key` fields, which are used to download a file from S3 and feed it to the PostgreSQL COPY command.
```yaml
- id: write
  type: postgresql-sink # (4)!
  addr: 192.168.100.237:32658 # (1)!
  user: postgres
  password: password
  topics: topic # (3)!
  schema: | # (2)!
    key STRING,
    b LONG
```
1. Address, user, and password of the PostgreSQL server.
2. SQL schema of the table. Column names that match Avro field names will be filled with data. Watch out for case sensitivity!
3. Name of the table where data will be written. It will be created if it does not exist. Watch out for case sensitivity!
4. Even though `sql-sink` is also allowed, you should use `postgresql-sink` for clarity.
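As an illustration of the Java-runtime mapping (each Avro field becomes a column), a record can be turned into a parameterized INSERT statement. The SQL below is a sketch of the idea, not what KFlow actually emits; quoting the identifiers is one way to respect the case-sensitivity warning above:

```python
# Hypothetical sketch: map an Avro record's fields to table columns.
record = {"key": "file.csv", "b": 42}
columns = ", ".join(f'"{name}"' for name in record)  # quote to preserve case
placeholders = ", ".join(["%s"] * len(record))
sql = f"INSERT INTO topic ({columns}) VALUES ({placeholders})"
```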
### elastic-sink
Writes data to an ElasticSearch or OpenSearch index. Each Avro record is serialized as a JSON document and indexed in ElasticSearch. Insertion errors are logged but do not stop the workflow.
```yaml
- id: write
  type: elastic-sink
  addr: 192.168.100.237:32658 # (1)!
  user: postgres
  password: password
  topics: topic # (3)!
  meta:
    jsonPayloadField: key # (4)!
    bulk.enabled: false # (2)!
```
1. Address, user, and password of the ElasticSearch server.
2. If `bulk.enabled` is set to `true`, the sink uses the bulk API of ElasticSearch, which is more efficient for large datasets. The default value if not specified is `false`.
3. Name of the index where data will be written.
4. If specified, only this field is indexed in ElasticSearch. If not specified, the whole record is indexed.
### opensearch-sink
Available in the Python Runtime. Writes data to an OpenSearch index. Each Avro record is serialized as a JSON document and indexed in OpenSearch. Insertion errors are logged but do not stop the workflow.
```yaml
- id: write
  type: opensearch-sink
  addr: 192.168.100.237:32658 # (1)!
  user: postgres
  password: password
  topics: topic # (3)!
  meta:
    bulk.enabled: false # (2)!
    bulk.flush.size: 10000
    topics.suffix.datefmt: ""
    topics.suffix.sep: ""
```
1. Address, user, and password of the OpenSearch server.
2. If `bulk.enabled` is set to `true`, the sink uses the bulk API, which is more efficient for large datasets. The default value if not specified is `false`.
3. Prefix of the index where data will be written. The index name is the concatenation of this prefix, the separator given in `topics.suffix.sep`, and the current date in the format given in `topics.suffix.datefmt`.
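Note 3's index naming can be sketched as below, assuming `topics.suffix.datefmt` takes a strftime-style format (an assumption) and that empty values disable the suffix:

```python
from datetime import date

def index_name(prefix: str, sep: str, datefmt: str) -> str:
    # Concatenate prefix, separator and formatted current date;
    # with an empty datefmt, the index is just the prefix.
    if not datefmt:
        return prefix
    return prefix + sep + date.today().strftime(datefmt)
```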