Guest User

Untitled

a guest
Nov 20th, 2018
107
0
Never
Not a member of Pastebin yet? Sign up; it unlocks many cool features!
text 2.18 KB | None | 0 0
  1. public class
  2. Main
  3. {
  4. public static void
  5. main( String[] args )
  6. {
  7. String docPath = args[ 0 ];
  8. String wcPath = args[ 1 ];
  9. String stopPath = args[ 2 ];
  10.  
  11. Properties properties = new Properties();
  12. AppProps.setApplicationJarClass( properties, Main.class );
  13. HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );
  14.  
  15. // create source and sink taps
  16. Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
  17. Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );
  18.  
  19. Fields stop = new Fields( "stop" );
  20. Tap stopTap = new Hfs( new TextDelimited( stop, true, "\t" ), stopPath );
  21.  
  22. // specify a regex operation to split the "document" text lines into a token stream
  23. Fields token = new Fields( "token" );
  24. Fields text = new Fields( "text" );
  25. RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
  26. Fields fieldSelector = new Fields( "doc_id", "token" );
  27. Pipe docPipe = new Each( "token", text, splitter, fieldSelector );
  28.  
  29. // define "ScrubFunction" to clean up the token stream
  30. Fields scrubArguments = new Fields( "doc_id", "token" );
  31. docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );
  32.  
  33. // perform a left join to remove stop words, discarding the rows
  34. // which joined with stop words, i.e., were non-null after left join
  35. Pipe stopPipe = new Pipe( "stop" );
  36. Pipe tokenPipe = new HashJoin( docPipe, token, stopPipe, stop, new LeftJoin() );
  37. tokenPipe = new Each( tokenPipe, stop, new RegexFilter( "^$" ) );
  38.  
  39. // determine the word counts
  40. Pipe wcPipe = new Pipe( "wc", tokenPipe );
  41. wcPipe = new Retain( wcPipe, token );
  42. wcPipe = new GroupBy( wcPipe, token );
  43. wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );
  44.  
  45. // connect the taps, pipes, etc., into a flow
  46. FlowDef flowDef = FlowDef.flowDef()
  47. .setName( "wc" )
  48. .addSource( docPipe, docTap )
  49. .addSource( stopPipe, stopTap )
  50. .addTailSink( wcPipe, wcTap );
  51.  
  52. // write a DOT file and run the flow
  53. Flow wcFlow = flowConnector.connect( flowDef );
  54. wcFlow.writeDOT( "dot/wc.dot" );
  55. wcFlow.complete();
  56. }
  57. }
Add Comment
Please sign in to add a comment.