
;; Untitled
;; By: a guest on May 8th, 2012 | syntax: None | size: 1.11 KB | hits: 10 | expires: Never
(ns tester.namespace
(:use cascalog.api
[clojure.string :only (join)])
(:require [cascalog.ops :as c])
(:import [org.apache.hadoop.io.serializer JavaSerialization]))
(defmacro with-serializations
  "Wraps `forms` in `with-job-conf` with the \"io.serializations\" entry set
  to a comma-separated list of serialization class names.

  The list always starts with the three default serializations below,
  followed by the entries of `serial-vec` in order. Each entry of
  `serial-vec` is either a string (used verbatim) or a symbol naming a
  class, which is resolved in the current namespace at macroexpansion time.

  Throws IllegalArgumentException at macroexpansion time when a non-string
  entry does not resolve to a class, instead of failing with an opaque
  NullPointerException from calling `.getName` on nil."
  [serial-vec & forms]
  (let [defaults ["cascading.tuple.hadoop.BytesSerialization"
                  "cascading.tuple.hadoop.TupleSerialization"
                  "org.apache.hadoop.io.serializer.WritableSerialization"]
        ;; Turn one serial-vec entry into a fully-qualified class name.
        class-name (fn [entry]
                     (if (string? entry)
                       entry
                       (let [resolved (when (symbol? entry) (resolve entry))]
                         (if (class? resolved)
                           ;; Type hint avoids a reflective call.
                           (.getName ^Class resolved)
                           (throw (IllegalArgumentException.
                                   (str "with-serializations: cannot resolve "
                                        (pr-str entry)
                                        " to a class; pass an imported class"
                                        " symbol or a class-name string")))))))
        ;; Computed once, at macroexpansion; the expansion embeds the string.
        serializations (join "," (concat defaults (map class-name serial-vec)))]
    `(with-job-conf
       {"io.serializations" ~serializations}
       ~@forms)))
(defn count-if-something-init
  "Takes a single two-element sequential argument `[word tester]` and
  returns 1 when the two elements are equal, 0 otherwise."
  [[word tester]]
  (if (not= tester word)
    0
    1))
;; Declares `count-if-something*` via Cascalog's `defparallelagg`.
;; :init-var points at `count-if-something-init`, which maps one
;; [word tester] pair to 1 (match) or 0 (no match); :combine-var points at
;; `+`, so the per-tuple values are summed.
;; NOTE(review): presumably the result is the number of matching tuples in
;; the group -- confirm against the Cascalog defparallelagg contract.
(defparallelagg count-if-something*
:init-var #'count-if-something-init
:combine-var #'+)
(defn vectorize
  "Collects all arguments into one sequence and returns a one-element
  vector holding that sequence (the element is nil when called with no
  arguments)."
  [& tuple]
  (vector tuple))
;; Returns a Cascalog form built with `<-` that takes ?word as input and
;; produces ?count as output (the `:>` in the var vector separates them).
;; Inside, each ?word is paired with the closed-over `word` argument via
;; `vectorize` into ?v, and ?v is fed to the `count-if-something*`
;; aggregator to produce ?count.
;; NOTE(review): presumably this acts as a predicate macro that counts how
;; many ?word values equal `word` -- confirm against Cascalog docs.
(defn count-if-something [word]
(<- [?word :> ?count]
(vectorize ?word word :> ?v)
(count-if-something* ?v :> ?count)))
;; Example run: executes a query (`?<-`) that sinks ?count to the (stdout)
;; tap, using four in-memory one-field tuples as the ?word source and the
;; predicate built by (count-if-something "awesome").
;; JavaSerialization is appended to "io.serializations" for the duration of
;; the query.
;; NOTE(review): presumably needed because ?v carries a Clojure sequence
;; between operations, and the expected output is 2 ("awesome" appears
;; twice) -- confirm by running.
(with-serializations [JavaSerialization]
(?<- (stdout) [?count]
([["awesome"] ["aaa"] ["awesome"] ["bbb"]] ?word)
((count-if-something "awesome") ?word :> ?count)))