Flask-Session
from flask import Flask,views,request,sessionfrom flask_session import Sessionfrom redis import Redisapp=Flask(__name__)app.config['SESSION_TYPE'] = 'redis'app.config['SESSION_REDIS'] = Redis(host="127.0.0.1",port=6379)Session(app)@app.route("/login", methods=["GET", "POST"])def login(): if request.method == "POST": session["user"] = "123" return "post" return "get"if __name__ == '__main__': app.run(debug=True)
WTForms
from flask import Flask, request, render_templatefrom wtforms import Form, validators, widgetsfrom wtforms.fields import simple, coreapp = Flask(__name__)class RegForm(Form): username = simple.StringField( label="用户名", validators=[ validators.DataRequired(message="用户名不能为空"), ], render_kw={ "class": "my_class"} ) pwd = simple.PasswordField( label="密码", validators=[ validators.DataRequired(message="密码不能为空"), validators.length(min=6, max=16, message="长度必须大于等于6,小于等于16") ], render_kw={ "class": "form-control"}, widget = widgets.PasswordInput(), ) pwd_confim = simple.PasswordField( label="确认密码", validators=[ validators.EqualTo("pwd", message="两次密码不一致"), validators.length(min=6, max=16, message="长度必须大于等于6,小于等于16") ], ) gender = core.SelectField( label="性别", choices=( (1, "女"), (2, "男") ), default=1, coerce=int # 限制是int类型的 ) city = core.RadioField( label="城市", choices=( ("1", "北京"), ("2", "上海") ), default=2, coerce=str ) email = simple.StringField( label="邮箱", validators=[ validators.DataRequired(message="邮箱不能为空"), validators.Email(message="格式不对") ] ) hobby = core.SelectMultipleField( label="爱好", choices=( (1, "电影"), (2, "音乐"), (3, "画画"), (4, "看书") ), coerce=int, default=(1, 4) ) favor = core.SelectMultipleField( label="喜好", choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self,*args,**kwargs): #这里的self是一个RegisterForm对象 '''重写__init__方法''' super().__init__(*args, **kwargs) #继承父类的init方法 self.favor.choices =((1, '篮球'), (2, '足球'), (3, '羽毛球')) #吧RegisterForm这个类里面的favor重新赋值 def validate_pwd_confim(self,field,): ''' 自定义pwd_config字段规则,例:与pwd字段是否一致 :param field: :return: ''' # 最开始初始化时,self.data中已经有所有的值 if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 raise validators.StopValidation("密码不一致") # 不再继续后续验证class LoginForm(Form): username = simple.StringField( label="用户名", validators=[ validators.DataRequired(message="用户名不能为空"), validators.length(min=4, max=8, message="长度必须大于等于4,小于等于8") ] ) password = simple.PasswordField( label="密码", validators=[ validators.DataRequired(message="密码不能为空"), validators.length(min=4, max=8, message="长度必须大于等于4,小于等于8") ] )@app.route('/register',methods=["GET","POST"])def register(): if request.method=="GET": form = RegForm(data={ 'gender': 1}) #默认是1, return render_template("register.html",form=form) else: form = RegForm(formdata=request.form) if form.validate(): #判断是否验证成功 print('用户提交数据通过格式验证,提交的值为:', form.data) #所有的正确信息 else: print(form.errors) #所有的错误信息 return render_template('register.html', form=form)@app.route("/reg", methods=["GET", "POST"])def reg(): if request.method == "GET": rf = RegForm() return render_template("reg.html", rf=rf) else: rf_data = RegForm(request.form) if rf_data.validate(): print(rf_data.data) return "OK" else: return render_template("reg.html", rf=rf_data)@app.route("/login", methods=["GET", "POST"])def login(): if request.method == "GET": login_form = LoginForm() return render_template("login.html", lf=login_form) else: login_form_data = LoginForm(request.form) if login_form_data.validate(): user = login_form_data.data.get("username") return str(user) else: return render_template("login.html", lf=login_form_data)if __name__ == '__main__': app.run(debug=True)
Title
Flask-请求上下文
# 偏函数partialfrom functools import partialdef my_sum(a, b): print(a) print(b) return a + b# new_my_sum = partial(my_sum, 10)# new_my_sum此时就是下面这样的# def my_sum(b):# a=10new_my_sum = partial(my_sum, b=10) # 位置参数一定要在关键字参数前面 不能写成new_my_sum = partial(my_sum, a=10),如果非要这样写,下面就改成res = new_my_sum(b=20)# new_my_sum此时就是下面这样的# def my_sum(b):# b=10res = new_my_sum(20)print(res)class My: def __call__(self, *args, **kwargs): print(666, '__call__') def __setattr__(self, key, value): print(key, value, '__setattr__') def __getattr__(self, item): print(item, '__getattr__') def __setitem__(self, key, value): print(key, value, '__setitem__') def __getitem__(self, item): print(item, '__getitem__')a = My()a() # 执行__call__a.name = 'tom' # 执行__setattr__a['age'] = 18 # __setitem__a['name'] # __getitem__a.qweqwewqe # __getattr__# 空间换时间# 线程安全 Flask上下文机制就是使用的Threading.local# 线程进来,开辟一个空间给这个线程,线程操作的所有任务,都会复制一份到空间中from threading import localclass Foo(local): passprint('##############################')# 栈Stackclass Mystack(object): data = [] def __setattr__(self, key, value): self.push(key, value) def push(self, key, value): self.data.append({key:value}) def top(self): return self.data.pop()my_stack = Mystack()my_stack.name = 'tom'my_stack.name = 'rose'print(my_stack.data) #[{'name': 'tom'}, {'name': 'rose'}]#LocalStack# import threading# threading.current_thread().ident #当前线程的IDimport threadingfrom threading import get_ident #和上面是一样的class MyLocalStack(object): data=[] storage={} def push(self, item): try: self.storage[get_ident()].append(item) except: self.storage[get_ident()]=[item] def top(self): return self.storage[get_ident()].pop()my_local_stack=MyLocalStack()import timedef go(i): my_local_stack.push(i) time.sleep(1) # my_local_stack.top()for i in range(5): t = threading.Thread(target=go,args=(i,)) t.start()print(my_local_stack.storage)###########from werkzeug.wrappers import Request, Responsefrom werkzeug.serving import run_simple@Request.applicationdef app(req): print(req) return Response("200ok")run_simple("0.0.0.0", 9527, app)"""run_simple("0.0.0.0",9527,app) 会执行app函数from flask import Flaskapp=Flask(__name__)if __name__ == '__main__': app.run()"""
#!/usr/bin/env python# -*- coding: utf-8 -*-from flask import Flask,requestapp=Flask(__name__)if __name__ == '__main__': app.run() #run_simple(host, port, self, **options) #app()=obj()=obj.__call__ #app.wsgi_app"""上文self._local=={"__storage__":{},"__ident_func__":get_ident}ctx = requestcontext对象 此对象中有 request/session 此时 self._local=={"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident}_request_ctx_stack == LocalStack()==self._local=={"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident}""""""下文源码request = LocalProxy(partial(_lookup_req_object, "request")#返回的是request对象) class LocalProxy(object): __slots__ = ("__local", "__dict__", "__name__", "__wrapped__") def __init__(self, local, name=None): ####local==request偏函数==partial(_lookup_req_object, "request")#返回的是request对象 object.__setattr__(self, "_LocalProxy__local", local) #__local=local=request偏函数 object.__setattr__(self, "__name__", name) if callable(local) and not hasattr(local, "__release_local__"): object.__setattr__(self, "__wrapped__", local) def __getattr__(self, name): ##request.method执行的就是这 if name == "__members__": return dir(self._get_current_object()) return getattr(self._get_current_object(), name) #从request对象中取出 name ,name=method def _get_current_object(self): if not hasattr(self.__local, "__release_local__"): return self.__local() ##request偏函数的执行,返回的是request对象 try: return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError("no object bound to %s" % self.__name__) def _lookup_req_object(name): #name==request top = _request_ctx_stack.top #top==ctx ctx = requestcontext对象 此对象中有 request/session if top is None: raise RuntimeError(_request_ctx_err_msg) return getattr(top, name) #返回的是request对象 ,top是ctx name是request @property def top(self): try: return self._local.stack[-1] ## except (AttributeError, IndexError): return None """""" 上文源码 def run(self, host=None, port=None, debug=None, load_dotenv=True, **options): # self==app==Flask from werkzeug.serving import run_simple try: run_simple(host, port, self, **options) #相当于执行self() 也就是 Flask.__call__() # self==app==Flask Flask.__call__() def __call__(self, environ, start_response): #environ==request return self.wsgi_app(environ, start_response) def wsgi_app(self, environ, start_response): ctx = self.request_context(environ) # self==app==Flask ## ctx = requestcontext对象 此对象中有 request/session ## error = None try: try: ctx.push()### ctx执行的push response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: # noqa: B001 error = sys.exc_info()[1] raise return response(environ, start_response) finally: if self.should_ignore_error(error): error = None ctx.auto_pop(error) def push(self): ##self==ctx top = _request_ctx_stack.top ###_request_ctx_stack == LocalStack()=={"__storage__":{},"__ident_func__":get_ident} if top is not None and top.preserved: top.pop(top._preserved_exc) app_ctx = _app_ctx_stack.top if app_ctx is None or app_ctx.app != self.app: app_ctx = self.app.app_context() app_ctx.push() self._implicit_app_ctx_stack.append(app_ctx) else: self._implicit_app_ctx_stack.append(None) if hasattr(sys, "exc_clear"): sys.exc_clear() _request_ctx_stack.push(self)######### self==ctx ctx = requestcontext对象 此对象中有 request/session ###{"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident}### 上文 def push(self, obj): ######### obj就是ctx self==_request_ctx_stack == LocalStack()=={"__storage__":{},"__ident_func__":get_ident} rv = getattr(self._local, "stack", None) ##此时 self._local=={"__storage__":{},"__ident_func__":get_ident} if rv is None: self._local.stack = rv = [] ##此时 self.local=={"__storage__":{3721:{"stack":rv=[] }},"__ident_func__":get_ident} rv.append(obj) ##obj==ctx ctx = requestcontext对象 此对象中有 request/session 此时 {"__storage__":{3721:{"stack":[ctx] }},"__ident_func__":get_ident} return rv ##rv==ctx class Local(object): def __setattr__(self, name, value): ident = self.__ident_func__() storage = self.__storage__ try: storage[ident][name] = value except KeyError: storage[ident] = {name: value} class LocalStack(object): def __init__(self): self._local = Local() class Local(object): __slots__ = ("__storage__", "__ident_func__") def __init__(self): object.__setattr__(self, "__storage__", {}) object.__setattr__(self, "__ident_func__", get_ident) def request_context(self, environ): return RequestContext(self, environ) # self==app==Flask class RequestContext(object): def __init__(self, app, environ, request=None, session=None): #self==requestcontext对象 app==app==flask self.app = app if request is None: request = app.request_class(environ) #request_class = Request self.request = request ####### self.url_adapter = None try: self.url_adapter = app.create_url_adapter(self.request) except HTTPException as e: self.request.routing_exception = e self.flashes = None self.session = session class Request(RequestBase, JSONMixin): pass class BaseRequest(object): def __init__(self, environ, populate_request=True, shallow=False): self.environ = environ if populate_request and not shallow: self.environ["werkzeug.request"] = self self.shallow = shallow"""