Business Case

In a project where numerous time series from IoT devices are stored, the following architecture is in place:

  • A Kafka broker that holds the measurements obtained from the IoT devices.
  • A simple sink connector that writes the measurements to the InfluxDB database for historical storage.
  • A legacy backend service for process configuration, which uses MySQL for saving configurations.

To ensure that the measurements are being stored in the time series database, we want to create an alerting mechanism that can notify the user if data from an “active” IoT device stops arriving at the database.

First Solution

The first solution we found is to use the native checks in InfluxDB (version 1 in our case) as demonstrated in this article: How to Use a Deadman Check to Alert on Service Outage.

The article illustrates the creation of these checks for InfluxDB version 2, and the process for version 1 is similar.

The outlined solution, however, has a significant limitation, at least in the case of InfluxDB version 1.

A Deadman check can only be created for a single measurement.

It is not possible to create a single Deadman check that spans multiple measurements, even if they reside in the same database.

In our case, each IoT device is associated with a specific measurement, which means we would need to manually create approximately 2,000 Deadman checks. Additionally, these checks would have to be manually activated and deactivated by an operator whenever one of the IoT devices is turned on or off.

It is possible that the data layout, as discussed in Data Layout and Schema Design Best Practices for InfluxDB, was not designed correctly in the first place.

However, this is the current situation, and changing the system was not an option for us, so this first solution is not feasible in our case.

Second Solution

The second solution we found aims to automate this process and to address some issues inherited from the existing architecture.

In addition, the solution must satisfy the following requirements:

  • The solution must be decoupled from external configuration databases and must follow a purely event-driven style.
  • The solution must be decoupled from the underlying technology used for data historization, so that the storage backend can be replaced without altering the events that flow through Kafka.

Here is the high-level schema:

Decoupling MySQL

To satisfy the first point, the dual-write problem must also be addressed; it is thoroughly explained in this article: The Dual Write Problem.

The legacy “configurator” software currently writes to the database and then sends the corresponding event to Kafka, thus falling squarely into this trap.

To address this, we will use the transactional outbox pattern, as described here: The Transactional Outbox Pattern.

This approach also involves using Kafka Connect (Debezium) to generate CloudEvents from the outbox table.

This way, we can generate events corresponding to the activation/deactivation of specific IoT devices, with the device’s identifying UUID corresponding to the measurement name in Influx, while remaining decoupled from MySQL.

Decoupling Kapacitor/Influx

We need to create a process that listens on Kafka for the on/off events of a device and, when one arrives, calls the Kapacitor API to enable or disable the corresponding alarm. The process must also be capable of publishing alerts to Kafka.

To achieve this, we can create a process that translates the events into calls to the Kapacitor HTTP API. This process can also expose an HTTP API that, when invoked, generates alerting events on Kafka.

To accomplish this, Benthos (now Redpanda Connect) can be used. Benthos is a versatile and high-performance stream processing tool that facilitates the integration between Kafka and HTTP APIs.

Steps

So, to recap, here are the steps in chronological order:

  1. The operator turns on an IoT device (so we need to activate the alert).
  2. The “configurator” process makes its changes on the DB side and writes a new entry in the outbox table with the necessary metadata for creating a cloud event.
  3. Kafka-Connect (Debezium, supported by Red Hat in AMQ Streams) will create the cloud event on a Kafka topic.
  4. Benthos will listen for the event and generate an HTTP call to Kapacitor to create a new Deadman alert or enable it if it already exists.
  5. If the Deadman alert is triggered, Kapacitor will make an HTTP POST request to Benthos, passing the alert metadata.
  6. Benthos will then generate the alert event on Kafka.

Code

Let’s go ahead and outline the steps to create this solution:

Outbox Table

The first thing to do is create the Outbox table. To do this, you can follow the article Reliable Microservices Data Exchange With the Outbox Pattern along with the related example on GitHub: Debezium Outbox Example.

CREATE TABLE `outbox` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `payload` json DEFAULT NULL,
  `type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `aggregateid` char(36) NOT NULL,
  `aggregatetype` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `aggregateid` (`aggregateid`)
) ENGINE=InnoDB AUTO_INCREMENT=257 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

Obviously, the process that adds entries to the outbox table must be modified accordingly. The important thing is that the business change and the outbox insert happen within a single transaction.
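
For illustration, here is a minimal sketch of such a transaction issued from the MySQL command line. Only the outbox columns come from the schema above; the device table, its columns, the event type value, and the aggregate type are hypothetical placeholders:

# Hypothetical example: enable a device and record the corresponding outbox
# entry in the same MySQL transaction (host, credentials, and database name
# are placeholders).
mysql -h 192.168.6.38 -u "$DB_USER" -p"$DB_PASSWORD" "$DB_NAME" <<'SQL'
START TRANSACTION;

-- the "configurator" business change (hypothetical table and columns)
UPDATE device
   SET enabled = 1
 WHERE uuid = '0b7e2a90-0000-0000-0000-000000000000';

-- the outbox entry that Debezium's EventRouter turns into an event; the payload
-- carries the fields consumed later by Benthos (devicescode, eventtype)
INSERT INTO outbox (payload, type, aggregateid, aggregatetype)
VALUES (
  JSON_OBJECT(
    'devicescode', JSON_ARRAY('0b7e2a90-0000-0000-0000-000000000000'),
    'eventtype', 'enabled'
  ),
  'device-status-changed',
  '0b7e2a90-0000-0000-0000-000000000000',
  'device'
);

COMMIT;
SQL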

Kafka Connect

It is necessary to configure Kafka Connect on AMQ Streams (in our case, on OpenShift). This requires three steps:

  1. Create the Docker Image with the necessary plugins
  2. Configure a Connect Cluster
  3. Configure a Source Connector

The first step is necessary in our case because our environment is air-gapped, so we need a Docker image with all the necessary packages already installed.

The Dockerfile used is the following:

FROM registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0
USER root:root
RUN set -ex && \
    mkdir -p /opt/kafka/plugins && \
    cd /opt/kafka/plugins && \
    curl https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.7.3.Final-redhat-00003/debezium-connector-mysql-2.7.3.Final-redhat-00003-plugin.zip -O && \
    unzip debezium-connector-mysql-2.7.3.Final-redhat-00003-plugin.zip && \
    curl https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.7.3.Final-redhat-00003/debezium-scripting-2.7.3.Final-redhat-00003.zip -O && \
    unzip debezium-scripting-2.7.3.Final-redhat-00003.zip && \
    curl https://repo1.maven.org/maven2/org/apache/groovy/groovy/4.0.24/groovy-4.0.24.jar -O && \
    curl https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/4.0.24/groovy-jsr223-4.0.24.jar -O && \
    curl https://repo1.maven.org/maven2/org/apache/groovy/groovy-json/4.0.24/groovy-json-4.0.24.jar -O && \
    rm debezium-connector-mysql-2.7.3.Final-redhat-00003-plugin.zip debezium-scripting-2.7.3.Final-redhat-00003.zip
USER 1001

All of these dependencies are necessary for Debezium to work. More details can be found in the Debezium User Guide and in the Debezium on OpenShift documentation.
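
The image is then built and pushed to a registry reachable from the cluster, and its reference goes into the image field of the KafkaConnect resource below. A rough sketch (the registry path, project, and tag are assumptions):

# Build and push the custom Connect image; the resulting reference is what we
# set as `image` in the KafkaConnect CR (registry and tag are placeholders).
podman build -t image-registry.<redacted>/amq-streams/connect-debezium-mysql:2.7.3 .
podman push image-registry.<redacted>/amq-streams/connect-debezium-mysql:2.7.3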

For the second step, we created the following definition for the Kafka Connect CRD:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true'
  name: connect-cluster
  namespace: <redacted>
spec:
  bootstrapServers: 'central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092'
  config:
    config.storage.replication.factor: -1
    config.storage.topic: connect-cluster-configs
    group.id: connect-cluster
    offset.storage.replication.factor: -1
    offset.storage.topic: connect-cluster-offsets
    status.storage.replication.factor: -1
    status.storage.topic: connect-cluster-status
  image: <redacted>
  replicas: 1
  version: 3.1.0

For the third step, we created the following definition for the Kafka Connector CRD:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: connect-cluster
  name: mysql-source-connector
  namespace: <redacted>
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    predicates.isHeartbeat.pattern: __debezium-heartbeat.*
    value.converter: io.debezium.converters.CloudEventsConverter
    table.include.list: <db name redacted>.outbox
    transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
    transforms.addMetadataHeaders.predicate: isHeartbeat
    database.hostname: 192.168.6.38
    transforms: 'addMetadataHeaders,outbox'
    predicates.isHeartbeat.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    transforms.outbox.table.fields.additional.placement: 'type:header'
    transforms.addMetadataHeaders.fields: 'source,op,transaction'
    tombstones.on.delete: 'false'
    transforms.addMetadataHeaders.type: org.apache.kafka.connect.transforms.HeaderFrom$Value
    database.password: <redacted>
    header.converter.schemas.enable: 'true'
    value.converter.metadata.source: 'header,id:generate,type:header,dataSchemaName:header'
    topic.prefix: connector-mysql
    transforms.outbox.table.expand.json.payload: 'true'
    transforms.addMetadataHeaders.negate: 'true'
    database.server.id: 184051
    database.port: 3306
    transforms.addMetadataHeaders.headers: 'source,op,transaction'
    transforms.addMetadataHeaders.operation: copy
    database.include.list: <db name redacted>
    value.converter.json.schemas.enable: 'false'
    predicates: isHeartbeat
    schema.history.internal.kafka.topic: schema-changes.<db name redacted>
    schema.data.name.source.header.enable: true
    database.user: <redacted>
    schema.history.internal.kafka.bootstrap.servers: 'central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092'
    header.converter: org.apache.kafka.connect.json.JsonConverter
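
Both resources can then be applied and checked in the usual Strimzi way. A sketch, assuming the manifests above are saved locally (the file names are placeholders, and the Kafka pod name is derived from the central-cluster name):

# Apply the Connect cluster and the connector, then wait for the Strimzi
# operator to report them as Ready.
oc apply -f connect-cluster.yaml -f mysql-source-connector.yaml

oc wait kafkaconnect/connect-cluster --for=condition=Ready --timeout=300s
oc wait kafkaconnector/mysql-source-connector --for=condition=Ready --timeout=300s

# Outbox events are routed by the EventRouter to a topic derived from the
# aggregatetype column (outbox.event.<aggregatetype> by default).
oc exec -it central-cluster-kafka-0 -- \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic outbox.event.device \
  --from-beginning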

Alerting System

For the deployment of Benthos and Kapacitor, we decided to create a Helm chart that, through dependencies on the Helm charts for Kapacitor and Benthos, performs all the initialization steps.

In particular, the Helm chart must:

  1. Install the template for alert creation using a job.
  2. Create the ingress for communication with the InfluxDB instance outside of OpenShift.
  3. Create the secret with user credentials.

These are the chart dependencies:

dependencies:
  - name: kapacitor
    version: "1.4.7"
    condition: kapacitor.enabled
    repository: "https://helm.influxdata.com/"
  - name: benthos
    version: "2.2.0"
    condition: benthos.enabled
    repository: https://benthosdev.github.io/charts/
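
With the dependencies declared, the chart is installed like any other umbrella chart. A sketch, assuming the chart directory is called alerting-system (the release name and namespace are placeholders):

# Pull the Kapacitor and Benthos subcharts, then install/upgrade the chart.
helm dependency update ./alerting-system
helm upgrade --install alerting-system ./alerting-system \
  --namespace <redacted> \
  --values values.yaml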

Template

This is the template (more here: template docs):

// Template VARS
var name  string // name of the task
var db string // database to use
var rp string // retention policy to use
var measurement string // which measurement to consume

// with default value
var groupBy = []
var whereFilter = lambda: ("ty" == 'F')
var period = 1m
var message = ' id:{{.ID}} name:{{.Name}} taskname:{{.TaskName}} group:{{.Group}} tags:{{.Tags}} level:{{.Level}} fields:{{.Fields}} time:{{.Time}}  '
var outputDB = 'chronograf'
var outputRP = 'autogen'
var outputMeasurement = 'alerts'
var threshold = 0.0
var triggerType = 'deadman'
var idVar = name
var idTag = 'alertID'
var levelTag = 'level'
var messageField = 'message'
var durationField = 'duration'
var query = 'SELECT * FROM "' + db + '"."' + rp + '"."' + measurement + '" WHERE "ty" = \'F\''


// alert definition

var data = batch
    |query(query)
        .period(period)
        .every(period)
        .groupBy(groupBy)

var trigger = data
    |deadman(threshold, period)
        .message(message)
        .id(idVar)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .stateChangesOnly()


trigger
    |eval(lambda: "emitted")
        .as('value')
        .keep('value', messageField, durationField)
    |eval(lambda: float("value"))
        .as('value')
        .keep()
    |influxDBOut()
        .create()
        .database(outputDB)
        .retentionPolicy(outputRP)
        .measurement(outputMeasurement)
        .tag('alertName', name)
        .tag('triggerType', triggerType)

trigger
    |httpOut('output')
    
trigger
    .post('output')
    .endpoint('benthos')
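
Once the template is registered (see the Job below), a per-device task is instantiated by POSTing the template id and the template vars to the Kapacitor task API; this is the same call that Benthos will generate automatically later on. A sketch with a placeholder device UUID, run from inside the Kapacitor pod (or through the Ingress exposed below):

curl -X POST http://localhost:9092/kapacitor/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "id": "0b7e2a90-0000-0000-0000-000000000000_not_fed",
    "template-id": "measurement_series_not_fed_template",
    "dbrps": [{"db": "<redacted>", "rp": "autogen"}],
    "vars": {
      "name":        {"type": "string", "value": "0b7e2a90-0000-0000-0000-000000000000_not_fed"},
      "measurement": {"type": "string", "value": "0b7e2a90-0000-0000-0000-000000000000"},
      "db":          {"type": "string", "value": "<redacted>"},
      "rp":          {"type": "string", "value": "autogen"}
    },
    "status": "enabled"
  }'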

The template is installed by a Kubernetes Job that reads the template from a ConfigMap and uses curl to call the Kapacitor template API:

apiVersion: batch/v1
kind: Job
metadata:
  name: {{ include "alerting-system.fullname" . }}-templates-installer
  labels:
    {{- include "alerting-system.labels" . | nindent 4 }}
  annotations:
    "helm.sh/hook": post-install,post-upgrade
    "helm.sh/hook-delete-policy": hook-succeeded
spec:
  template:
    spec:
      containers:
      - name: curl
        image: "{{ .Values.templateInstallJob.image.repository }}:{{ .Values.templateInstallJob.image.tag | default "latest" }}"
        command: ["sh", "-c"]
        args:
          - |
            set -ex
            base_url='http://{{ template "kapacitor.fullname" .Subcharts.kapacitor }}:9092/kapacitor/v1/templates'
            script=`cat /config/measurement_series_not_fed_template`
            sleep 10
            response_code=$(curl -s -o /dev/null -w "%{http_code}" "$base_url/measurement_series_not_fed_template")
            if [ "$response_code" -eq 404 ]; then
              curl --fail -X POST \
                -H "Content-Type: application/json" \
                --data-raw "{\"id\":\"measurement_series_not_fed_template\",\"type\":\"batch\",\"script\":\"$script\"}" \
                "$base_url"
            elif [ "$response_code" -eq 200 ]; then
              curl --fail -X PATCH \
                -H "Content-Type: application/json" \
                --data-raw "{\"id\":\"measurement_series_not_fed_template\",\"type\":\"batch\",\"script\":\"$script\"}" \
                "$base_url/measurement_series_not_fed_template"
            else
              echo "Unexpected response code: $response_code"
              exit 1
            fi            
        volumeMounts:
        - name: config-volume
          mountPath: /config
      volumes:
      - name: config-volume
        configMap:
          name: {{ include "alerting-system.fullname" . }}
      restartPolicy: Never
  backoffLimit: 10

Benthos and Kapacitor

These are the values used to configure Benthos and Kapacitor in our environment:

templateInstallJob:
  image:
    repository: curlimages/curl
    pullPolicy: IfNotPresent
    # Overrides the image tag whose default is the chart appVersion.
    tag: "8.11.0"

ingress:
  enabled: true
  className: ""
  annotations: {}
  hosts:
    - host: kapacitor.apps.ocp1.<redacted>
      paths:
        - path: /
          pathType: ImplementationSpecific
  tls: []



selectorLabels:



kapacitor:
  enabled: true
  ## kapacitor image version
  ## ref: https://hub.docker.com/r/library/kapacitor/tags/
  ##
  image:
    repository: "kapacitor"
    tag: "1.7.6-alpine"
    pullPolicy: "IfNotPresent"

  ## Deployment Strategy type
  # strategy:
  #   type: Recreate

  ## Specify a service type, defaults to NodePort
  ## ref: http://kubernetes.io/docs/user-guide/services/
  ##
  service:
    type: ClusterIP

  ## Persist data to a persistent volume
  ## ref: http://kubernetes.io/docs/user-guide/persistent-volumes/
  ##
  persistence:
    enabled: true
    ## kapacitor data Persistent Volume Storage Class
    ## If defined, storageClassName: <storageClass>
    ## If set to "-", storageClassName: "", which disables dynamic provisioning
    ## If undefined (the default) or set to null, no storageClassName spec is
    ##   set, choosing the default provisioner.  (gp2 on AWS, standard on
    ##   GKE, AWS & OpenStack)
    ##
    # storageClass: "-"
    accessMode: ReadWriteOnce
    size: 8Gi
    # existingClaim: ""

  ## Configure resource requests and limits
  ## ref: http://kubernetes.io/docs/user-guide/compute-resources/
  ##
  resources:
    requests:
      memory: 256Mi
      cpu: 0.1
    limits:
      memory: 1Gi
      # cpu: 1

  ## Set the environment variables for kapacitor (or anything else you want to use)
  ## ref: https://hub.docker.com/_/kapacitor/
  ## ref: https://docs.influxdata.com/kapacitor/latest/administration/configuration/
  ##
  # Examples below
  #
  envVars:
    KAPACITOR_HTTP_LOG_ENABLED: false
    KAPACITOR_LOGGING_LEVEL: "ERROR"
    KAPACITOR_INFLUXDB_0_HTTP_PORT: "80"
    KAPACITOR_INFLUXDB_0_KAPACITOR_HOSTNAME: "kapacitor.apps.ocp1.<redacted>"
    KAPACITOR_HTTPPOST_0_ENDPOINT: "benthos"
    KAPACITOR_HTTPPOST_0_URL: "http://benthos-internal-service/post"
  #
  # or, at your terminal, with
  #
  # helm install --name kapacitor-rls --set influxURL=http://influxurl.com,envVars.KAPACITOR_SLACK_ENABLED=true,envVars.KAPACITOR_SLACK_URL="http://slack.com/xxxxx/xxxxx/xxxx/xxxxxxx" influxdata/kapacitor

  ## Set the URL of InfluxDB instance to create subscription on
  ## ref: https://docs.influxdata.com/kapacitor/v1.1/introduction/getting_started/
  ##
  # influxURL: http://influxdb-influxdb.tick:8086
  influxURL: http://192.168.6.39:8086

  ## Name of an existing Secret used to set the environment variables for the
  ## InfluxDB user and password. The expected keys in the secret are
  ## `influxdb-user` and `influxdb-password`.
  ##
  existingSecret: influx-auth


  ## Affinity for pod assignment
  ## Ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
  ##
  affinity: {}

  ## Tolerations for pod assignment
  ## Ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
  ##
  tolerations: []
  # - key: "key"
  #   operator: "Equal|Exists"
  #   value: "value"
  #   effect: "NoSchedule|PreferNoSchedule|NoExecute(1.6 only)"

  ## Role based access control
  rbac:
    create: true
    namespaced: true

  ## Service account
  serviceAccount:
    annotations: {}
    create: true
    name:

  ## Override the deployment namespace
  namespaceOverride: ""

benthos:
  enabled: true
  deployment:
    replicaCount: 1

  image:
    repository: "docker.io/jeffail/benthos"

  resources:
    limits:
      cpu: 200m
      memory: 128Mi
    requests:
      cpu: 200m
      memory: 128Mi

  autoscaling:
    enabled: false


  serviceMonitor:
    enabled: true

  metrics:
    prometheus: {}

  logger:
    level: INFO
    static_fields:
      '@service': benthos


  config: |
    input:
      broker:
        inputs:
          - kafka:
              addresses:
                - "central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092"
              topics: ["<redacted>"]
              consumer_group: "kapacitor-alerts-connector"
              start_from_oldest: true

          - http_server:
              path: /post
              ws_path: /post/ws
              allowed_verbs:
                - POST
              timeout: 10s

    pipeline:
      processors:
        - bloblang: 'meta original_message = this'
        - log:
            level: DEBUG
            message: "Received message: ${!json()}"

        - try:
          - group_by:
            - check: this.exists("message")
              processors:
                - mapping: 'meta grouping = "historicization-event"'
                - type: log
                  log:
                    level: INFO
                    message: |
                      Received http post from kapacitor with id: ${!json("id")}, time: ${!json("time")} and level: ${!json("level")} 
                - bloblang: |
                    root = { 
                      "id": uuid_v4(),
                      "subject": this.id.trim_suffix("_not_fed"),
                      "source": "benthos",
                      "specversion": "1.0",
                      "time": this.time,
                      "type": if this.level == "OK" {
                        "historicization-restarted"
                      } else if this.level == "CRITICAL" {
                        "historicization-stopped"
                      }
                    }
                - log:
                    level: INFO
                    message: "Sending cloudevent message to kafka ${!json()}"


            - check: this.exists("type")
              processors:
                - mapping: 'meta grouping = "input-stream-event"'
                - type: log
                  log:
                    level: INFO
                    message: |
                      Received cloud event with id: ${!json("id")} and type: ${!json("type")}
                - bloblang: |
                    root = this.data.devicescode
                - unarchive:
                    format: json_array
                - bloblang: |
                    let parsed = meta("original_message").parse_json()
                    root = {
                      "devicecode": this,
                      "eventtype": $parsed.data.eventtype,
                    }
                - switch:
                  - check: this.eventtype == "disabled"
                    processors:
                      - bloblang: |
                          meta type = "disabled"
                          meta id = this.devicecode + "_not_fed"
                          root = { 
                            "status": "disabled" 
                          }
                  - check: this.eventtype == "enabled"
                    processors:
                      - try:
                        - bloblang: |
                            meta type = "enabled"
                            meta id = this.devicecode + "_not_fed"
                            root = this
                        - http:
                            url: http://{{ template "alerting-system.kapacitorFullname" . }}:9092/kapacitor/v1/tasks/${!meta("id")}
                            verb: GET
                            retries: 1
                        - bloblang: |
                            root = { 
                              "status": "enabled" 
                            }
                      - catch:
                        - log:
                            level: WARN
                            message: Task ${!meta("id")} not found. Fallback to created
                        - bloblang: |
                            meta type = "created"
                            meta id = this.devicecode + "_not_fed" 
                            root = {
                              "id": this.devicecode + "_not_fed",
                              "template-id": "measurement_series_not_fed_template",
                              "dbrps": [{"db": "<redacted>", "rp": "autogen"}],
                              "vars": {
                                "name": {
                                  "type": "string",
                                  "value": this.devicecode + "_not_fed"
                                },
                                "measurement": {
                                  "type": "string",
                                  "value": this.devicecode
                                },
                                "db": {
                                  "type": "string",
                                  "value": "<redacted>"
                                },
                                "rp": {
                                  "type": "string",
                                  "value": "autogen"
                                }
                              },
                              "status": "enabled"
                            }
                - log:
                    level: INFO
                    message: Sending message to kapacitor type "${!meta("type")}" for ${!meta("id")}
                - log:
                    level: DEBUG
                    message: Sending message to kapacitor ${!meta("id")} with body ${!json()}
        - catch:
          - log:
              level: ERROR
              message: "Unable to creating message due to: ${!error()}. Sending original message to dlq"

    output:
      # stdout:
      #   codec: lines
      fallback:
        - switch:
            retry_until_success: false
            cases:
              - check: 'meta("grouping") == "input-stream-event" && meta("type") == "created"'
                output:
                  http_client:
                    retries: 10  
                    retry_period: "10s"
                    url: "http://{{ template "alerting-system.kapacitorFullname" . }}:9092/kapacitor/v1/tasks"
                    verb: POST
                    headers:
                      Content-Type: "application/json"

              - check: 'meta("grouping") == "input-stream-event" && meta("type") == "deleted"'
                output:
                  http_client:
                    retries: 0
                    url: http://{{ template "alerting-system.kapacitorFullname" . }}:9092/kapacitor/v1/tasks/${!meta("id")}
                    verb: DELETE
                    headers:
                      Content-Type: "application/json"

              - check: 'meta("grouping") == "input-stream-event" && meta("type") == "enabled"'
                output:
                  http_client:
                    retries: 10
                    retry_period: "5s"
                    url: http://{{ template "alerting-system.kapacitorFullname" . }}:9092/kapacitor/v1/tasks/${!meta("id")}
                    verb: PATCH
                    headers:
                      Content-Type: "application/json"
              - check: 'meta("grouping") == "input-stream-event" && meta("type") == "disabled"'
                output:
                  http_client:
                    retries: 0
                    url: http://{{ template "alerting-system.kapacitorFullname" . }}:9092/kapacitor/v1/tasks/${!meta("id")}
                    verb: PATCH
                    headers:
                      Content-Type: "application/json"
              - check: 'meta("grouping") == "historicization-event"'
                output:
                  kafka:
                    addresses:
                      - "central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092"
                    topic: "kapacitor-alerts-connector.events"
                    target_version: 2.1.0
                    key: ""
                    compression: gzip
                    static_headers: {}
                    metadata:
                      exclude_prefixes: []
                    max_in_flight: 64
              - output:
                  processors:
                    - bloblang: |
                        root = {
                          "original_message": meta("original_message"),
                          "send_message": this
                        }
                  kafka:
                    addresses:
                      - "central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092"
                    topic: "kapacitor-alerts-connector.dead-letters-queue"
                    target_version: 2.1.0
                    key: ""
                    compression: gzip
                    static_headers: {}
                    metadata:
                      exclude_prefixes: []
                    max_in_flight: 64

        - processors:
            - bloblang: |
                root = {
                  "original_message": meta("original_message"),
                  "send_message": this
                }
          kafka:
            addresses:
              - "central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092"
            topic: "kapacitor-alerts-connector.dead-letters-queue"
            target_version: 2.1.0
            key: ""
            compression: gzip
            static_headers: {}
            metadata:
              exclude_prefixes: []
            max_in_flight: 64    

Here are some considerations:

  • To allow the external InfluxDB instance to connect to the Kapacitor deployed on OpenShift and exposed via Ingress, it is necessary to set the environment variable KAPACITOR_INFLUXDB_0_KAPACITOR_HOSTNAME.
  • A dead-letter queue is used in Benthos for messages that fail to be parsed or processed successfully.
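
To smoke-test the alerting path without waiting for a real deadman event, the POST that Kapacitor sends to Benthos when an alert fires can be simulated by hand. The field values below are placeholders that mirror the alert properties used in the pipeline (id, level, time, message):

# Simulate the Kapacitor alert handler POST against the Benthos http_server
# input; a "historicization-stopped" CloudEvent should then appear on the
# kapacitor-alerts-connector.events topic.
curl -X POST http://benthos-internal-service/post \
  -H "Content-Type: application/json" \
  -d '{
    "id": "0b7e2a90-0000-0000-0000-000000000000_not_fed",
    "level": "CRITICAL",
    "time": "2024-01-01T00:00:00Z",
    "message": "id:0b7e2a90-0000-0000-0000-000000000000_not_fed level:CRITICAL"
  }'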

Useful Resources

Below are some of the main resources I used. Note that their versions are not always consistent, but newer versions of the documentation often contain additional information that also applies to older versions.