start.py
from core import src import sys,os if __name__ == ‘__main__‘: src.run()
bin 目录下存放文件
admin_info.py 、bank_info.py 、shopping.py、user_info.py
admin_info.py
from db import db_hander #修改额度接口 def change_balance_info(username,money): user_dic = db_hander.select(username) if user_dic: #修改额度 user_dic[‘balance‘] = int(money) # 保存修改后保存 db_hander.save(user_dic) return True,"额度修改成功" return False ,"修改用户不存在" # 冻结账号接口 def lock_user_info(username): user_dic= db_hander.select(username) if user_dic: user_dic[‘lock‘] = True db_hander.save(username) return True ,f‘已经将用户{username} 冻结‘ return False ,"没有这个用户冻结失败"
bank_info.py
""" 银行卡接口业务 """ from db import db_hander #提现接口 def withdraw_info(username,menoy): #获取用户字典 user_dic = db_hander.select(username) #查看用户的余额是否足够 balance = int(user_dic.get(‘balance‘)) menoy_fee = int(menoy) * 1.05 # 提现需要增加百分之五的手续费,此时它已经变成了浮点型(float) #判断用户余额是否足够 if balance >= menoy_fee: #修改用户字典中的金额 balance -= menoy_fee user_dic[‘balance‘] = balance flow= f‘用户{username} 提现{menoy},手续费{menoy_fee-float(menoy)}‘ # 因为在上面menoy_fee 在上面变成了浮点型,所以在这里 menoy 也要转换一下 #保存金额 db_hander.save(user_dic) return True, flow return False,"你的穷屌丝,没钱还来提现" #还款接口 def repay_info(username,money): #获取用户信息 user_dic= db_hander.select(username) #充值操作 user_dic[‘balance‘]+=money flow= f‘用户{username},还款{money}成功 当前余额为{user_dic["balance"]}‘ user_dic[‘flow‘].append(flow) #保存数据 db_hander.save(user_dic) return True, flow #转账接口 def tranfer_hub(login_user,to_user,money): #获取当前用户信息 login_user_dic=db_hander.select(login_user) #获取目标用户信息 to_user_dic= db_hander.select(to_user) #检查目标账号是否存在 if not to_user_dic: return False ,"目标用户不存在" #判断当前用户的钱够不够 if login_user_dic[‘balance‘] >= money: #如果足够那么进行转账 #给当前用户做减钱操作 login_user_dic[‘balance‘] -= money #给目标用户做加钱操作 to_user_dic[‘balance‘] += money #记录当前用户流水 login_user_flow = f‘用户{login_user} 给 {to_user} 转了{money}‘ login_user_dic[‘flow‘].append(login_user_flow) #保存当前数据 db_hander.save(login_user_dic) #保存目标用户流水 to_user_flow = f‘用户{to_user} 收到 {login_user} 转了{money}‘ to_user_dic[‘flow‘].append(to_user_flow) #保存目标用户数据 db_hander.save(to_user_dic) return True, login_user_flow return False ,‘当前用户金额不足‘ #记录流水接口 def check_flow(login_user): user_dic=db_hander.select(login_user) return user_dic.get(‘flow‘) #支付接口 def pay_info(login_user,cost): user_dic= db_hander.select(login_user) if user_dic.get(‘balance‘) >= cost: user_dic[‘balance‘] -= cost #记录流水 flow = f‘用户消费金额:{cost}元‘ user_dic[‘flow‘].append(flow) #保存数据 db_hander.save(user_dic) # 这里返回的True 或者false 是给用户接口层 return True return False
shopping.py
""" 存放购物车信息 """ from bin import bank_info from db import db_hander #商品准备结算接口 def shopping_info(login_user,shopping_car): cost = 0 # 定义初始金额 for money_number in shopping_car.values(): money ,number = money_number cost +=(money *number ) flag = bank_info.pay_info(login_user,cost) if flag: return True,"支付成功" return False ,"支付失败" #购物车添加接口 def add_shop_car_info(login_user,shopping_car): #获取当前用户的购物车 user_dic = db_hander.select(login_user) #获取用户文件中的商品数据 shop_car = user_dic.get(‘shop_car‘) #检查当前用户选择的商品是否已经存在 for shop_name ,money_number in shopping_car.items(): number =money_number[1] #商品的数量 if shop_name in shop_car: user_dic[‘shop_car‘][shop_name][1] += number #如果商品重复,要累加商品数量 else: user_dic[‘shop_car‘].upate( {shop_name:money_number} #如果不重复的,就更新奥字典中 ) db_hander.save(user_dic) return True ,"添加购物车成功" #查看购物车接口 def check_shop_car_info(login_user): user_dic = db_hander.select(login_user) return user_dic.get(‘shop_car‘)
user_info.py
""" 用户接口 """ import os import json from lib import common from db import db_hander user_log=common.get_logger(‘user‘) from config import setting # 导入获取存放用户信息路径的自定义的模块 #注册接口 def redistered_info(username,password,balance=15000): #判断用户是否存在 user_dic = db_hander.select(username) if user_dic: return False, "您注册的用户已经存在" #相当于返回的一个元组(False ,‘您注册的用户已经存在’) #密码加密 password=common.get_pwd_md5(password) # 组织用户数据的字典信息 user_dic = { ‘username‘: username, ‘password‘: password, ‘balance‘: balance, ‘flow‘: [], # 定义列表记录ATM流水 ‘lock‘: False, # 定义用户状态如果为True ‘shop_car‘: {} } #保存数据 db_hander.save(user_dic) msg = f‘{username} 注册成功‘ user_log.info(msg) return True, msg #登录接口 def login_info(username,password): #先查看当前用户是否存在 user_dic=db_hander.select(username) if user_dic.get(‘lock‘): return False ,"此账号已经被锁定" #判断用户是否存在 if user_dic: #后面没有条件对比,意为真就执行下面代码否则不执行 #给用户输入进来的密码也做加密 password = common.get_pwd_md5(password) if password == user_dic.get(‘password‘): msg = f‘用户{username} 登录成功‘ user_log.info(msg) return True,msg else: msg = f‘用户{username} 密码错误‘ user_log.error(msg) return False, msg msg = f‘用户{username} 用户不存在‘ user_log.info(msg) return False ,msg #查询余额 def check_balance(username): user_dic = db_hander.select(username) return user_dic[‘balance‘] # 只返回余额这个,如果返回user_dic 就是整个用户信息了
config 目录下setting.py
""" 配置信息 """ import os # 获取db下存放用户信息的user_data 目录的路径 BASE_PATCH=os.path.dirname(os.path.dirname(__file__)) #先获取到顶级目录 USER_DATA=os.path.join(BASE_PATCH,‘db‘,‘user_data‘) # 拼接路径获取到存放用户信息的user_data目录 standard_format = ‘%(asctime)s - %(threadName)s:%(thread)d - %(name)s - %(filename)s:%(lineno)d - %(levelname)s - %(message)s‘ simple_format = ‘%(levelname)s - %(asctime)s - %(filename)s:%(lineno)d - %(message)s‘ test_format = ‘%(asctime)s - %(message)s‘ # 3、日志配置字典 LOGGING_DIC = { ‘version‘: 1, ‘disable_existing_loggers‘: False, ‘formatters‘: { ‘standard‘: { ‘format‘: standard_format }, ‘simple‘: { ‘format‘: simple_format }, ‘test‘: { ‘format‘: test_format }, }, ‘filters‘: {}, ‘handlers‘: { #打印到终端的日志 ‘tty‘: { ‘level‘: ‘DEBUG‘, ‘class‘: ‘logging.StreamHandler‘, # 打印到屏幕 ‘formatter‘: ‘simple‘ }, #打印到文件的日志,收集info及以上的日志 # ‘fh1‘: { # ‘level‘: ‘DEBUG‘, # ‘class‘: ‘logging.handlers.RotatingFileHandler‘, # 保存到文件,日志轮转 # ‘formatter‘: ‘standard‘, # # 可以定制日志文件路径 # # BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # log文件的目录 # # LOG_PATH = os.path.join(BASE_DIR,‘a1.log‘) # ‘filename‘: ‘a1.log‘, # 日志文件 # ‘maxBytes‘: 1024*1024*5, # 日志大小 5M # ‘backupCount‘: 5, # ‘encoding‘: ‘utf-8‘, # 日志文件的编码,再也不用担心中文log乱码了 # }, ‘fh2‘: { ‘level‘: ‘DEBUG‘, ‘class‘: ‘logging.FileHandler‘, # 保存到文件 ‘formatter‘: ‘test‘, ‘filename‘: ‘a2.log‘, ‘encoding‘: ‘utf-8‘, }, ‘fh1‘: { ‘level‘: ‘DEBUG‘, ‘class‘: ‘logging.FileHandler‘, # 保存到文件 ‘formatter‘: ‘standard‘, ‘filename‘: ‘a1.log‘, ‘encoding‘: ‘utf-8‘, }, }, ‘loggers‘: { #logging.getLogger(__name__)拿到的logger配置 ‘‘: { ‘handlers‘: [‘fh1‘, ‘fh2‘], ‘level‘: ‘DEBUG‘, ‘propagate‘: False, }, }, }
core 目录下文件 src.py admin.py
src.py
""" 存放用户视图层 """ import json import os from core import admin #记录已经登录的用户 login_user=None # 1、注册功能 from bin import user_info from bin import bank_info from lib import common from bin import shopping def redistered(): while True: #1)用户用输入用户名和密码 进行校验 username = input("请输入您的用户名").strip() # 去空格 password = input("请输入您的密码").strip() re_password = input("请确认您的密码").strip() #判断两次密码是否一致 if password == re_password: #调用接口层的注册接口,将用户名与密码交给接口层来处理 #res ---> (FALSE,‘您注册的用户名已经存在’) #赋值解压,flag,msg ----> (false,‘您注册的用户已经存在‘) flag,msg= user_info.redistered_info( username,password ) #根据flag判断是否注册成功 if flag: #if 后面不跟条件意为如果为True(真) print(msg) break else: print(msg) # 2、登录功能 def login(): while True: username=input("请输入您的用户名:").strip() password=input("请输入您的密码:").strip() flag,msg=user_info.login_info(username,password) if flag: #后面不跟条件意为如果为真(True)怎执行 print(msg) global login_user #把这获取到的用户名global 成为全局变量 login_user=username break else: print(msg) # 3、查看余额功能 @common.login_auth def check_balance(): banlce = user_info.check_balance(login_user) print(f‘用户{login_user} 您的余额为{banlce}‘) # 4、提现功能 @common.login_auth def withdraw(): while True: input_money= input("请输入要提取的金额").strip() #代表去空格 #检查用户输入的是否是纯数字 if not input_money.isdigit(): print("请重新输入") continue #调用接口层来实现 用户提取金额 flag,msg = bank_info.withdraw_info( login_user,input_money ) if flag: #后面不跟条件判断,意为如果结果为真(True) 就执行 print(msg) break else: print(msg) # 5、还款功能 @common.login_auth def repayment(): while True: #让用户输入还款金额 input_money= input(‘请输入还款金额‘).strip() #检查用户输入的是否数字 if not input_money.isdigit(): print("请输入正确的金额") continue input_money = int(input_money) # 检查用户输入的金额是否为正数 if input_money >0: #调用还款接口 flag,msg = bank_info.repay_info( login_user,input_money ) if flag: #意为真则执行 print(msg) break else: print("您的还款金额必须大于0") # 6、转账功能 @common.login_auth def tranfer(): """转账的三个条件,当前用户,目标用户,转账金额""" while True: to_user= input("请输入目标账号").strip() money= int(input("请输入转账的金额").strip()) if money >0: flag,msg = bank_info.tranfer_hub( login_user,to_user,money ) if flag: # 后面不跟判断意为如果为真(True) print(msg) break else: print(msg) else: print("请输入正确的金额") # 7、查看流水 @common.login_auth def check_water(): flow_list=bank_info.check_flow( login_user ) if flow_list: for flow in flow_list: print(flow) else: print("暂时没有流水记录") # 8、购物功能 @common.login_auth def shopping(): product_list = [[‘Iphone7‘, 5800], [‘Coffee‘, 30], [‘疙瘩汤‘, 10], [‘Python Book‘, 99], [‘Bike‘, 199], [‘ViVo X9‘, 2499], ] shopping_car={} #创建一个空字典用于存放添加的商品 while True: #使用枚举的方式来打印商品信息 for index,shop in enumerate(product_list): #枚举参数是可迭代对象变为元祖形式---------可迭代对象所以,索引对应的值 shop_name,shop_money = shop # 解压赋值 print(f‘商品编号为:{index},商品名称为{shop_name},商品单价为{shop_money}‘) choice=input("请输入商品编号,(是否结账输入 y 或 n)").strip() #输入y 代表进入结算功能 if choice == ‘y‘: if not shopping_car: print("购物车是空的,不能结算请重新输入") continue #调用支付接口 flag,msg = shopping.shopping_info(login_user,shopping_car) if flag: print(msg) break else: print(msg) elif choice == ‘n‘: if not shopping_car: print("购物车是空的,不能结算请重新输入") continue flag,msg = shopping.add_shop_car_info( login_user,shopping_car ) if flag: print(msg) break else: print(msg) if not choice.isdigit(): print("请输入正确编号") choice=int(choice) if choice not in range(len(product_list)): print("请输入正确编号") continue shop_name,shop_money = product_list[choice] #获取到商品名称和单价 #检查用户是否选择了多份同样的商品 默认为1 if shop_money in shopping_car: shopping_car[shop_name][1]+=1 #记录商品被选择的次数 else: shopping_car[shop_name] = [shop_money,1] #z这里以字典套列表的形式{‘商品名称‘:[‘钱‘,‘数量‘]} print("当前购物车有",shopping_car) # 9、查看购物车 @common.login_auth def check_shop_car(): shop_car=shopping.check_shop_car_info(login_user) print(shop_car) # 10、管理员功能 def admin(): admin.admin_run() #定义一个功能字典 func_dic={ ‘1‘:redistered, ‘2‘:login, ‘3‘:check_balance, ‘4‘:withdraw, ‘5‘:repayment, ‘6‘:tranfer, ‘7‘:check_water, ‘8‘:shopping, ‘9‘:check_shop_car, ‘10‘:admin, } def run(): while True: print(""" 1、注册功能 2、登录功能 3、查看余额功能 4、提现功能 5、还款功能 6、转账功能 7、查看流水 8、购物功能 9、查看购物车 10、管理员功能 11、退出 """) choice = input("请选择您的操作").strip() if choice in func_dic: # print("请输入正确的选择") # continue func_dic.get(choice)() #获取用户输入的功能编号然后调用对应功能函数 elif choice == ‘11‘: break else: print("请输入正确的选择")
admin.py
from core import src from bin import admin_info #添加管理员用户 def add_user(): src.redistered() #修改用户额度 def change_balace(): while True: change_user = input("请输入您要修改的用户").strip() money = input("请输入要修改的额度") if not money.isdigit(): continue #调用修改额度的接口 flag ,msg =admin_info.change_balance_info( change_user , money ) if flag: print(msg) break else: print(msg) #冻结账号 def lock_user(): while True: #输入需要冻结的账号 change_lock_user= input("请输入需要冻结的账号").strip() flag,msg = admin_info.lock_user_info( change_lock_user ) if flag: print(msg) break else: print(msg) #admin功能字典 admin_func={ ‘1‘: add_user, ‘2‘: change_balace, ‘3‘:lock_user, } def admin_run(): while True: print(""" ‘1‘: 添加管理员账号, ‘2‘: 修改额度, ‘3‘:冻结账号, """) choice = input("请输入管理员功能编号:") if choice not in admin_func: print("请输入正确的功能编号") continue admin_func.get(choice)()
db_hanlder.py
""" 专门处理用户数据 """ import os import json from config import setting #保存数据 def select(username): #接收传过来的用户名 user_path = os.path.join( setting.USER_DATA,f‘{username}.json‘ ) if os.path.exists(user_path): # 判断是否有用户名的文件 with open(user_path, ‘r‘, encoding=‘utf-8‘) as f: user_dic=json.load(f) return user_dic #返回用户数据 #保存数据 def save(user_dic): # 接收用户接口文件传过来的用户字典信息 username = user_dic.get(‘username‘) #获取到注册的用户名 user_path = os.path.join( #为了方便取数一个用户一个json 文件以用户名命名的 setting.USER_DATA,f‘{username}.json‘ ) #保存用户数据, with open(user_path,‘w‘,encoding=‘utf-8‘) as f: json.dump(user_dic,f,ensure_ascii=False) # ensure_ascii =False 可以显示中文
commony.py
""" 存放公共方法 """ import logging.config from config import setting #密码加密 import hashlib from core import src # 使用md5 加密 def get_pwd_md5(password): md5_obj =hashlib.md5() md5_obj.update(password.encode(‘utf-8‘)) slat = ‘哈哈哈‘ #加盐 md5_obj.update(slat.encode(‘utf-8‘)) return md5_obj.hexdigest() #登录认证装饰器 def login_auth(func): def wrapper(*args,**kwargs): if src.login_user: #后面没有条件判断意为如果为真(True) res= func(*args,**kwargs) return res else: print("请先登录账号") src.login() return wrapper #记录日志功能 def get_logger(log_type): #加载日志配置信息 logging.config.dictConfig( setting.LOGGING_DIC ) #获取日志对象 logger = logging.getLogger(log_type) return logger
原文:https://www.cnblogs.com/yjc53/p/13548965.html