Transformer Simulation

Overview

SDL supports simulating the execution of individual Transformers against predefined sets of inputs. This allows users to iteratively build and test their Transformers before deploying a Pipeline, which is particularly useful for Dynamic Transformers, which allow users to write custom code snippets.

This document is intended for Transformer authors who want their Transformers to support simulation. It outlines the contract that the Data Pipeline Engine expects for Transformer Templates that advertise the ability to be simulated, as well as the functional flow that occurs when a simulation request is received.

Limitations

Currently, three conn_type variants are supported for simulation: INTERNAL_KAFKA, INTERNAL_POSTGRES, and INTERNAL_ICEBERG. This means that simulation is limited to Transformer Templates that only define inputs and outputs of those types. When registering a Transformer Template, the Data Pipeline Engine will validate that these constraints are met and will return an error if the Transformer Template cannot be used for simulation.

Enabling Simulation for a Transformer Template

Transformer Templates that want to use simulation must include the simulation-enabled label in their definition:

{
    "uid": "1afc0a55-ca7e-40db-9efc-971f85f48b42",
    "labels": [
        {"name": "simulation-enabled"}
    ],
    // Rest of definition
}

Simulation API - Kubernetes-based Transformers

This is the contract that the Data Pipeline Engine expects each Transformer Template to adhere to if it carries the simulation-enabled label. Any Transformer author who wants to enable simulation for their Transformer Template must adhere to this API for simulation to work correctly.

Note that this contract applies only to Transformer Templates that are specified as containerized images to be run in Kubernetes; other instantiation types will require separate simulation APIs.

Assumptions/Prerequisites

  1. Simulation cannot occur with Transformer Templates that have any input connections of type TRANSFORMER, because the Data Pipeline Engine has no way to supply inputs to those connections.

  2. A Transformer Template should support running either as a Transformer directly or in simulation mode without rebuilding or changing the image.

Detecting Simulation

Transformer Templates should read the value of the environment variable named TRANSFORMER_SIMULATION_MODE to determine whether to start in Simulation (FaaS) mode or in normal Transformer mode:

  • If the environment variable exists and has the value true, then the Transformer should start up in Simulation mode.

  • In any other case, the Transformer should start normally.
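For example, a minimal check in Python (the helper name is illustrative; Transformers may be written in any language):

```python
import os

def is_simulation_mode() -> bool:
    # Only the exact value "true" selects Simulation mode; a missing
    # variable or any other value means normal Transformer mode.
    return os.environ.get("TRANSFORMER_SIMULATION_MODE") == "true"
```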

Startup

If running in Simulation mode, a Transformer should not run an event loop or listener, nor should it expect the configuration values used to connect to data sources to be present. For example, the configuration contract for INTERNAL_KAFKA connections will not be fulfilled in Simulation mode. However, any user-provided configuration values (as specified in the Transformer Template’s configuration block) will be filled in by the Data Pipeline Engine.

Instead, the Transformer should launch an HTTP server on port 8111 that contains handlers for two routes:

  1. /health: This route should return 200 OK once the Transformer has finished starting up and is ready to accept the simulation request. It should handle the GET HTTP verb.

  2. /simulate: This route should handle the simulation request coming from the Data Pipeline Engine via an HTTP POST request, with the JSON-encoded body containing the simulation request itself.
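A minimal sketch of this startup behavior, using Python's standard-library HTTP server (the run_simulation stub and handler names are hypothetical; real Transformers may be written in any language):

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

def run_simulation(request: dict) -> dict:
    # Placeholder: a real Transformer routes request["inputs"] through its
    # core transformation logic and returns metadata, logs, and outputs.
    return {"metadata": {}, "logs": [], "outputs": {}}

class SimulationHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # /health: 200 once startup is complete and /simulate can be served.
        if self.path == "/health":
            self.send_response(200)
            self.end_headers()
        else:
            self.send_response(404)
            self.end_headers()

    def do_POST(self):
        # /simulate: decode the JSON request body, run the simulation, and
        # return the JSON-encoded result.
        if self.path != "/simulate":
            self.send_response(404)
            self.end_headers()
            return
        length = int(self.headers.get("Content-Length", 0))
        request = json.loads(self.rfile.read(length))
        body = json.dumps(run_simulation(request)).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

if __name__ == "__main__":
    # Serve on the contract port; in practice the process exits after the
    # single /simulate request has been answered.
    HTTPServer(("", 8111), SimulationHandler).serve_forever()
```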

Handling /simulate

The /simulate route should be handled in a very specific way in order to be as efficient as possible. Transformers should anticipate that only a single request to /simulate will be made, and that after returning an HTTP response, the Transformer should shut down. Note that repeated simulation requests from users for the same Transformer Template will result in additional replicas being instantiated rather than the same Transformer handling more than one request. While this may impact performance slightly, it ensures that repeated simulation requests are stateless.

Request Body

The request body for /simulate is very specific, and is always encoded as application/json:

{
    "inputs": {
        // Each item corresponds to the name of an input connection in the Transformer Template
        "CONNECTION_1": {
            // Note that conn_type is elided here, as it isn't important
            // Assume for this example that CONNECTION_1 is of type INTERNAL_KAFKA.
            "msgs": [
                // Each object here should be fed into the Transformer code as though it were coming from Kafka.
                {"key": "key1", "headers": {}, "value": "Hello, world!"},
                {"key": "key2", "headers": {"__from__": "value1"}, "value": "Hello, world!"},
                {"key": "key3", "headers": {}, "value": "Hello, world!"}
            ]
        },
        "CONNECTION_2": {
            // Assume for this example that CONNECTION_2 is of type INTERNAL_ICEBERG or INTERNAL_POSTGRES.
            "table": {
                // This is a JSON representation of a table that should be created in the Transformer as input.
                "cols": ["col1", "col2", "col3"],
                "rows": [
                    [1, "one", false],
                    [2, "two", true],
                    [3, "three", false]
                ]
            }
        }
    }
}

Upon receipt of this request body, the Transformer should route these inputs into its core transformation logic as though they were coming directly from the connected data sources.
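As an illustration, a hypothetical Python helper that normalizes the request body above into per-connection inputs (the helper name and the normalized output shape are assumptions, not part of the contract):

```python
def parse_inputs(request: dict) -> dict:
    # Normalize the /simulate body into per-connection inputs. The shape of
    # each entry follows the request schema above; how the values are then
    # fed into the transformation logic is Transformer-specific.
    parsed = {}
    for name, payload in request["inputs"].items():
        if "msgs" in payload:  # INTERNAL_KAFKA-style input
            parsed[name] = [
                (m["key"], m["headers"], m["value"]) for m in payload["msgs"]
            ]
        elif "table" in payload:  # INTERNAL_POSTGRES / INTERNAL_ICEBERG input
            table = payload["table"]
            parsed[name] = [dict(zip(table["cols"], row)) for row in table["rows"]]
    return parsed
```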

Response

Transformers should attempt to run the simulation request as similarly to normal operation as they can; ideally, the code paths for simulated requests and normal operation are the same.

Note that normal logging to stdout or stderr will not be captured by the Data Pipeline Engine when returning results to the client. Any logs the Transformer wants to return must be encoded in the response body, as outlined below.

Once a result has been obtained, the Transformer should return a response to the Data Pipeline Engine containing one of these HTTP status codes:

200 OK

Simulation succeeded. The Transformer should return the simulation results in a JSON response body matching this format:

{
    // Additional arbitrary values or objects may be returned by the Transformer in this block. Keys beginning
    // with a double-underscore (__) will NOT be returned to the client, but any others are.
    "metadata": {
        "simulation_time_ms": 26, // Will be returned to the client.
        "__trace_info": { // Will not be returned to the client.
            "id": 273465
        }
    },
    // `logs` contains zero or more log messages, as output by the Transformer.
    "logs": [
        {"msg": "Hello, world!"},
        {"msg": "Second log"},
        {"msg": "Third log"}
    ],
    // `outputs` should correspond to the list of output connections defined by the Transformer Template.
    // Note that the data model returned in each output (table or msgs) is validated by the Data Pipeline Engine
    // against the conn_type of the output; a 500 will be returned if the returned data do not match the expected
    // conn_type.
    "outputs": {
        "ICEBERG_TABLE_OUTPUT_CONN": {
            "table": {
                "cols": ["output1", "output2"],
                "rows": [
                    [2, "asdf"],
                    [3, "asdf"]
                ]
            }
        },
        "KAFKA_OUTPUT_CONN": {
            "msgs": [
                {"key": "key1", "headers": {}, "value": "Hello, world!"},
                {"key": "key2", "headers": {"__from__": "value1"}, "value": "Hello, world!"},
                {"key": "key3", "headers": {}, "value": "Hello, world!"}
            ]
        }
    }
}
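A sketch, in Python, of assembling this success body (the helper name is hypothetical; only the JSON shape is defined by the contract):

```python
import json
import time

def build_success_body(outputs: dict, logs: list, started_at: float) -> bytes:
    # Assemble the 200 OK response body. Metadata keys beginning with "__"
    # are stripped by the Data Pipeline Engine before reaching the client.
    return json.dumps({
        "metadata": {"simulation_time_ms": int((time.time() - started_at) * 1000)},
        "logs": [{"msg": m} for m in logs],
        "outputs": outputs,
    }).encode()
```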

400 Bad Request

The Transformer failed to run the simulation request due to issues with the data; for example, the input Kafka messages were raw JSON when the Transformer expected raw XML.

The Transformer should return the error in a JSON response body matching this format:

{
    // Additional arbitrary values or objects may be returned by the Transformer in this block. Keys beginning
    // with a double-underscore (__) will NOT be returned to the client, but any others are.
    "metadata": {
        "simulation_time_ms": 26, // Will be returned to the client.
        "__trace_info": { // Will not be returned to the client.
            "id": 273465
        }
    },
    // `logs` contains zero or more log messages, as output by the Transformer.
    "logs": [
        {"msg": "Hello, world!"},
        {"msg": "Second log"},
        {"msg": "Third log"}
    ],
    "errors": [
        {"msg": "Connection XYZ must contain XML data"}
    ]
}

500 Internal Server Error

The Transformer failed to run the simulation request for some internal reason. Note that this will be returned to the client as a server-side error, rather than an error on the client’s part.

The Transformer should return the error in a JSON response body matching this format:

{
    // Additional arbitrary values or objects may be returned by the Transformer in this block. Keys beginning
    // with a double-underscore (__) will NOT be returned to the client, but any others are.
    "metadata": {
        "simulation_time_ms": 26, // Will be returned to the client.
        "__trace_info": { // Will not be returned to the client.
            "id": 273465
        }
    },
    // `logs` contains zero or more log messages, as output by the Transformer.
    "logs": [
        {"msg": "Hello, world!"},
        {"msg": "Second log"},
        {"msg": "Third log"}
    ],
    "errors": [
        {"msg": "Connection XYZ must contain XML data"}
    ]
}
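Since the 400 and 500 bodies share the same shape, a single helper can build both; a hypothetical Python sketch (the helper name is an assumption):

```python
import json

def build_error_body(errors: list, logs: list) -> bytes:
    # The 400 and 500 bodies share the same shape; only the HTTP status
    # code distinguishes a data problem from an internal failure.
    return json.dumps({
        "metadata": {},
        "logs": [{"msg": m} for m in logs],
        "errors": [{"msg": m} for m in errors],
    }).encode()
```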

Shutdown

As mentioned above, once a request to /simulate has been handled and responded to, the Transformer is obligated to shut down as quickly as possible so that simulation resources are released promptly.
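If the Transformer uses Python's standard-library HTTP server, one way to honor this is to trigger shutdown from the request handler once the response has been written (the helper name is hypothetical):

```python
import threading
from http.server import HTTPServer

def shutdown_after_response(server: HTTPServer) -> None:
    # HTTPServer.shutdown() blocks until the serve loop exits, so it must
    # be invoked from a separate thread; call this at the end of the
    # /simulate handler, after the response body has been written.
    threading.Thread(target=server.shutdown, daemon=True).start()
```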

Functional Flow

When the Data Pipeline Engine receives a simulation request from a client, it requires the following information to be present:

  1. The UID of the Transformer Template to execute the simulation request against. This tells the Data Pipeline Engine how to instantiate the app that will receive the simulation request.

  2. Configuration for each input connection on the Transformer Template, which will be passed to the app at runtime. This will usually take the form of discrete data, as opposed to a connection to a Dataset.

  3. General Transformer configuration (environment variables, file configurations, etc.), effectively the same configuration as is passed to a Transformer when it is used in a Pipeline.

Based on these pieces of information, the Data Pipeline Engine performs the following steps:

  1. It retrieves the Transformer Template with the matching UID from the persistence layer.

  2. It validates the configuration and input blocks from the simulation request against the Transformer Template.

  3. If successful, it instantiates the given Transformer Template as a run-once task.

  4. Once the Transformer is active, it forwards the simulation request to it and waits for a response.

  5. Once it has received a response, it shuts the Transformer down (if it has not shut itself down already).

  6. It returns the simulation response, with additional metadata, to the client.
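Steps 4 and 5 of the engine-side flow can be sketched in Python as follows (the function name, polling interval, and timeout are illustrative assumptions, not documented engine behavior):

```python
import json
import time
import urllib.request

def run_simulation_flow(base_url: str, request: dict, timeout_s: float = 60.0) -> dict:
    # Poll /health until the Transformer reports ready, then forward the
    # simulation request and decode the JSON response.
    deadline = time.time() + timeout_s
    while True:
        try:
            if urllib.request.urlopen(f"{base_url}/health").status == 200:
                break
        except OSError:
            pass  # Transformer not up yet; retry until the deadline.
        if time.time() >= deadline:
            raise TimeoutError("Transformer never became healthy")
        time.sleep(0.5)
    post = urllib.request.Request(
        f"{base_url}/simulate",
        data=json.dumps(request).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(post) as resp:
        return json.loads(resp.read())
```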