Dataset Filtering

The Dataset Filtering API lets developers create filtered datasets from existing Kafka-based datasets. Filter expressions are validated against the dataset schema and translated into KSQL queries that process the stream in real time.

Overview

Dataset filtering provides:

  • Schema-aware validation of filter expressions

  • Automatic KSQL query generation for stream processing

  • Support for complex filter combinations

  • Dry-run mode for testing filters before deployment

  • Comprehensive operator support for various data types

Prerequisites

Before using the filtering API, ensure:

  • The source dataset has Kafka storage configured

  • The dataset has a valid schema registered in the Schema Registry

  • You have the necessary permissions to create filtered datasets

API Endpoints

Create Filtered Dataset

Creates a new filtered dataset from an existing dataset.

POST /datasources/{datasourceId}/enablements/{enablementId}/datasets/{datasetId}/filters

Request Body

{
  "name": "Filtered Dataset Name",
  "acronym": "FDS",
  "description": "Description of the filtered dataset",
  "labels": ["tag1", "tag2"],
  "autoStart": true,
  "filter": {
    // Filter expression (see Filter Expressions section)
  }
}

Response

{
  "datasourceId": "550e8400-e29b-41d4-a716-446655440001",
  "enablementId": "550e8400-e29b-41d4-a716-446655440002",
  "datasetId": "550e8400-e29b-41d4-a716-446655440003",
  "ingestionStarted": true,
  "ksqlQuery": "SELECT * FROM d WHERE d.status = 'ACTIVE'"
}

Validation (Dry Run)

Test a filter expression without creating a dataset by adding the dryRun=true query parameter to the create request. The request body is the same as for Create Filtered Dataset.

POST /datasources/{datasourceId}/enablements/{enablementId}/datasets/{datasetId}/filters?dryRun=true

Response

{
  "valid": true,
  "ksqlQuery": "SELECT * FROM d WHERE d.status = 'ACTIVE'",
  "warnings": [],
  "errors": []
}
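As a rough illustration of the two calls above, the following Python sketch builds the POST request for either the create or the dry-run variant without sending it. The base URL, the IDs, and the function name build_filter_request are hypothetical; authentication is deployment-specific and omitted.

```python
import json
import urllib.request

# Hypothetical base URL; the real host depends on the deployment.
BASE_URL = "https://api.example.com"


def build_filter_request(datasource_id, enablement_id, dataset_id, body, dry_run=False):
    """Build (but do not send) the POST request for the filters endpoint."""
    url = (
        f"{BASE_URL}/datasources/{datasource_id}"
        f"/enablements/{enablement_id}/datasets/{dataset_id}/filters"
    )
    if dry_run:
        url += "?dryRun=true"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


body = {
    "name": "Active Records",
    "acronym": "ACT",
    "description": "Records whose status is ACTIVE",
    "labels": ["example"],
    "autoStart": True,
    "filter": {"field": "status", "operator": "EQUALS", "value": "ACTIVE"},
}

# Placeholder IDs for illustration only.
request = build_filter_request("ds-1", "en-1", "set-1", body)
```

The request can then be sent with urllib.request.urlopen(request) or any HTTP client, once the credentials required by the deployment have been added.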

Filter Expressions

Filter expressions define the criteria for selecting records from the source dataset. There are two types:

Filter Predicate

A basic condition that evaluates a single field.

{
  "field": "status",
  "operator": "EQUALS",
  "value": "ACTIVE"
}

Compound Filter

Combines multiple filters using logical operators.

{
  "operator": "AND",
  "filters": [
    {
      "field": "status",
      "operator": "EQUALS",
      "value": "ACTIVE"
    },
    {
      "field": "priority",
      "operator": "GREATER_THAN",
      "value": 5
    }
  ]
}
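When filters are assembled in code rather than written by hand, small builder helpers can keep the two shapes consistent. The sketch below is one possible approach in Python; the helper names predicate and compound are hypothetical and not part of the API.

```python
def predicate(field, operator, value=None, **extra):
    """Build a filter predicate.

    `extra` carries operator-specific keys such as values, lowerBound,
    or upperBound. `value` is omitted for operators like IS_NULL.
    """
    expr = {"field": field, "operator": operator}
    if value is not None:
        expr["value"] = value
    expr.update(extra)
    return expr


def compound(operator, *filters):
    """Build a compound filter from one or more child filters."""
    if operator not in ("AND", "OR", "NOT"):
        raise ValueError(f"unsupported logical operator: {operator}")
    if operator == "NOT" and len(filters) != 1:
        raise ValueError("NOT must contain exactly one filter")
    return {"operator": operator, "filters": list(filters)}


# Reproduces the compound filter shown above.
active_high_priority = compound(
    "AND",
    predicate("status", "EQUALS", "ACTIVE"),
    predicate("priority", "GREATER_THAN", 5),
)
```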

Filter Operators

Equality Operators

EQUALS

Tests if a field equals a specific value.

Supported Types: All field types (STRING, INTEGER, DOUBLE, BOOLEAN)

Example Filter

{
  "field": "status",
  "operator": "EQUALS",
  "value": "ACTIVE"
}

Generated KSQL

SELECT * FROM d WHERE d.status = 'ACTIVE'

NOT_EQUALS

Tests if a field does not equal a specific value.

Supported Types: All field types

Example Filter

{
  "field": "status",
  "operator": "NOT_EQUALS",
  "value": "INACTIVE"
}

Generated KSQL

SELECT * FROM d WHERE d.status != 'INACTIVE'

Comparison Operators

GREATER_THAN

Tests if a field is greater than a value.

Supported Types: INTEGER, DOUBLE, STRING (lexicographic)

Example Filter

{
  "field": "priority",
  "operator": "GREATER_THAN",
  "value": 5
}

Generated KSQL

SELECT * FROM d WHERE d.priority > 5

LESS_THAN

Tests if a field is less than a value.

Supported Types: INTEGER, DOUBLE, STRING (lexicographic)

Example Filter

{
  "field": "temperature",
  "operator": "LESS_THAN",
  "value": 32.0
}

Generated KSQL

SELECT * FROM d WHERE d.temperature < 32.0

GREATER_THAN_OR_EQUALS

Tests if a field is greater than or equal to a value.

Supported Types: INTEGER, DOUBLE, STRING (lexicographic)

Example Filter

{
  "field": "score",
  "operator": "GREATER_THAN_OR_EQUALS",
  "value": 80
}

Generated KSQL

SELECT * FROM d WHERE d.score >= 80

LESS_THAN_OR_EQUALS

Tests if a field is less than or equal to a value.

Supported Types: INTEGER, DOUBLE, STRING (lexicographic)

Example Filter

{
  "field": "age",
  "operator": "LESS_THAN_OR_EQUALS",
  "value": 65
}

Generated KSQL

SELECT * FROM d WHERE d.age <= 65

Range Operators

BETWEEN

Tests if a field value falls within a range (inclusive).

Supported Types: INTEGER, DOUBLE, STRING (lexicographic)

Example Filter

{
  "field": "temperature",
  "operator": "BETWEEN",
  "lowerBound": 20,
  "upperBound": 30
}

Generated KSQL

SELECT * FROM d WHERE d.temperature BETWEEN 20 AND 30

NOT_BETWEEN

Tests if a field value falls outside a range. Because the range is inclusive, a value equal to either bound does not match.

Supported Types: INTEGER, DOUBLE, STRING (lexicographic)

Example Filter

{
  "field": "age",
  "operator": "NOT_BETWEEN",
  "lowerBound": 18,
  "upperBound": 65
}

Generated KSQL

SELECT * FROM d WHERE d.age NOT BETWEEN 18 AND 65
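The boundary behavior of the two range operators is easy to get wrong, so the following Python sketch mirrors it locally. It assumes standard SQL semantics, where NOT BETWEEN is the exact negation of the inclusive BETWEEN. The function names are illustrative, and Python compares strings by code point, which may not match the collation used by the stream processor.

```python
def between(value, lower, upper):
    """Inclusive range test: both bounds count as matches."""
    return lower <= value <= upper


def not_between(value, lower, upper):
    """Negation of the inclusive range: bounds themselves do not match."""
    return not between(value, lower, upper)


# Boundary values for the two examples above.
boundary_in_range = between(30, 20, 30)       # temperature equal to upperBound
boundary_excluded = not_between(18, 18, 65)   # age equal to lowerBound
```

With the NOT_BETWEEN example above, a record with age 18 or 65 is therefore excluded, while 17 and 66 pass the filter.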

Set Operators

IN

Tests if a field matches any value in a list.

Supported Types: All field types

Example Filter

{
  "field": "category",
  "operator": "IN",
  "values": ["electronics", "computers", "phones"]
}

Generated KSQL

SELECT * FROM d WHERE d.category IN ('electronics', 'computers', 'phones')

NOT_IN

Tests if a field matches none of the values in a list.

Supported Types: All field types

Example Filter

{
  "field": "status",
  "operator": "NOT_IN",
  "values": ["CANCELLED", "FAILED", "EXPIRED"]
}

Generated KSQL

SELECT * FROM d WHERE d.status NOT IN ('CANCELLED', 'FAILED', 'EXPIRED')

Null Operators

IS_NULL

Tests if a field is null.

Supported Types: All field types

Example Filter

{
  "field": "deletedAt",
  "operator": "IS_NULL"
}

Generated KSQL

SELECT * FROM d WHERE d.deletedAt IS NULL

IS_NOT_NULL

Tests if a field is not null.

Supported Types: All field types

Example Filter

{
  "field": "email",
  "operator": "IS_NOT_NULL"
}

Generated KSQL

SELECT * FROM d WHERE d.email IS NOT NULL

String Pattern Operators

CONTAINS

Tests if a string field contains a substring.

Supported Types: STRING only

Example Filter

{
  "field": "description",
  "operator": "CONTAINS",
  "value": "urgent"
}

Generated KSQL

SELECT * FROM d WHERE d.description LIKE '%urgent%'

STARTS_WITH

Tests if a string field starts with a prefix.

Supported Types: STRING only

Example Filter

{
  "field": "name",
  "operator": "STARTS_WITH",
  "value": "John"
}

Generated KSQL

SELECT * FROM d WHERE d.name LIKE 'John%'

ENDS_WITH

Tests if a string field ends with a suffix.

Supported Types: STRING only

Example Filter

{
  "field": "email",
  "operator": "ENDS_WITH",
  "value": "@company.com"
}

Generated KSQL

SELECT * FROM d WHERE d.email LIKE '%@company.com'
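The three generated queries above differ only in where the % wildcard is placed around the value. A minimal Python sketch of that mapping follows; the names LIKE_TEMPLATES and like_pattern are hypothetical. This document does not say whether the service escapes LIKE wildcards inside the value, so the sketch leaves the value untouched.

```python
# Wildcard placement per operator, taken from the generated KSQL above.
LIKE_TEMPLATES = {
    "CONTAINS": "%{}%",
    "STARTS_WITH": "{}%",
    "ENDS_WITH": "%{}",
}


def like_pattern(operator, value):
    """Return the LIKE pattern for a string pattern operator.

    Assumption: the value is inserted as-is. A '%' or '_' inside the
    value would then act as a wildcard rather than a literal character.
    """
    try:
        template = LIKE_TEMPLATES[operator]
    except KeyError:
        raise ValueError(f"no LIKE mapping for operator: {operator}") from None
    return template.format(value)


suffix_pattern = like_pattern("ENDS_WITH", "@company.com")
```

Because '_' matches any single character in a LIKE pattern, values such as "user_id" are worth checking with a dry run before deployment.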

NOT_CONTAINS

Tests if a string field does not contain a substring.

This operator is validated but not currently supported in KSQL generation.

MATCHES_REGEX

Tests if a string field matches a regular expression.

This operator is validated but not currently supported in KSQL generation.

Geographic Operators

Geographic operators enable location-based filtering. While these operators are validated by the API, they are not currently supported in KSQL generation and will produce placeholder comments in the generated query.

WITHIN_RADIUS

Tests if coordinates are within a specified radius of a center point.

Required Fields: geoFields (latitude/longitude mapping), center, radius

Example Filter

{
  "operator": "WITHIN_RADIUS",
  "geoFields": {
    "latitudeField": "lat",
    "longitudeField": "lon",
    "altitudeField": "alt"
  },
  "center": {
    "latitude": 40.7128,
    "longitude": -74.0060
  },
  "radius": 10000
}

Generated KSQL

TBD
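Until KSQL generation is available for this operator, a consumer could apply the same test client-side. The Python sketch below uses the haversine great-circle distance and reads field names from the filter's geoFields mapping. It assumes the radius is expressed in meters, which this document does not state, and it ignores altitudeField. The function names are illustrative.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in meters


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two lat/lon points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    # Clamp to 1.0 to guard against floating-point overshoot.
    return 2 * EARTH_RADIUS_M * math.asin(min(1.0, math.sqrt(a)))


def within_radius(record, flt):
    """Evaluate a WITHIN_RADIUS filter against one record (radius assumed in meters)."""
    geo = flt["geoFields"]
    distance = haversine_m(
        record[geo["latitudeField"]],
        record[geo["longitudeField"]],
        flt["center"]["latitude"],
        flt["center"]["longitude"],
    )
    return distance <= flt["radius"]


radius_filter = {
    "operator": "WITHIN_RADIUS",
    "geoFields": {"latitudeField": "lat", "longitudeField": "lon"},
    "center": {"latitude": 40.7128, "longitude": -74.0060},
    "radius": 10000,
}
nearby = within_radius({"lat": 40.75, "lon": -74.0060}, radius_filter)
```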

WITHIN_BOUNDS

Tests if coordinates are within a geographic bounding box.

Required Fields: geoFields (latitude/longitude mapping), bounds

Example Filter

{
  "operator": "WITHIN_BOUNDS",
  "geoFields": {
    "latitudeField": "lat",
    "longitudeField": "lon"
  },
  "bounds": {
    "northLatitude": 40.917577,
    "southLatitude": 40.477399,
    "eastLongitude": -73.700272,
    "westLongitude": -74.259090
  }
}

Generated KSQL

TBD

WITHIN_MGRS_GRID

Tests if an MGRS (Military Grid Reference System) field matches a grid reference.

Required Fields: field, mgrsGrid

Example Filter

{
  "field": "mgrsLocation",
  "operator": "WITHIN_MGRS_GRID",
  "mgrsGrid": "18TWL"
}

Generated KSQL

TBD

Compound Filters

Compound filters combine multiple filter expressions using logical operators.

AND Operator

All conditions must be true.

{
  "operator": "AND",
  "filters": [
    {
      "field": "status",
      "operator": "EQUALS",
      "value": "ACTIVE"
    },
    {
      "field": "priority",
      "operator": "GREATER_THAN",
      "value": 5
    }
  ]
}

Generated KSQL:

SELECT * FROM d WHERE (d.status = 'ACTIVE') AND (d.priority > 5)

OR Operator

At least one condition must be true.

{
  "operator": "OR",
  "filters": [
    {
      "field": "status",
      "operator": "EQUALS",
      "value": "FAILED"
    },
    {
      "field": "retryCount",
      "operator": "GREATER_THAN",
      "value": 3
    }
  ]
}

Generated KSQL:

SELECT * FROM d WHERE (d.status = 'FAILED') OR (d.retryCount > 3)

NOT Operator

Negates a filter condition. Must contain exactly one filter.

{
  "operator": "NOT",
  "filters": [
    {
      "field": "environment",
      "operator": "EQUALS",
      "value": "production"
    }
  ]
}

Generated KSQL:

SELECT * FROM d WHERE NOT (d.environment = 'production')
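The generated queries in this section suggest a recursive translation in which every child of AND or OR is wrapped in parentheses. The Python sketch below reproduces the documented output for the comparison, range, set, and null operators; it is an illustration only, not the service's implementation. The quoting of string literals, the rendering of booleans, and the handling of nested compounds are assumptions.

```python
COMPARISONS = {
    "EQUALS": "=",
    "NOT_EQUALS": "!=",
    "GREATER_THAN": ">",
    "LESS_THAN": "<",
    "GREATER_THAN_OR_EQUALS": ">=",
    "LESS_THAN_OR_EQUALS": "<=",
}


def literal(value):
    """Render a Python value as a SQL literal (assumed quoting rules)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def condition(expr):
    """Translate one filter expression into a WHERE-clause fragment."""
    op = expr["operator"]
    if op in ("AND", "OR"):
        return f" {op} ".join(f"({condition(child)})" for child in expr["filters"])
    if op == "NOT":
        (child,) = expr["filters"]  # NOT must contain exactly one filter
        return f"NOT ({condition(child)})"
    column = f"d.{expr['field']}"
    if op in COMPARISONS:
        return f"{column} {COMPARISONS[op]} {literal(expr['value'])}"
    if op in ("BETWEEN", "NOT_BETWEEN"):
        keyword = op.replace("_", " ")
        return f"{column} {keyword} {literal(expr['lowerBound'])} AND {literal(expr['upperBound'])}"
    if op in ("IN", "NOT_IN"):
        items = ", ".join(literal(v) for v in expr["values"])
        return f"{column} {op.replace('_', ' ')} ({items})"
    if op in ("IS_NULL", "IS_NOT_NULL"):
        return f"{column} {op.replace('_', ' ')}"
    raise ValueError(f"operator not covered by this sketch: {op}")


def to_ksql(expr):
    return f"SELECT * FROM d WHERE {condition(expr)}"
```

Comparing the output of such a local translation with the ksqlQuery returned by a dry run is one way to spot differences between this sketch and the actual service.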

Field Type Compatibility

The following table shows which operators are compatible with each field type:

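Operator                STRING  INTEGER  DOUBLE  BOOLEAN
EQUALS                  Yes     Yes      Yes     Yes
NOT_EQUALS              Yes     Yes      Yes     Yes
GREATER_THAN            Yes     Yes      Yes     No
LESS_THAN               Yes     Yes      Yes     No
GREATER_THAN_OR_EQUALS  Yes     Yes      Yes     No
LESS_THAN_OR_EQUALS     Yes     Yes      Yes     No
BETWEEN                 Yes     Yes      Yes     No
NOT_BETWEEN             Yes     Yes      Yes     No
IN                      Yes     Yes      Yes     Yes
NOT_IN                  Yes     Yes      Yes     Yes
IS_NULL                 Yes     Yes      Yes     Yes
IS_NOT_NULL             Yes     Yes      Yes     Yes
CONTAINS                Yes     No       No      No
STARTS_WITH             Yes     No       No      No
ENDS_WITH               Yes     No       No      No
NOT_CONTAINS            Yes     No       No      No
MATCHES_REGEX           Yes     No       No      No

Comparison and range operators on STRING fields compare lexicographically. Entries follow the Supported Types listed for each operator in the Filter Operators section.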
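A client can pre-check operator and type combinations before calling the API. The Python sketch below encodes the Supported Types entries from the Filter Operators section and returns a message in the same form as the validation error shown under Error Handling. The function name and the "Unknown operator" message are hypothetical, and ARRAY fields are outside the scope of this sketch.

```python
ANY_TYPE = {"EQUALS", "NOT_EQUALS", "IN", "NOT_IN", "IS_NULL", "IS_NOT_NULL"}
ORDERED = {
    "GREATER_THAN", "LESS_THAN",
    "GREATER_THAN_OR_EQUALS", "LESS_THAN_OR_EQUALS",
    "BETWEEN", "NOT_BETWEEN",
}
STRING_ONLY = {"CONTAINS", "STARTS_WITH", "ENDS_WITH", "NOT_CONTAINS", "MATCHES_REGEX"}


def compatibility_error(operator, field_type):
    """Return an error message, or None if the combination is allowed."""
    if operator in ANY_TYPE:
        return None
    if operator in ORDERED:
        allowed = {"STRING", "INTEGER", "DOUBLE"}
    elif operator in STRING_ONLY:
        allowed = {"STRING"}
    else:
        # Hypothetical message; the API's wording for this case is not documented here.
        return f"Unknown operator '{operator}'"
    if field_type in allowed:
        return None
    return f"Operator '{operator}' is not compatible with field type '{field_type}'"


message = compatibility_error("CONTAINS", "INTEGER")
```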

Nested Field Access

Access nested fields in complex JSON structures using dot notation:

{
  "field": "user.profile.age",
  "operator": "GREATER_THAN",
  "value": 18
}

Generated KSQL:

SELECT * FROM d WHERE d.user.profile.age > 18

Array Field Access

Arrays can be filtered in two ways:

Filtering the Array Field Directly

When filtering on the array field itself, use operators like CONTAINS to check if any element matches:

{
  "field": "tags",
  "operator": "CONTAINS",
  "value": "urgent"
}

Generated KSQL:

SELECT * FROM d WHERE d.tags LIKE '%urgent%'

Filtering Specific Array Indices

When a schema defines array fields using the [] notation (e.g., measurements[].value), you can filter on specific array indices without the schema explicitly declaring each index:

{
  "field": "measurements[0].value",
  "operator": "GREATER_THAN",
  "value": 100.0
}

Generated KSQL:

SELECT * FROM d WHERE d.measurements[0].value > 100.0

This works for any valid array index:

  • items[0].name - First item

  • items[5].price - Sixth item

  • locations[2].coordinates.latitude - Nested field in third location

The filter validation automatically recognizes array index patterns and validates them against the schema’s array element definitions.
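One way to picture this validation step is to rewrite each concrete index to the [] form and look the result up among the schema's fields. The Python sketch below illustrates the idea; the function names and the set-based schema representation are assumptions, not the service's actual data model.

```python
import re

# Matches a concrete array index such as [0] or [12].
INDEX_PATTERN = re.compile(r"\[\d+\]")


def schema_path(field):
    """Rewrite concrete indices to the [] notation used in the schema."""
    return INDEX_PATTERN.sub("[]", field)


def field_exists(field, schema_fields):
    """Check an indexed filter field against the schema's declared fields."""
    return schema_path(field) in schema_fields


# Hypothetical schema, expressed as a set of declared field paths.
schema_fields = {
    "measurements",
    "measurements[].value",
    "locations[].coordinates.latitude",
}
normalized = schema_path("locations[2].coordinates.latitude")
```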

Example: Comparing Array Filtering Approaches

Given a schema with an array of sensor readings:

// Schema fields:
// - sensors (type: ARRAY)
// - sensors[].id (type: STRING)
// - sensors[].temperature (type: DOUBLE)
// - sensors[].status (type: STRING)

Option 1: Filter any sensor (uses array field directly)

{
  "field": "sensors",
  "operator": "CONTAINS",
  "value": "ALARM"
}

Option 2: Filter specific sensor (uses array index)

{
  "field": "sensors[0].status",
  "operator": "EQUALS",
  "value": "ALARM"
}

Option 3: Complex filter on multiple specific sensors

{
  "operator": "OR",
  "filters": [
    {
      "field": "sensors[0].temperature",
      "operator": "GREATER_THAN",
      "value": 100
    },
    {
      "field": "sensors[1].temperature",
      "operator": "GREATER_THAN",
      "value": 100
    }
  ]
}

Error Handling

The API provides detailed validation feedback:

{
  "valid": false,
  "errors": [
    "Field 'nonExistentField' does not exist in schema",
    "Operator 'CONTAINS' is not compatible with field type 'INTEGER'"
  ],
  "warnings": [
    "OR filters may impact performance on large datasets"
  ]
}

Common validation errors include:

  • Field does not exist in schema

  • Operator not compatible with field type

  • Missing required parameters

  • Dataset has no Kafka storage

  • Schema not found or unavailable
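Client code can turn a validation response like the one above into an exception so that a rejected filter never reaches deployment. The following Python sketch assumes the response has already been parsed into a dict; FilterValidationError and accepted_query are hypothetical names.

```python
class FilterValidationError(Exception):
    """Raised when the API reports a filter as invalid."""

    def __init__(self, errors):
        super().__init__("; ".join(errors) or "filter is not valid")
        self.errors = list(errors)


def accepted_query(response):
    """Return the generated KSQL query, or raise if validation failed."""
    errors = response.get("errors", [])
    if not response.get("valid", False) or errors:
        raise FilterValidationError(errors)
    return response.get("ksqlQuery")


# The failed validation response shown above.
failed = {
    "valid": False,
    "errors": [
        "Field 'nonExistentField' does not exist in schema",
        "Operator 'CONTAINS' is not compatible with field type 'INTEGER'",
    ],
    "warnings": ["OR filters may impact performance on large datasets"],
}
```

Warnings do not fail validation in this sketch; a caller may still want to log response["warnings"] before proceeding.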

Complete Examples

Active Premium Users Filter

Filter for active premium users in specific regions with recent activity:

{
  "name": "Active Premium Users - US West",
  "filter": {
    "operator": "AND",
    "filters": [
      {
        "field": "status",
        "operator": "EQUALS",
        "value": "ACTIVE"
      },
      {
        "field": "subscription.type",
        "operator": "EQUALS",
        "value": "PREMIUM"
      },
      {
        "field": "region",
        "operator": "IN",
        "values": ["us-west-1", "us-west-2"]
      },
      {
        "field": "lastActivityDate",
        "operator": "GREATER_THAN",
        "value": "2024-01-01"
      },
      {
        "field": "deletedAt",
        "operator": "IS_NULL"
      }
    ]
  }
}

Error Event Detection

Filter for events indicating errors or requiring attention:

{
  "name": "Error Events",
  "filter": {
    "operator": "OR",
    "filters": [
      {
        "field": "level",
        "operator": "IN",
        "values": ["ERROR", "CRITICAL", "FATAL"]
      },
      {
        "operator": "AND",
        "filters": [
          {
            "field": "httpStatus",
            "operator": "BETWEEN",
            "lowerBound": 500,
            "upperBound": 599
          },
          {
            "field": "retryCount",
            "operator": "GREATER_THAN",
            "value": 0
          }
        ]
      },
      {
        "field": "message",
        "operator": "CONTAINS",
        "value": "Exception"
      }
    ]
  }
}

High-Value Customer Segmentation

Filter for high-value customers for targeted marketing:

{
  "name": "High Value Customers",
  "filter": {
    "operator": "AND",
    "filters": [
      {
        "operator": "OR",
        "filters": [
          {
            "field": "totalPurchases",
            "operator": "GREATER_THAN",
            "value": 10000
          },
          {
            "field": "customerTier",
            "operator": "IN",
            "values": ["GOLD", "PLATINUM"]
          }
        ]
      },
      {
        "field": "accountAge",
        "operator": "GREATER_THAN",
        "value": 365
      },
      {
        "field": "email",
        "operator": "ENDS_WITH",
        "value": "@corporate.com"
      },
      {
        "operator": "NOT",
        "filters": [
          {
            "field": "optedOut",
            "operator": "EQUALS",
            "value": true
          }
        ]
      }
    ]
  }
}

Sensor Data Monitoring

Filter for anomalous readings from specific sensors in an array:

{
  "name": "Anomalous Sensor Readings",
  "filter": {
    "operator": "OR",
    "filters": [
      {
        "operator": "AND",
        "filters": [
          {
            "field": "sensors[0].temperature",
            "operator": "GREATER_THAN",
            "value": 85.0
          },
          {
            "field": "sensors[0].status",
            "operator": "EQUALS",
            "value": "ACTIVE"
          }
        ]
      },
      {
        "field": "sensors[1].pressure",
        "operator": "NOT_BETWEEN",
        "lowerBound": 25.0,
        "upperBound": 35.0
      },
      {
        "field": "alerts[0].severity",
        "operator": "IN",
        "values": ["HIGH", "CRITICAL"]
      }
    ]
  }
}

This example demonstrates:

  • Filtering on specific array indices (sensors[0], sensors[1])

  • Accessing nested fields within array elements (sensors[0].temperature)

  • Combining array index filters with other operators

Array Filtering Comparison

Here’s how different array filtering approaches work for various use cases:

Use Case 1: Find documents with a specific tag anywhere in the array

{
  "name": "Documents with Urgent Tag",
  "filter": {
    "field": "tags",
    "operator": "CONTAINS",
    "value": "urgent"
  }
}

Use Case 2: Check if the first author is from a specific department

{
  "name": "Primary Author from Engineering",
  "filter": {
    "field": "authors[0].department",
    "operator": "EQUALS",
    "value": "Engineering"
  }
}

Use Case 3: Complex filtering - any high-priority alert OR first sensor overheating

{
  "name": "Critical Conditions",
  "filter": {
    "operator": "OR",
    "filters": [
      {
        "field": "alerts",
        "operator": "CONTAINS",
        "value": "high-priority"
      },
      {
        "field": "sensors[0].temperature",
        "operator": "GREATER_THAN",
        "value": 100
      }
    ]
  }
}

Best Practices

  1. Test with Dry Run: Always validate filters using dryRun=true before creating filtered datasets

  2. Consider Performance: Complex OR filters and nested conditions may impact stream processing performance

  3. Use Appropriate Types: Ensure filter values match the field types in your schema

  4. Leverage Nested Access: Use dot notation for filtering on nested JSON structures
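The first practice can be enforced in code by always issuing the dry run before the real request. The Python sketch below shows one way to do that, assuming a caller-supplied post(path, body) function that sends the request and returns the parsed JSON response. Both post and create_after_dry_run are hypothetical names.

```python
def create_after_dry_run(post, path, body):
    """Validate a filter with dryRun=true, then create the filtered dataset.

    `post` is a caller-supplied callable: post(path, body) -> dict.
    `path` is the filters endpoint path without query parameters.
    """
    check = post(f"{path}?dryRun=true", body)
    if not check.get("valid", False):
        # Stop here so an invalid filter is never deployed.
        raise ValueError(f"filter rejected: {check.get('errors', [])}")
    return post(path, body)
```

Injecting the transport keeps the ordering logic independent of any particular HTTP client and makes it straightforward to exercise with a stub.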