Advertisement
Guest User

Untitled

a guest
Jun 26th, 2019
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.67 KB | None | 0 0
  1. public class Temperature {
  2. private double temperature;
  3. private Long timeStamp;
  4.  
  5. public Temperature(double temperature, Long timeStamp) {
  6. this.temperature = temperature;
  7. this.timeStamp = timeStamp;
  8. }
  9.  
  10. public double getTemperature() {
  11. return temperature;
  12. }
  13.  
  14. public void setTemperature(double temperature) {
  15. this.temperature = temperature;
  16. }
  17.  
  18. public Long getTimeStamp() {
  19. return timeStamp;
  20. }
  21.  
  22. public void setTimeStamp(Long timeStamp) {
  23. this.timeStamp = timeStamp;
  24. }
  25. }
  26.  
  27. import com.google.gson.Gson;
  28.  
  29. public class TemperatureStat {
  30.  
  31. private Double lowTemp;
  32. private Double highTemp;
  33. private Long timeStamp;
  34.  
  35.  
  36. public TemperatureStat() {
  37. }
  38.  
  39. public TemperatureStat(double lowTemp, double highTemp, long timeStamp) {
  40. this.lowTemp = lowTemp;
  41. this.highTemp = highTemp;
  42. this.timeStamp = timeStamp;
  43. }
  44.  
  45.  
  46.  
  47. public double getLowTemp() {
  48. return lowTemp;
  49. }
  50.  
  51. public void setLowTemp(double lowTemp) {
  52. this.lowTemp = lowTemp;
  53. }
  54.  
  55. public double getHighTemp() {
  56. return highTemp;
  57. }
  58.  
  59. public void setHighTemp(double highTemp) {
  60. this.highTemp = highTemp;
  61. }
  62.  
  63. public long getTimeStamp() {
  64. return timeStamp;
  65. }
  66.  
  67. public void setTimeStamp(long timeStamp) {
  68. this.timeStamp = timeStamp;
  69. }
  70.  
  71. public TemperatureStat update(String jsonTemp){
  72. Temperature temp = new Gson().fromJson(jsonTemp,Temperature.class);
  73. if(this.highTemp == null) this.highTemp = temp.getTemperature();
  74.  
  75. if(temp.getTemperature() > highTemp) highTemp = temp.getTemperature();
  76. else if(temp.getTemperature() < lowTemp) lowTemp = temp.getTemperature();
  77. else System.out.print("in between low and high");
  78. return this;
  79.  
  80. }
  81. }
  82.  
  83. import com.mypackage.JsonDeserializer;
  84. import com.mypackage.JsonSerializer;
  85. import com.mypackage.WrapperSerde;
  86. import org.apache.kafka.clients.consumer.ConsumerConfig;
  87. import org.apache.kafka.common.serialization.Serdes;
  88. import org.apache.kafka.common.utils.Bytes;
  89. import org.apache.kafka.streams.KafkaStreams;
  90. import org.apache.kafka.streams.StreamsBuilder;
  91. import org.apache.kafka.streams.StreamsConfig;
  92. import org.apache.kafka.streams.Topology;
  93. import org.apache.kafka.streams.kstream.*;
  94. import org.apache.kafka.streams.state.WindowStore;
  95.  
  96. import java.util.Properties;
  97.  
  98. public class StreamProcessor {
  99.  
  100. public static void main(String... args){
  101. Properties props = new Properties();
  102. props.put(StreamsConfig.APPLICATION_ID_CONFIG, "temp_stat_per_minute");
  103. props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  104. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
  105. props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
  106. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  107.  
  108. StreamsBuilder builder = new StreamsBuilder();
  109. KStream<String, String> tempPerSecStream = builder.stream("temperature_per_sec_topic");
  110.  
  111. KTable<Windowed<String>, TemperatureStat> temperatureStatKTable = tempPerSecStream
  112. .groupByKey()
  113. .windowedBy(TimeWindows.of(60000L))
  114. .aggregate(() -> new TemperatureStat(),
  115. (k, v, temp) -> { return temp.update(v);
  116. },
  117. Materialized.<String, TemperatureStat , WindowStore<Bytes, byte[]>>as("TemperatureStat_statestore")
  118. .withKeySerde(Serdes.String())
  119. .withValueSerde(new TempStatSerde()));
  120.  
  121. KStream<String, TemperatureStat > temperatureStatKStream = temperatureStatKTable
  122. .toStream()
  123. .selectKey((k, v) -> k.key());
  124.  
  125. temperatureStatKStream.to("temp_stat_per_minute_topic ",Produced.with(Serdes.String(), new TempStatSerde()));
  126.  
  127. Topology topology = builder.build();
  128. KafkaStreams streams = new KafkaStreams(topology, props);
  129.  
  130. System.out.println(topology.describe());
  131.  
  132. streams.start();
  133. }
  134.  
  135. static public final class TempStatSerde extends WrapperSerde<TemperatureStat> {
  136. public TempStatSerde() {
  137. super(new JsonSerializer<TemperatureStat>(), new JsonDeserializer<TemperatureStat>(TemperatureStat.class));
  138. }
  139. }
  140.  
  141. static public final class TempSerde extends WrapperSerde<Temperature> {
  142. public TempSerde() {
  143. super(new JsonSerializer<Temperature>(), new JsonDeserializer<Temperature>(Temperature.class));
  144. }
  145. }
  146. }
  147.  
  148. private Gson gson = new Gson();
  149. private Class<T> deserializedClass;
  150.  
  151. public JsonDeserializer(Class<T> deserializedClass) {
  152. this.deserializedClass = deserializedClass;
  153. }
  154.  
  155. public JsonDeserializer() {
  156. }
  157.  
  158. @Override
  159. @SuppressWarnings("unchecked")
  160. public void configure(Map<String, ?> map, boolean b) {
  161. if(deserializedClass == null) {
  162. deserializedClass = (Class<T>) map.get("serializedClass");
  163. }
  164. }
  165.  
  166. @Override
  167. public T deserialize(String s, byte[] bytes) {
  168. if(bytes == null){
  169. return null;
  170. }
  171.  
  172. return gson.fromJson(new String(bytes),deserializedClass);
  173.  
  174. }
  175.  
  176. @Override
  177. public void close() {
  178.  
  179. }
  180.  
  181. import com.google.gson.Gson;
  182. import org.apache.kafka.common.serialization.Serializer;
  183.  
  184. import java.nio.charset.Charset;
  185. import java.util.Map;
  186.  
  187. public class JsonSerializer<T> implements Serializer<T> {
  188.  
  189. private Gson gson = new Gson();
  190.  
  191. @Override
  192. public void configure(Map<String, ?> map, boolean b) {
  193.  
  194. }
  195.  
  196. @Override
  197. public byte[] serialize(String topic, T t) {
  198. return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
  199. }
  200.  
  201. @Override
  202. public void close() {
  203.  
  204. }
  205. }
  206.  
  207. import org.apache.kafka.common.serialization.Deserializer;
  208. import org.apache.kafka.common.serialization.Serde;
  209. import org.apache.kafka.common.serialization.Serializer;
  210.  
  211. import java.util.Map;
  212.  
  213.  
  214. public class WrapperSerde<T> implements Serde<T> {
  215.  
  216. final private Serializer<T> serializer;
  217. final private Deserializer<T> deserializer;
  218.  
  219. public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
  220. this.serializer = serializer;
  221. this.deserializer = deserializer;
  222. }
  223.  
  224. @Override
  225. public void configure(Map<String, ?> configs, boolean isKey) {
  226. serializer.configure(configs, isKey);
  227. deserializer.configure(configs, isKey);
  228. }
  229.  
  230. @Override
  231. public void close() {
  232. serializer.close();
  233. deserializer.close();
  234. }
  235.  
  236. @Override
  237. public Serializer<T> serializer() {
  238. return serializer;
  239. }
  240.  
  241. @Override
  242. public Deserializer<T> deserializer() {
  243. return deserializer;
  244. }
  245. }
  246.  
  247. [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
  248. 22:49:12.915 [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47] State transition from RUNNING to ERROR
  249. 22:49:12.915 [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47] All stream threads have died. The instance will be in error state and should be closed.
  250. 22:49:12.915 [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1] Shutdown complete
  251. Exception in thread "temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1" java.lang.ExceptionInInitializerError
  252. at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:50)
  253. at org.rocksdb.RocksDB.<clinit>(RocksDB.java:28)
  254. at org.rocksdb.Options.<clinit>(Options.java:17)
  255. at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115)
  256. at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:44)
  257. at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:91)
  258. at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:100)
  259. at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:76)
  260. at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:81)
  261. at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:34)
  262. at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:99)
  263. at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
  264. at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
  265. at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
  266. at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130)
  267. at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:160)
  268. at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
  269. at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:198)
  270. at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:406)
  271. at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:380)
  272. at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
  273. at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
  274. at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
  275. at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
  276. at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
  277. at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
  278. at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
  279. at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
  280. at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
  281. Caused by: java.lang.UnsupportedOperationException
  282. at org.rocksdb.util.Environment.getJniLibraryName(Environment.java:41)
  283. at org.rocksdb.NativeLibraryLoader.<clinit>(NativeLibraryLoader.java:11)
  284. ... 29 more
  285. Disconnected from the target VM, address: '127.0.0.1:56244', transport: 'socket'
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement