98 lines
2.9 KiB
Python
98 lines
2.9 KiB
Python
import time
|
|
|
|
from flask import Response, request, session
|
|
|
|
from max.db import database
|
|
from max.exceptions import NotFound, PermissionDenied
|
|
from max.not_found_view import NotFoundView
|
|
from max.translations import TranslationHandler
|
|
|
|
|
|
class BaseMiddleware:
|
|
def __init__(self, app, handler):
|
|
self.app = app
|
|
self.handler = handler
|
|
for attr in ("no_auth", "no_csrf"):
|
|
if hasattr(handler, attr):
|
|
setattr(self, attr, True)
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
raise NotImplementedError()
|
|
|
|
|
|
class LoggingMiddleware(BaseMiddleware):
|
|
def __call__(self, *args, **kwargs):
|
|
real_ip = request.headers.get("X-Real-IP", None)
|
|
if real_ip is None and self.app.debug:
|
|
real_ip = request.remote_addr
|
|
setattr(request, "real_remote_addr", real_ip)
|
|
|
|
start = time.time()
|
|
|
|
result = self.handler(*args, **kwargs)
|
|
response: Response = self.app.make_response(result)
|
|
|
|
end = time.time()
|
|
time_in_ms = f"{(end-start)*1000:.2f}"
|
|
|
|
self.app.logger.info(
|
|
'%s %s %s user_agent="%s" referrer="%s" db_queries=%s %s '
|
|
+ "%s user_id=%s session=%s lang=%s %sms",
|
|
request.method,
|
|
request.url,
|
|
request.real_remote_addr,
|
|
request.user_agent,
|
|
request.referrer,
|
|
database.collect_query_count(),
|
|
response.status_code,
|
|
response.content_length,
|
|
session.get("user_id"),
|
|
session.id,
|
|
TranslationHandler.language,
|
|
time_in_ms,
|
|
)
|
|
return result
|
|
|
|
|
|
class ExceptionMiddleware(BaseMiddleware):
|
|
def __call__(self, *args, **kwargs):
|
|
try:
|
|
return self.handler(*args, **kwargs)
|
|
except (NotFound, PermissionDenied) as e:
|
|
notFoundView = NotFoundView(**kwargs)
|
|
return notFoundView.render(), 404
|
|
|
|
|
|
class I18NMiddleware(BaseMiddleware):
|
|
def __call__(self, *args, **kwargs):
|
|
TranslationHandler.set_language(
|
|
session.get(
|
|
"lang",
|
|
TranslationHandler.parse_language_header(
|
|
request.headers.get("accept-language")
|
|
)
|
|
or "en",
|
|
)
|
|
)
|
|
return self.handler(*args, **kwargs)
|
|
|
|
|
|
def add_middleware_stack(app, *middlewares):
|
|
original_add_url_rule = app.add_url_rule
|
|
|
|
def url_rule_with_middlewares(
|
|
rule, endpoint=None, view_func=None, provide_automatic_options=None, **options
|
|
):
|
|
if view_func:
|
|
for middleware in middlewares[::-1]:
|
|
view_func = middleware(app, view_func)
|
|
return original_add_url_rule(
|
|
rule,
|
|
endpoint=endpoint,
|
|
view_func=view_func,
|
|
provide_automatic_options=provide_automatic_options,
|
|
**options,
|
|
)
|
|
|
|
app.add_url_rule = url_rule_with_middlewares
|