cachetools 3.1.1
celery 3.1.26.post2
celery-with-redis 3.0
certifi 2019.9.11
Django 2.2.6
django-allauth 0.40.0
django-appconf 1.0.3
django-celery 3.3.1
django-celery-results 1.0.0
django-compressor 2.3
django-contrib-comments 1.9.1
django-cors-headers 3.1.1
django-crispy-forms 1.8.0
django-environ 0.4.5
django-filter 2.2.0
django-fluent-comments 2.1
django-formtools 2.1
django-haystack 2.8.1
django-import-export 1.2.0
django-markdownx 2.0.28
django-redis 4.10.0
django-redis-cache 2.1.0
django-redis-sessions 0.6.1
django-rest-auth 0.9.5
django-rest-framework 0.1.0
django-reversion 3.0.4
django-tag-parser 3.1
django-taggit 1.1.0
django-test-plus 1.3.1
django-tinymce 2.8.0
django-validator 0.2.7
djangorestframework 3.10.3
djangorestframework-jwt 1.11.0
flower 0.9.3
PyMySQL 0.9.3
redis 2.10.6
tornado 5.1.1
# Django applications enabled for this project.
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'steam_user',  # Steam crawler app
    'djcelery',  # django-celery, registered as a Django app
    'rest_framework',
]
import djcelery
# Hook django-celery into this settings module. NOTE(review): the django-celery
# setup instructions ask for this call in settings.py -- confirm it is still
# required for the pinned django-celery release.
djcelery.setup_loader()
# Celery broker, format: redis://<redis host>:<port>/<database number>
# With authentication the URL becomes: redis://:password@10.133.3.34:6379/2
BROKER_URL = 'redis://10.133.3.34:6379/2'
# Result backend; lets callers track task results.
CELERY_RESULT_BACKEND = 'redis://10.133.3.34:6379/2'
# Modules that contain tasks. The trailing comma matters: without it the
# parentheses are just grouping and the value is a plain string, not a tuple.
CELERY_IMPORTS = ('apps.steam_user.tasks',)
# Same broker under the CELERY_-prefixed name; reuse BROKER_URL so the two
# settings cannot drift apart.
CELERY_BROKER_URL = BROKER_URL
# Serialization formats for task messages and results.
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# Celery time zone; keep it identical to TIME_ZONE in the Django settings.
CELERY_TIMEZONE = 'Asia/Shanghai'

from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
from os import path, environ
# The project package name is the name of the directory holding this file
# (e.g. okr_manage). abspath() guards against a __file__ without a directory
# component, for which dirname() would be '' and the name would come out empty.
project_name = path.basename(path.dirname(path.abspath(__file__)))
project_settings = "{}.settings".format(project_name)

# Point Django at the project settings unless the environment already does.
environ.setdefault("DJANGO_SETTINGS_MODULE", project_settings)

# Create the Celery application.
app = Celery(project_name)

# Configure Celery from the Django settings object.
app.config_from_object("django.conf:settings")

# Discover tasks in every installed app; the lambda defers reading
# INSTALLED_APPS until Celery actually needs the list.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
    """Print the repr of the bound task's request (debugging aid)."""
    request_line = "Request: {0!r}".format(self.request)
    print(request_line)
from __future__ import absolute_import, unicode_literals
import time, requests, json, hashlib
from celery import task, shared_task
from apps.steam_user.query import game_app_query, achievement_query, delete_games_query, steam_friends_query, delete_friends_query, steam_wishlist_query, delete_wishlist_query
from spider_user.settings import MD5_KEY
from apps.steam_user.request_data import user_info
@shared_task()
def steam_game_app(user_id, body):
    """Crawl a Steam user's owned games and achievements, then notify the callback service.

    Every owned game is passed to ``game_app_query``; every achievement of that
    game is passed to ``achievement_query`` (the same record dict, extended
    with the achievement fields). If at least one game was seen,
    ``delete_games_query`` receives the list of seen app ids. Finally a signed
    JSON callback is POSTed: code 0 when Steam returned a non-empty
    ``response`` object, code 403 otherwise.

    Args:
        user_id: Steam id of the user to crawl.
        body: Original request payload; ``body['body']`` is echoed back in the
            callback and ``body`` itself is used for the signature.

    Returns:
        The string ``'ok'``.
    """
    # NOTE(review): the Steam Web API key is hard-coded; it should be loaded
    # from settings or the environment rather than kept in source.
    api_key = '23241F06EEB8234C46D8F390A718F71F'
    # Without a timeout a stalled connection would block the worker forever.
    request_timeout = 30
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }

    # Query values go through ``params`` so they are URL-encoded instead of
    # being spliced into the URL. verify=False has no effect on plain http.
    owned_raw = requests.get(
        url='http://api.steampowered.com/IPlayerService/GetOwnedGames/v1',
        params={
            'key': api_key,
            'steamid': user_id,
            'include_appinfo': 'true',
            'include_played_free_games': 'true',
        },
        headers=headers, verify=False, timeout=request_timeout,
    ).content.decode('utf-8')
    owned = json.loads(owned_raw).get('response')

    if owned:
        app_origin_id_lists = []
        # A non-empty response may still lack 'games'; treat that as no games.
        for game in owned.get('games', []):
            item = {
                'steamid': user_id,
                'appid': game['appid'],
                'create_time': time.strftime("%Y-%m-%d %X"),
                'play_tim': game['playtime_forever'],
                # 'playtime_2weeks' is optional in the payload; default to 0.
                'last_run_time': game.get('playtime_2weeks', 0),
            }
            app_origin_id_lists.append(item['appid'])
            game_app_query(item)

            achievements_raw = requests.get(
                url='http://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v1/',
                params={
                    'key': api_key,
                    'steamid': user_id,
                    'appid': item['appid'],
                    'l': 'schinese',
                },
                headers=headers, verify=False, timeout=request_timeout,
            ).content.decode('utf-8')
            achievements_payload = json.loads(achievements_raw)
            try:
                achievements = achievements_payload['playerstats']['achievements']
            except (KeyError, TypeError):
                # No achievement list in the answer for this game.
                achievements = []

            for achievement in achievements:
                item['is_finish'] = achievement['achieved']
                item['english_name'] = achievement['apiname']
                item['achieve_name'] = achievement['name'].replace('"', '“')
                item['achieve_time'] = time.strftime(
                    '%Y-%m-%d %H:%M:%S', time.localtime(int(achievement['unlocktime'])))
                achievement_query(item)

        if app_origin_id_lists:
            print(app_origin_id_lists)
            print('*' * 100)
            delete_games_query({
                'steamid': user_id,
                'spider_member_app_list': app_origin_id_lists,
            })
        code, message = 0, '抓取成功'
    else:
        code, message = 403, '用户设为隐私'

    payload = {
        'code': code,
        'message': message,
        'body': body['body'],
    }
    local_time = str(int(time.time() * 1000))
    # NOTE(review): the signature covers the incoming ``body``, not the
    # ``payload`` that is actually sent -- confirm against the receiver.
    md_data = '{}&{}&{}'.format(
        json.dumps(body, ensure_ascii=False), MD5_KEY, local_time).replace(' ', '')
    access_token = hashlib.md5(md_data.encode('utf-8')).hexdigest()
    callback_headers = {
        'Content-Type': 'application/json',
        'time': local_time,
        'accessToken': access_token,
        'isTimer': 'False',
    }
    # Send UTF-8 bytes explicitly. A str body is encoded as latin-1 by the
    # HTTP layer, which is why the message used to be pre-mangled through
    # utf-8 -> latin1; that trick produced the same bytes for the message but
    # failed as soon as body['body'] contained non-latin-1 text.
    requests.post(
        url='http://10.133.3.144:7071/spider/callback',
        data=json.dumps(payload, ensure_ascii=False).encode('utf-8'),
        headers=callback_headers, timeout=request_timeout)
    return 'ok'
@shared_task()
def steam_friends(user_id, body):
    """Crawl a Steam user's friend list, then notify the callback service.

    Every friend record is enriched with the result of ``user_info`` and
    passed to ``steam_friends_query``. If at least one friend was seen,
    ``delete_friends_query`` receives the list of seen friend ids. Finally a
    signed JSON callback is POSTed: code 0 when Steam returned a non-empty
    JSON document, code 403 otherwise.

    Args:
        user_id: Steam id of the user whose friends are crawled.
        body: Original request payload; ``body['body']`` is echoed back in the
            callback and ``body`` itself is used for the signature.

    Returns:
        The string ``'ok'``.
    """
    # NOTE(review): the Steam Web API key is hard-coded; it should be loaded
    # from settings or the environment rather than kept in source.
    api_key = '23241F06EEB8234C46D8F390A718F71F'
    # Without a timeout a stalled connection would block the worker forever.
    request_timeout = 30
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }

    # Query values go through ``params`` so they are URL-encoded instead of
    # being spliced into the URL. verify=False has no effect on plain http.
    friends_raw = requests.get(
        url='http://api.steampowered.com/ISteamUser/GetFriendList/v1/',
        params={'key': api_key, 'steamid': user_id},
        headers=headers, verify=False, timeout=request_timeout,
    ).content.decode('utf-8')
    # Parse once; the document is needed for both the check and the loop.
    friends_payload = json.loads(friends_raw)

    if friends_payload:
        friends_list = []
        for friend in friends_payload['friendslist']['friends']:
            item = {
                'steamid_pid': user_id,
                'steamid': friend['steamid'],
                'relationship': friend['relationship'],
                'friend_since': time.strftime(
                    '%Y-%m-%d %H:%M:%S', time.localtime(int(friend['friend_since']))),
                'create_time': time.strftime("%Y-%m-%d %X"),
            }
            friends_list.append(item['steamid'])
            # NOTE(review): user_info receives the whole record here, while the
            # view in this project calls it with a bare user id -- confirm
            # which argument shape user_info expects.
            profile = user_info(item)
            item['personaname'] = profile['personName']
            item['avatarfull'] = profile['headPortrait184']
            steam_friends_query(item)

        if friends_list:
            delete_friends_query({
                'steamid_pid': user_id,
                'friends_list': friends_list,
            })
        code, message = 0, '抓取成功'
    else:
        code, message = 403, '用户设为隐私'

    payload = {
        'code': code,
        'message': message,
        'body': body['body'],
    }
    local_time = str(int(time.time() * 1000))
    # NOTE(review): the signature covers the incoming ``body``, not the
    # ``payload`` that is actually sent -- confirm against the receiver.
    md_data = '{}&{}&{}'.format(
        json.dumps(body, ensure_ascii=False), MD5_KEY, local_time).replace(' ', '')
    access_token = hashlib.md5(md_data.encode('utf-8')).hexdigest()
    callback_headers = {
        'Content-Type': 'application/json',
        'time': local_time,
        'accessToken': access_token,
        'isTimer': 'False',
    }
    # Send UTF-8 bytes explicitly. A str body is encoded as latin-1 by the
    # HTTP layer, which is why the message used to be pre-mangled through
    # utf-8 -> latin1; that trick produced the same bytes for the message but
    # failed as soon as body['body'] contained non-latin-1 text.
    requests.post(
        url='http://10.133.3.144:7071/spider/callback',
        data=json.dumps(payload, ensure_ascii=False).encode('utf-8'),
        headers=callback_headers, timeout=request_timeout)
    return 'ok'
@shared_task()
def steam_wishlist(user_id, body):
    """Crawl a Steam user's wishlist, then notify the callback service.

    Every wishlist entry is passed to ``steam_wishlist_query``. If at least
    one entry was seen, ``delete_wishlist_query`` receives the list of seen
    app ids. A signed JSON callback is always POSTed: code 0 normally, code
    403 when the answer carries a status field instead of wishlist entries.

    Args:
        user_id: Steam id of the user whose wishlist is crawled.
        body: Original request payload; ``body['body']`` is echoed back in the
            callback and ``body`` itself is used for the signature.

    Returns:
        The string ``'ok'``.
    """
    # Without a timeout a stalled connection would block the worker forever.
    request_timeout = 30
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    # NOTE(review): verify=False disables TLS certificate verification for this
    # https request; flagged here, not changed, to keep the deployment working.
    wishlist_raw = requests.get(
        url='https://store.steampowered.com/wishlist/profiles/{}/wishlistdata/'.format(
            user_id),
        headers=headers, verify=False, timeout=request_timeout,
    ).content.decode('utf-8')
    wishlist = json.loads(wishlist_raw)
    # An empty wishlist may come back as a non-dict value; treat it as empty.
    entries = wishlist if isinstance(wishlist, dict) else {}

    app_origin_id_list = []
    if 'success' in entries and not isinstance(entries['success'], dict):
        # A bare status field is not a wishlist entry; indexing it with
        # ['added'] used to raise TypeError. NOTE(review): Steam appears to
        # answer {"success": 2} for inaccessible wishlists -- confirm. It is
        # reported like the sibling tasks report a private profile.
        code, message = 403, '用户设为隐私'
    else:
        for appid, info in entries.items():
            item = {
                'steamid': user_id,
                'appid': appid,
                'create_time': time.strftime("%Y-%m-%d %X"),
                'add_time': time.strftime(
                    '%Y-%m-%d %H:%M:%S', time.localtime(int(info['added']))),
            }
            app_origin_id_list.append(appid)
            steam_wishlist_query(item)

        if app_origin_id_list:
            delete_wishlist_query({
                'steamid': user_id,
                'spider_member_wishlist_list': app_origin_id_list,
            })
        code, message = 0, '抓取成功'

    payload = {
        'code': code,
        'message': message,
        'body': body['body'],
    }
    local_time = str(int(time.time() * 1000))
    # NOTE(review): the signature covers the incoming ``body``, not the
    # ``payload`` that is actually sent -- confirm against the receiver.
    md_data = '{}&{}&{}'.format(
        json.dumps(body, ensure_ascii=False), MD5_KEY, local_time).replace(' ', '')
    access_token = hashlib.md5(md_data.encode('utf-8')).hexdigest()
    callback_headers = {
        'Content-Type': 'application/json',
        'time': local_time,
        'accessToken': access_token,
        'isTimer': 'False',
    }
    # Send UTF-8 bytes explicitly. A str body is encoded as latin-1 by the
    # HTTP layer, which is why the message used to be pre-mangled through
    # utf-8 -> latin1; that trick produced the same bytes for the message but
    # failed as soon as body['body'] contained non-latin-1 text.
    requests.post(
        url='http://10.133.3.144:7071/spider/callback',
        data=json.dumps(payload, ensure_ascii=False).encode('utf-8'),
        headers=callback_headers, timeout=request_timeout)
    return 'ok'
from django.views import View
from django.http import HttpResponse, JsonResponse
from apps.steam_user import tasks
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.response import Response
from steam_user.exceptions import NotFound, BadRequest, Forbidden
from spider_user.settings import MD5_KEY
import hashlib, json, time, requests
from apps.steam_user.request_data import user_info
from django.core.cache import cache
# Create your views here.
class SteamUserState(APIView):
    """Entry point for Steam profile lookups and crawl requests."""

    # Maximum accepted age of the ``time`` header, in milliseconds.
    # NOTE(review): 60000000000 ms is roughly 694 days, so this check almost
    # never rejects a request; confirm the intended window before tightening.
    TOKEN_MAX_AGE_MS = 60000000000

    @staticmethod
    def _unauthorized():
        """Build the 401 response used for every failed authentication check."""
        return Response({'code': 401, 'message': 'accessToken失效'},
                        status=status.HTTP_401_UNAUTHORIZED)

    @staticmethod
    def _bad_request(message):
        """Build a 400 response carrying ``message``."""
        return Response({'code': 400, 'message': message},
                        status=status.HTTP_400_BAD_REQUEST)

    def post(self, request, *args, **kwargs):
        """Authenticate the request, then return profile state or queue a crawl.

        The ``time`` and ``accessToken`` headers must match an MD5 signature
        of the request data, ``MD5_KEY`` and the timestamp. For platformId 1:
        kind 1 returns the result of ``user_info`` merged with the request
        body and caches the user state; kinds 2, 3 and 4 queue the game,
        wishlist and friends crawl tasks respectively.

        Returns:
            A DRF ``Response``: 200 on success, 401 when authentication
            fails, 400 when the body is malformed or the platformId/kind
            combination is not supported.
        """
        import hmac  # local import: only needed for the constant-time compare

        body = request.data
        local_time = request.META.get('HTTP_TIME')
        provided_token = request.META.get('HTTP_ACCESSTOKEN')
        print(body)

        # A missing or non-numeric timestamp is an authentication failure,
        # not a server error.
        try:
            request_ms = int(local_time)
        except (TypeError, ValueError):
            return self._unauthorized()
        if provided_token is None:
            return self._unauthorized()
        if int(time.time() * 1000) - self.TOKEN_MAX_AGE_MS >= request_ms:
            return self._unauthorized()

        md_data = '{}&{}&{}'.format(
            json.dumps(body, ensure_ascii=False), MD5_KEY, local_time).replace(' ', '')
        expected_token = hashlib.md5(md_data.encode('utf-8')).hexdigest()
        # Compare as bytes in constant time; the expected token is never
        # printed or logged.
        if not hmac.compare_digest(expected_token.encode('utf-8'),
                                   provided_token.encode('utf-8')):
            return self._unauthorized()

        try:
            payload = body['body']
            user_id = payload['userId']
            platform_id = payload['platformId']
            kind = payload['kind']
        except (KeyError, TypeError):
            return self._bad_request('请求参数错误')

        if platform_id == 1 and kind == 1:
            data = user_info(user_id)
            item = {
                'code': 0,
                'message': '成功',
                'body': dict(data, **payload),
            }
            cache.set('steam_{}'.format(user_id), data['userState'])
            return Response(item, status=status.HTTP_200_OK)

        # kind -> Celery task that performs the crawl asynchronously.
        crawl_tasks = (
            (2, tasks.steam_game_app),
            (3, tasks.steam_wishlist),
            (4, tasks.steam_friends),
        )
        for task_kind, crawl_task in crawl_tasks:
            if platform_id == 1 and kind == task_kind:
                crawl_task.delay(user_id, body)
                item = {
                    'code': 0,
                    'message': '爬取中',
                }
                return Response(item, status=status.HTTP_200_OK)

        # Nothing matched: answer explicitly instead of returning None, which
        # DRF would turn into a server error.
        return self._bad_request('不支持的platformId或kind')
python3.6 manage.py runserver 10.133.3.34:8000  # 启动Django项目
python3.6 manage.py celery worker --loglevel=info  # 启动worker
python3.6 manage.py celery flower  # 启动celery flower 访问 http://10.133.3.34:5555/ 可以看到任务
原文:https://www.cnblogs.com/yoyo1216/p/11947077.html