{{- /*
Kafka provisioning Job.

Rendered only when `provisioning.enabled` is set. The Job is registered as a
Helm post-install / post-upgrade hook and is deleted before the next hook run
and after it succeeds (see the `helm.sh/hook*` annotations below).

Pod layout:
  - optional init container `wait-for-available-kafka`: blocks on
    `wait-for-port` until the client port of the Kafka service is in use
    (hard-coded 120 second timeout);
  - container `kafka-provisioning`: unless overridden by `diagnosticMode` or
    `provisioning.command` / `provisioning.args`, runs an inline bash script
    that
      1. creates the client properties file when it does not exist yet
         (security protocol, TLS stores, SASL JAAS config),
      2. runs `provisioning.preScript`,
      3. runs one `kafka-topics.sh --create --if-not-exists` per entry of
         `provisioning.topics` plus every `provisioning.extraProvisioningCommands`
         entry, in background batches of `provisioning.parallel`,
      4. runs `provisioning.postScript`.

NOTE(review): each batch is awaited with a bare `wait`, which does not report
the exit status of the background commands, so a failing command does not
appear to fail the Job. Confirm this best-effort behaviour is intended.
NOTE(review): `volumeMounts:` and `volumes:` are emitted unconditionally and
render as empty (null) keys when none of their guarded entries apply.
*/}}
{{- if .Values.provisioning.enabled }}
kind: Job
apiVersion: batch/v1
metadata:
  name: {{ printf "%s-provisioning" (include "common.names.fullname" .) }}
  namespace: {{ .Release.Namespace | quote }}
  labels: {{- include "common.labels.standard" . | nindent 4 }}
    app.kubernetes.io/component: kafka-provisioning
    {{- if .Values.commonLabels }}
    {{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }}
    {{- end }}
  annotations:
    helm.sh/hook: post-install,post-upgrade
    helm.sh/hook-delete-policy: before-hook-creation,hook-succeeded
    {{- if .Values.commonAnnotations }}
    {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
    {{- end }}
spec:
  template:
    metadata:
      labels: {{- include "common.labels.standard" . | nindent 8 }}
        app.kubernetes.io/component: kafka-provisioning
        {{- if .Values.provisioning.podLabels }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.podLabels "context" $) | nindent 8 }}
        {{- end }}
      {{- /* Emit the key only when there is something to put under it. */}}
      {{- if .Values.provisioning.podAnnotations }}
      annotations: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.podAnnotations "context" $) | nindent 8 }}
      {{- end }}
    spec:
      serviceAccountName: {{ template "kafka.provisioning.serviceAccountName" . }}
      {{- include "kafka.imagePullSecrets" . | nindent 6 }}
      {{- if .Values.provisioning.schedulerName }}
      schedulerName: {{ .Values.provisioning.schedulerName | quote }}
      {{- end }}
      {{- if .Values.provisioning.podSecurityContext.enabled }}
      securityContext: {{- omit .Values.provisioning.podSecurityContext "enabled" | toYaml | nindent 8 }}
      {{- end }}
      restartPolicy: OnFailure
      terminationGracePeriodSeconds: 0
      {{- if .Values.provisioning.nodeSelector }}
      nodeSelector: {{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.nodeSelector "context" $) | nindent 8 }}
      {{- end }}
      {{- if .Values.provisioning.tolerations }}
      tolerations: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.tolerations "context" $) | nindent 8 }}
      {{- end }}
      {{- if or .Values.provisioning.initContainers .Values.provisioning.waitForKafka }}
      initContainers:
        {{- if .Values.provisioning.waitForKafka }}
        {{- /* Blocks until the Kafka client port is in use; reuses the Kafka image. */}}
        - name: wait-for-available-kafka
          image: {{ include "kafka.image" . }}
          imagePullPolicy: {{ .Values.image.pullPolicy | quote }}
          {{- if .Values.provisioning.containerSecurityContext.enabled }}
          securityContext: {{- omit .Values.provisioning.containerSecurityContext "enabled" | toYaml | nindent 12 }}
          {{- end }}
          command:
            - /bin/bash
          args:
            - -ec
            - |
              wait-for-port \
                --host={{ include "common.names.fullname" . }} \
                --state=inuse \
                --timeout=120 \
                {{ .Values.service.ports.client | int64 }};
              echo "Kafka is available";
          {{- if .Values.provisioning.resources }}
          resources: {{- toYaml .Values.provisioning.resources | nindent 12 }}
          {{- end }}
        {{- end }}
        {{- if .Values.provisioning.initContainers }}
        {{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.initContainers "context" $ ) | nindent 8 }}
        {{- end }}
      {{- end }}
      containers:
        - name: kafka-provisioning
          image: {{ include "kafka.image" . }}
          imagePullPolicy: {{ .Values.image.pullPolicy | quote }}
          {{- if .Values.provisioning.containerSecurityContext.enabled }}
          securityContext: {{- omit .Values.provisioning.containerSecurityContext "enabled" | toYaml | nindent 12 }}
          {{- end }}
          {{- /* Precedence for command and args: diagnosticMode, then provisioning overrides, then the built-in script. */}}
          {{- if .Values.diagnosticMode.enabled }}
          command: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.command "context" $) | nindent 12 }}
          {{- else if .Values.provisioning.command }}
          command: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.command "context" $) | nindent 12 }}
          {{- else }}
          command:
            - /bin/bash
          {{- end }}
          {{- if .Values.diagnosticMode.enabled }}
          args: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.args "context" $) | nindent 12 }}
          {{- else if .Values.provisioning.args }}
          args: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.args "context" $) | nindent 12 }}
          {{- else }}
          args:
            - -ec
            - |
              echo "Configuring environment"
              . /opt/bitnami/scripts/libkafka.sh
              export CLIENT_CONF="${CLIENT_CONF:-/opt/bitnami/kafka/config/client.properties}"
              if [ ! -f "$CLIENT_CONF" ]; then
                touch "$CLIENT_CONF"

                kafka_common_conf_set "$CLIENT_CONF" security.protocol {{ include "kafka.listenerType" ( dict "protocol" .Values.auth.clientProtocol ) | quote }}
                {{- if (include "kafka.client.tlsEncryption" .) }}
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.type {{ upper .Values.provisioning.auth.tls.type | quote }}
                kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.type {{ upper .Values.provisioning.auth.tls.type | quote }}
                ! is_empty_value "$KAFKA_CLIENT_KEY_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.key.password "$KAFKA_CLIENT_KEY_PASSWORD"
                {{- if eq (upper .Values.provisioning.auth.tls.type) "PEM" }}
                {{- if .Values.provisioning.auth.tls.caCert }}
                file_to_multiline_property() {
                    awk 'NR > 1{print line" \\"}{line=$0;}END{print $0" "}' <"${1:?missing file}"
                }
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.key "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.key }}")"
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.certificate.chain "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.cert }}")"
                kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.certificates "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.caCert }}")"
                {{- else }}
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.location "/certs/{{ .Values.provisioning.auth.tls.keystore }}"
                kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.location "/certs/{{ .Values.provisioning.auth.tls.truststore }}"
                {{- end }}
                {{- else if eq (upper .Values.provisioning.auth.tls.type) "JKS" }}
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.location "/certs/{{ .Values.provisioning.auth.tls.keystore }}"
                kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.location "/certs/{{ .Values.provisioning.auth.tls.truststore }}"
                ! is_empty_value "$KAFKA_CLIENT_KEYSTORE_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.password "$KAFKA_CLIENT_KEYSTORE_PASSWORD"
                ! is_empty_value "$KAFKA_CLIENT_TRUSTSTORE_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.password "$KAFKA_CLIENT_TRUSTSTORE_PASSWORD"
                {{- end }}
                {{- end }}
                {{- if (include "kafka.client.saslAuthentication" .) }}
                {{- if contains "plain" .Values.auth.sasl.mechanisms }}
                kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism PLAIN
                kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
                {{- else if contains "scram-sha-256" .Values.auth.sasl.mechanisms }}
                kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism SCRAM-SHA-256
                kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
                {{- else if contains "scram-sha-512" .Values.auth.sasl.mechanisms }}
                kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism SCRAM-SHA-512
                kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
                {{- end }}
                {{- end }}
              fi

              echo "Running pre-provisioning script if any given"
              {{ .Values.provisioning.preScript | nindent 14 }}

              kafka_provisioning_commands=(
              {{- range $topic := .Values.provisioning.topics }}
                "/opt/bitnami/kafka/bin/kafka-topics.sh \
                    --create \
                    --if-not-exists \
                    --bootstrap-server ${KAFKA_SERVICE} \
                    --replication-factor {{ $topic.replicationFactor | default $.Values.provisioning.replicationFactor }} \
                    --partitions {{ $topic.partitions | default $.Values.provisioning.numPartitions }} \
                    {{- range $name, $value := $topic.config }}
                    --config {{ $name }}={{ $value }} \
                    {{- end }}
                    --command-config ${CLIENT_CONF} \
                    --topic {{ $topic.name }}"
              {{- end }}
              {{- range $command := .Values.provisioning.extraProvisioningCommands }}
                {{- $command | quote | nindent 16 }}
              {{- end }}
              )

              echo "Starting provisioning"
              for ((index=0; index < ${#kafka_provisioning_commands[@]}; index+={{ .Values.provisioning.parallel }}))
              do
                for j in $(seq ${index} $((${index}+{{ .Values.provisioning.parallel }}-1)))
                do
                    ${kafka_provisioning_commands[j]} & # Async command
                done
                wait  # Wait the end of the jobs
              done

              echo "Running post-provisioning script if any given"
              {{ .Values.provisioning.postScript | nindent 14 }}

              echo "Provisioning succeeded"
          {{- end }}
          env:
            - name: BITNAMI_DEBUG
              value: {{ ternary "true" "false" (or .Values.image.debug .Values.diagnosticMode.enabled) | quote }}
            {{- if (include "kafka.client.tlsEncryption" .) }}
            - name: KAFKA_CLIENT_KEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ template "kafka.client.passwordsSecretName" . }}
                  key: {{ .Values.provisioning.auth.tls.keyPasswordSecretKey | quote }}
            - name: KAFKA_CLIENT_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ template "kafka.client.passwordsSecretName" . }}
                  key: {{ .Values.provisioning.auth.tls.keystorePasswordSecretKey | quote }}
            - name: KAFKA_CLIENT_TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ template "kafka.client.passwordsSecretName" . }}
                  key: {{ .Values.provisioning.auth.tls.truststorePasswordSecretKey | quote }}
            {{- end }}
            - name: KAFKA_SERVICE
              value: {{ printf "%s:%d" (include "common.names.fullname" .) (.Values.service.ports.client | int64) | quote }}
            {{- if (include "kafka.client.saslAuthentication" .) }}
            {{- /* The first entry of auth.sasl.jaas.clientUsers is used as the SASL identity. */}}
            {{- $clientUsers := .Values.auth.sasl.jaas.clientUsers }}
            - name: SASL_USERNAME
              value: {{ index $clientUsers 0 | quote }}
            - name: SASL_USER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ include "kafka.jaasSecretName" . }}
                  key: system-user-password
            {{- end }}
            {{- if .Values.provisioning.extraEnvVars }}
            {{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.extraEnvVars "context" $) | nindent 12 }}
            {{- end }}
          {{- if or .Values.provisioning.extraEnvVarsCM .Values.provisioning.extraEnvVarsSecret }}
          envFrom:
            {{- if .Values.provisioning.extraEnvVarsCM }}
            - configMapRef:
                name: {{ include "common.tplvalues.render" (dict "value" .Values.provisioning.extraEnvVarsCM "context" $) }}
            {{- end }}
            {{- if .Values.provisioning.extraEnvVarsSecret }}
            - secretRef:
                name: {{ include "common.tplvalues.render" (dict "value" .Values.provisioning.extraEnvVarsSecret "context" $) }}
            {{- end }}
          {{- end }}
          {{- if .Values.provisioning.resources }}
          resources: {{- toYaml .Values.provisioning.resources | nindent 12 }}
          {{- end }}
          volumeMounts:
            {{- if or .Values.log4j .Values.existingLog4jConfigMap }}
            - name: log4j-config
              mountPath: {{ .Values.persistence.mountPath }}/config/log4j.properties
              subPath: log4j.properties
            {{- end }}
            {{- if (include "kafka.client.tlsEncryption" .) }}
            {{- if not (empty .Values.provisioning.auth.tls.certificatesSecret) }}
            - name: kafka-client-certs
              mountPath: /certs
              readOnly: true
            {{- end }}
            {{- end }}
            {{- if .Values.provisioning.extraVolumeMounts }}
            {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.extraVolumeMounts "context" $) | nindent 12 }}
            {{- end }}
        {{- if .Values.provisioning.sidecars }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.sidecars "context" $) | nindent 8 }}
        {{- end }}
      volumes:
        {{- if or .Values.log4j .Values.existingLog4jConfigMap }}
        - name: log4j-config
          configMap:
            name: {{ include "kafka.log4j.configMapName" . }}
        {{- end }}
        {{- if (include "kafka.client.tlsEncryption" .) }}
        {{- if not (empty .Values.provisioning.auth.tls.certificatesSecret) }}
        - name: kafka-client-certs
          secret:
            secretName: {{ .Values.provisioning.auth.tls.certificatesSecret | quote }}
            defaultMode: 256
        {{- end }}
        {{- end }}
        {{- if .Values.provisioning.extraVolumes }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.extraVolumes "context" $) | nindent 8 }}
        {{- end }}
{{- end }}