Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
  * Reports whether a raw input line should be treated as the header row.
  *
  * A line counts as a header when its first character is `m`; the empty
  * string is never a header.
  */
def isHeader(s: String): Boolean =
  s.startsWith("m")
/**
  * Computes the cosine similarity between the two item lists attached to a key pair.
  *
  * Each list is reduced to a set first, so repeated entries count once. The score is
  * |A intersect B| / sqrt(|A| * |B|).
  *
  * @param pair ((firstKey, secondKey), (itemsOfFirst, itemsOfSecond))
  * @return (firstKey, secondKey, similarity); the similarity is 0.0 when either
  *         list is empty
  */
def cs(pair: ((String, String), (List[String], List[String]))): (String, String, Double) = {
  val ((firstKey, secondKey), (firstItems, secondItems)) = pair
  val msgs1 = firstItems.toSet
  val msgs2 = secondItems.toSet
  val numer = msgs1.intersect(msgs2).size
  // Multiply as Double: an Int product overflows once it exceeds Int.MaxValue
  // (e.g. two sets of ~46k elements each), yielding NaN or a wrong score.
  val denom = math.sqrt(msgs1.size.toDouble * msgs2.size.toDouble)
  // An empty set would give 0 / 0.0 = NaN; report "no overlap" instead.
  val similarity = if (denom == 0.0) 0.0 else numer / denom
  (firstKey, secondKey, similarity)
}
/**
  * Renders a similarity triple as one CSV line: the two keys followed by the
  * score, separated by commas. No quoting or escaping is applied.
  */
def to_csv_cos(t: (String, String, Double)): String = {
  val (left, right, score) = t
  s"$left,$right,$score"
}
// Load the raw lines and discard every line that isHeader flags.
val messages = sc.textFile("....csv")
val msgData1 = messages.filter(line => !isHeader(line))

// Split each remaining line on commas; only columns 0 and 1 are used below.
val data = msgData1.map(_.split(','))

// For every column-0 value, collect its column-1 values and emit each
// 2-element combination of them as a pair.
// NOTE(review): two values that co-occur under several column-0 keys are emitted
// once per key, so the output can contain repeated rows -- confirm this is intended.
val pairs = data
  .map(fields => (fields(0), List(fields(1))))
  .reduceByKey(_ ++ _)
  .flatMap { case (_, grouped) => grouped.combinations(2).toList }
  .map(combo => (combo(0), combo(1)))

// For every column-1 value, the list of column-0 values it appears with.
val msgs = data
  .map(fields => (fields(1), List(fields(0))))
  .reduceByKey(_ ++ _)

// Attach both members' lists to each pair: the first join is keyed on the pair's
// first member, then the record is re-keyed on the second member and joined again.
val pairs_mapped = pairs
  .join(msgs)
  .map { case (first, (second, firstMsgs)) => (second, (first, firstMsgs)) }
  .join(msgs)
  .map { case (second, ((first, firstMsgs), secondMsgs)) =>
    ((second, first), (secondMsgs, firstMsgs))
  }

// Score every pair, format it as a CSV line and write the lines out.
val res = pairs_mapped
  .map(cs)
  .map(to_csv_cos)
  .saveAsTextFile("F:\\Scala\\night_result6")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement