roll11226

TimeseriesStream

Feb 10th, 2017
68
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.24 KB | None | 0 0
  1. package com.github.pmtischler.base;
  2.  
  3. import java.io.EOFException;
  4. import java.io.InputStream;
  5. import java.io.ObjectInputStream;
  6. import java.io.ObjectOutputStream;
  7. import java.io.OutputStream;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10.  
  11. /**
  12. * Timeseries streaming.
  13. * Writes and reads timeseries streams.
  14. */
  15. public class TimeseriesStream {
  16. /**
  17. * Data point in a time series.
  18. * A variable has a Datapoint's value until the next instance in the stream.
  19. */
  20. public static class DataPoint implements java.io.Serializable {
  21. // Name of the variable.
  22. public final String varname;
  23. // Time of the data point (seconds, inclusive).
  24. public final double timestamp;
  25. // Value of the variable at the timestamp.
  26. public final double value;
  27.  
  28. /**
  29. * Creates a DataPoint.
  30. * @param varname The name of the variable.
  31. * @param timestamp The time of the data point.
  32. * @param value The value of the variable.
  33. */
  34. public DataPoint(String varname, double timestamp, double value) {
  35. this.varname = varname;
  36. this.timestamp = timestamp;
  37. this.value = value;
  38. }
  39. }
  40.  
  41. /**
  42. * Timeseries writer.
  43. */
  44. public static class Writer {
  45. /**
  46. * Creates the Writer.
  47. * @param outputStream The output stream to write to.
  48. */
  49. public Writer(OutputStream outputStream) throws Exception {
  50. this.outputStream = new ObjectOutputStream(outputStream);
  51. lastTimestamp = 0;
  52. }
  53.  
  54. /**
  55. * Writes a DataPoint to the output stream.
  56. * Calls to this function must be done with non-decreasing timestamps.
  57. */
  58. public void write(DataPoint point) throws Exception {
  59. if (point.timestamp < lastTimestamp) {
  60. throw new IllegalArgumentException("Timestamp decreased.");
  61. }
  62. outputStream.writeObject(point);
  63. lastTimestamp = point.timestamp;
  64. }
  65.  
  66. // The object output stream.
  67. private ObjectOutputStream outputStream;
  68. // The last timestamp seen.
  69. private double lastTimestamp;
  70. }
  71.  
  72. /**
  73. * Timeseries reader.
  74. */
  75. public static class Reader {
  76. /**
  77. * Creates the Reader.
  78. * @param inputStream The input stream to read from.
  79. */
  80. public Reader(InputStream inputStream) throws Exception {
  81. this.inputStream = new ObjectInputStream(inputStream);
  82. nextPoint = null;
  83. }
  84.  
  85. /**
  86. * Reads a DataPoint from the input stream.
  87. * @return DataPoint if available, null otherwise.
  88. */
  89. public DataPoint read() throws Exception {
  90. // Return next data point if stored.
  91. if (nextPoint != null) {
  92. DataPoint ret = nextPoint;
  93. nextPoint = null;
  94. return ret;
  95. }
  96. try {
  97. return (DataPoint)inputStream.readObject();
  98. } catch (EOFException e) {
  99. return null;
  100. }
  101. }
  102.  
  103. /**
  104. * Reads all DataPoint up to specific time.
  105. * @param time The timestamp to read up to (seconds, inclusive).
  106. * @return The DataPoint read.
  107. */
  108. public List<DataPoint> readUntil(double time) throws Exception {
  109. ArrayList<DataPoint> points = new ArrayList<DataPoint>();
  110. // Read from stream until past time, then store as next.
  111. while (true) {
  112. DataPoint p = read(); // Returns next if stored.
  113. if (p == null) {
  114. // No more points.
  115. return points;
  116. }
  117. if (p.timestamp <= time) {
  118. points.add(p);
  119. } else {
  120. // Point past read time, store for future.
  121. nextPoint = p;
  122. return points;
  123. }
  124. }
  125. }
  126.  
  127. // The object input stream.
  128. private ObjectInputStream inputStream;
  129. // The next point to be returned.
  130. private DataPoint nextPoint;
  131. }
  132. }
Add Comment
Please, Sign In to add comment