added local version of pixel ring
parent
69795e880f
commit
12bcf9a5ac
@ -0,0 +1,52 @@
|
|||||||
|
|
||||||
|
|
||||||
|
from . import usb_pixel_ring_v1
|
||||||
|
from . import usb_pixel_ring_v2
|
||||||
|
from .apa102_pixel_ring import PixelRing
|
||||||
|
|
||||||
|
pixel_ring = usb_pixel_ring_v2.find()
|
||||||
|
|
||||||
|
if not pixel_ring:
|
||||||
|
pixel_ring = usb_pixel_ring_v1.find()
|
||||||
|
|
||||||
|
if not pixel_ring:
|
||||||
|
pixel_ring = PixelRing()
|
||||||
|
|
||||||
|
|
||||||
|
USAGE = '''
|
||||||
|
If the hardware is ReSpeaker 4 Mic Array for Pi or ReSpeaker V2,
|
||||||
|
there is a power-enable pin which should be enabled at first.
|
||||||
|
+ ReSpeaker 4 Mic Array for Pi:
|
||||||
|
|
||||||
|
import gpiozero
|
||||||
|
power = LED(5)
|
||||||
|
power.on()
|
||||||
|
|
||||||
|
+ ReSpeaker V2:
|
||||||
|
|
||||||
|
import mraa
|
||||||
|
power = mraa.Gpio(12)
|
||||||
|
power.dir(mraa.DIR_OUT)
|
||||||
|
power.write(0)
|
||||||
|
'''
|
||||||
|
|
||||||
|
def main():
|
||||||
|
import time
|
||||||
|
|
||||||
|
if isinstance(pixel_ring, usb_pixel_ring_v2.PixelRing):
|
||||||
|
print('Found ReSpeaker USB 4 Mic Array')
|
||||||
|
elif isinstance(pixel_ring, usb_pixel_ring_v1.UsbPixelRing):
|
||||||
|
print('Found ReSpeaker USB 6+1 Mic Array')
|
||||||
|
else:
|
||||||
|
print('Control APA102 RGB LEDs via SPI')
|
||||||
|
print(USAGE)
|
||||||
|
|
||||||
|
pixel_ring.think()
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.off()
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
@ -0,0 +1,243 @@
|
|||||||
|
"""
|
||||||
|
from https://github.com/tinue/APA102_Pi
|
||||||
|
This is the main driver module for APA102 LEDs
|
||||||
|
"""
|
||||||
|
import spidev
|
||||||
|
from math import ceil
|
||||||
|
|
||||||
|
RGB_MAP = { 'rgb': [3, 2, 1], 'rbg': [3, 1, 2], 'grb': [2, 3, 1],
|
||||||
|
'gbr': [2, 1, 3], 'brg': [1, 3, 2], 'bgr': [1, 2, 3] }
|
||||||
|
|
||||||
|
class APA102:
|
||||||
|
"""
|
||||||
|
Driver for APA102 LEDS (aka "DotStar").
|
||||||
|
|
||||||
|
(c) Martin Erzberger 2016-2017
|
||||||
|
|
||||||
|
My very first Python code, so I am sure there is a lot to be optimized ;)
|
||||||
|
|
||||||
|
Public methods are:
|
||||||
|
- set_pixel
|
||||||
|
- set_pixel_rgb
|
||||||
|
- show
|
||||||
|
- clear_strip
|
||||||
|
- cleanup
|
||||||
|
|
||||||
|
Helper methods for color manipulation are:
|
||||||
|
- combine_color
|
||||||
|
- wheel
|
||||||
|
|
||||||
|
The rest of the methods are used internally and should not be used by the
|
||||||
|
user of the library.
|
||||||
|
|
||||||
|
Very brief overview of APA102: An APA102 LED is addressed with SPI. The bits
|
||||||
|
are shifted in one by one, starting with the least significant bit.
|
||||||
|
|
||||||
|
An LED usually just forwards everything that is sent to its data-in to
|
||||||
|
data-out. While doing this, it remembers its own color and keeps glowing
|
||||||
|
with that color as long as there is power.
|
||||||
|
|
||||||
|
An LED can be switched to not forward the data, but instead use the data
|
||||||
|
to change it's own color. This is done by sending (at least) 32 bits of
|
||||||
|
zeroes to data-in. The LED then accepts the next correct 32 bit LED
|
||||||
|
frame (with color information) as its new color setting.
|
||||||
|
|
||||||
|
After having received the 32 bit color frame, the LED changes color,
|
||||||
|
and then resumes to just copying data-in to data-out.
|
||||||
|
|
||||||
|
The really clever bit is this: While receiving the 32 bit LED frame,
|
||||||
|
the LED sends zeroes on its data-out line. Because a color frame is
|
||||||
|
32 bits, the LED sends 32 bits of zeroes to the next LED.
|
||||||
|
As we have seen above, this means that the next LED is now ready
|
||||||
|
to accept a color frame and update its color.
|
||||||
|
|
||||||
|
So that's really the entire protocol:
|
||||||
|
- Start by sending 32 bits of zeroes. This prepares LED 1 to update
|
||||||
|
its color.
|
||||||
|
- Send color information one by one, starting with the color for LED 1,
|
||||||
|
then LED 2 etc.
|
||||||
|
- Finish off by cycling the clock line a few times to get all data
|
||||||
|
to the very last LED on the strip
|
||||||
|
|
||||||
|
The last step is necessary, because each LED delays forwarding the data
|
||||||
|
a bit. Imagine ten people in a row. When you yell the last color
|
||||||
|
information, i.e. the one for person ten, to the first person in
|
||||||
|
the line, then you are not finished yet. Person one has to turn around
|
||||||
|
and yell it to person 2, and so on. So it takes ten additional "dummy"
|
||||||
|
cycles until person ten knows the color. When you look closer,
|
||||||
|
you will see that not even person 9 knows its own color yet. This
|
||||||
|
information is still with person 2. Essentially the driver sends additional
|
||||||
|
zeroes to LED 1 as long as it takes for the last color frame to make it
|
||||||
|
down the line to the last LED.
|
||||||
|
"""
|
||||||
|
# Constants
|
||||||
|
MAX_BRIGHTNESS = 0b11111 # Safeguard: Set to a value appropriate for your setup
|
||||||
|
LED_START = 0b11100000 # Three "1" bits, followed by 5 brightness bits
|
||||||
|
|
||||||
|
def __init__(self, num_led, global_brightness=MAX_BRIGHTNESS,
|
||||||
|
order='rgb', bus=0, device=1, max_speed_hz=8000000):
|
||||||
|
self.num_led = num_led # The number of LEDs in the Strip
|
||||||
|
order = order.lower()
|
||||||
|
self.rgb = RGB_MAP.get(order, RGB_MAP['rgb'])
|
||||||
|
# Limit the brightness to the maximum if it's set higher
|
||||||
|
if global_brightness > self.MAX_BRIGHTNESS:
|
||||||
|
self.global_brightness = self.MAX_BRIGHTNESS
|
||||||
|
else:
|
||||||
|
self.global_brightness = global_brightness
|
||||||
|
|
||||||
|
self.leds = [self.LED_START,0,0,0] * self.num_led # Pixel buffer
|
||||||
|
self.spi = spidev.SpiDev() # Init the SPI device
|
||||||
|
self.spi.open(bus, device) # Open SPI port 0, slave device (CS) 1
|
||||||
|
# Up the speed a bit, so that the LEDs are painted faster
|
||||||
|
if max_speed_hz:
|
||||||
|
self.spi.max_speed_hz = max_speed_hz
|
||||||
|
|
||||||
|
def clock_start_frame(self):
|
||||||
|
"""Sends a start frame to the LED strip.
|
||||||
|
|
||||||
|
This method clocks out a start frame, telling the receiving LED
|
||||||
|
that it must update its own color now.
|
||||||
|
"""
|
||||||
|
self.spi.xfer2([0] * 4) # Start frame, 32 zero bits
|
||||||
|
|
||||||
|
|
||||||
|
def clock_end_frame(self):
|
||||||
|
"""Sends an end frame to the LED strip.
|
||||||
|
|
||||||
|
As explained above, dummy data must be sent after the last real colour
|
||||||
|
information so that all of the data can reach its destination down the line.
|
||||||
|
The delay is not as bad as with the human example above.
|
||||||
|
It is only 1/2 bit per LED. This is because the SPI clock line
|
||||||
|
needs to be inverted.
|
||||||
|
|
||||||
|
Say a bit is ready on the SPI data line. The sender communicates
|
||||||
|
this by toggling the clock line. The bit is read by the LED
|
||||||
|
and immediately forwarded to the output data line. When the clock goes
|
||||||
|
down again on the input side, the LED will toggle the clock up
|
||||||
|
on the output to tell the next LED that the bit is ready.
|
||||||
|
|
||||||
|
After one LED the clock is inverted, and after two LEDs it is in sync
|
||||||
|
again, but one cycle behind. Therefore, for every two LEDs, one bit
|
||||||
|
of delay gets accumulated. For 300 LEDs, 150 additional bits must be fed to
|
||||||
|
the input of LED one so that the data can reach the last LED.
|
||||||
|
|
||||||
|
Ultimately, we need to send additional numLEDs/2 arbitrary data bits,
|
||||||
|
in order to trigger numLEDs/2 additional clock changes. This driver
|
||||||
|
sends zeroes, which has the benefit of getting LED one partially or
|
||||||
|
fully ready for the next update to the strip. An optimized version
|
||||||
|
of the driver could omit the "clockStartFrame" method if enough zeroes have
|
||||||
|
been sent as part of "clockEndFrame".
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.spi.xfer2([0xFF] * 4)
|
||||||
|
|
||||||
|
# Round up num_led/2 bits (or num_led/16 bytes)
|
||||||
|
#for _ in range((self.num_led + 15) // 16):
|
||||||
|
# self.spi.xfer2([0x00])
|
||||||
|
|
||||||
|
|
||||||
|
def clear_strip(self):
|
||||||
|
""" Turns off the strip and shows the result right away."""
|
||||||
|
|
||||||
|
for led in range(self.num_led):
|
||||||
|
self.set_pixel(led, 0, 0, 0)
|
||||||
|
self.show()
|
||||||
|
|
||||||
|
|
||||||
|
def set_pixel(self, led_num, red, green, blue, bright_percent=100):
|
||||||
|
"""Sets the color of one pixel in the LED stripe.
|
||||||
|
|
||||||
|
The changed pixel is not shown yet on the Stripe, it is only
|
||||||
|
written to the pixel buffer. Colors are passed individually.
|
||||||
|
If brightness is not set the global brightness setting is used.
|
||||||
|
"""
|
||||||
|
if led_num < 0:
|
||||||
|
return # Pixel is invisible, so ignore
|
||||||
|
if led_num >= self.num_led:
|
||||||
|
return # again, invisible
|
||||||
|
|
||||||
|
# Calculate pixel brightness as a percentage of the
|
||||||
|
# defined global_brightness. Round up to nearest integer
|
||||||
|
# as we expect some brightness unless set to 0
|
||||||
|
brightness = int(ceil(bright_percent*self.global_brightness/100.0))
|
||||||
|
|
||||||
|
# LED startframe is three "1" bits, followed by 5 brightness bits
|
||||||
|
ledstart = (brightness & 0b00011111) | self.LED_START
|
||||||
|
|
||||||
|
start_index = 4 * led_num
|
||||||
|
self.leds[start_index] = ledstart
|
||||||
|
self.leds[start_index + self.rgb[0]] = red
|
||||||
|
self.leds[start_index + self.rgb[1]] = green
|
||||||
|
self.leds[start_index + self.rgb[2]] = blue
|
||||||
|
|
||||||
|
|
||||||
|
def set_pixel_rgb(self, led_num, rgb_color, bright_percent=100):
|
||||||
|
"""Sets the color of one pixel in the LED stripe.
|
||||||
|
|
||||||
|
The changed pixel is not shown yet on the Stripe, it is only
|
||||||
|
written to the pixel buffer.
|
||||||
|
Colors are passed combined (3 bytes concatenated)
|
||||||
|
If brightness is not set the global brightness setting is used.
|
||||||
|
"""
|
||||||
|
self.set_pixel(led_num, (rgb_color & 0xFF0000) >> 16,
|
||||||
|
(rgb_color & 0x00FF00) >> 8, rgb_color & 0x0000FF,
|
||||||
|
bright_percent)
|
||||||
|
|
||||||
|
|
||||||
|
def rotate(self, positions=1):
|
||||||
|
""" Rotate the LEDs by the specified number of positions.
|
||||||
|
|
||||||
|
Treating the internal LED array as a circular buffer, rotate it by
|
||||||
|
the specified number of positions. The number could be negative,
|
||||||
|
which means rotating in the opposite direction.
|
||||||
|
"""
|
||||||
|
cutoff = 4 * (positions % self.num_led)
|
||||||
|
self.leds = self.leds[cutoff:] + self.leds[:cutoff]
|
||||||
|
|
||||||
|
|
||||||
|
def show(self):
|
||||||
|
"""Sends the content of the pixel buffer to the strip.
|
||||||
|
|
||||||
|
Todo: More than 1024 LEDs requires more than one xfer operation.
|
||||||
|
"""
|
||||||
|
self.clock_start_frame()
|
||||||
|
# xfer2 kills the list, unfortunately. So it must be copied first
|
||||||
|
# SPI takes up to 4096 Integers. So we are fine for up to 1024 LEDs.
|
||||||
|
data = list(self.leds)
|
||||||
|
while data:
|
||||||
|
self.spi.xfer2(data[:32])
|
||||||
|
data = data[32:]
|
||||||
|
self.clock_end_frame()
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
"""Release the SPI device; Call this method at the end"""
|
||||||
|
|
||||||
|
self.spi.close() # Close SPI port
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def combine_color(red, green, blue):
|
||||||
|
"""Make one 3*8 byte color value."""
|
||||||
|
|
||||||
|
return (red << 16) + (green << 8) + blue
|
||||||
|
|
||||||
|
|
||||||
|
def wheel(self, wheel_pos):
|
||||||
|
"""Get a color from a color wheel; Green -> Red -> Blue -> Green"""
|
||||||
|
|
||||||
|
if wheel_pos > 255:
|
||||||
|
wheel_pos = 255 # Safeguard
|
||||||
|
if wheel_pos < 85: # Green -> Red
|
||||||
|
return self.combine_color(wheel_pos * 3, 255 - wheel_pos * 3, 0)
|
||||||
|
if wheel_pos < 170: # Red -> Blue
|
||||||
|
wheel_pos -= 85
|
||||||
|
return self.combine_color(255 - wheel_pos * 3, 0, wheel_pos * 3)
|
||||||
|
# Blue -> Green
|
||||||
|
wheel_pos -= 170
|
||||||
|
return self.combine_color(0, wheel_pos * 3, 255 - wheel_pos * 3)
|
||||||
|
|
||||||
|
|
||||||
|
def dump_array(self):
|
||||||
|
"""For debug purposes: Dump the LED array onto the console."""
|
||||||
|
|
||||||
|
print(self.leds)
|
@ -0,0 +1,105 @@
|
|||||||
|
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
try:
|
||||||
|
import queue as Queue
|
||||||
|
except ImportError:
|
||||||
|
import Queue as Queue
|
||||||
|
|
||||||
|
from .apa102 import APA102
|
||||||
|
from .pattern import Echo, GoogleHome
|
||||||
|
|
||||||
|
|
||||||
|
class PixelRing(object):
|
||||||
|
PIXELS_N = 12
|
||||||
|
|
||||||
|
def __init__(self, pattern='google'):
|
||||||
|
if pattern == 'echo':
|
||||||
|
self.pattern = Echo(show=self.show)
|
||||||
|
else:
|
||||||
|
self.pattern = GoogleHome(show=self.show)
|
||||||
|
|
||||||
|
self.dev = APA102(num_led=self.PIXELS_N)
|
||||||
|
|
||||||
|
self.queue = Queue.Queue()
|
||||||
|
self.thread = threading.Thread(target=self._run)
|
||||||
|
self.thread.daemon = True
|
||||||
|
self.thread.start()
|
||||||
|
self.off()
|
||||||
|
|
||||||
|
def set_brightness(self, brightness):
|
||||||
|
if brightness > 100:
|
||||||
|
brightness = 100
|
||||||
|
|
||||||
|
if brightness > 0:
|
||||||
|
self.dev.global_brightness = int(0b11111 * brightness / 100)
|
||||||
|
|
||||||
|
def change_pattern(self, pattern):
|
||||||
|
if pattern == 'echo':
|
||||||
|
self.pattern = Echo(show=self.show)
|
||||||
|
else:
|
||||||
|
self.pattern = GoogleHome(show=self.show)
|
||||||
|
|
||||||
|
def wakeup(self, direction=0):
|
||||||
|
def f():
|
||||||
|
self.pattern.wakeup(direction)
|
||||||
|
|
||||||
|
self.put(f)
|
||||||
|
|
||||||
|
def listen(self):
|
||||||
|
self.put(self.pattern.listen)
|
||||||
|
|
||||||
|
def think(self):
|
||||||
|
self.put(self.pattern.think)
|
||||||
|
|
||||||
|
wait = think
|
||||||
|
|
||||||
|
def speak(self):
|
||||||
|
self.put(self.pattern.speak)
|
||||||
|
|
||||||
|
def off(self):
|
||||||
|
self.put(self.pattern.off)
|
||||||
|
|
||||||
|
def put(self, func):
|
||||||
|
self.pattern.stop = True
|
||||||
|
self.queue.put(func)
|
||||||
|
|
||||||
|
def _run(self):
|
||||||
|
while True:
|
||||||
|
func = self.queue.get()
|
||||||
|
self.pattern.stop = False
|
||||||
|
func()
|
||||||
|
|
||||||
|
def show(self, data):
|
||||||
|
for i in range(self.PIXELS_N):
|
||||||
|
self.dev.set_pixel(i, int(data[4*i + 1]), int(data[4*i + 2]), int(data[4*i + 3]))
|
||||||
|
|
||||||
|
self.dev.show()
|
||||||
|
|
||||||
|
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||||
|
if rgb:
|
||||||
|
r, g, b = (rgb >> 16) & 0xFF, (rgb >> 8) & 0xFF, rgb & 0xFF
|
||||||
|
for i in range(self.PIXELS_N):
|
||||||
|
self.dev.set_pixel(i, r, g, b)
|
||||||
|
|
||||||
|
self.dev.show()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
pixel_ring = PixelRing()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
pixel_ring.wakeup()
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.think()
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.speak()
|
||||||
|
time.sleep(6)
|
||||||
|
pixel_ring.off()
|
||||||
|
time.sleep(3)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
pixel_ring.off()
|
||||||
|
time.sleep(1)
|
@ -0,0 +1,145 @@
|
|||||||
|
"""
|
||||||
|
LED pattern like Echo
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class Echo(object):
|
||||||
|
brightness = 24 * 8
|
||||||
|
|
||||||
|
def __init__(self, show, number=12):
|
||||||
|
self.pixels_number = number
|
||||||
|
self.pixels = [0] * 4 * number
|
||||||
|
|
||||||
|
if not callable(show):
|
||||||
|
raise ValueError('show parameter is not callable')
|
||||||
|
|
||||||
|
self.show = show
|
||||||
|
self.stop = False
|
||||||
|
|
||||||
|
def wakeup(self, direction=0):
|
||||||
|
position = int((direction + 15) / (360 / self.pixels_number)) % self.pixels_number
|
||||||
|
|
||||||
|
pixels = [0, 0, 0, self.brightness] * self.pixels_number
|
||||||
|
pixels[position * 4 + 2] = self.brightness
|
||||||
|
|
||||||
|
self.show(pixels)
|
||||||
|
|
||||||
|
def listen(self):
|
||||||
|
pixels = [0, 0, 0, self.brightness] * self.pixels_number
|
||||||
|
|
||||||
|
self.show(pixels)
|
||||||
|
|
||||||
|
def think(self):
|
||||||
|
half_brightness = int(self.brightness / 2)
|
||||||
|
pixels = [0, 0, half_brightness, half_brightness, 0, 0, 0, self.brightness] * self.pixels_number
|
||||||
|
|
||||||
|
while not self.stop:
|
||||||
|
self.show(pixels)
|
||||||
|
time.sleep(0.2)
|
||||||
|
pixels = pixels[-4:] + pixels[:-4]
|
||||||
|
|
||||||
|
def speak(self):
|
||||||
|
step = int(self.brightness / 12)
|
||||||
|
position = int(self.brightness / 2)
|
||||||
|
while not self.stop:
|
||||||
|
pixels = [0, 0, position, self.brightness - position] * self.pixels_number
|
||||||
|
self.show(pixels)
|
||||||
|
time.sleep(0.01)
|
||||||
|
if position <= 0:
|
||||||
|
step = int(self.brightness / 12)
|
||||||
|
time.sleep(0.4)
|
||||||
|
elif position >= int(self.brightness / 2):
|
||||||
|
step = - int(self.brightness / 12)
|
||||||
|
time.sleep(0.4)
|
||||||
|
|
||||||
|
position += step
|
||||||
|
|
||||||
|
def off(self):
|
||||||
|
self.show([0] * 4 * 12)
|
||||||
|
|
||||||
|
class GoogleHome(object):
|
||||||
|
def __init__(self, show):
|
||||||
|
self.basis = [0] * 4 * 12
|
||||||
|
self.basis[0 * 4 + 1] = 8
|
||||||
|
self.basis[3 * 4 + 1] = 4
|
||||||
|
self.basis[3 * 4 + 2] = 4
|
||||||
|
self.basis[6 * 4 + 2] = 8
|
||||||
|
self.basis[9 * 4 + 3] = 8
|
||||||
|
|
||||||
|
self.pixels = self.basis
|
||||||
|
|
||||||
|
if not callable(show):
|
||||||
|
raise ValueError('show parameter is not callable')
|
||||||
|
|
||||||
|
self.show = show
|
||||||
|
self.stop = False
|
||||||
|
|
||||||
|
def wakeup(self, direction=0):
|
||||||
|
position = int((direction + 90 + 15) / 30) % 12
|
||||||
|
|
||||||
|
basis = self.basis[position*-4:] + self.basis[:position*-4]
|
||||||
|
|
||||||
|
pixels = [v * 25 for v in basis]
|
||||||
|
self.show(pixels)
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
pixels = pixels[-4:] + pixels[:-4]
|
||||||
|
self.show(pixels)
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
for i in range(2):
|
||||||
|
new_pixels = pixels[-4:] + pixels[:-4]
|
||||||
|
|
||||||
|
self.show([v/2+pixels[index] for index, v in enumerate(new_pixels)])
|
||||||
|
pixels = new_pixels
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
self.show(pixels)
|
||||||
|
self.pixels = pixels
|
||||||
|
|
||||||
|
def listen(self):
|
||||||
|
pixels = self.pixels
|
||||||
|
for i in range(1, 25):
|
||||||
|
self.show([(v * i / 24) for v in pixels])
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
def think(self):
|
||||||
|
pixels = self.pixels
|
||||||
|
|
||||||
|
while not self.stop:
|
||||||
|
pixels = pixels[-4:] + pixels[:-4]
|
||||||
|
self.show(pixels)
|
||||||
|
time.sleep(0.2)
|
||||||
|
|
||||||
|
t = 0.1
|
||||||
|
for i in range(0, 5):
|
||||||
|
pixels = pixels[-4:] + pixels[:-4]
|
||||||
|
self.show([(v * (4 - i) / 4) for v in pixels])
|
||||||
|
time.sleep(t)
|
||||||
|
t /= 2
|
||||||
|
|
||||||
|
self.pixels = pixels
|
||||||
|
|
||||||
|
def speak(self):
|
||||||
|
pixels = self.pixels
|
||||||
|
step = 1
|
||||||
|
brightness = 5
|
||||||
|
while not self.stop:
|
||||||
|
self.show([(v * brightness / 24) for v in pixels])
|
||||||
|
time.sleep(0.02)
|
||||||
|
|
||||||
|
if brightness <= 5:
|
||||||
|
step = 1
|
||||||
|
time.sleep(0.4)
|
||||||
|
elif brightness >= 24:
|
||||||
|
step = -1
|
||||||
|
time.sleep(0.4)
|
||||||
|
|
||||||
|
brightness += step
|
||||||
|
|
||||||
|
def off(self):
|
||||||
|
self.show([0] * 4 * 12)
|
||||||
|
|
||||||
|
|
@ -0,0 +1,26 @@
|
|||||||
|
|
||||||
|
|
||||||
|
class PixelRing(object):
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def show(self, data):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def wakeup(self, angle=None):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def listen(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def think(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def speak(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def off(self):
|
||||||
|
pass
|
@ -0,0 +1,190 @@
|
|||||||
|
|
||||||
|
import usb.core
|
||||||
|
import usb.util
|
||||||
|
|
||||||
|
|
||||||
|
class HidDevice:
|
||||||
|
"""
|
||||||
|
This class provides basic functions to access
|
||||||
|
a USB HID device to write an endpoint
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, dev, ep_in, ep_out):
|
||||||
|
self.dev = dev
|
||||||
|
self.ep_in = ep_in
|
||||||
|
self.ep_out = ep_out
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
"""
|
||||||
|
write data on the OUT endpoint associated to the HID interface
|
||||||
|
"""
|
||||||
|
self.ep_out.write(data)
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
return self.ep_in.read(self.ep_in.wMaxPacketSize, -1)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""
|
||||||
|
close the interface
|
||||||
|
"""
|
||||||
|
usb.util.dispose_resources(self.dev)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def find(vid=0x2886, pid=0x0007):
|
||||||
|
dev = usb.core.find(idVendor=vid, idProduct=pid)
|
||||||
|
if not dev:
|
||||||
|
return
|
||||||
|
|
||||||
|
config = dev.get_active_configuration()
|
||||||
|
|
||||||
|
# Iterate on all interfaces to find a HID interface
|
||||||
|
ep_in, ep_out = None, None
|
||||||
|
for interface in config:
|
||||||
|
if interface.bInterfaceClass == 0x03:
|
||||||
|
try:
|
||||||
|
if dev.is_kernel_driver_active(interface.bInterfaceNumber):
|
||||||
|
dev.detach_kernel_driver(interface.bInterfaceNumber)
|
||||||
|
except Exception as e:
|
||||||
|
print(e.message)
|
||||||
|
|
||||||
|
for ep in interface:
|
||||||
|
if ep.bEndpointAddress & 0x80:
|
||||||
|
ep_in = ep
|
||||||
|
else:
|
||||||
|
ep_out = ep
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if ep_in and ep_out:
|
||||||
|
hid = HidDevice(dev, ep_in, ep_out)
|
||||||
|
|
||||||
|
return hid
|
||||||
|
|
||||||
|
|
||||||
|
class UsbPixelRing:
|
||||||
|
PIXELS_N = 12
|
||||||
|
|
||||||
|
MONO = 1
|
||||||
|
THINK = 3
|
||||||
|
VOLUME = 5
|
||||||
|
CUSTOM = 6
|
||||||
|
|
||||||
|
def __init__(self, hid=None, pattern=None):
|
||||||
|
self.hid = hid if hid else HidDevice.find()
|
||||||
|
if not self.hid:
|
||||||
|
print('No USB device found')
|
||||||
|
|
||||||
|
colors = [0] * 4 * self.PIXELS_N
|
||||||
|
colors[0] = 0x4
|
||||||
|
colors[1] = 0x40
|
||||||
|
colors[2] = 0x4
|
||||||
|
|
||||||
|
colors[4 + 1] = 0x8
|
||||||
|
colors[4 * 11 + 1] = 0x8
|
||||||
|
|
||||||
|
self.direction_template = colors
|
||||||
|
|
||||||
|
def set_brightness(self, brightness):
|
||||||
|
print('Not support to change brightness')
|
||||||
|
|
||||||
|
def change_pattern(self, pattern=None):
|
||||||
|
print('Not support to change pattern')
|
||||||
|
|
||||||
|
def off(self):
|
||||||
|
self.set_color(rgb=0)
|
||||||
|
|
||||||
|
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||||
|
if rgb:
|
||||||
|
self.write(0, [self.MONO, rgb & 0xFF, (rgb >> 8) & 0xFF, (rgb >> 16) & 0xFF])
|
||||||
|
else:
|
||||||
|
self.write(0, [self.MONO, b, g, r])
|
||||||
|
|
||||||
|
def think(self):
|
||||||
|
self.write(0, [self.THINK, 0, 0, 0])
|
||||||
|
|
||||||
|
wait = think
|
||||||
|
|
||||||
|
speak = think
|
||||||
|
|
||||||
|
def set_volume(self, pixels):
|
||||||
|
self.write(0, [self.VOLUME, 0, 0, pixels])
|
||||||
|
|
||||||
|
def wakeup(self, angle=0):
|
||||||
|
if angle < 0 or angle > 360:
|
||||||
|
return
|
||||||
|
|
||||||
|
position = int((angle + 15) % 360 / 30) % self.PIXELS_N
|
||||||
|
colors = self.direction_template[-position*4:] + self.direction_template[:-position*4]
|
||||||
|
|
||||||
|
self.write(0, [self.CUSTOM, 0, 0, 0])
|
||||||
|
self.write(3, colors)
|
||||||
|
|
||||||
|
return position
|
||||||
|
|
||||||
|
def listen(self, angle=0):
|
||||||
|
self.write(0, [self.MONO, 0, 0x10, 0])
|
||||||
|
|
||||||
|
def show(self, data):
|
||||||
|
self.write(0, [self.CUSTOM, 0, 0, 0])
|
||||||
|
self.write(3, data)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def to_bytearray(data):
|
||||||
|
if type(data) is int:
|
||||||
|
array = bytearray([data & 0xFF])
|
||||||
|
elif type(data) is bytearray:
|
||||||
|
array = data
|
||||||
|
elif type(data) is str or type(data) is bytes:
|
||||||
|
array = bytearray(data)
|
||||||
|
elif type(data) is list:
|
||||||
|
array = bytearray(data)
|
||||||
|
else:
|
||||||
|
raise TypeError('%s is not supported' % type(data))
|
||||||
|
|
||||||
|
return array
|
||||||
|
|
||||||
|
def write(self, address, data):
|
||||||
|
data = self.to_bytearray(data)
|
||||||
|
length = len(data)
|
||||||
|
if self.hid:
|
||||||
|
packet = bytearray([address & 0xFF, (address >> 8) & 0xFF, length & 0xFF, (length >> 8) & 0xFF]) + data
|
||||||
|
self.hid.write(packet)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if self.hid:
|
||||||
|
self.hid.close()
|
||||||
|
|
||||||
|
def __call__(self, data):
|
||||||
|
self.write(3, data)
|
||||||
|
|
||||||
|
|
||||||
|
def find():
|
||||||
|
hid = HidDevice.find()
|
||||||
|
|
||||||
|
if hid:
|
||||||
|
pixel_ring = UsbPixelRing(hid)
|
||||||
|
return pixel_ring
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
import time
|
||||||
|
|
||||||
|
pixel_ring = UsbPixelRing()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
pixel_ring.wakeup(180)
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.listen()
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.think()
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.set_volume(8)
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.off()
|
||||||
|
time.sleep(3)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
break
|
||||||
|
|
||||||
|
pixel_ring.off()
|
||||||
|
|
@ -0,0 +1,122 @@
|
|||||||
|
|
||||||
|
import usb.core
|
||||||
|
import usb.util
|
||||||
|
|
||||||
|
|
||||||
|
class PixelRing:
|
||||||
|
TIMEOUT = 8000
|
||||||
|
|
||||||
|
def __init__(self, dev):
|
||||||
|
self.dev = dev
|
||||||
|
|
||||||
|
def trace(self):
|
||||||
|
self.write(0)
|
||||||
|
|
||||||
|
def mono(self, color):
|
||||||
|
self.write(1, [(color >> 16) & 0xFF, (color >> 8) & 0xFF, color & 0xFF, 0])
|
||||||
|
|
||||||
|
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||||
|
if rgb:
|
||||||
|
self.mono(rgb)
|
||||||
|
else:
|
||||||
|
self.write(1, [r, g, b, 0])
|
||||||
|
|
||||||
|
def off(self):
|
||||||
|
self.mono(0)
|
||||||
|
|
||||||
|
def listen(self, direction=None):
|
||||||
|
self.write(2)
|
||||||
|
|
||||||
|
wakeup = listen
|
||||||
|
|
||||||
|
def speak(self):
|
||||||
|
self.write(3)
|
||||||
|
|
||||||
|
def think(self):
|
||||||
|
self.write(4)
|
||||||
|
|
||||||
|
wait = think
|
||||||
|
|
||||||
|
def spin(self):
|
||||||
|
self.write(5)
|
||||||
|
|
||||||
|
def show(self, data):
|
||||||
|
self.write(6, data)
|
||||||
|
|
||||||
|
customize = show
|
||||||
|
|
||||||
|
def set_brightness(self, brightness):
|
||||||
|
self.write(0x20, [brightness])
|
||||||
|
|
||||||
|
def set_color_palette(self, a, b):
|
||||||
|
self.write(0x21, [(a >> 16) & 0xFF, (a >> 8) & 0xFF, a & 0xFF, 0, (b >> 16) & 0xFF, (b >> 8) & 0xFF, b & 0xFF, 0])
|
||||||
|
|
||||||
|
def set_vad_led(self, state):
|
||||||
|
self.write(0x22, [state])
|
||||||
|
|
||||||
|
def set_volume(self, volume):
|
||||||
|
self.write(0x23, [volume])
|
||||||
|
|
||||||
|
def change_pattern(self, pattern):
|
||||||
|
if pattern == 'echo':
|
||||||
|
self.write(0x24, [1])
|
||||||
|
else:
|
||||||
|
self.write(0x24, [0])
|
||||||
|
|
||||||
|
def write(self, cmd, data=[0]):
|
||||||
|
self.dev.ctrl_transfer(
|
||||||
|
usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE,
|
||||||
|
0, cmd, 0x1C, data, self.TIMEOUT)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def version(self):
|
||||||
|
return self.dev.ctrl_transfer(
|
||||||
|
usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE,
|
||||||
|
0, 0x80 | 0x40, 0x1C, 24, self.TIMEOUT).tostring()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""
|
||||||
|
close the interface
|
||||||
|
"""
|
||||||
|
usb.util.dispose_resources(self.dev)
|
||||||
|
|
||||||
|
|
||||||
|
def find(vid=0x2886, pid=0x0018):
|
||||||
|
dev = usb.core.find(idVendor=vid, idProduct=pid)
|
||||||
|
if not dev:
|
||||||
|
return
|
||||||
|
|
||||||
|
# configuration = dev.get_active_configuration()
|
||||||
|
|
||||||
|
# interface_number = None
|
||||||
|
# for interface in configuration:
|
||||||
|
# interface_number = interface.bInterfaceNumber
|
||||||
|
|
||||||
|
# if dev.is_kernel_driver_active(interface_number):
|
||||||
|
# dev.detach_kernel_driver(interface_number)
|
||||||
|
|
||||||
|
return PixelRing(dev)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
import time
|
||||||
|
|
||||||
|
pixel_ring = find()
|
||||||
|
print(pixel_ring.version)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
pixel_ring.wakeup(180)
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.listen()
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.think()
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.set_volume(8)
|
||||||
|
time.sleep(3)
|
||||||
|
pixel_ring.off()
|
||||||
|
time.sleep(3)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
break
|
||||||
|
|
||||||
|
pixel_ring.off()
|
Loading…
Reference in New Issue