Basic Example

This is a very basic Transformer that does a single thing: it forwards messages from one Kafka topic to another. It needs a few configuration options to run:

  1. Configuration to connect to Kafka

  2. The input Kafka topic

  3. The output Kafka topic

  4. An optional setting for Kafka compression

Also assume that a container image has been built and pushed to a registry the Pipeline Engine can pull from.
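
To make the example concrete, the code inside that container might look something like the sketch below. This is a hypothetical, minimal sketch rather than part of the Transformer definition: it assumes the confluent-kafka Python client, that the input and output connections surface as the SOURCE_TOPIC and DEST_TOPIC environment variables (matching the configuration shown next), and that broker addresses arrive in an assumed KAFKA_BROKERS variable.

import os

from confluent_kafka import Consumer, Producer

# Input and output connections are supplied through the environment
# (SOURCE_TOPIC and DEST_TOPIC are defined in the configuration below).
# KAFKA_BROKERS is an assumed variable used only for this sketch.
source_topic = os.environ["SOURCE_TOPIC"]
dest_topic = os.environ["DEST_TOPIC"]
brokers = os.environ.get("KAFKA_BROKERS", "localhost:9092")

consumer = Consumer({
    "bootstrap.servers": brokers,
    "group.id": "topic-forwarder",
    "auto.offset.reset": "earliest",
})
# The optional KAFKA_COMPRESSION_TYPE setting (see the configuration
# section below) could also be applied to the producer here.
producer = Producer({"bootstrap.servers": brokers})

consumer.subscribe([source_topic])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        # Forward each message unchanged to the destination topic.
        producer.produce(dest_topic, key=msg.key(), value=msg.value())
        producer.poll(0)  # serve delivery callbacks
finally:
    producer.flush()
    consumer.close()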

This is what a configuration JSON for this Transformer might look like:

{
  "name": "Kafka Topic Forwarder",
  "description": "Forwards one Kafka Topic to another",
  "status": "available",
  "types": [],
  "inputs": {
    "SOURCE_TOPIC": {
      "display_name": "Source topic",
      "conn_type": "INTERNAL_KAFKA"
    }
  },
  "outputs": {
    "DEST_TOPIC": {
      "display_name": "Destination topic",
      "conn_type": "INTERNAL_KAFKA"
    }
  },
  "configuration": {
    "environment": [
      {
        "name": "KAFKA_COMPRESSION_TYPE",
        "description": "Kafka compression to use when publishing",
        "default_value": "none"
      }
    ],
    "static_environment": []
  },
  "instantiation": {
    "job_image": {
      "image": "topic-forwarder-image",
      "pull_policy": "Always",
      "image_pull_secret": "my-registry-secret",
      "default_replicas": 1
    }
  }
}

Explanation

Taking the configuration block by block:

{
  "name": "Kafka Topic Forwarder",
  "description": "Forwards one Kafka Topic to another",
  "status": "available",
  "types": [],
}

These options are fairly self-explanatory. name and description describe the Transformer once it is registered in the Pipeline Engine. An optional uid field can also be specified; it serves as a globally unique identifier for the Transformer and allows multiple Transformers to share the same name. status should usually be set to available so the Transformer can be used in Pipelines. The types field holds special tags that represent the Transformer type (e.g. sink, source, rdp_workflow). None of these types apply to this Transformer, so the types field is left empty.

{
  "inputs": {
    "SOURCE_TOPIC": {
      "display_name": "Source topic",
      "conn_type": "INTERNAL_KAFKA"
    }
  }
}

This block defines a single input connection. Its name, SOURCE_TOPIC, is how it will appear in the Transformer's environment. display_name is helpful when using the Pipeline UI. The conn_type tells us the connection must be of type INTERNAL_KAFKA. For more information on inputs and outputs, see Inputs and Outputs.

{
  "outputs": {
    "DEST_TOPIC": {
      "display_name": "Destination topic",
      "conn_type": "INTERNAL_KAFKA"
    }
  }
}

This is very similar to inputs: we declare a single output named DEST_TOPIC. Note that since no arity is defined, the default is assumed (the output is required and must be provided in a Pipeline for that Pipeline to be valid). For more information on arity, see Connection Arity.

{
  "configuration": {
    "environment": [
      {
        "name": "KAFKA_COMPRESSION_TYPE",
        "description": "Kafka compression to use when publishing",
        "default_value": "none"
      }
    ],
    "static_environment": []
  }
}

This section defines any additional configuration options that should be supplied to this Transformer. Here a single environment variable, KAFKA_COMPRESSION_TYPE, is declared with a default_value of none; static_environment is left empty.
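
As a hypothetical illustration, the forwarder could read this variable and pass it straight through to its Kafka producer settings (again assuming the confluent-kafka client used in the sketch above; KAFKA_BROKERS remains an assumed variable):

import os

from confluent_kafka import Producer

# Fall back to the declared default_value ("none") if the variable is unset.
compression = os.environ.get("KAFKA_COMPRESSION_TYPE", "none")

producer = Producer({
    "bootstrap.servers": os.environ.get("KAFKA_BROKERS", "localhost:9092"),  # assumed variable
    "compression.type": compression,  # standard Kafka client compression setting
})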

{
  "instantiation": {
    "job_image": {
      "image": "topic-forwarder-image",
      "pull_policy": "Always",
      "image_pull_secret": "my-registry-secret",
      "default_replicas": 1
    }
  }
}

The last block identifies how this Transformer should be instantiated by the Pipeline Engine. As additional means of instantiating Transformers are introduced, new options will appear under instantiation. For more information, see Instantiation. In this example, a container image (topic-forwarder-image) has been pushed to a registry the Pipeline Engine can pull from using the my-registry-secret pull secret. It will be deployed as a Kubernetes Job with a default of 1 replica.