Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import time
- import threading, Queue
- from flask import Flask, jsonify, abort, request
- from neopixel import *
- from ledconf import *
- app = Flask(__name__)
- #======== NeoPixel RGB-LED ========
- strip = Adafruit_NeoPixel(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS)
- strip.begin()
- strip_q = Queue.Queue()
- def wheel(pos):
- """Generate rainbow colors across 0-255 positions."""
- if pos < 85:
- return Color(pos * 3, 255 - pos * 3, 0)
- elif pos < 170:
- pos -= 85
- return Color(255 - pos * 3, 0, pos * 3)
- else:
- pos -= 170
- return Color(0, pos * 3, 255 - pos * 3)
- def colorWipe(strip, color, wait_ms=50):
- """Wipe color across display a pixel at a time."""
- qwe = strip.get_nowait()
- for i in range(qwe.numPixels()):
- qwe.setPixelColor(i, color)
- qwe.show()
- time.sleep(wait_ms/1000.0)
- strip.task_done()
- def rainbow(strip, wait_ms=20, iterations=1):
- """Draw rainbow that fades across all pixels at once."""
- qwe = strip.get_nowait()
- for j in range(256*iterations):
- for i in range(qwe.numPixels()):
- qwe.setPixelColor(i, wheel((i+j) & 255))
- qwe.show()
- time.sleep(wait_ms/1000.0)
- strip.task_done()
- #======== end NeoPixel ========
- @app.route('/api/v1.0/led/colorWipe', methods=['PUT'])
- def apicolorWipe():
- if not request.json:
- abort(400)
- if not 'color' in request.json:
- abort(400)
- if not 'wait_ms' in request.json:
- wait_ms = 50
- else:
- wait_ms = int(request.json.get('wait_ms'))
- lcolor = request.json.get('color').split()
- color = Color(int(lcolor[0]), int(lcolor[1]), int(lcolor[2]))
- try:
- strip_q.put(strip)
- t = threading.Thread(target=colorWipe, args=(strip_q, color, wait_ms,))
- t.daemon = True
- t.start()
- # t.join()
- return jsonify({'colorWipe': 'OK'})
- except:
- abort(501)
- @app.route('/api/v1.0/led/rainbow', methods=['PUT'])
- def apirainbow():
- if not request.json:
- abort(400)
- if not 'wait_ms' in request.json:
- wait_ms = 20
- else:
- wait_ms = int(request.json.get('wait_ms'))
- if not 'iterations' in request.json:
- iterations = 1
- else:
- iterations = int(request.json.get('iterations'))
- try:
- strip_q.put(strip)
- u = threading.Thread(target=rainbow, args=(strip_q, wait_ms, iterations,))
- u.daemon = True
- u.start()
- # u.join()
- return jsonify({'rainbow': 'OK'})
- except:
- abort(501)
- if __name__ == "__main__":
- print("starting...")
- app.run(host='0.0.0.0', port=8001, debug=False)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement