Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- File: server.properties
- ```
- broker.id=1
- listeners=EXTERNAL://:9093,INTERNAL://:9092
- advertised.listeners=INTERNAL://172.17.20.117:9092,EXTERNAL://52.80.79.248:9093
- listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- inter.broker.listener.name=INTERNAL
- num.network.threads=3
- num.io.threads=8
- socket.send.buffer.bytes=102400
- socket.receive.buffer.bytes=102400
- socket.request.max.bytes=104857600
- log.dirs=/opt/kafkalogs
- num.partitions=4
- num.recovery.threads.per.data.dir=1
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- log.retention.hours=168
- log.segment.bytes=1073741824
- log.retention.check.interval.ms=300000
- zookeeper.connect=172.17.21.246:2181,172.17.20.117:2181,172.17.20.255:2181
- zookeeper.connection.timeout.ms=6000
- group.initial.rebalance.delay.ms=0
- ```
- File: client.py
- ```
#!/usr/bin/env python3
"""Build a low-level KafkaClient against the bootstrap address and print
the cluster metadata object and the brokers it reports.

The script is a connectivity probe: it constructs the client with an
explicit configuration dict and a metrics registry, prints
``client.cluster`` and ``client.cluster.brokers()``, then releases the
client and the metrics registry.
"""
from kafka import KafkaConsumer
from kafka.client_async import KafkaClient, selectors
from kafka.metrics import MetricConfig, Metrics
from kafka.cluster import ClusterMetadata
import sys
import time
import socket

# Alternative bootstrap address (port 9093), kept for switching by hand.
#BOOTSTART_SERVERS = 'kafka-ext-809d576b1603e92b.elb.cn-north-1.amazonaws.com.cn:9093'
BOOTSTART_SERVERS = 'kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn'
topic = 'test'
# topic = 'colourlife_transaction'

# Credentials for the SASL variant below must never be written into this
# script as literals. Load them from the environment when re-enabling it:
#consumeruser = os.environ['KAFKA_CONSUMER_USER']
#consumerpass = os.environ['KAFKA_CONSUMER_PASSWORD']

#consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTART_SERVERS, enable_auto_commit=True)
#consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTART_SERVERS, security_protocol="SASL_PLAINTEXT",
#                         sasl_mechanism='PLAIN',
#                         sasl_plain_username=consumeruser,
#                         sasl_plain_password=consumerpass,
#                         group_id='my_favorite_group', enable_auto_commit=True)

# Disabled consumer loop, kept as a bare string so it is never executed.
'''
consumer.subscribe([topic])
print(consumer.subscription())
try:
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key, message.value))
        #print('assignment: ', consumer.assignment())
        #print('partitions', consumer.partitions_for_topic(topic))
        print('message: ', message)
        print('\n\n')
except KeyboardInterrupt:
    consumer.close()
    sys.exit(0)
consumer.close()
'''

# Full configuration passed to KafkaClient below.
DEFAULT_CONFIG = {
    'bootstrap_servers': BOOTSTART_SERVERS,
    'client_id': 'kafka-python-test',
    'group_id': None,
    'key_deserializer': None,
    'value_deserializer': None,
    'fetch_max_wait_ms': 500,
    'fetch_min_bytes': 1,
    'fetch_max_bytes': 52428800,
    'max_partition_fetch_bytes': 1 * 1024 * 1024,
    'request_timeout_ms': 305000,  # chosen to be higher than the default of max_poll_interval_ms
    'retry_backoff_ms': 100,
    'reconnect_backoff_ms': 50,
    'reconnect_backoff_max_ms': 1000,
    'max_in_flight_requests_per_connection': 5,
    'auto_offset_reset': 'latest',
    'enable_auto_commit': True,
    'auto_commit_interval_ms': 5000,
    'default_offset_commit_callback': lambda offsets, response: True,
    'check_crcs': True,
    'metadata_max_age_ms': 5 * 60 * 1000,
    # 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
    'max_poll_records': 500,
    'max_poll_interval_ms': 300000,
    'session_timeout_ms': 10000,
    'heartbeat_interval_ms': 3000,
    'receive_buffer_bytes': None,
    'send_buffer_bytes': None,
    'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
    'sock_chunk_bytes': 4096,  # undocumented experimental option
    'sock_chunk_buffer_count': 1000,  # undocumented experimental option
    'consumer_timeout_ms': float('inf'),
    'skip_double_compressed_messages': False,
    'security_protocol': 'PLAINTEXT',
    'ssl_context': None,
    'ssl_check_hostname': True,
    'ssl_cafile': None,
    'ssl_certfile': None,
    'ssl_keyfile': None,
    'ssl_crlfile': None,
    'ssl_password': None,
    'api_version': None,
    'api_version_auto_timeout_ms': 2000,
    'connections_max_idle_ms': 9 * 60 * 1000,
    'metric_reporters': [],
    'metrics_num_samples': 2,
    'metrics_sample_window_ms': 30000,
    'metric_group_prefix': 'consumer',
    'selector': selectors.DefaultSelector,
    'exclude_internal_topics': True,
    'sasl_mechanism': None,
    'sasl_plain_username': None,
    'sasl_plain_password': None,
    'sasl_kerberos_service_name': 'kafka',
    'sasl_kerberos_domain_name': None,
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000

# Metrics registry handed to the client; tagged with the client id.
metrics_tags = {'client-id': DEFAULT_CONFIG['client_id']}
metric_config = MetricConfig(samples=DEFAULT_CONFIG['metrics_num_samples'],
                             time_window_ms=DEFAULT_CONFIG['metrics_sample_window_ms'],
                             tags=metrics_tags)
reporters = [reporter() for reporter in DEFAULT_CONFIG['metric_reporters']]
metrics = Metrics(metric_config, reporters)

# Always release the metrics registry and the client's connections, even
# when the bootstrap or the prints raise.
try:
    client = KafkaClient(metrics=metrics, **DEFAULT_CONFIG)
    try:
        print(client.cluster)
        print(client.cluster.brokers())
    finally:
        client.close()
finally:
    metrics.close()

# Disabled manual-bootstrap sketch, kept as a bare string so it is never
# executed; it references names (host, port, afi, cb, self) not defined here.
'''
cluster = ClusterMetadata(**DEFAULT_CONFIG)
bootstrap = BrokerConnection(host, port, afi,
                             state_change_callback=cb,
                             node_id='bootstrap',
                             **self.config)
metadata_request = MetadataRequest[1](None)
future = bootstrap.send(metadata_request)
cluster.update_metadata(future.value)
'''
- ```
- File: producer.py
- ```
#!/usr/bin/env python3
"""Send 30 timestamp messages to a Kafka topic, pausing 3 seconds after each.

Every message is the current ``time.time()`` value encoded as UTF-8. The
script waits up to 10 seconds for each delivery and prints the resulting
record metadata. A failed delivery is reported and the loop carries on;
the producer is closed on every exit path.
"""
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time

BOOTSTART_SERVERS = 'kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn'
topic = 'test'

# Credentials for the SASL variant below must never be written into this
# script as literals. Load them from the environment when re-enabling it:
#produceruser = os.environ['KAFKA_PRODUCER_USER']
#producerpass = os.environ['KAFKA_PRODUCER_PASSWORD']

# api_version is left at its default (None) so the client probes the broker
# version itself; the string form 'auto' is deprecated in kafka-python.
producer = KafkaProducer(bootstrap_servers=BOOTSTART_SERVERS)
#producer = KafkaProducer(bootstrap_servers=BOOTSTART_SERVERS, api_version=(0,10),
#                         security_protocol="SASL_PLAINTEXT",
#                         sasl_mechanism='PLAIN',
#                         sasl_plain_username=produceruser,
#                         sasl_plain_password=producerpass)

try:
    for _ in range(30):
        now = str(time.time()).encode('utf-8')
        print('data: ', now)
        try:
            # send() only queues the record; get() blocks until the broker
            # acknowledges it or the 10 second timeout expires.
            future = producer.send(topic, now)
            record_metadata = future.get(timeout=10)
        except KafkaError as exc:
            # Report the failure and keep going instead of aborting the run.
            print('send failed: ', exc)
        else:
            print(record_metadata)
        time.sleep(3)
        print('\n')
finally:
    # Runs on normal completion, on unexpected errors and on Ctrl-C.
    producer.close()
- ```
Add Comment
Please, Sign In to add comment