Custom Code
Write and execute your own Python logic directly inside a flow.
The Custom Code node lets you write arbitrary Python code that runs as a step in your flow. Input data from upstream nodes is passed in automatically, and you use create_output() to pass results downstream.
Use it when the built-in transformation or model nodes don't cover your exact logic — custom preprocessing, business rules, data enrichment, calling external APIs, generating plots, or anything else Python can do.
Configuration
| Field | Description |
|---|---|
| Libraries | Select which libraries to import. Chosen imports are automatically added at the top of your code. |
| Output Names | Define the names of your outputs. Use these exact names in create_output() calls. |
| Python Code | Your code. Input blobs are available as blob_0, blob_1, etc. (or by name if tagged). |
| Timeout | Maximum execution time in seconds. Default: 300s. Maximum: 600s. |
Available Libraries
| Library | Use it for |
|---|---|
| polars | Fast dataframe operations (default) |
| plotly | Creating charts and visualizations |
| pyarrow | Columnar data and Parquet files |
| numpy | Numerical computing |
| PIL | Image processing |
| torch | PyTorch deep learning |
| transformers | Hugging Face models |
| sentence_transformers | Sentence embeddings |
| flair | NLP sequence labeling |
| requests | HTTP requests to external APIs |
| google_play_scraper | Scrape Google Play reviews |
| datetime, json, math, statistics, uuid, re, emoji | Standard utilities |
Inputs & Outputs
| Type | Name | Description |
|---|---|---|
| Input | data | Optional blobs from upstream nodes, accessible as blob_0, blob_1, … or by tag name |
| Output | output_data | Data blobs created via create_output() |
| Output | plots | Plotly figures created in your code |
| Output | metrics | Logs and metrics from execution |
Writing Your Code
Input blobs from connected upstream nodes are injected as variables:
```python
# Access inputs by index
df = blob_0   # first connected input
df2 = blob_1  # second connected input

# Or by tag name if you tagged your Select Data / upstream nodes
df = my_dataset
```

Use create_output() to pass data to downstream nodes. The name must match one of your configured Output Names:
```python
filtered = df.filter(pl.col("age") > 18)
summary = filtered.group_by("category").agg(pl.count())

create_output("output", filtered)
create_output("result", summary)
```

Use log() to write messages visible in the node logs; log_metric(name, value) similarly records numeric values to the metrics output:

```python
log(f"Rows after filter: {len(filtered)}")
```

Example: Filter and aggregate tabular data
```python
# Filter adults and summarize by category
filtered_df = blob_0.filter(pl.col("age") > 18)
summary_df = filtered_df.group_by("category").agg(pl.count().alias("count"))

create_output("output", filtered_df)
create_output("result", summary_df)
log(f"Filtered to {len(filtered_df)} rows")
```

Example: Call an external API and structure the response
```python
import json

response = requests.get("https://api.example.com/data", headers={"Authorization": "Bearer TOKEN"})
data = response.json()

df = pl.DataFrame(data["items"])
create_output("output", df)
log(f"Fetched {len(df)} records")
```

Working with Large Datasets
By default, Polars loads the entire file into RAM as a DataFrame. For files in the gigabyte range this can OOM-kill the container before any code runs. Use these techniques to process large files efficiently.
Lazy evaluation with streaming
Wrap any DataFrame in .lazy() and finish with .collect(streaming=True). Polars executes the query in fixed-size chunks, keeping only one chunk in RAM at a time.
```python
result = (
    df.lazy()
    .filter(pl.col("playcount") > 10)
    .group_by("artist_name")
    .agg(pl.col("playcount").sum())
    .collect(streaming=True)  # processes in chunks
)
```

Note: Not every operation supports streaming. Operations requiring global ordering (e.g. `sort`, `rank`) fall back to in-memory execution automatically. Check with `.explain(streaming=True)`.
Use top_k instead of sort().head()
sort() materialises a full sorted copy in RAM before head() trims it. top_k(k, by=...) uses a min-heap — O(n log k) time and O(k) extra memory regardless of dataset size.
```python
# Avoid — sorts every row before discarding all but 20
df.sort("playcount", descending=True).head(20)

# Prefer — heap-based, only tracks 20 candidates
df.lazy().top_k(20, by="playcount").collect(streaming=True)
```

Select only the columns you need
Lazy mode applies projection pushdown automatically, but being explicit prevents accidental wide scans.
```python
result = (
    df.lazy()
    .select("user_id", "artist_name", "playcount")  # drop unused columns early
    .group_by("artist_name")
    .agg(pl.col("playcount").sum())
    .collect(streaming=True)
)
```

Keep aggregations in separate pipelines
Each .collect() call frees intermediate data before the next pipeline starts. Chaining everything into one expression keeps all intermediate results alive simultaneously.
```python
stats = df.lazy().select(pl.col("playcount").mean(), pl.col("playcount").max()).collect(streaming=True)
top_artists = df.lazy().group_by("artist_name").agg(...).top_k(20, by="total_plays").collect(streaming=True)
user_stats = df.lazy().group_by("user_id").agg(...).collect(streaming=True)
```

Example: Analyse a large dataset with streaming
```python
import polars as pl

df = input_data["your_blob_name"]  # key = blob filename lowercased, no extension
log(f"Loaded dataset: {df.shape[0]:,} rows × {df.shape[1]} columns")
log_metric("total_rows", df.shape[0])

# Single streaming pass for scalar stats
stats = (
    df.lazy()
    .select(
        pl.col("playcount").mean().alias("mean_playcount"),
        pl.col("playcount").max().alias("max_playcount"),
    )
    .collect(streaming=True)
    .row(0, named=True)
)
for k, v in stats.items():
    log_metric(k, round(v, 2))

# Top artists — top_k avoids a full sort
top_artists = (
    df.lazy()
    .group_by("artist_name")
    .agg(
        pl.col("playcount").sum().alias("total_plays"),
        pl.col("track_name").n_unique().alias("unique_tracks"),
    )
    .top_k(20, by="total_plays")
    .collect(streaming=True)
)
log(f"Top artist: {top_artists['artist_name'][0]} ({top_artists['total_plays'][0]:,} plays)")
create_output("top_artists", top_artists)
log("Done.")
```

Task tiers
The executor automatically selects a task tier based on total input size. No action needed in your script.
| Tier | Max input size | Ephemeral storage |
|---|---|---|
| Small | ≤ 1 GB | 20 GiB |
| Large | ≤ 200 GB | 200 GiB |
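The rule in the table can be pictured as a simple threshold check. This sketch is illustrative only: the real selection logic is internal to the executor, the function name is hypothetical, and the boundary semantics (GB vs GiB for input size) are assumed from the table:

```python
GB = 10**9  # input thresholds in the table are stated in GB

def select_tier(total_input_bytes: int) -> str:
    """Hypothetical sketch of tier selection, mirroring the table above."""
    if total_input_bytes <= 1 * GB:
        return "Small"   # 20 GiB ephemeral storage
    if total_input_bytes <= 200 * GB:
        return "Large"   # 200 GiB ephemeral storage
    raise ValueError("input exceeds the 200 GB maximum")
```

Nothing like this needs to appear in your code; the executor applies the rule before your script runs.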
Tips
- Keep timeout in mind for large datasets or slow external requests — increase it if needed (max 600s)
- Connect a Preview Output node to inspect your outputs during development
- Use `polars` instead of `pandas` — it's significantly faster and is the default dataframe library across the platform
- For files larger than ~1 GB, use `.lazy()` + `.collect(streaming=True)` to avoid out-of-memory errors