# Python example

# python 3.x
# pip3 install paho-mqtt
# Refer to  https://github.com/emqx/MQTT-Client-Examples/blob/master/mqtt-client-Python3/sub_tcp.py


import functools
import logging
import random
import time

from paho.mqtt import client as mqtt_client


# --- Broker endpoint -------------------------------------------------------
BROKER = "mqtt.assetscontrols.com"
PORT = 1883

# Topic filter handed to subscribe() in on_connect. "{accessKey}" is a
# placeholder that nothing in this file substitutes, so replace it by hand
# with your own access key, e.g.
#   TOPIC = "upload/B176C0FF1DFF41D488FBFF7908C76E17/+/location"
# where B176C0FF1DFF41D488FBFF7908C76E17 is the access key.
TOPIC = "upload/{accessKey}/+/#"

# --- Session identity and credentials --------------------------------------
CLIENT_ID = "B176C0FF1DFF41D488FBFF7908C76E17_webloginuser"  # any custom name
USERNAME = "webloginuser"  # login user of the web application
# Login password of the web application (per the original note: 32-character MD5).
PASSWORD = "cd01ab1bc1f444f6e6cb86505b00fea0"
qos = 1  # QoS level passed to subscribe()

# --- Reconnect back-off used by on_disconnect ------------------------------
FIRST_RECONNECT_DELAY = 1   # seconds slept before the first reconnect attempt
RECONNECT_RATE = 2          # factor applied to the delay after each failure
MAX_RECONNECT_COUNT = 12    # reconnect attempts before giving up
MAX_RECONNECT_DELAY = 60    # upper bound for the delay, in seconds

# Set to True by on_disconnect once every reconnect attempt has failed.
FLAG_EXIT = False


def on_connect(client, userdata, flags, rc, properties=None):
    """Connection callback: subscribe to TOPIC once the broker accepts the session.

    Args:
        client: The MQTT client the callback fired for.
        userdata: Unused.
        flags: Unused.
        rc: Result code of the connection attempt; 0 means accepted.
        properties: Unused.
    """
    accepted = rc == 0 and client.is_connected()
    if not accepted:
        print(f'Failed to connect, return code {rc}')
        return

    print("Connected to MQTT Broker!")
    # Subscribing from inside the callback means the subscription is issued
    # again every time this callback runs.
    client.subscribe(TOPIC, qos)


def on_disconnect(client, userdata, rc, properties=None, *extra):
    """Disconnect callback: try to reconnect with exponential back-off.

    The client in this file is created with ``CallbackAPIVersion.VERSION2``,
    for which paho-mqtt invokes this callback with five positional arguments:
    ``(client, userdata, disconnect_flags, reason_code, properties)``.
    A four-parameter callback cannot accept that call and fails with
    ``TypeError`` before any reconnect logic runs. The trailing ``*extra``
    absorbs the fifth argument while the older
    ``(client, userdata, rc[, properties])`` call forms keep working.

    Args:
        client: The MQTT client that lost its connection.
        userdata: Unused.
        rc: Result/reason code of the disconnect (for a five-argument call
            this position carries the disconnect flags; see below).
        properties: Unused apart from argument normalisation.
        *extra: Present only for the five-argument call form.

    Side effects:
        Sleeps between attempts and sets the module-level ``FLAG_EXIT`` to
        True when ``MAX_RECONNECT_COUNT`` attempts have all failed.
    """
    global FLAG_EXIT

    if extra:
        # Five-argument layout: (disconnect_flags, reason_code, properties).
        # Shift so that ``rc`` really holds the reason code.
        rc, properties = properties, extra[0]

    logging.info("Disconnected with result code: %s", rc)

    attempts, delay = 0, FIRST_RECONNECT_DELAY
    while attempts < MAX_RECONNECT_COUNT:
        logging.info("Reconnecting in %d seconds...", delay)
        time.sleep(delay)

        try:
            client.reconnect()
        except Exception as err:
            logging.error("%s. Reconnect failed. Retrying...", err)
        else:
            logging.info("Reconnected successfully!")
            return

        # Grow the delay geometrically, capped at MAX_RECONNECT_DELAY.
        delay = min(delay * RECONNECT_RATE, MAX_RECONNECT_DELAY)
        attempts += 1

    logging.info("Reconnect failed after %s attempts. Exiting...", attempts)
    FLAG_EXIT = True


def on_message(client, userdata, msg):
    """Message callback: print the payload and topic of a received message.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; ``msg.payload`` (bytes) and ``msg.topic`` are read.
    """
    # errors="replace": a payload that is not valid UTF-8 is printed with
    # U+FFFD markers instead of raising UnicodeDecodeError out of the callback.
    text = msg.payload.decode(errors="replace")
    print(f'Received: {text}\nfrom topic: {msg.topic} \n')


def connect_mqtt():
    """Build the MQTT client, attach the callbacks and connect to the broker.

    Returns:
        The client after ``connect()`` has been called; the caller is
        expected to start its network loop.
    """
    # An earlier form of this call, kept by the author as a comment, was
    # ``mqtt_client.Client(CLIENT_ID)`` without a callback API version.
    mqtt = mqtt_client.Client(
        client_id=CLIENT_ID,
        callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2,
    )
    mqtt.username_pw_set(USERNAME, PASSWORD)

    mqtt.on_connect, mqtt.on_message = on_connect, on_message
    mqtt.connect(BROKER, PORT, keepalive=60)
    # The disconnect handler is attached only after connect() has been issued.
    mqtt.on_disconnect = on_disconnect
    return mqtt

###########################################################################################################

#  Tail recursion optimization, decorators
import sys

class TailRecurseException(BaseException):
    """Signal that carries the arguments of a pending tail call.

    Derives from BaseException rather than Exception, so an
    ``except Exception`` handler does not intercept it.
    """

    def __init__(self, args, kwargs):
        # Store the positional arguments as the exception's ``args`` tuple and
        # keep the keyword arguments alongside them.
        super().__init__(*args)
        self.kwargs = kwargs

def tail_call_optimized(g):
    """
    This function decorates a function with tail call
    optimization. It does this by throwing an exception
    if it is its own grandparent, and catching such
    exceptions to fake the tail call optimization.

    This function fails if the decorated
    function recurses in a non-tail context.

    The returned wrapper carries the metadata of ``g`` (name, docstring, ...)
    via ``functools.wraps``.

    NOTE(review): the grandparent test compares code objects only, and every
    wrapper created here shares the code object of ``func``. Two different
    decorated functions therefore look alike to the test; avoid calling one
    decorated function from inside another.
    """
    @functools.wraps(g)
    def func(*args, **kwargs):
        f = sys._getframe()
        # Grandparent frame running this same wrapper code means we were
        # called from inside the decorated function: unwind to the outer
        # wrapper and let it restart ``g`` with the new arguments.
        if f.f_back and f.f_back.f_back \
                and f.f_back.f_back.f_code == f.f_code:
            raise TailRecurseException(args, kwargs)

        # Outermost wrapper: keep re-invoking ``g`` until it returns without
        # requesting another tail call.
        while True:
            try:
                return g(*args, **kwargs)
            except TailRecurseException as e:
                args = e.args
                kwargs = e.kwargs
    return func
############################################################################################################


# Automatic reconnection supervisor.
# Per the original author's note, a physical network interruption or the MQTT
# server actively dropping the connection did not reach on_disconnect and
# terminated the program with an error; this function restarts the session
# whenever connect_mqtt() or loop_forever() raises.
def run(try_times):
    """Connect and run the MQTT network loop, retrying after every failure.

    Implemented as a loop, so no recursion (and no tail-call decorator) is
    involved and the number of retries is not limited by stack depth.

    Args:
        try_times: Number of retries already counted; incremented once per
            failed session.

    Only ``Exception`` is caught: KeyboardInterrupt and SystemExit propagate,
    so the script can still be stopped with Ctrl-C.
    """
    logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
                        level=logging.DEBUG)
    while True:
        try:
            client = connect_mqtt()
            client.loop_forever()
        except Exception as err:
            try_times = try_times + 1
            logging.error("MQTT session failed: %s", err)
            time.sleep(3)  # fixed pause before starting a fresh session
            print("Retry_times---", try_times)
        else:
            # loop_forever() returned without raising: stop retrying.
            break
    print("Break from MQTT Broker!after trying more than {} times".format(try_times))

# Starting value of the retry counter handed to run().
# Note kept from the original author: the largest attempt count observed was
# 1037, ending with "Fatal Python error: Cannot recover from stack overflow."
# because too many recursive function calls overflowed the Python stack.
try_times = 0


if __name__ == "__main__":
    run(try_times)
# Document last updated: 2025-03-15 14:57   Author: Jeson