Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- (Got exception while validating test topics. Waiting and retrying.,kafka.common.LeaderNotAvailableException)
- 3557 [Test worker] INFO org.apache.samza.system.kafka.KafkaSystemAdmin$ - Got metadata: Map(input -> SystemStreamMetadata [streamName=input, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=0]}])
- 3578 [Test worker] INFO org.apache.samza.job.local.LocalJobFactory - got partitions for job Set(SystemStreamPartition [partition=Partition [partition=0], system=kafka, stream=input])
- 3585 [Test worker] INFO org.apache.samza.job.local.LocalJobFactory - No config specified for task.command.class. Defaulting to ThreadJob, which is only meant for debugging.
- 3608 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Setting up Samza container: local-task
- 3613 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Using streams and partitions: Set(SystemStreamPartition [partition=Partition [partition=0], system=kafka, stream=input])
- 3617 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Using configuration: {task.inputs=kafka.input, job.factory.class=org.apache.samza.job.local.LocalJobFactory, stores.mystore.msg.serde=string, job.name=hello-stateful-world, systems.kafka.consumer.zookeeper.connect=127.0.0.1:58388, systems.kafka.consumer.auto.offset.reset=largest, systems.kafka.samza.msg.serde=string, stores.mystore.changelog=kafka.mystore, stores.mystore.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory, serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory, systems.kafka.samza.offset.default=oldest, task.class=org.apache.samza.test.integration.TestTask, systems.kafka.producer.metadata.broker.list=localhost:53077, systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory, stores.mystore.key.serde=string}
- 3659 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got system names: Set(kafka)
- 3683 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got serde names: Set(string)
- 3712 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got system factories: Set(kafka)
- 3730 [Test worker] INFO org.apache.samza.system.kafka.KafkaSystemAdmin$ - Got metadata: Map(input -> SystemStreamMetadata [streamName=input, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=0]}])
- 3734 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got input stream metadata: Map(SystemStream [system=kafka, stream=input] -> SystemStreamMetadata [streamName=input, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=0]}])
- 3918 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got system consumers: Set(kafka)
- 3953 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got system producers: Set(kafka)
- 3972 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got serdes: Set(string)
- 4049 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got change log system streams: Map(mystore -> SystemStream [system=kafka, stream=mystore])
- 4062 [Test worker] INFO org.apache.samza.system.kafka.KafkaSystemAdmin$ - Got metadata: Map(mystore -> SystemStreamMetadata [streamName=mystore, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=0]}])
- 4064 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got change log stream metadata: Map(SystemStream [system=kafka, stream=mystore] -> SystemStreamMetadata [streamName=mystore, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=null, newestOffset=null, upcomingOffset=0]}])
- 4067 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Setting up JVM metrics.
- 4086 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Setting up message chooser.
- 4155 [Test worker] INFO org.apache.samza.system.chooser.DefaultChooser - Building default chooser with: useBatching=false, useBootstrapping=false, usePriority=false
- 4157 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Setting up metrics reporters.
- 4169 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got metrics reporters: Set()
- 4179 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got checkpoint manager: null
- 4211 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got offset manager: org.apache.samza.checkpoint.OffsetManager@33cfa965
- 4261 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - No lifecycle listeners found
- 4268 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got storage engine base directory: /home/cloudera/Samza/test/incubator-samza/samza-test/state
- 4281 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got storage engines: Set(mystore)
- 4288 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got stream task class: org.apache.samza.test.integration.TestTask
- 4291 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got window milliseconds: -1
- 4299 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got commit milliseconds: 60000
- 4362 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got store consumers: Map(mystore -> org.apache.samza.system.kafka.KafkaSystemConsumer@46be6108)
- 4489 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Got task stores: Map(mystore -> org.apache.samza.storage.kv.KeyValueStorageEngine@145d424)
- 4502 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Assigning oldest change log offsets for partition Partition [partition=0]: Map(SystemStream [system=kafka, stream=mystore] -> null)
- 4522 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Assigning SystemStreams Set(SystemStream [system=kafka, stream=input]) to Partition [partition=0]
- 4543 [Test worker] INFO org.apache.samza.container.SamzaContainer$ - Samza container setup complete.
- 4564 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Starting container.
- 4566 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Registering task instances with metrics.
- 4574 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Starting JVM metrics.
- 4584 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Starting metrics reporters.
- 4587 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Registering task instances with offsets.
- 4599 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Starting offset manager.
- 4649 [ThreadJob] INFO org.apache.samza.checkpoint.OffsetManager - Successfully loaded last processed offsets: Map()
- 4654 [ThreadJob] INFO org.apache.samza.checkpoint.OffsetManager - Successfully loaded starting offsets: Map(SystemStreamPartition [partition=Partition [partition=0], system=kafka, stream=input] -> null)
- 4657 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Starting task instance stores.
- 4684 [ThreadJob] INFO org.apache.samza.storage.TaskStorageManager - Skipping change log restoration for SystemStreamPartition [partition=Partition [partition=0], system=kafka, stream=mystore] because stream appears to be empty (offset was null).
- 4725 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Initializing stream tasks.
- TestTask.init(): Set()
- 4905 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Registering task instances with producers.
- 4949 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Starting producer multiplexer.
- 4965 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Registering task instances with consumers.
- 5087 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Starting consumer multiplexer.
- 5165 [ThreadJob] INFO org.apache.samza.system.kafka.BrokerProxy - Creating new SimpleConsumer for host localhost:53077 for system kafka
- 5191 [ThreadJob] WARN org.apache.samza.system.kafka.BrokerProxy - It appears that we received an invalid or empty offset None for [input,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
- 5205 [ThreadJob] INFO org.apache.samza.system.kafka.GetOffset - Checking if auto.offset.reset is defined for topic input
- 5225 [ThreadJob] INFO org.apache.samza.system.kafka.GetOffset - Got reset of type largest.
- 5265 [ThreadJob] INFO org.apache.samza.system.kafka.BrokerProxy - Starting BrokerProxy for localhost:53077
- 5296 [ThreadJob] INFO org.apache.samza.container.SamzaContainer - Entering run loop.
- 5420 [ThreadJob] INFO org.apache.samza.system.kafka.KafkaSystemProducer - Creating a new producer for system kafka.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement