merge with changes made on mac pro
commit
a226c11707
@ -1 +0,0 @@
|
||||
Subproject commit 30de55966fdf0d0f1ee6e02cf356c56ba76b577b
|
@ -0,0 +1,26 @@
|
||||
Pixel Ring
|
||||
==========
|
||||
|
||||
[![Build Status](https://travis-ci.org/respeaker/pixel_ring.svg?branch=master)](https://travis-ci.org/respeaker/pixel_ring)
|
||||
[![Pypi](https://img.shields.io/pypi/v/pixel_ring.svg)](https://pypi.python.org/pypi/pixel_ring)
|
||||
|
||||
|
||||
The library is for pixel ring based on APA102, ReSpeaker series pixel ring.
|
||||
|
||||
## Hardware
|
||||
+ ReSpeaker 4 Mic Array or ReSpeaker V2
|
||||
+ ReSpeaker V2
|
||||
+ ReSpeaker USB 6+1 Mic Array
|
||||
+ ReSpeaker USB 4 Mic Array
|
||||
|
||||
## Get started
|
||||
```
|
||||
git clone --depth 1 https://github.com/respeaker/pixel_ring.git
|
||||
cd pixel_ring
|
||||
pip install -U -e .
|
||||
python examples/respeaker_4mic_array.py
|
||||
```
|
||||
|
||||
|
||||
## Credits
|
||||
+ [APA102_Pi](https://github.com/tinue/APA102_Pi)
|
@ -0,0 +1,52 @@
|
||||
|
||||
|
||||
from . import usb_pixel_ring_v1
|
||||
from . import usb_pixel_ring_v2
|
||||
from .apa102_pixel_ring import PixelRing
|
||||
|
||||
pixel_ring = usb_pixel_ring_v2.find()
|
||||
|
||||
if not pixel_ring:
|
||||
pixel_ring = usb_pixel_ring_v1.find()
|
||||
|
||||
if not pixel_ring:
|
||||
pixel_ring = PixelRing()
|
||||
|
||||
|
||||
USAGE = '''
|
||||
If the hardware is ReSpeaker 4 Mic Array for Pi or ReSpeaker V2,
|
||||
there is a power-enable pin which should be enabled at first.
|
||||
+ ReSpeaker 4 Mic Array for Pi:
|
||||
|
||||
import gpiozero
|
||||
power = LED(5)
|
||||
power.on()
|
||||
|
||||
+ ReSpeaker V2:
|
||||
|
||||
import mraa
|
||||
power = mraa.Gpio(12)
|
||||
power.dir(mraa.DIR_OUT)
|
||||
power.write(0)
|
||||
'''
|
||||
|
||||
def main():
|
||||
import time
|
||||
|
||||
if isinstance(pixel_ring, usb_pixel_ring_v2.PixelRing):
|
||||
print('Found ReSpeaker USB 4 Mic Array')
|
||||
elif isinstance(pixel_ring, usb_pixel_ring_v1.UsbPixelRing):
|
||||
print('Found ReSpeaker USB 6+1 Mic Array')
|
||||
else:
|
||||
print('Control APA102 RGB LEDs via SPI')
|
||||
print(USAGE)
|
||||
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.off()
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
@ -0,0 +1,243 @@
|
||||
"""
|
||||
from https://github.com/tinue/APA102_Pi
|
||||
This is the main driver module for APA102 LEDs
|
||||
"""
|
||||
import spidev
|
||||
from math import ceil
|
||||
|
||||
RGB_MAP = { 'rgb': [3, 2, 1], 'rbg': [3, 1, 2], 'grb': [2, 3, 1],
|
||||
'gbr': [2, 1, 3], 'brg': [1, 3, 2], 'bgr': [1, 2, 3] }
|
||||
|
||||
class APA102:
|
||||
"""
|
||||
Driver for APA102 LEDS (aka "DotStar").
|
||||
|
||||
(c) Martin Erzberger 2016-2017
|
||||
|
||||
My very first Python code, so I am sure there is a lot to be optimized ;)
|
||||
|
||||
Public methods are:
|
||||
- set_pixel
|
||||
- set_pixel_rgb
|
||||
- show
|
||||
- clear_strip
|
||||
- cleanup
|
||||
|
||||
Helper methods for color manipulation are:
|
||||
- combine_color
|
||||
- wheel
|
||||
|
||||
The rest of the methods are used internally and should not be used by the
|
||||
user of the library.
|
||||
|
||||
Very brief overview of APA102: An APA102 LED is addressed with SPI. The bits
|
||||
are shifted in one by one, starting with the least significant bit.
|
||||
|
||||
An LED usually just forwards everything that is sent to its data-in to
|
||||
data-out. While doing this, it remembers its own color and keeps glowing
|
||||
with that color as long as there is power.
|
||||
|
||||
An LED can be switched to not forward the data, but instead use the data
|
||||
to change it's own color. This is done by sending (at least) 32 bits of
|
||||
zeroes to data-in. The LED then accepts the next correct 32 bit LED
|
||||
frame (with color information) as its new color setting.
|
||||
|
||||
After having received the 32 bit color frame, the LED changes color,
|
||||
and then resumes to just copying data-in to data-out.
|
||||
|
||||
The really clever bit is this: While receiving the 32 bit LED frame,
|
||||
the LED sends zeroes on its data-out line. Because a color frame is
|
||||
32 bits, the LED sends 32 bits of zeroes to the next LED.
|
||||
As we have seen above, this means that the next LED is now ready
|
||||
to accept a color frame and update its color.
|
||||
|
||||
So that's really the entire protocol:
|
||||
- Start by sending 32 bits of zeroes. This prepares LED 1 to update
|
||||
its color.
|
||||
- Send color information one by one, starting with the color for LED 1,
|
||||
then LED 2 etc.
|
||||
- Finish off by cycling the clock line a few times to get all data
|
||||
to the very last LED on the strip
|
||||
|
||||
The last step is necessary, because each LED delays forwarding the data
|
||||
a bit. Imagine ten people in a row. When you yell the last color
|
||||
information, i.e. the one for person ten, to the first person in
|
||||
the line, then you are not finished yet. Person one has to turn around
|
||||
and yell it to person 2, and so on. So it takes ten additional "dummy"
|
||||
cycles until person ten knows the color. When you look closer,
|
||||
you will see that not even person 9 knows its own color yet. This
|
||||
information is still with person 2. Essentially the driver sends additional
|
||||
zeroes to LED 1 as long as it takes for the last color frame to make it
|
||||
down the line to the last LED.
|
||||
"""
|
||||
# Constants
|
||||
MAX_BRIGHTNESS = 0b11111 # Safeguard: Set to a value appropriate for your setup
|
||||
LED_START = 0b11100000 # Three "1" bits, followed by 5 brightness bits
|
||||
|
||||
def __init__(self, num_led, global_brightness=MAX_BRIGHTNESS,
|
||||
order='rgb', bus=0, device=1, max_speed_hz=8000000):
|
||||
self.num_led = num_led # The number of LEDs in the Strip
|
||||
order = order.lower()
|
||||
self.rgb = RGB_MAP.get(order, RGB_MAP['rgb'])
|
||||
# Limit the brightness to the maximum if it's set higher
|
||||
if global_brightness > self.MAX_BRIGHTNESS:
|
||||
self.global_brightness = self.MAX_BRIGHTNESS
|
||||
else:
|
||||
self.global_brightness = global_brightness
|
||||
|
||||
self.leds = [self.LED_START,0,0,0] * self.num_led # Pixel buffer
|
||||
self.spi = spidev.SpiDev() # Init the SPI device
|
||||
self.spi.open(bus, device) # Open SPI port 0, slave device (CS) 1
|
||||
# Up the speed a bit, so that the LEDs are painted faster
|
||||
if max_speed_hz:
|
||||
self.spi.max_speed_hz = max_speed_hz
|
||||
|
||||
def clock_start_frame(self):
|
||||
"""Sends a start frame to the LED strip.
|
||||
|
||||
This method clocks out a start frame, telling the receiving LED
|
||||
that it must update its own color now.
|
||||
"""
|
||||
self.spi.xfer2([0] * 4) # Start frame, 32 zero bits
|
||||
|
||||
|
||||
def clock_end_frame(self):
|
||||
"""Sends an end frame to the LED strip.
|
||||
|
||||
As explained above, dummy data must be sent after the last real colour
|
||||
information so that all of the data can reach its destination down the line.
|
||||
The delay is not as bad as with the human example above.
|
||||
It is only 1/2 bit per LED. This is because the SPI clock line
|
||||
needs to be inverted.
|
||||
|
||||
Say a bit is ready on the SPI data line. The sender communicates
|
||||
this by toggling the clock line. The bit is read by the LED
|
||||
and immediately forwarded to the output data line. When the clock goes
|
||||
down again on the input side, the LED will toggle the clock up
|
||||
on the output to tell the next LED that the bit is ready.
|
||||
|
||||
After one LED the clock is inverted, and after two LEDs it is in sync
|
||||
again, but one cycle behind. Therefore, for every two LEDs, one bit
|
||||
of delay gets accumulated. For 300 LEDs, 150 additional bits must be fed to
|
||||
the input of LED one so that the data can reach the last LED.
|
||||
|
||||
Ultimately, we need to send additional numLEDs/2 arbitrary data bits,
|
||||
in order to trigger numLEDs/2 additional clock changes. This driver
|
||||
sends zeroes, which has the benefit of getting LED one partially or
|
||||
fully ready for the next update to the strip. An optimized version
|
||||
of the driver could omit the "clockStartFrame" method if enough zeroes have
|
||||
been sent as part of "clockEndFrame".
|
||||
"""
|
||||
|
||||
self.spi.xfer2([0xFF] * 4)
|
||||
|
||||
# Round up num_led/2 bits (or num_led/16 bytes)
|
||||
#for _ in range((self.num_led + 15) // 16):
|
||||
# self.spi.xfer2([0x00])
|
||||
|
||||
|
||||
def clear_strip(self):
|
||||
""" Turns off the strip and shows the result right away."""
|
||||
|
||||
for led in range(self.num_led):
|
||||
self.set_pixel(led, 0, 0, 0)
|
||||
self.show()
|
||||
|
||||
|
||||
def set_pixel(self, led_num, red, green, blue, bright_percent=100):
|
||||
"""Sets the color of one pixel in the LED stripe.
|
||||
|
||||
The changed pixel is not shown yet on the Stripe, it is only
|
||||
written to the pixel buffer. Colors are passed individually.
|
||||
If brightness is not set the global brightness setting is used.
|
||||
"""
|
||||
if led_num < 0:
|
||||
return # Pixel is invisible, so ignore
|
||||
if led_num >= self.num_led:
|
||||
return # again, invisible
|
||||
|
||||
# Calculate pixel brightness as a percentage of the
|
||||
# defined global_brightness. Round up to nearest integer
|
||||
# as we expect some brightness unless set to 0
|
||||
brightness = int(ceil(bright_percent*self.global_brightness/100.0))
|
||||
|
||||
# LED startframe is three "1" bits, followed by 5 brightness bits
|
||||
ledstart = (brightness & 0b00011111) | self.LED_START
|
||||
|
||||
start_index = 4 * led_num
|
||||
self.leds[start_index] = ledstart
|
||||
self.leds[start_index + self.rgb[0]] = red
|
||||
self.leds[start_index + self.rgb[1]] = green
|
||||
self.leds[start_index + self.rgb[2]] = blue
|
||||
|
||||
|
||||
def set_pixel_rgb(self, led_num, rgb_color, bright_percent=100):
|
||||
"""Sets the color of one pixel in the LED stripe.
|
||||
|
||||
The changed pixel is not shown yet on the Stripe, it is only
|
||||
written to the pixel buffer.
|
||||
Colors are passed combined (3 bytes concatenated)
|
||||
If brightness is not set the global brightness setting is used.
|
||||
"""
|
||||
self.set_pixel(led_num, (rgb_color & 0xFF0000) >> 16,
|
||||
(rgb_color & 0x00FF00) >> 8, rgb_color & 0x0000FF,
|
||||
bright_percent)
|
||||
|
||||
|
||||
def rotate(self, positions=1):
|
||||
""" Rotate the LEDs by the specified number of positions.
|
||||
|
||||
Treating the internal LED array as a circular buffer, rotate it by
|
||||
the specified number of positions. The number could be negative,
|
||||
which means rotating in the opposite direction.
|
||||
"""
|
||||
cutoff = 4 * (positions % self.num_led)
|
||||
self.leds = self.leds[cutoff:] + self.leds[:cutoff]
|
||||
|
||||
|
||||
def show(self):
|
||||
"""Sends the content of the pixel buffer to the strip.
|
||||
|
||||
Todo: More than 1024 LEDs requires more than one xfer operation.
|
||||
"""
|
||||
self.clock_start_frame()
|
||||
# xfer2 kills the list, unfortunately. So it must be copied first
|
||||
# SPI takes up to 4096 Integers. So we are fine for up to 1024 LEDs.
|
||||
data = list(self.leds)
|
||||
while data:
|
||||
self.spi.xfer2(data[:32])
|
||||
data = data[32:]
|
||||
self.clock_end_frame()
|
||||
|
||||
|
||||
def cleanup(self):
|
||||
"""Release the SPI device; Call this method at the end"""
|
||||
|
||||
self.spi.close() # Close SPI port
|
||||
|
||||
@staticmethod
|
||||
def combine_color(red, green, blue):
|
||||
"""Make one 3*8 byte color value."""
|
||||
|
||||
return (red << 16) + (green << 8) + blue
|
||||
|
||||
|
||||
def wheel(self, wheel_pos):
|
||||
"""Get a color from a color wheel; Green -> Red -> Blue -> Green"""
|
||||
|
||||
if wheel_pos > 255:
|
||||
wheel_pos = 255 # Safeguard
|
||||
if wheel_pos < 85: # Green -> Red
|
||||
return self.combine_color(wheel_pos * 3, 255 - wheel_pos * 3, 0)
|
||||
if wheel_pos < 170: # Red -> Blue
|
||||
wheel_pos -= 85
|
||||
return self.combine_color(255 - wheel_pos * 3, 0, wheel_pos * 3)
|
||||
# Blue -> Green
|
||||
wheel_pos -= 170
|
||||
return self.combine_color(0, wheel_pos * 3, 255 - wheel_pos * 3)
|
||||
|
||||
|
||||
def dump_array(self):
|
||||
"""For debug purposes: Dump the LED array onto the console."""
|
||||
|
||||
print(self.leds)
|
@ -0,0 +1,105 @@
|
||||
|
||||
import time
|
||||
import threading
|
||||
try:
|
||||
import queue as Queue
|
||||
except ImportError:
|
||||
import Queue as Queue
|
||||
|
||||
from .apa102 import APA102
|
||||
from .pattern import Echo, GoogleHome
|
||||
|
||||
|
||||
class PixelRing(object):
|
||||
PIXELS_N = 12
|
||||
|
||||
def __init__(self, pattern='google'):
|
||||
if pattern == 'echo':
|
||||
self.pattern = Echo(show=self.show)
|
||||
else:
|
||||
self.pattern = GoogleHome(show=self.show)
|
||||
|
||||
self.dev = APA102(num_led=self.PIXELS_N)
|
||||
|
||||
self.queue = Queue.Queue()
|
||||
self.thread = threading.Thread(target=self._run)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
self.off()
|
||||
|
||||
def set_brightness(self, brightness):
|
||||
if brightness > 100:
|
||||
brightness = 100
|
||||
|
||||
if brightness > 0:
|
||||
self.dev.global_brightness = int(0b11111 * brightness / 100)
|
||||
|
||||
def change_pattern(self, pattern):
|
||||
if pattern == 'echo':
|
||||
self.pattern = Echo(show=self.show)
|
||||
else:
|
||||
self.pattern = GoogleHome(show=self.show)
|
||||
|
||||
def wakeup(self, direction=0):
|
||||
def f():
|
||||
self.pattern.wakeup(direction)
|
||||
|
||||
self.put(f)
|
||||
|
||||
def listen(self):
|
||||
self.put(self.pattern.listen)
|
||||
|
||||
def think(self):
|
||||
self.put(self.pattern.think)
|
||||
|
||||
wait = think
|
||||
|
||||
def speak(self):
|
||||
self.put(self.pattern.speak)
|
||||
|
||||
def off(self):
|
||||
self.put(self.pattern.off)
|
||||
|
||||
def put(self, func):
|
||||
self.pattern.stop = True
|
||||
self.queue.put(func)
|
||||
|
||||
def _run(self):
|
||||
while True:
|
||||
func = self.queue.get()
|
||||
self.pattern.stop = False
|
||||
func()
|
||||
|
||||
def show(self, data):
|
||||
for i in range(self.PIXELS_N):
|
||||
self.dev.set_pixel(i, int(data[4*i + 1]), int(data[4*i + 2]), int(data[4*i + 3]))
|
||||
|
||||
self.dev.show()
|
||||
|
||||
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||
if rgb:
|
||||
r, g, b = (rgb >> 16) & 0xFF, (rgb >> 8) & 0xFF, rgb & 0xFF
|
||||
for i in range(self.PIXELS_N):
|
||||
self.dev.set_pixel(i, r, g, b)
|
||||
|
||||
self.dev.show()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pixel_ring = PixelRing()
|
||||
while True:
|
||||
try:
|
||||
pixel_ring.wakeup()
|
||||
time.sleep(3)
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.speak()
|
||||
time.sleep(6)
|
||||
pixel_ring.off()
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
|
||||
pixel_ring.off()
|
||||
time.sleep(1)
|
@ -0,0 +1,37 @@
|
||||
"""
|
||||
Control pixel ring on ReSpeaker 4 Mic Array
|
||||
|
||||
pip install pixel_ring gpiozero
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from pixel_ring import pixel_ring
|
||||
from gpiozero import LED
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
power = LED(5)
|
||||
power.on()
|
||||
|
||||
pixel_ring.set_brightness(20)
|
||||
pixel_ring.change_pattern('echo')
|
||||
while True:
|
||||
|
||||
try:
|
||||
pixel_ring.wakeup()
|
||||
time.sleep(3)
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.speak()
|
||||
time.sleep(6)
|
||||
pixel_ring.off()
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
|
||||
pixel_ring.off()
|
||||
power.off()
|
||||
time.sleep(1)
|
||||
|
@ -0,0 +1,36 @@
|
||||
"""
|
||||
Control pixel ring on ReSpeaker 4 Mic Array
|
||||
|
||||
pip install pixel_ring gpiozero
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from pixel_ring import pixel_ring
|
||||
from gpiozero import LED
|
||||
|
||||
power = LED(5)
|
||||
power.on()
|
||||
|
||||
pixel_ring.set_brightness(10)
|
||||
|
||||
if __name__ == '__main__':
|
||||
while True:
|
||||
|
||||
try:
|
||||
pixel_ring.wakeup()
|
||||
time.sleep(3)
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.speak()
|
||||
time.sleep(6)
|
||||
pixel_ring.off()
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
|
||||
pixel_ring.off()
|
||||
time.sleep(1)
|
||||
|
||||
power.off()
|
@ -0,0 +1,43 @@
|
||||
"""
|
||||
Control pixel ring on ReSpeaker V2
|
||||
|
||||
sudo apt install python-mraa libmraa1
|
||||
pip install pixel-ring
|
||||
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from pixel_ring import pixel_ring
|
||||
import mraa
|
||||
import os
|
||||
|
||||
en = mraa.Gpio(12)
|
||||
if os.geteuid() != 0 :
|
||||
time.sleep(1)
|
||||
|
||||
en.dir(mraa.DIR_OUT)
|
||||
en.write(0)
|
||||
|
||||
pixel_ring.set_brightness(20)
|
||||
|
||||
if __name__ == '__main__':
|
||||
while True:
|
||||
|
||||
try:
|
||||
pixel_ring.wakeup()
|
||||
time.sleep(3)
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.speak()
|
||||
time.sleep(6)
|
||||
pixel_ring.off()
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
|
||||
pixel_ring.off()
|
||||
time.sleep(1)
|
||||
|
||||
en.write(1)
|
@ -0,0 +1,29 @@
|
||||
"""
|
||||
Control pixel ring on ReSpeaker USB Mic Array
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from pixel_ring import pixel_ring
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pixel_ring.change_pattern('echo')
|
||||
while True:
|
||||
|
||||
try:
|
||||
pixel_ring.wakeup()
|
||||
time.sleep(3)
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.speak()
|
||||
time.sleep(6)
|
||||
pixel_ring.off()
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
|
||||
pixel_ring.off()
|
||||
time.sleep(1)
|
||||
|
@ -0,0 +1,145 @@
|
||||
"""
|
||||
LED pattern like Echo
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
|
||||
class Echo(object):
|
||||
brightness = 24 * 8
|
||||
|
||||
def __init__(self, show, number=12):
|
||||
self.pixels_number = number
|
||||
self.pixels = [0] * 4 * number
|
||||
|
||||
if not callable(show):
|
||||
raise ValueError('show parameter is not callable')
|
||||
|
||||
self.show = show
|
||||
self.stop = False
|
||||
|
||||
def wakeup(self, direction=0):
|
||||
position = int((direction + 15) / (360 / self.pixels_number)) % self.pixels_number
|
||||
|
||||
pixels = [0, 0, 0, self.brightness] * self.pixels_number
|
||||
pixels[position * 4 + 2] = self.brightness
|
||||
|
||||
self.show(pixels)
|
||||
|
||||
def listen(self):
|
||||
pixels = [0, 0, 0, self.brightness] * self.pixels_number
|
||||
|
||||
self.show(pixels)
|
||||
|
||||
def think(self):
|
||||
half_brightness = int(self.brightness / 2)
|
||||
pixels = [0, 0, half_brightness, half_brightness, 0, 0, 0, self.brightness] * self.pixels_number
|
||||
|
||||
while not self.stop:
|
||||
self.show(pixels)
|
||||
time.sleep(0.2)
|
||||
pixels = pixels[-4:] + pixels[:-4]
|
||||
|
||||
def speak(self):
|
||||
step = int(self.brightness / 12)
|
||||
position = int(self.brightness / 2)
|
||||
while not self.stop:
|
||||
pixels = [0, 0, position, self.brightness - position] * self.pixels_number
|
||||
self.show(pixels)
|
||||
time.sleep(0.01)
|
||||
if position <= 0:
|
||||
step = int(self.brightness / 12)
|
||||
time.sleep(0.4)
|
||||
elif position >= int(self.brightness / 2):
|
||||
step = - int(self.brightness / 12)
|
||||
time.sleep(0.4)
|
||||
|
||||
position += step
|
||||
|
||||
def off(self):
|
||||
self.show([0] * 4 * 12)
|
||||
|
||||
class GoogleHome(object):
|
||||
def __init__(self, show):
|
||||
self.basis = [0] * 4 * 12
|
||||
self.basis[0 * 4 + 1] = 8
|
||||
self.basis[3 * 4 + 1] = 4
|
||||
self.basis[3 * 4 + 2] = 4
|
||||
self.basis[6 * 4 + 2] = 8
|
||||
self.basis[9 * 4 + 3] = 8
|
||||
|
||||
self.pixels = self.basis
|
||||
|
||||
if not callable(show):
|
||||
raise ValueError('show parameter is not callable')
|
||||
|
||||
self.show = show
|
||||
self.stop = False
|
||||
|
||||
def wakeup(self, direction=0):
|
||||
position = int((direction + 90 + 15) / 30) % 12
|
||||
|
||||
basis = self.basis[position*-4:] + self.basis[:position*-4]
|
||||
|
||||
pixels = [v * 25 for v in basis]
|
||||
self.show(pixels)
|
||||
time.sleep(0.1)
|
||||
|
||||
pixels = pixels[-4:] + pixels[:-4]
|
||||
self.show(pixels)
|
||||
time.sleep(0.1)
|
||||
|
||||
for i in range(2):
|
||||
new_pixels = pixels[-4:] + pixels[:-4]
|
||||
|
||||
self.show([v/2+pixels[index] for index, v in enumerate(new_pixels)])
|
||||
pixels = new_pixels
|
||||
time.sleep(0.1)
|
||||
|
||||
self.show(pixels)
|
||||
self.pixels = pixels
|
||||
|
||||
def listen(self):
|
||||
pixels = self.pixels
|
||||
for i in range(1, 25):
|
||||
self.show([(v * i / 24) for v in pixels])
|
||||
time.sleep(0.01)
|
||||
|
||||
def think(self):
|
||||
pixels = self.pixels
|
||||
|
||||
while not self.stop:
|
||||
pixels = pixels[-4:] + pixels[:-4]
|
||||
self.show(pixels)
|
||||
time.sleep(0.2)
|
||||
|
||||
t = 0.1
|
||||
for i in range(0, 5):
|
||||
pixels = pixels[-4:] + pixels[:-4]
|
||||
self.show([(v * (4 - i) / 4) for v in pixels])
|
||||
time.sleep(t)
|
||||
t /= 2
|
||||
|
||||
self.pixels = pixels
|
||||
|
||||
def speak(self):
|
||||
pixels = self.pixels
|
||||
step = 1
|
||||
brightness = 5
|
||||
while not self.stop:
|
||||
self.show([(v * brightness / 24) for v in pixels])
|
||||
time.sleep(0.02)
|
||||
|
||||
if brightness <= 5:
|
||||
step = 1
|
||||
time.sleep(0.4)
|
||||
elif brightness >= 24:
|
||||
step = -1
|
||||
time.sleep(0.4)
|
||||
|
||||
brightness += step
|
||||
|
||||
def off(self):
|
||||
self.show([0] * 4 * 12)
|
||||
|
||||
|
@ -0,0 +1,26 @@
|
||||
|
||||
|
||||
class PixelRing(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def show(self, data):
|
||||
pass
|
||||
|
||||
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||
pass
|
||||
|
||||
def wakeup(self, angle=None):
|
||||
pass
|
||||
|
||||
def listen(self):
|
||||
pass
|
||||
|
||||
def think(self):
|
||||
pass
|
||||
|
||||
def speak(self):
|
||||
pass
|
||||
|
||||
def off(self):
|
||||
pass
|
@ -0,0 +1,52 @@
|
||||
|
||||
|
||||
from . import usb_pixel_ring_v1
|
||||
from . import usb_pixel_ring_v2
|
||||
from .apa102_pixel_ring import PixelRing
|
||||
|
||||
pixel_ring = usb_pixel_ring_v2.find()
|
||||
|
||||
if not pixel_ring:
|
||||
pixel_ring = usb_pixel_ring_v1.find()
|
||||
|
||||
if not pixel_ring:
|
||||
pixel_ring = PixelRing()
|
||||
|
||||
|
||||
USAGE = '''
|
||||
If the hardware is ReSpeaker 4 Mic Array for Pi or ReSpeaker V2,
|
||||
there is a power-enable pin which should be enabled at first.
|
||||
+ ReSpeaker 4 Mic Array for Pi:
|
||||
|
||||
import gpiozero
|
||||
power = LED(5)
|
||||
power.on()
|
||||
|
||||
+ ReSpeaker V2:
|
||||
|
||||
import mraa
|
||||
power = mraa.Gpio(12)
|
||||
power.dir(mraa.DIR_OUT)
|
||||
power.write(0)
|
||||
'''
|
||||
|
||||
def main():
|
||||
import time
|
||||
|
||||
if isinstance(pixel_ring, usb_pixel_ring_v2.PixelRing):
|
||||
print('Found ReSpeaker USB 4 Mic Array')
|
||||
elif isinstance(pixel_ring, usb_pixel_ring_v1.UsbPixelRing):
|
||||
print('Found ReSpeaker USB 6+1 Mic Array')
|
||||
else:
|
||||
print('Control APA102 RGB LEDs via SPI')
|
||||
print(USAGE)
|
||||
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.off()
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
@ -0,0 +1,243 @@
|
||||
"""
|
||||
from https://github.com/tinue/APA102_Pi
|
||||
This is the main driver module for APA102 LEDs
|
||||
"""
|
||||
import spidev
|
||||
from math import ceil
|
||||
|
||||
RGB_MAP = { 'rgb': [3, 2, 1], 'rbg': [3, 1, 2], 'grb': [2, 3, 1],
|
||||
'gbr': [2, 1, 3], 'brg': [1, 3, 2], 'bgr': [1, 2, 3] }
|
||||
|
||||
class APA102:
|
||||
"""
|
||||
Driver for APA102 LEDS (aka "DotStar").
|
||||
|
||||
(c) Martin Erzberger 2016-2017
|
||||
|
||||
My very first Python code, so I am sure there is a lot to be optimized ;)
|
||||
|
||||
Public methods are:
|
||||
- set_pixel
|
||||
- set_pixel_rgb
|
||||
- show
|
||||
- clear_strip
|
||||
- cleanup
|
||||
|
||||
Helper methods for color manipulation are:
|
||||
- combine_color
|
||||
- wheel
|
||||
|
||||
The rest of the methods are used internally and should not be used by the
|
||||
user of the library.
|
||||
|
||||
Very brief overview of APA102: An APA102 LED is addressed with SPI. The bits
|
||||
are shifted in one by one, starting with the least significant bit.
|
||||
|
||||
An LED usually just forwards everything that is sent to its data-in to
|
||||
data-out. While doing this, it remembers its own color and keeps glowing
|
||||
with that color as long as there is power.
|
||||
|
||||
An LED can be switched to not forward the data, but instead use the data
|
||||
to change it's own color. This is done by sending (at least) 32 bits of
|
||||
zeroes to data-in. The LED then accepts the next correct 32 bit LED
|
||||
frame (with color information) as its new color setting.
|
||||
|
||||
After having received the 32 bit color frame, the LED changes color,
|
||||
and then resumes to just copying data-in to data-out.
|
||||
|
||||
The really clever bit is this: While receiving the 32 bit LED frame,
|
||||
the LED sends zeroes on its data-out line. Because a color frame is
|
||||
32 bits, the LED sends 32 bits of zeroes to the next LED.
|
||||
As we have seen above, this means that the next LED is now ready
|
||||
to accept a color frame and update its color.
|
||||
|
||||
So that's really the entire protocol:
|
||||
- Start by sending 32 bits of zeroes. This prepares LED 1 to update
|
||||
its color.
|
||||
- Send color information one by one, starting with the color for LED 1,
|
||||
then LED 2 etc.
|
||||
- Finish off by cycling the clock line a few times to get all data
|
||||
to the very last LED on the strip
|
||||
|
||||
The last step is necessary, because each LED delays forwarding the data
|
||||
a bit. Imagine ten people in a row. When you yell the last color
|
||||
information, i.e. the one for person ten, to the first person in
|
||||
the line, then you are not finished yet. Person one has to turn around
|
||||
and yell it to person 2, and so on. So it takes ten additional "dummy"
|
||||
cycles until person ten knows the color. When you look closer,
|
||||
you will see that not even person 9 knows its own color yet. This
|
||||
information is still with person 2. Essentially the driver sends additional
|
||||
zeroes to LED 1 as long as it takes for the last color frame to make it
|
||||
down the line to the last LED.
|
||||
"""
|
||||
# Constants
|
||||
MAX_BRIGHTNESS = 0b11111 # Safeguard: Set to a value appropriate for your setup
|
||||
LED_START = 0b11100000 # Three "1" bits, followed by 5 brightness bits
|
||||
|
||||
def __init__(self, num_led, global_brightness=MAX_BRIGHTNESS,
|
||||
order='rgb', bus=0, device=1, max_speed_hz=8000000):
|
||||
self.num_led = num_led # The number of LEDs in the Strip
|
||||
order = order.lower()
|
||||
self.rgb = RGB_MAP.get(order, RGB_MAP['rgb'])
|
||||
# Limit the brightness to the maximum if it's set higher
|
||||
if global_brightness > self.MAX_BRIGHTNESS:
|
||||
self.global_brightness = self.MAX_BRIGHTNESS
|
||||
else:
|
||||
self.global_brightness = global_brightness
|
||||
|
||||
self.leds = [self.LED_START,0,0,0] * self.num_led # Pixel buffer
|
||||
self.spi = spidev.SpiDev() # Init the SPI device
|
||||
self.spi.open(bus, device) # Open SPI port 0, slave device (CS) 1
|
||||
# Up the speed a bit, so that the LEDs are painted faster
|
||||
if max_speed_hz:
|
||||
self.spi.max_speed_hz = max_speed_hz
|
||||
|
||||
def clock_start_frame(self):
|
||||
"""Sends a start frame to the LED strip.
|
||||
|
||||
This method clocks out a start frame, telling the receiving LED
|
||||
that it must update its own color now.
|
||||
"""
|
||||
self.spi.xfer2([0] * 4) # Start frame, 32 zero bits
|
||||
|
||||
|
||||
def clock_end_frame(self):
|
||||
"""Sends an end frame to the LED strip.
|
||||
|
||||
As explained above, dummy data must be sent after the last real colour
|
||||
information so that all of the data can reach its destination down the line.
|
||||
The delay is not as bad as with the human example above.
|
||||
It is only 1/2 bit per LED. This is because the SPI clock line
|
||||
needs to be inverted.
|
||||
|
||||
Say a bit is ready on the SPI data line. The sender communicates
|
||||
this by toggling the clock line. The bit is read by the LED
|
||||
and immediately forwarded to the output data line. When the clock goes
|
||||
down again on the input side, the LED will toggle the clock up
|
||||
on the output to tell the next LED that the bit is ready.
|
||||
|
||||
After one LED the clock is inverted, and after two LEDs it is in sync
|
||||
again, but one cycle behind. Therefore, for every two LEDs, one bit
|
||||
of delay gets accumulated. For 300 LEDs, 150 additional bits must be fed to
|
||||
the input of LED one so that the data can reach the last LED.
|
||||
|
||||
Ultimately, we need to send additional numLEDs/2 arbitrary data bits,
|
||||
in order to trigger numLEDs/2 additional clock changes. This driver
|
||||
sends zeroes, which has the benefit of getting LED one partially or
|
||||
fully ready for the next update to the strip. An optimized version
|
||||
of the driver could omit the "clockStartFrame" method if enough zeroes have
|
||||
been sent as part of "clockEndFrame".
|
||||
"""
|
||||
|
||||
self.spi.xfer2([0xFF] * 4)
|
||||
|
||||
# Round up num_led/2 bits (or num_led/16 bytes)
|
||||
#for _ in range((self.num_led + 15) // 16):
|
||||
# self.spi.xfer2([0x00])
|
||||
|
||||
|
||||
def clear_strip(self):
|
||||
""" Turns off the strip and shows the result right away."""
|
||||
|
||||
for led in range(self.num_led):
|
||||
self.set_pixel(led, 0, 0, 0)
|
||||
self.show()
|
||||
|
||||
|
||||
def set_pixel(self, led_num, red, green, blue, bright_percent=100):
|
||||
"""Sets the color of one pixel in the LED stripe.
|
||||
|
||||
The changed pixel is not shown yet on the Stripe, it is only
|
||||
written to the pixel buffer. Colors are passed individually.
|
||||
If brightness is not set the global brightness setting is used.
|
||||
"""
|
||||
if led_num < 0:
|
||||
return # Pixel is invisible, so ignore
|
||||
if led_num >= self.num_led:
|
||||
return # again, invisible
|
||||
|
||||
# Calculate pixel brightness as a percentage of the
|
||||
# defined global_brightness. Round up to nearest integer
|
||||
# as we expect some brightness unless set to 0
|
||||
brightness = int(ceil(bright_percent*self.global_brightness/100.0))
|
||||
|
||||
# LED startframe is three "1" bits, followed by 5 brightness bits
|
||||
ledstart = (brightness & 0b00011111) | self.LED_START
|
||||
|
||||
start_index = 4 * led_num
|
||||
self.leds[start_index] = ledstart
|
||||
self.leds[start_index + self.rgb[0]] = red
|
||||
self.leds[start_index + self.rgb[1]] = green
|
||||
self.leds[start_index + self.rgb[2]] = blue
|
||||
|
||||
|
||||
def set_pixel_rgb(self, led_num, rgb_color, bright_percent=100):
|
||||
"""Sets the color of one pixel in the LED stripe.
|
||||
|
||||
The changed pixel is not shown yet on the Stripe, it is only
|
||||
written to the pixel buffer.
|
||||
Colors are passed combined (3 bytes concatenated)
|
||||
If brightness is not set the global brightness setting is used.
|
||||
"""
|
||||
self.set_pixel(led_num, (rgb_color & 0xFF0000) >> 16,
|
||||
(rgb_color & 0x00FF00) >> 8, rgb_color & 0x0000FF,
|
||||
bright_percent)
|
||||
|
||||
|
||||
def rotate(self, positions=1):
|
||||
""" Rotate the LEDs by the specified number of positions.
|
||||
|
||||
Treating the internal LED array as a circular buffer, rotate it by
|
||||
the specified number of positions. The number could be negative,
|
||||
which means rotating in the opposite direction.
|
||||
"""
|
||||
cutoff = 4 * (positions % self.num_led)
|
||||
self.leds = self.leds[cutoff:] + self.leds[:cutoff]
|
||||
|
||||
|
||||
def show(self):
|
||||
"""Sends the content of the pixel buffer to the strip.
|
||||
|
||||
Todo: More than 1024 LEDs requires more than one xfer operation.
|
||||
"""
|
||||
self.clock_start_frame()
|
||||
# xfer2 kills the list, unfortunately. So it must be copied first
|
||||
# SPI takes up to 4096 Integers. So we are fine for up to 1024 LEDs.
|
||||
data = list(self.leds)
|
||||
while data:
|
||||
self.spi.xfer2(data[:32])
|
||||
data = data[32:]
|
||||
self.clock_end_frame()
|
||||
|
||||
|
||||
def cleanup(self):
|
||||
"""Release the SPI device; Call this method at the end"""
|
||||
|
||||
self.spi.close() # Close SPI port
|
||||
|
||||
@staticmethod
|
||||
def combine_color(red, green, blue):
|
||||
"""Make one 3*8 byte color value."""
|
||||
|
||||
return (red << 16) + (green << 8) + blue
|
||||
|
||||
|
||||
def wheel(self, wheel_pos):
|
||||
"""Get a color from a color wheel; Green -> Red -> Blue -> Green"""
|
||||
|
||||
if wheel_pos > 255:
|
||||
wheel_pos = 255 # Safeguard
|
||||
if wheel_pos < 85: # Green -> Red
|
||||
return self.combine_color(wheel_pos * 3, 255 - wheel_pos * 3, 0)
|
||||
if wheel_pos < 170: # Red -> Blue
|
||||
wheel_pos -= 85
|
||||
return self.combine_color(255 - wheel_pos * 3, 0, wheel_pos * 3)
|
||||
# Blue -> Green
|
||||
wheel_pos -= 170
|
||||
return self.combine_color(0, wheel_pos * 3, 255 - wheel_pos * 3)
|
||||
|
||||
|
||||
def dump_array(self):
|
||||
"""For debug purposes: Dump the LED array onto the console."""
|
||||
|
||||
print(self.leds)
|
@ -0,0 +1,105 @@
|
||||
|
||||
import time
|
||||
import threading
|
||||
try:
|
||||
import queue as Queue
|
||||
except ImportError:
|
||||
import Queue as Queue
|
||||
|
||||
from .apa102 import APA102
|
||||
from .pattern import Echo, GoogleHome
|
||||
|
||||
|
||||
class PixelRing(object):
|
||||
PIXELS_N = 12
|
||||
|
||||
def __init__(self, pattern='google'):
|
||||
if pattern == 'echo':
|
||||
self.pattern = Echo(show=self.show)
|
||||
else:
|
||||
self.pattern = GoogleHome(show=self.show)
|
||||
|
||||
self.dev = APA102(num_led=self.PIXELS_N)
|
||||
|
||||
self.queue = Queue.Queue()
|
||||
self.thread = threading.Thread(target=self._run)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
self.off()
|
||||
|
||||
def set_brightness(self, brightness):
|
||||
if brightness > 100:
|
||||
brightness = 100
|
||||
|
||||
if brightness > 0:
|
||||
self.dev.global_brightness = int(0b11111 * brightness / 100)
|
||||
|
||||
def change_pattern(self, pattern):
|
||||
if pattern == 'echo':
|
||||
self.pattern = Echo(show=self.show)
|
||||
else:
|
||||
self.pattern = GoogleHome(show=self.show)
|
||||
|
||||
def wakeup(self, direction=0):
|
||||
def f():
|
||||
self.pattern.wakeup(direction)
|
||||
|
||||
self.put(f)
|
||||
|
||||
def listen(self):
|
||||
self.put(self.pattern.listen)
|
||||
|
||||
def think(self):
|
||||
self.put(self.pattern.think)
|
||||
|
||||
wait = think
|
||||
|
||||
def speak(self):
|
||||
self.put(self.pattern.speak)
|
||||
|
||||
def off(self):
|
||||
self.put(self.pattern.off)
|
||||
|
||||
def put(self, func):
|
||||
self.pattern.stop = True
|
||||
self.queue.put(func)
|
||||
|
||||
def _run(self):
|
||||
while True:
|
||||
func = self.queue.get()
|
||||
self.pattern.stop = False
|
||||
func()
|
||||
|
||||
def show(self, data):
|
||||
for i in range(self.PIXELS_N):
|
||||
self.dev.set_pixel(i, int(data[4*i + 1]), int(data[4*i + 2]), int(data[4*i + 3]))
|
||||
|
||||
self.dev.show()
|
||||
|
||||
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||
if rgb:
|
||||
r, g, b = (rgb >> 16) & 0xFF, (rgb >> 8) & 0xFF, rgb & 0xFF
|
||||
for i in range(self.PIXELS_N):
|
||||
self.dev.set_pixel(i, r, g, b)
|
||||
|
||||
self.dev.show()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pixel_ring = PixelRing()
|
||||
while True:
|
||||
try:
|
||||
pixel_ring.wakeup()
|
||||
time.sleep(3)
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.speak()
|
||||
time.sleep(6)
|
||||
pixel_ring.off()
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
|
||||
pixel_ring.off()
|
||||
time.sleep(1)
|
@ -0,0 +1,145 @@
|
||||
"""
|
||||
LED pattern like Echo
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
|
||||
class Echo(object):
|
||||
brightness = 24 * 8
|
||||
|
||||
def __init__(self, show, number=12):
|
||||
self.pixels_number = number
|
||||
self.pixels = [0] * 4 * number
|
||||
|
||||
if not callable(show):
|
||||
raise ValueError('show parameter is not callable')
|
||||
|
||||
self.show = show
|
||||
self.stop = False
|
||||
|
||||
def wakeup(self, direction=0):
|
||||
position = int((direction + 15) / (360 / self.pixels_number)) % self.pixels_number
|
||||
|
||||
pixels = [0, 0, 0, self.brightness] * self.pixels_number
|
||||
pixels[position * 4 + 2] = self.brightness
|
||||
|
||||
self.show(pixels)
|
||||
|
||||
def listen(self):
|
||||
pixels = [0, 0, 0, self.brightness] * self.pixels_number
|
||||
|
||||
self.show(pixels)
|
||||
|
||||
def think(self):
|
||||
half_brightness = int(self.brightness / 2)
|
||||
pixels = [0, 0, half_brightness, half_brightness, 0, 0, 0, self.brightness] * self.pixels_number
|
||||
|
||||
while not self.stop:
|
||||
self.show(pixels)
|
||||
time.sleep(0.2)
|
||||
pixels = pixels[-4:] + pixels[:-4]
|
||||
|
||||
def speak(self):
|
||||
step = int(self.brightness / 12)
|
||||
position = int(self.brightness / 2)
|
||||
while not self.stop:
|
||||
pixels = [0, 0, position, self.brightness - position] * self.pixels_number
|
||||
self.show(pixels)
|
||||
time.sleep(0.01)
|
||||
if position <= 0:
|
||||
step = int(self.brightness / 12)
|
||||
time.sleep(0.4)
|
||||
elif position >= int(self.brightness / 2):
|
||||
step = - int(self.brightness / 12)
|
||||
time.sleep(0.4)
|
||||
|
||||
position += step
|
||||
|
||||
def off(self):
|
||||
self.show([0] * 4 * 12)
|
||||
|
||||
class GoogleHome(object):
|
||||
def __init__(self, show):
|
||||
self.basis = [0] * 4 * 12
|
||||
self.basis[0 * 4 + 1] = 8
|
||||
self.basis[3 * 4 + 1] = 4
|
||||
self.basis[3 * 4 + 2] = 4
|
||||
self.basis[6 * 4 + 2] = 8
|
||||
self.basis[9 * 4 + 3] = 8
|
||||
|
||||
self.pixels = self.basis
|
||||
|
||||
if not callable(show):
|
||||
raise ValueError('show parameter is not callable')
|
||||
|
||||
self.show = show
|
||||
self.stop = False
|
||||
|
||||
def wakeup(self, direction=0):
|
||||
position = int((direction + 90 + 15) / 30) % 12
|
||||
|
||||
basis = self.basis[position*-4:] + self.basis[:position*-4]
|
||||
|
||||
pixels = [v * 25 for v in basis]
|
||||
self.show(pixels)
|
||||
time.sleep(0.1)
|
||||
|
||||
pixels = pixels[-4:] + pixels[:-4]
|
||||
self.show(pixels)
|
||||
time.sleep(0.1)
|
||||
|
||||
for i in range(2):
|
||||
new_pixels = pixels[-4:] + pixels[:-4]
|
||||
|
||||
self.show([v/2+pixels[index] for index, v in enumerate(new_pixels)])
|
||||
pixels = new_pixels
|
||||
time.sleep(0.1)
|
||||
|
||||
self.show(pixels)
|
||||
self.pixels = pixels
|
||||
|
||||
def listen(self):
|
||||
pixels = self.pixels
|
||||
for i in range(1, 25):
|
||||
self.show([(v * i / 24) for v in pixels])
|
||||
time.sleep(0.01)
|
||||
|
||||
def think(self):
|
||||
pixels = self.pixels
|
||||
|
||||
while not self.stop:
|
||||
pixels = pixels[-4:] + pixels[:-4]
|
||||
self.show(pixels)
|
||||
time.sleep(0.2)
|
||||
|
||||
t = 0.1
|
||||
for i in range(0, 5):
|
||||
pixels = pixels[-4:] + pixels[:-4]
|
||||
self.show([(v * (4 - i) / 4) for v in pixels])
|
||||
time.sleep(t)
|
||||
t /= 2
|
||||
|
||||
self.pixels = pixels
|
||||
|
||||
def speak(self):
|
||||
pixels = self.pixels
|
||||
step = 1
|
||||
brightness = 5
|
||||
while not self.stop:
|
||||
self.show([(v * brightness / 24) for v in pixels])
|
||||
time.sleep(0.02)
|
||||
|
||||
if brightness <= 5:
|
||||
step = 1
|
||||
time.sleep(0.4)
|
||||
elif brightness >= 24:
|
||||
step = -1
|
||||
time.sleep(0.4)
|
||||
|
||||
brightness += step
|
||||
|
||||
def off(self):
|
||||
self.show([0] * 4 * 12)
|
||||
|
||||
|
@ -0,0 +1,26 @@
|
||||
|
||||
|
||||
class PixelRing(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def show(self, data):
|
||||
pass
|
||||
|
||||
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||
pass
|
||||
|
||||
def wakeup(self, angle=None):
|
||||
pass
|
||||
|
||||
def listen(self):
|
||||
pass
|
||||
|
||||
def think(self):
|
||||
pass
|
||||
|
||||
def speak(self):
|
||||
pass
|
||||
|
||||
def off(self):
|
||||
pass
|
@ -0,0 +1,190 @@
|
||||
|
||||
import usb.core
|
||||
import usb.util
|
||||
|
||||
|
||||
class HidDevice:
|
||||
"""
|
||||
This class provides basic functions to access
|
||||
a USB HID device to write an endpoint
|
||||
"""
|
||||
|
||||
def __init__(self, dev, ep_in, ep_out):
|
||||
self.dev = dev
|
||||
self.ep_in = ep_in
|
||||
self.ep_out = ep_out
|
||||
|
||||
def write(self, data):
|
||||
"""
|
||||
write data on the OUT endpoint associated to the HID interface
|
||||
"""
|
||||
self.ep_out.write(data)
|
||||
|
||||
def read(self):
|
||||
return self.ep_in.read(self.ep_in.wMaxPacketSize, -1)
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
close the interface
|
||||
"""
|
||||
usb.util.dispose_resources(self.dev)
|
||||
|
||||
@staticmethod
|
||||
def find(vid=0x2886, pid=0x0007):
|
||||
dev = usb.core.find(idVendor=vid, idProduct=pid)
|
||||
if not dev:
|
||||
return
|
||||
|
||||
config = dev.get_active_configuration()
|
||||
|
||||
# Iterate on all interfaces to find a HID interface
|
||||
ep_in, ep_out = None, None
|
||||
for interface in config:
|
||||
if interface.bInterfaceClass == 0x03:
|
||||
try:
|
||||
if dev.is_kernel_driver_active(interface.bInterfaceNumber):
|
||||
dev.detach_kernel_driver(interface.bInterfaceNumber)
|
||||
except Exception as e:
|
||||
print(e.message)
|
||||
|
||||
for ep in interface:
|
||||
if ep.bEndpointAddress & 0x80:
|
||||
ep_in = ep
|
||||
else:
|
||||
ep_out = ep
|
||||
break
|
||||
|
||||
|
||||
|
||||
if ep_in and ep_out:
|
||||
hid = HidDevice(dev, ep_in, ep_out)
|
||||
|
||||
return hid
|
||||
|
||||
|
||||
class UsbPixelRing:
|
||||
PIXELS_N = 12
|
||||
|
||||
MONO = 1
|
||||
THINK = 3
|
||||
VOLUME = 5
|
||||
CUSTOM = 6
|
||||
|
||||
def __init__(self, hid=None, pattern=None):
|
||||
self.hid = hid if hid else HidDevice.find()
|
||||
if not self.hid:
|
||||
print('No USB device found')
|
||||
|
||||
colors = [0] * 4 * self.PIXELS_N
|
||||
colors[0] = 0x4
|
||||
colors[1] = 0x40
|
||||
colors[2] = 0x4
|
||||
|
||||
colors[4 + 1] = 0x8
|
||||
colors[4 * 11 + 1] = 0x8
|
||||
|
||||
self.direction_template = colors
|
||||
|
||||
def set_brightness(self, brightness):
|
||||
print('Not support to change brightness')
|
||||
|
||||
def change_pattern(self, pattern=None):
|
||||
print('Not support to change pattern')
|
||||
|
||||
def off(self):
|
||||
self.set_color(rgb=0)
|
||||
|
||||
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||
if rgb:
|
||||
self.write(0, [self.MONO, rgb & 0xFF, (rgb >> 8) & 0xFF, (rgb >> 16) & 0xFF])
|
||||
else:
|
||||
self.write(0, [self.MONO, b, g, r])
|
||||
|
||||
def think(self):
|
||||
self.write(0, [self.THINK, 0, 0, 0])
|
||||
|
||||
wait = think
|
||||
|
||||
speak = think
|
||||
|
||||
def set_volume(self, pixels):
|
||||
self.write(0, [self.VOLUME, 0, 0, pixels])
|
||||
|
||||
def wakeup(self, angle=0):
|
||||
if angle < 0 or angle > 360:
|
||||
return
|
||||
|
||||
position = int((angle + 15) % 360 / 30) % self.PIXELS_N
|
||||
colors = self.direction_template[-position*4:] + self.direction_template[:-position*4]
|
||||
|
||||
self.write(0, [self.CUSTOM, 0, 0, 0])
|
||||
self.write(3, colors)
|
||||
|
||||
return position
|
||||
|
||||
def listen(self, angle=0):
|
||||
self.write(0, [self.MONO, 0, 0x10, 0])
|
||||
|
||||
def show(self, data):
|
||||
self.write(0, [self.CUSTOM, 0, 0, 0])
|
||||
self.write(3, data)
|
||||
|
||||
@staticmethod
|
||||
def to_bytearray(data):
|
||||
if type(data) is int:
|
||||
array = bytearray([data & 0xFF])
|
||||
elif type(data) is bytearray:
|
||||
array = data
|
||||
elif type(data) is str or type(data) is bytes:
|
||||
array = bytearray(data)
|
||||
elif type(data) is list:
|
||||
array = bytearray(data)
|
||||
else:
|
||||
raise TypeError('%s is not supported' % type(data))
|
||||
|
||||
return array
|
||||
|
||||
def write(self, address, data):
|
||||
data = self.to_bytearray(data)
|
||||
length = len(data)
|
||||
if self.hid:
|
||||
packet = bytearray([address & 0xFF, (address >> 8) & 0xFF, length & 0xFF, (length >> 8) & 0xFF]) + data
|
||||
self.hid.write(packet)
|
||||
|
||||
def close(self):
|
||||
if self.hid:
|
||||
self.hid.close()
|
||||
|
||||
def __call__(self, data):
|
||||
self.write(3, data)
|
||||
|
||||
|
||||
def find():
|
||||
hid = HidDevice.find()
|
||||
|
||||
if hid:
|
||||
pixel_ring = UsbPixelRing(hid)
|
||||
return pixel_ring
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import time
|
||||
|
||||
pixel_ring = UsbPixelRing()
|
||||
while True:
|
||||
try:
|
||||
pixel_ring.wakeup(180)
|
||||
time.sleep(3)
|
||||
pixel_ring.listen()
|
||||
time.sleep(3)
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.set_volume(8)
|
||||
time.sleep(3)
|
||||
pixel_ring.off()
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
pixel_ring.off()
|
||||
|
@ -0,0 +1,122 @@
|
||||
|
||||
import usb.core
|
||||
import usb.util
|
||||
|
||||
|
||||
class PixelRing:
|
||||
TIMEOUT = 8000
|
||||
|
||||
def __init__(self, dev):
|
||||
self.dev = dev
|
||||
|
||||
def trace(self):
|
||||
self.write(0)
|
||||
|
||||
def mono(self, color):
|
||||
self.write(1, [(color >> 16) & 0xFF, (color >> 8) & 0xFF, color & 0xFF, 0])
|
||||
|
||||
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||
if rgb:
|
||||
self.mono(rgb)
|
||||
else:
|
||||
self.write(1, [r, g, b, 0])
|
||||
|
||||
def off(self):
|
||||
self.mono(0)
|
||||
|
||||
def listen(self, direction=None):
|
||||
self.write(2)
|
||||
|
||||
wakeup = listen
|
||||
|
||||
def speak(self):
|
||||
self.write(3)
|
||||
|
||||
def think(self):
|
||||
self.write(4)
|
||||
|
||||
wait = think
|
||||
|
||||
def spin(self):
|
||||
self.write(5)
|
||||
|
||||
def show(self, data):
|
||||
self.write(6, data)
|
||||
|
||||
customize = show
|
||||
|
||||
def set_brightness(self, brightness):
|
||||
self.write(0x20, [brightness])
|
||||
|
||||
def set_color_palette(self, a, b):
|
||||
self.write(0x21, [(a >> 16) & 0xFF, (a >> 8) & 0xFF, a & 0xFF, 0, (b >> 16) & 0xFF, (b >> 8) & 0xFF, b & 0xFF, 0])
|
||||
|
||||
def set_vad_led(self, state):
|
||||
self.write(0x22, [state])
|
||||
|
||||
def set_volume(self, volume):
|
||||
self.write(0x23, [volume])
|
||||
|
||||
def change_pattern(self, pattern):
|
||||
if pattern == 'echo':
|
||||
self.write(0x24, [1])
|
||||
else:
|
||||
self.write(0x24, [0])
|
||||
|
||||
def write(self, cmd, data=[0]):
|
||||
self.dev.ctrl_transfer(
|
||||
usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE,
|
||||
0, cmd, 0x1C, data, self.TIMEOUT)
|
||||
|
||||
@property
|
||||
def version(self):
|
||||
return self.dev.ctrl_transfer(
|
||||
usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE,
|
||||
0, 0x80 | 0x40, 0x1C, 24, self.TIMEOUT).tostring()
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
close the interface
|
||||
"""
|
||||
usb.util.dispose_resources(self.dev)
|
||||
|
||||
|
||||
def find(vid=0x2886, pid=0x0018):
|
||||
dev = usb.core.find(idVendor=vid, idProduct=pid)
|
||||
if not dev:
|
||||
return
|
||||
|
||||
# configuration = dev.get_active_configuration()
|
||||
|
||||
# interface_number = None
|
||||
# for interface in configuration:
|
||||
# interface_number = interface.bInterfaceNumber
|
||||
|
||||
# if dev.is_kernel_driver_active(interface_number):
|
||||
# dev.detach_kernel_driver(interface_number)
|
||||
|
||||
return PixelRing(dev)
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import time
|
||||
|
||||
pixel_ring = find()
|
||||
print(pixel_ring.version)
|
||||
while True:
|
||||
try:
|
||||
pixel_ring.wakeup(180)
|
||||
time.sleep(3)
|
||||
pixel_ring.listen()
|
||||
time.sleep(3)
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.set_volume(8)
|
||||
time.sleep(3)
|
||||
pixel_ring.off()
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
pixel_ring.off()
|
@ -0,0 +1,2 @@
|
||||
spidev
|
||||
pyusb
|
@ -0,0 +1,3 @@
|
||||
|
||||
[bdist_wheel]
|
||||
universal = 1
|
@ -0,0 +1,61 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""The setup script."""
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
README = \
|
||||
'''
|
||||
RGB LED library for ReSpeaker USB 6+1 Microphone Array, 4 Mic Array for Raspberry Pi
|
||||
to control the pixel ring
|
||||
'''
|
||||
|
||||
|
||||
requirements = [
|
||||
'spidev',
|
||||
'pyusb'
|
||||
]
|
||||
|
||||
setup_requirements = [
|
||||
# TODO: put setup requirements (distutils extensions, etc.) here
|
||||
]
|
||||
|
||||
test_requirements = [
|
||||
'pytest'
|
||||
]
|
||||
|
||||
setup(
|
||||
name='pixel-ring',
|
||||
version='0.1.0',
|
||||
description="respeaker series pixel ring library",
|
||||
long_description=README,
|
||||
author="Yihui Xiong",
|
||||
author_email='yihui.xiong@hotmail.com',
|
||||
url='https://github.com/respeaker/pixel_ring',
|
||||
packages=find_packages(include=['pixel_ring']),
|
||||
include_package_data=True,
|
||||
install_requires=requirements,
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'pixel_ring_check=pixel_ring.__init__:main'
|
||||
],
|
||||
},
|
||||
license="GNU General Public License v2",
|
||||
zip_safe=False,
|
||||
keywords='voice doa beamforming kws',
|
||||
classifiers=[
|
||||
'Development Status :: 2 - Pre-Alpha',
|
||||
'Intended Audience :: Developers',
|
||||
'License :: OSI Approved :: GNU General Public License v2 (GPLv2)',
|
||||
'Natural Language :: English',
|
||||
"Programming Language :: Python :: 2",
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
],
|
||||
test_suite='tests',
|
||||
tests_require=test_requirements,
|
||||
setup_requires=setup_requirements,
|
||||
)
|
@ -0,0 +1,7 @@
|
||||
"""
|
||||
It is an empty test as a real test requires to access SPI bus
|
||||
"""
|
||||
|
||||
|
||||
def test_pixel_ring():
|
||||
pass
|
@ -0,0 +1,190 @@
|
||||
|
||||
import usb.core
|
||||
import usb.util
|
||||
|
||||
|
||||
class HidDevice:
|
||||
"""
|
||||
This class provides basic functions to access
|
||||
a USB HID device to write an endpoint
|
||||
"""
|
||||
|
||||
def __init__(self, dev, ep_in, ep_out):
|
||||
self.dev = dev
|
||||
self.ep_in = ep_in
|
||||
self.ep_out = ep_out
|
||||
|
||||
def write(self, data):
|
||||
"""
|
||||
write data on the OUT endpoint associated to the HID interface
|
||||
"""
|
||||
self.ep_out.write(data)
|
||||
|
||||
def read(self):
|
||||
return self.ep_in.read(self.ep_in.wMaxPacketSize, -1)
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
close the interface
|
||||
"""
|
||||
usb.util.dispose_resources(self.dev)
|
||||
|
||||
@staticmethod
|
||||
def find(vid=0x2886, pid=0x0007):
|
||||
dev = usb.core.find(idVendor=vid, idProduct=pid)
|
||||
if not dev:
|
||||
return
|
||||
|
||||
config = dev.get_active_configuration()
|
||||
|
||||
# Iterate on all interfaces to find a HID interface
|
||||
ep_in, ep_out = None, None
|
||||
for interface in config:
|
||||
if interface.bInterfaceClass == 0x03:
|
||||
try:
|
||||
if dev.is_kernel_driver_active(interface.bInterfaceNumber):
|
||||
dev.detach_kernel_driver(interface.bInterfaceNumber)
|
||||
except Exception as e:
|
||||
print(e.message)
|
||||
|
||||
for ep in interface:
|
||||
if ep.bEndpointAddress & 0x80:
|
||||
ep_in = ep
|
||||
else:
|
||||
ep_out = ep
|
||||
break
|
||||
|
||||
|
||||
|
||||
if ep_in and ep_out:
|
||||
hid = HidDevice(dev, ep_in, ep_out)
|
||||
|
||||
return hid
|
||||
|
||||
|
||||
class UsbPixelRing:
|
||||
PIXELS_N = 12
|
||||
|
||||
MONO = 1
|
||||
THINK = 3
|
||||
VOLUME = 5
|
||||
CUSTOM = 6
|
||||
|
||||
def __init__(self, hid=None, pattern=None):
|
||||
self.hid = hid if hid else HidDevice.find()
|
||||
if not self.hid:
|
||||
print('No USB device found')
|
||||
|
||||
colors = [0] * 4 * self.PIXELS_N
|
||||
colors[0] = 0x4
|
||||
colors[1] = 0x40
|
||||
colors[2] = 0x4
|
||||
|
||||
colors[4 + 1] = 0x8
|
||||
colors[4 * 11 + 1] = 0x8
|
||||
|
||||
self.direction_template = colors
|
||||
|
||||
def set_brightness(self, brightness):
|
||||
print('Not support to change brightness')
|
||||
|
||||
def change_pattern(self, pattern=None):
|
||||
print('Not support to change pattern')
|
||||
|
||||
def off(self):
|
||||
self.set_color(rgb=0)
|
||||
|
||||
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||
if rgb:
|
||||
self.write(0, [self.MONO, rgb & 0xFF, (rgb >> 8) & 0xFF, (rgb >> 16) & 0xFF])
|
||||
else:
|
||||
self.write(0, [self.MONO, b, g, r])
|
||||
|
||||
def think(self):
|
||||
self.write(0, [self.THINK, 0, 0, 0])
|
||||
|
||||
wait = think
|
||||
|
||||
speak = think
|
||||
|
||||
def set_volume(self, pixels):
|
||||
self.write(0, [self.VOLUME, 0, 0, pixels])
|
||||
|
||||
def wakeup(self, angle=0):
|
||||
if angle < 0 or angle > 360:
|
||||
return
|
||||
|
||||
position = int((angle + 15) % 360 / 30) % self.PIXELS_N
|
||||
colors = self.direction_template[-position*4:] + self.direction_template[:-position*4]
|
||||
|
||||
self.write(0, [self.CUSTOM, 0, 0, 0])
|
||||
self.write(3, colors)
|
||||
|
||||
return position
|
||||
|
||||
def listen(self, angle=0):
|
||||
self.write(0, [self.MONO, 0, 0x10, 0])
|
||||
|
||||
def show(self, data):
|
||||
self.write(0, [self.CUSTOM, 0, 0, 0])
|
||||
self.write(3, data)
|
||||
|
||||
@staticmethod
|
||||
def to_bytearray(data):
|
||||
if type(data) is int:
|
||||
array = bytearray([data & 0xFF])
|
||||
elif type(data) is bytearray:
|
||||
array = data
|
||||
elif type(data) is str or type(data) is bytes:
|
||||
array = bytearray(data)
|
||||
elif type(data) is list:
|
||||
array = bytearray(data)
|
||||
else:
|
||||
raise TypeError('%s is not supported' % type(data))
|
||||
|
||||
return array
|
||||
|
||||
def write(self, address, data):
|
||||
data = self.to_bytearray(data)
|
||||
length = len(data)
|
||||
if self.hid:
|
||||
packet = bytearray([address & 0xFF, (address >> 8) & 0xFF, length & 0xFF, (length >> 8) & 0xFF]) + data
|
||||
self.hid.write(packet)
|
||||
|
||||
def close(self):
|
||||
if self.hid:
|
||||
self.hid.close()
|
||||
|
||||
def __call__(self, data):
|
||||
self.write(3, data)
|
||||
|
||||
|
||||
def find():
|
||||
hid = HidDevice.find()
|
||||
|
||||
if hid:
|
||||
pixel_ring = UsbPixelRing(hid)
|
||||
return pixel_ring
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import time
|
||||
|
||||
pixel_ring = UsbPixelRing()
|
||||
while True:
|
||||
try:
|
||||
pixel_ring.wakeup(180)
|
||||
time.sleep(3)
|
||||
pixel_ring.listen()
|
||||
time.sleep(3)
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.set_volume(8)
|
||||
time.sleep(3)
|
||||
pixel_ring.off()
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
pixel_ring.off()
|
||||
|
@ -0,0 +1,122 @@
|
||||
|
||||
import usb.core
|
||||
import usb.util
|
||||
|
||||
|
||||
class PixelRing:
|
||||
TIMEOUT = 8000
|
||||
|
||||
def __init__(self, dev):
|
||||
self.dev = dev
|
||||
|
||||
def trace(self):
|
||||
self.write(0)
|
||||
|
||||
def mono(self, color):
|
||||
self.write(1, [(color >> 16) & 0xFF, (color >> 8) & 0xFF, color & 0xFF, 0])
|
||||
|
||||
def set_color(self, rgb=None, r=0, g=0, b=0):
|
||||
if rgb:
|
||||
self.mono(rgb)
|
||||
else:
|
||||
self.write(1, [r, g, b, 0])
|
||||
|
||||
def off(self):
|
||||
self.mono(0)
|
||||
|
||||
def listen(self, direction=None):
|
||||
self.write(2)
|
||||
|
||||
wakeup = listen
|
||||
|
||||
def speak(self):
|
||||
self.write(3)
|
||||
|
||||
def think(self):
|
||||
self.write(4)
|
||||
|
||||
wait = think
|
||||
|
||||
def spin(self):
|
||||
self.write(5)
|
||||
|
||||
def show(self, data):
|
||||
self.write(6, data)
|
||||
|
||||
customize = show
|
||||
|
||||
def set_brightness(self, brightness):
|
||||
self.write(0x20, [brightness])
|
||||
|
||||
def set_color_palette(self, a, b):
|
||||
self.write(0x21, [(a >> 16) & 0xFF, (a >> 8) & 0xFF, a & 0xFF, 0, (b >> 16) & 0xFF, (b >> 8) & 0xFF, b & 0xFF, 0])
|
||||
|
||||
def set_vad_led(self, state):
|
||||
self.write(0x22, [state])
|
||||
|
||||
def set_volume(self, volume):
|
||||
self.write(0x23, [volume])
|
||||
|
||||
def change_pattern(self, pattern):
|
||||
if pattern == 'echo':
|
||||
self.write(0x24, [1])
|
||||
else:
|
||||
self.write(0x24, [0])
|
||||
|
||||
def write(self, cmd, data=[0]):
|
||||
self.dev.ctrl_transfer(
|
||||
usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE,
|
||||
0, cmd, 0x1C, data, self.TIMEOUT)
|
||||
|
||||
@property
|
||||
def version(self):
|
||||
return self.dev.ctrl_transfer(
|
||||
usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE,
|
||||
0, 0x80 | 0x40, 0x1C, 24, self.TIMEOUT).tostring()
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
close the interface
|
||||
"""
|
||||
usb.util.dispose_resources(self.dev)
|
||||
|
||||
|
||||
def find(vid=0x2886, pid=0x0018):
|
||||
dev = usb.core.find(idVendor=vid, idProduct=pid)
|
||||
if not dev:
|
||||
return
|
||||
|
||||
# configuration = dev.get_active_configuration()
|
||||
|
||||
# interface_number = None
|
||||
# for interface in configuration:
|
||||
# interface_number = interface.bInterfaceNumber
|
||||
|
||||
# if dev.is_kernel_driver_active(interface_number):
|
||||
# dev.detach_kernel_driver(interface_number)
|
||||
|
||||
return PixelRing(dev)
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import time
|
||||
|
||||
pixel_ring = find()
|
||||
print(pixel_ring.version)
|
||||
while True:
|
||||
try:
|
||||
pixel_ring.wakeup(180)
|
||||
time.sleep(3)
|
||||
pixel_ring.listen()
|
||||
time.sleep(3)
|
||||
pixel_ring.think()
|
||||
time.sleep(3)
|
||||
pixel_ring.set_volume(8)
|
||||
time.sleep(3)
|
||||
pixel_ring.off()
|
||||
time.sleep(3)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
pixel_ring.off()
|
@ -1,139 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# PLAY_ACT.py
|
||||
# This script runs the play
|
||||
# It is in a seperate file to enable the mechanism to detect the Google Home speaking, before continuing to the next line
|
||||
|
||||
# Libraries
|
||||
from config import characters, directions
|
||||
from logic import tts, read_script, led_on, led_off, select_script
|
||||
from pixel_ring import pixel_ring
|
||||
from subprocess import call
|
||||
import paho.mqtt.client as mqtt
|
||||
import json
|
||||
import sys
|
||||
from time import sleep
|
||||
|
||||
# Switch of LED's of speakers at the start of the play
|
||||
pixel_ring.off()
|
||||
|
||||
|
||||
|
||||
# === SETUP OF MQTT PART 1 ===
|
||||
|
||||
# Location of the MQTT server
|
||||
HOST = 'localhost'
|
||||
PORT = 1883
|
||||
|
||||
# Subscribe to relevant MQTT topics
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
print("Connected to {0} with result code {1}".format(HOST, rc))
|
||||
# Subscribe to the text detected topic
|
||||
client.subscribe("hermes/asr/textCaptured")
|
||||
client.subscribe("hermes/dialogueManager/sessionQueued")
|
||||
|
||||
# Function which sets a flag when the Google Home is not speaking
|
||||
# Callback of MQTT message that says that the text is captured by the speech recognition (ASR)
|
||||
def done_speaking(client, userdata, msg):
|
||||
print('Google Home is not speaking anymore')
|
||||
client.connected_flag=True
|
||||
|
||||
# Function which removes intents that are by accident activated by the Google Home
|
||||
# e.g. The google home says introduce yourself, which could trigger the other speakers to introduce themselves
|
||||
# Snips works with queing of sessions, so this situation would only happen after this play is finished
|
||||
def remove_sessions(client, userdata, msg):
|
||||
sessionId = json.loads(id.payload)
|
||||
print('delete mistaken intent')
|
||||
client.publish("hermes/dialogueManager/endSession", json.dumps({
|
||||
'sessionId': sessionId,
|
||||
}))
|
||||
|
||||
|
||||
|
||||
|
||||
# === SETUP OF MQTT PART 2 ===
|
||||
|
||||
# Initialise MQTT client
|
||||
client = mqtt.Client()
|
||||
client.connect(HOST, PORT, 60)
|
||||
client.on_connect = on_connect
|
||||
|
||||
|
||||
|
||||
|
||||
# === Read script and run the play ===
|
||||
|
||||
# Flags to check if the system is listening, or not
|
||||
client.connected_flag=False
|
||||
listening = False
|
||||
|
||||
|
||||
# Read the script and run the play
|
||||
|
||||
|
||||
#file = sys.argv[1] # get the chosen act passed by smart_speaker_theatre.py
|
||||
file = select_script('scripts_play/intro/')
|
||||
|
||||
for character, line, direction in read_script(file):
|
||||
input_text = line
|
||||
voice = characters.get(character)[0]
|
||||
speaker = characters.get(character)[1]
|
||||
#speaker = 'default'
|
||||
# Some way to do something with the stage directions will come here
|
||||
action = directions.get(direction[0])
|
||||
pixel_ring.speak()
|
||||
tts(voice, input_text, speaker)
|
||||
|
||||
if action == 'listen_google_home':
|
||||
print('Waiting for the Google Home to finish its talk')
|
||||
|
||||
# # start voice activity detection
|
||||
# client.publish("hermes/asr/startListening", json.dumps({
|
||||
# 'siteId': 'default',
|
||||
# 'init': {
|
||||
# 'type': 'action',
|
||||
# 'canBeEnqueued': True
|
||||
# }
|
||||
# }))
|
||||
|
||||
# Activate the microphone and speech recognition
|
||||
client.publish("hermes/asr/startListening", json.dumps({
|
||||
'siteId': 'default'
|
||||
}))
|
||||
|
||||
# LED to listening mode
|
||||
pixel_ring.listen()
|
||||
|
||||
# create callback
|
||||
client.on_message = done_speaking
|
||||
listening = True
|
||||
|
||||
while listening:
|
||||
client.loop()
|
||||
|
||||
#client.on_message = on_message
|
||||
client.message_callback_add('hermes/asr/textCaptured', done_speaking)
|
||||
|
||||
if client.connected_flag:
|
||||
sleep(1)
|
||||
print('Continue the play')
|
||||
client.connected_flag = False
|
||||
client.message_callback_add('hermes/dialogueManager/sessionQueued', remove_sessions)
|
||||
break
|
||||
|
||||
if action == 'music':
|
||||
print('play audioclip')
|
||||
playing = True
|
||||
|
||||
while playing:
|
||||
call(["aplay", "-D", speaker, "/usr/share/snips/congress.wav"])
|
||||
playing = False
|
||||
|
||||
|
||||
|
||||
pixel_ring.off() # Switch of the lights when done speaking
|
||||
sleep(0.2) # Add a short pause between the lines
|
||||
|
||||
|
||||
print('The act is done.')
|
@ -1,3 +1,3 @@
|
||||
ROGUE: Do you want to continue?
|
||||
RASA: Well, I definitely want to
|
||||
SAINT: So do I
|
||||
SAINT: O K Google.
|
||||
SAINT: What do you most like about people? [Listen to Google Home]
|
||||
SAINT: O K Google. Who are the people that made you? [Listen to Google Home]
|
||||
|
@ -1,3 +1,2 @@
|
||||
ROGUE: Test a question
|
||||
RASA: Well, O K Google. What time is it? [Listen to Google Home]
|
||||
ROGUE: O K Google. What is the weather in Rotterdam? [Listen to Google Home]
|
||||
SAINT: We got an answer.
|
||||
|
Loading…
Reference in New Issue