Example

from pandas import DataFrame


@transformer
def transform_df(df: DataFrame, *args, **kwargs) -> DataFrame:
    # Keep only the first 10 rows of the upstream block's output.
    return df.iloc[:10]

How data is received from upstream blocks

All blocks (except Scratchpads and Sensors) pass the data returned by their decorated function to all of their downstream blocks. To access the data from an upstream block within the current block, use the positional arguments of the decorated function. For example, in the transform_df function above, the output of the upstream block is accessed through the positional argument named df.

Getting data from multiple upstream blocks

If a block has more than 1 upstream block, then each upstream block’s output is accessible via positional arguments. For example, if you have 3 upstream blocks, then there will be 3 positional arguments available within the transform_df function. See below for an example:
from pandas import DataFrame
import pandas as pd


@transformer
def transform_df(df1, df2, df3, **kwargs) -> DataFrame:
    # Combine the outputs of the three upstream blocks into one DataFrame.
    return pd.concat([df1, df2, df3])

Handling column renames in data integration pipelines

Since the data passed to transformer blocks is a pandas DataFrame, many operations are available on each row of your data. Some of these operations are risky, for example when a target destination needs its columns renamed to match an existing schema. In that case, there can be edge-case mismatches between the data types in the source columns and the data types expected in the destination. This is particularly important for SQL columns, because the INSERT commands generated by the pipeline use CAST expressions to help the writes succeed. However, if a column is renamed inside the transformer, the pipeline has to guess the column's type by inspecting its values. For example, a TIMESTAMPTZ value of 2023-01-09T20:01:00 looks like a string once its column is renamed, but an INSERT with CAST('2023-01-09T20:01:00' AS TEXT) into a column of type TIMESTAMPTZ will fail.

The recommended way to rename columns in data integration pipelines is to use the rename_columns parameter of the @transformer decorator. This approach automatically preserves the original column types and updates the schema metadata, including unique_constraints and key_properties. Benefits:
  • Type preservation: Original column types are maintained after renaming
  • Schema integrity: unique_constraints and key_properties are automatically updated
  • Declarative: Column renaming is specified at the decorator level, making intent clear
  • Only renames existing columns: Columns that don’t exist in the DataFrame are silently skipped
Example:
from pandas import DataFrame

@transformer(rename_columns={
    'impressions': 'field_5627',
    'clicks': 'field_5628',
    'new_free_signups': 'field_5629',
    'new_paid_signups': 'field_5626',
    'event_timestamp': 'field_5630'
})
def transform(data, *args, **kwargs):
    # Your transformation logic here
    # Columns are automatically renamed after this function executes
    data.insert(0, 'trashed', False)
    return data
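Because the columns are renamed only after the function returns, the logic inside transform above still works with the original column names (impressions, clicks, and so on), not the renamed field_* names.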
The rename_columns parameter accepts a dictionary where:
  • Keys are the original column names (must exist in the DataFrame)
  • Values are the new column names
Note: If a column in rename_columns doesn’t exist in the DataFrame, it will be silently skipped. All target column names must be unique.
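To make these semantics concrete, here is a minimal, standalone pandas sketch (not Mage code; it only mimics the DataFrame-level effect and does not update the schema metadata that the real decorator also maintains). The helper name rename_existing_columns is purely illustrative:

from pandas import DataFrame


def rename_existing_columns(df: DataFrame, rename_columns: dict) -> DataFrame:
    # Keep only mappings whose source column actually exists in the DataFrame;
    # anything else is silently skipped, matching the behavior described above.
    existing = {old: new for old, new in rename_columns.items() if old in df.columns}
    return df.rename(columns=existing)


df = DataFrame({'impressions': [100, 250], 'clicks': [7, 12]})
renamed = rename_existing_columns(
    df,
    {'impressions': 'field_5627', 'does_not_exist': 'field_9999'},
)
print(list(renamed.columns))  # ['field_5627', 'clicks']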

Using manual column renaming with aliases (Legacy approach)

If you need more control or are using an older version, you can manually rename columns in your transformer function and add column “aliases” in the pipeline’s data integration catalog.
  1. Have a transformer block in your integration pipeline that does some column renaming:

from pandas import DataFrame
import pandas as pd

@transformer
def transform(data, *args, **kwargs):
    # keys = source column names, values = destination column names
    COLUMN_MAP = {
        'impressions': 'field_5627', 
        'clicks': 'field_5628', 
        'new_free_signups': 'field_5629',
        'new_paid_signups': 'field_5626', 
        'event_timestamp': 'field_5630'
    }
    # rename the columns in place to match the destination schema
    data.rename(columns=COLUMN_MAP, inplace=True)
    data.insert(0, 'trashed', False)
    return data

  2. Add an ‘aliases’ property to each renamed field in your pipeline’s data_integration_catalog.json file.

{
    "catalog": {
        "streams": [
            {
                "tap_stream_id": "marketing_data",
                "replication_method": "INCREMENTAL",
                "key_properties": [
                    "id"
                ],
                "schema": {
                    "properties": {
                        "id": {
                            "type": [
                                "integer"
                            ]
                        },
                        "impressions": {
                            "type": [
                                "null",
                                "number"
                            ],
                            "aliases": [
                                "field_5627"
                            ]
                        },
                        "clicks": {
                            "type": [
                                "null",
                                "number"
                            ],
                            "aliases": [
                                "field_5628"
                            ]
                        },
                        "new_free_signups": {
                            "type": [
                                "null",
                                "number"
                            ],
                            "aliases": [
                                "field_5629"
                            ]
                        },
                        "new_paid_signups": {
                            "type": [
                                "null",
                                "number"
                            ],
                            "aliases": [
                                "field_5626"
                            ]
                        },
                        "event_timestamp": {
                            "format": "date-time",
                            "type": [
                                "null",
                                "string"
                            ],
                            "aliases": [
                                "field_5630"
                            ]
                        }
                    },
                    "type": "object"
                },
                "stream": "marketing_data",
                "metadata": [...],
                ...
            }
        ]
    }
}

Now, pipeline runs will use the correct data types in their INSERT commands, thanks to the column aliases defined in the schema. Note: when using the rename_columns decorator parameter, you don’t need to manually add aliases to the catalog; the schema metadata is updated automatically. The manual approach with aliases is only needed when columns are renamed inside the transformer function’s code.