Problem Description
I don't see an example of how to use the camel-avro component to produce and consume Kafka Avro messages. Currently my Camel route is the one below. What should be changed so that it works with the Schema Registry and other props like these, using a camel-kafka-avro consumer & producer?
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
public void configure() {
    PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
    pc.setLocation("classpath:application.properties");

    log.info("About to start route: Kafka Server -> Log ");

    from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
            + "&maxPollRecords={{consumer.maxPollRecords}}"
            + "&consumersCount={{consumer.consumersCount}}"
            + "&seekTo={{consumer.seekTo}}"
            + "&groupId={{consumer.group}}"
            + "&valueDeserializer=" + KafkaAvroDeserializer.class
            + "&keyDeserializer=" + StringDeserializer.class
    )
        .routeId("FromKafka")
        .log("${body}");
}
Recommended Answer
I'm answering my own question because I sat on this problem for a couple of days. I hope this answer will be helpful for others.
I tried to use the io.confluent.kafka.serializers.KafkaAvroDeserializer deserializer and got a Kafka exception. So I had to write my own deserializer that does the following:
- set the Schema Registry
- use a specific Avro reader (which means not the default StringDeserializer)
Then we must access the "schemaRegistry" and "useSpecificAvroReader" fields of the AbstractKafkaAvroDeserializer (io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer) and set them ourselves.
Here is my solution...
public static void main(String[] args) throws Exception {
    log.info("About to run Kafka-camel integration...");

    CamelContext camelContext = new DefaultCamelContext();

    // Add route to consume messages from Kafka
    camelContext.addRoutes(new RouteBuilder() {
        public void configure() throws Exception {
            PropertiesComponent pc = getContext().getComponent("properties",
                    PropertiesComponent.class);
            pc.setLocation("classpath:application.properties");

            log.info("About to start route: Kafka Server -> Log ");

            from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                    + "&maxPollRecords={{consumer.maxPollRecords}}"
                    + "&consumersCount={{consumer.consumersCount}}"
                    + "&seekTo={{consumer.seekTo}}"
                    + "&groupId={{consumer.group}}"
                    + "&keyDeserializer=" + StringDeserializer.class.getName()
                    + "&valueDeserializer=" + CustomKafkaAvroDeserializer.class.getName()
            )
                .routeId("FromKafka")
                .log("${body}");
        }
    });
    camelContext.start();

    // let it run for 5 minutes before shutting down
    Thread.sleep(5 * 60 * 1000);
    camelContext.stop();
}
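For reference, the {{...}} placeholders in the route above are resolved from application.properties. A minimal sketch of what that file might contain follows; the keys are exactly the ones the route references, but every value is an illustrative assumption for a local setup:

consumer.topic=my-avro-topic
consumer.maxPollRecords=5000
consumer.consumersCount=1
consumer.seekTo=beginning
consumer.group=kafkaGroup
kafka.host=localhost
kafka.port=9092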
Deserializer class - this sets schema.registry.url and use.specific.avro.reader at the level of the abstract AbstractKafkaAvroDeserializer. If I don't set these, I get a kafka-config-exception.
package com.example.camel.kafka.avro;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import io.confluent.common.config.ConfigException;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
    implements Deserializer<Object> {
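  // NOTE: the original post is cut off at this point. What follows is a minimal
  // sketch of how the class body could look, based on the description above
  // (set the inherited schemaRegistry and useSpecificAvroReader fields of
  // AbstractKafkaAvroDeserializer); the registry URL is taken from the
  // question's props and may need adjusting.

  private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";

  @Override
  public void configure(KafkaAvroDeserializerConfig config) {
    try {
      // point the inherited schemaRegistry field at the Schema Registry
      final List<String> schemas = Collections.singletonList(SCHEMA_REGISTRY_URL);
      this.schemaRegistry = new CachedSchemaRegistryClient(schemas, Integer.MAX_VALUE);
      // decode into generated SpecificRecord classes instead of GenericRecord
      this.useSpecificAvroReader = true;
    } catch (ConfigException e) {
      throw new org.apache.kafka.common.config.ConfigException(e.getMessage());
    }
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    configure(null);
  }

  @Override
  public Object deserialize(String topic, byte[] bytes) {
    // delegate to AbstractKafkaAvroDeserializer, which looks up the writer
    // schema in the registry and returns the decoded record
    return deserialize(bytes);
  }

  @Override
  public void close() {
  }
}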