"Streaming Data" by Andrew Psaltis

1 - Introducing streaming data

2 - Data Ingestion
Data ingestion is where data enters the system, flowing from the various data sources into the data pipeline. Regardless of the protocol used by a particular source, a few patterns describe how data ingestion can be done.

Patterns
- Request/Response Pattern
Simplest pattern.
Sync: the client blocks until it receives the response; the server blocks until it can send it.
Half async: the client continues working after sending the request; the server waits until it can send the response.
Full async: both client and server continue working after sending and receiving the request, respectively.
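The three variants can be sketched in a few lines; `handle_request` is a hypothetical server handler, and a thread pool stands in for the network:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def handle_request(payload):
    # Hypothetical server-side handler; the sleep stands in for latency.
    time.sleep(0.01)
    return {"status": "ok", "echo": payload}

# Sync: the client blocks until the response arrives.
response = handle_request({"id": 1})

# Half/full async: the client hands off the request and keeps working.
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(handle_request, {"id": 2})
    client_work = "client keeps working while the request is in flight"
    async_response = future.result()  # collected later (never, if fully async)
```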

- Request/Acknowledge Pattern
Similar to Request/Response, but no response is needed from the server. The first request returns an acknowledgement containing the info needed to make further requests (it can be used to identify a client, for example).

- Publish/Subscribe Pattern
A producer sends a message on a particular topic to the broker, which delivers it to all consumers subscribed to that topic. This last step can also be done in a pull fashion, i.e., the consumer pulls the messages from the broker.
The decoupling between consumer and producer provided by this pattern allows for more scalable solutions.
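A toy in-memory broker makes the push-delivery flow concrete (`Broker` and its methods are invented for this sketch; a real broker would add durability and network transport):

```python
from collections import defaultdict

class Broker:
    """Toy in-memory pub/sub broker (illustrative only, not durable)."""
    def __init__(self):
        self.subscribers = defaultdict(list)  # topic -> list of callbacks

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, message):
        # Push delivery: fan the message out to every subscriber of the topic.
        for callback in self.subscribers[topic]:
            callback(message)

broker = Broker()
received = []
broker.subscribe("sales", received.append)
broker.publish("sales", {"item": "book", "qty": 1})
broker.publish("clicks", {"page": "/home"})  # no "sales" subscriber sees this
```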

- One-way Pattern
Also known as "Fire and Forget", this pattern is useful when the client device can't or doesn't need to receive a response from the server, and where losing some data is acceptable.

- Stream Pattern
The client "becomes" the server: the server sends a request to the client (the stream source), and the client responds with no data or with a continual flow of data. This is powerful for ingesting streams and producing new streams from them.

Scaling
The request/response-style patterns (all but the stream pattern) scale horizontally very well, as their stateless nature allows any client device to send data to any server, and more servers can be added without any changes to the currently running instances.
The stream pattern doesn't allow this easy horizontal scaling, due to the coupling between the client and the collection node receiving the stream. We can scale up the collection nodes, or add a buffering layer between the connected collection nodes, which only receive messages, and the processing collection nodes, which apply business logic to them.

Fault Tolerance
Fault tolerance is important to ensure that when a collection node crashes, we can recover the data and continue working as if the crash hadn't happened. This may be crucial in cases where the client can't reproduce the data it sent to the crashed node.
There are two primary approaches to fault tolerance: checkpointing and logging.

There are various checkpoint-based protocols, but most share the following characteristics: the protocol requires a global snapshot (meaning we have to capture all the data and computations of the collection tier and save them to persistent storage), and there is potential for data loss, as any data or computation not captured in the latest global snapshot is lost. Checkpointing is not very appropriate for streaming, as it is hard to consistently capture a snapshot of a system with as many layers and as much data movement as a streaming system.

Logging-based solutions for fault tolerance are better suited to these systems, as they remove the need for a global snapshot and instead focus on keeping track of all the messages that have to be re-sent in case of a crash. We can have receiver-based message logging (RBML), sender-based message logging (SBML) and hybrid message logging (HML).

- Receiver-based message logging (RBML)
Every message sent from the client to the collection tier is stored in permanent storage before any processing is done on it (before any collection-tier business logic runs); the message is then processed and sent to the next tier. In case of a crash, the node is taken out of the load balancer rotation and all messages in permanent storage are processed. After recovery, the node is put back into rotation and the data flow resumes.
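A minimal sketch of the RBML flow, with a plain dict standing in for permanent storage (a real node would use a durable local store and fsync before processing):

```python
import json

class ReceiverLogger:
    """Toy RBML sketch: persist each message BEFORE processing it.
    `storage` stands in for durable storage."""
    def __init__(self, storage, process):
        self.storage = storage      # dict used as stand-in durable storage
        self.process = process      # collection-tier business logic
        self.next_id = 0

    def receive(self, message):
        msg_id = self.next_id
        self.next_id += 1
        self.storage[msg_id] = json.dumps(message)  # log before processing
        result = self.process(message)
        del self.storage[msg_id]                    # safe to forget once handled
        return result

    def recover(self):
        # After a crash: replay anything still sitting in the log.
        for msg_id in sorted(self.storage):
            self.process(json.loads(self.storage[msg_id]))
        self.storage.clear()
```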

- Sender-based message logging (SBML)
Every message is stored in permanent storage before being sent to the next tier, but after the collection business logic has run. An SBML logger records each message before sending it to the next tier and, if there was no error, receives an ACK from the message queuing tier, which allows the logger to delete that message from storage, avoiding sending the same message twice. The recovery process is similar to RBML, with the added complexity of deleting all messages that receive an ACK from the next tier.

- Hybrid message logging (HML)
HML combines RBML and SBML, with two key differences that reduce the performance impact of logging: there is a single permanent message store for both incoming and outgoing messages, and writes to permanent storage are done asynchronously, which allows parallel writes to storage.

3 - Message Queue
The next step in the pipeline is the message queue tier, which gets the messages collected in the previous tier to the analysis tier. The need for this layer comes from the decoupling it provides: it offers a higher level of abstraction to the other tiers and substitutes message passing for explicit calls. This is particularly useful in distributed systems.
The components in this tier are the producer, the broker and the consumer. The broker writes every message it receives from a producer to a message queue (a broker can manage several message queues), from which it is read by a consumer.

If this tier didn't decouple the collection and analysis tiers, we could have an overflow of data from the collection tier and an analysis tier unable to keep up with the data entering the pipeline, eventually leading to data loss. We may also want the analysis tier to consume information only within a certain time frame, to support a batch-processing model. While all message queues provide the decoupling, not all allow messages to be read at the rate the developer desires. The ones that allow consumers to be offline from time to time support durable messaging.

Durable messaging stores all incoming messages in durable storage (filesystem, JDBC), which supports offline consumers, a more complete decoupling of consumers and producers, and disaster recovery (for example, when the connection between a collection-tier server and the message queue server is severed temporarily).

Message Delivery Semantics
Message delivery semantics are the guarantees a message queue makes about delivering messages. We have at-most-once, at-least-once and exactly-once. Not all message queues support exactly-once, but it is possible to implement it with some coordination between the broker and the producers/consumers.
There are six possible points of failure: the producer (can lose a message or send a duplicate), the network between the producer and the broker (can deliver a duplicate), the broker (can lose messages, receive duplicates or allow duplicates to be read), the message queue itself (can lose messages), the network between the broker and the consumer (can lose a message or allow duplicates to be read) and the consumer (can read duplicates).

To implement exactly-once semantics on top of at-least-once, we need two things: before resending a message to the broker, the producer must read data from the broker to ensure the message was not already received; and the consumer must store metadata about the last message received from the broker in persistent storage, to ensure there is no duplicate reading.
This approach has two bonuses: message auditing, meaning we can track the time it takes to process messages based on the consumer's persistent storage, and duplicate detection.
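Consumer-side duplicate detection, the second half of the exactly-once recipe, can be sketched like this (the `seen` set stands in for the persistent metadata store):

```python
class DedupConsumer:
    """Sketch of consumer-side duplicate detection for exactly-once delivery.
    `seen` stands in for persistent storage of processed message IDs; a real
    system would keep it in durable, ideally replicated, storage."""
    def __init__(self, seen, handler):
        self.seen = seen
        self.handler = handler

    def consume(self, msg_id, payload):
        if msg_id in self.seen:   # duplicate from an at-least-once redelivery
            return False
        self.handler(payload)
        self.seen.add(msg_id)     # record AFTER successful processing
        return True

processed = []
consumer = DedupConsumer(set(), processed.append)
consumer.consume(1, "order created")
consumer.consume(1, "order created")   # redelivered duplicate, ignored
consumer.consume(2, "order shipped")
```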

Security/Fault Tolerance
There are various places in the message queue pipeline that need security: encrypting all messages on the wire (in the network between producers/consumers and the broker, or between brokers) and at rest (in the brokers' durable storage), and authenticating all producers and consumers connected to the brokers.

4 - Data Analysis

In-Flight Data Analysis/Continuous Queries
Data analysis in streaming systems differs in many ways from a traditional DBMS. The data being processed is in-flight data, meaning it is always in motion and never persisted to durable storage. This contrasts with the at-rest data of a traditional DBMS. Our goal is to pull data from the message queuing tier as fast as possible and to re-evaluate the query results as we go along. Queries that deal with in-flight data are called continuous queries.
Instead of loading the data into memory and running a query every x units of time, the query is always executing and computes results continuously, pushing each result into the next tier of the pipeline. This inverts the DBMS approach by assuming users are passive and the data management system is active.
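A continuous query can be sketched as a stateful operator that re-evaluates and pushes its result on every arriving event (all names here are invented for illustration):

```python
class ContinuousQuery:
    """Sketch of a continuous query: a running count per key that pushes an
    updated result downstream on every incoming event, instead of being
    re-executed on demand (push model, not pull)."""
    def __init__(self, emit):
        self.counts = {}
        self.emit = emit          # next tier of the pipeline

    def on_event(self, key):
        self.counts[key] = self.counts.get(key, 0) + 1
        self.emit((key, self.counts[key]))  # result re-evaluated per event

results = []
query = ContinuousQuery(results.append)
for page in ["/home", "/about", "/home"]:
    query.on_event(page)
```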

Some differences between a DBMS and a DSMS include:
- Query model: in a DBMS, queries execute one time over a consistent state of the data; in a DSMS, queries execute continuously over a flow of data. In a DBMS, queries are submitted, answered and then forgotten (pull model); in a DSMS, query results are regularly sent to the client (push model).
- Changing data: during downtime in a DBMS the data can't change, but in a DSMS the data keeps flowing while the analysis tier is down, creating the need to catch up upon recovery.
- Query state: if a DBMS crashes, queries have to be re-issued by the user when the system recovers, while in a DSMS continuous queries may need to continue where they left off.

Distributed Stream-Processing Architecture
To answer today's Big Data demands, we must think of the stream-processing system as a distributed one.
Although there are some differences between the most popular stream-processing systems, they share some features: they all take streaming applications and execute them on separate cluster nodes, fed by data from data sources.

The Application Driver receives the streaming program and uses the driver to submit the job to the nodes.
The Streaming Manager controls the life cycle of the stream processors, getting the program to the processing nodes.
The Stream Processor runs the job that was submitted.
The Data Source(s) represent the input to the jobs, which may require one or more inputs. Outputs can be routed back to data sources to be used again, sent to other systems, or sent to the driver for collection.

Some of the most popular distributed stream-processing systems include Apache Spark Streaming, Apache Storm, Apache Flink and Apache Samza.
(Overview of popular DSMS?)

Key Features of Stream Processing Frameworks

Message Delivery Semantics
Once again, we have the three semantic guarantees, each with a particular application.
At-most-once requires no special logic: messages may be lost, but duplicates never happen. It is simple, but it can lose data.
At-least-once requires the system to keep track of which sent messages were acknowledged, leading to no data loss, but duplicates can be sent. A strategy to deal with this (and a good general practice) is to make the streaming job idempotent, meaning that every time a given message is received, the streaming job produces the same result.
Exactly-once requires even more logic, but ensures that no duplicate messages are processed.
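Idempotence is easiest to see as an upsert keyed by a message ID; a sketch assuming each event carries a unique `id`:

```python
def apply_event(state, event):
    """Idempotent streaming job sketch: an upsert keyed by event ID.
    Replaying the same event leaves the state unchanged, so at-least-once
    redelivery does no harm."""
    state[event["id"]] = event["total"]   # overwrite, don't accumulate
    return state

state = {}
events = [
    {"id": "order-1", "total": 30},
    {"id": "order-2", "total": 15},
    {"id": "order-1", "total": 30},   # redelivered duplicate
]
for e in events:
    apply_event(state, e)
```

Had the job summed totals instead of overwriting them, the duplicate would have corrupted the result; the upsert makes redelivery harmless.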

State Management
Most streaming use cases depend on previous messages, making state management a crucial point to consider. We may have to store an aggregation result for a period of time, for example.
The complexity of state-management solutions varies from in-memory storage (prone to failures) to replicated, queryable persistent storage. While in-memory storage may be enough for some simpler use cases, replicated queryable persistent storage can answer more complicated queries, allowing, for example, the joining of streams.

Fault Tolerance
There are several points of failure in components and the network, but they boil down to two main errors: data loss (in a processor or in the network) and loss of resource management (when the manager or, in some cases, the application driver fails).
Data loss can be tackled with replication and coordination, by copying the state of the computation to other replicas. There are two general approaches, state-machine and rollback recovery, both assuming the streaming algorithms are idempotent.
The state-machine approach requires k+1 times the resources of a single-replica execution (for a k-fault-tolerant system), as the algorithm runs on k+1 replicas, leading to quick failover at extra resource cost.
The rollback approach uses checkpoints and logs to redo operations after a failure. At each checkpoint, the state is written to disk or to a different replica (in case the disk cost is too high). The operations between checkpoints are stored in a persistent log; in case of failure, the state of the last checkpoint is restored and the log is replayed to return to the pre-failure state. This approach has lower overhead, but the failover time is higher.
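The rollback approach can be sketched with a dict for the checkpoint and a list for the operation log (both standing in for persistent storage; all names are invented for illustration):

```python
class RollbackProcessor:
    """Rollback-recovery sketch: checkpoint the state every `every` events
    and log each operation in between."""
    def __init__(self, checkpoint, log, every=3):
        self.checkpoint, self.log, self.every = checkpoint, log, every
        self.state = dict(checkpoint)

    def process(self, key, amount):
        self.log.append((key, amount))       # persist the operation first
        self.state[key] = self.state.get(key, 0) + amount
        if len(self.log) >= self.every:      # time to checkpoint
            self.checkpoint.clear()
            self.checkpoint.update(self.state)
            self.log.clear()                 # log restarts at each checkpoint

def recover(checkpoint, log):
    """After a crash: restore the last checkpoint, then replay the log
    to return to the pre-failure state."""
    state = dict(checkpoint)
    for key, amount in log:
        state[key] = state.get(key, 0) + amount
    return state
```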

5 - Algorithms for Data Analysis
When considering processing algorithms, there are some constraints we have to keep in mind:
- One pass - we must assume we only have one chance to process each tuple. Many data mining algorithms are iterative, so we must adapt them to the streaming scenario.
- Concept drift - predictive models may be impacted as the data evolves and its properties change.
- Resource constraints - at times, due to a temporary peak in data speed or volume relative to the resources available, an algorithm may have to drop tuples that can't be processed in time (load shedding).
- Domain constraints - depending on the business domain, certain constraints may apply.

Stream time is the time at which an event reaches the analysis tier, while event time is the time at which the event actually occurred. There can be significant time skew between the two, which we must account for when developing algorithms.

Windows
Because of the endless nature of the stream, we need a mechanism to get a view on the flowing data. Windows represent segments of the data stream that we can perform computations on.
Windows have two fundamental attributes: a trigger policy, the condition that notifies our code that it's time to process the data within the window (which can be temporal or spatial), and an eviction policy, the rules used to decide whether a data element should be evicted from the current window.
There are two main types of window: sliding windows and tumbling windows.
Sliding windows use eviction and trigger policies based on time, like a 2-second window length (eviction policy) and a 1-second sliding interval (trigger policy). Such a window holds 2 seconds' worth of data and triggers every second, giving the code access to both the sliding interval and the whole window.
Tumbling windows use an eviction policy based on the window being full, and a trigger policy based either on time or on the count of items.
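A count-triggered tumbling window is the simplest to sketch (names invented for illustration); when the buffer fills, the computation fires and the whole window is evicted:

```python
class TumblingCountWindow:
    """Tumbling-window sketch with a count-based trigger: when the window
    fills up, fire the computation over its contents (trigger policy) and
    start an empty window (eviction policy: drop the full window)."""
    def __init__(self, size, on_trigger):
        self.size, self.on_trigger = size, on_trigger
        self.buffer = []

    def add(self, element):
        self.buffer.append(element)
        if len(self.buffer) == self.size:    # trigger: count reached
            self.on_trigger(list(self.buffer))
            self.buffer.clear()              # evict the whole window

sums = []
window = TumblingCountWindow(3, lambda batch: sums.append(sum(batch)))
for value in [1, 2, 3, 4, 5, 6, 7]:
    window.add(value)
# value 7 is still buffered, waiting for the window to fill
```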

Summarisation Techniques
(Write on Summarisation)

6 - Storing data
(Read Use Cases)
(Write everything)

7 - Making the Data Available
To deliver results to the client, we need to build a streaming data API. This layer communicates with the data analysis store to get results and send them to the client. There are various factors to consider when building the API, like the communication pattern, the protocol used and the filtering that is done.

Communication Patterns
Data Sync - the streaming API watches for changes in the data store and sends updates to the client. On initial connection, all past data is sent.
RMI/RPC - when the streaming API sees changes, or when a certain condition is met, it calls a remote method on the client to update its data.
Simple Messaging - the client requests the most recent data; the API gets it from the store and sends it to the client. The complexity of this pattern can vary, using various methods to reduce the number of update requests the client sends to the API.
Publish/Subscribe - the client subscribes to a particular category, and the API sends updates to all clients subscribed to the category of the changed data, reducing the burden of sending data to uninterested clients. There is also less burden on the client, which doesn't have to send messages or keep state (the server needs to know the topics and connected users).

Protocols
WebHooks - similar to a callback: the user registers an HTTP endpoint to be notified when new data arrives or a certain condition is met. The streaming API watches for changes and uses an HTTP POST to notify the registered endpoint.
HTTP Long Polling - the client holds a connection to the API open until there are updates. When the updates are sent from the API to the client, the client opens a new connection, and the cycle repeats.
Server-Sent Events - this protocol improves on long polling by using only one connection between the API and the client, and by being able to work with a push proxy, which maintains the connection to the API while the client device is in power-saving mode and delivers updates through push messages that wake the device.
WebSockets - a full-duplex protocol that uses HTTP for the initial handshake and then sends all update data over TCP, with the possibility of the client asking for a slower data rate (for example). This improves on the efficiency of the previous protocols and provides better fault-tolerance implementation options, because the client can also contact the server.
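The long-polling client loop reduces to "block on a request, handle the batch, reconnect"; here `fetch_updates` is a stand-in for the blocking HTTP call:

```python
def long_poll(fetch_updates, handle, max_cycles=3):
    """Long-polling client-loop sketch. `fetch_updates` stands in for an
    HTTP request the server holds open until updates exist; each returned
    batch is handled, then a new request is immediately issued."""
    for _ in range(max_cycles):      # a real client would loop until stopped
        updates = fetch_updates()    # blocks until the server has data
        handle(updates)

# Simulated server responses, one batch per poll cycle:
batches = iter([["a"], ["b", "c"], ["d"]])
seen = []
long_poll(lambda: next(batches), seen.extend)
```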

Filtering
Filtering in the API layer is useful for sending only the events, and only the properties of those events, that the client is interested in. As the data flows into the API, we can filter out events or properties the client has no interest in. This is similar to a select clause combined with a where clause in the SQL world.
We can have static filtering, where the queries that filter events are decided by the designers in advance and can't be changed at runtime, and dynamic filtering, where the filtering is decided at runtime by the clients. There are ways to achieve both types of filtering with all of the protocols previously discussed.
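Dynamic filtering can be sketched as a filter the client assembles at runtime from a predicate (the "where") and a field list (the "select"); all names here are invented for illustration:

```python
def make_filter(predicate, fields):
    """Dynamic-filtering sketch: the client supplies, at runtime, a predicate
    deciding which events pass and the list of fields to keep."""
    def apply(event):
        if not predicate(event):
            return None                          # event filtered out
        return {k: event[k] for k in fields}     # keep only wanted fields
    return apply

# A client interested only in large EU sales, and only two of the fields:
client_filter = make_filter(
    predicate=lambda e: e["region"] == "EU" and e["amount"] > 100,
    fields=["item", "amount"],
)
events = [
    {"item": "book", "amount": 150, "region": "EU"},
    {"item": "pen", "amount": 5, "region": "EU"},
    {"item": "desk", "amount": 300, "region": "US"},
]
filtered = []
for event in events:
    projected = client_filter(event)
    if projected is not None:
        filtered.append(projected)
```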

8 - Consumer devices
When developing the client, meaning the component that consumes the streaming API and presents results to the customers, there are certain considerations to take into account. Whether it's a web browser, another stream-processing system or a third-party system, operations like aggregation and filtering and concerns like fault tolerance and delivery semantics have to be considered.

Reading Fast Enough
We need to make sure the clients can read the stream fast enough that there is no data loss. If a client is lagging behind, the API has two options: keep the events in memory or blindly send them. Both of these have downsides, with the in-memory option being limited by the available memory and the blind-send option by the network buffers.
The API can notify the client that it's lagging behind so that some action can be taken to minimize loss. If we're using a third-party API, we're limited to the types of messages that API sends to the client. In the example of the Twitter API, warnings are sent to the client, and the connection is terminated if the client continues to lag behind.
If we're building our own API, we should warn late clients and log the events and clients so we can later analyse possible problems. The action to take when a client stays behind is up to the developer.

Maintaining State
We may want to save where we left off, or keep the results of client-side processing, meaning we have to save state in the client. We should not take client-side processing too far, and should push processing back to the stream-analysis layer, but there are certain cases where client-side processing is justified.
When building a web client, there are two options for storage: web storage or IndexedDB.
Web storage can be local (persists across sessions) or session-scoped (persists only within a session). Web storage has no ordering, and there are size limitations.
IndexedDB is designed for in-order retrieval of keys, efficient searching and in-order traversal of large numbers of records. It should be the option when performing aggregations/computations on the client side.
Our goal should be to make the client as stateless as possible, offloading state management to the streaming API.

Mitigating Data Loss
In terms of mitigating data loss, there are two places to worry about: when data is sent from the API to the client, and when data is sent from the client to a UI or third-party API. The solution is HML (as in chapter 2). With this method, we can ensure that we receive all messages from the streaming API and that all messages are sent to the third-party API. If the third-party API doesn't support ACKs (necessary for SBML), we have to use a less efficient method such as write-and-read, where we write the data to the third-party API and subsequently read it back, making sure it was received correctly.

Exactly-once Processing
To make sure we only send one copy of each event to the UI or third-party API, we need exactly-once processing semantics, meaning there have to be ACKs in both the third-party API and our streaming API. We also have to keep all the data sent from the client in persistent storage, to make sure we only send one copy of each event. This increases client complexity, but it protects us from duplicates when a streaming API fails. Because we're dealing with replicated streaming APIs, we don't want duplicates when failing over to another streaming API, meaning we have to store the processed messages in distributed storage.

Example of a Client
Imagine a real-world application where our stream-processing system processes sales of a certain item and we need to pay royalties in real time on the products sold. Our client must meet two requirements to be successful: it must process each royalty exactly once, and it must not lose a royalty.
The first requirement can be achieved by sending each payment-request event to our client processor only one time and by making sure we only receive one copy of each message from the streaming API. We can imagine a flow where the API sends an event, which is logged for RBML in a local store like RocksDB; the event is then processed and logged for SBML. The event is then sent on for processing, and if there's an ACK, we can safely remove the RBML entry. Finally, an ACK is sent to the streaming API.
The second requirement is accomplished without further changes, as long as we write incoming events to durable storage as soon as possible. Of course, we can still lose data if the disk holding the RocksDB data fails. To mitigate this, we can use techniques like RAID or a distributed store.
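The royalty flow described above can be sketched like this, with dicts and a set standing in for the durable RocksDB logs, and `pay` standing in for the downstream payment call (all names invented for this sketch):

```python
class RoyaltyClient:
    """Sketch of the royalty flow: RBML-log each incoming event, build and
    SBML-log the outgoing payment, send it, and clear the logs on ACK.
    `rbml_log`/`sbml_log`/`processed` stand in for durable local storage;
    `pay` returns True when the downstream ACKs the payment."""
    def __init__(self, rbml_log, sbml_log, processed, pay):
        self.rbml_log, self.sbml_log = rbml_log, sbml_log
        self.processed, self.pay = processed, pay

    def on_event(self, event_id, event):
        if event_id in self.processed or event_id in self.rbml_log:
            return "duplicate"                  # exactly-once: already seen
        self.rbml_log[event_id] = event         # 1. RBML: log before processing
        payment = {"event": event_id, "amount": event["amount"]}
        self.sbml_log[event_id] = payment       # 2. SBML: log before sending
        if self.pay(payment):                   # 3. send downstream, await ACK
            self.processed.add(event_id)        # 4. persistent record of sends
            del self.rbml_log[event_id]
            del self.sbml_log[event_id]
            return "acked"                      # 5. now ACK the streaming API
        return "pending"                        # retried later from the logs
```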

Web Client
Working with a web client relieves some of the problems discussed earlier, as the operations are idempotent rather than transactional like those against a third-party API. Web clients are usually used to display stream results, and we usually have to keep some storage and do some computation.
If our stream flows at high velocity, a browser-based client will eventually fall behind, but missing some data may not hurt the overall picture offered by the dashboard, meaning some data loss may be acceptable in some cases.
We need to store data in storage like IndexedDB, and can store either the latest values received from the API or the whole of the data. The benefit of storing all of the data is that when the client starts, we have some data to begin with. We also have the option of using session or local storage. In the case where the API sends only incremental changes instead of running totals, we need to keep track of updates and increment the stored values accordingly.
This is supported by all of the streaming API protocols, with some semantic differences between them.

SQL Support
When customers want to query the stream results, they'll probably want to use SQL, as it's known by many developers and business people, so we need to provide a SQL interface. If the streaming system supports SQL, our work is simplified. If not, we may need to build a proxy API that applies SQL to the events coming in from the "real" streaming API and acts as the normal API for the client. There are existing solutions to this problem, like Apache Calcite.