nginxuwsgipython-3.8geventflask-socketio

Can't fix error "RuntimeError: You need to use the gevent-websocket server." and "OSError: write error"


I am writing a website for Flask. I use a bunch of uWSGI + NGINX + Flask-Socketio. I use gevent as an asynchronous module. Errors occur during operation:

RuntimeError: You need to use the gevent-websocket server. uwsgi_response_writev_headers_and_body_do(): Broken pipe [core/writer.c line 306] during > Feb 23 12:57:55 toaa uwsgi[558436]: OSError: write error

I tried different configurations and also removed the async_mode='gevent' from the socketio initialization.

wsgi.py file:

from webapp import app, socketio

if __name__ == '__main__':
    socketio.run(app, use_reloader=False, debug=True, log_output=True)

project.ini:

[uwsgi]
module = wsgi:app

master = true
gevent = 1024
gevent-monkey-patch = true
buffer-size=32768 # optionally

socket = /home/sammy/projectnew/projectnew.sock
socket-timeout = 240
chmod-socket = 664

vacuum = true
die-on-term = true

webapp/__init__.py for application app:

from gevent import monkey

monkey.patch_all()
import grpc.experimental.gevent

grpc.experimental.gevent.init_gevent()

from flask import Flask, session, request
from config import DevelopConfig, MqttConfig, MailConfig, ProductionConfig
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_mail import Mail
from flask_script import Manager
from flask_socketio import SocketIO
# from flask_mqtt import Mqtt
from flask_login import LoginManager
from flask_babel import Babel
from flask_babel_js import BabelJS
from flask_babel import lazy_gettext as _l
from apscheduler.schedulers.gevent import GeventScheduler
# from celery import Celery


app = Flask(__name__)
app.config.from_object(ProductionConfig)     
app.config.from_object(MqttConfig)        
app.config.from_object(MailConfig)         
db = SQLAlchemy(app)
migrate = Migrate(app, db)
mail = Mail(app)
manager = Manager(app, db)
login_manager = LoginManager(app)
login_manager.login_view = 'auth'    
login_manager.login_message = _l("Необходимо авторизоваться для доступа к закрытой странице")
login_manager.login_message_category = "error"
# celery = Celery(app.name, broker=Config.CELERY_BROKER_URL)
# celery.conf.update(app.config)
scheduler = GeventScheduler()
# socketio = SocketIO(app) - Production Version
socketio = SocketIO(app, async_mode='gevent')
babel = Babel(app)
babeljs = BabelJS(app=app, view_path='/translations/')
import webapp.views


@babel.localeselector
def get_locale():
    # if the user has set up the language manually it will be stored in the session,
    # so we use the locale from the user settings
    try:
        language = session['language']
    except KeyError:
        language = None
    if language is not None:
        print(language)
        return language
    return request.accept_languages.best_match(app.config['LANGUAGES'].keys())


from webapp import models

if __name__ == "__main__":
    manager.run()

The class in which the socket itself is used (mqtt.py):

from webapp import socketio, app
from flask import request
from flask_mqtt import Mqtt
from flask_babel import lazy_gettext as _l
from webapp.tasks import SchedulerTask
from webapp import translate_state_gate as tr_msg
import json
import copy
import logging

mqtt = Mqtt(app)
logger = logging.getLogger('flask.flask_mqtt')
logger.disabled = True


class MqttTOAA(object):
    type_topic = ["/Control", "/Data"]
    m_request_state = {"comm": "3"}
    m_start = {"Gate": "Start"}
    m_stop = {"Gate": "Stop"}
    qos_request = 1
    qos_sub = 2
    struct_state_devices = None
    POOL_TIME = 2
    end_publish = None
    devices = None
    schedulers_list = list()
    sch_task = None
    sid_mqtt = None
    code_list = list()

    def __init__(self, devices, lang):
        mqtt._connect()
        self.devices = devices
        self.sch_task = SchedulerTask()
        if lang not in app.config['LANGUAGES'].keys():
            lang = 'ru'
        self.dict_gate = {"dict_state_button": {'con_Clos': tr_msg.MessageGate.t_message[lang]["f_open"],
                                                'con_Open': tr_msg.MessageGate.t_message[lang]["f_close"],
                                                "fl_OpenClos": (tr_msg.MessageGate.t_message[lang]["f_continue"],
                                                           tr_msg.MessageGate.t_message[lang]["f_stop"],
                                                           tr_msg.MessageGate.t_message[lang]["f_abort"])},
                          "dict_state_text": {tr_msg.MessageGate.t_message[lang]["f_open"]:\
                                                 tr_msg.MessageGate.t_message[lang]["ps_close"],
                                             tr_msg.MessageGate.t_message[lang]["f_close"]:\
                                                 tr_msg.MessageGate.t_message[lang]["ps_open"],
                                             tr_msg.MessageGate.t_message[lang]["f_continue"]:\
                                                 tr_msg.MessageGate.t_message[lang]["ps_stop"],
                                             tr_msg.MessageGate.t_message[lang]["f_abort"]:\
                                                 tr_msg.MessageGate.t_message[lang]["pr_close"],
                                             tr_msg.MessageGate.t_message[lang]["f_stop"]:\
                                                 (tr_msg.MessageGate.t_message[lang]["pr_open"],
                                                  tr_msg.MessageGate.t_message[lang]["pr_close"],
                                                  tr_msg.MessageGate.t_message[lang]["pr_move"])},
                          "dict_type_element": {"button": u'', "text": u'', "device_code": u'', },
                          "state_gate": {},
                          "position": {"state": u'', "stop": False},
                          "reverse": False,
                          }
        self.close_msg = tr_msg.MessageGate.t_message[lang]["pr_close"]
        self.open_msg = tr_msg.MessageGate.t_message[lang]["pr_open"]

        self.create_devices_dict()
        self.handle_mqtt_connect()
        self.mqtt_onmessage = mqtt.on_message()(self._handle_mqtt_message)
        self.mqtt_onlog = mqtt.on_log()(self._handle_logging)
        self.socketio_error = socketio.on_error()(self._handle_error)
        self.handle_change_state = socketio.on('change_state')(self._handle_change_state)
        self.handle_on_connect = socketio.on('connect')(self._handle_on_connect)
        self.handle_unsubscribe_all = socketio.on('unsubscribe_all')(self._handle_unsubscribe_all)

    def _handle_on_connect(self):
        self.sid_mqtt = request.sid


    def handle_mqtt_connect(self):
        task = None
        for dev in self.devices:
            if dev.device_code not in self.code_list:  
                mqtt.subscribe("BK" + dev.device_code + self.type_topic[1], self.qos_sub)
                self.code_list.append(dev.device_code)
                task = self.sch_task.add_scheduler_publish(dev.device_code,
                                                           mqtt,
                                                           "BK" + dev.device_code +
                                                           self.type_topic[0],
                                                           self.m_request_state,
                                                           self.qos_request,
                                                           self.POOL_TIME)
                if task is not None:
                    self.schedulers_list.append(task)

        if len(self.schedulers_list) > 0:
            self.sch_task.start_schedulers()
            self.code_list.clear()


    @staticmethod
    def _handle_error():
        print(request.event["message"])  # "my error event"
        print(request.event["args"])  # (data,)


    @staticmethod
    def _handle_unsubscribe_all():
        mqtt.unsubscribe_all()

   
    def _handle_change_state(self, code):
        print(code)
        # print(self.struct_state_devices[code])
        message = None
        if code is not None:
            try:
                type_g = self.struct_state_devices[code]["state_gate"]

                if type_g["fl_OpenClos"] == 1:  
                    message = self.m_stop
                else:
                    if self.struct_state_devices[code]["reverse"] is True:
                       
                        if self.struct_state_devices[code]["position"]["state"] == self.close_msg:
                            message = self.m_stop
                            self.struct_state_devices[code]["position"]["state"] = self.open_msg
                        else:
                            message = self.m_start
                    else:  
                        message = self.m_start
                print("Msg:" + str(message))
            except Exception as ex:
                print(ex)
            if message is not None:
                mqtt.publish("BK" + code + self.type_topic[0], json.dumps(message), self.qos_request)
            else:
                print("Error change state " + code)

    
    def _handle_mqtt_message(self, client, userdata, message):
        # print("Get message")
        # print(self.struct_state_devices)
        data = dict(
            topic=message.topic,
            payload=message.payload.decode(),
            qos=message.qos,
            )
        try:
            data = json.loads(data['payload'])
            self.gate_msg(data)
        except Exception as ex:
            print("Exception: " + str(ex))

   
    @staticmethod
    def _handle_logging(self, client, userdata, level, buf):
        print(level, buf)
        pass

    
    def create_devices_dict(self):
        if self.struct_state_devices is None:
            self.struct_state_devices = dict()
        for dev in self.devices:
            self.struct_state_devices[dev.device_code] = self.dict_gate.copy()  
            if dev.typedev.reverse:
                self.struct_state_devices[dev.device_code]['reverse'] = True

   
    def gate_msg(self, data):
        k = ""
        code = data["esp_id"][2:]
        dict_dev = copy.deepcopy(self.struct_state_devices[code])
        dict_dev["state_gate"] = data.copy()
        try:
            if dict_dev["state_gate"]["con_Clos"] == 0:  # ворота закрыты
                # print("1")
                k = "con_Clos"
                dict_dev["position"]["state"] = k
                dict_dev["position"]["stop"] = False
            elif dict_dev["state_gate"]["con_Open"] == 0:  # ворота открыты
                # print("2")
                k = "con_Open"
                dict_dev["position"]["state"] = k
                dict_dev["position"]["stop"] = False
            elif dict_dev["state_gate"]["fl_OpenClos"] == 0:  
                # print("3")
                k = "fl_OpenClos"
                # обратный ход ворот при закрытии
                if dict_dev["position"]["state"] == self.close_msg and dict_dev["reverse"] is True:
                    # print("4")
                    k1 = 1
                    k2 = 0
                    dict_dev["dict_type_element"]["text"] = \
                        dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
                    dict_dev["position"]["stop"] = False
                else:
                    # print("5")
                    k1 = 0
                    dict_dev["dict_type_element"]["text"] = \
                        dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]]
                    dict_dev["position"]["stop"] = True
            elif dict_dev["state_gate"]["fl_OpenClos"] == 1:  
                # print("6")
                k = "fl_OpenClos"
                
                if len(dict_dev["position"]["state"]) == 0:
                    # print("7")
                    k1 = 1
                    k2 = 2
                    dict_dev["dict_type_element"]["text"] = \
                        dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
                
                elif dict_dev["position"]["state"] == "con_Clos" or \
                        dict_dev["position"]["state"] == self.open_msg:
                    if dict_dev["position"]["stop"]:
                        # print("8")
                        k1 = 1
                        k2 = 1
                        dict_dev["position"]["stop"] = False
                        dict_dev["dict_type_element"]["text"] = \
                            dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
                    else:
                        # print("9")
                        k1 = 1
                        k2 = 0
                        dict_dev["dict_type_element"]["text"] = \
                            dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
                elif dict_dev["position"]["state"] == "con_Open" or \
                        dict_dev["position"]["state"] == self.close_msg:
                    if dict_dev["reverse"]:
                        # print("10")
                        k1 = 2
                        dict_dev["dict_type_element"]["text"] = \
                            dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]]
                    else:
                        if dict_dev["position"]["stop"]:
                            # print("11")
                            k1 = 1
                            k2 = 0
                            dict_dev["position"]["stop"] = False
                            dict_dev["dict_type_element"]["text"] = \
                                dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
                        else:
                            # print("12")
                            k1 = 1
                            k2 = 1
                            dict_dev["dict_type_element"]["text"] = \
                                dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]

                if dict_dev["position"]["state"] != dict_dev["dict_type_element"]["text"]:
                    # print("13")
                    dict_dev["position"]["state"] = dict_dev["dict_type_element"]["text"]

            if k == "fl_OpenClos":
                dict_dev["dict_type_element"]["button"] = dict_dev["dict_state_button"][k][k1]
            else:
                dict_dev["dict_type_element"]["button"] = dict_dev["dict_state_button"][k]
                dict_dev["dict_type_element"]["text"] = \
                    dict_dev["dict_state_text"][dict_dev["dict_state_button"][k]]

        except Exception as ex:
            print("Exception (gate_msg): " + str(ex))
        dict_dev["dict_type_element"]["device_code"] = data["esp_id"][2:]
        dict_dev["dict_type_element"]["temp"] = data["temp_1"]
        dict_dev["dict_type_element"]["button"] = copy.deepcopy(str(dict_dev["dict_type_element"]["button"]))
        dict_dev["dict_type_element"]["text"] = copy.deepcopy(str(dict_dev["dict_type_element"]["text"]))
        self.struct_state_devices[code] = copy.deepcopy(dict_dev)
        socketio.emit('mqtt_message', data=dict_dev["dict_type_element"], room=self.sid_mqtt)
        # print(dict_dev["state_gate"]["esp_id"] + str(dict_dev["dict_type_element"]))

Solution

  • When you use uWSGI, the async_mode should be gevent_uwsgi:

    socketio = SocketIO(app, async_mode='gevent_uwsgi')