Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
-- Output stream with five columns, used to send the IoT data to the
-- destination. Column names and types are unchanged so existing consumers of
-- "DESTINATION_SQL_BASIC_STREAM" see the same schema.
CREATE OR REPLACE STREAM "DESTINATION_SQL_BASIC_STREAM" (
    avgspeed        INT,
    userId          VARCHAR(8),
    maxacceleration INT,
    ratetype        VARCHAR(8),
    datetime        TIMESTAMP
);

-- Pump that continuously selects from the source stream and inserts the rows
-- into the output stream. The INSERT has no column list, so the SELECT list
-- is kept in the same order as the stream definition above.
CREATE OR REPLACE PUMP "STREAM_PUMP_001" AS
    INSERT INTO "DESTINATION_SQL_BASIC_STREAM"
    SELECT STREAM
        avgspeed,
        userId,
        maxacceleration,
        ratetype,
        datetime
    FROM "SOURCE_SQL_STREAM_001";
-- ** Continuous Filter **
-- Performs a continuous filter based on a WHERE condition.
--          .----------.   .----------.   .----------.
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |
--          '----------'   '----------'   '----------'
-- STREAM (in-application): a continuously updated entity that you can SELECT
--   from and INSERT into like a TABLE.
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM and
--   INSERT the SQL results into an output STREAM.

-- In-application stream that receives only the rows whose ratetype is 'HIGH'.
-- NOTE: the name is spelled "INTERMIDIATE" (sic). It is kept as-is because
-- the aggregation pump later in this script reads from the stream under
-- exactly this name.
CREATE OR REPLACE STREAM "INTERMIDIATE_SQL_STREAM" (
    avgspeed        INT,
    maxacceleration INT,
    userId          VARCHAR(8)
);

-- Pump that copies three columns (not the whole row) from the source stream
-- into the stream above, keeping only the 'HIGH' rate rows.
CREATE OR REPLACE PUMP "STREAM_PUMP_002" AS
    INSERT INTO "INTERMIDIATE_SQL_STREAM"
    SELECT STREAM
        avgspeed,
        maxacceleration,
        userId
    FROM "SOURCE_SQL_STREAM_001"
    -- Plain equality against a literal; LIKE (pattern) or SIMILAR TO (regex)
    -- would only be needed if ratetype had to be matched by pattern.
    WHERE ratetype = 'HIGH';
-- ** Aggregate (COUNT) + Sliding time window **
-- Counts rows per userId over a 30-second sliding window and emits only the
-- rows where that count is greater than 2.
--          .----------.   .----------.   .----------.
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |
--          '----------'   '----------'   '----------'
-- STREAM (in-application): a continuously updated entity that you can SELECT
--   from and INSERT into like a TABLE.
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM and
--   INSERT the SQL results into an output STREAM.

-- Output stream: the user and that user's row count within the window.
CREATE OR REPLACE STREAM "DESTINATION_SQL_HHR_STREAM" (
    userId            VARCHAR(8),
    userId_high_count INTEGER
);

-- Pump that continuously reads "INTERMIDIATE_SQL_STREAM", computes a per-user
-- COUNT(*) over the preceding 30 seconds, and forwards the rows whose count
-- exceeds 2. Other aggregates usable in the same position include
-- AVG, MAX, MIN, SUM, STDDEV_POP, STDDEV_SAMP, VAR_POP and VAR_SAMP.
CREATE OR REPLACE PUMP "STREAM_PUMP_003" AS
    INSERT INTO "DESTINATION_SQL_HHR_STREAM"
    -- Explicit column list (instead of *) so the positional INSERT keeps
    -- matching the destination stream if the inner query ever gains columns.
    SELECT STREAM
        user_counts.userId,
        user_counts.userId_high_count
    FROM (
        SELECT STREAM
            userId,
            COUNT(*) OVER THIRTY_SECOND_SLIDING_WINDOW AS userId_high_count
        FROM "INTERMIDIATE_SQL_STREAM"
        -- Results are partitioned by userId over a 30-second sliding window.
        WINDOW THIRTY_SECOND_SLIDING_WINDOW AS (
            PARTITION BY userId
            RANGE INTERVAL '30' SECOND PRECEDING
        )
    ) AS user_counts
    -- The windowed count cannot be filtered in the inner query's WHERE,
    -- hence the wrapping subquery.
    WHERE user_counts.userId_high_count > 2;
Add Comment
Please, Sign In to add comment