Skip to content
Open

3a #7

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
TOPICO0=franco/contador0
TOPICO1=franco/contador1
SERVIDOR=fiounan.duckdns.org
2 changes: 1 addition & 1 deletion clienteMqtt/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.11-slim
FROM python:3.12-slim

WORKDIR /app

Expand Down
62 changes: 56 additions & 6 deletions clienteMqtt/clienteMqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,74 @@
from environs import Env

env = Env()
env.read_env() #lee el archivo con las variables. por defecto .env
env.read_env()

logging.basicConfig(format='%(asctime)s - cliente mqtt - %(levelname)s:%(message)s', level=logging.INFO, datefmt='%d/%m/%Y %H:%M:%S')
logging.basicConfig(format='%(asctime)s - %(taskName)s - %(levelname)s:%(message)s', level=logging.INFO, datefmt='%d/%m/%Y %H:%M:%S')

class Estado:
def __init__(self):
self.contador = 0

async def incrementar(estado): #Aumento el contador cada 3 segundos
asyncio.current_task().set_name("Incremento")
while True:
await asyncio.sleep(3)
estado.contador = estado.contador + 1

async def publicar(client, topico_pub, estado): #
asyncio.current_task().set_name("Publicar")
while True:
await asyncio.sleep(5)
await client.publish(topico_pub, str(estado.contador))
logging.info(f"Contador ({estado.contador}) publicado en {topico_pub}")

async def atender(queue, nombre_tarea):
asyncio.current_task().set_name(nombre_tarea)
while True:
mensaje = await queue.get()
logging.info(f"Recibido en {mensaje.topic}: {mensaje.payload.decode('utf-8')}")

async def escuchar(client, sub1, sub2, queue1, queue2):
asyncio.current_task().set_name("enviar")
async for message in client.messages:
if message.topic.matches(sub1):
await queue1.put(message)
elif message.topic.matches(sub2):
await queue2.put(message)

async def main():
tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
tls_context.verify_mode = ssl.CERT_REQUIRED
tls_context.check_hostname = True
tls_context.load_default_certs()

topico0 = env('TOPICO0')
topico1 = env('TOPICO1')
topico_publicar = env('TOPICO_PUB')

estado = Estado()
cola_topico0 = asyncio.Queue()
cola_topico1 = asyncio.Queue()

async with aiomqtt.Client(
env("SERVIDOR"),
port=8883,
tls_context=tls_context,
) as client:
await client.subscribe("#")
async for message in client.messages:
logging.info(str(message.topic) + ": " + message.payload.decode("utf-8"))

await client.subscribe(topico0)
await client.subscribe(topico1)
logging.info("Conectado.")

async with asyncio.TaskGroup() as tg:
tg.create_task(incrementar(estado))
tg.create_task(publicar(client, topico_publicar, estado))
tg.create_task(atender(cola_topico0, "AtencionTopic0"))
tg.create_task(atender(cola_topico1, "AtencionTopico1"))
tg.create_task(escuchar(client, topico0, topico1, cola_topico0, cola_topico1))

if __name__ == "__main__":
asyncio.run(main())
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
13 changes: 13 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
services:
clientemqtt:
build: ./clienteMqtt
image: clientemqtt
container_name: clientemqtt
environment:
- TZ=America/Argentina/Buenos_Aires
- SERVIDOR=${SERVIDOR}
- TOPICO0=${TOPICO0}
- TOPICO1=${TOPICO1}
- TOPICO_PUB=${TOPICO_PUB}

restart: unless-stopped