diff --git a/pixel_ring/__init__.py b/pixel_ring/__init__.py new file mode 100755 index 0000000..fedbbb1 --- /dev/null +++ b/pixel_ring/__init__.py @@ -0,0 +1,52 @@ + + +from . import usb_pixel_ring_v1 +from . import usb_pixel_ring_v2 +from .apa102_pixel_ring import PixelRing + +pixel_ring = usb_pixel_ring_v2.find() + +if not pixel_ring: + pixel_ring = usb_pixel_ring_v1.find() + +if not pixel_ring: + pixel_ring = PixelRing() + + +USAGE = ''' +If the hardware is ReSpeaker 4 Mic Array for Pi or ReSpeaker V2, +there is a power-enable pin which should be enabled at first. ++ ReSpeaker 4 Mic Array for Pi: + + import gpiozero + power = LED(5) + power.on() + ++ ReSpeaker V2: + + import mraa + power = mraa.Gpio(12) + power.dir(mraa.DIR_OUT) + power.write(0) +''' + +def main(): + import time + + if isinstance(pixel_ring, usb_pixel_ring_v2.PixelRing): + print('Found ReSpeaker USB 4 Mic Array') + elif isinstance(pixel_ring, usb_pixel_ring_v1.UsbPixelRing): + print('Found ReSpeaker USB 6+1 Mic Array') + else: + print('Control APA102 RGB LEDs via SPI') + print(USAGE) + + pixel_ring.think() + time.sleep(3) + pixel_ring.off() + time.sleep(1) + + +if __name__ == '__main__': + main() + diff --git a/pixel_ring/apa102.py b/pixel_ring/apa102.py new file mode 100755 index 0000000..e5918b9 --- /dev/null +++ b/pixel_ring/apa102.py @@ -0,0 +1,243 @@ +""" +from https://github.com/tinue/APA102_Pi +This is the main driver module for APA102 LEDs +""" +import spidev +from math import ceil + +RGB_MAP = { 'rgb': [3, 2, 1], 'rbg': [3, 1, 2], 'grb': [2, 3, 1], + 'gbr': [2, 1, 3], 'brg': [1, 3, 2], 'bgr': [1, 2, 3] } + +class APA102: + """ + Driver for APA102 LEDS (aka "DotStar"). + + (c) Martin Erzberger 2016-2017 + + My very first Python code, so I am sure there is a lot to be optimized ;) + + Public methods are: + - set_pixel + - set_pixel_rgb + - show + - clear_strip + - cleanup + + Helper methods for color manipulation are: + - combine_color + - wheel + + The rest of the methods are used internally and should not be used by the + user of the library. + + Very brief overview of APA102: An APA102 LED is addressed with SPI. The bits + are shifted in one by one, starting with the least significant bit. + + An LED usually just forwards everything that is sent to its data-in to + data-out. While doing this, it remembers its own color and keeps glowing + with that color as long as there is power. + + An LED can be switched to not forward the data, but instead use the data + to change it's own color. This is done by sending (at least) 32 bits of + zeroes to data-in. The LED then accepts the next correct 32 bit LED + frame (with color information) as its new color setting. + + After having received the 32 bit color frame, the LED changes color, + and then resumes to just copying data-in to data-out. + + The really clever bit is this: While receiving the 32 bit LED frame, + the LED sends zeroes on its data-out line. Because a color frame is + 32 bits, the LED sends 32 bits of zeroes to the next LED. + As we have seen above, this means that the next LED is now ready + to accept a color frame and update its color. + + So that's really the entire protocol: + - Start by sending 32 bits of zeroes. This prepares LED 1 to update + its color. + - Send color information one by one, starting with the color for LED 1, + then LED 2 etc. + - Finish off by cycling the clock line a few times to get all data + to the very last LED on the strip + + The last step is necessary, because each LED delays forwarding the data + a bit. Imagine ten people in a row. When you yell the last color + information, i.e. the one for person ten, to the first person in + the line, then you are not finished yet. Person one has to turn around + and yell it to person 2, and so on. So it takes ten additional "dummy" + cycles until person ten knows the color. When you look closer, + you will see that not even person 9 knows its own color yet. This + information is still with person 2. Essentially the driver sends additional + zeroes to LED 1 as long as it takes for the last color frame to make it + down the line to the last LED. + """ + # Constants + MAX_BRIGHTNESS = 0b11111 # Safeguard: Set to a value appropriate for your setup + LED_START = 0b11100000 # Three "1" bits, followed by 5 brightness bits + + def __init__(self, num_led, global_brightness=MAX_BRIGHTNESS, + order='rgb', bus=0, device=1, max_speed_hz=8000000): + self.num_led = num_led # The number of LEDs in the Strip + order = order.lower() + self.rgb = RGB_MAP.get(order, RGB_MAP['rgb']) + # Limit the brightness to the maximum if it's set higher + if global_brightness > self.MAX_BRIGHTNESS: + self.global_brightness = self.MAX_BRIGHTNESS + else: + self.global_brightness = global_brightness + + self.leds = [self.LED_START,0,0,0] * self.num_led # Pixel buffer + self.spi = spidev.SpiDev() # Init the SPI device + self.spi.open(bus, device) # Open SPI port 0, slave device (CS) 1 + # Up the speed a bit, so that the LEDs are painted faster + if max_speed_hz: + self.spi.max_speed_hz = max_speed_hz + + def clock_start_frame(self): + """Sends a start frame to the LED strip. + + This method clocks out a start frame, telling the receiving LED + that it must update its own color now. + """ + self.spi.xfer2([0] * 4) # Start frame, 32 zero bits + + + def clock_end_frame(self): + """Sends an end frame to the LED strip. + + As explained above, dummy data must be sent after the last real colour + information so that all of the data can reach its destination down the line. + The delay is not as bad as with the human example above. + It is only 1/2 bit per LED. This is because the SPI clock line + needs to be inverted. + + Say a bit is ready on the SPI data line. The sender communicates + this by toggling the clock line. The bit is read by the LED + and immediately forwarded to the output data line. When the clock goes + down again on the input side, the LED will toggle the clock up + on the output to tell the next LED that the bit is ready. + + After one LED the clock is inverted, and after two LEDs it is in sync + again, but one cycle behind. Therefore, for every two LEDs, one bit + of delay gets accumulated. For 300 LEDs, 150 additional bits must be fed to + the input of LED one so that the data can reach the last LED. + + Ultimately, we need to send additional numLEDs/2 arbitrary data bits, + in order to trigger numLEDs/2 additional clock changes. This driver + sends zeroes, which has the benefit of getting LED one partially or + fully ready for the next update to the strip. An optimized version + of the driver could omit the "clockStartFrame" method if enough zeroes have + been sent as part of "clockEndFrame". + """ + + self.spi.xfer2([0xFF] * 4) + + # Round up num_led/2 bits (or num_led/16 bytes) + #for _ in range((self.num_led + 15) // 16): + # self.spi.xfer2([0x00]) + + + def clear_strip(self): + """ Turns off the strip and shows the result right away.""" + + for led in range(self.num_led): + self.set_pixel(led, 0, 0, 0) + self.show() + + + def set_pixel(self, led_num, red, green, blue, bright_percent=100): + """Sets the color of one pixel in the LED stripe. + + The changed pixel is not shown yet on the Stripe, it is only + written to the pixel buffer. Colors are passed individually. + If brightness is not set the global brightness setting is used. + """ + if led_num < 0: + return # Pixel is invisible, so ignore + if led_num >= self.num_led: + return # again, invisible + + # Calculate pixel brightness as a percentage of the + # defined global_brightness. Round up to nearest integer + # as we expect some brightness unless set to 0 + brightness = int(ceil(bright_percent*self.global_brightness/100.0)) + + # LED startframe is three "1" bits, followed by 5 brightness bits + ledstart = (brightness & 0b00011111) | self.LED_START + + start_index = 4 * led_num + self.leds[start_index] = ledstart + self.leds[start_index + self.rgb[0]] = red + self.leds[start_index + self.rgb[1]] = green + self.leds[start_index + self.rgb[2]] = blue + + + def set_pixel_rgb(self, led_num, rgb_color, bright_percent=100): + """Sets the color of one pixel in the LED stripe. + + The changed pixel is not shown yet on the Stripe, it is only + written to the pixel buffer. + Colors are passed combined (3 bytes concatenated) + If brightness is not set the global brightness setting is used. + """ + self.set_pixel(led_num, (rgb_color & 0xFF0000) >> 16, + (rgb_color & 0x00FF00) >> 8, rgb_color & 0x0000FF, + bright_percent) + + + def rotate(self, positions=1): + """ Rotate the LEDs by the specified number of positions. + + Treating the internal LED array as a circular buffer, rotate it by + the specified number of positions. The number could be negative, + which means rotating in the opposite direction. + """ + cutoff = 4 * (positions % self.num_led) + self.leds = self.leds[cutoff:] + self.leds[:cutoff] + + + def show(self): + """Sends the content of the pixel buffer to the strip. + + Todo: More than 1024 LEDs requires more than one xfer operation. + """ + self.clock_start_frame() + # xfer2 kills the list, unfortunately. So it must be copied first + # SPI takes up to 4096 Integers. So we are fine for up to 1024 LEDs. + data = list(self.leds) + while data: + self.spi.xfer2(data[:32]) + data = data[32:] + self.clock_end_frame() + + + def cleanup(self): + """Release the SPI device; Call this method at the end""" + + self.spi.close() # Close SPI port + + @staticmethod + def combine_color(red, green, blue): + """Make one 3*8 byte color value.""" + + return (red << 16) + (green << 8) + blue + + + def wheel(self, wheel_pos): + """Get a color from a color wheel; Green -> Red -> Blue -> Green""" + + if wheel_pos > 255: + wheel_pos = 255 # Safeguard + if wheel_pos < 85: # Green -> Red + return self.combine_color(wheel_pos * 3, 255 - wheel_pos * 3, 0) + if wheel_pos < 170: # Red -> Blue + wheel_pos -= 85 + return self.combine_color(255 - wheel_pos * 3, 0, wheel_pos * 3) + # Blue -> Green + wheel_pos -= 170 + return self.combine_color(0, wheel_pos * 3, 255 - wheel_pos * 3) + + + def dump_array(self): + """For debug purposes: Dump the LED array onto the console.""" + + print(self.leds) diff --git a/pixel_ring/apa102_pixel_ring.py b/pixel_ring/apa102_pixel_ring.py new file mode 100755 index 0000000..8690ba2 --- /dev/null +++ b/pixel_ring/apa102_pixel_ring.py @@ -0,0 +1,105 @@ + +import time +import threading +try: + import queue as Queue +except ImportError: + import Queue as Queue + +from .apa102 import APA102 +from .pattern import Echo, GoogleHome + + +class PixelRing(object): + PIXELS_N = 12 + + def __init__(self, pattern='google'): + if pattern == 'echo': + self.pattern = Echo(show=self.show) + else: + self.pattern = GoogleHome(show=self.show) + + self.dev = APA102(num_led=self.PIXELS_N) + + self.queue = Queue.Queue() + self.thread = threading.Thread(target=self._run) + self.thread.daemon = True + self.thread.start() + self.off() + + def set_brightness(self, brightness): + if brightness > 100: + brightness = 100 + + if brightness > 0: + self.dev.global_brightness = int(0b11111 * brightness / 100) + + def change_pattern(self, pattern): + if pattern == 'echo': + self.pattern = Echo(show=self.show) + else: + self.pattern = GoogleHome(show=self.show) + + def wakeup(self, direction=0): + def f(): + self.pattern.wakeup(direction) + + self.put(f) + + def listen(self): + self.put(self.pattern.listen) + + def think(self): + self.put(self.pattern.think) + + wait = think + + def speak(self): + self.put(self.pattern.speak) + + def off(self): + self.put(self.pattern.off) + + def put(self, func): + self.pattern.stop = True + self.queue.put(func) + + def _run(self): + while True: + func = self.queue.get() + self.pattern.stop = False + func() + + def show(self, data): + for i in range(self.PIXELS_N): + self.dev.set_pixel(i, int(data[4*i + 1]), int(data[4*i + 2]), int(data[4*i + 3])) + + self.dev.show() + + def set_color(self, rgb=None, r=0, g=0, b=0): + if rgb: + r, g, b = (rgb >> 16) & 0xFF, (rgb >> 8) & 0xFF, rgb & 0xFF + for i in range(self.PIXELS_N): + self.dev.set_pixel(i, r, g, b) + + self.dev.show() + + +if __name__ == '__main__': + pixel_ring = PixelRing() + while True: + try: + pixel_ring.wakeup() + time.sleep(3) + pixel_ring.think() + time.sleep(3) + pixel_ring.speak() + time.sleep(6) + pixel_ring.off() + time.sleep(3) + except KeyboardInterrupt: + break + + + pixel_ring.off() + time.sleep(1) diff --git a/pixel_ring/pattern.py b/pixel_ring/pattern.py new file mode 100755 index 0000000..054910f --- /dev/null +++ b/pixel_ring/pattern.py @@ -0,0 +1,145 @@ +""" +LED pattern like Echo +""" + +import time + + +class Echo(object): + brightness = 24 * 8 + + def __init__(self, show, number=12): + self.pixels_number = number + self.pixels = [0] * 4 * number + + if not callable(show): + raise ValueError('show parameter is not callable') + + self.show = show + self.stop = False + + def wakeup(self, direction=0): + position = int((direction + 15) / (360 / self.pixels_number)) % self.pixels_number + + pixels = [0, 0, 0, self.brightness] * self.pixels_number + pixels[position * 4 + 2] = self.brightness + + self.show(pixels) + + def listen(self): + pixels = [0, 0, 0, self.brightness] * self.pixels_number + + self.show(pixels) + + def think(self): + half_brightness = int(self.brightness / 2) + pixels = [0, 0, half_brightness, half_brightness, 0, 0, 0, self.brightness] * self.pixels_number + + while not self.stop: + self.show(pixels) + time.sleep(0.2) + pixels = pixels[-4:] + pixels[:-4] + + def speak(self): + step = int(self.brightness / 12) + position = int(self.brightness / 2) + while not self.stop: + pixels = [0, 0, position, self.brightness - position] * self.pixels_number + self.show(pixels) + time.sleep(0.01) + if position <= 0: + step = int(self.brightness / 12) + time.sleep(0.4) + elif position >= int(self.brightness / 2): + step = - int(self.brightness / 12) + time.sleep(0.4) + + position += step + + def off(self): + self.show([0] * 4 * 12) + +class GoogleHome(object): + def __init__(self, show): + self.basis = [0] * 4 * 12 + self.basis[0 * 4 + 1] = 8 + self.basis[3 * 4 + 1] = 4 + self.basis[3 * 4 + 2] = 4 + self.basis[6 * 4 + 2] = 8 + self.basis[9 * 4 + 3] = 8 + + self.pixels = self.basis + + if not callable(show): + raise ValueError('show parameter is not callable') + + self.show = show + self.stop = False + + def wakeup(self, direction=0): + position = int((direction + 90 + 15) / 30) % 12 + + basis = self.basis[position*-4:] + self.basis[:position*-4] + + pixels = [v * 25 for v in basis] + self.show(pixels) + time.sleep(0.1) + + pixels = pixels[-4:] + pixels[:-4] + self.show(pixels) + time.sleep(0.1) + + for i in range(2): + new_pixels = pixels[-4:] + pixels[:-4] + + self.show([v/2+pixels[index] for index, v in enumerate(new_pixels)]) + pixels = new_pixels + time.sleep(0.1) + + self.show(pixels) + self.pixels = pixels + + def listen(self): + pixels = self.pixels + for i in range(1, 25): + self.show([(v * i / 24) for v in pixels]) + time.sleep(0.01) + + def think(self): + pixels = self.pixels + + while not self.stop: + pixels = pixels[-4:] + pixels[:-4] + self.show(pixels) + time.sleep(0.2) + + t = 0.1 + for i in range(0, 5): + pixels = pixels[-4:] + pixels[:-4] + self.show([(v * (4 - i) / 4) for v in pixels]) + time.sleep(t) + t /= 2 + + self.pixels = pixels + + def speak(self): + pixels = self.pixels + step = 1 + brightness = 5 + while not self.stop: + self.show([(v * brightness / 24) for v in pixels]) + time.sleep(0.02) + + if brightness <= 5: + step = 1 + time.sleep(0.4) + elif brightness >= 24: + step = -1 + time.sleep(0.4) + + brightness += step + + def off(self): + self.show([0] * 4 * 12) + + diff --git a/pixel_ring/pixel_ring.py b/pixel_ring/pixel_ring.py new file mode 100755 index 0000000..0f68f68 --- /dev/null +++ b/pixel_ring/pixel_ring.py @@ -0,0 +1,26 @@ + + +class PixelRing(object): + def __init__(self): + pass + + def show(self, data): + pass + + def set_color(self, rgb=None, r=0, g=0, b=0): + pass + + def wakeup(self, angle=None): + pass + + def listen(self): + pass + + def think(self): + pass + + def speak(self): + pass + + def off(self): + pass diff --git a/pixel_ring/usb_pixel_ring_v1.py b/pixel_ring/usb_pixel_ring_v1.py new file mode 100755 index 0000000..343d758 --- /dev/null +++ b/pixel_ring/usb_pixel_ring_v1.py @@ -0,0 +1,190 @@ + +import usb.core +import usb.util + + +class HidDevice: + """ + This class provides basic functions to access + a USB HID device to write an endpoint + """ + + def __init__(self, dev, ep_in, ep_out): + self.dev = dev + self.ep_in = ep_in + self.ep_out = ep_out + + def write(self, data): + """ + write data on the OUT endpoint associated to the HID interface + """ + self.ep_out.write(data) + + def read(self): + return self.ep_in.read(self.ep_in.wMaxPacketSize, -1) + + def close(self): + """ + close the interface + """ + usb.util.dispose_resources(self.dev) + + @staticmethod + def find(vid=0x2886, pid=0x0007): + dev = usb.core.find(idVendor=vid, idProduct=pid) + if not dev: + return + + config = dev.get_active_configuration() + + # Iterate on all interfaces to find a HID interface + ep_in, ep_out = None, None + for interface in config: + if interface.bInterfaceClass == 0x03: + try: + if dev.is_kernel_driver_active(interface.bInterfaceNumber): + dev.detach_kernel_driver(interface.bInterfaceNumber) + except Exception as e: + print(e.message) + + for ep in interface: + if ep.bEndpointAddress & 0x80: + ep_in = ep + else: + ep_out = ep + break + + + + if ep_in and ep_out: + hid = HidDevice(dev, ep_in, ep_out) + + return hid + + +class UsbPixelRing: + PIXELS_N = 12 + + MONO = 1 + THINK = 3 + VOLUME = 5 + CUSTOM = 6 + + def __init__(self, hid=None, pattern=None): + self.hid = hid if hid else HidDevice.find() + if not self.hid: + print('No USB device found') + + colors = [0] * 4 * self.PIXELS_N + colors[0] = 0x4 + colors[1] = 0x40 + colors[2] = 0x4 + + colors[4 + 1] = 0x8 + colors[4 * 11 + 1] = 0x8 + + self.direction_template = colors + + def set_brightness(self, brightness): + print('Not support to change brightness') + + def change_pattern(self, pattern=None): + print('Not support to change pattern') + + def off(self): + self.set_color(rgb=0) + + def set_color(self, rgb=None, r=0, g=0, b=0): + if rgb: + self.write(0, [self.MONO, rgb & 0xFF, (rgb >> 8) & 0xFF, (rgb >> 16) & 0xFF]) + else: + self.write(0, [self.MONO, b, g, r]) + + def think(self): + self.write(0, [self.THINK, 0, 0, 0]) + + wait = think + + speak = think + + def set_volume(self, pixels): + self.write(0, [self.VOLUME, 0, 0, pixels]) + + def wakeup(self, angle=0): + if angle < 0 or angle > 360: + return + + position = int((angle + 15) % 360 / 30) % self.PIXELS_N + colors = self.direction_template[-position*4:] + self.direction_template[:-position*4] + + self.write(0, [self.CUSTOM, 0, 0, 0]) + self.write(3, colors) + + return position + + def listen(self, angle=0): + self.write(0, [self.MONO, 0, 0x10, 0]) + + def show(self, data): + self.write(0, [self.CUSTOM, 0, 0, 0]) + self.write(3, data) + + @staticmethod + def to_bytearray(data): + if type(data) is int: + array = bytearray([data & 0xFF]) + elif type(data) is bytearray: + array = data + elif type(data) is str or type(data) is bytes: + array = bytearray(data) + elif type(data) is list: + array = bytearray(data) + else: + raise TypeError('%s is not supported' % type(data)) + + return array + + def write(self, address, data): + data = self.to_bytearray(data) + length = len(data) + if self.hid: + packet = bytearray([address & 0xFF, (address >> 8) & 0xFF, length & 0xFF, (length >> 8) & 0xFF]) + data + self.hid.write(packet) + + def close(self): + if self.hid: + self.hid.close() + + def __call__(self, data): + self.write(3, data) + + +def find(): + hid = HidDevice.find() + + if hid: + pixel_ring = UsbPixelRing(hid) + return pixel_ring + + +if __name__ == '__main__': + import time + + pixel_ring = UsbPixelRing() + while True: + try: + pixel_ring.wakeup(180) + time.sleep(3) + pixel_ring.listen() + time.sleep(3) + pixel_ring.think() + time.sleep(3) + pixel_ring.set_volume(8) + time.sleep(3) + pixel_ring.off() + time.sleep(3) + except KeyboardInterrupt: + break + + pixel_ring.off() + diff --git a/pixel_ring/usb_pixel_ring_v2.py b/pixel_ring/usb_pixel_ring_v2.py new file mode 100755 index 0000000..b7a07a2 --- /dev/null +++ b/pixel_ring/usb_pixel_ring_v2.py @@ -0,0 +1,122 @@ + +import usb.core +import usb.util + + +class PixelRing: + TIMEOUT = 8000 + + def __init__(self, dev): + self.dev = dev + + def trace(self): + self.write(0) + + def mono(self, color): + self.write(1, [(color >> 16) & 0xFF, (color >> 8) & 0xFF, color & 0xFF, 0]) + + def set_color(self, rgb=None, r=0, g=0, b=0): + if rgb: + self.mono(rgb) + else: + self.write(1, [r, g, b, 0]) + + def off(self): + self.mono(0) + + def listen(self, direction=None): + self.write(2) + + wakeup = listen + + def speak(self): + self.write(3) + + def think(self): + self.write(4) + + wait = think + + def spin(self): + self.write(5) + + def show(self, data): + self.write(6, data) + + customize = show + + def set_brightness(self, brightness): + self.write(0x20, [brightness]) + + def set_color_palette(self, a, b): + self.write(0x21, [(a >> 16) & 0xFF, (a >> 8) & 0xFF, a & 0xFF, 0, (b >> 16) & 0xFF, (b >> 8) & 0xFF, b & 0xFF, 0]) + + def set_vad_led(self, state): + self.write(0x22, [state]) + + def set_volume(self, volume): + self.write(0x23, [volume]) + + def change_pattern(self, pattern): + if pattern == 'echo': + self.write(0x24, [1]) + else: + self.write(0x24, [0]) + + def write(self, cmd, data=[0]): + self.dev.ctrl_transfer( + usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE, + 0, cmd, 0x1C, data, self.TIMEOUT) + + @property + def version(self): + return self.dev.ctrl_transfer( + usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE, + 0, 0x80 | 0x40, 0x1C, 24, self.TIMEOUT).tostring() + + def close(self): + """ + close the interface + """ + usb.util.dispose_resources(self.dev) + + +def find(vid=0x2886, pid=0x0018): + dev = usb.core.find(idVendor=vid, idProduct=pid) + if not dev: + return + + # configuration = dev.get_active_configuration() + + # interface_number = None + # for interface in configuration: + # interface_number = interface.bInterfaceNumber + + # if dev.is_kernel_driver_active(interface_number): + # dev.detach_kernel_driver(interface_number) + + return PixelRing(dev) + + + +if __name__ == '__main__': + import time + + pixel_ring = find() + print(pixel_ring.version) + while True: + try: + pixel_ring.wakeup(180) + time.sleep(3) + pixel_ring.listen() + time.sleep(3) + pixel_ring.think() + time.sleep(3) + pixel_ring.set_volume(8) + time.sleep(3) + pixel_ring.off() + time.sleep(3) + except KeyboardInterrupt: + break + + pixel_ring.off() \ No newline at end of file