Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * A single temperature reading paired with the time it was taken.
 *
 * Mutable JavaBean so Gson can populate it from JSON; the field names
 * ({@code temperature}, {@code timeStamp}) are part of the JSON contract
 * and must not be renamed.
 */
public class Temperature {

    private double temperature;  // the reading value
    private Long timeStamp;      // when the reading was taken; nullable
                                 // NOTE(review): presumably epoch millis — confirm with producer

    /**
     * @param temperature the reading value
     * @param timeStamp   time of the reading (nullable)
     */
    public Temperature(double temperature, Long timeStamp) {
        this.temperature = temperature;
        this.timeStamp = timeStamp;
    }

    /** @return the reading value */
    public double getTemperature() {
        return temperature;
    }

    /** @param temperature new reading value */
    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    /** @return time of the reading; may be null */
    public Long getTimeStamp() {
        return timeStamp;
    }

    /** @param timeStamp new reading time (nullable) */
    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }
}
- import com.google.gson.Gson;
- public class TemperatureStat {
- private Double lowTemp;
- private Double highTemp;
- private Long timeStamp;
- public TemperatureStat() {
- }
- public TemperatureStat(double lowTemp, double highTemp, long timeStamp) {
- this.lowTemp = lowTemp;
- this.highTemp = highTemp;
- this.timeStamp = timeStamp;
- }
- public double getLowTemp() {
- return lowTemp;
- }
- public void setLowTemp(double lowTemp) {
- this.lowTemp = lowTemp;
- }
- public double getHighTemp() {
- return highTemp;
- }
- public void setHighTemp(double highTemp) {
- this.highTemp = highTemp;
- }
- public long getTimeStamp() {
- return timeStamp;
- }
- public void setTimeStamp(long timeStamp) {
- this.timeStamp = timeStamp;
- }
- public TemperatureStat update(String jsonTemp){
- Temperature temp = new Gson().fromJson(jsonTemp,Temperature.class);
- if(this.highTemp == null) this.highTemp = temp.getTemperature();
- if(temp.getTemperature() > highTemp) highTemp = temp.getTemperature();
- else if(temp.getTemperature() < lowTemp) lowTemp = temp.getTemperature();
- else System.out.print("in between low and high");
- return this;
- }
- }
- import com.mypackage.JsonDeserializer;
- import com.mypackage.JsonSerializer;
- import com.mypackage.WrapperSerde;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.common.serialization.Serdes;
- import org.apache.kafka.common.utils.Bytes;
- import org.apache.kafka.streams.KafkaStreams;
- import org.apache.kafka.streams.StreamsBuilder;
- import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.Topology;
- import org.apache.kafka.streams.kstream.*;
- import org.apache.kafka.streams.state.WindowStore;
- import java.util.Properties;
- public class StreamProcessor {
- public static void main(String... args){
- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, "temp_stat_per_minute");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- StreamsBuilder builder = new StreamsBuilder();
- KStream<String, String> tempPerSecStream = builder.stream("temperature_per_sec_topic");
- KTable<Windowed<String>, TemperatureStat> temperatureStatKTable = tempPerSecStream
- .groupByKey()
- .windowedBy(TimeWindows.of(60000L))
- .aggregate(() -> new TemperatureStat(),
- (k, v, temp) -> { return temp.update(v);
- },
- Materialized.<String, TemperatureStat , WindowStore<Bytes, byte[]>>as("TemperatureStat_statestore")
- .withKeySerde(Serdes.String())
- .withValueSerde(new TempStatSerde()));
- KStream<String, TemperatureStat > temperatureStatKStream = temperatureStatKTable
- .toStream()
- .selectKey((k, v) -> k.key());
- temperatureStatKStream.to("temp_stat_per_minute_topic ",Produced.with(Serdes.String(), new TempStatSerde()));
- Topology topology = builder.build();
- KafkaStreams streams = new KafkaStreams(topology, props);
- System.out.println(topology.describe());
- streams.start();
- }
- static public final class TempStatSerde extends WrapperSerde<TemperatureStat> {
- public TempStatSerde() {
- super(new JsonSerializer<TemperatureStat>(), new JsonDeserializer<TemperatureStat>(TemperatureStat.class));
- }
- }
- static public final class TempSerde extends WrapperSerde<Temperature> {
- public TempSerde() {
- super(new JsonSerializer<Temperature>(), new JsonDeserializer<Temperature>(Temperature.class));
- }
- }
- }
// Gson instance used for all deserialization by this Deserializer.
private Gson gson = new Gson();
// Target type handed to Gson; set via constructor or lazily via configure().
private Class<T> deserializedClass;
/** @param deserializedClass target class Gson deserializes record bytes into */
public JsonDeserializer(Class<T> deserializedClass) {
    this.deserializedClass = deserializedClass;
}
/** No-arg constructor; the target class must then be supplied via configure(). */
public JsonDeserializer() {
}
@Override
@SuppressWarnings("unchecked")
public void configure(Map<String, ?> map, boolean b) {
    // Fall back to the "serializedClass" config entry when the target class
    // was not supplied through the constructor. Cast is unchecked because the
    // config map is untyped.
    if (deserializedClass == null) {
        deserializedClass = (Class<T>) map.get("serializedClass");
    }
}
- @Override
- public T deserialize(String s, byte[] bytes) {
- if(bytes == null){
- return null;
- }
- return gson.fromJson(new String(bytes),deserializedClass);
- }
@Override
public void close() {
    // No resources to release.
}
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
- public class JsonSerializer<T> implements Serializer<T> {
- private Gson gson = new Gson();
- @Override
- public void configure(Map<String, ?> map, boolean b) {
- }
- @Override
- public byte[] serialize(String topic, T t) {
- return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
- }
- @Override
- public void close() {
- }
- }
- import org.apache.kafka.common.serialization.Deserializer;
- import org.apache.kafka.common.serialization.Serde;
- import org.apache.kafka.common.serialization.Serializer;
- import java.util.Map;
- public class WrapperSerde<T> implements Serde<T> {
- final private Serializer<T> serializer;
- final private Deserializer<T> deserializer;
- public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
- this.serializer = serializer;
- this.deserializer = deserializer;
- }
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- serializer.configure(configs, isKey);
- deserializer.configure(configs, isKey);
- }
- @Override
- public void close() {
- serializer.close();
- deserializer.close();
- }
- @Override
- public Serializer<T> serializer() {
- return serializer;
- }
- @Override
- public Deserializer<T> deserializer() {
- return deserializer;
- }
- }
- [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
- 22:49:12.915 [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47] State transition from RUNNING to ERROR
- 22:49:12.915 [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47] All stream threads have died. The instance will be in error state and should be closed.
- 22:49:12.915 [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1] Shutdown complete
- Exception in thread "temp_stat_per_minute-f91817ab-14d5-4b4e-8fe0-8a4e425e1b47-StreamThread-1" java.lang.ExceptionInInitializerError
- at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:50)
- at org.rocksdb.RocksDB.<clinit>(RocksDB.java:28)
- at org.rocksdb.Options.<clinit>(Options.java:17)
- at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115)
- at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:44)
- at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:91)
- at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:100)
- at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:76)
- at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:81)
- at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:34)
- at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:99)
- at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
- at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
- at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
- at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130)
- at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:160)
- at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
- at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:198)
- at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:406)
- at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:380)
- at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
- at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
- at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
- at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
- at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
- at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
- at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
- at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
- at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
- Caused by: java.lang.UnsupportedOperationException
- at org.rocksdb.util.Environment.getJniLibraryName(Environment.java:41)
- at org.rocksdb.NativeLibraryLoader.<clinit>(NativeLibraryLoader.java:11)
- ... 29 more
- Disconnected from the target VM, address: '127.0.0.1:56244', transport: 'socket'
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement