Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- public class
- Main
- {
- public static void
- main( String[] args )
- {
- String docPath = args[ 0 ];
- String wcPath = args[ 1 ];
- String stopPath = args[ 2 ];
- Properties properties = new Properties();
- AppProps.setApplicationJarClass( properties, Main.class );
- HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );
- // create source and sink taps
- Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
- Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );
- Fields stop = new Fields( "stop" );
- Tap stopTap = new Hfs( new TextDelimited( stop, true, "\t" ), stopPath );
- // specify a regex operation to split the "document" text lines into a token stream
- Fields token = new Fields( "token" );
- Fields text = new Fields( "text" );
- RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
- Fields fieldSelector = new Fields( "doc_id", "token" );
- Pipe docPipe = new Each( "token", text, splitter, fieldSelector );
- // define "ScrubFunction" to clean up the token stream
- Fields scrubArguments = new Fields( "doc_id", "token" );
- docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );
- // perform a left join to remove stop words, discarding the rows
- // which joined with stop words, i.e., were non-null after left join
- Pipe stopPipe = new Pipe( "stop" );
- Pipe tokenPipe = new HashJoin( docPipe, token, stopPipe, stop, new LeftJoin() );
- tokenPipe = new Each( tokenPipe, stop, new RegexFilter( "^$" ) );
- // determine the word counts
- Pipe wcPipe = new Pipe( "wc", tokenPipe );
- wcPipe = new Retain( wcPipe, token );
- wcPipe = new GroupBy( wcPipe, token );
- wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );
- // connect the taps, pipes, etc., into a flow
- FlowDef flowDef = FlowDef.flowDef()
- .setName( "wc" )
- .addSource( docPipe, docTap )
- .addSource( stopPipe, stopTap )
- .addTailSink( wcPipe, wcTap );
- // write a DOT file and run the flow
- Flow wcFlow = flowConnector.connect( flowDef );
- wcFlow.writeDOT( "dot/wc.dot" );
- wcFlow.complete();
- }
- }
Add Comment
Please sign in to add a comment.