File: server.properties

```
broker.id=1
listeners=EXTERNAL://:9093,INTERNAL://:9092
advertised.listeners=INTERNAL://172.17.20.117:9092,EXTERNAL://52.80.79.248:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafkalogs
num.partitions=4
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.17.21.246:2181,172.17.20.117:2181,172.17.20.255:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
```
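
This config advertises two PLAINTEXT listeners: INTERNAL (172.17.20.117:9092) for clients inside the VPC and EXTERNAL (52.80.79.248:9093) for public clients, with inter-broker traffic pinned to INTERNAL. A minimal sketch to check which listener is reachable from a given host (an assumption-laden probe, not part of the original setup; it just forces a metadata round trip through each address):

```
#!/usr/bin/env python3
# Hypothetical reachability probe for the two advertised listeners above.
from kafka import KafkaConsumer
from kafka.errors import KafkaError

LISTENERS = [('INTERNAL', '172.17.20.117:9092'),
             ('EXTERNAL', '52.80.79.248:9093')]

for label, addr in LISTENERS:
    try:
        # No topic subscription needed; topics() forces a metadata fetch.
        consumer = KafkaConsumer(bootstrap_servers=addr,
                                 request_timeout_ms=15000)
        print(label, addr, '->', sorted(consumer.topics()))
        consumer.close()
    except KafkaError as exc:
        print(label, addr, '-> unreachable:', exc)
```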

File: client.py
```
#!/usr/bin/env python3

from kafka import KafkaConsumer
from kafka.client_async import KafkaClient, selectors
from kafka.metrics import MetricConfig, Metrics
from kafka.cluster import ClusterMetadata

import sys
import time
import socket


#BOOTSTRAP_SERVERS = 'kafka-ext-809d576b1603e92b.elb.cn-north-1.amazonaws.com.cn:9093'
BOOTSTRAP_SERVERS = 'kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn'
topic = 'test'
# topic = 'colourlife_transaction'
#adminuser = 'colourlifeadmin'
#adminpass = '6hGawfdXnnj2'
#consumeruser = 'consumer'
#consumerpass = 'consumer-0tshrST6'
#produceruser = 'producer'
#producerpass = 'producer-75tggKRm'
now = str(time.time()).encode('utf-8')

#consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS, enable_auto_commit=True)
#consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS, security_protocol="SASL_PLAINTEXT",
#                         sasl_mechanism='PLAIN',
#                         sasl_plain_username=consumeruser,
#                         sasl_plain_password=consumerpass,
#                         group_id='my_favorite_group', enable_auto_commit=True)
'''
consumer.subscribe([topic])
print(consumer.subscription())

try:
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key, message.value))
        #print('assignment: ', consumer.assignment())
        #print('partitions', consumer.partitions_for_topic(topic))
        print('message: ', message)
        print('\n\n')
except KeyboardInterrupt:
    consumer.close()
    sys.exit(0)

consumer.close()
'''
# Mirrors KafkaConsumer.DEFAULT_CONFIG so a bare KafkaClient can be built
# with the same settings a consumer would use.
DEFAULT_CONFIG = {
    'bootstrap_servers': BOOTSTRAP_SERVERS,
    'client_id': 'kafka-python-test',
    'group_id': None,
    'key_deserializer': None,
    'value_deserializer': None,
    'fetch_max_wait_ms': 500,
    'fetch_min_bytes': 1,
    'fetch_max_bytes': 52428800,
    'max_partition_fetch_bytes': 1 * 1024 * 1024,
    'request_timeout_ms': 305000,  # chosen to be higher than the default of max_poll_interval_ms
    'retry_backoff_ms': 100,
    'reconnect_backoff_ms': 50,
    'reconnect_backoff_max_ms': 1000,
    'max_in_flight_requests_per_connection': 5,
    'auto_offset_reset': 'latest',
    'enable_auto_commit': True,
    'auto_commit_interval_ms': 5000,
    'default_offset_commit_callback': lambda offsets, response: True,
    'check_crcs': True,
    'metadata_max_age_ms': 5 * 60 * 1000,
    # 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
    'max_poll_records': 500,
    'max_poll_interval_ms': 300000,
    'session_timeout_ms': 10000,
    'heartbeat_interval_ms': 3000,
    'receive_buffer_bytes': None,
    'send_buffer_bytes': None,
    'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
    'sock_chunk_bytes': 4096,  # undocumented experimental option
    'sock_chunk_buffer_count': 1000,  # undocumented experimental option
    'consumer_timeout_ms': float('inf'),
    'skip_double_compressed_messages': False,
    'security_protocol': 'PLAINTEXT',
    'ssl_context': None,
    'ssl_check_hostname': True,
    'ssl_cafile': None,
    'ssl_certfile': None,
    'ssl_keyfile': None,
    'ssl_crlfile': None,
    'ssl_password': None,
    'api_version': None,
    'api_version_auto_timeout_ms': 2000,
    'connections_max_idle_ms': 9 * 60 * 1000,
    'metric_reporters': [],
    'metrics_num_samples': 2,
    'metrics_sample_window_ms': 30000,
    'metric_group_prefix': 'consumer',
    'selector': selectors.DefaultSelector,
    'exclude_internal_topics': True,
    'sasl_mechanism': None,
    'sasl_plain_username': None,
    'sasl_plain_password': None,
    'sasl_kerberos_service_name': 'kafka',
    'sasl_kerberos_domain_name': None,
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000

# Build the metrics registry the same way KafkaConsumer does internally.
metrics_tags = {'client-id': DEFAULT_CONFIG['client_id']}
metric_config = MetricConfig(samples=DEFAULT_CONFIG['metrics_num_samples'],
                             time_window_ms=DEFAULT_CONFIG['metrics_sample_window_ms'],
                             tags=metrics_tags)
reporters = [reporter() for reporter in DEFAULT_CONFIG['metric_reporters']]
metrics = Metrics(metric_config, reporters)
client = KafkaClient(metrics=metrics, **DEFAULT_CONFIG)
print(client.cluster)
print(client.cluster.brokers())

'''
cluster = ClusterMetadata(**DEFAULT_CONFIG)

bootstrap = BrokerConnection(host, port, afi,
                             state_change_callback=cb,
                             node_id='bootstrap',
                             **self.config)

metadata_request = MetadataRequest[1](None)
future = bootstrap.send(metadata_request)
cluster.update_metadata(future.value)
'''
```
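
The quoted block at the end of client.py sketches kafka-python's internal bootstrap sequence but leaves out the imports and concrete values it would need. A runnable version might look like the following; this is a sketch against kafka-python 1.4.x internals (BrokerConnection and MetadataRequest are not public API), using the internal load balancer host from above and assuming port 9092 is reachable:

```
#!/usr/bin/env python3
# Sketch of the manual bootstrap flow from the commented block above,
# with the imports it needs (kafka-python 1.4.x internals).
import socket
import time

from kafka.conn import BrokerConnection
from kafka.protocol.metadata import MetadataRequest

host = 'kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn'
conn = BrokerConnection(host, 9092, socket.AF_UNSPEC)

conn.connect()                   # non-blocking; drive the state machine
while conn.connecting():
    conn.connect()
    time.sleep(0.05)
assert conn.connected(), conn.state

future = conn.send(MetadataRequest[1](None))  # null topic list -> all topics
while not future.is_done:
    conn.recv()                  # recv() resolves in-flight request futures
    time.sleep(0.05)

print(future.value)              # MetadataResponse_v1: brokers + topics
conn.close()
```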

File: producer.py

```
#!/usr/bin/env python3


from kafka import KafkaProducer
from kafka.errors import KafkaError
import time


BOOTSTRAP_SERVERS = 'kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn'
topic = 'test'
#adminuser = 'colourlifeadmin'
#adminpass = '6hGawfdXnnj2'
#consumeruser = 'consumer'
#consumerpass = 'consumer-0tshrST6'
#produceruser = 'producer'
#producerpass = 'producer-75tggKRm'

# api_version=None lets kafka-python auto-detect the broker version
# (the string 'auto' is a deprecated spelling of the same behaviour).
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS, api_version=None)
#producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS, api_version=(0, 10),
#                         security_protocol="SASL_PLAINTEXT",
#                         sasl_mechanism='PLAIN',
#                         sasl_plain_username=produceruser,
#                         sasl_plain_password=producerpass)
for i in range(30):
    now = str(time.time()).encode('utf-8')
    print('data: ', now)
    future = producer.send(topic, now)
    record_metadata = future.get(timeout=10)  # block until the broker acks
    print(record_metadata)
    time.sleep(3)
    print('\n')
    #try:
    #    record_metadata = future.get(timeout=10)
    #except KafkaError:
    #    # log.exception()
    #    print('err')
    #    pass


producer.close()
```
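
The commented-out try/except in the loop shows the intended error handling for future.get(); a compact sketch of that pattern, reusing the same topic and bootstrap address as producer.py (the payload is a placeholder):

```
#!/usr/bin/env python3
# Error-handled send, as gestured at by the commented try/except above.
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers='kafka-lb-internal-ac1ce140e307bd3f.elb.cn-north-1.amazonaws.com.cn')
future = producer.send('test', b'payload')
try:
    md = future.get(timeout=10)       # block until acked, or raise KafkaError
    print(md.topic, md.partition, md.offset)
except KafkaError as exc:
    print('send failed:', exc)
finally:
    producer.close()                  # flushes pending records before closing
```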