Rename module from motor_passo to spectrometer

This commit is contained in:
2023-10-29 20:56:03 -03:00
parent 02051ebda7
commit 00633508f0
15 changed files with 11 additions and 11 deletions

7
spectrometer/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
from appdirs import AppDirs
from pathlib import Path
dirs = AppDirs("spectrometer", appauthor="uff")
log_dir = Path(dirs.user_log_dir) # XXX
config_dir = Path(dirs.user_config_dir) # XXX
data_dir = Path(dirs.user_data_dir)

51
spectrometer/config.py Normal file
View File

@@ -0,0 +1,51 @@
import json
import logging
from dataclasses import dataclass, field
from spectrometer import config_dir
from pathlib import Path
@dataclass
class Config:
config_path: Path
_json: dict = field(default_factory=dict)
_l: logging.Logger = None
def __post_init__(self):
self._l = logging.getLogger(__name__).getChild(self.__class__.__name__)
def load(self) -> bool:
self.config_path = self.config_path.expanduser()
self._l.info(f"Carregando configuração de {self.config_path}")
try:
with self.config_path.open("r") as f:
self._json = json.load(f)
for k, v in self._json.items():
setattr(self, k, v)
except FileNotFoundError:
self._l.exception("Arquivo de configuração não existe")
raise RuntimeError() # TODO: Usar exception customizada
except json.JSONDecodeError:
self._l.exception("Configuração inválida")
raise RuntimeError() # TODO: Usar exception customizada
return True
_configs = {}
def get_config(name: str) -> Config:
global _configs
if name in _configs:
return _configs[name]
_configs[name] = Config(config_path=config_dir / name)
_configs[name].load()
return _configs[name]

35
spectrometer/encoder.py Normal file
View File

@@ -0,0 +1,35 @@
import RPi.GPIO as gpio
from dataclasses import dataclass
@dataclass
class Encoder:
pin_a: int
pin_b: int
_curr_steps: int = 0
def setup(self):
if not gpio.getmode():
gpio.setmode(gpio.BCM)
gpio.setup(self.pin_a, gpio.IN)
gpio.setup(self.pin_b, gpio.IN)
gpio.add_event_detect(self.pin_a,
gpio.RISING,
callback=self._event_detect)
gpio.add_event_detect(self.pin_b,
gpio.RISING,
callback=self._event_detect)
def _event_detect(self, pin):
a = gpio.input(self.pin_a)
b = gpio.input(self.pin_b)
if a ^ b:
self._curr_steps += 1 if a else -1
@property
def angle(self) -> float:
return self._curr_steps / 5000 * 360

View File

View File

@@ -0,0 +1,29 @@
import sys
from spectrometer.motor import Motor
from spectrometer.encoder import Encoder
from spectrometer.utils.utils import setup_cleanup, _cleanup
def main(args: list):
setup_cleanup()
motor = Motor([13, 19, 26])
motor.setup()
motor.set_speed(50)
encoder = Encoder(6, 5)
encoder.setup()
while encoder.angle <= 360:
motor.step(-5)
print(encoder.angle)
if __name__ == "__main__":
try:
main(sys.argv)
except Exception as e:
print(str(e))
finally:
_cleanup()

View File

View File

@@ -0,0 +1,17 @@
from spectrometer.encoder import Encoder
from spectrometer.utils.utils import setup_cleanup
from time import sleep
def main():
setup_cleanup()
encoder = Encoder(6, 5)
encoder.setup()
while True:
print(encoder.angle)
sleep(1)
if __name__ == "__main__":
main()

View File

View File

@@ -0,0 +1,17 @@
from spectrometer.motor import Motor
from spectrometer.utils.utils import setup_cleanup
def main():
setup_cleanup()
motor = Motor([13, 19, 26])
motor.setup()
motor.set_speed(10)
while True:
input()
motor.step(1)
if __name__ == "__main__":
main()

48
spectrometer/motor.py Normal file
View File

@@ -0,0 +1,48 @@
import RPi.GPIO as gpio
from dataclasses import dataclass, field
from time import sleep
@dataclass
class Motor:
pins: list[int] = field(default_factory=list)
rev_steps: int = 24
_delay: int = 0
_step_configs: list[list[int]] = field(default_factory=lambda: [
[[1, 0, 0], [0, 1, 0], [0, 0, 1]],
])
_next_step: int = 0
_period: int = 3
def __post_init__(self) -> None:
self.set_speed(4)
def setup(self) -> None:
if not gpio.getmode():
gpio.setmode(gpio.BCM)
for pin in self.pins:
gpio.setup(pin, gpio.OUT)
gpio.output(pin, gpio.LOW)
def step(self, steps: int) -> None:
direction = int(steps / abs(steps))
steps = abs(steps)
for i in range(steps):
self._step(direction)
sleep(self._delay)
def set_speed(self, rpm: int) -> None:
self._delay = 60 / self.rev_steps / rpm
def _step(self, direction: int):
print(self._next_step, self._next_step % self._period)
conf = self._step_configs[0][self._next_step % self._period]
self._next_step += direction
for p, c in zip(self.pins, conf):
gpio.output(p, c)

View File

View File

@@ -0,0 +1,51 @@
import paho.mqtt.client as mqtt
from spectrometer.utils.service import Service
class MqttSubscriber(Service):
def __init__(self, broker_host, broker_port):
super().__init__(name="mqtt_listener")
self.broker_host = broker_host
self.broker_port = broker_port
self._client = mqtt.Client()
self._client.on_connect = self._on_connect
self._client.on_message = self._on_message
self._client.on_disconnect = self._on_disconnect
self._cbs: dict[str, callable] = {}
def run(self) -> None:
self.client.connect(self.broker_host, self.broker_port, 60)
self.client.loop_start()
def subscribe(self, topic: str, cb: callable) -> None:
self._cbs[topic] = cb
if self._client.is_connected():
self._client.subscribe(topic)
def _on_connect(self, client, userdata, flags, rc) -> None:
self._l.debug(f"Conectado com resultado {str(rc)}")
for topic in self._cbs.keys():
client.subscribe(topic)
def _on_disconnect(self, client, userdata, rc) -> None:
self._l.debug(f"Desconectado, código {str(rc)}")
# Start the loop again in case of an unexpected disconnection
client.loop_start()
def _on_message(self, client, userdata, message) -> None:
payload = message.payload.decode("utf-8")
self._l.debug(f"Mensagem recebida no tópico: {message.topic}")
if message.topic not in self._cbs:
self._l.debug(f"Mensagem no tópico {message.topic} não tratada")
return
try:
self._cbs[message.topic](message.topic, payload)
except Exception:
self._l.exception(
f"Falha ao processar mensagem no tópico {message.topic}")

View File

@@ -0,0 +1,17 @@
import threading
import logging
class Service(threading.Thread):
def __init__(self, name: str = "(sem nome)"):
super().__init__()
log = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.daemon = True
self.name = name
self._l = log.getChild(self.name)
def start(self) -> None:
self._l.debug(f"Iniciando serviço {self.name}")
super().start()

View File

@@ -0,0 +1,12 @@
import signal
def _cleanup(*args, **kwargs):
import sys
import RPi.GPIO as gpio
gpio.cleanup()
sys.exit(0)
def setup_cleanup():
signal.signal(signal.SIGINT, _cleanup)