Advertisement
Guest User

Untitled

a guest
Jul 15th, 2019
171
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 3.54 KB | None | 0 0
  1. package ai.scynet.core
  2.  
  3. import ai.scynet.core.annotations.Inputs
  4. import ai.scynet.core.annotations.Output
  5. import ai.scynet.core.annotations.Type
  6. import ai.scynet.core.descriptors.ProcessorDescriptor
  7. import ai.scynet.core.processors.IgniteStream
  8. import ai.scynet.core.processors.Processor
  9. import ai.scynet.core.processors.Stream
  10. import kotlinx.coroutines.coroutineScope
  11. import kotlinx.coroutines.delay
  12. import kotlinx.coroutines.launch
  13. import org.apache.ignite.Ignite
  14. import org.apache.ignite.Ignition
  15. import org.apache.ignite.cluster.ClusterNode
  16. import org.apache.ignite.compute.*
  17. import org.apache.ignite.lang.IgniteFuture
  18. import org.apache.ignite.resources.TaskContinuousMapperResource
  19. import org.koin.core.KoinComponent
  20. import org.koin.core.inject
  21. import java.nio.file.Files
  22. import java.nio.file.Paths
  23. import java.util.*
  24. import java.util.concurrent.atomic.AtomicInteger
  25.  
  26. @ComputeTaskName("BasicConsumerTask")
  27. class BasicConsumerTask : ComputeTaskAdapter<Ignite, String>() {
  28.  
  29.     @TaskContinuousMapperResource
  30.     lateinit var mapper: ComputeTaskContinuousMapper
  31.  
  32.     override fun map(subgrid: MutableList<ClusterNode>?, ignite: Ignite?): MutableMap<out ComputeJob, ClusterNode> {
  33.         if(ignite == null) return mutableMapOf()
  34.  
  35.         var counter = AtomicInteger(0)
  36.  
  37.  
  38.  
  39.  
  40.         ignite.message().localListen("text") { id, msg ->
  41.  
  42.             mapper.send(object : ComputeJob {
  43.                 override fun cancel() {
  44.  
  45.                 }
  46.  
  47.                 override fun execute(): Any {
  48.                     println(counter.incrementAndGet())
  49.                     return (msg as String).split(" ").lastIndex
  50.                 }
  51.             })
  52.             true
  53.         }
  54.  
  55.         mapper.send(object : ComputeJob {
  56.             override fun cancel() {
  57.  
  58.             }
  59.  
  60.             override fun execute(): Any {
  61.                 println(counter.incrementAndGet())
  62.                 return counter.get()
  63.             }
  64.         })
  65.  
  66.  
  67.         mapper.send(object : ComputeJob {
  68.             override fun cancel() {
  69.  
  70.             }
  71.  
  72.             override fun execute(): Any {
  73.                 println(counter.incrementAndGet())
  74.                 return counter.get()
  75.             }
  76.         })
  77.  
  78.         mapper.send(object : ComputeJob {
  79.             override fun cancel() {
  80.  
  81.             }
  82.  
  83.             override fun execute(): Any {
  84.                 println(counter.incrementAndGet())
  85.                 return counter.get()
  86.             }
  87.         })
  88.  
  89.         mapper.send(object : ComputeJob {
  90.             override fun cancel() {
  91.  
  92.             }
  93.  
  94.             override fun execute(): Any {
  95.                 println(counter.incrementAndGet())
  96.                 return counter.get()
  97.             }
  98.         })
  99.  
  100.         return mutableMapOf()
  101.     }
  102.  
  103.     override fun reduce(results: MutableList<ComputeJobResult>?): String? {
  104.         return results!!.map { it.getData() as Int }.reduce { a, b -> a + b }.toString()
  105.     }
  106. }
  107.  
  108. suspend fun main() {
  109.  
  110.     coroutineScope {
  111.         var ignite = Ignition.start()
  112.  
  113.         ignite.compute().localDeployTask(BasicConsumerTask::class.java, ClassLoader.getPlatformClassLoader())
  114.  
  115.  
  116.  
  117.  
  118.         launch {
  119.             ignite.message().send("text", "Hello world")
  120.             ignite.message().send("text", "Hello world")
  121.             ignite.message().send("text", "Hello world")
  122.             ignite.message().send("text", "Hello world")
  123.             ignite.message().send("text", "Hello world")
  124.             ignite.message().send("text", "Hello world")
  125.             delay(1000)
  126.  
  127.             var future = ignite.compute().executeAsync<Ignite, String>("BasicConsumerTask", ignite)
  128.             future.listen {
  129.                 println(it.get())
  130.             }
  131.  
  132.             ignite.message().send("text", "Hello world")
  133.             ignite.message().send("text", "Hello world")
  134.             ignite.message().send("text", "Hello world")
  135.             ignite.message().send("text", "Hello world")
  136.  
  137.             delay(1000)
  138.  
  139.             ignite.message().send("text", "Hello world")
  140.             ignite.message().send("text", "Hello world")
  141.             ignite.message().send("text", "Hello world")
  142.             ignite.message().send("text", "Hello world")
  143.  
  144.  
  145.         }
  146.  
  147.  
  148.     }
  149. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement