224 lines
5.9 KiB
Python
224 lines
5.9 KiB
Python
from machine import Pin, I2C
|
|
from neopixel import NeoPixel
|
|
|
|
import network
|
|
import time
|
|
import urequests
|
|
|
|
Broche = [0, 1, 2, 21, 22, 23, 16, 17, 19, 20, 18]
|
|
|
|
def debug(str):
|
|
if True:
|
|
print(f"--> {str}")
|
|
|
|
class Bouton:
|
|
DEBOUNCE_MS = 500
|
|
dernier_appui = 0
|
|
on = False
|
|
|
|
def __init__(self, nom, pin, todo = None, pin_led = None, led_on = False):
|
|
self.nom = nom
|
|
self.pin = Pin(pin, Pin.IN, Pin.PULL_UP)
|
|
self.pin.irq(trigger = Pin.IRQ_FALLING, handler = self.bouton_presse)
|
|
self.todo = todo
|
|
if pin_led is None:
|
|
self.avec_led = False
|
|
else:
|
|
self.avec_led = True
|
|
self.pin_led = Pin(pin_led, Pin.OUT)
|
|
self.led_on() if led_on else self.led_off()
|
|
|
|
def led_on(self):
|
|
if self.avec_led:
|
|
self.pin_led.on()
|
|
self.on = True
|
|
|
|
def led_off(self):
|
|
if self.avec_led:
|
|
self.pin_led.off()
|
|
self.on = False
|
|
|
|
def is_on(self) -> bool:
|
|
if self.avec_led:
|
|
return self.on
|
|
return False
|
|
|
|
def bouton_presse(self, pin):
|
|
self.maintenant = time.ticks_ms()
|
|
if time.ticks_diff(self.maintenant, self.dernier_appui) > self.DEBOUNCE_MS:
|
|
self.dernier_appui = self.maintenant
|
|
print("Bouton " + self.nom + " pressé !")
|
|
if self.todo is not None:
|
|
self.todo()
|
|
|
|
class Relais:
|
|
i2c = I2C(scl=Pin(Broche[5]), sda=Pin(Broche[4]), freq=20000)
|
|
I2C_ADDR = 0x11
|
|
|
|
def __init__(self) -> None:
|
|
self.solaire()
|
|
|
|
def off(self):
|
|
self.i2c.writeto_mem(self.I2C_ADDR, 0x10, bytes([0b0000]))
|
|
|
|
def solaire(self):
|
|
self.i2c.writeto_mem(self.I2C_ADDR, 0x10, bytes([0b0011]))
|
|
|
|
def reseau(self):
|
|
self.i2c.writeto_mem(self.I2C_ADDR, 0x10, bytes([0b1100]))
|
|
|
|
class Cloud:
|
|
token_ha = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJkNGY4MDdmYWYzNDQ0NTc0ODY4MmFmNzA4NDdmMTE0MyIsImlhdCI6MTc2NDQ1Mzk0NSwiZXhwIjoyMDc5ODEzOTQ1fQ.DJgSqeTKPHWbKEFH3HuFih4QKt3CSqLqot34_vhCOQU"
|
|
headers = {"Authorization": f"Bearer {token_ha}", "Content-Type": "application/json"}
|
|
wifi = network.WLAN(network.STA_IF)
|
|
dernier_appel_zoe = 0
|
|
derniere_val_zoe = 0
|
|
dernier_appel_HC = 0
|
|
derniere_val_HC = False
|
|
ttl = 60 * 1000
|
|
|
|
def net_up(self):
|
|
if not self.wifi.isconnected():
|
|
self.wifi.active(True)
|
|
self.wifi.connect("Livebox-BDC6", "KFQSn7PDCMSgPpM2Ws")
|
|
while not self.wifi.isconnected():
|
|
print(".")
|
|
|
|
def zoe(self) -> int:
|
|
try:
|
|
self.maintenant = time.ticks_ms()
|
|
if time.ticks_diff(self.maintenant, self.dernier_appel_zoe) > self.ttl:
|
|
self.dernier_appel_zoe = self.maintenant
|
|
self.net_up()
|
|
debug("appel sensor.zoe_batterie")
|
|
response = urequests.get("https://ha-demo.arbi.fr/api/states/sensor.zoe_batterie", headers=self.headers)
|
|
data = response.json()
|
|
response.close()
|
|
self.derniere_val_zoe = int(data["state"])
|
|
return self.derniere_val_zoe
|
|
except Exception as e:
|
|
print("Erreur :", e)
|
|
return -1
|
|
|
|
def HC(self) -> bool:
|
|
try:
|
|
self.maintenant = time.ticks_ms()
|
|
if time.ticks_diff(self.maintenant, self.dernier_appel_HC) > self.ttl:
|
|
self.dernier_appel_HC = self.maintenant
|
|
self.net_up()
|
|
debug("appel sensor.compteur_linky_ptec")
|
|
response = urequests.get("https://ha-demo.arbi.fr/api/states/sensor.compteur_linky_ptec", headers=self.headers)
|
|
data = response.json()
|
|
response.close()
|
|
val = data["state"]
|
|
print(f"--------------> {val}")
|
|
self.derniere_val_HC = (val == "HC..")
|
|
return self.derniere_val_HC
|
|
except Exception as e:
|
|
print("Erreur :", e)
|
|
return False
|
|
|
|
class Leds:
|
|
brightness = 0.01 # 1 %
|
|
pin_leds = Pin(Broche[8], Pin.OUT)
|
|
np = NeoPixel(pin_leds, 10)
|
|
vert = (0, 255, 0)
|
|
noir = (0, 0, 0)
|
|
orange = (255, 165, 0)
|
|
bascule = False
|
|
|
|
def _led(self, i, r, g, b):
|
|
self.np[i] = (
|
|
int(r * self.brightness),
|
|
int(g * self.brightness),
|
|
int(b * self.brightness)
|
|
)
|
|
|
|
def afficher(self, charge:int, consigne):
|
|
self.bascule = not self.bascule
|
|
nb_dizaine = charge // 10
|
|
led_consigne = (consigne // 10) - 1
|
|
for led in range(0, 10):
|
|
self._led(led, *self.noir)
|
|
for led in range(0, nb_dizaine):
|
|
self._led(led, *self.vert)
|
|
if self.bascule:
|
|
self._led(nb_dizaine, *self.vert)
|
|
else:
|
|
self._led(led_consigne, *self.orange)
|
|
self.np.write()
|
|
|
|
# x = charge // 10
|
|
# print(f"--> {x}")
|
|
# pass
|
|
|
|
# Capter exception pour éviter les sorties intempestives (comme timeout sur ntp)
|
|
# Regarder https://docs.micropython.org/en/latest/esp32/quickref.html#timers
|
|
|
|
relais = Relais()
|
|
|
|
cloud = Cloud()
|
|
|
|
def action_jaune():
|
|
global consigne
|
|
bouton_jaune.led_on()
|
|
bouton_bleu.led_off()
|
|
bouton_rouge.led_off()
|
|
consigne = 80
|
|
|
|
bouton_jaune = Bouton("jaune", Broche[1], action_jaune, Broche[0], True)
|
|
|
|
def action_bleu():
|
|
global consigne
|
|
bouton_jaune.led_off()
|
|
bouton_bleu.led_on()
|
|
bouton_rouge.led_off()
|
|
consigne = 80
|
|
|
|
bouton_bleu = Bouton("bleu", Broche[3], action_bleu, Broche[2])
|
|
|
|
def action_rouge():
|
|
global consigne
|
|
bouton_jaune.led_off()
|
|
bouton_bleu.led_off()
|
|
bouton_rouge.led_on()
|
|
consigne = 80
|
|
|
|
bouton_rouge = Bouton("bleu", Broche[10], action_rouge, Broche[9])
|
|
|
|
def action_moins():
|
|
global consigne
|
|
consigne = max(10, consigne - 10)
|
|
|
|
bouton_moins = Bouton("moins", Broche[7], action_moins)
|
|
|
|
def action_plus():
|
|
global consigne
|
|
consigne = min(100, consigne + 10)
|
|
|
|
bouton_plus = Bouton("plus", Broche[6], action_plus)
|
|
|
|
leds = Leds()
|
|
|
|
#######################################################""
|
|
|
|
pas_temps = 1
|
|
i = 0
|
|
consigne = 80
|
|
|
|
while True:
|
|
i = (i + 1) % 10
|
|
print(f"-------------- {i}")
|
|
charge = cloud.zoe()
|
|
print(f"Charge : {charge}")
|
|
print(f"HC : {cloud.HC()}")
|
|
print(f"Consigne : {consigne}")
|
|
leds.afficher(charge = charge, consigne = consigne)
|
|
if (consigne > charge) and (bouton_bleu.is_on() or bouton_rouge.is_on()):
|
|
print(f"{consigne} > {charge} On force le réseau")
|
|
relais.reseau()
|
|
else:
|
|
print(f"On force le solaire")
|
|
relais.solaire()
|
|
time.sleep(pas_temps)
|