Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- namespace com.ibm.daell ;
- composite LineCount
- {
- type
- LineCount_t = rstring fileName, int32 lineCntl ;
- graph
- (stream<rstring fileName> FileFoundInDir) = DirectoryScan()
- {
- param
- directory : getSubmissionTimeValue("DirName") ;
- ignoreDotFiles : true ;
- sleepTime : 10.0 ;
- pattern : ".*\\..*" ; //This should be in regex format
- }
- (stream<rstring fileName> FileFoundInDir2) =DirectoryScan()
- {
- param
- directory : getSubmissionTimeValue("DirName2");
- ignoreDotFiles : true ;
- sleepTime : 10.0 ;
- pattern : ".*\\..*" ; //This should be in regex format
- }
- (stream<rstring line, rstring fileName> Line) = FileSource(FileFoundInDir)
- {
- param
- format : line ;
- output
- Line : fileName = FileName() ;
- }
- (stream<rstring line, rstring fileName> Line2) = FileSource(FileFoundInDir2)
- {
- param
- format : line ;
- output
- Line2 : fileName = FileName() ;
- }
- stream<LineCount_t> LineCount as LC = Custom(Line ;Line2)
- {
- logic
- state :
- {
- mutable int32 _cnt = 0 ;
- mutable rstring _fileName = "" ;
- mutable int32 _cnt2 = 0 ;
- mutable rstring _fileName2 = "" ;
- }
- onTuple Line :
- {
- _cnt ++ ;
- _fileName = Line.fileName ;
- }
- onPunct Line :
- {
- if(currentPunct() == Sys.WindowMarker)
- {
- mutable LineCount_t oTuple = { } ;
- oTuple.fileName = _fileName ;
- oTuple.lineCntl = _cnt ;
- submit(oTuple, LC) ;
- _cnt = 0 ;
- _fileName = "" ;
- }
- }
- onTuple Line2 :
- {
- _cnt2 ++ ;
- _fileName2 = Line2.fileName ;
- }
- onPunct Line2 :
- {
- if(currentPunct() == Sys.WindowMarker)
- {
- mutable LineCount_t oTuple = { } ;
- oTuple.fileName = _fileName2 ;
- oTuple.lineCntl = _cnt2 ;
- submit(oTuple, LC) ;
- _cnt2 = 0 ;
- _fileName2 = "" ;
- }
- }
- }
- () as MySink1 = FileSink(LineCount)
- {
- param
- file : "/dev/stdout" ;
- flush : 1u ;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement