{{- /* File listing metadata: 261 lines, 14 KiB, YAML (Helm template). */}}
{{- /*
Kafka provisioning Job.

Rendered only when `provisioning.enabled` is set. It runs as a Helm
post-install/post-upgrade hook and is deleted before the next hook run and
after it succeeds.

Flow of the default container script:
  1. Create the client properties file (security protocol, TLS, SASL) if it
     does not already exist.
  2. Run `provisioning.preScript`.
  3. Run one `kafka-topics.sh --create --if-not-exists` command per entry of
     `provisioning.topics`, followed by `provisioning.extraProvisioningCommands`,
     in batches of `provisioning.parallel` background jobs.
  4. Run `provisioning.postScript`.
*/}}
{{- if .Values.provisioning.enabled }}
kind: Job
apiVersion: batch/v1
metadata:
  name: {{ printf "%s-provisioning" (include "common.names.fullname" .) }}
  namespace: {{ .Release.Namespace | quote }}
  labels: {{- include "common.labels.standard" . | nindent 4 }}
    app.kubernetes.io/component: kafka-provisioning
    {{- if .Values.commonLabels }}
    {{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }}
    {{- end }}
  annotations:
    helm.sh/hook: post-install,post-upgrade
    helm.sh/hook-delete-policy: before-hook-creation,hook-succeeded
    {{- if .Values.commonAnnotations }}
    {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
    {{- end }}
spec:
  template:
    metadata:
      labels: {{- include "common.labels.standard" . | nindent 8 }}
        app.kubernetes.io/component: kafka-provisioning
        {{- if .Values.provisioning.podLabels }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.podLabels "context" $) | nindent 8 }}
        {{- end }}
      annotations:
        {{- if .Values.provisioning.podAnnotations }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.podAnnotations "context" $) | nindent 8 }}
        {{- end }}
    spec:
      serviceAccountName: {{ template "kafka.provisioning.serviceAccountName" . }}
      {{- include "kafka.imagePullSecrets" . | nindent 6 }}
      {{- if .Values.provisioning.schedulerName }}
      schedulerName: {{ .Values.provisioning.schedulerName | quote }}
      {{- end }}
      {{- if .Values.provisioning.podSecurityContext.enabled }}
      securityContext: {{- omit .Values.provisioning.podSecurityContext "enabled" | toYaml | nindent 8 }}
      {{- end }}
      restartPolicy: OnFailure
      terminationGracePeriodSeconds: 0
      {{- if .Values.provisioning.nodeSelector }}
      nodeSelector: {{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.nodeSelector "context" $) | nindent 8 }}
      {{- end }}
      {{- if .Values.provisioning.tolerations }}
      tolerations: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.tolerations "context" $) | nindent 8 }}
      {{- end }}
      {{- if or .Values.provisioning.initContainers .Values.provisioning.waitForKafka }}
      initContainers:
        {{- if .Values.provisioning.waitForKafka }}
        {{- /* Blocks until the Kafka client port is in use (120 second timeout). */}}
        - name: wait-for-available-kafka
          image: {{ include "kafka.image" . }}
          imagePullPolicy: {{ .Values.image.pullPolicy | quote }}
          {{- if .Values.provisioning.containerSecurityContext.enabled }}
          securityContext: {{- omit .Values.provisioning.containerSecurityContext "enabled" | toYaml | nindent 12 }}
          {{- end }}
          command:
            - /bin/bash
          args:
            - -ec
            - |
              wait-for-port \
                --host={{ include "common.names.fullname" . }} \
                --state=inuse \
                --timeout=120 \
                {{ .Values.service.ports.client | int64 }};
              echo "Kafka is available";
          {{- if .Values.provisioning.resources }}
          resources: {{- toYaml .Values.provisioning.resources | nindent 12 }}
          {{- end }}
        {{- end }}
        {{- if .Values.provisioning.initContainers }}
        {{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.initContainers "context" $ ) | nindent 8 }}
        {{- end }}
      {{- end }}
      containers:
        - name: kafka-provisioning
          image: {{ include "kafka.image" . }}
          imagePullPolicy: {{ .Values.image.pullPolicy | quote }}
          {{- if .Values.provisioning.containerSecurityContext.enabled }}
          securityContext: {{- omit .Values.provisioning.containerSecurityContext "enabled" | toYaml | nindent 12 }}
          {{- end }}
          {{- /* Precedence for command and args: diagnosticMode, then provisioning.command/args, then the built-in script. */}}
          {{- if .Values.diagnosticMode.enabled }}
          command: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.command "context" $) | nindent 12 }}
          {{- else if .Values.provisioning.command }}
          command: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.command "context" $) | nindent 12 }}
          {{- else }}
          command:
            - /bin/bash
          {{- end }}
          {{- if .Values.diagnosticMode.enabled }}
          args: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.args "context" $) | nindent 12 }}
          {{- else if .Values.provisioning.args }}
          args: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.args "context" $) | nindent 12 }}
          {{- else }}
          args:
            - -ec
            - |
              echo "Configuring environment"
              . /opt/bitnami/scripts/libkafka.sh
              export CLIENT_CONF="${CLIENT_CONF:-/opt/bitnami/kafka/config/client.properties}"
              if [ ! -f "$CLIENT_CONF" ]; then
                touch "$CLIENT_CONF"

                kafka_common_conf_set "$CLIENT_CONF" security.protocol {{ include "kafka.listenerType" ( dict "protocol" .Values.auth.clientProtocol ) | quote }}
                {{- if (include "kafka.client.tlsEncryption" .) }}
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.type {{ upper .Values.provisioning.auth.tls.type | quote }}
                kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.type {{ upper .Values.provisioning.auth.tls.type | quote }}
                ! is_empty_value "$KAFKA_CLIENT_KEY_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.key.password "$KAFKA_CLIENT_KEY_PASSWORD"
                {{- /* PEM material is inlined into the properties file; JKS stores are referenced by path. */}}
                {{- if eq (upper .Values.provisioning.auth.tls.type) "PEM" }}
                file_to_multiline_property() {
                  awk 'NR > 1{print line" \\"}{line=$0;}END{print $0" "}' <"${1:?missing file}"
                }
                {{- /* Keystore = client key + client certificate chain; truststore = CA certificate(s). */}}
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.key "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.key }}")"
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.certificate.chain "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.cert }}")"
                kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.certificates "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.caCert }}")"
                {{- else if eq (upper .Values.provisioning.auth.tls.type) "JKS" }}
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.location "/certs/{{ .Values.provisioning.auth.tls.keystore }}"
                kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.location "/certs/{{ .Values.provisioning.auth.tls.truststore }}"
                ! is_empty_value "$KAFKA_CLIENT_KEYSTORE_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.password "$KAFKA_CLIENT_KEYSTORE_PASSWORD"
                ! is_empty_value "$KAFKA_CLIENT_TRUSTSTORE_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.password "$KAFKA_CLIENT_TRUSTSTORE_PASSWORD"
                {{- end }}
                {{- end }}
                {{- if (include "kafka.client.saslAuthentication" .) }}
                {{- /* Only the first matching mechanism is configured: plain, then scram-sha-256, then scram-sha-512. */}}
                {{- if contains "plain" .Values.auth.sasl.mechanisms }}
                kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism PLAIN
                kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
                {{- else if contains "scram-sha-256" .Values.auth.sasl.mechanisms }}
                kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism SCRAM-SHA-256
                kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
                {{- else if contains "scram-sha-512" .Values.auth.sasl.mechanisms }}
                kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism SCRAM-SHA-512
                kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
                {{- end }}
                {{- end }}
              fi

              echo "Running pre-provisioning script if any given"
              {{ .Values.provisioning.preScript | nindent 14 }}

              kafka_provisioning_commands=(
              {{- range $topic := .Values.provisioning.topics }}
                "/opt/bitnami/kafka/bin/kafka-topics.sh \
                    --create \
                    --if-not-exists \
                    --bootstrap-server ${KAFKA_SERVICE} \
                    --replication-factor {{ $topic.replicationFactor | default $.Values.provisioning.replicationFactor }} \
                    --partitions {{ $topic.partitions | default $.Values.provisioning.numPartitions }} \
                    {{- range $name, $value := $topic.config }}
                    --config {{ $name }}={{ $value }} \
                    {{- end }}
                    --command-config ${CLIENT_CONF} \
                    --topic {{ $topic.name }}"
              {{- end }}
              {{- range $command := .Values.provisioning.extraProvisioningCommands }}
                {{- $command | quote | nindent 16 }}
              {{- end }}
              )

              echo "Starting provisioning"
              {{- /* Commands are launched in batches of provisioning.parallel background jobs; each batch is awaited before the next starts. */}}
              for ((index=0; index < ${#kafka_provisioning_commands[@]}; index+={{ .Values.provisioning.parallel }}))
              do
                for j in $(seq ${index} $((${index}+{{ .Values.provisioning.parallel }}-1)))
                do
                    ${kafka_provisioning_commands[j]} & # Async command
                done
                wait # Wait the end of the jobs
              done

              echo "Running post-provisioning script if any given"
              {{ .Values.provisioning.postScript | nindent 14 }}

              echo "Provisioning succeeded"
          {{- end }}
          env:
            - name: BITNAMI_DEBUG
              value: {{ ternary "true" "false" (or .Values.image.debug .Values.diagnosticMode.enabled) | quote }}
            {{- if (include "kafka.client.tlsEncryption" .) }}
            - name: KAFKA_CLIENT_KEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ template "kafka.client.passwordsSecretName" . }}
                  key: {{ .Values.provisioning.auth.tls.keyPasswordSecretKey }}
            - name: KAFKA_CLIENT_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ template "kafka.client.passwordsSecretName" . }}
                  key: {{ .Values.provisioning.auth.tls.keystorePasswordSecretKey }}
            - name: KAFKA_CLIENT_TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ template "kafka.client.passwordsSecretName" . }}
                  key: {{ .Values.provisioning.auth.tls.truststorePasswordSecretKey }}
            {{- end }}
            - name: KAFKA_SERVICE
              value: {{ printf "%s:%d" (include "common.names.fullname" .) (.Values.service.ports.client | int64) }}
            {{- if (include "kafka.client.saslAuthentication" .) }}
            {{- $clientUsers := .Values.auth.sasl.jaas.clientUsers }}
            {{- /* The first entry of auth.sasl.jaas.clientUsers is used as the provisioning user. */}}
            - name: SASL_USERNAME
              value: {{ index $clientUsers 0 | quote }}
            - name: SASL_USER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ include "kafka.jaasSecretName" . }}
                  key: system-user-password
            {{- end }}
            {{- if .Values.provisioning.extraEnvVars }}
            {{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.extraEnvVars "context" $) | nindent 12 }}
            {{- end }}
          {{- if or .Values.provisioning.extraEnvVarsCM .Values.provisioning.extraEnvVarsSecret }}
          envFrom:
            {{- if .Values.provisioning.extraEnvVarsCM }}
            - configMapRef:
                name: {{ include "common.tplvalues.render" (dict "value" .Values.provisioning.extraEnvVarsCM "context" $) }}
            {{- end }}
            {{- if .Values.provisioning.extraEnvVarsSecret }}
            - secretRef:
                name: {{ include "common.tplvalues.render" (dict "value" .Values.provisioning.extraEnvVarsSecret "context" $) }}
            {{- end }}
          {{- end }}
          {{- if .Values.provisioning.resources }}
          resources: {{- toYaml .Values.provisioning.resources | nindent 12 }}
          {{- end }}
          volumeMounts:
            {{- if or .Values.log4j .Values.existingLog4jConfigMap }}
            - name: log4j-config
              mountPath: {{ .Values.persistence.mountPath }}/config/log4j.properties
              subPath: log4j.properties
            {{- end }}
            {{- if (include "kafka.client.tlsEncryption" .) }}
            {{- if not (empty .Values.provisioning.auth.tls.certificatesSecret) }}
            - name: kafka-client-certs
              mountPath: /certs
              readOnly: true
            {{- end }}
            {{- end }}
            {{- if .Values.provisioning.extraVolumeMounts }}
            {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.extraVolumeMounts "context" $) | nindent 12 }}
            {{- end }}
        {{- if .Values.provisioning.sidecars }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.sidecars "context" $) | nindent 8 }}
        {{- end }}
      volumes:
        {{- if or .Values.log4j .Values.existingLog4jConfigMap }}
        - name: log4j-config
          configMap:
            name: {{ include "kafka.log4j.configMapName" . }}
        {{- end }}
        {{- if (include "kafka.client.tlsEncryption" .) }}
        {{- if not (empty .Values.provisioning.auth.tls.certificatesSecret) }}
        - name: kafka-client-certs
          secret:
            secretName: {{ .Values.provisioning.auth.tls.certificatesSecret }}
            {{- /* 256 decimal is 0400 octal. */}}
            defaultMode: 256
        {{- end }}
        {{- end }}
        {{- if .Values.provisioning.extraVolumes }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.extraVolumes "context" $) | nindent 8 }}
        {{- end }}
{{- end }}