
Java Basic Runtime

Introduction

The Java Basic Runtime allows you to develop functions in Java that can then be used in a KFlow Workflow. These functions are encapsulated in an image which is then run in a Pod by the KFlow Runtime, either in batch mode or stream mode, depending on the configuration of the workflow.

In order to use the Java Basic Runtime, you need to:

  1. Get the kflow-basic-java-runtime dependency.
  2. Implement the interface ProcessFunction.
  3. Assign the implementation to a BasicJavaRuntime object and call the BasicJavaRuntime#run method.
  4. Create a "FatJar" (a JAR that bundles all the dependencies the application needs to run properly).
  5. Create a Docker image which runs this JAR.
  6. Use the image in a KFlow Workflow.

A demonstration Java project is provided with KFlow; excerpts from it are used throughout this page.

I - Get the SDK dependency

In order to retrieve the dependency, configure your dependency manager to fetch the tech.athea.kosmos:kflow-basic-java-runtime:{version} dependency.

For example, using Maven the configuration would be:

[...]
<dependency>
  <groupId>tech.athea.kosmos</groupId>
  <artifactId>kflow-basic-java-runtime</artifactId>
  <version>{version}</version>
</dependency>
[...]

In the example, the pom.xml is configured to get this dependency locally.

[...]
<repositories>
  <repository>
    <id>in-project</id>
    <name>In Project Repo</name>
    <url>file://${project.basedir}/libs</url>
  </repository>
</repositories>
[...]
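One way to populate such an in-project repository is Maven's install-file goal with a local repository path, assuming you have the SDK JAR on disk. This is only a sketch: the JAR file path is a placeholder, and {version} must be replaced with the actual version, as in the pom.xml above.

```shell
# Install the SDK JAR into the project-local ./libs repository
# (file path and version below are placeholders; adapt them to your setup)
mvn install:install-file \
    -Dfile=kflow-basic-java-runtime-{version}.jar \
    -DgroupId=tech.athea.kosmos \
    -DartifactId=kflow-basic-java-runtime \
    -Dversion={version} \
    -Dpackaging=jar \
    -DlocalRepositoryPath=libs
```

The -DlocalRepositoryPath option makes install-file lay the JAR out in repository format under libs/, which matches the file:// repository URL declared above.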

II - Implement the interface ProcessFunction

Interface

In order to create a ProcessFunction you need to implement the interface of the same name. When doing so, you'll have a few methods to implement:

  • open(String, OpenContext): This method is called once when the application starts; use it to create clients, for example.
    • The String is the Avro schema used by the input data.
    • The OpenContext contains:
      • The Meta, which is the configuration provided to your task under the meta key in the Pipeline.
      • The KFlowBlobHandler, which allows you to:
        • upload a document to S3 (including user metadata if needed).
        • download a file from S3 (including user metadata if any exists).
        • This element is null when autoDownload is false.
  • invoke(GenericData.Record, Context): This is the processing method, applied to each record received.
    • The GenericData.Record corresponds to the input data.
    • The Context contains:
      • The local path of the file that was auto-downloaded.
      • The user metadata of the file that was auto-downloaded.
      • A method to request manual acknowledgement of inputs (cf. Manual Acknowledgement further down).
      • A flush(Path) method to delete the local files provided. This should be used in Manual Acknowledgement mode, as auto-downloaded files are no longer deleted automatically.
      • Two methods related to sidestreams:
        • getNewRecord(String): by providing a sidestream ID (the same as in the topology), you get a new record with the correct schema.
        • toSideStream(String, GenericData.Record): sends the record to the sidestream whose ID you provided.
    • If you want to send data to an output:
      • To send data to the main output, simply return your records as an array.
        • To send nothing, return an empty array or null.
      • To send data to the automatic error output, see SideStream to catch errors further down.
  • schemaIn(): When the input schema is not provided in the pipeline, it is retrieved from this method. This schema is used to deserialize the input data.
    • Note that this method is called before open(String, OpenContext).
  • schemaOut(): This method is not currently used; it should return the Avro schema used to create your data.
  • close(): This method is called when the application stops; use it to close any connections or clients.
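To make the sidestream flow concrete without the SDK on the classpath, here is a self-contained toy that mimics the two Context methods described above (getNewRecord and toSideStream). The ToyRecord and ToyContext classes are stand-ins for illustration only; the real types live in tech.athea.kosmos.function and org.apache.avro.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SideStreamSketch {

    /** Stand-in for an Avro GenericData.Record: a named bag of fields. */
    static final class ToyRecord {
        final String schemaName;
        final Map<String, Object> fields = new HashMap<>();
        ToyRecord(String schemaName) { this.schemaName = schemaName; }
        void put(String key, Object value) { fields.put(key, value); }
        Object get(String key) { return fields.get(key); }
    }

    /** Stand-in for the documented Context sidestream methods. */
    static final class ToyContext {
        final Map<String, List<ToyRecord>> sideStreams = new HashMap<>();

        /** Mirrors getNewRecord(String): a fresh record for the given sidestream ID. */
        ToyRecord getNewRecord(String sideStreamId) {
            return new ToyRecord(sideStreamId);
        }

        /** Mirrors toSideStream(String, Record): routes the record to that sidestream. */
        void toSideStream(String sideStreamId, ToyRecord record) {
            sideStreams.computeIfAbsent(sideStreamId, k -> new ArrayList<>()).add(record);
        }
    }

    /** An invoke-style method: valid rows go to the main output, bad rows to "errors". */
    static List<ToyRecord> invoke(ToyRecord in, ToyContext ctx) {
        final List<ToyRecord> mainOutput = new ArrayList<>();
        if (in.get("a") == null) {
            // Route the invalid record to the "errors" sidestream instead of the main output
            ToyRecord err = ctx.getNewRecord("errors");
            err.put("reason", "missing field a");
            ctx.toSideStream("errors", err);
        } else {
            mainOutput.add(in);
        }
        return mainOutput;
    }

    public static void main(String[] args) {
        ToyContext ctx = new ToyContext();
        ToyRecord good = new ToyRecord("myname");
        good.put("a", "x");
        ToyRecord bad = new ToyRecord("myname");

        System.out.println(invoke(good, ctx).size()); // 1: forwarded on the main output
        System.out.println(invoke(bad, ctx).size());  // 0: routed to the "errors" sidestream
        System.out.println(ctx.sideStreams.get("errors").size()); // 1 record on "errors"
    }
}
```

In the real runtime the record handed to toSideStream must come from getNewRecord for the same sidestream ID, so that it carries the schema declared in the topology.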

The demo project contains a ProcessCSVFunction class that reads each CSV file provided as input (autoDownload must be set to true in the globalConfig of the task using this function. See autoDownload documentation) and outputs only the first two columns, renamed "a" and "b".

package tech.athea.function;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;

import de.siegmar.fastcsv.reader.CommentStrategy;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRecord;
import tech.athea.kosmos.function.Context;
import tech.athea.kosmos.function.OpenContext;
import tech.athea.kosmos.function.ProcessFunction;

public class ProcessCSVFunction implements ProcessFunction {

    private Schema schema = null;

    /**
     * Called once at start.
     *
     * Useful to initialize resources (DB connections, etc.).
     *
     * @param schemaIn The Avro string schema of the input message (may be null)
     * @param ctx      A context that can contain optional configuration, handlers
     *                 to manipulate blob files, etc.
     */
    @Override
    public void open(final String schemaIn, final OpenContext ctx) throws Exception {
        this.schema = SchemaBuilder.record("myname")
                .namespace("myns")
                .fields()
                .requiredString("a")
                .requiredString("b")
                .endRecord();
    }

    /**
     * Called for each message batch.
     *
     * @param record The input record
     * @param ctx    An optional context that can contain optional configurations,
     *               handlers, etc.
     * @return A batch of records to forward
     */
    @Override
    public Record[] invoke(Record record, Context ctx) throws Exception {
        // Get the file to process from the context, since autoDownload was requested
        final String fileToProcessLocalPath = ctx.getDownloaderLocalObjectUrl();

        // Keep only the first two columns of each row
        final File f = new File(fileToProcessLocalPath);
        final List<GenericData.Record> outRecords = new ArrayList<>();
        try (final CsvReader<CsvRecord> csv = CsvReader.builder().commentStrategy(CommentStrategy.SKIP)
                .ofCsvRecord(f.toPath())) {
            csv.forEach(new Consumer<CsvRecord>() {
                @Override
                public void accept(CsvRecord csvRec) {
                    final GenericData.Record outRecord = new GenericData.Record(schema);
                    outRecord.put("a", csvRec.getField(0));
                    outRecord.put("b", csvRec.getField(1));
                    outRecords.add(outRecord);
                }
            });
        }

        return outRecords.toArray(new GenericData.Record[0]);
    }

    @Override
    public String schemaOut() {
        return this.schema.toString();
    }

}

Use the following Avro schema in your specification:

{
  "namespace": "myns",
  "type": "record",
  "name": "myname",
  "fields": [
    {"name": "a", "type": "string"},
    {"name": "b", "type": "string"}
  ]
}

III - Assign the ProcessCSVFunction to a runtime

package tech.athea;

import tech.athea.function.ProcessCSVFunction;
import tech.athea.kosmos.runtime.BasicJavaRuntime;

public class App {
    public static void main(final String[] args) throws Exception {
        final BasicJavaRuntime runtime = new BasicJavaRuntime();
        runtime.addProcessFunction(new ProcessCSVFunction());
        runtime.run();
    }
}

IV - Create a "FatJar" for your application

Creating a "FatJar" is specific to each dependency-management tool; the following example is for Maven. When you run the mvn package command, the Maven Assembly Plugin creates both a "normal" JAR and a "FatJar", which is far larger but works as a standalone JAR.

[...]
<build>
  <resources>
    <resource>
      <directory>${project.basedir}/src/main/resources</directory>
      <excludes>
        <exclude>config.yaml</exclude>
        <exclude>topology.yaml</exclude>
        <exclude>log4j2.xml</exclude>
      </excludes>
    </resource>
  </resources>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.6.0</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <finalName>my-java-runtime</finalName>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id> <!-- this is used for inheritance merges -->
          <phase>package</phase> <!-- bind to the packaging phase -->
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
[...]
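With the configuration above, a build and a quick standalone run could look like this (the artifact name assumes the plugin's default behavior of appending the assembly ID to finalName, which matches the JAR name used in the Dockerfile of the next section):

```shell
# Package both the normal JAR and the FatJar
mvn package

# The assembly plugin (finalName my-java-runtime, descriptor jar-with-dependencies)
# writes target/my-java-runtime-jar-with-dependencies.jar; run it standalone:
java -cp target/my-java-runtime-jar-with-dependencies.jar tech.athea.App
```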

V - Create a Docker image for your function

You need to create a Docker image that runs your JAR. The following example uses a multi-stage build to this end: the first stage runs mvn package inside the Docker build itself, so you do not need to run it manually before each docker build command.

Multi stage build

Note: you need to adapt the Dockerfile to your specific application:

  • The line COPY --from=builder /usr/src/app/target/my-java-runtime-jar-with-dependencies.jar /usr/app/my-java-runtime-jar-with-dependencies.jar needs to be adapted to the name of the JAR you are producing, which depends on your project name.
  • The line ENTRYPOINT ["java", "-XX:MaxRAMPercentage=80.0", "-Dlog4j.configurationFile=file:///usr/app/log4j2.xml", "-cp", "/usr/app/my-java-runtime-jar-with-dependencies.jar", "tech.athea.App"] needs at least two changes:
    • As above, use the correct name for your JAR file. (my-java-runtime-jar-with-dependencies.jar)
    • Target the main class of your application (name and package). (tech.athea.App)
FROM maven:3.9.6-eclipse-temurin-17 AS builder

COPY src /usr/src/app/src
COPY libs /usr/src/app/libs
COPY pom.xml /usr/src/app

RUN mvn -f /usr/src/app/pom.xml clean package

FROM eclipse-temurin:17

COPY --from=builder /usr/src/app/src/main/resources/*.xml /usr/app/
COPY --from=builder /usr/src/app/target/my-java-runtime-jar-with-dependencies.jar /usr/app/my-java-runtime-jar-with-dependencies.jar

RUN addgroup kosmos
RUN useradd -g kosmos kosmos

USER kosmos

ENTRYPOINT ["java", "-XX:MaxRAMPercentage=80.0", "-Dlog4j.configurationFile=file:///usr/app/log4j2.xml", "-cp", "/usr/app/my-java-runtime-jar-with-dependencies.jar", "tech.athea.App"]
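Building and running the image locally could then look like this (the my-java-runtime image tag is a placeholder; pick whatever naming your registry expects):

```shell
# The builder stage compiles and packages the JAR inside the Docker build
docker build -t my-java-runtime:latest .

# Quick local smoke test of the resulting image
docker run --rm my-java-runtime:latest
```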

Advanced Configurations

Interface ProcessFunction

public interface ProcessFunction {

    /**
     * Called once at start.
     * Useful to initialize resources (DB connections, etc.).
     * @param schemaIn The Avro string schema of the input message (may be null)
     * @param ctx A context that can contain optional configuration, handlers to manipulate blob files, etc.
     */
    default void open(final String schemaIn, final OpenContext ctx) throws Exception {}

    /**
     * Called for each message batch.
     * @param record The input record
     * @param ctx An optional context that can contain optional configurations, handlers, etc.
     * @return A batch of records to forward
     */
    GenericData.Record[] invoke(final GenericData.Record record, final Context ctx) throws Exception;

    /**
     * Returns the produced data schema (Avro schema String).
     * @return The schema (Avro schema String)
     */
    default String schemaOut() {
        return null;
    }

    /**
     * Returns the expected data schema (Avro schema String).
     * @return The schema (Avro schema String)
     */
    default String schemaIn() {
        return null;
    }

    /**
     * Called once at the end.
     * Useful to clean up resources (DB connections, etc.).
     */
    default void close() {}
}