Dataset Filtering
The Dataset Filtering API creates filtered datasets from existing Kafka-based datasets. Filter expressions are translated into KSQL queries, which process the stream in real time.
Overview
Dataset filtering provides:
- Schema-aware validation of filter expressions
- Automatic KSQL query generation for stream processing
- Support for complex filter combinations
- Dry-run mode for testing filters before deployment
- Comprehensive operator support for various data types
Prerequisites
Before using the filtering API, ensure:
- The source dataset has Kafka storage configured
- The dataset has a valid schema registered in the Schema Registry
- You have the necessary permissions to create filtered datasets
API Endpoints
Create Filtered Dataset
Creates a new filtered dataset from an existing dataset.
POST /datasources/{datasourceId}/enablements/{enablementId}/datasets/{datasetId}/filters
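As a rough illustration of calling this endpoint, the sketch below builds (but does not send) the POST request with Python's standard library. The host, the identifiers, and the JSON content type are hypothetical, and passing dry-run mode as a `dryRun=true` query parameter is an assumption. The request body reuses the `name` and `filter` keys shown in the complete examples. Authentication is omitted because it is not described here.

```python
import json
from urllib.parse import quote, urlencode
from urllib.request import Request


def build_filter_request(base_url, datasource_id, enablement_id, dataset_id,
                         body, dry_run=False):
    """Build, without sending, a POST request for the filters endpoint."""
    path = "/datasources/{}/enablements/{}/datasets/{}/filters".format(
        quote(datasource_id, safe=""),
        quote(enablement_id, safe=""),
        quote(dataset_id, safe=""),
    )
    url = base_url.rstrip("/") + path
    if dry_run:
        # Assumption: dry-run mode is requested with a dryRun=true query parameter.
        url += "?" + urlencode({"dryRun": "true"})
    return Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},  # assumed content type
        method="POST",
    )


request = build_filter_request(
    "https://api.example.com",  # hypothetical host
    "ds-1", "en-1", "set-1",    # hypothetical identifiers
    {
        "name": "Active Users",
        "filter": {"field": "status", "operator": "EQUALS", "value": "ACTIVE"},
    },
    dry_run=True,
)
print(request.get_method(), request.full_url)
```

Passing the returned object to `urllib.request.urlopen` would send it. Keeping construction separate makes the URL and body easy to inspect first.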
Filter Expressions
Filter Operators
Equality Operators
Comparison Operators
GREATER_THAN
Tests if a field is greater than a value.
| Supported Types | INTEGER, DOUBLE, STRING (lexicographic) |
|---|---|
| Example Filter | |
| Generated KSQL | |
LESS_THAN
Tests if a field is less than a value.
| Supported Types | INTEGER, DOUBLE, STRING (lexicographic) |
|---|---|
| Example Filter | |
| Generated KSQL | |
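The "lexicographic" note matters when numbers or dates are stored as STRING fields. The sketch below uses Python's own string ordering as a stand-in for the comparison the stream processor would perform. The exact collation used by KSQL may differ, so treat the results as illustrative only.

```python
def greater_than(field_value, filter_value):
    """Mimic GREATER_THAN for two values of the same type.

    For str operands Python compares character by character, which is
    the lexicographic behaviour the Supported Types row refers to.
    """
    return field_value > filter_value


# Zero-padded ISO dates have equal width, so text order matches date order.
iso_dates = greater_than("2024-02-15", "2024-01-01")

# Numbers stored as text do not compare numerically: "1" sorts before "9".
numeric_text = greater_than("10", "9")

# The same values stored as INTEGER compare as expected.
numeric = greater_than(10, 9)

print(iso_dates, numeric_text, numeric)
```

This is one reason to store numeric data in INTEGER or DOUBLE fields rather than STRING fields when range comparisons are needed.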
Range Operators
Set Operators
Null Operators
String Pattern Operators
CONTAINS
Tests if a string field contains a substring.
| Supported Types | STRING only |
|---|---|
| Example Filter | |
| Generated KSQL | |
STARTS_WITH
Tests if a string field starts with a prefix.
| Supported Types | STRING only |
|---|---|
| Example Filter | |
| Generated KSQL | |
ENDS_WITH
Tests if a string field ends with a suffix.
| Supported Types | STRING only |
|---|---|
| Example Filter | |
| Generated KSQL | |
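One plausible way to picture how these three operators become KSQL is a mapping onto `LIKE` patterns. The `CONTAINS` form (`LIKE '%value%'`) matches the generated query shown elsewhere in this document. The `STARTS_WITH` and `ENDS_WITH` patterns, and the helper names, are assumptions made for illustration, not the service's confirmed output.

```python
LIKE_TEMPLATES = {
    "CONTAINS": "%{}%",     # matches the documented LIKE '%urgent%' form
    "STARTS_WITH": "{}%",   # assumed pattern
    "ENDS_WITH": "%{}",     # assumed pattern
}


def escape_literal(text):
    """Double single quotes so the text is safe inside a SQL string literal."""
    return text.replace("'", "''")


def like_condition(field, operator, value, alias="d"):
    """Render one string-pattern filter as a LIKE condition.

    Note: '%' and '_' inside the value are not escaped here, so they
    would act as wildcards in the resulting pattern.
    """
    pattern = LIKE_TEMPLATES[operator].format(escape_literal(value))
    return f"{alias}.{field} LIKE '{pattern}'"


print(like_condition("tags", "CONTAINS", "urgent"))
print(like_condition("email", "ENDS_WITH", "@corporate.com"))
```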
Geographic Operators
Geographic operators enable location-based filtering. While these operators are validated by the API, they are not currently supported in KSQL generation and will produce placeholder comments in the generated query.
WITHIN_RADIUS
Tests if coordinates are within a specified radius of a center point.
| Required Fields | geoFields (latitude/longitude mapping), center, radius |
|---|---|
| Example Filter | |
| Generated KSQL | |
WITHIN_BOUNDS
Tests if coordinates are within a geographic bounding box.
| Required Fields | geoFields (latitude/longitude mapping), bounds |
|---|---|
| Example Filter | |
| Generated KSQL | |
Compound Filters
Compound filters combine multiple filter expressions using logical operators.
AND Operator
All conditions must be true.
{
  "operator": "AND",
  "filters": [
    {
      "field": "status",
      "operator": "EQUALS",
      "value": "ACTIVE"
    },
    {
      "field": "priority",
      "operator": "GREATER_THAN",
      "value": 5
    }
  ]
}
Generated KSQL:
SELECT * FROM d WHERE (d.status = 'ACTIVE') AND (d.priority > 5)
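The translation from a compound filter to a `WHERE` clause can be pictured as a small recursive walk. The following is a minimal sketch, not the service's actual generator. It covers only three comparison operators, and the rendering of `NOT` is an assumption. The function and table names are hypothetical.

```python
COMPARISONS = {"EQUALS": "=", "GREATER_THAN": ">", "LESS_THAN": "<"}


def literal(value):
    """Render a JSON value as a SQL literal."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def render(expr, alias="d"):
    """Recursively render a filter expression as a WHERE-clause condition."""
    op = expr["operator"]
    if op in ("AND", "OR"):
        # Each child is parenthesised, then joined with the logical operator.
        parts = ["(" + render(child, alias) + ")" for child in expr["filters"]]
        return f" {op} ".join(parts)
    if op == "NOT":
        # Assumption: NOT wraps its single child filter.
        return "NOT (" + render(expr["filters"][0], alias) + ")"
    return f"{alias}.{expr['field']} {COMPARISONS[op]} {literal(expr['value'])}"


compound = {
    "operator": "AND",
    "filters": [
        {"field": "status", "operator": "EQUALS", "value": "ACTIVE"},
        {"field": "priority", "operator": "GREATER_THAN", "value": 5},
    ],
}
query = "SELECT * FROM d WHERE " + render(compound)
print(query)
```

Wrapping every child in parentheses keeps operator precedence explicit when `AND` and `OR` groups are nested inside each other.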
Field Type Compatibility
The following table shows which operators are compatible with each field type:
| Operator | STRING | INTEGER | DOUBLE | BOOLEAN |
|---|---|---|---|---|
| EQUALS | ✓ | ✓ | ✓ | ✓ |
| NOT_EQUALS | ✓ | ✓ | ✓ | ✓ |
| GREATER_THAN | ✓ | ✓ | ✓ | ✗ |
| LESS_THAN | ✓ | ✓ | ✓ | ✗ |
| GREATER_THAN_OR_EQUALS | ✓ | ✓ | ✓ | ✗ |
| LESS_THAN_OR_EQUALS | ✓ | ✓ | ✓ | ✗ |
| BETWEEN | ✓ | ✓ | ✓ | ✗ |
| NOT_BETWEEN | ✓ | ✓ | ✓ | ✗ |
| IN | ✓ | ✓ | ✓ | ✓ |
| NOT_IN | ✓ | ✓ | ✓ | ✓ |
| IS_NULL | ✓ | ✓ | ✓ | ✓ |
| IS_NOT_NULL | ✓ | ✓ | ✓ | ✓ |
| CONTAINS | ✓ | ✗ | ✗ | ✗ |
| STARTS_WITH | ✓ | ✗ | ✗ | ✗ |
| ENDS_WITH | ✓ | ✗ | ✗ | ✗ |
| NOT_CONTAINS | ✓ | ✗ | ✗ | ✗ |
| MATCHES_REGEX | ✓ | ✗ | ✗ | ✗ |
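For client-side checks before a request is sent, the compatibility table can be encoded as a lookup. This is only a sketch of one possible encoding. The names `COMPATIBILITY` and `is_compatible` are hypothetical and not part of the API.

```python
ALL_TYPES = frozenset({"STRING", "INTEGER", "DOUBLE", "BOOLEAN"})
ORDERED_TYPES = frozenset({"STRING", "INTEGER", "DOUBLE"})
STRING_ONLY = frozenset({"STRING"})

# Operator -> set of field types it accepts, transcribed from the table.
COMPATIBILITY = {
    **{op: ALL_TYPES for op in (
        "EQUALS", "NOT_EQUALS", "IN", "NOT_IN", "IS_NULL", "IS_NOT_NULL")},
    **{op: ORDERED_TYPES for op in (
        "GREATER_THAN", "LESS_THAN", "GREATER_THAN_OR_EQUALS",
        "LESS_THAN_OR_EQUALS", "BETWEEN", "NOT_BETWEEN")},
    **{op: STRING_ONLY for op in (
        "CONTAINS", "STARTS_WITH", "ENDS_WITH", "NOT_CONTAINS",
        "MATCHES_REGEX")},
}


def is_compatible(operator, field_type):
    """Return True if the table marks the operator as valid for the type."""
    return field_type in COMPATIBILITY.get(operator, frozenset())


print(is_compatible("BETWEEN", "DOUBLE"))
print(is_compatible("CONTAINS", "INTEGER"))
```

Unknown operators return `False` rather than raising, which keeps the helper safe to call on unvalidated input.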
Nested Field Access
Access nested fields in complex JSON structures using dot notation:
{
  "field": "user.profile.age",
  "operator": "GREATER_THAN",
  "value": 18
}
Generated KSQL:
SELECT * FROM d WHERE d.user.profile.age > 18
Array Field Access
Arrays can be filtered in two ways:
Filtering the Array Field Directly
When filtering on the array field itself, use operators like CONTAINS to check if any element matches:
{
  "field": "tags",
  "operator": "CONTAINS",
  "value": "urgent"
}
Generated KSQL:
SELECT * FROM d WHERE d.tags LIKE '%urgent%'
Filtering Specific Array Indices
When a schema defines array fields using the [] notation (e.g., measurements[].value), you can filter on specific array indices without the schema explicitly declaring each index:
{
  "field": "measurements[0].value",
  "operator": "GREATER_THAN",
  "value": 100.0
}
Generated KSQL:
SELECT * FROM d WHERE d.measurements[0].value > 100.0
This works for any valid array index:
* items[0].name - First item
* items[5].price - Sixth item
* locations[2].coordinates.latitude - Nested field in third location
The filter validation automatically recognizes array index patterns and validates them against the schema’s array element definitions.
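One way such index recognition could work is to rewrite each concrete index back to the schema's `[]` notation before looking the field up. The sketch below assumes the schema is available as a flat mapping from field path to type. That shape, the field types, and the helper names are hypothetical.

```python
import re

# Matches a concrete, non-negative array index such as [0] or [12].
INDEX_PATTERN = re.compile(r"\[\d+\]")


def schema_path(field):
    """Replace concrete indices with the schema's [] notation."""
    return INDEX_PATTERN.sub("[]", field)


def resolve_type(field, schema):
    """Return the declared type of a field path, or None if undeclared."""
    return schema.get(schema_path(field))


# Hypothetical flattened schema; the types are assumed for illustration.
SCHEMA = {
    "measurements[].value": "DOUBLE",
    "items[].name": "STRING",
    "locations[].coordinates.latitude": "DOUBLE",
}

print(schema_path("measurements[0].value"))
print(resolve_type("locations[2].coordinates.latitude", SCHEMA))
```

With this approach a single schema entry covers every index of the array, which is consistent with the behaviour described above.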
Example: Comparing Array Filtering Approaches
Given a schema with an array of sensor readings:
// Schema fields:
// - sensors (type: ARRAY)
// - sensors[].id (type: STRING)
// - sensors[].temperature (type: DOUBLE)
// - sensors[].status (type: STRING)
Option 1: Filter any sensor (uses array field directly)
{
  "field": "sensors",
  "operator": "CONTAINS",
  "value": "ALARM"
}
Option 2: Filter specific sensor (uses array index)
{
  "field": "sensors[0].status",
  "operator": "EQUALS",
  "value": "ALARM"
}
Option 3: Complex filter on multiple specific sensors
{
  "operator": "OR",
  "filters": [
    {
      "field": "sensors[0].temperature",
      "operator": "GREATER_THAN",
      "value": 100
    },
    {
      "field": "sensors[1].temperature",
      "operator": "GREATER_THAN",
      "value": 100
    }
  ]
}
Error Handling
The API provides detailed validation feedback:
{
  "valid": false,
  "errors": [
    "Field 'nonExistentField' does not exist in schema",
    "Operator 'CONTAINS' is not compatible with field type 'INTEGER'"
  ],
  "warnings": [
    "OR filters may impact performance on large datasets"
  ]
}
Common validation errors include:
- Field does not exist in schema
- Operator not compatible with field type
- Missing required parameters
- Dataset has no Kafka storage
- Schema not found or unavailable
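To make the first two error categories concrete, here is a minimal sketch of a validator that walks a filter expression and collects messages in the response shape shown above. It is an illustration only. The flat schema mapping is hypothetical, only string-only operators are type-checked, and when the real API emits the OR warning is an assumption.

```python
STRING_ONLY = {"CONTAINS", "STARTS_WITH", "ENDS_WITH", "NOT_CONTAINS",
               "MATCHES_REGEX"}
LOGICAL = {"AND", "OR", "NOT"}


def validate(expr, schema, errors=None, warnings=None):
    """Collect validation errors and warnings for a filter expression."""
    errors = [] if errors is None else errors
    warnings = [] if warnings is None else warnings
    op = expr.get("operator")
    if op in LOGICAL:
        if op == "OR":
            # Assumption: every OR group triggers the performance warning.
            warnings.append("OR filters may impact performance on large datasets")
        for child in expr.get("filters", []):
            validate(child, schema, errors, warnings)  # shares the same lists
    else:
        field = expr.get("field")
        field_type = schema.get(field)
        if field_type is None:
            errors.append(f"Field '{field}' does not exist in schema")
        elif op in STRING_ONLY and field_type != "STRING":
            errors.append(
                f"Operator '{op}' is not compatible with field type '{field_type}'")
    return {"valid": not errors, "errors": errors, "warnings": warnings}


schema = {"retryCount": "INTEGER", "message": "STRING"}  # hypothetical schema
result = validate(
    {
        "operator": "OR",
        "filters": [
            {"field": "nonExistentField", "operator": "EQUALS", "value": 1},
            {"field": "retryCount", "operator": "CONTAINS", "value": "3"},
        ],
    },
    schema,
)
print(result)
```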
Complete Examples
Active Premium Users Filter
Filter for active premium users in specific regions with recent activity:
{
  "name": "Active Premium Users - US West",
  "filter": {
    "operator": "AND",
    "filters": [
      {
        "field": "status",
        "operator": "EQUALS",
        "value": "ACTIVE"
      },
      {
        "field": "subscription.type",
        "operator": "EQUALS",
        "value": "PREMIUM"
      },
      {
        "field": "region",
        "operator": "IN",
        "values": ["us-west-1", "us-west-2"]
      },
      {
        "field": "lastActivityDate",
        "operator": "GREATER_THAN",
        "value": "2024-01-01"
      },
      {
        "field": "deletedAt",
        "operator": "IS_NULL"
      }
    ]
  }
}
Error Event Detection
Filter for events indicating errors or requiring attention:
{
  "name": "Error Events",
  "filter": {
    "operator": "OR",
    "filters": [
      {
        "field": "level",
        "operator": "IN",
        "values": ["ERROR", "CRITICAL", "FATAL"]
      },
      {
        "operator": "AND",
        "filters": [
          {
            "field": "httpStatus",
            "operator": "BETWEEN",
            "lowerBound": 500,
            "upperBound": 599
          },
          {
            "field": "retryCount",
            "operator": "GREATER_THAN",
            "value": 0
          }
        ]
      },
      {
        "field": "message",
        "operator": "CONTAINS",
        "value": "Exception"
      }
    ]
  }
}
High-Value Customer Segmentation
Filter for high-value customers for targeted marketing:
{
  "name": "High Value Customers",
  "filter": {
    "operator": "AND",
    "filters": [
      {
        "operator": "OR",
        "filters": [
          {
            "field": "totalPurchases",
            "operator": "GREATER_THAN",
            "value": 10000
          },
          {
            "field": "customerTier",
            "operator": "IN",
            "values": ["GOLD", "PLATINUM"]
          }
        ]
      },
      {
        "field": "accountAge",
        "operator": "GREATER_THAN",
        "value": 365
      },
      {
        "field": "email",
        "operator": "ENDS_WITH",
        "value": "@corporate.com"
      },
      {
        "operator": "NOT",
        "filters": [
          {
            "field": "optedOut",
            "operator": "EQUALS",
            "value": true
          }
        ]
      }
    ]
  }
}
Sensor Data Monitoring
Filter for anomalous readings from specific sensors in an array:
{
  "name": "Anomalous Sensor Readings",
  "filter": {
    "operator": "OR",
    "filters": [
      {
        "operator": "AND",
        "filters": [
          {
            "field": "sensors[0].temperature",
            "operator": "GREATER_THAN",
            "value": 85.0
          },
          {
            "field": "sensors[0].status",
            "operator": "EQUALS",
            "value": "ACTIVE"
          }
        ]
      },
      {
        "field": "sensors[1].pressure",
        "operator": "NOT_BETWEEN",
        "lowerBound": 25.0,
        "upperBound": 35.0
      },
      {
        "field": "alerts[0].severity",
        "operator": "IN",
        "values": ["HIGH", "CRITICAL"]
      }
    ]
  }
}
This example demonstrates:
- Filtering on specific array indices (sensors[0], sensors[1])
- Accessing nested fields within array elements (sensors[0].temperature)
- Combining array index filters with other operators
Array Filtering Comparison
Here’s how different array filtering approaches work for various use cases:
Use Case 1: Find documents with a specific tag anywhere in the array
{
  "name": "Documents with Urgent Tag",
  "filter": {
    "field": "tags",
    "operator": "CONTAINS",
    "value": "urgent"
  }
}
Use Case 2: Check if the first author is from a specific department
{
  "name": "Primary Author from Engineering",
  "filter": {
    "field": "authors[0].department",
    "operator": "EQUALS",
    "value": "Engineering"
  }
}
Use Case 3: Complex filtering - any high-priority alert OR first sensor overheating
{
  "name": "Critical Conditions",
  "filter": {
    "operator": "OR",
    "filters": [
      {
        "field": "alerts",
        "operator": "CONTAINS",
        "value": "high-priority"
      },
      {
        "field": "sensors[0].temperature",
        "operator": "GREATER_THAN",
        "value": 100
      }
    ]
  }
}
Best Practices
- Test with Dry Run: Always validate filters using dryRun=true before creating filtered datasets
- Consider Performance: Complex OR filters and nested conditions may impact stream processing performance
- Use Appropriate Types: Ensure filter values match the field types in your schema
- Leverage Nested Access: Use dot notation for filtering on nested JSON structures