Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ai.scynet.core
- import ai.scynet.core.annotations.Inputs
- import ai.scynet.core.annotations.Output
- import ai.scynet.core.annotations.Type
- import ai.scynet.core.descriptors.ProcessorDescriptor
- import ai.scynet.core.processors.IgniteStream
- import ai.scynet.core.processors.Processor
- import ai.scynet.core.processors.Stream
- import kotlinx.coroutines.coroutineScope
- import kotlinx.coroutines.delay
- import kotlinx.coroutines.launch
- import org.apache.ignite.Ignite
- import org.apache.ignite.Ignition
- import org.apache.ignite.cluster.ClusterNode
- import org.apache.ignite.compute.*
- import org.apache.ignite.lang.IgniteFuture
- import org.apache.ignite.resources.TaskContinuousMapperResource
- import org.koin.core.KoinComponent
- import org.koin.core.inject
- import java.nio.file.Files
- import java.nio.file.Paths
- import java.util.*
- import java.util.concurrent.atomic.AtomicInteger
/**
 * Ignite compute task, registered under the name "BasicConsumerTask", that hands its jobs to
 * the grid through an injected [ComputeTaskContinuousMapper] rather than through the map
 * returned from [map].
 *
 * [map] sends four counter jobs right away and registers a local listener on the "text"
 * topic that sends one more job for every message received. [reduce] adds up the integer
 * results of the jobs and returns the total as a string.
 */
@ComputeTaskName("BasicConsumerTask")
class BasicConsumerTask : ComputeTaskAdapter<Ignite, String>() {
    /** Mapper used to send jobs; annotated as a task resource, so it is not assigned in this class. */
    @TaskContinuousMapperResource
    lateinit var mapper: ComputeTaskContinuousMapper

    /**
     * Sends the initial jobs and subscribes to the "text" topic.
     *
     * @param subgrid nodes offered for the task; not used here.
     * @param ignite the Ignite instance passed as the task argument; when `null` nothing is
     *   sent and no listener is registered.
     * @return always an empty map, because every job goes through [mapper].
     */
    override fun map(subgrid: MutableList<ClusterNode>?, ignite: Ignite?): MutableMap<out ComputeJob, ClusterNode> {
        if (ignite == null) return mutableMapOf()

        // One counter instance is captured by every job created during this call.
        val counter = AtomicInteger(0)

        // NOTE(review): this listener is never removed and returns true on every message, so it
        // keeps calling mapper.send for as long as it stays registered. Confirm that sending
        // after the task has been reduced is acceptable for the continuous mapper.
        ignite.message().localListen("text") { _, msg ->
            mapper.send(splitJob(counter, msg))
            true
        }

        repeat(4) { mapper.send(counterJob(counter)) }

        return mutableMapOf()
    }

    /**
     * Sums the integer payloads of all job results.
     *
     * @param results job results; a `null` or empty list produces "0" instead of throwing.
     * @return the sum rendered as a decimal string.
     */
    override fun reduce(results: MutableList<ComputeJobResult>?): String? =
        results.orEmpty().map { it.getData<Int>() }.sum().toString()

    /** Builds a job that increments [counter], prints the new value and returns the value read afterwards. */
    private fun counterJob(counter: AtomicInteger): ComputeJob = object : ComputeJob {
        override fun cancel() {
        }

        override fun execute(): Any {
            println(counter.incrementAndGet())
            // Separate read: the returned value can differ from the printed one if other jobs run in between.
            return counter.get()
        }
    }

    /**
     * Builds a job that increments [counter], prints the new value and returns the last index
     * of [msg] split on single spaces (the number of parts minus one).
     */
    private fun splitJob(counter: AtomicInteger, msg: Any?): ComputeJob = object : ComputeJob {
        override fun cancel() {
        }

        override fun execute(): Any {
            println(counter.incrementAndGet())
            return (msg as String).split(" ").lastIndex
        }
    }
}
/**
 * Demo entry point: starts an Ignite node, deploys [BasicConsumerTask] locally and publishes
 * "Hello world" messages on the "text" topic before and after the task is executed.
 *
 * Sequence inside the launched coroutine: six messages, a 1000 ms delay, asynchronous
 * execution of the task by name (its result is printed when the future completes), four
 * messages, another 1000 ms delay, and four final messages.
 */
suspend fun main() {
    coroutineScope {
        val ignite = Ignition.start()
        ignite.compute().localDeployTask(BasicConsumerTask::class.java, ClassLoader.getPlatformClassLoader())

        // Publishes the same greeting `times` times on the "text" topic.
        fun sendGreetings(times: Int) {
            repeat(times) {
                ignite.message().send("text", "Hello world")
            }
        }

        launch {
            sendGreetings(6)
            delay(1000)

            val pending = ignite.compute().executeAsync<Ignite, String>("BasicConsumerTask", ignite)
            pending.listen { finished ->
                println(finished.get())
            }

            sendGreetings(4)
            delay(1000)
            sendGreetings(4)
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement