Connection Arity

Consider a Transformer whose purpose is to monitor a series of Kafka topics for anomalous messages and send an alert somewhere. With the Transformer specification as discussed thus far, a Kafka Monitor Transformer could only monitor a fixed number of Kafka topics, because each input and output connection must be known when the Transformer is registered. The only way to dynamically adjust a Pipeline to new inputs and outputs is to deploy additional replicas.

Visually, this is what the Transformer might look like:

[Figure: connection arity example]

If we model Transformers as functions that operate on connections, we'd want something akin to this in Python:

def monitor_kafka(*input_connections):
    # Accepts any number of input connections.
    pass

Or this in Go:

func monitorKafka(inputConnections ...string) {}

These are referred to as variadic arguments, where the exact number is not specified in the function signature. The number of arguments specified in a function signature is referred to as its arity. That same concept can be applied to Data Pipelines.

The pipeline engine supports arity for all inputs and outputs. It takes the form of an optional field declared for each input and output connection in a Transformer template. Its presence tells the pipeline engine how to validate and resolve one or more matching instances of that connection in a Pipeline submission. This allows a Transformer to accept optional, discrete-arity, or variadic connections.

Arity is added inside a connection:

"inputs": {
  "KAFKA_TOPIC_NAME": {
    "conn_type": "INTERNAL_KAFKA",
    "arity": {
      "min": 0,
      "max": 1
    }
  }
}

The arity field is an inclusive min/max range that the pipeline engine uses to match multiple connections in a Pipeline to a single input or output in a Transformer template. Importantly, arity enables three key improvements to Transformer templates:

  1. The ability to declare an input or output as optional; if a matching connection isn’t provided in a Pipeline submission, the Pipeline is still valid.

  2. The ability to map a discrete number of connections to an input or output.

  3. The ability to use variadic connections, where the number of connections that may map to a single connection template is not specified by the Transformer template.

The various types of arity are discussed below.

Default Arity

By default, any Transformer template with connections that do not specify arity will retain the default, backwards-compatible behavior: the given connection must be provided by any Pipeline submission, and it must be a single value. Additionally, the following arity blocks are interpreted by the pipeline engine as default cases:

"arity":  {"min": 1, "max": 1}
"arity":  {"min": 0, "max": 0}

Optional Arity

With the default arity, every connection template must be given a value for a Pipeline submission to be considered valid. With an optional arity, if no matching connection is found, the pipeline engine moves on:

"arity":  {"min": 0, "max": 1}

If an optional connection is not provided in a Pipeline submission, the pipeline engine does not set the corresponding configuration (e.g. the environment variable for that connection). Transformer writers should be careful not to depend on environment variables being present when using optional connections.
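
As a minimal sketch of that defensive pattern, a Transformer written in Python could look up an optional connection with a fallback rather than indexing the environment directly. This assumes the connection is exposed as an environment variable named after the connection template (here KAFKA_TOPIC_NAME); the helper name optional_connection is hypothetical and not part of the pipeline engine.

```python
import os
from typing import Mapping, Optional


def optional_connection(name: str,
                        environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the value configured for an optional connection, or None.

    Assumption: the connection is exposed as an environment variable
    with the same name as the connection template.
    """
    # .get() returns None instead of raising KeyError when the variable is unset.
    return environ.get(name)


topic = optional_connection("KAFKA_TOPIC_NAME")
if topic is None:
    print("KAFKA_TOPIC_NAME was not provided; skipping that input")
else:
    print(f"Monitoring {topic}")
```

Passing the environment as a parameter keeps the lookup easy to exercise in tests without modifying the real process environment.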

Discrete Arity

Discrete arity allows a range of possible connections to be matched to a connection template. For any connection template whose arity.max is greater than 1 or less than 0 (more about that below), the pipeline engine does not match the connection name directly when configuring the Transformer, as it does for the default behavior. Instead, the pipeline engine looks for connections in the Pipeline submission of the form:

<connection_name>_{0-9}+

i.e. the source connection name followed by an underscore (_) and a number. This is flexible enough for any number of matching connections to be passed in a Pipeline submission. To illustrate this, take the following connection template from a Transformer template:

"outputs": {
  "DEST": {
    "conn_type": "INTERNAL_KAFKA",
    "arity": {
      "min": 1,
      "max": 3
    }
  }
}

This tells the pipeline engine to expect from 1 to 3 matching connections of the form DEST_{0-9}+, and that they should all resolve to the underlying conn_type of INTERNAL_KAFKA. A valid Pipeline submission might look like:

"outputs": {
  "DEST_1": {"conn_type": "DATASET"},
  "DEST_2": {"conn_type": "INTERNAL_KAFKA"},
  "DEST_4": {"conn_type": "DATASET"}
}

While numeric suffixes are validated, they don't need to be contiguous or start at 1, as demonstrated above by DEST_4. Transformer writers should make sure their app searches for environment variables according to the regular expression instead of depending on specific numeric indices.
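
The following sketch shows one way a Python Transformer might discover its connections by pattern rather than by fixed index. It assumes each matched connection is exposed as an environment variable with the same name it has in the Pipeline submission, and it reads the naming form above as the regular expression `<connection_name>_[0-9]+`. The function name find_connections and the sample values are hypothetical.

```python
import os
import re
from typing import List, Mapping, Tuple


def find_connections(name: str,
                     environ: Mapping[str, str] = os.environ) -> List[Tuple[str, str]]:
    """Return (variable, value) pairs for every variable named <name>_<digits>.

    Results are ordered by numeric suffix; gaps in the numbering are fine.
    """
    pattern = re.compile(rf"{re.escape(name)}_([0-9]+)")
    matches = []
    for key, value in environ.items():
        m = pattern.fullmatch(key)  # the whole variable name must match
        if m:
            matches.append((int(m.group(1)), key, value))
    matches.sort()  # orders by the integer suffix first
    return [(key, value) for _, key, value in matches]


# Illustrative environment; the values are made up for the example.
sample_env = {
    "DEST_4": "topic-c",
    "DEST_1": "topic-a",
    "DEST_2": "topic-b",
    "DEST": "ignored-no-suffix",
    "DEST_X": "ignored-non-numeric",
}
print(find_connections("DEST", sample_env))
# [('DEST_1', 'topic-a'), ('DEST_2', 'topic-b'), ('DEST_4', 'topic-c')]
```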

Each matching connection may have a different conn_type, as long as they all resolve to the correct underlying conn_type from the Transformer template. If the number of connections matching the connection template falls outside its arity range, the pipeline engine returns an error before instantiating the Pipeline.
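
The count check can be sketched roughly as follows. This is an illustration of the rule, not the pipeline engine's actual code: the names match_connections and ArityError are hypothetical, conn_type resolution is left out, and a negative max is treated as unbounded.

```python
import re
from typing import Iterable, List


class ArityError(ValueError):
    """Raised when the number of matching connections is outside the arity range."""


def match_connections(template_name: str, arity: dict,
                      submitted_names: Iterable[str]) -> List[str]:
    """Return submitted names of the form <template_name>_<digits>, checking the count.

    Intended for templates whose arity.max is greater than 1 or negative.
    """
    pattern = re.compile(rf"{re.escape(template_name)}_[0-9]+")
    matched = [n for n in submitted_names if pattern.fullmatch(n)]
    low, high = arity["min"], arity["max"]
    unbounded = high < 0  # assumption: any negative max means "no upper limit"
    if len(matched) < low or (not unbounded and len(matched) > high):
        upper = "unbounded" if unbounded else str(high)
        raise ArityError(
            f"{template_name}: expected {low}..{upper} connections, got {len(matched)}"
        )
    return matched


print(match_connections("DEST", {"min": 1, "max": 3}, ["DEST_1", "DEST_2", "DEST_4"]))
# ['DEST_1', 'DEST_2', 'DEST_4']
```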

For any connection template with a maximum arity greater than 1, the pipeline engine validates that its name does not conflict with any other connection names. For example, a Transformer template containing the above output connection template with discrete arity cannot also have a connection template like:

"DEST_12345": {
  "conn_type": "INTERNAL_KAFKA"
}

The reason for this is that the pipeline engine would be unable to validate whether a connection named DEST_12345 in a Pipeline submission should be counted as part of DEST or DEST_12345.
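
A rough sketch of such a conflict check is shown below. It assumes the templates are available as a dictionary keyed by connection name, and that templates with an unbounded (negative) max are checked the same way as those with a max greater than 1, since they share the same naming convention. The function name find_name_conflicts is hypothetical.

```python
import re
from typing import Dict, List, Tuple


def find_name_conflicts(templates: Dict[str, dict]) -> List[Tuple[str, str]]:
    """Return (multi_arity_name, conflicting_name) pairs within one template set."""
    conflicts = []
    for name, spec in templates.items():
        max_arity = spec.get("arity", {}).get("max", 1)
        if 0 <= max_arity <= 1:
            continue  # matched by exact name, so no suffix ambiguity
        pattern = re.compile(rf"{re.escape(name)}_[0-9]+")
        for other in templates:
            if other != name and pattern.fullmatch(other):
                conflicts.append((name, other))
    return conflicts


outputs = {
    "DEST": {"conn_type": "INTERNAL_KAFKA", "arity": {"min": 1, "max": 3}},
    "DEST_12345": {"conn_type": "INTERNAL_KAFKA"},
}
print(find_name_conflicts(outputs))
# [('DEST', 'DEST_12345')]
```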

Variadic Arity

Here, variadic arity refers to an unbounded maximum arity, indicated by a negative max:

"arity":  {"min": 1, "max": -1}

Truly unbounded variadic arity would look like this, where any number of matching connections, including none, is permissible:

"arity":  {"min": 0, "max": -1}

The same naming conventions for connections passed in Pipeline submissions apply to variadic connections as for those with discrete arity.
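
To summarize the four cases, here is a small sketch that classifies an arity block the way this section describes it. The function arity_kind is hypothetical, and rejecting any combination not covered above (for example a min larger than the max) is an assumption rather than documented engine behavior.

```python
from typing import Optional


def arity_kind(arity: Optional[dict] = None) -> str:
    """Classify an arity block as 'default', 'optional', 'discrete', or 'variadic'."""
    if arity is None:
        return "default"  # no arity declared: exactly one connection is required
    low, high = arity["min"], arity["max"]
    if (low, high) in ((1, 1), (0, 0)):
        return "default"  # both forms are interpreted as the default case
    if (low, high) == (0, 1):
        return "optional"
    if high < 0:
        return "variadic"  # negative max: unbounded number of connections
    if high > 1 and low <= high:
        return "discrete"
    # Assumption: anything else is treated as invalid in this sketch.
    raise ValueError(f"unsupported arity: {arity!r}")


for block in (None, {"min": 0, "max": 1}, {"min": 1, "max": 3}, {"min": 0, "max": -1}):
    print(block, "->", arity_kind(block))
```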