
Custom Code

Write and execute your own Python logic directly inside a flow.

The Custom Code node lets you write arbitrary Python code that runs as a step in your flow. Input data from upstream nodes is passed in automatically, and you use create_output() to pass results downstream.

Use it when the built-in transformation or model nodes don't cover your exact logic — custom preprocessing, business rules, data enrichment, calling external APIs, generating plots, or anything else Python can do.

Configuration

| Field | Description |
| --- | --- |
| Libraries | Select which libraries to import. Chosen imports are automatically added at the top of your code. |
| Output Names | Define the names of your outputs. Use these exact names in create_output() calls. |
| Python Code | Your code. Input blobs are available as blob_0, blob_1, etc. (or by name if tagged). |
| Timeout | Maximum execution time in seconds. Default: 300 s. Maximum: 600 s. |

Available Libraries

| Library | Use it for |
| --- | --- |
| polars | Fast dataframe operations (default) |
| plotly | Creating charts and visualizations |
| pyarrow | Columnar data and Parquet files |
| numpy | Numerical computing |
| PIL | Image processing |
| torch | PyTorch deep learning |
| transformers | Hugging Face models |
| sentence_transformers | Sentence embeddings |
| flair | NLP sequence labeling |
| requests | HTTP requests to external APIs |
| google_play_scraper | Scrape Google Play reviews |
| datetime, json, math, statistics, uuid, re, emoji | Standard utilities |
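The standard utilities at the bottom of the table cover lightweight cleanup without heavy dependencies. A minimal sketch using only json, re, and statistics (the sample payload is invented for illustration):

```python
import json
import re
import statistics

raw = '{"reviews": [{"text": "Great app!!", "stars": 5}, {"text": "meh", "stars": 2}]}'
data = json.loads(raw)

# Collapse repeated punctuation and compute a simple rating summary
cleaned = [re.sub(r"([!?.])\1+", r"\1", r["text"]) for r in data["reviews"]]
avg_stars = statistics.mean(r["stars"] for r in data["reviews"])
```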

Inputs & Outputs

| Direction | Name | Description |
| --- | --- | --- |
| Input | data | Optional blobs from upstream nodes, accessible as blob_0, blob_1, … or by tag name |
| Output | output_data | Data blobs created via create_output() |
| Output | plots | Plotly figures created in your code |
| Output | metrics | Logs and metrics from execution |

Writing Your Code

Input blobs from connected upstream nodes are injected as variables:

# Access inputs by index
df = blob_0  # first connected input
df2 = blob_1  # second connected input

# Or by tag name if you tagged your Select Data / upstream nodes
df = my_dataset

Use create_output() to pass data to downstream nodes. The name must match one of your configured Output Names:

filtered = df.filter(pl.col("age") > 18)
summary = filtered.group_by("category").agg(pl.count())

create_output("output", filtered)
create_output("result", summary)

Use log() to write messages visible in the node logs, and log_metric() to record numeric metrics:

log(f"Rows after filter: {len(filtered)}")

Example: Filter and aggregate tabular data

# Filter adults and summarize by category
filtered_df = blob_0.filter(pl.col("age") > 18)
summary_df = filtered_df.group_by("category").agg(pl.count().alias("count"))

create_output("output", filtered_df)
create_output("result", summary_df)

log(f"Filtered to {len(filtered_df)} rows")

Example: Call an external API and structure the response

import json

response = requests.get("https://api.example.com/data", headers={"Authorization": "Bearer TOKEN"})
data = response.json()

df = pl.DataFrame(data["items"])
create_output("output", df)
log(f"Fetched {len(df)} records")

Working with Large Datasets

By default, Polars loads the entire file into RAM as a DataFrame. For files in the gigabyte range this can OOM-kill the container before any code runs. Use these techniques to process large files efficiently.

Lazy evaluation with streaming

Wrap any DataFrame in .lazy() and finish with .collect(streaming=True). Polars executes the query in fixed-size chunks, keeping only one chunk in RAM at a time.

result = (
    df.lazy()
    .filter(pl.col("playcount") > 10)
    .group_by("artist_name")
    .agg(pl.col("playcount").sum())
    .collect(streaming=True)   # processes in chunks
)

Note: Not every operation supports streaming. Operations requiring global ordering (e.g. sort, rank) fall back to in-memory execution automatically. Check with .explain(streaming=True).

Use top_k instead of sort().head()

sort() materialises a full sorted copy in RAM before head() trims it. top_k(k, by=...) uses a min-heap — O(n log k) time and O(k) extra memory regardless of dataset size.

# Avoid — sorts every row before discarding all but 20
df.sort("playcount", descending=True).head(20)

# Prefer — heap-based, only tracks 20 candidates
df.lazy().top_k(20, by="playcount").collect(streaming=True)

Select only the columns you need

Lazy mode applies projection pushdown automatically, but being explicit prevents accidental wide scans.

result = (
    df.lazy()
    .select("user_id", "artist_name", "playcount")  # drop unused columns early
    .group_by("artist_name")
    .agg(pl.col("playcount").sum())
    .collect(streaming=True)
)

Keep aggregations in separate pipelines

Each .collect() call frees intermediate data before the next pipeline starts. Chaining everything into one expression keeps all intermediate results alive simultaneously.

stats = df.lazy().select(pl.col("playcount").mean(), pl.col("playcount").max()).collect(streaming=True)
top_artists = df.lazy().group_by("artist_name").agg(pl.col("playcount").sum().alias("total_plays")).top_k(20, by="total_plays").collect(streaming=True)
user_stats = df.lazy().group_by("user_id").agg(pl.col("artist_name").n_unique().alias("artists_played")).collect(streaming=True)

Example: Analyse a large dataset with streaming

import polars as pl

df = input_data["your_blob_name"]  # key = blob filename lowercased, no extension

log(f"Loaded dataset: {df.shape[0]:,} rows × {df.shape[1]} columns")
log_metric("total_rows", df.shape[0])

# Single streaming pass for scalar stats
stats = (
    df.lazy()
    .select(
        pl.col("playcount").mean().alias("mean_playcount"),
        pl.col("playcount").max().alias("max_playcount"),
    )
    .collect(streaming=True)
    .row(0, named=True)
)
for k, v in stats.items():
    log_metric(k, round(v, 2))

# Top artists — top_k avoids a full sort
top_artists = (
    df.lazy()
    .group_by("artist_name")
    .agg(
        pl.col("playcount").sum().alias("total_plays"),
        pl.col("track_name").n_unique().alias("unique_tracks"),
    )
    .top_k(20, by="total_plays")
    .collect(streaming=True)
)
log(f"Top artist: {top_artists['artist_name'][0]} ({top_artists['total_plays'][0]:,} plays)")

create_output("top_artists", top_artists)
log("Done.")

Task tiers

The executor automatically selects a task tier based on total input size. No action needed in your script.

| Tier | Max input size | Ephemeral storage |
| --- | --- | --- |
| Small | ≤ 1 GB | 20 GiB |
| Large | ≤ 200 GB | 200 GiB |
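The thresholds above read as a simple decision rule. A toy illustration of the table (not the actual executor logic, which requires no configuration on your side):

```python
def select_tier(total_input_gb: float) -> str:
    # Mirrors the tier table above -- purely illustrative
    if total_input_gb <= 1:
        return "Small"   # 20 GiB ephemeral storage
    if total_input_gb <= 200:
        return "Large"   # 200 GiB ephemeral storage
    raise ValueError("input exceeds the 200 GB maximum")
```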

Tips

  • Keep timeout in mind for large datasets or slow external requests — increase it if needed (max 600s)
  • Connect a Preview Output node to inspect your outputs during development
  • Use polars instead of pandas — it's significantly faster and is the default dataframe library across the platform
  • For files larger than ~1 GB, use .lazy() + .collect(streaming=True) to avoid out-of-memory errors
