Integrating Influx Alerting with Event-driven Architecture
Business Case
In a project where numerous time series from IoT devices are stored, the following architecture is in place:
- A Kafka broker that holds the measurements obtained from the IoT devices.
- A simple sink connector that writes the measurements to the InfluxDB database for historical storage.
- A legacy backend service for process configuration, which uses MySQL for saving configurations.
To ensure that the measurements are being stored in the time series database, we want to create an alerting mechanism that can notify the user if data from an “active” IoT device stops arriving at the database.
First Solution
The first solution we found is to use the native checks in InfluxDB (version 1 in our case) as demonstrated in this article: How to Use a Deadman Check to Alert on Service Outage.
The article illustrates the creation of these checks for InfluxDB version 2, and the process for version 1 is similar.
The outlined solution, however, has a significant limitation, at least in the case of InfluxDB version 1.
It is not possible to create a single Deadman check that spans multiple measurements, even if they reside in the same database.
In our case, each IoT device is associated with a specific measurement, which means we would need to manually create approximately 2000 Deadman checks. Additionally, these checks would have to be manually activated and deactivated by an operator whenever an IoT device is turned off or on.
The data layout may not have been designed optimally, as discussed in Data Layout and Schema Design Best Practices for InfluxDB.
However, this is the current situation, and changing the schema was not an option for us, so this first solution is not feasible in our case.
Second Solution
The second solution we found aims to automate alert management and to address some issues inherited from the previous architecture.
It has to satisfy the following requirements:
- The solution must be decoupled from external configuration databases and must be purely in classic event-driven style.
- The solution must be decoupled from the underlying technology used for data historization: it must be possible to replace that technology without altering the events that flow through Kafka.
Here is the high level schema:
Decoupling MySQL
In line with the first point, the dual write problem must also be addressed; it is thoroughly explained in this article: The Dual Write Problem.
The legacy “configurator” software currently writes to the database and then sends the corresponding event to Kafka, thus falling squarely into this problem.
To address this, we will use the transactional outbox pattern, as described here: The Transactional Outbox Pattern.
This approach also involves using Kafka Connect (Debezium) to generate Cloud Events from the outbox table.
This way, we can generate events corresponding to the activation/deactivation of specific IoT devices, with the device’s identifying UUID corresponding to the measurement name in Influx, while remaining decoupled from MySQL.
Decoupling Kapacitor/Influx
We need a process that listens to Kafka for the on/off events of a device, calls the Kapacitor API to enable or disable the corresponding alarms, and is also capable of publishing alerts to Kafka.
In other words, the process translates device events into calls to the Kapacitor HTTP API and exposes an HTTP API of its own that, when invoked by Kapacitor, generates alerting events on Kafka.
To accomplish this, Benthos (now Redpanda Connect) can be used: a versatile and high-performance stream processing tool that makes it easy to integrate Kafka with HTTP APIs.
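To give an idea of the shape of such a process, here is a minimal, illustrative Benthos configuration sketch (the topic, hostnames, and the exact mapping are placeholders; the complete configuration we actually use is shown later):
# Minimal sketch only: consume device on/off events from Kafka and turn each one
# into a call to the Kapacitor HTTP API. Hostnames and the topic are placeholders.
input:
  kafka:
    addresses: ["<kafka-bootstrap>:9092"]
    topics: ["<device-events-topic>"]
    consumer_group: "kapacitor-alerts-connector"
pipeline:
  processors:
    - bloblang: |
        # derive the Kapacitor task id from the device UUID and turn the
        # event type ("enabled"/"disabled") into the corresponding task status
        meta id = this.data.devicescode.index(0) + "_not_fed"
        root = {"status": this.data.eventtype}
output:
  http_client:
    url: http://<kapacitor-host>:9092/kapacitor/v1/tasks/${!meta("id")}
    verb: PATCH
    headers:
      Content-Type: application/json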
Steps
So, to recap, here are some of the steps in chronological order:
- The operator turns on an IoT device (so we need to activate the alert).
- The “configurator” process makes its changes on the DB side and writes a new entry in the outbox table with the necessary metadata for creating a cloud event.
- Kafka Connect (Debezium, supported by Red Hat in AMQ Streams) will create the cloud event on a Kafka topic.
- Benthos will listen for the event and generate an HTTP call to Kapacitor to create a new Deadman alert or enable it if it already exists.
- If the Deadman alert is triggered, Kapacitor will make an HTTP POST request to Benthos, passing the alert metadata.
- Benthos will then generate the alert event on Kafka.
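For reference, the alert event that Benthos publishes to Kafka in the last step is a small cloud event; it looks roughly as follows (the exact mapping appears later in the Benthos configuration, the values here are placeholders, and the type becomes "historicization-restarted" when data starts flowing again):
{
  "id": "<generated uuid>",
  "subject": "<device-uuid>",
  "source": "benthos",
  "specversion": "1.0",
  "time": "<alert time reported by Kapacitor>",
  "type": "historicization-stopped"
}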
Code
Let’s go ahead and outline the steps to create this solution:
Outbox Table
The first thing to do is create the Outbox table. To do this, you can follow the article Reliable Microservices Data Exchange With the Outbox Pattern along with the related example on GitHub: Debezium Outbox Example.
CREATE TABLE `outbox` (
`id` bigint NOT NULL AUTO_INCREMENT,
`payload` json DEFAULT NULL,
`type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`aggregateid` char(36) NOT NULL,
`aggregatetype` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `aggregateid` (`aggregateid`)
) ENGINE=InnoDB AUTO_INCREMENT=257 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Obviously, the process that adds entries to the Outbox table must be modified accordingly. The important thing is that the business write and the outbox insert happen within a single transaction.
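A minimal sketch of such a write, assuming a hypothetical device table and event type on the configurator side (only the outbox columns come from the DDL above):
-- Hypothetical business write plus outbox insert in one transaction.
START TRANSACTION;

-- 1) the configurator's own change (table and columns are hypothetical)
UPDATE device SET enabled = 1 WHERE uuid = '<device-uuid>';

-- 2) the outbox entry, written in the SAME transaction
INSERT INTO outbox (payload, type, aggregateid, aggregatetype)
VALUES (
  JSON_OBJECT('eventtype', 'enabled', 'devicescode', JSON_ARRAY('<device-uuid>')),
  'device-status-changed',   -- hypothetical event type
  '<device-uuid>',           -- device UUID = Influx measurement name
  'device'                   -- hypothetical aggregate type
);

COMMIT;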
Kafka Connect
It is necessary to configure Kafka Connect on AMQ Streams (in our case, on OpenShift). This requires three steps:
- Create the Docker Image with the necessary plugins
- Configure a Connect Cluster
- Configure a Source Connector
The first step is necessary in our case because our environment is air-gapped, so we need a Docker image with all the required plugins already installed.
The Dockerfile used is the following:
FROM registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0
USER root:root
RUN set -ex && \
mkdir -p /opt/kafka/plugins && \
cd /opt/kafka/plugins && \
curl https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.7.3.Final-redhat-00003/debezium-connector-mysql-2.7.3.Final-redhat-00003-plugin.zip -O && \
unzip debezium-connector-mysql-2.7.3.Final-redhat-00003-plugin.zip && \
curl https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.7.3.Final-redhat-00003/debezium-scripting-2.7.3.Final-redhat-00003.zip -O && \
unzip debezium-scripting-2.7.3.Final-redhat-00003.zip && \
curl https://repo1.maven.org/maven2/org/apache/groovy/groovy/4.0.24/groovy-4.0.24.jar -O && \
curl https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/4.0.24/groovy-jsr223-4.0.24.jar -O && \
curl https://repo1.maven.org/maven2/org/apache/groovy/groovy-json/4.0.24/groovy-json-4.0.24.jar -O && \
rm debezium-connector-mysql-2.7.3.Final-redhat-00003-plugin.zip debezium-scripting-2.7.3.Final-redhat-00003.zip
USER 1001
All these dependencies are necessary for Debezium to work. More on this in the Debezium User Guide and in Installing Debezium on OpenShift.
For the second step, we created the following definition for the KafkaConnect custom resource:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
annotations:
strimzi.io/use-connector-resources: 'true'
name: connect-cluster
namespace: <redacted>
spec:
bootstrapServers: 'central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092'
config:
config.storage.replication.factor: -1
config.storage.topic: connect-cluster-configs
group.id: connect-cluster
offset.storage.replication.factor: -1
offset.storage.topic: connect-cluster-offsets
status.storage.replication.factor: -1
status.storage.topic: connect-cluster-status
image: <redacted>
replicas: 1
version: 3.1.0
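Once this resource is applied, it is worth waiting for the Connect cluster to report Ready before creating the connector. A quick check, assuming the manifest above is saved as kafka-connect.yaml:
# Apply the KafkaConnect resource and wait until the Connect cluster is Ready
oc apply -f kafka-connect.yaml -n <redacted>
oc wait kafkaconnect/connect-cluster --for=condition=Ready --timeout=300s -n <redacted>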
For the third step, we created the following definition for the KafkaConnector custom resource:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
labels:
strimzi.io/cluster: connect-cluster
name: mysql-source-connector
namespace: <redacted>
spec:
class: io.debezium.connector.mysql.MySqlConnector
config:
predicates.isHeartbeat.pattern: __debezium-heartbeat.*
value.converter: io.debezium.converters.CloudEventsConverter
table.include.list: <db name redacted>.outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
transforms.addMetadataHeaders.predicate: isHeartbeat
database.hostname: 192.168.6.38
transforms: 'addMetadataHeaders,outbox'
predicates.isHeartbeat.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
transforms.outbox.table.fields.additional.placement: 'type:header'
transforms.addMetadataHeaders.fields: 'source,op,transaction'
tombstones.on.delete: 'false'
transforms.addMetadataHeaders.type: org.apache.kafka.connect.transforms.HeaderFrom$Value
database.password: <redacted>
header.converter.schemas.enable: 'true'
value.converter.metadata.source: 'header,id:generate,type:header,dataSchemaName:header'
topic.prefix: connector-mysql
transforms.outbox.table.expand.json.payload: 'true'
transforms.addMetadataHeaders.negate: 'true'
database.server.id: 184051
database.port: 3306
transforms.addMetadataHeaders.headers: 'source,op,transaction'
transforms.addMetadataHeaders.operation: copy
database.include.list: <db name redacted>
value.converter.json.schemas.enable: 'false'
predicates: isHeartbeat
schema.history.internal.kafka.topic: schema-changes.<db name redacted>
schema.data.name.source.header.enable: true
database.user: <redacted>
schema.history.internal.kafka.bootstrap.servers: 'central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092'
header.converter: org.apache.kafka.connect.json.JsonConverter
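With this configuration, an activation/deactivation row written to the outbox ends up on a Kafka topic as a cloud event whose data is the expanded JSON payload. Roughly, it looks like the sketch below (the exact envelope and topic name depend on the EventRouter and CloudEventsConverter settings; the values shown are placeholders):
{
  "id": "<generated by the converter>",
  "source": "/debezium/mysql/connector-mysql",
  "specversion": "1.0",
  "type": "<value of the outbox type column>",
  "time": "<change event timestamp>",
  "datacontenttype": "application/json",
  "data": {
    "eventtype": "enabled",
    "devicescode": ["<device-uuid>"]
  }
}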
Alerting System
For the deployment of Benthos and Kapacitor, we decided to create a Helm chart that, through dependencies on the Kapacitor and Benthos Helm charts, performs all the initialization steps.
In particular, the Helm chart must:
- Install the template for alert creation using a job.
- Create the ingress for communication with the InfluxDB instance outside of OpenShift.
- Create the secret with user credentials.
These are the chart dependencies:
dependencies:
- name: kapacitor
version: "1.4.7"
condition: kapacitor.enabled
repository: "https://helm.influxdata.com/"
- name: benthos
version: "2.2.0"
condition: benthos.enabled
repository: https://benthosdev.github.io/charts/
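With the dependencies declared, the chart is installed in the usual way (the chart path, release name, and values file below are just examples):
# Fetch the Kapacitor and Benthos subcharts and install the umbrella chart
helm dependency update ./alerting-system
helm upgrade --install alerting-system ./alerting-system -n <redacted> -f values.yaml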
Template
This is the template (more here: template docs):
// Template VARS
var name string // name of the task
var db string // database to use
var rp string // retention policy to use
var measurement string // which measurement to consume
// with default value
var groupBy = []
var whereFilter = lambda: ("ty" == 'F')
var period = 1m
var message = ' id:{{.ID}} name:{{.Name}} taskname:{{.TaskName}} group:{{.Group}} tags:{{.Tags}} level:{{.Level}} fields:{{.Fields}} time:{{.Time}} '
var outputDB = 'chronograf'
var outputRP = 'autogen'
var outputMeasurement = 'alerts'
var threshold = 0.0
var triggerType = 'deadman'
var idVar = name
var idTag = 'alertID'
var levelTag = 'level'
var messageField = 'message'
var durationField = 'duration'
var query = 'SELECT * FROM "' + db + '"."' + rp + '"."' + measurement + '" WHERE "ty" = \'F\''
// alert definition
var data = batch
|query(query)
.period(period)
.every(period)
.groupBy(groupBy)
var trigger = data
|deadman(threshold, period)
.message(message)
.id(idVar)
.idTag(idTag)
.levelTag(levelTag)
.messageField(messageField)
.durationField(durationField)
.stateChangesOnly()
trigger
|eval(lambda: "emitted")
.as('value')
.keep('value', messageField, durationField)
|eval(lambda: float("value"))
.as('value')
.keep()
|influxDBOut()
.create()
.database(outputDB)
.retentionPolicy(outputRP)
.measurement(outputMeasurement)
.tag('alertName', name)
.tag('triggerType', triggerType)
trigger
|httpOut('output')
trigger
.post('output')
.endpoint('benthos')
The template is installed by a Kubernetes Job, which reads the template from a ConfigMap and uses curl to call the Kapacitor template API:
apiVersion: batch/v1
kind: Job
metadata:
name: {{ include "alerting-system.fullname" . }}-templates-installer
labels:
{{- include "alerting-system.labels" . | nindent 4 }}
annotations:
"helm.sh/hook": post-install,post-upgrade
"helm.sh/hook-delete-policy": hook-succeeded
spec:
template:
spec:
containers:
- name: curl
image: "{{ .Values.templateInstallJob.image.repository }}:{{ .Values.templateInstallJob.image.tag | default "latest" }}"
command: ["sh", "-c"]
args:
- |
set -ex
base_url='http://{{ template "kapacitor.fullname" .Subcharts.kapacitor }}:9092/kapacitor/v1/templates'
script=`cat /config/measurement_series_not_fed_template`
sleep 10
response_code=$(curl -s -o /dev/null -w "%{http_code}" "$base_url/measurement_series_not_fed_template")
if [ "$response_code" -eq 404 ]; then
curl --fail -X POST \
-H "Content-Type: application/json" \
--data-raw "{\"id\":\"measurement_series_not_fed_template\",\"type\":\"batch\",\"script\":\"$script\"}" \
"$base_url"
elif [ "$response_code" -eq 200 ]; then
curl --fail -X PATCH \
-H "Content-Type: application/json" \
--data-raw "{\"id\":\"measurement_series_not_fed_template\",\"type\":\"batch\",\"script\":\"$script\"}" \
"$base_url/measurement_series_not_fed_template"
else
echo "Unexpected response code: $response_code"
exit 1
fi
volumeMounts:
- name: config-volume
mountPath: /config
volumes:
- name: config-volume
configMap:
name: {{ include "alerting-system.fullname" . }}
restartPolicy: Never
backoffLimit: 10
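Once the template is installed, a Deadman task for a single device can also be instantiated by hand through the Kapacitor API; this is essentially the same call that Benthos performs automatically when it receives an "enabled" event (the hostname, database name, and device UUID below are placeholders):
# Create a task from the template for one device (placeholders to be replaced)
curl -X POST "http://<kapacitor-host>:9092/kapacitor/v1/tasks" \
  -H "Content-Type: application/json" \
  -d '{
    "id": "<device-uuid>_not_fed",
    "template-id": "measurement_series_not_fed_template",
    "dbrps": [{"db": "<db>", "rp": "autogen"}],
    "vars": {
      "name":        {"type": "string", "value": "<device-uuid>_not_fed"},
      "measurement": {"type": "string", "value": "<device-uuid>"},
      "db":          {"type": "string", "value": "<db>"},
      "rp":          {"type": "string", "value": "autogen"}
    },
    "status": "enabled"
  }'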
Benthos and Kapacitor
These are the values used to configure Benthos and Kapacitor in our environment:
templateInstallJob:
image:
repository: curlimages/curl
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: "8.11.0"
ingress:
enabled: true
className: ""
annotations: {}
hosts:
- host: kapacitor.apps.ocp1.<redacted>
paths:
- path: /
pathType: ImplementationSpecific
tls: []
selectorLabels:
kapacitor:
enabled: true
## kapacitor image version
## ref: https://hub.docker.com/r/library/kapacitor/tags/
##
image:
repository: "kapacitor"
tag: "1.7.6-alpine"
pullPolicy: "IfNotPresent"
## Deployment Strategy type
# strategy:
# type: Recreate
## Specify a service type, defaults to NodePort
## ref: http://kubernetes.io/docs/user-guide/services/
##
service:
type: ClusterIP
## Persist data to a persistent volume
## ref: http://kubernetes.io/docs/user-guide/persistent-volumes/
##
persistence:
enabled: true
## kapacitor data Persistent Volume Storage Class
## If defined, storageClassName: <storageClass>
## If set to "-", storageClassName: "", which disables dynamic provisioning
## If undefined (the default) or set to null, no storageClassName spec is
## set, choosing the default provisioner. (gp2 on AWS, standard on
## GKE, AWS & OpenStack)
##
# storageClass: "-"
accessMode: ReadWriteOnce
size: 8Gi
# existingClaim: ""
## Configure resource requests and limits
## ref: http://kubernetes.io/docs/user-guide/compute-resources/
##
resources:
requests:
memory: 256Mi
cpu: 0.1
limits:
memory: 1Gi
# cpu: 1
## Set the environment variables for kapacitor (or anything else you want to use)
## ref: https://hub.docker.com/_/kapacitor/
## ref: https://docs.influxdata.com/kapacitor/latest/administration/configuration/
##
# Examples below
#
envVars:
KAPACITOR_HTTP_LOG_ENABLED: false
KAPACITOR_LOGGING_LEVEL: "ERROR"
KAPACITOR_INFLUXDB_0_HTTP_PORT: "80"
KAPACITOR_INFLUXDB_0_KAPACITOR_HOSTNAME: "kapacitor.apps.ocp1.<redacted>"
KAPACITOR_HTTPPOST_0_ENDPOINT: "benthos"
KAPACITOR_HTTPPOST_0_URL: "http://benthos-internal-service/post"
#
# or, at your terminal, with
#
# helm install --name kapacitor-rls --set influxURL=http://influxurl.com,envVars.KAPACITOR_SLACK_ENABLED=true,envVars.KAPACITOR_SLACK_URL="http://slack.com/xxxxx/xxxxx/xxxx/xxxxxxx" influxdata/kapacitor
## Set the URL of InfluxDB instance to create subscription on
## ref: https://docs.influxdata.com/kapacitor/v1.1/introduction/getting_started/
##
# influxURL: http://influxdb-influxdb.tick:8086
influxURL: http://192.168.6.39:8086
## Name of an existing Secrect used to set the environment variables for the
## InfluxDB user and password. The expected keys in the secret are
## `influxdb-user` and `influxdb-password`.
##
existingSecret: influx-auth
## Affinity for pod assignment
## Ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
##
affinity: {}
## Tolerations for pod assignment
## Ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
##
tolerations: []
# - key: "key"
# operator: "Equal|Exists"
# value: "value"
# effect: "NoSchedule|PreferNoSchedule|NoExecute(1.6 only)"
## Role based access control
rbac:
create: true
namespaced: true
## Service account
serviceAccount:
annotations: {}
create: true
name:
## Override the deployment namespace
namespaceOverride: ""
benthos:
enabled: true
deployment:
replicaCount: 1
image:
repository: "docker.io/jeffail/benthos"
resources:
limits:
cpu: 200m
memory: 128Mi
requests:
cpu: 200m
memory: 128Mi
autoscaling:
enabled: false
serviceMonitor:
enabled: true
metrics:
prometheus: {}
logger:
level: INFO
static_fields:
'@service': benthos
config: |
input:
broker:
inputs:
- kafka:
addresses:
- "central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092"
topics: ["<redacted>"]
consumer_group: "kapacitor-alerts-connector"
start_from_oldest: true
- http_server:
path: /post
ws_path: /post/ws
allowed_verbs:
- POST
timeout: 10s
pipeline:
processors:
- bloblang: 'meta original_message = this'
- log:
level: DEBUG
message: "Received message: ${!json()}"
- try:
- group_by:
- check: this.exists("message")
processors:
- mapping: 'meta grouping = "historicization-event"'
- type: log
log:
level: INFO
message: |
Received http post from kapacitor with id: ${!json("id")}, time: ${!json("time")} and level: ${!json("level")}
- bloblang: |
root = {
"id": uuid_v4(),
"subject": this.id.trim_suffix("_not_fed"),
"source": "benthos",
"specversion": "1.0",
"time": this.time,
"type": if this.level == "OK" {
"historicization-restarted"
} else if this.level == "CRITICAL" {
"historicization-stopped"
}
}
- log:
level: INFO
message: Sending cloudevent message to kafka ${!json()}"
- check: this.exists("type")
processors:
- mapping: 'meta grouping = "input-stream-event"'
- type: log
log:
level: INFO
message: |
Received cloud event with id: ${!json("id")} and type: ${!json("type")}
- bloblang: |
root = this.data.devicescode
- unarchive:
format: json_array
- bloblang: |
let parsed = meta("original_message").parse_json()
root = {
"devicecode": this,
"eventtype": $parsed.data.eventtype,
}
- switch:
- check: this.eventtype == "disabled"
processors:
- bloblang: |
meta type = "disabled"
meta id = this.devicecode + "_not_fed"
root = {
"status": "disabled"
}
- check: this.eventtype == "enabled"
processors:
- try:
- bloblang: |
meta type = "enabled"
meta id = this.devicecode + "_not_fed"
root = this
- http:
url: http://{{ template "alerting-system.kapacitorFullname" . }}:9092/kapacitor/v1/tasks/${!meta("id")}
verb: GET
retries: 1
- bloblang: |
root = {
"status": "enabled"
}
- catch:
- log:
level: WARN
message: Task ${!meta("id")} not found. Fallback to created
- bloblang: |
meta type = "created"
meta id = this.devicecode + "_not_fed"
root = {
"id": this.devicecode + "_not_fed",
"template-id": "measurement_series_not_fed_template",
"dbrps": [{"db": "<redacted>", "rp": "autogen"}],
"vars": {
"name": {
"type": "string",
"value": this.devicecode + "_not_fed"
},
"measurement": {
"type": "string",
"value": this.devicecode
},
"db": {
"type": "string",
"value": "<redacted>"
},
"rp": {
"type": "string",
"value": "autogen"
}
},
"status": "enabled"
}
- log:
level: INFO
message: Sending message to kapacitor type "${!meta("type")}" for ${!meta("id")}"
- log:
level: DEBUG
message: Sending message to kapacitor ${!meta("id")} with body ${!json()}"
- catch:
- log:
level: ERROR
message: "Unable to creating message due to: ${!error()}. Sending original message to dlq"
output:
# stdout:
# codec: lines
fallback:
- switch:
retry_until_success: false
cases:
- check: 'meta("grouping") == "input-stream-event" && meta("type") == "created"'
output:
http_client:
retries: 10
retry_period: "10s"
url: "http://{{ template "alerting-system.kapacitorFullname" . }}:9092/kapacitor/v1/tasks"
verb: POST
headers:
Content-Type: "application/json"
- check: 'meta("grouping") == "input-stream-event" && meta("type") == "deleted"'
output:
http_client:
retries: 0
url: http://{{ template "alerting-system.kapacitorFullname" . }}:9092/kapacitor/v1/tasks/${!meta("id")}
verb: DELETE
headers:
Content-Type: "application/json"
- check: 'meta("grouping") == "input-stream-event" && meta("type") == "enabled"'
output:
http_client:
retries: 10
retry_period: "5s"
url: http://{{ template "alerting-system.kapacitorFullname" . }}:9092/kapacitor/v1/tasks/${!meta("id")}
verb: PATCH
headers:
Content-Type: "application/json"
- check: 'meta("grouping") == "input-stream-event" && meta("type") == "disabled"'
output:
http_client:
retries: 0
url: http://{{ template "alerting-system.kapacitorFullname" . }}:9092/kapacitor/v1/tasks/${!meta("id")}
verb: PATCH
headers:
Content-Type: "application/json"
- check: 'meta("grouping") == "historicization-event"'
output:
kafka:
addresses:
- "central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092"
topic: "kapacitor-alerts-connector.events"
target_version: 2.1.0
key: ""
compression: gzip
static_headers: {}
metadata:
exclude_prefixes: []
max_in_flight: 64
- processors:
- bloblang: |
root = {
"original_message": meta("original_message"),
"send_message": this
}
kafka:
addresses:
- "central-cluster-kafka-bootstrap.<redacted>.svc.cluster.local:9092"
topic: "kapacitor-alerts-connector.dead-letters-queue"
target_version: 2.1.0
key: ""
compression: gzip
static_headers: {}
metadata:
exclude_prefixes: []
max_in_flight: 64
Here are some considerations:
- To allow the external InfluxDB instance to connect to the Kapacitor deployed on OpenShift and exposed via Ingress, it is necessary to set the environment variable KAPACITOR_INFLUXDB_0_KAPACITOR_HOSTNAME.
- A dead letter queue is used in Benthos for messages that fail to be parsed or processed successfully.
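To quickly test the alerting path end to end, the Kapacitor callback can be simulated by posting a fake alert to the Benthos /post endpoint (the service name matches KAPACITOR_HTTPPOST_0_URL above; the field values are placeholders, while the field names are the ones read by the pipeline):
# Simulate a Kapacitor alert callback; Benthos should publish the corresponding
# cloud event on the alerts topic
curl -X POST "http://benthos-internal-service/post" \
  -H "Content-Type: application/json" \
  -d '{"id": "<device-uuid>_not_fed", "time": "2024-01-01T00:00:00Z", "level": "CRITICAL", "message": "deadman alert"}'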
Useful Resources
Below are some of the main resources I used. Note that the versions are not consistent, but newer versions of the documentation often contain additional information that also applies to older versions.
- Implement the outbox-pattern with Red Hat OpenShift Streams for Apache Kafka and Debezium
- Red Hat Streams for Apache Kafka 2.5 API Reference
- AMQ Streams on OpenShift Overview
- Debezium User Guide 2.7.3
- Debezium Installation Reference 3.0
- Maven Repository - Debezium
- Deploying and Managing Streams for Apache Kafka on OpenShift 2.7
- Using Kafka Connect with Plug-ins
- How to Use Debezium SMT Groovy Filter for Routing Events
- Debezium Transformations Reference 1.9
- Installing Debezium on OpenShift
- Debezium SMT Overview