Guest User

Untitled

a guest
Feb 25th, 2018
69
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.28 KB | None | 0 0
  1. @Rule
  2. public final transient TestPipeline pipeline = TestPipeline.create();
  3.  
  4. @Test
  5. @Category(NeedsRunner.class)
  6. public void testPipeline() {
  7. String guid = "user1";
  8. TimestampedValue<PageLoadEvent> homepage = makePageView(1, "HomePage", "homepage", guid);
  9. TimestampedValue<PageLoadEvent> productDetails = makePageView(1, "Product Details", "product_details", guid);
  10. IntervalWindow window1 = new IntervalWindow(new Instant(1), new Instant(3));
  11.  
  12. TestStream<PageLoadEvent> testStream = TestStream.create(ProtoCoder.of(PageLoadEvent.class))
  13. .addElements(homepage)
  14. .addElements(productDetails)
  15. .advanceWatermarkTo(new Instant(5))
  16. .advanceWatermarkToInfinity();
  17.  
  18. PCollection<PageLoadEvent> input = pipeline.apply(testStream);
  19. PCollection<KV<UniqueUserKey, PageLoadEvent>> firstTransform = input.apply("Group Events by Guid", ParDo.of(new SingleTransformations.AssignUniqueUserCriteriaToPageLoadEvent()))
  20. .apply("Enforce window", Window.into(Sessions.withGapDuration(new Duration(2))));
  21. PAssert.that(firstTransform).inWindow(window1).containsInAnyOrder(
  22. KV.of(makeUniqueUserKey(guid), homepage.getValue()),
  23. KV.of(makeUniqueUserKey(guid), productDetails.getValue())
  24. );
  25. pipeline.run().waitUntilFinish();
  26. }
Add Comment
Please, Sign In to add comment