I am writing a website for Flask
. I use a bunch of uWSGI
+ NGINX
+ Flask-Socketio
. I use gevent
as an asynchronous module. Errors occur during operation:
RuntimeError: You need to use the gevent-websocket server. uwsgi_response_writev_headers_and_body_do(): Broken pipe [core/writer.c line 306] during > Feb 23 12:57:55 toaa uwsgi[558436]: OSError: write error
I tried different configurations and also removed the async_mode='gevent'
from the socketio
initialization.
wsgi.py
file:
from webapp import app, socketio
if __name__ == '__main__':
socketio.run(app, use_reloader=False, debug=True, log_output=True)
project.ini:
[uwsgi]
module = wsgi:app
master = true
gevent = 1024
gevent-monkey-patch = true
buffer-size=32768 # optionally
socket = /home/sammy/projectnew/projectnew.sock
socket-timeout = 240
chmod-socket = 664
vacuum = true
die-on-term = true
webapp/__init__.py
for application app:
from gevent import monkey
monkey.patch_all()
import grpc.experimental.gevent
grpc.experimental.gevent.init_gevent()
from flask import Flask, session, request
from config import DevelopConfig, MqttConfig, MailConfig, ProductionConfig
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_mail import Mail
from flask_script import Manager
from flask_socketio import SocketIO
# from flask_mqtt import Mqtt
from flask_login import LoginManager
from flask_babel import Babel
from flask_babel_js import BabelJS
from flask_babel import lazy_gettext as _l
from apscheduler.schedulers.gevent import GeventScheduler
# from celery import Celery
app = Flask(__name__)
app.config.from_object(ProductionConfig)
app.config.from_object(MqttConfig)
app.config.from_object(MailConfig)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
mail = Mail(app)
manager = Manager(app, db)
login_manager = LoginManager(app)
login_manager.login_view = 'auth'
login_manager.login_message = _l("Необходимо авторизоваться для доступа к закрытой странице")
login_manager.login_message_category = "error"
# celery = Celery(app.name, broker=Config.CELERY_BROKER_URL)
# celery.conf.update(app.config)
scheduler = GeventScheduler()
# socketio = SocketIO(app) - Production Version
socketio = SocketIO(app, async_mode='gevent')
babel = Babel(app)
babeljs = BabelJS(app=app, view_path='/translations/')
import webapp.views
@babel.localeselector
def get_locale():
# if the user has set up the language manually it will be stored in the session,
# so we use the locale from the user settings
try:
language = session['language']
except KeyError:
language = None
if language is not None:
print(language)
return language
return request.accept_languages.best_match(app.config['LANGUAGES'].keys())
from webapp import models
if __name__ == "__main__":
manager.run()
The class in which the socket itself is used (mqtt.py
):
from webapp import socketio, app
from flask import request
from flask_mqtt import Mqtt
from flask_babel import lazy_gettext as _l
from webapp.tasks import SchedulerTask
from webapp import translate_state_gate as tr_msg
import json
import copy
import logging
mqtt = Mqtt(app)
logger = logging.getLogger('flask.flask_mqtt')
logger.disabled = True
class MqttTOAA(object):
type_topic = ["/Control", "/Data"]
m_request_state = {"comm": "3"}
m_start = {"Gate": "Start"}
m_stop = {"Gate": "Stop"}
qos_request = 1
qos_sub = 2
struct_state_devices = None
POOL_TIME = 2
end_publish = None
devices = None
schedulers_list = list()
sch_task = None
sid_mqtt = None
code_list = list()
def __init__(self, devices, lang):
mqtt._connect()
self.devices = devices
self.sch_task = SchedulerTask()
if lang not in app.config['LANGUAGES'].keys():
lang = 'ru'
self.dict_gate = {"dict_state_button": {'con_Clos': tr_msg.MessageGate.t_message[lang]["f_open"],
'con_Open': tr_msg.MessageGate.t_message[lang]["f_close"],
"fl_OpenClos": (tr_msg.MessageGate.t_message[lang]["f_continue"],
tr_msg.MessageGate.t_message[lang]["f_stop"],
tr_msg.MessageGate.t_message[lang]["f_abort"])},
"dict_state_text": {tr_msg.MessageGate.t_message[lang]["f_open"]:\
tr_msg.MessageGate.t_message[lang]["ps_close"],
tr_msg.MessageGate.t_message[lang]["f_close"]:\
tr_msg.MessageGate.t_message[lang]["ps_open"],
tr_msg.MessageGate.t_message[lang]["f_continue"]:\
tr_msg.MessageGate.t_message[lang]["ps_stop"],
tr_msg.MessageGate.t_message[lang]["f_abort"]:\
tr_msg.MessageGate.t_message[lang]["pr_close"],
tr_msg.MessageGate.t_message[lang]["f_stop"]:\
(tr_msg.MessageGate.t_message[lang]["pr_open"],
tr_msg.MessageGate.t_message[lang]["pr_close"],
tr_msg.MessageGate.t_message[lang]["pr_move"])},
"dict_type_element": {"button": u'', "text": u'', "device_code": u'', },
"state_gate": {},
"position": {"state": u'', "stop": False},
"reverse": False,
}
self.close_msg = tr_msg.MessageGate.t_message[lang]["pr_close"]
self.open_msg = tr_msg.MessageGate.t_message[lang]["pr_open"]
self.create_devices_dict()
self.handle_mqtt_connect()
self.mqtt_onmessage = mqtt.on_message()(self._handle_mqtt_message)
self.mqtt_onlog = mqtt.on_log()(self._handle_logging)
self.socketio_error = socketio.on_error()(self._handle_error)
self.handle_change_state = socketio.on('change_state')(self._handle_change_state)
self.handle_on_connect = socketio.on('connect')(self._handle_on_connect)
self.handle_unsubscribe_all = socketio.on('unsubscribe_all')(self._handle_unsubscribe_all)
def _handle_on_connect(self):
self.sid_mqtt = request.sid
def handle_mqtt_connect(self):
task = None
for dev in self.devices:
if dev.device_code not in self.code_list:
mqtt.subscribe("BK" + dev.device_code + self.type_topic[1], self.qos_sub)
self.code_list.append(dev.device_code)
task = self.sch_task.add_scheduler_publish(dev.device_code,
mqtt,
"BK" + dev.device_code +
self.type_topic[0],
self.m_request_state,
self.qos_request,
self.POOL_TIME)
if task is not None:
self.schedulers_list.append(task)
if len(self.schedulers_list) > 0:
self.sch_task.start_schedulers()
self.code_list.clear()
@staticmethod
def _handle_error():
print(request.event["message"]) # "my error event"
print(request.event["args"]) # (data,)
@staticmethod
def _handle_unsubscribe_all():
mqtt.unsubscribe_all()
def _handle_change_state(self, code):
print(code)
# print(self.struct_state_devices[code])
message = None
if code is not None:
try:
type_g = self.struct_state_devices[code]["state_gate"]
if type_g["fl_OpenClos"] == 1:
message = self.m_stop
else:
if self.struct_state_devices[code]["reverse"] is True:
if self.struct_state_devices[code]["position"]["state"] == self.close_msg:
message = self.m_stop
self.struct_state_devices[code]["position"]["state"] = self.open_msg
else:
message = self.m_start
else:
message = self.m_start
print("Msg:" + str(message))
except Exception as ex:
print(ex)
if message is not None:
mqtt.publish("BK" + code + self.type_topic[0], json.dumps(message), self.qos_request)
else:
print("Error change state " + code)
def _handle_mqtt_message(self, client, userdata, message):
# print("Get message")
# print(self.struct_state_devices)
data = dict(
topic=message.topic,
payload=message.payload.decode(),
qos=message.qos,
)
try:
data = json.loads(data['payload'])
self.gate_msg(data)
except Exception as ex:
print("Exception: " + str(ex))
@staticmethod
def _handle_logging(self, client, userdata, level, buf):
print(level, buf)
pass
def create_devices_dict(self):
if self.struct_state_devices is None:
self.struct_state_devices = dict()
for dev in self.devices:
self.struct_state_devices[dev.device_code] = self.dict_gate.copy()
if dev.typedev.reverse:
self.struct_state_devices[dev.device_code]['reverse'] = True
def gate_msg(self, data):
k = ""
code = data["esp_id"][2:]
dict_dev = copy.deepcopy(self.struct_state_devices[code])
dict_dev["state_gate"] = data.copy()
try:
if dict_dev["state_gate"]["con_Clos"] == 0: # ворота закрыты
# print("1")
k = "con_Clos"
dict_dev["position"]["state"] = k
dict_dev["position"]["stop"] = False
elif dict_dev["state_gate"]["con_Open"] == 0: # ворота открыты
# print("2")
k = "con_Open"
dict_dev["position"]["state"] = k
dict_dev["position"]["stop"] = False
elif dict_dev["state_gate"]["fl_OpenClos"] == 0:
# print("3")
k = "fl_OpenClos"
# обратный ход ворот при закрытии
if dict_dev["position"]["state"] == self.close_msg and dict_dev["reverse"] is True:
# print("4")
k1 = 1
k2 = 0
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
dict_dev["position"]["stop"] = False
else:
# print("5")
k1 = 0
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]]
dict_dev["position"]["stop"] = True
elif dict_dev["state_gate"]["fl_OpenClos"] == 1:
# print("6")
k = "fl_OpenClos"
if len(dict_dev["position"]["state"]) == 0:
# print("7")
k1 = 1
k2 = 2
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
elif dict_dev["position"]["state"] == "con_Clos" or \
dict_dev["position"]["state"] == self.open_msg:
if dict_dev["position"]["stop"]:
# print("8")
k1 = 1
k2 = 1
dict_dev["position"]["stop"] = False
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
else:
# print("9")
k1 = 1
k2 = 0
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
elif dict_dev["position"]["state"] == "con_Open" or \
dict_dev["position"]["state"] == self.close_msg:
if dict_dev["reverse"]:
# print("10")
k1 = 2
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]]
else:
if dict_dev["position"]["stop"]:
# print("11")
k1 = 1
k2 = 0
dict_dev["position"]["stop"] = False
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
else:
# print("12")
k1 = 1
k2 = 1
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
if dict_dev["position"]["state"] != dict_dev["dict_type_element"]["text"]:
# print("13")
dict_dev["position"]["state"] = dict_dev["dict_type_element"]["text"]
if k == "fl_OpenClos":
dict_dev["dict_type_element"]["button"] = dict_dev["dict_state_button"][k][k1]
else:
dict_dev["dict_type_element"]["button"] = dict_dev["dict_state_button"][k]
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k]]
except Exception as ex:
print("Exception (gate_msg): " + str(ex))
dict_dev["dict_type_element"]["device_code"] = data["esp_id"][2:]
dict_dev["dict_type_element"]["temp"] = data["temp_1"]
dict_dev["dict_type_element"]["button"] = copy.deepcopy(str(dict_dev["dict_type_element"]["button"]))
dict_dev["dict_type_element"]["text"] = copy.deepcopy(str(dict_dev["dict_type_element"]["text"]))
self.struct_state_devices[code] = copy.deepcopy(dict_dev)
socketio.emit('mqtt_message', data=dict_dev["dict_type_element"], room=self.sid_mqtt)
# print(dict_dev["state_gate"]["esp_id"] + str(dict_dev["dict_type_element"]))
When you use uWSGI, the async_mode should be gevent_uwsgi
:
socketio = SocketIO(app, async_mode='gevent_uwsgi')