首页 > 其他 > 详细

基于tornado---异步并发接口

时间:2019-07-23 17:24:37      阅读:97      评论:0      收藏:0      [点我收藏+]

1、目的

  由于有多个程序和脚本需要对mysql进行读写数据库,每次在脚本中进行数据库的连接、用cursor进行操作过于麻烦,因此希望可以有一个脚本开放接口,只需要传入sql语句,就可以返回结果回来。因此有需要一个可以支持并发量较大的脚本来进行数据库操作。

2、程序

    1)客户端代码(通过requests 调用接口)

import reuqests
POST = requests.post # post请求方式
def db_query(sql, method=query):
    db_api = http://127.0.0.1:8000/db/api
    db_base = "dbbase"
    db_ret = POST(db_api, data=json_encode({method: method, sql: sql, dbbase: db_base, pwd: password123}))
    if db_ret.status_code == 200:
        if json_decode(db_ret.text)[status] == True:
            db_ret_data = json_decode(db_ret.text)[data]
            return {status: ok, data: db_ret_data, err: ‘‘}
        else:
            return {status: False, data: [], err: json_decode(db_ret.text)[data]}
    else:
        return {status: False, data: [], err: connect error}

 2)服务端代码(基于tornado框架)

# coding=utf8
import MySQLdb
import MySQLdb.cursors
import tornado
import tornado.ioloop
import tornado.web
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor


class DB_CONFIG:
    config = {
        dbbase: (authentic, password), # 数据库user、password
    }


class DB:
    def __init__(self, dbbase):
        user, password = DB_CONFIG.config.get(dbbase, (None, None))
        if user == None or password == None:
            raise Exception(KeyError, dbbase)
        db_host = 192.168.xx.xx
        db_port = 1234

        self.db = MySQLdb.connect(db_host, user, password, dbbase, port=db_port, cursorclass=MySQLdb.cursors.DictCursor)

        self.cursor = self.db.cursor()

    def query(self, sql):
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    def commit(self, sql):
        try:
            self.cursor.execute(sql)
            self.db.commit()
            return {status: True, data: ‘‘}
        except Exception as e:
            self.db.rollback()
            return {status: False, data: e}

    def close(self):
        self.db.close()


class ServiceHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(900)  # 必须定义一个executor的属性,然后run_on_executor装饰器才会游泳。

    @run_on_executor  # 线程内运行 query函数被run_on_executor包裹,将该函数的执行传递给线程池executor的线程执行,优化了处理耗时性任务,以致达到不阻塞主线程的效果。
    def query(self, dbname, method, sql):
        db = DB(dbname)
        ret = ‘‘
        if method == query:
            ret = db.query(sql)
        elif method == commit:
            ret = db.commit(sql)
        db.close()
        return ret

    @tornado.web.asynchronous  # 保持长连接,直到处理后返回
    @tornado.gen.coroutine  # 异步处理
    def post(self):
        data = tornado.escape.json_decode(self.request.body)  # 获取参数,json.loads()解码
        r = {status: ‘‘, data: ‘‘}
        if not data.get(pwd, None):
            r[status], r[data] = (False, not password)
        elif not data.get(dbbase, None):
            r[status], r[data] = (False, not DB select)
        else:
            if data[pwd] != password123): # 接口的密码认证
                r = {status: False, data: password error}
            elif data[method] == query:

                d = yield self.query(data[dbbase], query, data[sql])
                r = {status: True, data: d}
            elif data[method] == commit:

                db_r = yield self.query(data[dbbase], commit, data[sql])
                if db_r[status]:
                    r = {status: True, data: commit sucessful}
                else:
                    r = {status: False, data: db_r[data]}
            else:
                r = {status: False, data: method Invaild}
        self.write(tornado.escape.json_encode(r))  # 写入返回信息写入response
        self.finish()  # 结束服务

    def get(self):
        return self.post()


if __name__ == "__main__":
    application = tornado.web.Application([
        (r"/db/api", ServiceHandler),  # 路由映射
    ])
    application.listen(8000)  # 监听端口
    tornado.ioloop.IOLoop.instance().start()  # 启动服务

 

3、请求举例

  1) sql语句

 sql = select vid from video where status=1 group by vid ORDER BY num ASC limit 100

   2)客户端调用db_query函数

db_ret = db_query(sql, query) # 其中query是数据库操作的方法 query为查询/commit 为insert/update/delete等数据库修改操作时使用

  通过调用db_query传入参数sql语句和操作方式,返回结果或错误内容

基于tornado---异步并发接口

原文:https://www.cnblogs.com/kakawith/p/11232756.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!