"""Module that manages the :class:`~SimulRPi.pindb.PinDB` database, threads,
and default keymap.
The threads are responsible for displaying LEDs in the terminal and listening
to the keyboard.
The default keymap maps keyboard keys to GPIO channel numbers and is defined
in `default_key_to_channel_map`_.
"""
import copy
import logging
import os
import threading
from logging import NullHandler
try:
from pynput import keyboard
except ImportError:
print("`pynput` couldn't be found. Thus, no keyboard keys will be detected "
"if pressed or released.\nIf you need this option, install `pynput` "
"with: pip install pynput.\n")
keyboard = None
# NOTE: on Python 3.5 and 3.6, can't use ``import SimulRPi.GPIO as GPIO``
# if circular import
# AttributeError: module 'SimulRPi' has no attribute 'GPIO
import SimulRPi.GPIO
from SimulRPi.mapping import default_key_to_channel_map
from SimulRPi.pindb import PinDB
logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())
[docs]class DisplayExceptionThread(threading.Thread):
"""A subclass from :class:`threading.Thread` that defines threads that can
catch errors if their target functions raise an exception.
Attributes
----------
exception_raised : bool
When the exception is raised, it should be set to `True`. By default, it
is `False`.
exc: :class:`Exception`
Represents the exception raised by the target function.
References
----------
* `stackoverflow <https://stackoverflow.com/a/51270466>`__
"""
def __init__(self, *args, **kwargs):
threading.Thread.__init__(self, *args, **kwargs)
self.exception_raised = False
self.exc = None
[docs] def run(self):
"""Method representing the thread’s activity.
Overridden from the base class :class:`threading.Thread`. This method
invokes the callable object passed to the object’s constructor as the
target argument, if any, with sequential and keyword arguments taken
from the args and kwargs arguments, respectively.
**It also catches and saves any error that the target function might
raise.**
.. important::
The exception is only caught here, not raised. The exception is
further raised in :meth:`SimulRPi.GPIO.output` or
:meth:`SimulRPi.GPIO.wait`. The reason for not raising it here is
because the main program won't catch it. The exception must be
raised outside the thread's ``run`` method so that the thread's
exception can be caught by the main program.
The same reasoning applies to the listening thread's callbacks
:meth:`Manager.on_press` and :meth:`Manager.on_release`.
"""
try:
self._target(*self._args, **self._kwargs)
except Exception as e:
# TODO: important add a method to raise the exception
self.exc = e
if keyboard:
class KeyboardExceptionThread(keyboard.Listener):
"""A subclass from :class:`pynput.keyboard.Listener` that defines
threads that store the exception raised in their target function.
"""
# TODO: check if you can modify run() like you did for DisplayExceptionThread
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.exception_raised = False
self.exc = None
[docs]class Manager:
"""Class that manages the pin database (:class:`SimulRPi.pindb.PinDB`),
the threads responsible for displaying "LEDs" in the terminal and listening
for pressed/released keys, and the default keymap.
The threads are not started right away in ``__init__()`` but in
:meth:`SimulRPi.GPIO.input` for the listening thread and
:meth:`SimulRPi.GPIO.output` for the displaying thread.
They are eventually stopped in :meth:`SimulRPi.GPIO.cleanup`.
The default keymap maps keyboard keys to GPIO channel numbers and is defined
in `default_key_to_channel_map`_.
Attributes
----------
mode : int
Numbering system used to identify the I/O pins on an RPi: `BOARD` or
`BCM`. Default value is :obj:`None`.
warnings : bool
Whether to show warnings when using a pin other than the default GPIO
function (input). Default value is `True`.
enable_printing : bool
Whether to enable printing on the terminal. Default value is `True`.
pin_db : PinDB
A database of :class:`~SimulRPi.pindb.Pin`\s. See
:class:`~SimulRPi.pindb.PinDB` on how to access it.
default_led_symbols : dict
A dictionary that maps each output channel's state ('ON' and 'OFF') to
a LED symbol. By default, it is set to these LED symbols::
default_led_symbols = {
"ON": "🛑",
"OFF": "⚪"
}
key_to_channel_map : dict
A dictionary that maps keyboard keys (:obj:`string`) to GPIO channel
numbers (:obj:`int`). By default, it takes the keys and values defined
in the keymap `default_key_to_channel_map`_.
channel_to_key_map : dict
The reverse dictionary of ``key_to_channel_map``. It maps channels to
keys.
th_display_leds : manager.DisplayExceptionThread
Thread responsible for displaying blinking red dots in the terminal as
to simulate LEDs connected to an RPi.
th_listener : manager.KeyboardExceptionThread
Thread responsible for listening on any pressed or released keyboard
key as to simulate push buttons connected to an RPi.
If ``pynput`` couldn't be imported, ``th_listener`` is :obj:`None`.
Otherwise, it is instantiated from ``manager.KeyboardExceptionThread``.
.. note::
A keyboard listener is a subclass of :class:`threading.Thread`, and
all callbacks will be invoked from the thread.
**Ref.:** https://pynput.readthedocs.io/en/latest/keyboard.html#monitoring-the-keyboard
.. important::
If the ``pynput.keyboard`` module couldn't be imported, the listening
thread ``th_listener`` will not be created and the parts of the
``SimulRPi`` library that monitors the keyboard for any pressed or
released key will be ignored. Only the thread ``th_display_leds`` that
displays "LEDs" in the terminal will be created.
This is necessary for example in the case we are running tests on
travis and we don't want travis to install ``pynput`` in a headless
setup because the following exception will get raised::
Xlib.error.DisplayNameError: Bad display name ""
The tests involving ``pynput`` will be performed with a mock version of
``pynput``.
"""
def __init__(self):
self.mode = None
self.warnings = True
self.enable_printing = True
self.pin_db = PinDB()
self.default_led_symbols = {
"ON": "\U0001F6D1",
"OFF": "\U000026AA"
}
# TODO: call it _channel_cached_info?
self._channel_tmp_info = {}
self.key_to_channel_map = copy.copy(default_key_to_channel_map)
self.channel_to_key_map = {v: k for k, v in
self.key_to_channel_map.items()}
self.th_display_leds = DisplayExceptionThread(
name="thread_display_leds",
target=self.display_leds,
args=())
if keyboard:
self.th_listener = KeyboardExceptionThread(
on_press=self.on_press,
on_release=self.on_release)
self.th_listener.name = "thread_listener"
else:
self.th_listener = None
[docs] def add_pin(self, channel_number, channel_type, pull_up_down=None, initial=None):
"""Add an input or output pin to the pin database.
An instance of :class:`~SimulRPi.pindb.Pin` is created with the given
arguments and added to the pin database :class:`~SimulRPi.pindb.PinDB`.
Parameters
----------
channel_number : int
GPIO channel number associated with the
:class:`~SimulRPi.pindb.Pin` to be added in the pin database.
channel_type : int
Type of a GPIO channel: e.g. 1 (`GPIO.IN`) or 0 (`GPIO.OUT`).
pull_up_down : int or None, optional
Initial value of an input channel, e.g. `GPIO.PUP_UP`. Default
value is :obj:`None`.
initial : int or None, optional
Initial value of an output channel, e.g. `GPIO.HIGH`. Default value
is :obj:`None`.
"""
key = None
tmp_info = self._channel_tmp_info.get(channel_number, {})
if channel_type == SimulRPi.GPIO.IN:
# Get keyboard key associated with the INPUT pin (button)
# TODO: add key also in _channel_tmp_info?
key = self.channel_to_key_map.get(channel_number)
tmp_info['led_symbols'] = None
if self._channel_tmp_info.get(channel_number):
del self._channel_tmp_info[channel_number]
self.pin_db.create_pin(
channel_number=channel_number,
channel_id=tmp_info.get('channel_id', channel_number),
channel_type=channel_type,
channel_name=tmp_info.get('channel_name', channel_number),
key=key,
led_symbols=tmp_info.get('led_symbols', self.default_led_symbols),
pull_up_down=pull_up_down,
initial=initial)
[docs] def bulk_channel_update(self, new_channels_attributes):
"""Update the attributes (e.g. `channel_name` and `led_symbols`) for
multiple channels.
If a channel number is associated with a not yet created
:class:`~SimulRPi.pindb.Pin`, the corresponding attributes will be
temporary saved for later when the pin object will be created with
:meth:`add_pin`.
Parameters
----------
new_channels_attributes : dict
A dictionary mapping channel numbers (:obj:`int`) with channels'
attributes (:obj:`dict`). The accepted attributes are those
specified in :meth:`SimulRPi.GPIO.setchannels`.
**Example**::
new_channels_attributes = {
1: {
'channel_id': 'channel1',
'channel_name': 'The Channel 1',
'led_symbols': {
'ON': '🔵',
'OFF': '⚪ '
}
}.
2: {
'channel_id': 'channel2',
'channel_name': 'The Channel 2',
'key': 'cmd_r'
}
}
"""
for ch_number, ch_attributes in new_channels_attributes.items():
for attribute_name, attribute_value in ch_attributes.items():
self._update_attribute_pins(
attribute_name,
{ch_number: attribute_value})
[docs] def display_leds(self):
"""Displaying thread's **target function** that simulates LEDs
connected to an RPi by blinking red dots in a terminal.
.. highlight:: none
**Example: terminal output** ::
⬤ [9] ⬤ [10] 🔴 [11]
.. highlight:: python
where each dot represents a LED and the number between brackets is the
associated GPIO channel number.
.. important::
:meth:`display_leds` should be run by a thread and eventually
stopped from the main program by setting its ``do_run`` attribute
to `False` to let the thread exit from its target function.
**For example**:
.. code-block:: python
th = DisplayExceptionThread(target=self.display_leds, args=())
th.start()
# Your other code ...
# Time to stop thread
th.do_run = False
th.join()
.. note::
If ``enable_printing`` is set to `True`, the terminal's cursor will
be hidden. It will be eventually shown again in
:meth:`SimulRPi.GPIO.cleanup` which is called by the main program
when it is exiting.
The reason is to avoid messing with the display of LEDs done by the
displaying thread ``th_display_leds``.
.. note::
Since the displaying thread ``th_display_leds`` is an
:class:`DisplayExceptionThread` object, it has an attribute ``exc``
which stores the exception raised by this target function.
"""
# TODO: explain order outputs are setup is how the channels are shown
if self.enable_printing:
# Hide the cursor
# TODO: works on UNIX shell only, not Windows
os.system("tput civis")
print()
th = threading.currentThread()
# TODO: reduce number of prints, i.e. computations
while getattr(th, "do_run", True):
leds = ""
last_msg_length = len(leds) if leds else 0
# test = 1/0
# for channel in sorted(self.channel_output_state_map):
for pin in self.pin_db.output_pins:
channel = pin.channel_number
# TODO: pin could be None
if pin.state == SimulRPi.GPIO.HIGH:
# Turn ON LED
# TODO: safeguard?
led_symbol = pin.led_symbols.get(
'ON', self.default_led_symbols['ON'])
else:
# Turn OFF LED
# TODO: safeguard?
led_symbol = pin.led_symbols.get(
'OFF', self.default_led_symbols['OFF'])
channel = pin.channel_name if pin.channel_name else channel
leds += "{led_symbol}{spaces1}[{channel}]{spaces2}".format(
spaces1=" ",
led_symbol=led_symbol,
channel=channel,
spaces2=" " * 8)
if self.enable_printing:
print(' ' * last_msg_length, end='\r')
print(' {}'.format(leds), end='\r')
if self.enable_printing:
print(' {}'.format(leds))
logger.debug("Stopping thread: {}".format(th.name))
[docs] @staticmethod
def get_key_name(key):
"""Get the name of a keyboard key as a string.
The name of the special or alphanumeric key is given by the `pynput`_
package.
Parameters
----------
key : pynput.keyboard.Key or pynput.keyboard.KeyCode
The keyboard key (from ``pynput.keyboard``) whose name will be
returned.
Returns
-------
key_name : str or None
Returns the name of the given keyboard key if one was found by
`pynput`_. Otherwise, it returns :obj:`None`.
"""
# TODO: how to detect enter key
# print(key)
if hasattr(key, 'char'):
# Alphanumeric key (keyboard.KeyCode)
if key.char == '\x05':
key_name = "insert"
elif key.char == '\x1b':
key_name = "num_lock"
else:
key_name = key.char
elif hasattr(key, 'name'):
# Special key (keyboard.Key)
key_name = key.name
else:
# Unknown key
key_name = None
return key_name
[docs] def on_press(self, key):
"""When a valid keyboard key is pressed, set the associated pin's
state to `GPIO.LOW`.
**Callback** invoked from the thread ``th_listener``.
This thread is used to monitor the keyboard for any valid pressed key.
Only keys defined in the pin database are treated, i.e. keys that were
configured with :meth:`SimulRPi.GPIO.setup` are further processed.
Once a valid key is detected as pressed, the associated pin's state is
changed to `GPIO.LOW`.
Parameters
----------
key : pynput.keyboard.Key, pynput.keyboard.KeyCode, or None
The key parameter passed to callbacks is
* a :class:`pynput.keyboard.Key` for special keys,
* a :class:`pynput.keyboard.KeyCode` for normal alphanumeric keys, or
* :obj:`None` for unknown keys.
**Ref.:** https://bit.ly/3k4whEs
.. note::
If an exception is raised, it is caught to be further raised in
:meth:`SimulRPi.GPIO.input` or :meth:`SimulRPi.GPIO.wait`.
See Also
--------
:meth:`DisplayExceptionThread`: Read the **Important** message that
explains why an exception is not
raised in a thread's callback or
target function.
"""
try:
# test = 1/0
self.pin_db.set_pin_state_from_key(self.get_key_name(key),
state=SimulRPi.GPIO.LOW)
except Exception as e:
self.th_listener.exc = e
[docs] def on_release(self, key):
"""When a valid keyboard key is released, set the associated pin's
state to `GPIO.HIGH`.
**Callback** invoked from the thread ``th_listener``.
This thread is used to monitor the keyboard for any valid released key.
Only keys defined in the pin database are treated, i.e. keys that were
configured with :meth:`SimulRPi.GPIO.setup` are further processed.
Once a valid key is detected as released, the associated pin's state is
changed to `GPIO.HIGH`.
Parameters
----------
key : pynput.keyboard.Key, pynput.keyboard.KeyCode, or None
The key parameter passed to callbacks is
* a :class:`pynput.keyboard.Key` for special keys,
* a :class:`pynput.keyboard.KeyCode` for normal alphanumeric keys, or
* :obj:`None` for unknown keys.
**Ref.:** https://bit.ly/3k4whEs
.. note::
If an exception is raised, it is caught to be further raised in
:meth:`SimulRPi.GPIO.input` or :meth:`SimulRPi.GPIO.wait`.
See Also
--------
:meth:`DisplayExceptionThread`: Read the **Important** message that
explains why an exception is not
raised in a thread's callback or
target function.
"""
try:
# test = 1/0
self.pin_db.set_pin_state_from_key(self.get_key_name(key),
state=SimulRPi.GPIO.HIGH)
except Exception as e:
self.th_listener.exc = e
[docs] def update_channel_names(self, new_channel_names):
"""Update the channels names for multiple channels.
If a channel number is associated with a not yet created
:class:`~SimulRPi.pindb.Pin`, the corresponding `channel_name` will be
temporary saved for later when the pin object will be created with
:meth:`add_pin`.
Parameters
----------
new_channel_names : dict
Dictionary that maps channel numbers (:obj:`int`) to channel names
(:obj:`str`).
**Example**::
new_channel_names = {
1: "The Channel 1",
2: "The Channel 2"
}
"""
# TODO: assert on new_channel_names
self._update_attribute_pins('channel_name', new_channel_names)
[docs] def update_default_led_symbols(self, new_default_led_symbols):
"""Update the default LED symbols used by all output channels.
Parameters
----------
new_default_led_symbols : dict
Dictionary that maps each output state (:obj:`str`, {'`ON`',
'`OFF`'}) to a LED symbol (:obj:`str`).
**Example**::
new_default_led_symbols = {
'ON': '🔵',
'OFF': '⚪ '
}
"""
# TODO: assert on new_led_symbols
new_default_led_symbols = self._clean_led_symbols(new_default_led_symbols)
self.default_led_symbols.update(new_default_led_symbols)
# TODO: unique keymap in both ways
[docs] def update_keymap(self, new_keymap):
"""Update the default dictionary mapping keys and GPIO channels.
``new_keymap`` is a dictionary mapping some keys to their new GPIO
channels, and will be used to update the default keymap
`default_key_to_channel_map`_.
Parameters
----------
new_keymap : dict
Dictionary that maps keys (:obj:`str`) to their new GPIO channels
(:obj:`int`).
**Example**::
new_keymap = {
"f": 24,
"g": 25,
"h": 23
}
Raises
------
TypeError
Raised if a given key is invalid: only special and alphanumeric
keys recognized by `pynput`_ are accepted.
See the documentation for :mod:`SimulRPi.mapping` for a list of
accepted keys.
.. note::
If the key to be updated is associated to a channel that is already
taken by another key, both keys' channels will be swapped. However,
if a key is being linked to a :obj:`None` channel, then it will
take on the maximum channel number available + 1.
"""
# TODO: assert keys (str) and channels (int)
# TODO: test uniqueness in channel numbers of new map
assert len(set(new_keymap.values())) == len(new_keymap)
orig_keych = {}
for key1, new_ch in new_keymap.items():
old_ch = self.key_to_channel_map.get(key1)
# Case 1: if key's channel is not found, maybe it is a special key
# or an alphanumeric key not already in the keymap
# Validate the key before updating the keymaps
if old_ch is None and not self.validate_key(key1):
# Invalid key: the key is neither a special nor an alphanum key
orig_keych.setdefault(key1, None)
raise TypeError("The key '{}' is invalid: only special and "
"alphanumeric keys recognized by `pynput` are "
"accepted. \nSee the documentation for "
"`SimulRPi.mapping` @ https://bit.ly/3fIsd9o "
"for a list of accepted keys.".format(key1))
key2 = self.channel_to_key_map.get(new_ch)
if key2 is None:
# Case 2: the new channel is not associated with any key in the
# keymap. Thus, add the key with the new channel in the keymaps
orig_keych.setdefault(key1, None)
self._update_keymaps_and_pin_db(key_channels=[(key1, new_ch)])
continue
elif key1 == key2 and new_ch == old_ch:
# Case 3: No update necessary since the key with the given
# channel is already in the keymap.
continue
else:
# Case 4: Update the keymaps to reflect the key with the new
# given channel.
orig_keych.setdefault(key1, old_ch)
orig_keych.setdefault(key2, new_ch)
if old_ch is None:
# The new key is not found in the default keymap at all.
# Thus, its channel is None and must be set to an integer
# channel which is equal to the maximum channel available
# plus one. Hence, the second key whose channel is being
# swapped with this new key will not receive a None channel
# which would be problematic since there can be many keys
# with None channels and the keymap channel_to_key_map
# would only keep one key with the None channel.
old_ch = max(self.channel_to_key_map.keys()) + 1
self._update_keymaps_and_pin_db(
key_channels=[(key1, new_ch), (key2, old_ch)])
if orig_keych:
# There were updates and/or there are invalid keys
msg = "Update of Key-to-Channel Map:\n\n"
for i, (key, old_ch) in enumerate(orig_keych.items()):
new_ch = self.key_to_channel_map.get(key)
msg += '\t Key "{}"{}: Channel {} ------> Channel {}\n'.format(
key, " " * (20 - len(key)), old_ch, new_ch)
logger.debug(msg)
[docs] def update_led_symbols(self, new_led_symbols):
"""Update the LED symbols for multiple channels.
If a channel number is associated with a not yet created
:class:`~SimulRPi.pindb.Pin`, the corresponding LED symbols will be
temporary saved for later when the pin object will be created with
:meth:`add_pin`.
Parameters
----------
new_led_symbols : dict
Dictionary that maps channel numbers (:obj:`int`) to LED symbols
(:obj:`dict`).
**Example**::
new_led_symbols = {
1: {
'ON': '🔵',
'OFF': '⚪ '
},
2: {
'ON': '🔵',
'OFF': '⚪ '
}
}
"""
# TODO: assert on new_led_symbols
self._update_attribute_pins('led_symbols', new_led_symbols)
[docs] @staticmethod
def validate_key(key):
"""Validate if a key is recognized by `pynput`_
A valid key can either be:
* a :class:`pynput.keyboard.Key` for special keys (e.g. ``tab`` or \
``up``), or
* a :class:`pynput.keyboard.KeyCode` for normal alphanumeric keys.
Parameters
----------
key : str
The key (e.g. '`tab`') that will be validated.
Returns
-------
retval : bool
Returns `True` if it's a valid key. Otherwise, it returns `False`.
References
----------
`pynput <https://pynput.readthedocs.io/en/latest/keyboard.html#reference>`__
See Also
--------
:mod:`SimulRPi.mapping` : for a list of special keys supported by
`pynput`_.
"""
if not hasattr(keyboard.Key, key) and \
not (len(key) == 1 and key.isalnum()):
# Unrecognized key
# Neither a special key nor an alphanum key
return False
else:
return True
@staticmethod
def _clean_channel_name(channel_number, channel_name):
"""TODO
Parameters
----------
channel_number
channel_name
Returns
-------
"""
return channel_name if channel_name else channel_number
def _clean_led_symbols(self, led_symbols):
"""TODO
Parameters
----------
led_symbols
"""
if led_symbols:
if led_symbols == "default_ascii":
led_symbols = {
"ON": "\033[1;31;48m(0)\033[1;37;0m",
"OFF": "(0)",
}
else:
assert isinstance(led_symbols, dict), \
"Wrong type for `led_symbols`: {}. \nIt should be a " \
"dictionary".format(led_symbols)
for symbol_name, symbol_value in led_symbols.items():
if symbol_value:
symbol_value = symbol_value.replace("\\033", "\033")
else:
symbol_value = self.default_led_symbols[symbol_name]
led_symbols[symbol_name] = symbol_value
else:
led_symbols = self.default_led_symbols
return led_symbols
def _update_attribute_pins(self, attribute_name, new_attributes):
"""TODO
Parameters
----------
attribute_name
new_attributes
Raises
------
ValueError
Raised if
"""
if attribute_name == 'led_symbols':
set_fnc = self.pin_db.set_pin_symbols_from_channel
elif attribute_name == 'channel_name':
set_fnc = self.pin_db.set_pin_name_from_channel
elif attribute_name == 'channel_id':
set_fnc = self.pin_db.set_pin_id_from_channel
else:
raise ValueError("Invalid attribute name: {}".format(attribute_name))
for ch_number, attr_value in new_attributes.items():
# TODO: explain
ch_number = int(ch_number)
if attribute_name == 'led_symbols':
attr_value = self._clean_led_symbols(attr_value)
elif attribute_name == 'channel_name':
attr_value = self._clean_channel_name(ch_number, attr_value)
if not set_fnc(ch_number, attr_value):
self._channel_tmp_info.setdefault(ch_number, {})
self._channel_tmp_info[ch_number].update(
{attribute_name: attr_value})
def _update_keymaps_and_pin_db(self, key_channels):
"""Update the two internal keymaps and the pin database.
``key_channels`` is a list of two-tuples where each tuple contains the
key and its new channel with which it needs to be updated.
The different internal data structures need to be updated to reflect
these changes:
* the two keymaps ``key_to_channel_map`` and ``channel_to_key_map``
* the pin database (:class:`SimulRPi.pindb.PinDB`)
Parameters
----------
key_channels : list of tuple
Where each tuple contains the key and its new channel with which it
needs to be updated.
**For example**::
key_channels = [('f', 25), ('g', 23)])
where the key *'f'* will be mapped to the GPIO channel 25 and the
key *'g'* to the GPIO channel 23.
"""
for key_ch in key_channels:
key = key_ch[0]
channel = key_ch[1]
self.key_to_channel_map[key] = channel
self.channel_to_key_map[channel] = key
self.pin_db.set_pin_key_from_channel(channel, key)