Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from paho.mqtt import client as mqtt
- import ssl
- path_to_root_cert = "<local path to the generated testonly-rootca.pem>"
- device_cert = "<local path to the generated newdevice-cert.pem>"
- device_key = "<local path to the generated newdevice-key.pem>
- HubName = "iothub.azure-devices.net"
- devicename = "device001"
- def on_connect(client, userdata, flags, rc):
- print ("Connected with result code: " + str(rc))
- client.subscribe("devices/" + devicename + "/messages/devicebound/#")
- def on_disconnect(client, userdata, rc):
- print ("Disconnected with result code: " + str(rc))
- def on_message(client, userdata, msg):
- print (msg.topic+" "+str(msg.payload))
- client.publish("devices/" + devicename + "/messages/events/", "{id=1}",qos=1)
- def on_publish(client, userdata, mid):
- print ("Sent message")
- client = mqtt.Client(client_id=devicename, protocol=mqtt.MQTTv311)
- client.on_connect = on_connect
- client.on_disconnect = on_disconnect
- client.on_message = on_message
- client.on_publish = on_publish
- client.username_pw_set(username=HubName + "/" + devicename, password=None)
- client.tls_insecure_set(False)
- client.tls_set(ca_certs=path_to_root_cert, certfile=device_cert, keyfile=device_key, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
- client.connect(HubName, port=8883)
- client.publish("devices/" + devicename + "/messages/events/", "{id=MQTT Test}", qos=1)
- client.loop_forever()
- SSL_Verification_failed
Add Comment
Please, Sign In to add comment