S3 Event Stream

How do you consume a message from a Kafka topic and get notified when a new file is placed into a MinIO bucket?

  • All buckets in SDL have event notifications attached to them and publish any new events to the Kafka topic “minio-events”

Internal Consume

Set Up

  1. Log into SDL JupyterHub

    • Navigate to JupyterHub

    • Home/Analyze/Jupyterhub

    • Click on examples folder

    • Double-click minio-events-consumer.ipynb

    • This notebook is a template already configured with the topic name “minio-events”

      1. Open SDL Kafka in a separate tab

        1. Click on Topics

        2. Search for “minio-events” Topic and select

        3. Click Messages

        4. Sort by Newest First (dropdown on right side of window)

      2. Open SDL Object Store in a separate tab

      3. Optionally, open the SDL Topics page to verify that messages were published from the MinIO bucket to the Kafka topic “minio-events”.

Test

  1. In SDL JupyterHub

    1. minio-events-consumer.ipynb is designed to consume the “minio-events” topic

    2. Run all three code blocks to start the consumer

  2. Go to SDL Object Store

    1. Upload a file into the bucket of choice

  3. Go to SDL Kafka

    1. Refresh the Messages page of the “minio-events” topic; you will see that the new event has been published to the topic

  4. Go to SDL JupyterHub

    1. The running consumer will have printed the new event message from the “minio-events” topic
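The message the consumer prints follows MinIO's S3-style notification format, so the bucket and object can be pulled out of each record. A minimal parser, with the payload structure assumed from that format (field names like `Records`, `s3.bucket.name`, and the sample bucket/object are illustrative):

```python
import json


def summarize_event(payload: str) -> str:
    """Return a one-line summary (event, bucket, object) of a MinIO event."""
    event = json.loads(payload)
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]
    return f'{record["eventName"]} {bucket}/{key}'


# Hypothetical payload in MinIO's S3-style notification shape.
sample = json.dumps({
    "Records": [{
        "eventName": "s3:ObjectCreated:Put",
        "s3": {"bucket": {"name": "demo-bucket"},
               "object": {"key": "report.csv"}},
    }]
})
print(summarize_event(sample))  # → s3:ObjectCreated:Put demo-bucket/report.csv
```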

External Consume

  1. Navigate to SDL Topics

    • Home/Explore/Topics

    • Click KAFKA CA EXPORT

    • This will download a zip file with configurations and sample code for running an external Kafka Consumer

    • Use the example template minio-events-consumer.ipynb as a starting point
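An external consumer typically needs TLS settings pointing at the certificates from the exported zip. The settings can be collected in one place and passed to the Kafka client (for kafka-python, as keyword arguments to `KafkaConsumer`); every value below is a placeholder for whatever the export actually contains:

```python
# Settings an external consumer would pass to its Kafka client,
# e.g. KafkaConsumer("minio-events", **external_consumer_config).
# All values are placeholders -- substitute the ones from the exported zip.
external_consumer_config = {
    "bootstrap_servers": "kafka.sdl.example:9093",  # placeholder address
    "security_protocol": "SSL",
    "ssl_cafile": "ca.crt",        # CA certificate from the export
    "ssl_certfile": "client.crt",  # client certificate, if provided
    "ssl_keyfile": "client.key",   # client key, if provided
}

print(sorted(external_consumer_config))
```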

Options for filtering messages on the topic you are consuming

Offset

Offset

This is a unique identifier assigned to each message within a partition in Kafka. It is essentially a sequence number that allows consumers to keep track of their position within a partition.

Filtering on Offset

This means consuming messages starting from a specific offset or within a range of offsets. For example, you might consume messages starting from offset 10 up to offset 20 in a partition. This allows you to skip earlier messages and start consuming from a particular point of interest.
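In a real client the jump to the starting offset is done server-side (kafka-python uses `consumer.seek()`); the selection itself is just a range check on each record's offset, as this stdlib-only sketch of the logic shows (record shapes are illustrative):

```python
def in_offset_range(records, start, end):
    """Keep records whose offset is in [start, end)."""
    return [r for r in records if start <= r["offset"] < end]


# Hypothetical already-fetched records from one partition, offsets 8-23.
records = [{"offset": o, "value": f"event-{o}"} for o in range(8, 24)]

selected = in_offset_range(records, 10, 20)
print([r["offset"] for r in selected])  # offsets 10 through 19
```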

Partitions

Partition

Kafka topics are divided into partitions for parallel processing and scalability. Each partition is an ordered, immutable sequence of messages.

Filtering on Partitions

This involves specifying which partitions you want to consume messages from. For example, if a topic has 10 partitions, you might decide to consume messages only from partitions 2, 4, and 6.
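With kafka-python this is done by calling `consumer.assign()` with the wanted `TopicPartition`s instead of subscribing to the whole topic; the effect on the record stream is sketched here with stdlib-only code (record shapes are illustrative):

```python
def from_partitions(records, wanted):
    """Keep records that arrived on one of the wanted partitions."""
    return [r for r in records if r["partition"] in wanted]


# Hypothetical records spread across partitions 0-9 of a topic.
records = [{"partition": p % 10, "offset": p} for p in range(30)]

selected = from_partitions(records, {2, 4, 6})
print(sorted({r["partition"] for r in selected}))  # → [2, 4, 6]
```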

Who Decides Partitions

The number of partitions for a topic is fixed when the topic is created or configured. The producer then decides which partition each message is sent to, either by hashing the message key or by a round-robin mechanism if no key is provided.
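The key-based routing can be sketched as a hash modulo the partition count. Real Kafka clients use a murmur2 hash rather than MD5, but the idea is the same: the same key always maps to the same partition, which preserves per-key message ordering:

```python
import hashlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Route a keyed message to a partition deterministically (sketch only)."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


# The same key always lands on the same partition...
assert partition_for(b"sensor-42", 10) == partition_for(b"sensor-42", 10)
# ...so all messages for that key stay ordered within one partition.
print(partition_for(b"sensor-42", 10))
```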

Key Serde and Value Serde

Serde

Stands for Serializer/Deserializer. It defines how keys and values are serialized (converted to bytes) when producing messages and deserialized (converted back to their original form) when consuming messages.

Filtering on Key Serde and Value Serde

This means filtering messages based on their serialized forms. For example, if you are only interested in messages where the key or value matches a certain pattern or format, you can apply filtering logic during deserialization to select only those messages.

Key Serde

Handles the serialization and deserialization of the message key.

Value Serde

Handles the serialization and deserialization of the message value.
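A JSON value serde pair can be sketched with the standard library; clients such as kafka-python accept functions like these as `value_serializer` (producer side) and `value_deserializer` (consumer side):

```python
import json


def serialize(value) -> bytes:
    """Value serializer: Python object -> bytes on the wire."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw: bytes):
    """Value deserializer: bytes on the wire -> Python object."""
    return json.loads(raw.decode("utf-8"))


event = {"bucket": "demo", "key": "report.csv"}
assert deserialize(serialize(event)) == event  # round-trips losslessly
print(serialize(event))
```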

MinIO Event Notification Types

The following event notification types are available on each bucket:

PUT

This event is triggered when an object is created or updated in the bucket. In the context of MinIO, this corresponds to the putObject operation, where new objects are uploaded or existing objects are overwritten.

GET

This event is triggered when an object is read from the bucket. In MinIO, this corresponds to the getObject operation, where objects are retrieved or accessed from the storage.

DELETE

This event is triggered when an object is removed from the bucket. This corresponds to the removeObject operation, where objects are deleted from the storage.

REPLICA

This event is related to the replication process. It is triggered when an object is replicated from one bucket to another, typically in a multi-region or disaster recovery setup. This ensures data redundancy and availability across different geographical locations.

ILM (Information Lifecycle Management)

This event is triggered by actions related to lifecycle management policies. ILM policies automate the transition of objects between different storage classes (e.g., from standard to archival storage) or the expiration of objects after a certain period.

SCANNER

This event is triggered by background scanner processes. In MinIO, scanners typically run to verify the integrity of the stored objects, check for data consistency, or perform data healing in erasure-coded setups.
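When consuming “minio-events”, the type shows up in the event's name field (e.g. “s3:ObjectCreated:Put”), so a consumer can filter by prefix. The field name follows MinIO's S3-style payload; the sample events are illustrative:

```python
def is_object_created(event: dict) -> bool:
    """True for creation (PUT-type) events such as s3:ObjectCreated:Put."""
    return event.get("EventName", "").startswith("s3:ObjectCreated:")


# Hypothetical events as they might arrive on the "minio-events" topic.
events = [
    {"EventName": "s3:ObjectCreated:Put"},
    {"EventName": "s3:ObjectAccessed:Get"},
    {"EventName": "s3:ObjectRemoved:Delete"},
]
created = [e for e in events if is_object_created(e)]
print([e["EventName"] for e in created])  # → ['s3:ObjectCreated:Put']
```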