启动项目:python app.py
获取需要的模块及版本requirements.txt:pip freeze > requirements.txt
下载requirements中的模块及版本:pip install -r requirements.txt
app.run()
# 源码
def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
# host——主机名
# port——端口
# debug——调试,默认F,改为T时只要代码改变服务器会自动重新加载最新代码,适用于开发
# 环境:production——debug=False
# development——debug=True
@app.route(‘/index/<int:uid>‘, endpoint=‘‘, methods=[‘GET‘,‘POST‘], strict_slashes=False,defaults={"db":‘xxx‘}, redirect_to=‘/‘,subdomain=‘<username>‘)
# @app.route(‘/index‘)=app.add_url_rule(‘/index/<int:uid>‘, view_func=index)
# key就是变量名,默认str,
# endpoint类似于django中的name,
# strict_slashes是否严格url后面加不加/,
# defaults给函数中传参没啥用,
# redirect_to=‘/‘重定向(js中meta或者location.href)
# subdomain子域名默认www
def index(key):
return info.get(key)
@app.route(‘/add/<int:num>‘)
def add(num):
return str(num+10) # 返回值必须是字符串类型
配置
from flask import Flask
import settings
from apps.user.views import user_bp
from ext import db
def create_app():
app = Flask(__name__, template_folder=‘../templates‘, static_folder=‘../static‘) # app是一个核心对象
app.config.from_object(settings.DevelopmentConfig) # 加载配置
# 将db对象与app进行关联
db.init_app(app)
# 注册蓝图
app.register_blueprint(user_bp)
# print(app.url_map)
return app
配置文件
class Config:
DEBUG = True
# 数据库配置 mysql+pymysql://user:password@host:port/database
SQLALCHEMY_DATABASE_URI = ‘mysql+pymysql://root:123456@127.0.0.1:3306/flask‘
SQLALCHEMY_TRACK_MODIFICATIONS = False
class DevelopmentConfig(Config):
ENV = ‘development‘
class ProductionConfig(Config):
ENV = ‘production‘
DEBUG = False
app.py
from apps import create_app
from flask_migrate import Migrate, MigrateCommand
from flask_script import Manager
from ext import db
from apps.user import models
app = create_app()
manager = Manager(app=app)
migrate = Migrate(app=app, db=db)
manager.add_command(‘db‘, MigrateCommand)
@manager.command
def init():
print("初始化")
if __name__ == ‘__main__‘:
manager.run()
python app.py runserver
app.url_map —— 路由规则表
request.args只能取get请求的数据
request.form只能取post请求的数据
试图函数返回值类型:字符串、dict、tuple,response、WSGI
response(‘字符串‘,headers={key:value})
# 将post请求的数据直接转化为字典
request.form.to_dict()
重定向原理:一个重定向有两次响应
第一次返回一个response对象code=302的状态码和location的响应头
第二次返回location对应的url的返回值
@app.route(endpoint=‘xxx‘) 配合 url_for(‘xxx‘)反向解析类似于django中url的name
js删除操作
<a href="javascript:;" onclick="del(‘{{ user.name }}‘)">删除</a>
<script>
function del(username){
//console.log(username)
location.href = ‘/del?username=‘ + username
}
</script>
@user_bp.route(‘/del‘)
def del_user():
username = request.args.get(‘username‘)
print(username)
for user in users:
if user.name == username:
users.remove(user)
return redirect(‘/‘)
else:
return ‘删除失败‘
models.py
from ext import db
from datetime import datetime
class User(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(32), nullable=False)
password = db.Column(db.String(32), nullable=False)
phone = db.Column(db.String(11), unique=True)
d_time = db.Column(db.DateTime, default=datetime.now)
def __str__(self):
return self.username
数据库迁移命令
python app.py db init # 产生一个migrations文件 执行一次就行
python app.py db migrate # 产生一个版本文件
python app.py db upgrade # 同步到数据库 升级
python app.py db downgrade # 同步到数据库 降级
@app.route(‘/index/<int:uid>‘)
def index(uid):
print(uid, type(uid))
return ‘Index‘
from werkzeug.routing import BaseConverter
class RegexConverter(BaseConverter):
"""
自定义URL匹配正则表达式
"""
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.regex = regex
# 如果匹配成功执行to_python
def to_python(self, value):
"""
路由匹配时,匹配成功后传递给视图函数中参数的值
:param:value
:return:
"""
return int(value)
def to_url(self, value):
"""
使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL的参数
:param value:
:return:
"""
val = super(RegexConverter, self).to_url(value)
return val
app.url_map.converters[‘regex‘] = RegexConverter
from flask import Flask, url_for, views
class IndexView(views.MethodView):
methods = [‘GET‘, ‘POST‘]
decorators = [‘装饰器名‘]
def get(self):
pass
def post(self):
pass
app.add_url_rule(‘/index‘, view_func=IndexView.as_view(name="index")) # name=endpoint
@app.route(‘/index‘)
def index():
# 请求相关
# request.method
print(request.args)
# request.form
# request.cookies
# request.headers
# request.path
# request.files
# obj = request.files[‘the_file_name‘]
# obj.save(‘path‘+secure_filename(f.filename))
# 响应相关
return "xxx"
# return json.dumps({}) = return jsonify({})
# return render_template(‘xx.html‘, y=1)
# return redirect(url_for(‘index‘))
# response = make_response(render_template(‘base.html‘))
# response.set_cookie(‘key‘, ‘value‘)
# response.headers[‘X-Something‘] = ‘X-value‘
# response.delete_cookie(‘key‘)
# return response
模板的语法
{{ 变量 }}
{{ for }}中可以用loop.index、loop.revindex,loop.first这个返回布尔值
def get_input(value):
return Markup(f"<input value={value}>") # 或者html加过滤器safe
@app.template_global() # 模板全局,所有模板中都可以用
def sbbbbb(a1, a2):
return a1 + a2
@app.template_filter() # 也是全局模板,使用方法为{{ a1|cbbbbb(a2,a3) }}
def cbbbbb(a1, a2, a3):
return a1 + a2 + a3
@app.route(‘/index‘)
def index():
content = {
‘k1‘: 123,
‘k2‘: [11, 22, 33],
‘k3‘: {‘name‘: ‘lem‘, ‘age‘: 18},
‘k4‘: lambda x: x + 10,
‘k5‘: get_input,
}
return render_template(‘test.html‘, **content)
<h1>{{ k1 }}</h1>
<h1>{{ k2.0 }} {{ k2[0] }}</h1>
<h1>{{ k3.name }} {{ k3[‘age‘] }} {{ k3.get(‘gender‘, ‘male‘) }}</h1>
<h1>{{ k4(666) }}</h1>
<h1>{{ k5(‘lem‘)|safe }}</h1>
<h1>{{ k5(‘lem‘)|safe }}</h1>
<h1>{{ sbbbbb(100, 20) }}</h1>
<h1>{{ 10 |cbbbbb(100, 20) }}</h1>
{{ 变量名| 过滤器(*args,**kwargs)}}
常见过滤器
字符串过滤器
列表过滤器
{{ list|first }}
{{ list|last }}
{{ list|length }}
{{ list|sum }} 整型的计算
{{ list|sort }} 排序
字典过滤器
{% for v in dict.0.values() %}:获取值
{% for k in dict.0.keys() %}:获取键
{% for k,v in dict.0.items() %}:获取键值
自定义过滤器
通过flask模块中的add_template_filter方法
def replace_hello(val):
res = val.replace(‘hello‘, ‘‘)
return res.strip()
app.add_template_filter(replace_hello,‘rep_hel‘)
使用装饰器完成
@app.template_filter(‘listreverse‘)
def reverse_list(li):
temp_li = list(li)
temp_li.reverse()
return temp_li
@app.template_filter(‘add‘)
def reverse_list(a, b):
return a + b
{{ a | add(b) }}
继承
需要模板继承的情况:
标签:{% block 名字%} {% endblock %}
静态文件:{{ url_for(‘static‘,filename=‘‘) }}必须要加filename
<link rel="stylesheet" href="{{ url_for(‘static‘,filename=‘css/index.css‘) }}">
include:
在多个页面中都有共同的部分,但其他页面没有这部分,这时候使用include
先定义一个公共的模板部分,nav.html
使用时{% include ‘nav.html‘ %}
宏 macro
方式一:在需要的html模板中直接定义
{% set username=‘张三‘ %}
{{username}}
{% with num=1000 %}
{{num}}
{% endwith %}
方式二:将所有的宏(类似于函数)写到一个html中,后面需要的话直接导入
// marco.html中类似于定义函数
{% macro form(action, value=‘登录‘,method=‘post‘) %}
<form action="{{ action }}" method="{{ method }}">
<input type="text" placeholder="用户名" name="username"><br>
<input type="text" placeholder="密码" name="username"><br>
<input type="submit" value="{{ value }}"><br>
</form>
{% endmacro %}
// 导入宏
{% import ‘macro/macro.html‘ as f %}
{{ f.form(‘/index‘,value=‘注册‘) }}
"""
1.请求刚刚到达执行__call__方法返回wsgi_app方法
ctx = RequestContext(app, environ)
- request
- session=None
ctx.push()
ctx.session = SecureCookieSessionInterface.open_session(self.app, self.request)
2.视图函数
3.请求结束
SecureCookieSessionInterface.save_session()
"""
@app.route(‘/x1‘)
def index():
# 去ctx中获取session
session[‘k1‘]=123
# save_session
"""
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
# If the session is modified to be empty, remove the cookie.
# If the session is empty, return without setting the cookie.
if not session:
if session.modified:
response.delete_cookie(
app.session_cookie_name, domain=domain, path=path
)
return
# Add a "Vary: Cookie" header if the session was accessed at all.
if session.accessed:
response.vary.add("Cookie")
if not self.should_set_cookie(app, session):
return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
samesite = self.get_cookie_samesite(app)
expires = self.get_expiration_time(app, session)
val = self.get_signing_serializer(app).dumps(dict(session))
response.set_cookie(
app.session_cookie_name,
val,
expires=expires,
httponly=httponly,
domain=domain,
path=path,
secure=secure,
samesite=samesite,
)
"""
return ‘Index‘
@app.route(‘/x2‘)
def order():
print(session[‘k1‘])
return ‘Order‘
if __name__ == ‘__main__‘:
app.run()
# 1、请求一旦到来
app.__call__
return self.wsgi_app(environ, start_response)
app.wsgi_app
ctx = app.request_context(environ) = RequestContext(app, environ)
- request(environ) —— 处理请求相关所有数据
- session=None
"""class RequestContext(object):
def __init__(self, app, environ, request=None, session=None):
self.app = app
if request is None:
# request = Request(environ)
request = app.request_class(environ)
self.request = request
self.url_adapter = None
try:
self.url_adapter = app.create_url_adapter(self.request)
except HTTPException as e:
self.request.routing_exception = e
self.flashes = None
self.session = session
"""
# session_interface = self.app.session_interface
# 给session赋值:session_interface.open_session(self.app, self.request)
# ctx = RequestContext(app, environ)
# - request(environ) —— 处理请求相关所有数据
# - session = session_interface.open_session(self.app, self.request)
# - session = self.session_class() = 特殊的dict()
# session_interface = self.app.session_interface = SecureCookieSessionInterface()
ctx.push()
app.open_session
"""
def open_session(self, app, request):
s = self.get_signing_serializer(app)
if s is None:
return None
val = request.cookies.get(app.session_cookie_name)
if not val:
return self.session_class()
max_age = total_seconds(app.permanent_session_lifetime)
try:
data = s.loads(val, max_age=max_age)
return self.session_class(data)
except BadSignature:
return self.session_class()
"""
app.save_session
"""
def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
# If the session is modified to be empty, remove the cookie.
# If the session is empty, return without setting the cookie.
if not session:
if session.modified:
response.delete_cookie(
app.session_cookie_name, domain=domain, path=path
)
return
# Add a "Vary: Cookie" header if the session was accessed at all.
if session.accessed:
response.vary.add("Cookie")
if not self.should_set_cookie(app, session):
return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
samesite = self.get_cookie_samesite(app)
expires = self.get_expiration_time(app, session)
val = self.get_signing_serializer(app).dumps(dict(session))
response.set_cookie(
app.session_cookie_name,
val,
expires=expires,
httponly=httponly,
domain=domain,
path=path,
secure=secure,
samesite=samesite,
)
"""
"""
before_request=[xxx1,xxx2]
"""
@app.before_request
def xxx1():
print(‘执行xxx前1‘)
@app.before_request
def xxx2():
print(‘执行xxx前2‘)
"""
after_request=[ooo1,ooo2]
reversed(after_request)=[ooo2,ooo1]
"""
@app.after_request
def ooo1(response):
print(‘执行ooo后1‘)
return response
@app.after_request
def ooo2(response):
print(‘执行ooo后2‘)
return response
# 定制404页面
@app.errorhandler(404)
def page_not_found(error):
return render_template(‘error.html‘)
app.secret_key=‘lem‘
@app.before_request
def check_login():
if request.path in [‘/login‘, ‘/register‘]: # 可以配置一个白名单
return
user = session.get(‘user‘)
if not user:
return redirect(‘/login‘)
# 原理
# 第一次得到请求时给seesion一组键值对
# 第二次得到请求时将该组键值对删除
from flask import Flask, flash, get_flashed_messages
app.secret_key = ‘lem‘
@app.route(‘/login‘, methods=[‘GET‘, ‘POST‘])
def login():
flash("shanxian1", category=‘x1‘)
flash("shanxian2", category=‘x2‘)
return "视图函数x1"
@app.route(‘/index‘, methods=[‘GET‘, ‘POST‘])
def index():
data = get_flashed_messages(category_filter=[‘x1‘])
print(data)
print("视图函数x2")
return "视图函数x2"
原文:https://www.cnblogs.com/HinaChan/p/14705008.html