# User Defined Functions (UDF)
UDFs can be included in the `fn` attribute of a DAG task of type `map`, either inline in the configuration or provided via a zip containing a valid Python module.

The function should take an iterator of `pandas.DataFrame` objects and, optionally, a keyword-arguments parameter named `meta`, and output an iterator of `pandas.DataFrame` objects.
```python
import pandas as pd
from typing import Iterator

# Example UDF which calls a custom processing function on each DataFrame in the iterator.
# custom_processing takes `meta` as keyword arguments and can therefore adapt its behavior
# to the `meta` parameter of the pipeline configuration.
def udf(df_iterator: Iterator[pd.DataFrame], **meta) -> Iterator[pd.DataFrame]:
    for df in df_iterator:
        processed_df = custom_processing(df, **meta)
        yield processed_df

# Example UDF which calls custom_processing with a `dropna=True` parameter.
def udf2(df_iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for df in df_iterator:
        processed_df = custom_processing(df, dropna=True)
        yield processed_df

# This example custom_processing function has two behaviors:
# - when `dropna` is true, it drops all rows containing at least one NA value.
#   See [dropna](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.dropna.html).
# - when `rename_columns` is a dictionary, it renames the columns of the DataFrame
#   according to that dictionary.
#   See [rename](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.rename.html).
def custom_processing(df: pd.DataFrame, **kwargs) -> pd.DataFrame:
    """
    Custom processing function for a pandas.DataFrame.

    Parameters:
        df (pd.DataFrame): The DataFrame to process.
        **kwargs: Arbitrary keyword arguments for processing logic.

    Returns:
        pd.DataFrame: The processed DataFrame.
    """
    # Example processing logic
    if kwargs.get('dropna'):
        df = df.dropna()
    if isinstance(kwargs.get('rename_columns'), dict):
        df = df.rename(columns=kwargs['rename_columns'])
    # Add more processing steps based on kwargs
    return df
```
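These functions can be exercised locally, outside the pipeline runtime. The sketch below (the function bodies are repeated so it runs standalone, and the sample data is hypothetical) feeds a one-DataFrame iterator through `udf` with `meta`-style keyword arguments:

```python
import pandas as pd

# Minimal copies of the functions above so this sketch is self-contained.
def custom_processing(df, **kwargs):
    if kwargs.get('dropna'):
        df = df.dropna()
    if isinstance(kwargs.get('rename_columns'), dict):
        df = df.rename(columns=kwargs['rename_columns'])
    return df

def udf(df_iterator, **meta):
    for df in df_iterator:
        yield custom_processing(df, **meta)

frame = pd.DataFrame({"a": [1, None, 3], "b": [4, 5, 6]})
# Drop NA rows and rename column "a" to "x", as `meta` in the config might request.
result = list(udf(iter([frame]), dropna=True, rename_columns={"a": "x"}))[0]
print(list(result.columns))  # ['x', 'b']
print(len(result))           # 2 -- the row containing the NA value was dropped
```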
## Inlining UDF
Below is an illustration of how inlining Python code looks. The pipeline will:

- read a CSV of four columns
- apply some concatenation logic via inlined Python code
- output only the desired columns
```yaml
dag:
  - id: read
    type: fs-source
    addr: host.docker.internal:9000
    topics: s3a://testcsv/mycsv.csv
    user: minioadmin
    password: minioadmin
    format: csv
    schema: |
      a INT,
      b INT,
      c INT,
      d STRING
    meta:
      delimiter: ","
      encoding: utf-8
      csvHeader: true
      ignore-parse-errors: false
      includeFileMetadata: false
    out:
      - process
  - id: process
    type: map
    schema: |
      d STRING,
      e STRING
    fn: |
      def tmp_func(iterator):
          columns = ["a", "b"]
          for df in iterator:
              df["e"] = df[columns].apply(lambda row: "_".join(row.values.astype(str)), axis=1)
              yield df[["d", "e"]]
    name: tmp_func
    out:
      - print
  - id: print
    type: print
    out: []
```
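Outside the pipeline, the inlined `tmp_func` behaves as shown in this local sketch (the sample data is hypothetical, chosen to match the four-column CSV schema above):

```python
import pandas as pd

# Same function body as the inlined `fn` above.
def tmp_func(iterator):
    columns = ["a", "b"]
    for df in iterator:
        # Concatenate columns "a" and "b" into a new string column "e".
        df["e"] = df[columns].apply(lambda row: "_".join(row.values.astype(str)), axis=1)
        # Keep only the desired output columns.
        yield df[["d", "e"]]

sample = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6], "d": ["x", "y"]})
out = next(tmp_func(iter([sample])))
print(out.to_dict("list"))  # {'d': ['x', 'y'], 'e': ['1_3', '2_4']}
```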
## Using zip(s) as Python module(s)
- Use our dedicated toolkit to generate the zip, which may include your code and any additional modules your code depends on.
- Your Python module should live under a valid Python project structure:
  - i.e. one using a standard Python package manager such as setuptools, ...
```yaml
jobId: jobx
globalConfig:
  store:
    addr: host.docker.internal:9000
    user: minioadmin
    password: minioadmin
    autoDownload: false
  extraResources:
    addr: host.docker.internal:9000
    topics: >-
      s3a://others1/dependenciesA.zip,
      s3a://others1/dependenciesB.zip,
      s3a://others2/demo2.zip
    user: minioadmin
    password: minioadmin
dag:
  - id: fs-source0
    type: fs-source
    format: csv
    addr: host.docker.internal:9000
    user: minioadmin
    password: minioadmin
    topics: s3a://testcsv/mycsv.csv
    meta:
      delimiter: ","
      encoding: utf-8
      csvHeader: true
      ignore-parse-errors: false
      includeFileMetadata: false
    out:
      - process
  - id: process
    type: map
    name: tmp_func
    fn: from demo2.demo2 import tmp_func
    out:
      - print
  - id: print
    type: print
```
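For `fn: from demo2.demo2 import tmp_func` to resolve, the zip must contain a `demo2` package exposing a `demo2` module that exports `tmp_func`. A minimal sketch of what that module could contain is shown below; the layout (`demo2/__init__.py` plus `demo2/demo2.py`) and the function body are assumptions inferred from the import path and the inlining example:

```python
# demo2/demo2.py -- hypothetical module contents shipped inside demo2.zip
import pandas as pd
from typing import Iterator

def tmp_func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Same concatenation logic as the inlined example: build column "e"
    # from columns "a" and "b", then keep only "d" and "e".
    columns = ["a", "b"]
    for df in iterator:
        df["e"] = df[columns].apply(lambda row: "_".join(row.values.astype(str)), axis=1)
        yield df[["d", "e"]]
```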
## Schema and datatypes

Schema definition uses SQL syntax. The supported datatypes are described in the table below, with the corresponding mappings in Python and Java:
| Data type | SQL name | Value type in Python | Value type in Java |
|---|---|---|---|
| ArrayType | ARRAY<element_type> | list, tuple, or array | java.util.List |
| BinaryType | BINARY | bytearray | byte[] |
| BooleanType | BOOLEAN | bool | boolean or Boolean |
| ByteType | BYTE, TINYINT | int or long | byte or Byte |
| DateType | DATE | datetime.date | java.time.LocalDate or java.sql.Date |
| DayTimeIntervalType | INTERVAL DAY, INTERVAL DAY TO HOUR, INTERVAL DAY TO MINUTE, INTERVAL DAY TO SECOND | datetime.timedelta | java.time.Duration |
| DayTimeIntervalType | INTERVAL HOUR, INTERVAL HOUR TO MINUTE, INTERVAL HOUR TO SECOND | datetime.timedelta | java.time.Duration |
| DayTimeIntervalType | INTERVAL MINUTE, INTERVAL MINUTE TO SECOND, INTERVAL SECOND | datetime.timedelta | java.time.Duration |
| DecimalType | DECIMAL, DEC, NUMERIC | decimal.Decimal | java.math.BigDecimal |
| DoubleType | DOUBLE | float | double or Double |
| FloatType | FLOAT, REAL | float | float or Float |
| IntegerType | INT, INTEGER | int or long | int or Integer |
| LongType | LONG, BIGINT | int or long | long or Long |
| MapType | MAP<key_type, value_type> | dict | java.util.Map |
| ShortType | SHORT, SMALLINT | int or long | short or Short |
| StringType | STRING | string | String |
| StructType | STRUCT<field1_name: field1_type, field2_name: field2_type, …> | list or tuple | org.apache.spark.sql.Row |
| TimestampNTZType | TIMESTAMP_NTZ | datetime.datetime | java.time.LocalDateTime |
| TimestampType | TIMESTAMP, TIMESTAMP_LTZ | datetime.datetime | java.time.Instant or java.sql.Timestamp |
| YearMonthIntervalType | INTERVAL YEAR, INTERVAL YEAR TO MONTH, INTERVAL MONTH | datetime.timedelta | java.time.Period |
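Putting several of these types together, a `schema` attribute for a source or `map` task could look like the fragment below (the column names are purely illustrative):

```yaml
schema: |
  id BIGINT,
  name STRING,
  scores ARRAY<DOUBLE>,
  attributes MAP<STRING, STRING>,
  created_at TIMESTAMP
```

Inside a UDF, these columns would surface as the Python value types listed above: `int` for `BIGINT`, `list` for `ARRAY`, `dict` for `MAP`, and `datetime.datetime` for `TIMESTAMP`.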