Datasets bridge the gap between Prompt Engineering and AI Workflows. Use datasets to supply inputs to your flows, store results, and maintain a continuous evaluation cycle.
The Dataset-Flow Connection
1. Prompt Engineering
↓ Create & evaluate prompts
↓ Build test datasets
2. Flow Processing
↓ Load dataset with Dataset Source
↓ Process through flow
↓ Save results with Dataset Sink
3. Back to Evaluation
↓ Analyze flow outputs
↓ Iterate and improve
↓ Repeat cycle
Dataset Source Node
Selecting a Dataset
1. Add Dataset Source node: drag it onto the canvas.
2. Choose dataset: select from your existing datasets.
3. Configure fields: map which columns to use.
4. Set filters (optional): filter which rows to process.
Field Mapping
Dataset has columns:
- company_name
- website_url
- industry
- notes
Expose as outputs:
out.companyName ← company_name
out.website ← website_url
out.industry ← industry
(notes field not exposed, won't be used)
Filtering Rows
Process only:
- Unprocessed items (status != "complete")
- Specific industry (industry == "Technology")
- Date range (created_at > "2024-01-01")
- Random sample (10% of dataset)
Examples:
Filter: status == "pending"
→ Process only pending items
Filter: industry IN ["Tech", "Finance"]
→ Process only those industries
Tip
Use filters to process datasets incrementally. Run flows on new or updated items without reprocessing everything.
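The exact filter syntax depends on how your workspace exposes it; conceptually, each filter is a predicate evaluated per row. As a rough Python sketch of the filters above (the row structure and field names are illustrative assumptions, not the product's API):

from datetime import date

# Illustrative rows; field names mirror the examples above.
rows = [
    {"id": 1, "industry": "Tech", "status": "pending", "created_at": date(2024, 2, 1)},
    {"id": 2, "industry": "Retail", "status": "complete", "created_at": date(2023, 11, 5)},
    {"id": 3, "industry": "Finance", "status": "pending", "created_at": date(2024, 3, 12)},
]

# Filter: status == "pending"
pending = [r for r in rows if r["status"] == "pending"]

# Filter: industry IN ["Tech", "Finance"] AND created_at > 2024-01-01
recent_targets = [
    r for r in rows
    if r["industry"] in ("Tech", "Finance") and r["created_at"] > date(2024, 1, 1)
]

print([r["id"] for r in pending])         # [1, 3]
print([r["id"] for r in recent_targets])  # [1, 3]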
Dataset Sink Node
Saving Results
1. Add Dataset Sink node: drag it to the end of the flow.
2. Select target dataset: use an existing dataset or create a new one.
3. Map fields: map flow outputs to dataset columns.
4. Set update mode: append, update, or upsert.
Field Mapping
Flow outputs:
Prompt.out.summary
Prompt.out.sentiment
WebsiteMapper.out.pageCount
Map to dataset columns:
summary ← Prompt.out.summary
sentiment ← Prompt.out.sentiment
page_count ← WebsiteMapper.out.pageCount
processed_at ← System.timestamp
flow_id ← System.flowId
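Conceptually, the sink assembles one dataset row per result by reading the mapped outputs. A minimal Python sketch of that assembly (the output names mirror the example above; the dictionaries and the "flow_123" value are illustrative, not a real API):

from datetime import datetime, timezone

# Hypothetical flow outputs collected at the end of a run.
flow_outputs = {
    "Prompt.out.summary": "B2B SaaS company selling analytics tools.",
    "Prompt.out.sentiment": "positive",
    "WebsiteMapper.out.pageCount": 42,
}

# Declarative column mapping, mirroring the arrows above.
column_mapping = {
    "summary": "Prompt.out.summary",
    "sentiment": "Prompt.out.sentiment",
    "page_count": "WebsiteMapper.out.pageCount",
}

row = {column: flow_outputs[source] for column, source in column_mapping.items()}
row["processed_at"] = datetime.now(timezone.utc).isoformat()  # System.timestamp
row["flow_id"] = "flow_123"                                   # System.flowId (illustrative)

print(row)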
Update Modes
Append (default):
- Add new row for each result
- Original dataset unchanged
- Best for: Logging, history tracking
Update:
- Update existing row by ID
- Requires unique identifier
- Best for: Enriching existing data
Upsert:
- Update if exists, insert if not
- Requires unique identifier
- Best for: Idempotent processing
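The three modes differ only in how a result row is written back. A compact sketch of the write logic over an in-memory list of rows (assuming id is the unique identifier):

def write(dataset, row, mode="append", key="id"):
    """Write one result row using append, update, or upsert semantics."""
    if mode == "append":
        dataset.append(row)                      # always add a new row
        return
    existing = next((r for r in dataset if r[key] == row[key]), None)
    if existing is not None:
        existing.update(row)                     # update the matching row in place
    elif mode == "upsert":
        dataset.append(row)                      # insert when no match exists
    else:                                        # mode == "update" with no match
        raise KeyError(f"no row with {key}={row[key]!r} to update")

dataset = [{"id": 1, "summary": None}]
write(dataset, {"id": 1, "summary": "Updated"}, mode="update")
write(dataset, {"id": 2, "summary": "New"}, mode="upsert")
print(dataset)  # id 1 updated in place, id 2 inserted

Upsert is what makes reruns idempotent: processing the same item twice overwrites the earlier result instead of adding a duplicate row.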
Common Patterns
Basic Processing Loop
Dataset Source ("Companies")
→ Prompt (analyze company)
→ Dataset Sink ("Company Analysis")
Input dataset: List of company URLs
Output dataset: Analysis for each company
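Outside the canvas, the same loop reads as: for each source row, run the analysis, append a result row. A rough Python sketch (analyze_company stands in for the Prompt node and is purely illustrative):

def analyze_company(url: str) -> dict:
    # Stand-in for the Prompt node; a real flow would call an LLM here.
    return {"summary": f"Analysis of {url}", "sentiment": "neutral"}

companies = [                                     # "Companies" source dataset
    {"company_name": "Acme Corp", "website_url": "https://acme.com"},
    {"company_name": "TechCo", "website_url": "https://techco.io"},
]

company_analysis = []                             # "Company Analysis" sink dataset
for row in companies:
    result = analyze_company(row["website_url"])
    company_analysis.append({"company": row["company_name"], **result})

print(company_analysis)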
Enrichment Pattern
Dataset Source ("Raw Leads")
→ Website Mapper (discover pages)
→ Prompt (classify business type)
→ Dataset Sink (UPDATE "Raw Leads")
Adds new columns to existing dataset:
- page_count
- business_type
- last_analyzed
Multi-Stage Processing
Dataset Source ("URLs to Process")
→ Stage 1: Scrape content
→ Dataset Sink ("Scraped Content")
Then separately:
Dataset Source ("Scraped Content")
→ Stage 2: Analyze with AI
→ Dataset Sink ("Analysis Results")
Incremental Processing
Dataset Source ("Products")
Filter: processed_at IS NULL
→ Process unprocessed items
→ Dataset Sink (UPDATE "Products")
Set: processed_at = now()
Run daily to process new products only
Info
Incremental processing is efficient for large datasets: each run processes only the rows that have changed since the last one.
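A Python sketch of the same idea: select only rows with no processed_at, process them, and stamp them on success (the field names follow the pattern above and are illustrative):

from datetime import datetime, timezone

products = [
    {"id": "p1", "name": "Widget", "processed_at": None},
    {"id": "p2", "name": "Gadget", "processed_at": "2024-01-10T08:00:00Z"},
]

# Filter: processed_at IS NULL
todo = [p for p in products if p["processed_at"] is None]

for product in todo:
    # ... run the flow's processing step for this product ...
    product["processed_at"] = datetime.now(timezone.utc).isoformat()

print([p["id"] for p in todo])  # only "p1" was processed this run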
Dataset Schema Considerations
Design for Flow Integration
Good dataset design:
- id: Unique identifier
- input_field: Data to process
- output_field: Where to store result
- status: Track processing state
- processed_at: Timestamp
- error_message: Store failures
Example:
id: "lead_123"
website: "https://example.com"
summary: null → (filled by flow)
status: "pending" → "complete"
processed_at: null → "2024-01-15T10:30:00Z"
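Written out as a typed record, the suggested schema looks like this (a Python illustration of the column set, not a required definition):

from typing import Optional, TypedDict

class LeadRow(TypedDict):
    id: str                       # unique identifier, e.g. "lead_123"
    website: str                  # input: data to process
    summary: Optional[str]        # output: filled in by the flow
    status: str                   # "pending" -> "complete" or "failed"
    processed_at: Optional[str]   # ISO timestamp set by the flow
    error_message: Optional[str]  # populated only on failure

row: LeadRow = {
    "id": "lead_123",
    "website": "https://example.com",
    "summary": None,
    "status": "pending",
    "processed_at": None,
    "error_message": None,
}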
Handling Updates
Update by ID:
Match on: id
Update: summary, status, processed_at
Upsert by unique field:
Match on: website (unique)
If exists: Update summary
If not: Create new row
Error Handling with Datasets
Tracking Failures
Dataset Source
→ Try: Process item
→ If success:
Dataset Sink (status="complete", result=output)
→ If error:
Dataset Sink (status="failed", error=message)
All items recorded, even failures
Retry Failed Items
Later run:
Dataset Source
Filter: status == "failed"
→ Retry processing
→ Update status on success
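Both halves fit in a few lines of Python: wrap the processing step in try/except, record status and error_message, then rerun only the failed rows. A hedged sketch (the process function is a placeholder for the flow's work):

def process(item: dict) -> str:
    # Placeholder for the flow's processing step.
    if not item["website"].startswith("https://"):
        raise ValueError("invalid URL")
    return f"Summary of {item['website']}"

def run(items: list[dict]) -> None:
    for item in items:
        try:
            item["summary"] = process(item)
            item["status"] = "complete"
            item["error_message"] = None
        except Exception as exc:                  # record the failure instead of stopping
            item["status"] = "failed"
            item["error_message"] = str(exc)

leads = [
    {"website": "https://good.example", "status": "pending", "error_message": None},
    {"website": "ftp://bad.example", "status": "pending", "error_message": None},
]

run(leads)                                           # first pass: every item gets a status
run([r for r in leads if r["status"] == "failed"])   # later run: retry only the failures
print([(r["website"], r["status"]) for r in leads])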
Performance Tips
Batch Size
Small datasets (< 100 items):
- Process all at once
- Use Dataset Source → Array Splitter
Large datasets (> 1000 items):
- Process in batches
- Filter by date or ID range
- Multiple runs to completion
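One simple way to bound each run is to slice the dataset into fixed-size batches and process one batch per run. A minimal sketch:

def batches(rows, size=100):
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

rows = [{"id": i} for i in range(1050)]

for i, batch in enumerate(batches(rows, size=500)):
    # Each batch could be one flow run; state in the dataset tracks progress.
    print(f"run {i}: {len(batch)} rows")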
Parallel Processing
Dataset Source (1000 items)
→ Array Splitter (parallel: 20)
→ Process items in parallel
→ Array Flatten
→ Dataset Sink (batch insert)
Much faster than sequential processing
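The Array Splitter handles the fan-out on the canvas; the plain-Python equivalent of "parallel: 20" is a bounded worker pool, roughly:

from concurrent.futures import ThreadPoolExecutor

def process(item: int) -> dict:
    # Placeholder for per-item work (scrape, prompt, etc.).
    return {"id": item, "result": item * 2}

items = list(range(1000))

# parallel: 20 -> at most 20 items in flight at once
with ThreadPoolExecutor(max_workers=20) as pool:
    results = list(pool.map(process, items))   # output order matches `items`

print(len(results))  # 1000 rows, ready for a batch insert into the sink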
Selective Fields
Only load needed fields:
✓ Load: id, website, industry
✗ Skip: notes, description, metadata
Reduces memory use and improves performance
Example: Lead Scoring Flow
Input Dataset: "Leads"
[
{"id": 1, "company": "Acme Corp", "website": "https://acme.com"},
{"id": 2, "company": "TechCo", "website": "https://techco.io"},
...
]
Flow
Dataset Source ("Leads")
→ Website Mapper (analyze site)
→ Prompt ("Score this company 1-10 based on...")
→ Dataset Sink (UPDATE "Leads")
Adds columns:
- lead_score
- company_size
- technology_stack
- scored_at
Output Dataset: "Leads" (enriched)
[
{
"id": 1,
"company": "Acme Corp",
"website": "https://acme.com",
"lead_score": 8,
"company_size": "200-500 employees",
"technology_stack": "React, AWS, Stripe",
"scored_at": "2024-01-15T10:30:00Z"
},
...
]
This pattern transforms a simple list of companies into a rich, actionable leads database.
Best Practices
1. Use Descriptive Names
✓ Good: "Companies - Website Analysis Results"
✗ Bad: "Dataset 1"
2. Include Metadata
Always store:
- processed_at timestamp
- flow_id or flow_version
- status (success/failed)
- error_message (if failed)
3. Version Your Datasets
leads_v1: Initial data
leads_v2: After enrichment flow
leads_v3: After scoring flow
Keeps an audit trail and enables rollback
4. Test with Small Samples
Create "Test Leads" dataset with 5 items
→ Run flow
→ Verify results
→ Then run on full "Leads" dataset
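One low-effort way to build that test dataset is to sample a handful of rows from the full one before wiring up the flow, for example:

import random

leads = [{"id": i, "website": f"https://example{i}.com"} for i in range(500)]

random.seed(7)                        # reproducible sample
test_leads = random.sample(leads, 5)  # "Test Leads": 5 representative items

print([row["id"] for row in test_leads])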