{
  "type": "record",
  "name": "myrecord",
  "fields": [
    { "name": "int1", "type": "int" },
    { "name": "str1", "type": "string" },
    { "name": "str2", "type": "string" }
  ]
}

{
  "name": "mssql-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "incrementing.column.name": "int1",
    "tasks.max": "1",
    "table.whitelist": "Hello",
    "mode": "incrementing",
    "topic.prefix": "mssql-",
    "name": "mssql-source",
    "connection.url": "jdbc:sqlserver://XXX.XXX.X;databaseName=XXX;user=XX;password=XX"
  }
}
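For reference, a minimal sketch of registering this connector through the Connect worker's REST interface. Assumptions: the JSON above is saved as mssql-source.json, and the worker listens on the default REST port 8083 (adjust both for your setup).

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;

public class RegisterConnector {
    public static void main(String[] args) throws Exception {
        // Assumption: connector JSON above saved as mssql-source.json.
        byte[] body = Files.readAllBytes(Paths.get("mssql-source.json"));

        // Assumption: Connect worker REST interface on the default port 8083.
        HttpURLConnection conn = (HttpURLConnection)
                new URL("http://localhost:8083/connectors").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body);
        }
        // 201 Created on success; 409 if a connector with this name already exists.
        System.out.println("HTTP " + conn.getResponseCode());
    }
}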
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SparkAvroConsumer {

    private static Injection<GenericRecord, byte[]> recordInjection;

    // Same schema as registered above; quotes must be escaped in the Java literal.
    private static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + " { \"name\":\"int1\", \"type\":\"int\" },"
            + " { \"name\":\"str1\", \"type\":\"string\" },"
            + " { \"name\":\"str2\", \"type\":\"string\" }"
            + "]}";

    static {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        recordInjection = GenericAvroCodecs.toBinary(schema);
    }

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // Topic name = topic.prefix + table name from the connector config.
        Set<String> topics = Collections.singleton("mssql-Hello");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("schema.registry.url", "http://localhost:8081");
        JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);

        directKafkaStream
                .map(message -> recordInjection.invert(message._2()).get())
                .foreachRDD(rdd -> {
                    rdd.foreach(record -> {
                        System.out.println("int1= " + record.get("int1")
                                + ", str1= " + record.get("str1")
                                + ", str2= " + record.get("str2"));
                    });
                });

        ssc.start();
        ssc.awaitTermination();
    }
}
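Note: the AvroConverter does not write plain Avro. Each message is prefixed with a magic byte and a 4-byte schema ID (see the getSchema snippet below), so the Bijection invert above will generally fail, or decode garbage, on connector-produced records. One alternative, sketched below under the assumption that kafka-avro-serializer is on the classpath, is to let Confluent's KafkaAvroDecoder resolve the schema from the registry:

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SparkConfluentAvroConsumer {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("kafka-sandbox").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(new JavaSparkContext(conf), new Duration(2000));

        Set<String> topics = Collections.singleton("mssql-Hello");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("schema.registry.url", "http://localhost:8081");

        // KafkaAvroDecoder reads the magic byte and schema ID, fetches the writer
        // schema from the registry, and returns a GenericRecord for values.
        JavaPairInputDStream<Object, Object> stream = KafkaUtils.createDirectStream(ssc,
                Object.class, Object.class, KafkaAvroDecoder.class, KafkaAvroDecoder.class,
                kafkaParams, topics);

        stream.map(message -> (GenericRecord) message._2())
              .foreachRDD(rdd -> rdd.foreach(record ->
                      System.out.println("int1= " + record.get("int1")
                              + ", str1= " + record.get("str1")
                              + ", str2= " + record.get("str2"))));

        ssc.start();
        ssc.awaitTermination();
    }
}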
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}

var schemaRegistry: SchemaRegistryClient = null
val url = "http://schema.registry.url:8181"
schemaRegistry = new CachedSchemaRegistryClient(url, 10)
val schema = schemaRegistry.getByID(schemaId) // consult the Schema Registry if you know the schemaId in advance (you get it when registering your schema)
// CachedSchemaRegistryClient also has a getAllSubjects API that returns every subject in your registry.
println(schema)
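A sketch of that getAllSubjects route (class name hypothetical; registry URL taken from the connector config above). The AvroConverter registers the value schema of topic mssql-Hello under the subject mssql-Hello-value.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class ListSubjects {
    public static void main(String[] args) throws Exception {
        // Assumption: registry URL from the connector config; adjust for your setup.
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 10);

        // List every subject with its latest schema; expect "mssql-Hello-value" here.
        for (String subject : client.getAllSubjects()) {
            SchemaMetadata latest = client.getLatestSchemaMetadata(subject);
            System.out.println(subject + " (id=" + latest.getId() + "): " + latest.getSchema());
        }
    }
}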
import java.nio.ByteBuffer

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema

def getSchema(buffer: Array[Byte]): Schema = { // buffer is the incoming binary Avro message
  val url = "http://schema.registry.url:8181"
  val schemaRegistry = new CachedSchemaRegistryClient(url, 10)
  val bb = ByteBuffer.wrap(buffer)
  bb.get() // consume MAGIC_BYTE
  val schemaId = bb.getInt // consume schemaId
  schemaRegistry.getByID(schemaId) // consult the Schema Registry
}
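Once the writer schema is resolved, everything after the 5-byte header (1 magic byte + 4-byte schema ID) is plain Avro and can be decoded with a GenericDatumReader. A minimal sketch, reusing the placeholder registry URL from above (class name hypothetical):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

import java.nio.ByteBuffer;

public class WireFormatDecoder {

    // Placeholder registry URL from the snippets above; adjust for your setup.
    private static final CachedSchemaRegistryClient REGISTRY =
            new CachedSchemaRegistryClient("http://schema.registry.url:8181", 10);

    public static GenericRecord decode(byte[] buffer) throws Exception {
        ByteBuffer bb = ByteBuffer.wrap(buffer);
        bb.get();                   // consume MAGIC_BYTE
        int schemaId = bb.getInt(); // consume schemaId
        Schema schema = REGISTRY.getByID(schemaId);

        // Everything after the 5-byte header is the plain Avro body.
        BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(buffer, 5, buffer.length - 5, null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}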