使用Python实现接口的增删改查

提到Python,不得不感叹,简直不要太好用...
那么在Python中如何编写服务端的接口呢,方式有许多,今天主要说一下Flask,Flask是一个用Python编写的Web应用程序框架,具有轻量级和简洁性,还有强大的可扩展性等优点。
最简单的示例:

from flask import Flask  
  
app = Flask(__name__)  
  
@app.route('/')  
def hello_world():  
    return 'Hello, Flask!'  
  
if __name__ == '__main__':  
    app.run(debug=True)

这样我们就可以在浏览器访问到最简单的“Hello, Flask!”了,不过前提是你已经成功安装Flask 。

由于实际工作中可能会涉及更多内容,比如接收不同请求方式参数、数据库的连接和操作、跨域解决等等一系列问题,下面就进行一一解答。
cmd 安装以下指令:

# 安装指令
pip install pymysql 
pip install flask 
pip install flask_cors 

打开Pycharm,在设置中查看Python解析器是否有这几个软件包,如果有则代表安装成功。


image.png

新建一个新文件,导入依赖包

# 需要用到的包引入
from pymysql import Connection
from flask import Flask, request, jsonify
from flask_cors import CORS

导入完毕后,开始连接数据库,我在本地已经成功安装MySQL并且创建了test数据库,里面有一张student表格,如果没有安装请自行安装并新建表格。


企业微信截图_17134282429733.png
conn = Connection(
    host="localhost",
    port=3306,
    user="root",
    password="123456",
    autocommit=True
)

# 获取游标对象
cursor = conn.cursor()
# 选择数据库
conn.select_db("test")

接下来是启动服务和解决跨域问题

# 后端服务启动
app = Flask(__name__)
# 为所有路径启用CORS
CORS(app)

接下来是接口定义部分,直接贴代码吧,其实我注释写得还蛮多,应该能看明白。

# 获取列表数据
@app.route("/student/list", methods=['GET'])
def student_list():
    # 获取某个参数,例如 'name'
    name = request.args.get('name')
    # 构造查询语句
    if name:
        # 使用LIKE操作符和通配符%来查询包含name的记录
        query = "select * from student where name like %s"
        # 使用%%作为通配符%的转义,因为%在SQL中是特殊字符
        cursor.execute(query, ('%' + name + '%',))
    else:
        # 如果没有name参数,查询所有记录
        query = "select * from student"
        cursor.execute(query)

    try:
        # 获取查询结果
        data = cursor.fetchall()
    except Exception as e:
        # 处理数据库查询异常
        print(f"查询异常: {e}")
        return jsonify([]), 500

    # 将结果转换为字典列表
    # 这段代码使用了Python的列表推导式(list comprehension)来将一个数据库查询结果列表转换为包含字典的列表
    result = [
        {
            "id": row[0],
            "name": row[1],
            "age": row[2]
        } for row in data
    ]

    # 返回JSON响应
    return jsonify(result)

# 增加一条数据
@app.route("/student", methods=['POST'])
def create_student():
    name = request.form.get("name")
    age = request.form.get("age")
    ids = request.form.get("id")  # 这个id其实不应该传,应该是后端定一个规则自动生成的,这里偷懒了

    if not name or not age or not ids:
        return jsonify({"message": "缺少必填参数"}), 400

    query = "insert into student (name, age, id) values (%s, %s, %s)"
    try:
        cursor.execute(query, (name, age, ids))
        return jsonify({"message": f"学生{name}新增成功"}), 200
    except Exception as e:
        return jsonify({"message": str(e)}), 500
# 删除某条数据
@app.route("/student/<int:student_id>", methods=['DELETE'])
def delete_student(student_id):
    query = "delete from student where id=%s"
    try:
        cursor.execute(query, (student_id,))
        return jsonify({"message": f"学生 {student_id} 删除成功"}), 200
    except Exception as e:
        return jsonify({"message": str(e)}), 500
# 修改某条数据
@app.route("/student/<int:student_id>", methods=['PUT'])
def update_student(student_id):
    name = request.form.get("name")
    age = request.form.get("age")

    if not name and not age:
        return jsonify({"message": "缺少必要参数"}), 400

    update_query = "update student set "
    update_params = []

    if name:
        update_query += "name=%s, "
        update_params.append(name)
    if age:
        update_query += "age=%s, "
        update_params.append(age)

    update_query = update_query.rstrip(', ')

    query = f"{update_query} where id=%s"
    update_params.append(student_id)
    # print(query)
    # print(update_params)
    try:
        cursor.execute(query, tuple(update_params))
        return jsonify({"message": f"学生{name}信息更新成功"}), 200
    except Exception as e:
        return jsonify({"message": str(e)}), 500

最后定义运行在哪里以及端口号等

if __name__ == "__main__":
    app.run("0.0.0.0", port=9090, debug=True)
    conn.close() # 关闭数据库连接

使用postman 测试一下查询的接口:


企业微信截图_17134290049260.png

查询结果正确,很完美!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容