FEAT : HC via HA
This commit is contained in:
parent
e2a893a038
commit
3e206ebcfd
@ -1,67 +0,0 @@
|
|||||||
# localtime_fr.py
|
|
||||||
import time
|
|
||||||
import ntptime
|
|
||||||
|
|
||||||
# Fuseaux Europe/Paris
|
|
||||||
TZ_STD = 1 # UTC+1 en hiver
|
|
||||||
TZ_DST = 2 # UTC+2 en été
|
|
||||||
|
|
||||||
# Trouve le dernier dimanche d'un mois
|
|
||||||
def last_sunday(year, month):
|
|
||||||
# On cherche depuis le dernier jour du mois
|
|
||||||
for day in range(31, 0, -1):
|
|
||||||
try:
|
|
||||||
wd = time.localtime(time.mktime((year, month, day, 0, 0, 0, 0, 0)))[6]
|
|
||||||
if wd == 6: # 6 = dimanche
|
|
||||||
return day
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Détecte si on est en heure d'été (DST)
|
|
||||||
def is_dst_europe(tm):
|
|
||||||
year, month, mday, hour = tm[0], tm[1], tm[2], tm[3]
|
|
||||||
|
|
||||||
# Hors périodes
|
|
||||||
if month < 3 or month > 10:
|
|
||||||
return False
|
|
||||||
if month > 3 and month < 10:
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Mois de transition : Mars
|
|
||||||
if month == 3:
|
|
||||||
ls = last_sunday(year, 3)
|
|
||||||
return (mday > ls) or (mday == ls and hour >= 2)
|
|
||||||
|
|
||||||
# Mois de transition : Octobre
|
|
||||||
if month == 10:
|
|
||||||
ls = last_sunday(year, 10)
|
|
||||||
return not ((mday > ls) or (mday == ls and hour >= 3))
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Fonction principale
|
|
||||||
def localtime():
|
|
||||||
"""
|
|
||||||
Retourne la date locale Europe/Paris (UTC+1/UTC+2)
|
|
||||||
après application de l'heure d'été/hiver.
|
|
||||||
"""
|
|
||||||
t = list(time.localtime()) # UTC
|
|
||||||
|
|
||||||
if is_dst_europe(t):
|
|
||||||
t[3] += TZ_DST
|
|
||||||
else:
|
|
||||||
t[3] += TZ_STD
|
|
||||||
|
|
||||||
# Normalisation
|
|
||||||
return time.localtime(time.mktime(tuple(t)))
|
|
||||||
|
|
||||||
# Wrapper simple pour datetime-like
|
|
||||||
def now():
|
|
||||||
y, m, d, hh, mm, ss, wd, yd = localtime()
|
|
||||||
return f"{y:04d}-{m:02d}-{d:02d} {hh:02d}:{mm:02d}:{ss:02d}"
|
|
||||||
|
|
||||||
# Sync NTP (facultatif)
|
|
||||||
def sync():
|
|
||||||
ntptime.host = "fr.pool.ntp.org"
|
|
||||||
ntptime.settime()
|
|
||||||
81
main.py
81
main.py
@ -1,20 +1,21 @@
|
|||||||
from machine import Pin, I2C, RTC
|
from machine import Pin, I2C
|
||||||
from neopixel import NeoPixel
|
from neopixel import NeoPixel
|
||||||
|
|
||||||
import network
|
import network
|
||||||
import time
|
import time
|
||||||
import ntptime
|
|
||||||
import urequests
|
import urequests
|
||||||
import localtime_fr # TODO : Lire info HP.. HC.. sur HA > sensor.compteur_linky_ptec
|
|
||||||
import webrepl
|
|
||||||
|
|
||||||
Broche = [0, 1, 2, 21, 22, 23, 16, 17, 19, 20, 18]
|
Broche = [0, 1, 2, 21, 22, 23, 16, 17, 19, 20, 18]
|
||||||
|
|
||||||
|
def debug(str):
|
||||||
|
if True:
|
||||||
|
print(f"--> {str}")
|
||||||
|
|
||||||
class Bouton:
|
class Bouton:
|
||||||
DEBOUNCE_MS = 500
|
DEBOUNCE_MS = 500
|
||||||
|
dernier_appui = 0
|
||||||
|
|
||||||
def __init__(self, nom, pin, todo = None, pin_led = None, led_on = False):
|
def __init__(self, nom, pin, todo = None, pin_led = None, led_on = False):
|
||||||
self.dernier_appui = 0
|
|
||||||
self.nom = nom
|
self.nom = nom
|
||||||
self.pin = Pin(pin, Pin.IN, Pin.PULL_UP)
|
self.pin = Pin(pin, Pin.IN, Pin.PULL_UP)
|
||||||
self.pin.irq(trigger = Pin.IRQ_FALLING, handler = self.bouton_presse)
|
self.pin.irq(trigger = Pin.IRQ_FALLING, handler = self.bouton_presse)
|
||||||
@ -62,19 +63,53 @@ class Cloud:
|
|||||||
token_ha = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJkNGY4MDdmYWYzNDQ0NTc0ODY4MmFmNzA4NDdmMTE0MyIsImlhdCI6MTc2NDQ1Mzk0NSwiZXhwIjoyMDc5ODEzOTQ1fQ.DJgSqeTKPHWbKEFH3HuFih4QKt3CSqLqot34_vhCOQU"
|
token_ha = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJkNGY4MDdmYWYzNDQ0NTc0ODY4MmFmNzA4NDdmMTE0MyIsImlhdCI6MTc2NDQ1Mzk0NSwiZXhwIjoyMDc5ODEzOTQ1fQ.DJgSqeTKPHWbKEFH3HuFih4QKt3CSqLqot34_vhCOQU"
|
||||||
headers = {"Authorization": f"Bearer {token_ha}", "Content-Type": "application/json"}
|
headers = {"Authorization": f"Bearer {token_ha}", "Content-Type": "application/json"}
|
||||||
wifi = network.WLAN(network.STA_IF)
|
wifi = network.WLAN(network.STA_IF)
|
||||||
wifi.active(True)
|
dernier_appel_zoe = 0
|
||||||
wifi.connect("Livebox-BDC6", "KFQSn7PDCMSgPpM2Ws")
|
derniere_val_zoe = 0
|
||||||
|
dernier_appel_HC = 0
|
||||||
|
derniere_val_HC = False
|
||||||
|
ttl = 60 * 1000
|
||||||
|
|
||||||
|
def net_up(self):
|
||||||
|
if not self.wifi.isconnected():
|
||||||
|
self.wifi.active(True)
|
||||||
|
self.wifi.connect("Livebox-BDC6", "KFQSn7PDCMSgPpM2Ws")
|
||||||
|
while not self.wifi.isconnected():
|
||||||
|
print(".")
|
||||||
|
|
||||||
def zoe(self) -> int:
|
def zoe(self) -> int:
|
||||||
try:
|
try:
|
||||||
|
self.maintenant = time.ticks_ms()
|
||||||
|
if time.ticks_diff(self.maintenant, self.dernier_appel_zoe) > self.ttl:
|
||||||
|
self.dernier_appel_zoe = self.maintenant
|
||||||
|
self.net_up()
|
||||||
|
debug("appel sensor.zoe_batterie")
|
||||||
response = urequests.get("https://ha-demo.arbi.fr/api/states/sensor.zoe_batterie", headers=self.headers)
|
response = urequests.get("https://ha-demo.arbi.fr/api/states/sensor.zoe_batterie", headers=self.headers)
|
||||||
data = response.json()
|
data = response.json()
|
||||||
response.close()
|
response.close()
|
||||||
return int(data["state"])
|
self.derniere_val_zoe = int(data["state"])
|
||||||
|
return self.derniere_val_zoe
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("Erreur :", e)
|
print("Erreur :", e)
|
||||||
return -1
|
return -1
|
||||||
|
|
||||||
|
def HC(self) -> bool:
|
||||||
|
try:
|
||||||
|
self.maintenant = time.ticks_ms()
|
||||||
|
if time.ticks_diff(self.maintenant, self.dernier_appel_HC) > self.ttl:
|
||||||
|
self.dernier_appel_HC = self.maintenant
|
||||||
|
self.net_up()
|
||||||
|
debug("appel sensor.compteur_linky_ptec")
|
||||||
|
response = urequests.get("https://ha-demo.arbi.fr/api/states/sensor.compteur_linky_ptec", headers=self.headers)
|
||||||
|
data = response.json()
|
||||||
|
response.close()
|
||||||
|
val = data["state"]
|
||||||
|
print(f"--------------> {val}")
|
||||||
|
self.derniere_val_HC = (val == "HC..")
|
||||||
|
return self.derniere_val_HC
|
||||||
|
except Exception as e:
|
||||||
|
print("Erreur :", e)
|
||||||
|
return False
|
||||||
|
|
||||||
# Capter exception pour éviter les sorties intempestives (comme timeout sur ntp)
|
# Capter exception pour éviter les sorties intempestives (comme timeout sur ntp)
|
||||||
# Regarder https://docs.micropython.org/en/latest/esp32/quickref.html#timers
|
# Regarder https://docs.micropython.org/en/latest/esp32/quickref.html#timers
|
||||||
|
|
||||||
@ -134,36 +169,16 @@ set_pixel(7, 0, 255, 0)
|
|||||||
np.write()
|
np.write()
|
||||||
# np.brightness(50)
|
# np.brightness(50)
|
||||||
|
|
||||||
pas_temps = 10
|
pas_temps = 1
|
||||||
|
i = 0
|
||||||
|
|
||||||
# def HC(date_heure) -> bool:
|
|
||||||
# heure, minute = date_heure[4], date_heure[5]
|
|
||||||
# nb_minutes = heure * 60 + minute
|
|
||||||
# _21h30 = 21 * 60 + 30 # TODO : Ne pas faire le calcul à chaque appel
|
|
||||||
# _23h30 = 23 * 60 + 30
|
|
||||||
# if (nb_minutes > _21h30 and nb_minutes < _23h30):
|
|
||||||
# return True
|
|
||||||
# return False
|
|
||||||
|
|
||||||
# def ntp():
|
|
||||||
# global ntp_ok, ntp_ttl, ntp_attente
|
|
||||||
# if (wifi.isconnected() and (not ntp_ok or ntp_attente < 0)):
|
|
||||||
# print("WIFI OK, call NTP")
|
|
||||||
# ntptime.settime()
|
|
||||||
# ntp_ok = True
|
|
||||||
# ntp_attente = ntp_ttl
|
|
||||||
# webrepl.start()
|
|
||||||
# if (ntp_ok):
|
|
||||||
# date_heure = localtime_fr.localtime()
|
|
||||||
# print(f"Date et heure : {date_heure[3]}:{date_heure[4]}:{date_heure[5]} ({ntp_attente}, {HC(date_heure)})")
|
|
||||||
# ntp_attente = ntp_attente - pas_temps
|
|
||||||
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# now = datetime.now()
|
# now = datetime.now()
|
||||||
# print("Date et heure :", now)
|
# print("Date et heure :", now)
|
||||||
|
i += 1
|
||||||
|
print(f"-------------- {i}")
|
||||||
print(f"Charge : {cloud.zoe()}")
|
print(f"Charge : {cloud.zoe()}")
|
||||||
|
print(f"HC : {cloud.HC()}")
|
||||||
time.sleep(pas_temps)
|
time.sleep(pas_temps)
|
||||||
# set_pixel(0, 0, 0, 0) # OFF
|
# set_pixel(0, 0, 0, 0) # OFF
|
||||||
# np.write()
|
# np.write()
|
||||||
Loading…
Reference in New Issue
Block a user