Consumidores y Productores de Apache Kafka con Java y Python
Veremos los ejemplos más básicos de Consumidores y Productores de Kafka con Python y Java, aunque estos ejemplos se pueden extender bastante más (por ejemplo, con Stream, batch o bulk de datos).
Índice
- Apache Kafka: Teoría.
- Instalar Apache Kafka: Instalación y consola.
- Consumidores y Productores de Apache Kafka con Java y Python: Ejemplos de código.
- Clúster Apache Kafka tolerante a fallos y alta disponibilidad: Configuración de brókeres y sistema distribuido.
Python
Podremos utilizar lenguajes de programación para crear Consumidores y Productores de nuestro Kafka (o el de terceros) con Python, para ello podemos utilizar la biblioteca “Kafka-python” (su documentación en https://kafka-python.readthedocs.io/)
pip install kafka-python
Productor en Python
Para producir mensajes, utilizaremos “KafkaProducer” que importaremos en primer lugar (documentación de KafkaProducer en https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html?highlight=KafkaProducer ), en el que definiremos en “bootstrap_servers” un String con los servidores Kafka con su puerto (el puerto de Kafka es “9092” por defecto). Posteriormente utilizaremos el método “send” para enviar a un topic el mensaje que queramos (hay que enviarlo como array de bytes, que en Python se puede serializar tan fácilmente como añadiendo una “b” delante del String o con el método “encode”). En el siguiente ejemplo, me conecto a mi servidor en la IP “27.0.173.161” y al puerto del bróker de Kafka “9092”, para enviar cinco mensajes:
from kafka import KafkaProducer
servidores_bootstrap = '27.0.173.161:9092'
topic = 'miNuevoTopic'
productor = KafkaProducer(bootstrap_servers=servidores_bootstrap)
productor.send(topic, b'Un mensaje desde Python')
productor.send(topic, b'Otro mensaje desde Python')
productor.send(topic, b'Un tercer mensaje desde Python')
Nota sobre las claves: podremos extender el método send para añadir una clave de mensaje a cada mensaje, por ejemplo (tanto la clave como el valor deben ser arrays de bytes):
productor.send('miNuevoTopic', key=b'claveDelMensaje', value=b'Un mensaje')
Nota si tenemos varios servidores: En un entorno de producción será lo normal, por lo que en caso de tener varios servidores los pondremos separados por comas para que se tengan en cuenta todos (aunque realmente se conecta al primero que lo logre y luego ya lo gestiona Zookeeper).
Consumidor en Python
Por su parte, para consumir mensajes, importaremos y usaremos “KafkaConsumer” (documentación de KafkaConsumer en https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html?highlight=kafkaconsumer ), al igual que con “KafkaProducer” definiremos los servidores de Kafka en “bootstrap_servers” del que consumiremos los mensajes. En el siguiente ejemplo crearé en la variable “consumidor” un objeto KafkaConsumer que es un iterador, por lo que lo puedo recorrer en un bucle “for” que devuelve mensajes “msg” (que tienen toda la información, desde el valor que produjimos anteriormente, hasta el topic del que viene, etc.), de los cuales puedo obtener el valor introducido anteriormente llamando a “value” (se recibe un array de bytes, que en Python se puede deserializar con el método “decode”):
from kafka import KafkaConsumer
servidores_bootstrap = '27.0.173.161:9092'
topic = 'miNuevoTopic'
consumidor = KafkaConsumer(topic, bootstrap_servers=servidores_bootstrap)
for msg in consumidor:
print(msg.value)
Este código imprime:
b'Un mensaje desde Python'
b'Otro mensaje desde Python'
b'Un tercer mensaje desde Python'
Nota sobre el “grupo de consumidores”: A la hora de inicializar el “KafkaConsumer”, podremos pasarle un argumento de clave “group_id” (que por defecto es None):
consumidor = KafkaConsumer(topic, bootstrap_servers=servidores_bootstrap, group_id='idGrupoConsumidor')
Java
En Java necesitamos instalar la dependencia de Kafka-Clients (documentación en https://kafka.apache.org/documentation/#producerapi y https://docs.confluent.io/clients-kafka-java/current/overview.html), que podemos encontrar en https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients (escoger la versión más actual o la deseada). Podemos instalar esta dependencia con el Pom.xml de Maven (para saber qué es Maven tenemos un artículo dedicado https://jarroba.com/maven/ o si quieres montar un proyecto rápido de Maven por línea de comandos, te dejo la manera en un cuadro “Instalar Maven en Linux por consola y crear un arquetipo rápido” en este artículo de instalar Apache Spark) añadiendo (yo instalaré la versión 3.1.0, tú deberás cambiar a la versión que necesites):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
Productor en Java
Tendremos que usar/importar el paquete de “org.apache.kafka.clients.producer”. Primero establecemos las propiedades (usando la clase Properties) con los servidores en BOOTSTRAP_SERVERS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG y KEY_SERIALIZER_CLASS_CONFIG. En los Productores de Kafka cada vez que mandemos un mensaje siempre se mandará la clave del mensaje (si no declaramos la clave se enviará un null, que también debe ser serializado), por lo que siempre hay que indicar los serializadores tanto para clave como para mensaje; por ejemplo, si queremos convertir un String en array de bytes, podremos utilizar la clase Java “StringSerializer” y la asignaríamos tanto en la propiedad VALUE_SERIALIZER_CLASS_CONFIG (para los mensajes) como en KEY_SERIALIZER_CLASS_CONFIG (para las claves de los mensajes). A continuación, crearemos un nuevo productor KafkProducer con las propiedades anteriores y enviaremos nuestros mensajes con el método “send” que pide un objeto de tipo ProducerRecord en el que pondremos el topic y el mensaje a enviar. Al final, es importante cerrar la conexión al final con “close”, pero antes enviaremos los mensajes que queden con “flush” (pues con “send” los mensajes se van guardando en un buffer hasta que alcanza cierto tamaño y se mandan todos juntos como un batch de mensajes, por lo que antes de cerrar la conexión, intentamos enviar los mensajes del buffer con “flush”).
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class ProductorKafka {
public static void main(String[] args) {
String servidoresBootstrap = "27.0.173.161:9092";
String topic = "miNuevoTopic";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servidoresBootstrap);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> productor = new KafkaProducer<>(props);
try {
productor.send(new ProducerRecord<>(topic, "Un mensaje desde Java"));
productor.send(new ProducerRecord<>(topic, "Otro mensaje desde Java"));
productor.send(new ProducerRecord<>(topic, "Un tercer mensaje desde Java"));
} finally {
productor.flush();
productor.close();
}
}
}
Nota sobre las claves: podremos extender el método send para añadir una clave de mensaje a cada mensaje, por ejemplo:
productor.send(new ProducerRecord<>(topic, "clave", "Un mensaje desde Java"));
Consumidor en Java
Tendremos que usar/importar el paquete de “org.apache.kafka.clients.consumer”. Primero establecemos las propiedades (usando la clase Properties) con los servidores en BOOTSTRAP_SERVERS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG y KEY_DESERIALIZER_CLASS_CONFIG (Nota, en los Consumidores se DESeriealiza, no como en los Productores que se Serializa). Para deserializar utilizamos la clase de Java “StringDeserializer”. Crearemos un nuevo consumidor KafkConsumer con las propiedades anteriores, este consumidor los suscribiremos al topic con “suscribe” (se pueden suscribir a varios topics a la vez, por eso pide una colección, aunque le pasemos un solo topic), para traernos los mensajes utilizaremos el método “poll” (se le puede pasar un tiempo de espera, yo le paso 0 segundos pues en este ejemplo quiero que se ejecute de inmediato, pero lo ideal es que este código esté en un bucle infinito para que siempre se traiga mensajes y que espere un tiempo para no saturar la red y para gastar menos recursos). Entonces, creamos un bucle “for” para recorrer todos los mensajes que ha obtenido “poll” en la llamada anterior, el mensaje viene encapsulado dentro un objeto “ConsumerRecord” y se puede obtener el String del mensaje con el método “value” (también se puede obtener la clave si queremos, con “key”):
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class ConsumidorKafka {
public static void main(String[] args) {
String servidoresBootstrap = "27.0.173.161:9092";
String topic = "miNuevoTopic";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servidoresBootstrap);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumidor = new KafkaConsumer<>(props);
try {
consumidor.subscribe(Collections.singleton(topic));
ConsumerRecords<String, String> mensajes = consumidor.poll(Duration.ZERO);
for (ConsumerRecord<String, String> mensaje: mensajes) {
System.out.println(mensaje.value());
}
} finally {
consumidor.close();
}
}
}
Este código imprime:
Un mensaje desde Python
Otro mensaje desde Python
Un tercer mensaje desde Python
Nota sobre el “grupo de consumidores”: Podremos extender las propiedades con una nueva cuya clave sea GROUP_ID_CONFIG, como:
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "miGrupoConsumidor");
Nota si queremos reiniciar el Offset: para hacer pruebas querremos que nos devuelva siempre datos desde el principio, para ello estableceremos la propiedad AUTO_OFFSET_RESET_CONFIG a “earliest”:
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Nota para crear un servicio de escucha continua: Como se indicó antes, vale con englobar el “poll” (le he puesto que espere 1 segundo en cada vuelta al bucle, antes de realizar otra petición de datos) y el “for” que lee los mensajes en un “while(true)”, por ejemplo:
while (true) {
ConsumerRecords<String, String> mensajes = consumidor.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> mensaje: mensajes) {
System.out.println(mensaje.value());
}
}
excelente aporte, me despejo muchas dudas, muchas gracias por la info
Me alegro que te fuera útil Nibiru 🙂