Run kafkaConnect with docker image

Comment déployer une image docker de kafka connect avec un connecteur jdbc.

 

Prérequis

Docker installé et opérationnel :

Pour mon cas, j’ai utilisé Docker Engine on debian.

 

Créer un fichier Dockerfile

Dans dossier ‘Docker’, nous allons créer un fichier ‘Dockerfile’ dont le contenu est :

FROM confluentinc/cp-kafka-connect-base:latest
run confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest

Pour récupérer la dernière version de l’image de kafka-connect-base ainsi que la dernier version du connecteur Jdbc.

 

Build de l’image docker

Build l’image depuis Dockerfile :

docker build . -t containerproductitem:1.0.0

 

Créer un fichier (env.list)

Dans ce fichier, nous allons configurer notre accès au kafka Cluster et Kafka registry, afin de pouvoir consommer les messages depuis un topic.

CONNECT_BOOTSTRAP_SERVERS=serveur:port

CONNECT_GROUP_ID=svc-TEST-app-connect-worker

CONNECT_CONFIG_STORAGE_TOPIC= dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1-configs

CONNECT_OFFSET_STORAGE_TOPIC= dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1-offsets

CONNECT_STATUS_STORAGE_TOPIC= dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1-status

CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3

CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3

CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3

SCHEMA_REGISTRY_URL=io.confluent.connect.avro.AvroConverter

BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO

SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=*******:*******

CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter

CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO

CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=******

CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=https://schema-registry.*******-staging-europe-west1.streaming.data.******.cloud

CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter

CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO

CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=********

CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=https://schema-registry.*******-staging-europe-west1.streaming.data.***.cloud

CONNECT_REST_ADVERTISED_HOST_NAME="localhost"

CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/

CONNECT_REST_PORT=8082

CONNECT_LOG4J_LOGGERS="kafka.controller=ERROR,kafka.foo.bar=ERROR"

CONNECT_LOG4J_ROOT_LOGLEVEL=ERROR

CONNECT_TOOLS_LOG4J_LOGLEVEL=ERROR

CONNECT_SASL_MECHANISM=PLAIN

CONNECT_SECURITY_PROTOCOL=SASL_SSL

CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******";

CONNECT_PRODUCER_SASL_MECHANISM=PLAIN

CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL

CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******";

CONNECT_PRODUCER_GROUP_ID=svc-TEST-app-connect-worker

CONNECT_CONSUMER_SASL_MECHANISM=PLAIN

CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL

CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="*******" password="*******";

CONNECT_CONSUMER_GROUP_ID=svc-TEST-app-connect-worker

CONNECT_CONSUMER_CLIENT_ID=svc-TEST-app-connect-worker

 

Run du container

Nous allons démarrer notre container « containerproductitem » avec les ressources suivantes :

docker run -e 8082 -l error --name containerproductitem --network host --env-file env.list containerproductitem:1.0.0

 

Config du connecteur jdbc

Maintenant que le Kafka Connect est lancé via le container containerproductitem, il reste à configurer le connecteur jbdc Sink, pour consommer un topic Kafka et pousser les messages dans la table « tb_references_step_import » sous postgresql.

Pour accéder au container et pouvoir lancer la commande curl :

docker exec -t -i containerproductitem /bin/bash

Commande curl :

Curl -X POST -H "Content-Type: application/json" --data '{"name":"connector-sinkJDBC","config": { "topics": "dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1","tasks.max": "1","connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","flush.size":"3","format.class":"io.confluent.connect.sftp.sink.format.csv.avroFormat","connection.url":"jdbc:postgresql://dfrlm****.int.com:5432/postgres?verifyServerCertificate:false&useSSL:true&requireSSL:true","connection.user":"***","connection.password":"****","table.name.format":"tb_references_step_import","pk.mode":"record_value","insert.mode":"upsert","puto.create":"true","auto.evolve":"true","delete.enabled":"false"}}' http://localhost:8082/connectors

 

Cette configuration sera stockée dans le topic config ‘dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1-configs‘ prédéfini dans notre fichier env.list.

Notre kafka connect est maintenant démarré. Le connecteur jdbc Sink est configuré, et les messages sont consommés du topic dev-europe-west1-APP-TEST-LM-FR-P1-C1-PRODUCTITEM-V1 en format Avro, et stockés dans la table « tb_references_step_import » sous postgresql :