
User Defined Functions (UDF)

UDFs can be included within the fn attribute of a DAG task of type map, either inline in the configuration or provided via a zip containing a valid Python module.

The function should take an iterator of pandas.DataFrame objects and, optionally, keyword arguments collected in a parameter named meta; it outputs an iterator of pandas.DataFrame objects.

from typing import Iterator

import pandas as pd

# Example UDF which calls a custom processing function on each DataFrame in the iterator.
# custom_processing takes `meta` as keyword arguments and can thus adapt its behavior
# according to the `meta` parameter of the pipeline configuration.
def udf(df_iterator: Iterator[pd.DataFrame], **meta) -> Iterator[pd.DataFrame]:
    for df in df_iterator:
        processed_df = custom_processing(df, **meta)
        yield processed_df

# Example UDF which calls custom_processing with a `dropna=True` parameter.
def udf2(df_iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for df in df_iterator:
        processed_df = custom_processing(df, dropna=True)
        yield processed_df

# This example custom_processing function has two behaviors:
# - when `dropna` is true, it drops all rows containing at least one NA (Not Available) value.
#   See [dropna](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.dropna.html).
# - when `rename_columns` is a dictionary, it renames the columns of the DataFrame according to that dictionary.
#   See [rename](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.rename.html).
def custom_processing(df: pd.DataFrame, **kwargs) -> pd.DataFrame:
    """
    Custom processing function for a pandas.DataFrame.

    Parameters:
        df (pd.DataFrame): The DataFrame to process.
        **kwargs: Arbitrary keyword arguments for processing logic.

    Returns:
        pd.DataFrame: The processed DataFrame.
    """
    # Example processing logic
    if kwargs.get('dropna'):
        df = df.dropna()
    if isinstance(kwargs.get('rename_columns'), dict):
        df = df.rename(columns=kwargs['rename_columns'])
    # Add more processing steps based on kwargs
    return df
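As a quick standalone sanity check outside any pipeline (re-stating the functions so the snippet runs on its own; only pandas is assumed installed), the UDF pattern above can be exercised on a small list of DataFrames:

```python
from typing import Iterator

import pandas as pd


def custom_processing(df: pd.DataFrame, **kwargs) -> pd.DataFrame:
    # Same behavior as above: optional NA-row dropping and column renaming.
    if kwargs.get("dropna"):
        df = df.dropna()
    if isinstance(kwargs.get("rename_columns"), dict):
        df = df.rename(columns=kwargs["rename_columns"])
    return df


def udf(df_iterator: Iterator[pd.DataFrame], **meta) -> Iterator[pd.DataFrame]:
    for df in df_iterator:
        yield custom_processing(df, **meta)


# One DataFrame with a NA value in column `a`; meta enables both behaviors.
frames = [pd.DataFrame({"a": [1, None, 3], "b": [4, 5, 6]})]
out = list(udf(iter(frames), dropna=True, rename_columns={"a": "x"}))
print(out[0].columns.tolist())  # ['x', 'b']
print(len(out[0]))              # 2 (the NA row was dropped)
```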

Inlining UDF

Below is an illustration of how inlining Python code looks. The example:

  • reads a CSV of four columns
  • applies some concatenation logic via inlined Python code
  • outputs only the desired columns
dag:
  - id: read
    type: fs-source
    addr: host.docker.internal:9000
    topics: s3a://testcsv/mycsv.csv
    user: minioadmin
    password: minioadmin
    format: csv
    schema: |
      a INT,
      b INT,
      c INT,
      d STRING
    meta:
      delimiter: ","
      encoding: utf-8
      csvHeader: true
      ignore-parse-errors: false
      includeFileMetadata: false
    out:
      - process
  - id: process
    type: map
    schema: |
      d STRING,
      e STRING
    fn: |
      def tmp_func(iterator):
          columns = ["a", "b"]
          for df in iterator:
              df["e"] = df[columns].apply(lambda row: "_".join(row.values.astype(str)), axis=1)
              yield df[["d", "e"]]
    name: tmp_func
    out:
      - print
  - id: print
    type: print
    out: []
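The inlined tmp_func above can be tried locally on a toy DataFrame matching the source schema (pandas assumed installed; no pipeline required):

```python
import pandas as pd


def tmp_func(iterator):
    # Same body as the inlined fn: concatenate columns a and b into e,
    # then keep only columns d and e.
    columns = ["a", "b"]
    for df in iterator:
        df["e"] = df[columns].apply(lambda row: "_".join(row.values.astype(str)), axis=1)
        yield df[["d", "e"]]


df = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6], "d": ["x", "y"]})
result = next(tmp_func(iter([df])))
print(result["e"].tolist())     # ['1_3', '2_4']
print(list(result.columns))     # ['d', 'e']
```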

Using zip(s) as Python module(s)

  • Use our dedicated toolkit to generate the zip, which may include your code and any additional modules that your code depends on
  • Your Python module should live under a valid Python project structure
    • i.e. using a standard Python package manager such as setuptools, ...
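For instance, since the configuration below imports tmp_func with `from demo2.demo2 import tmp_func`, a minimal project layout (file names here are illustrative) packaged with setuptools might look like:

```
demo2-project/
├── setup.py          # or pyproject.toml
└── demo2/
    ├── __init__.py
    └── demo2.py      # defines tmp_func, imported as `from demo2.demo2 import tmp_func`
```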
jobId: jobx
globalConfig:
  store:
    addr: host.docker.internal:9000
    user: minioadmin
    password: minioadmin
    autoDownload: false
  extraResources:
    addr: host.docker.internal:9000
    topics: >-
      s3a://others1/dependenciesA.zip,
      s3a://others1/dependenciesB.zip,
      s3a://others2/demo2.zip
    user: minioadmin
    password: minioadmin
dag:
  - id: fs-source0
    type: fs-source
    format: csv
    addr: host.docker.internal:9000
    user: minioadmin
    password: minioadmin
    topics: s3a://testcsv/mycsv.csv
    meta:
      delimiter: ","
      encoding: utf-8
      csvHeader: true
      ignore-parse-errors: false
      includeFileMetadata: false
    out:
      - process
  - id: process
    type: map
    name: tmp_func
    fn: from demo2.demo2 import tmp_func
    out:
      - print
  - id: print
    type: print

Schema and datatypes

Schema definition uses SQL syntax. The supported datatypes are described in the table below, with the corresponding mappings in Python and Java:

| Data type | SQL name | Value type in Python | Value type in Java |
| --- | --- | --- | --- |
| ArrayType | ARRAY<element_type> | list, tuple, or array | java.util.List |
| BinaryType | BINARY | bytearray | byte[] |
| BooleanType | BOOLEAN | bool | boolean or Boolean |
| ByteType | BYTE, TINYINT | int or long | byte or Byte |
| DateType | DATE | datetime.date | java.time.LocalDate or java.sql.Date |
| DayTimeIntervalType | INTERVAL DAY, INTERVAL DAY TO HOUR, INTERVAL DAY TO MINUTE, INTERVAL DAY TO SECOND | datetime.timedelta | java.time.Duration |
| DayTimeIntervalType | INTERVAL HOUR, INTERVAL HOUR TO MINUTE, INTERVAL HOUR TO SECOND | datetime.timedelta | java.time.Duration |
| DayTimeIntervalType | INTERVAL MINUTE, INTERVAL MINUTE TO SECOND, INTERVAL SECOND | datetime.timedelta | java.time.Duration |
| DecimalType | DECIMAL, DEC, NUMERIC | decimal.Decimal | java.math.BigDecimal |
| DoubleType | DOUBLE | float | double or Double |
| FloatType | FLOAT, REAL | float | float or Float |
| IntegerType | INT, INTEGER | int or long | int or Integer |
| LongType | LONG, BIGINT | int or long | long or Long |
| MapType | MAP<key_type, value_type> | dict | java.util.Map |
| ShortType | SHORT, SMALLINT | int or long | short or Short |
| StringType | STRING | string | String |
| StructType | STRUCT<field1_name: field1_type, field2_name: field2_type, …> | list or tuple | org.apache.spark.sql.Row |
| TimestampNTZType | TIMESTAMP_NTZ | datetime.datetime | java.time.LocalDateTime |
| TimestampType | TIMESTAMP, TIMESTAMP_LTZ | datetime.datetime | java.time.Instant or java.sql.Timestamp |
| YearMonthIntervalType | INTERVAL YEAR, INTERVAL YEAR TO MONTH, INTERVAL MONTH | datetime.timedelta | java.time.Period |
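As a quick illustration of the Python side of this mapping, the sketch below pairs a few SQL type names from the table with plain Python values of the corresponding value type (this is illustrative only, not the engine's conversion code):

```python
import datetime
import decimal

# One example Python value per SQL type, following the table above.
row = {
    "INT": 42,                                   # IntegerType -> int
    "DOUBLE": 3.14,                              # DoubleType -> float
    "STRING": "hello",                           # StringType -> str
    "BOOLEAN": True,                             # BooleanType -> bool
    "DATE": datetime.date(2024, 1, 1),           # DateType -> datetime.date
    "TIMESTAMP": datetime.datetime(2024, 1, 1),  # TimestampType -> datetime.datetime
    "DECIMAL": decimal.Decimal("1.50"),          # DecimalType -> decimal.Decimal
    "ARRAY<INT>": [1, 2, 3],                     # ArrayType -> list
    "MAP<STRING, INT>": {"k": 1},                # MapType -> dict
    "INTERVAL DAY": datetime.timedelta(days=2),  # DayTimeIntervalType -> timedelta
}
```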