Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @Rule
- public final transient TestPipeline pipeline = TestPipeline.create();
- @Test
- @Category(NeedsRunner.class)
- public void testPipeline() {
- String guid = "user1";
- TimestampedValue<PageLoadEvent> homepage = makePageView(1, "HomePage", "homepage", guid);
- TimestampedValue<PageLoadEvent> productDetails = makePageView(1, "Product Details", "product_details", guid);
- IntervalWindow window1 = new IntervalWindow(new Instant(1), new Instant(3));
- TestStream<PageLoadEvent> testStream = TestStream.create(ProtoCoder.of(PageLoadEvent.class))
- .addElements(homepage)
- .addElements(productDetails)
- .advanceWatermarkTo(new Instant(5))
- .advanceWatermarkToInfinity();
- PCollection<PageLoadEvent> input = pipeline.apply(testStream);
- PCollection<KV<UniqueUserKey, PageLoadEvent>> firstTransform = input.apply("Group Events by Guid", ParDo.of(new SingleTransformations.AssignUniqueUserCriteriaToPageLoadEvent()))
- .apply("Enforce window", Window.into(Sessions.withGapDuration(new Duration(2))));
- PAssert.that(firstTransform).inWindow(window1).containsInAnyOrder(
- KV.of(makeUniqueUserKey(guid), homepage.getValue()),
- KV.of(makeUniqueUserKey(guid), productDetails.getValue())
- );
- pipeline.run().waitUntilFinish();
- }
Add Comment
Please, Sign In to add comment