传送门:系列教程前言及整理贴 测试开发——Flask 入门教程系列 (整理及前言)
上一章我们使用内存数据完成了 TODO API 的 Task 功能,但一旦服务出错或是重启,所有的数据会没有,这显然不是我们想要的,面对这样的问题,引入了数据库存储,本章我们将学习使用 Flask-MongoEngine 来集成数据库。
本教程使用的是 MongoDB(暂时没打算加其它数据库的教程),示例的 Mongo 版本及信息为:
version:3.4.0
ip:localhost
port:27017
数据模型主要的功能是用于说明数据包含哪些字段,每个字段分别是什么类型,有什么属性(唯一的,还是固定几个值中的一个)等等,这样可以帮助我们在操作数据的时候时刻清晰地知道数据信息。
我们自然可用 pymongo 来操作 MongoDB,但为了进一步简化操作以及规范代码,需创建数据模型。
这里引入 Flask 的 MongoDB 扩展是 Flask-MongoEngine,我们需要:
pip install flask-mongoengine
import flask_mongoengine
Mongo 建库和表很方便,示例库名: todo ,我们将把数据存入 task 表(collection)中:
$mongo
...
> use todo
switched to db todo
> show collections
task
需要先在 Flask 中配置 MongoDB 信息,然后再初始化 MongoEngine,这样数据库和服务器就建立了联系。
配置及初始化代码如下:
app.config['MONGODB_SETTINGS'] = {
'db': 'todo',
'host': 'localhost',
'port': 27017
}
db = MongoEngine()
db.init_app(app)
建立联系之后,我们就可以使用 MongoEngine 创建数据模型了,即创建 class Task(db.Document) ,参考上章
"task": { "id": 1, "title": "Buy groceries", "description": "modify your plan", "done": false }
字段类型,字段是否必要,以及字符串长度等均可在数据模型中定义,我们暂定如下(且增加创建及完成时间):
class Task(db.Document):
task_id = db.StringField(required=True)
title = db.StringField(required=True, max_length=50)
description = db.StringField(required=True, max_length=1000)
done = db.BooleanField(required=True)
createtime = db.DateTimeField(required=True)
completetime = db.DateTimeField()
def to_json(self):
return {
"task_id": self.task_id,
"title": self.title,
"description": self.description,
"done": self.done,
"createtime": self.createtime.strftime("%Y-%m-%d %H:%M:%S"),
"completetime": self.completetime.strftime("%Y-%m-%d %H:%M:%S") if self.done else ""
}
数据模型(Model)创建成功后,就可以通过 Model 对数据库中的数据进行操作,即常规的增删改查了。
Mongoengine 包含很多用法和功能,这里仅做基础示例,详情及其余用法请 Google
增加一条记录很简单,仅需要使用 Model 的 save() 方法即可。
以 TODO 的 Task 为例,我们改写 postTask()
方法:
@app.route('/todo/task', methods=['POST'])
def postTask():
if not request.json or not 'task' in request.json:
return jsonify({'err': 'Request not Json or miss task'})
else:
task = Task(
task_id=shortuuid.uuid(),
title=request.json['task'],
description=request.json['description'] if 'decription' in request.json else "",
done=False,
createtime=datetime.now()
)
task.save()
return jsonify({'status': 0, 'task_id': task['task_id']})
这边引入了 shortuuid 作为 task_id,此方式能保证唯一性(需要
pip install shoruuid
);另外引入时间所以,需要
from datetime import datetime
以及import shortuuid
代码逻辑处理 description 非必传,若不传,存为 ""
Document 类有一个 objects 属性,它用于将类与数据库关联起来。objects 属性是一个 QuerySetManager 类型的对象,它的操作会返回一个 QuerySet 类型的对象。可以通过对 QuerySet 对象的迭代获取数据库中的数据。
回到之前的 TODO Task,先做查操作,使用 objects(),若是不加条件过滤,则为 Task.objects()
在 ()
内增加对查询的过滤。如查询对应 id 为 1 的 Task:Task.objects(id=1)
另外,MongoEngine 提供了一些条件语句:
tasks = Tasks.objects(id__lte=10) # 示例:查询id小于等于10
附加,MongoEngine 数据查询:
- 查询引用的对象只需要使用双下划线即可,如
uk_papers = Paper.objects(author__country='uk')
其中 country 为 author 下的一个字段- 查询结果个数限制可用数组分片的语法,如
users = User.objects[10:15]
表示从 10 开始取到 15- MongoEngine 还提供了一些数据库的聚合操作,例如
.count()
.sum()
等
根据上述数据库查询方式,我们改写之前的 getTask() ,如下( .first() 表示取第一条 ):
@app.route('/todo/task/<task_id>', methods=['GET'])
def getTask(task_id):
task = Task.objects(task_id=task_id).first()
if not task:
return jsonify({'err': 'Not found.'})
else:
return jsonify({'status': 0, 'task': task.to_json()})
改写 getTasks(),需要对返回的数组对象进行循环读取处理,如下:
@app.route('/todo/tasks', methods=['GET'])
def getTasks():
tasks = Task.objects()
return jsonify({'status': 0, 'tasks': [task.to_json() for task in tasks]})
如果我们需要更新一条记录,先需要找到他,找到之后调用 update() 方法即可:
@app.route('/todo/task/<task_id>', methods=['PUT'])
def putTask(task_id):
task = Task.objects(task_id=task_id).first()
if not task:
return jsonify({'err': 'Not found.'})
else:
if 'task' in request.json:
task.update(title=request.json['task'])
if 'description' in request.json:
task.update(description=request.json['description'])
if 'done' in request.json:
if request.json['done'] == True:
task.update(done=True, completetime=datetime.now())
task = Task.objects(task_id=task_id).first()
return jsonify({'status': 0, 'task': task.to_json()})
和更改一样,需要先找到这个需要删除的记录,找到之后只需调用 delete() 方法即可:
@app.route('/todo/task/<task_id>', methods=['DELETE'])
def deleteTask(task_id):
task = Task.objects(task_id=task_id).first()
if not task:
return jsonify({'err': 'Not found.'})
else:
task.delete()
return jsonify({'status': 0, 'task_id': task['task_id']})
完整的代码如下,基本实现了数据库的增查改删(CRUD),大家可以再尝试写一些其他的功能及方法。
#!/usr/bin/env python3
from flask import Flask, jsonify, request
from flask_mongoengine import MongoEngine
from datetime import datetime
import shortuuid
app = Flask(__name__)
app.config['MONGODB_SETTINGS'] = {
'db': 'todo',
'host': 'localhost',
'port': 27017
}
db = MongoEngine()
db.init_app(app)
class Task(db.Document):
task_id = db.StringField(required=True)
title = db.StringField(required=True, max_length=50)
description = db.StringField(required=True, max_length=1000)
done = db.BooleanField(required=True)
createtime = db.DateTimeField(required=True)
completetime = db.DateTimeField()
def to_json(self):
return {
"task_id": self.task_id,
"title": self.title,
"description": self.description,
"done": self.done,
"createtime": self.createtime.strftime("%Y-%m-%d %H:%M:%S"),
"completetime": self.completetime.strftime("%Y-%m-%d %H:%M:%S") if self.done else ""
}
@app.route('/todo/task', methods=['POST'])
def postTask():
if not request.json or not 'task' in request.json:
return jsonify({'err': 'Request not Json or miss task'})
else:
task = Task(
task_id=shortuuid.uuid(),
title=request.json['task'],
description=request.json['description'] if 'decription' in request.json else "",
done=False,
createtime=datetime.now()
)
task.save()
return jsonify({'status': 0, 'task_id': task['task_id']})
@app.route('/todo/task/<task_id>', methods=['GET'])
def getTask(task_id):
task = Task.objects(task_id=task_id).first()
if not task:
return jsonify({'err': 'Not found.'})
else:
return jsonify({'status': 0, 'task': task.to_json()})
@app.route('/todo/tasks', methods=['GET'])
def getTasks():
tasks = Task.objects()
return jsonify({'status': 0, 'tasks': [task.to_json() for task in tasks]})
@app.route('/todo/task/<task_id>', methods=['PUT'])
def putTask(task_id):
task = Task.objects(task_id=task_id).first()
if not task:
return jsonify({'err': 'Not found.'})
else:
if 'task' in request.json:
task.update(title=request.json['task'])
if 'description' in request.json:
task.update(description=request.json['description'])
if 'done' in request.json:
if request.json['done'] == True:
task.update(done=True, completetime=datetime.now())
task = Task.objects(task_id=task_id).first()
return jsonify({'status': 0, 'task': task.to_json()})
@app.route('/todo/task/<task_id>', methods=['DELETE'])
def deleteTask(task_id):
task = Task.objects(task_id=task_id).first()
if not task:
return jsonify({'err': 'Not found.'})
else:
task.delete()
return jsonify({'status': 0, 'task_id': task['task_id']})
if __name__ == '__main__':
app.run(debug=True)
该代码仍有很多不足之处,比如异常情况的返回(试试 title 传值超过 50 个字符,会报数据库存储异常)
别着急,后续章节会完善我们的代码 ~