Advertisement
Guest User

Untitled

a guest
Jul 17th, 2019
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 3.59 KB | None | 0 0
  1. import ai.scynet.core.annotations.Inputs
  2. import ai.scynet.core.annotations.Output
  3. import ai.scynet.core.annotations.Type
  4. import ai.scynet.core.descriptors.ProcessorDescriptor
  5. import ai.scynet.core.processors.IgniteStream
  6. import ai.scynet.core.processors.Processor
  7. import ai.scynet.core.processors.Stream
  8. import kotlinx.coroutines.coroutineScope
  9. import kotlinx.coroutines.delay
  10. import kotlinx.coroutines.launch
  11. import org.apache.ignite.Ignite
  12. import org.apache.ignite.Ignition
  13. import org.apache.ignite.cache.CacheEntryProcessor
  14. import org.apache.ignite.cluster.ClusterNode
  15. import org.apache.ignite.compute.*
  16. import org.apache.ignite.configuration.IgniteConfiguration
  17. import org.apache.ignite.events.EventType
  18. import org.apache.ignite.lang.IgniteFuture
  19. import org.apache.ignite.resources.*
  20. import org.apache.ignite.services.Service
  21. import org.apache.ignite.services.ServiceContext
  22.  
  23. import org.koin.core.KoinComponent
  24. import org.koin.core.inject
  25. import java.nio.file.Files
  26. import java.nio.file.Paths
  27. import java.sql.Timestamp
  28. import java.util.*
  29. import java.util.concurrent.ConcurrentLinkedQueue
  30. import java.util.concurrent.LinkedBlockingDeque
  31. import java.util.concurrent.atomic.AtomicInteger
  32. import javax.cache.processor.MutableEntry
  33. import org.apache.ignite.compute.ComputeJobContext
  34. import org.apache.ignite.lang.IgniteBiPredicate
  35. import org.apache.ignite.lang.IgniteClosure
  36. import org.apache.ignite.resources.JobContextResource
  37.  
  38.  
  39.  
  40. @ComputeTaskName("BasicConsumerTask")
  41. @ComputeTaskMapAsync
  42. class BasicConsumerTask : ComputeTaskAdapter<String, String>() {
  43.  
  44.     @IgniteInstanceResource
  45.     lateinit var ignite: Ignite
  46.  
  47.     @TaskContinuousMapperResource
  48.     lateinit var mapper: ComputeTaskContinuousMapper
  49.  
  50.     override fun map(subgrid: MutableList<ClusterNode>?, string: String?): MutableMap<out ComputeJob, ClusterNode> {
  51.         return mutableMapOf(Pair(object : ComputeJob {
  52.             @JobContextResource
  53.             lateinit var jobCtx: ComputeJobContext
  54.             var state = AtomicInteger(0)
  55.  
  56.             var listener = IgniteBiPredicate<UUID, String> { id, msg ->
  57.                 println("Listen")
  58.  
  59.                 mapper.send(object : ComputeJob {
  60.                     @TaskSessionResource
  61.                     lateinit var session: ComputeTaskSession
  62.  
  63.                     override fun cancel() {
  64.                         session.setAttribute("cancel", true)
  65.                     }
  66.  
  67.                     override fun execute(): Any {
  68.                         println(msg)
  69.                         return (msg).split(" ").lastIndex
  70.                     }
  71.                 })
  72.                 true
  73.             }
  74.  
  75.             override fun cancel() {
  76.                 println("Cancel")
  77.  
  78.                 ignite.message().stopLocalListen("text", listener)
  79.                 jobCtx.callcc()
  80.             }
  81.  
  82.             override fun execute(): Any? {
  83.                 if(state.getAndIncrement() == 0){
  84.                     println("jobCtx: $jobCtx")
  85.                     ignite.message().localListen("text", listener)
  86.                     return jobCtx.holdcc<Unit>()
  87.                 }
  88.  
  89.                 println("Hello world")
  90.                 return 0
  91.             }
  92.         }, subgrid!![0]))
  93.     }
  94.  
  95.     override fun reduce(results: MutableList<ComputeJobResult>?): String? {
  96.  
  97.         return results!!.map { it.getData() as Int }.reduce { a, b -> a + b }.toString()
  98.     }
  99. }
  100.  
  101. interface S {
  102.     fun run(it: String)
  103. }
  104.  
  105.  
  106. fun main() {
  107.     var cfg = IgniteConfiguration()
  108.     cfg.setPeerClassLoadingEnabled(true)
  109.     cfg.setIncludeEventTypes(*EventType.EVTS_ALL_MINUS_METRIC_UPDATE)
  110.     cfg.igniteInstanceName = "SHIT"
  111.  
  112.     var ignite = Ignition.start(cfg)
  113.     ignite.cluster().active()
  114.  
  115.     ignite.compute().localDeployTask(BasicConsumerTask::class.java, BasicConsumerTask::class.java.classLoader)
  116.  
  117.     var future = ignite.compute().executeAsync<String, String>("BasicConsumerTask", "random")
  118.  
  119.  
  120.     while(true) {
  121.         var line = readLine()
  122.         if (line == "exit") {
  123.             future.cancel()
  124.  
  125.         } else {
  126.             ignite.message().send("text", line)
  127.  
  128.         }
  129.     }
  130. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement