#!/usr/bin/env python
# coding: utf-8
# @Time : 2018/12/21 0021 13:57
# @Site :
# @File : demos.py
# @Software: PyCharm
import MySQLdb
import redis
import json
import os, time
import threading
from multiprocessing import Pool, Process
import sys
reload(sys)
sys.setdefaultencoding('utf8')
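# Pipeline: read items from MySQL, push them through Redis lists, match item
# titles against industry keywords, then insert item-industry links back into MySQL.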
class InsertData():
def __init__(self):
        # some unused fields removed here
self.__list_industry = []
self.__has_many = []
self.__list_xczx = []
self.__list_cxcy = []
        # industry name -> list of keywords that identify it in an item title
        self.__list_industry_dict = {'test': self.__list_xczx}
        # industry name -> industry_id in the zzh_industry table
        self.__dict_industry = {'test': 212}
        self.db = MySQLdb.connect(host="127.0.0.1", port=3306, user="root", passwd="123456", db="ww",
                                  charset='utf8')
        redisPool = redis.ConnectionPool(host='localhost', port=6379)
        self.re_queue = redis.Redis(connection_pool=redisPool)
        self.re_queue2 = redis.Redis(connection_pool=redisPool)
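    # Query zzh_industry and cache industry_name -> industry_id in self.__dict_industry.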
def __get_dict_industry(self):
industry_name = self.__list_industry_dict.keys()
if len(industry_name) == 1:
industry_name = str(tuple(industry_name)).replace(",","")
elif len(industry_name) > 1:
industry_name = str(tuple(industry_name))
else:
return
sql_industry = "select industry_name,industry_id from zzh_industry where industry_name in {}".format(industry_name)
cursor3 = self.db.cursor()
cursor3.execute(sql_industry)
result_list = cursor3.fetchall()
for result in result_list:
self.__dict_industry[result[0]] = result[1]
cursor3.close()
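    # Generate INSERT statements for the industry table from a "、"-separated
    # list of industry names (statements are only printed, not executed).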
    def insert_industry(self):
dta = """xx、xxx"""
data = dta.split("、")
for index, da in enumerate(data):
industry_code = 100001 + index
            sqlStr = """insert into xx(industry_name,industry_pid,industry_code,industry_sort,is_lock) VALUES('{industry_name}',211,'{industry_code}',{industry_sort},1) ;""".format(
                industry_name=da, industry_code=industry_code, industry_sort=index + 1)
print sqlStr
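    # Producer: read item titles/ids from MySQL and push them onto the
    # "item" Redis list as JSON.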
def put_redis(self):
cursor = self.db.cursor()
item_sql = """SELECT item_title,item_id from xxx"""
cursor.execute(item_sql)
result_list = cursor.fetchall()
num = 1
for result in result_list:
data = {"itemTitle": result[0], "itemId": result[1]}
self.re_queue.lpush("item", json.dumps(data))
num += 1
print ("put over", num)
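    # Consumer: pop items from the "item" list, match their titles against the
    # configured industry keywords, and queue INSERT statements on the "sqls" list.
    # Exits after ~10 consecutive empty polls.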
def get_redis(self):
nums = 1
resultNum = 0
cursor_get = self.db.cursor()
while True:
result = self.re_queue.rpop("item")
if not result:
time.sleep(1)
if resultNum == 10:
break
else:
print "resultNum", resultNum
resultNum += 1
continue
try:
resultNum = 0
result = json.loads(result)
value_list = []
for strkey in self.__list_industry_dict.keys():
if strkey in self.__has_many:
for __strkey in self.__list_industry_dict[strkey]:
if __strkey in result["itemTitle"]:
value_list.append(strkey)
break
if strkey in result["itemTitle"]:
value_list.append(strkey)
value_list = set(value_list)
item_id = result["itemId"]
if value_list:
print result["itemTitle"]
for value in value_list:
nums += 1
# select_sql = "select id from zzh_industry_item where item_id={} and industry_id={} limit 1".format(item_id,self.__dict_industry[value])
# cursor_get.execute(select_sql)
# if cursor_get.fetchone():
# print ("reseat",select_sql)
# continue
sql_insert = "insert into zzh_industry_item(item_id,industry_id)values ({item_id},{industry_id})".format(
item_id=item_id, industry_id=self.__dict_industry[value])
self.re_queue2.lpush("sqls", str(sql_insert))
except Exception as e:
print e
cursor_get.close()
print ("put over")
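    # SQL writer: pop queued INSERT statements from "sqls" and execute them,
    # committing every 500 statements; exits after ~10 consecutive empty polls.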
def test(self):
cursor2 = self.db.cursor()
count = 0
breakNum = 0
num = 0
try:
while True:
sql = self.re_queue2.rpop("sqls")
if sql:
num += 1
breakNum = 0
print sql
try:
cursor2.execute(sql)
if count == 500:
self.db.commit()
count = 0
else:
count += 1
except Exception as e:
print e
if not sql:
time.sleep(1)
if breakNum == 10:
break
else:
print "breakNum", breakNum
breakNum += 1
finally:
print ("insertSql", num)
self.db.commit()
self.db.close()
if __name__ == '__main__':
    items = InsertData()
    print('Parent process %s.' % os.getpid())
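    # Run the producer, the keyword-matching consumer, and the SQL writer concurrently.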
t1 = threading.Thread(target=items.put_redis)
t2 = threading.Thread(target=items.get_redis)
t3 = threading.Thread(target=items.test)
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
Original post: https://www.cnblogs.com/libaibuaidufu/p/10170773.html