Java Basic Runtime
Introduction
The Java Basic Runtime allows you to develop functions in Java that can then be used in a KFlow Workflow. These functions are encapsulated in an image which is then run in a Pod by the KFlow Runtime, either in batch mode or stream mode, depending on the configuration of the workflow.
In order to use the Java Basic Runtime you need to:
- Get the kflow-basic-java-runtime dependency.
- Implement the interface ProcessFunction.
- Assign the implementation to a BasicJavaRuntime object and call the BasicJavaRuntime#run method.
- Create a "FatJar" (a JAR which contains all the necessary dependencies for the application to work properly).
- Create a Docker image which runs this JAR.
- Use the image in a KFlow Workflow.
A demonstration Java project is provided with KFlow, and excerpts from it are used throughout this document.
I - Get the SDK dependency
In order to retrieve the dependency you need to configure your dependency manager to fetch the tech.athea.kosmos:kflow-basic-java-runtime:{version} dependency.
For example, using Maven the configuration would be:
[...]
<dependency>
<groupId>tech.athea.kosmos</groupId>
<artifactId>kflow-basic-java-runtime</artifactId>
<version>{version}</version>
</dependency>
[...]
In the example, the pom.xml is configured to get this dependency locally.
[...]
<repositories>
<repository>
<id>in-project</id>
<name>In Project Repo</name>
<url>file://${project.basedir}/libs</url>
</repository>
</repositories>
[...]
II - Implement the interface ProcessFunction
Interface
In order to create a ProcessFunction you need to implement the interface of the same name. When doing so you will have a few methods to implement:
- open(String, OpenContext): This method is called once when the application starts; use it to create clients, for example.
  - The String is the Avro schema used by the input data.
  - The OpenContext contains:
    - The Meta, which is the configuration provided to your task in the key meta in the Pipeline.
    - The KFlowBlobHandler, which allows you to:
      - upload a document to S3 (including user metadata if needed);
      - download a file from S3 (including user metadata if any exists).
      - This element is null when autoDownload is false.
- invoke(GenericData.Record, Context): This is the processing method, applied to each record received.
  - The GenericData.Record corresponds to the input data.
  - The Context contains:
    - The local path of the file that was auto-downloaded.
    - The user metadata of the file that was auto-downloaded.
    - A method to request manual acknowledgement of inputs (cf. Manual Acknowledgement further down).
    - A flush(Path) method to delete the local files provided. This should be used in Manual Acknowledgement mode, as auto-downloaded files are no longer deleted automatically.
    - 2 methods relative to the sidestreams:
      - getNewRecord(String): by providing a sidestream ID (the same as in the topology) you get a new record with the correct schema.
      - toSideStream(String, GenericData.Record): sends the record to the sidestream whose ID you provided.
  - If you want to send data to an output:
    - To send data to the main output, simply return your records as an array.
      - To send nothing, return an empty array or null.
    - To send data to the automatic error output, see SideStream to catch errors further down.
- schemaIn(): When the input schema is not provided in the pipeline, it is retrieved from this method. This schema is used to deserialize the input data.
  - Do note that this method is called before open(String, OpenContext).
- schemaOut(): This method is not currently used; it should return the Avro schema used to create your data.
- close(): This method is called when the application is stopping; use it to close any connection or client.
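The call order described above (schemaIn before open, then one invoke per record, then close, with a null or empty return meaning "forward nothing") can be sketched in plain Java. This is an illustrative stand-in only: the real OpenContext, Context, and GenericData.Record types come from the SDK, and Strings replace them here so the sketch is self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the ProcessFunction lifecycle described above.
public class LifecycleSketch {
    final List<String> calls = new ArrayList<>();

    String schemaIn() { calls.add("schemaIn"); return null; }  // consulted first
    void open(String schema) { calls.add("open"); }            // once, at startup
    String[] invoke(String record) {                           // once per record
        calls.add("invoke");
        return record.isEmpty() ? null : new String[] { record };
    }
    void close() { calls.add("close"); }                       // once, at shutdown

    // What the runtime conceptually does with the function:
    List<String> run(List<String> records) {
        open(schemaIn());                 // schemaIn() is called before open()
        List<String> forwarded = new ArrayList<>();
        for (String r : records) {
            String[] batch = invoke(r);
            if (batch != null) {          // null (or empty) forwards nothing
                for (String b : batch) forwarded.add(b);
            }
        }
        close();
        return forwarded;
    }
}
```

Running a few records through the sketch shows the guaranteed ordering: schemaIn, open, one invoke per input, close.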
The demo project contains a ProcessCSVFunction class that reads each CSV file provided as input (autoDownload must be set to true in the globalConfig of the task using this function. See autoDownload documentation) and outputs only the first two columns, renamed "a" and "b".
package tech.athea.function;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import de.siegmar.fastcsv.reader.CommentStrategy;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRecord;
import tech.athea.kosmos.function.Context;
import tech.athea.kosmos.function.OpenContext;
import tech.athea.kosmos.function.ProcessFunction;
public class ProcessCSVFunction implements ProcessFunction {
private Schema schema = null;
/**
* Called once at start.
*
* Useful to initialize resources (DB connection, etc.).
*
* @param schemaIn The Avro string schema of the input message (may be null)
* @param ctx A context that can contain optional configuration, handlers
* to manipulate blob files, etc.
*/
@Override
public void open(final String schemaIn, final OpenContext ctx) throws Exception {
this.schema = SchemaBuilder.record("myname")
.namespace("myns")
.fields()
.requiredString("a")
.requiredString("b")
.endRecord();
}
/**
* Called for each input record.
*
* @param record The input record
* @param ctx An optional context that can contain optional configurations,
* handlers, etc.
* @return A batch of records to forward
*/
@Override
public Record[] invoke(Record record, Context ctx) throws Exception {
// Get the file to process from the context when autoDownload is requested
final String fileToProcessLocalPath = ctx.getDownloaderLocalObjectUrl();
// Select only the first two columns
final File f = new File(fileToProcessLocalPath);
final List<GenericData.Record> outRecords = new ArrayList<>();
try (final CsvReader<CsvRecord> csv = CsvReader.builder().commentStrategy(CommentStrategy.SKIP)
.ofCsvRecord(f.toPath())) {
csv.forEach(new Consumer<CsvRecord>() {
@Override
public void accept(CsvRecord csvRec) {
final GenericData.Record outRecord = new GenericData.Record(schema);
outRecord.put("a", csvRec.getField(0));
outRecord.put("b", csvRec.getField(1));
outRecords.add(outRecord);
}
});
}
return outRecords.toArray(new GenericData.Record[0]);
}
@Override
public String schemaOut() {
return this.schema.toString();
}
}
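For readers who want to exercise the column-selection logic in isolation, here is a simplified stdlib-only stand-in for what invoke does above. It is not the demo code itself: fastcsv and Avro are replaced by a naive comma split (which, unlike fastcsv, does not handle quoted fields), and the "a"/"b" record becomes a two-element array.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for ProcessCSVFunction's core transformation:
// keep only the first two fields of each CSV line.
public class FirstTwoColumns {
    static List<String[]> process(List<String> csvLines) {
        List<String[]> out = new ArrayList<>();
        for (String line : csvLines) {
            if (line.startsWith("#")) continue; // rough CommentStrategy.SKIP equivalent
            String[] fields = line.split(",");  // naive split; fastcsv handles quoting
            out.add(new String[] { fields[0], fields[1] });
        }
        return out;
    }

    public static void main(String[] args) {
        for (String[] row : process(List.of("# a comment", "x,y,z", "1,2,3"))) {
            System.out.println(row[0] + "," + row[1]);
        }
    }
}
```

Feeding it the lines "x,y,z" and "1,2,3" yields "x,y" and "1,2", matching what the demo function emits as records with fields "a" and "b".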
Use the following Avro schema in your specification:
{
"namespace": "myns",
"type": "record",
"name": "myname",
"fields": [
{"name": "a", "type": "string"},
{"name": "b", "type": "string"}
]
}
III - Assign the ProcessCSVFunction to a runtime
package tech.athea;
import tech.athea.function.ProcessCSVFunction;
import tech.athea.kosmos.runtime.BasicJavaRuntime;
public class App {
public static void main(final String[] args) throws Exception {
final BasicJavaRuntime runtime = new BasicJavaRuntime();
runtime.addProcessFunction(new ProcessCSVFunction());
runtime.run();
}
}
IV - Create a "FatJar" for your application
The creation of a "FatJar" is specific to each dependency management tool; the following example is for Maven. When running the mvn package command, the Maven Assembly plugin will create both a "normal" JAR and a "FatJar", which is far larger but can run as a standalone JAR.
[...]
<build>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
<excludes>
<exclude>config.yaml</exclude>
<exclude>topology.yaml</exclude>
<exclude>log4j2.xml</exclude>
</excludes>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>my-java-runtime</finalName>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
[...]
V - Create a Docker image for your function
You need to create a Docker image that runs your JAR. The following example uses a multi-stage build to this end: the first stage runs mvn package inside the Docker build, so the JAR does not have to be built manually before each docker build command.
Multi-stage build
Note: you need to adapt the Dockerfile to your specific application:
- The line COPY --from=builder /usr/src/app/target/my-java-runtime-jar-with-dependencies.jar /usr/app/my-java-runtime-jar-with-dependencies.jar needs to be adapted to the name of the JAR you are producing, which depends on your project name.
- The line ENTRYPOINT ["java", "-XX:MaxRAMPercentage=80.0", "-Dlog4j.configurationFile=file:///usr/app/log4j2.xml", "-cp", "/usr/app/my-java-runtime-jar-with-dependencies.jar", "tech.athea.App"] needs at least 2 changes:
  - As above, use the correct name for your JAR file (my-java-runtime-jar-with-dependencies.jar).
  - Target the main class of your application, package included (tech.athea.App).
FROM maven:3.9.6-eclipse-temurin-17 AS builder
COPY src /usr/src/app/src
COPY libs /usr/src/app/libs
COPY pom.xml /usr/src/app
RUN mvn -f /usr/src/app/pom.xml clean package
FROM eclipse-temurin:17
COPY --from=builder /usr/src/app/src/main/resources/*.xml /usr/app/
COPY --from=builder /usr/src/app/target/my-java-runtime-jar-with-dependencies.jar /usr/app/my-java-runtime-jar-with-dependencies.jar
RUN addgroup kosmos
RUN useradd -g kosmos kosmos
USER kosmos
ENTRYPOINT ["java", "-XX:MaxRAMPercentage=80.0", "-Dlog4j.configurationFile=file:///usr/app/log4j2.xml", "-cp", "/usr/app/my-java-runtime-jar-with-dependencies.jar", "tech.athea.App"]
Advanced Configurations
Interface ProcessFunction
public interface ProcessFunction {
/**
* Called once at start.
* Useful to initialize resources (DB connection, etc.).
* @param schemaIn The Avro string schema of the input message (may be null)
* @param ctx A context that can contain optional configuration, handlers to manipulate blob files, etc.
*/
default void open(final String schemaIn, final OpenContext ctx) throws Exception {}
/**
* Called for each input record.
* @param record The input record
* @param ctx An optional context that can contain optional configurations, handlers, etc.
* @return A batch of records to forward
*/
GenericData.Record[] invoke(final GenericData.Record record, final Context ctx) throws Exception;
/**
* Returns the produced data schema (Avro schema String).
* @return The schema (Avro schema String)
*/
default String schemaOut() {
return null;
}
/**
* Returns the expected data schema (Avro schema String).
* @return The schema (Avro schema String)
*/
default String schemaIn() {
return null;
}
/**
* Called once at the end.
* Useful to clean up resources (DB connection, etc.).
*/
default void close() {}
}