Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- {
- "type": "record",
- "name": "myrecord",
- "fields": [
- { "name": "int1", "type": "int" },
- { "name": "str1", "type": "string" },
- { "name": "str2", "type": "string" }
- ]
- }
- {"name": "mssql-source",
- "config": {
- "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
- "key.converter": "io.confluent.connect.avro.AvroConverter",
- "key.converter.schema.registry.url": "http://localhost:8081",
- "value.converter": "io.confluent.connect.avro.AvroConverter",
- "value.converter.schema.registry.url": "http://localhost:8081",
- "incrementing.column.name": "int1",
- "tasks.max": "1",
- "table.whitelist": "Hello",
- "mode": "incrementing",
- "topic.prefix": "mssql-",
- "name": "mssql-source",
- "connection.url":
- "jdbc:sqlserver://XXX.XXX.X;databaseName=XXX;username=XX;password=XX"
- }
- }
- import com.twitter.bijection.Injection;
- import com.twitter.bijection.avro.GenericAvroCodecs;
- import kafka.serializer.DefaultDecoder;
- import kafka.serializer.StringDecoder;
- import org.apache.avro.Schema;
- import org.apache.avro.generic.GenericRecord;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.streaming.Duration;
- import org.apache.spark.streaming.api.java.JavaPairInputDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- import org.apache.spark.streaming.kafka.KafkaUtils;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Set;
- public class SparkAvroConsumer {
- private static Injection<GenericRecord, byte[]> recordInjection;
- private static final String USER_SCHEMA = "{"
- + ""type":"record","
- + ""name":"myrecord","
- + ""fields":["
- + " { "name":"int1", "type":"int" },"
- + " { "name":"str1", "type":"string" },"
- + " { "name":"str2", "type":"string" }"
- + "]}";
- static {
- Schema.Parser parser = new Schema.Parser();
- Schema schema = parser.parse(USER_SCHEMA);
- recordInjection = GenericAvroCodecs.toBinary(schema);
- }
- public static void main(String[] args) {
- SparkConf conf = new SparkConf()
- .setAppName("kafka-sandbox")
- .setMaster("local[*]");
- JavaSparkContext sc = new JavaSparkContext(conf);
- JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
- Set<String> topics = Collections.singleton("mssql-Hello");
- Map<String, String> kafkaParams = new HashMap<>();
- kafkaParams.put("metadata.broker.list", "localhost:9092");
- kafkaParams.put("metadata.broker.list", "localhost:9092");
- kafkaParams.put("schema.registry.url", "http://localhost:8081");
- JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
- String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);
- directKafkaStream
- .map(message -> recordInjection.invert(message._2).get())
- .foreachRDD(rdd -> {
- rdd.foreach(record -> {
- System.out.println("int1= " + record.get("int1")
- + ", str1= " + record.get("str1")
- + ", str2=" + record.get("str2"));
- });
- });
- ssc.start();
- ssc.awaitTermination();
- }
- }
// Fetch a schema straight from the Schema Registry when its id is already known
// (the id is handed back when the schema is registered).
// NOTE(review): `schemaId` is not defined in this snippet; it must already be in scope.
val url = "http://schema.registry.url:8181"
// The second constructor argument is the capacity of the client's schema cache.
val schemaRegistry: SchemaRegistryClient = new CachedSchemaRegistryClient(url, 10)
val schema = schemaRegistry.getByID(schemaId)
// CachedSchemaRegistryClient also offers getAllSubjects, which lists the names of
// all subjects registered in the registry.
println(schema)
/** Looks up the writer schema of a Confluent-framed Avro message.
  *
  * The message is expected to start with a 5-byte header: one magic byte (0)
  * followed by a 4-byte big-endian schema id. The id is resolved against the
  * Schema Registry and the schema is returned as its JSON text.
  *
  * NOTE(review): a new registry client is built on every call, so its cache is
  * never reused between calls; consider sharing one client if this runs per message.
  *
  * @param buffer the incoming binary Avro message, header included
  * @return the JSON representation of the schema registered under the message's schema id
  * @throws IllegalArgumentException if `buffer` is null, shorter than the header,
  *                                  or does not start with the magic byte
  */
def getSchema(buffer: Array[Byte]): String = {
  val headerLength = 5 // 1 magic byte + 4-byte schema id
  require(
    buffer != null && buffer.length >= headerLength,
    "message is too short to carry a Confluent Avro header"
  )

  val url = "http://schema.registry.url:8181"
  val schemaRegistry: SchemaRegistryClient = new CachedSchemaRegistryClient(url, 10)

  val bb = ByteBuffer.wrap(buffer)
  val magicByte = bb.get() // consume MAGIC_BYTE
  require(magicByte == 0, s"unexpected magic byte $magicByte; message is not in Confluent Avro format")
  val schemaId = bb.getInt // consume schemaId

  // getByID yields a Schema object; the declared return type is String, so render it as JSON.
  schemaRegistry.getByID(schemaId).toString
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement