首页 > 编程语言 > 详细

Python 计算AWS4签名,Header鉴权与URL鉴权

时间:2021-07-24 11:44:25      阅读:18      评论:0      收藏:0      [点我收藏+]

AWS4 版本签名计算参考

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2021/7/24 8:12
#  @Author:zhangmingda
#  @File: api_for_aws4_signature.py
#  @Software: PyCharm
#  Description:

from urllib.request import quote
import hashlib
import hmac
import datetime
import requests
import json
import base64

class KscClient(object):
    def __init__(self, ak, sk,service, domain, region, use_ssl=False):
        self.ak = ak
        self.sk = sk
        self.host = service + "." + domain
        self.region = region
        self.service = service
        self.protocol = https if use_ssl else http
        self.base_url = self.protocol + :// + self.host + /

    # 创建规范化请求查询字符串方法
    def generate_canonical_querystring(self, **params):
        ‘‘‘
        :param params: 查询字符串字典
        :return: url编码后的查询字符串key1=value1&key2=value2 ...
        ‘‘‘
        sorted_params = ‘‘
        sorted_param_keys = sorted(params.keys())
        for param in sorted_param_keys:
            # print(param)
            sorted_params += quote(param) + = + quote(params.get(param), safe=‘‘) + &  # 查询字符串中的值任何字符串都进行URL编码
            # print(sorted_params)
        return sorted_params[:-1]

    #  生成标准化请求头,请求头大写转小写,排序返回字符串
    def generate_canonical_headers(self, **headers):
        ‘‘‘
        :param headers: 所有用来签名的请求头
        :return: 返回小写请求头排序后的key:value 字符串,每个key:value 之间\n换行
        ‘‘‘
        sorted_headers = ‘‘                                         # 准备标准请求头key+value字符串
        sorted_signed_headers = ‘‘                                  # 准备签名的请求头key组合字符串
        header_keys = headers.keys()                                # 获取请求头所有key
        lower_headers = {}                                          # 准备存放小写请求头和值的字典
        for header in header_keys:                                  # 把大写请求头字典都转换为小写请求头字典
            lower_headers[header.lower()] = headers.get(header)
        sorted_lower_header_keys = sorted(lower_headers.keys())     # 把签名的请求头排序

        ‘‘‘排序后的请求头和值进行组合字符串‘‘‘
        for lower_header in sorted_lower_header_keys:
            sorted_headers += lower_header + : + str(lower_headers[lower_header]).strip() + \n
            sorted_signed_headers += lower_header + ";"
        # print("============请求头排序==============")
        # print(sorted_headers)
        # print(‘------被签名的请求头----‘)
        # print(sorted_signed_headers)
        # print("==========请求头排序end============")
        sorted_lower_headers = {
            sorted_headers: sorted_headers,                           # 签名的请求头key 和 value
            signed_headers: sorted_signed_headers[:-1]                # 被签名的请求头字符串组合,最后一个分号; 不要
        }
        return sorted_lower_headers

    # 十六进制请求体Sha256Hash值--->请求体用
    def bytes_data_sha256_hex(self, bytes_data=‘‘.encode()):
        ‘‘‘
        :param binary_data: 字节码文件
        :return: 返回请求体内容的sha256 哈希值
        ‘‘‘
        sha256 = hashlib.sha256()
        sha256.update(bytes_data)
        return sha256.hexdigest().lower()

    # 构建被签名字符串,把标准请求canonical_request 哈希取16进制用
    def str_sha256_hex(self, string=‘‘):
        ‘‘‘
        :param string:
        :return: 字符串的sha256哈希值
        ‘‘‘
        sha256 = hashlib.sha256()
        sha256.update(string.encode())
        return sha256.hexdigest()

    # UTC时间字符串工具
    def get_utc_datetime(self):
        now_utc = datetime.datetime.utcnow()
        datetime_utc = now_utc.strftime(%Y%m%dT%H%M%SZ)
        date_utc = now_utc.strftime(%Y%m%d)
        return datetime_utc, date_utc

    # 生成规范化请求的字符串方法
    def generate_canonical_request(self, method, encode_uri, canonical_querystring, canonical_headers, signed_headers,payload_sha256_hex):
        ‘‘‘ 1.1 规范化请求CanonicalRequest计算方法‘‘‘
        return method + \n                + encode_uri + \n                + canonical_querystring + \n                + canonical_headers + \n                + signed_headers + \n                + payload_sha256_hex  

    # 生成被签名字符串的方法
    def generate_string_tosign(self, algorithm, datetime_utc, credentialscope, canonical_reques_sha256_hex ):
        ‘‘‘ 1.2 被签名字符串计算方法‘‘‘
        return algorithm + \n                         + datetime_utc + \n                         + credentialscope + \n                         + canonical_reques_sha256_hex

    # 第二步构建签名key的工具方法
    def encode_string_to_hmac_256_digest(self, encode_salt, msg, digestmod=hashlib.sha256):
        ‘‘‘
        :param encode_salt: 盐
        :param msg: 字符串信息
        :param digestmod: 摘要算法
        :return: HMAC-SHA256 加盐哈希后的字节
        ‘‘‘
        digest_maker = hmac.new(encode_salt, msg.encode(), digestmod=digestmod)
        return digest_maker.digest()

    # 创建签名KEY的方法
    def generate_signing_key(self, signature_version,sk, date_utc, region, service, termchar=aws4_request):
        k_secret = signature_version + sk
        k_date = self.encode_string_to_hmac_256_digest(k_secret.encode(), date_utc)
        k_region = self.encode_string_to_hmac_256_digest(k_date, region)
        k_service = self.encode_string_to_hmac_256_digest(k_region, service)
        k_signing = self.encode_string_to_hmac_256_digest(k_service, termchar)
        return k_signing

    # 创建签名的方法
    def generate_signature(self, signing_key, string_tosign):
        digest_maker = hmac.new(signing_key, string_tosign.encode(), digestmod=hashlib.sha256)
        hex_signature = digest_maker.hexdigest()
        # print(‘hex_signature:‘, digest_maker.hexdigest())
        return hex_signature

    # 构建带签名的请求头,或签名url查询字符串
    def get_auth_data(self, method, query_params, append_headers={}, binary_payload=‘‘.encode()):
        # ========================================1. 创建被签名字符串======================================
        # 1.1 构建标准化请求
        method = method             # 1.1.1 HTTP方法
        canonical_uri = /         # 1.1.2 资源URI
        encode_uri = quote(canonical_uri, safe="/")  # 1.1.2 将URI编码成%字符串格式 不包含? 后的url查询参数
        payload_sha256_hex = self.bytes_data_sha256_hex(binary_payload)  # 1.1.5 请求体sha256 十六进制HASH值
        datetime_utc, date_utc = self.get_utc_datetime()
        # 准备请求头
        signature_headers = {
            Host: self.host,
            X-amz-date: datetime_utc
        }
        signature_headers.update(append_headers)
        # 请求头排序格式化
        sorted_lower_headers = self.generate_canonical_headers(**signature_headers)
        canonical_headers = sorted_lower_headers.get(sorted_headers)      # 1.1.4 小写排序后的请求头key:value
        signed_headers = sorted_lower_headers.get(signed_headers)         # 1.1.5 小写排序后的签名头
        # 1.2 创建信任状
        credentialscope = date_utc + "/" + self.region + "/" + self.service + "/aws4_request"
        # 查询参数字典
        auth_params = {}
        # 签名放在URL中时计算签名传递参数
        if query_params:
            auth_params = {
                X-Amz-Algorithm: "AWS4-HMAC-SHA256",
                X-Amz-Credential: self.ak + / + credentialscope,
                X-Amz-Date: datetime_utc,
                X-Amz-SignedHeaders: signed_headers,
            }
            auth_params.update(query_params)
            # print(‘auth_params: ‘, auth_params)
        auth_in_header_canonical_querystring = self.generate_canonical_querystring(**query_params)      # 1.1.3 通过header传递签名的查询字符串
        auth_in_queryparam_canonical_querystring = self.generate_canonical_querystring(**auth_params)   # 1.1.3 通过URL传递签名的查询字符串
        canonical_request_auth_in_header = self.generate_canonical_request(method, encode_uri, auth_in_header_canonical_querystring, canonical_headers, signed_headers, payload_sha256_hex)
        canonical_request_auth_in_url = self.generate_canonical_request(method, encode_uri, auth_in_queryparam_canonical_querystring, canonical_headers, signed_headers, payload_sha256_hex)
        print("Header传递签名的规范化请求".center(50, "="))
        print(canonical_request_auth_in_header)
        print("canonical_request_auth_in_url".center(50, "*"))
        print(canonical_request_auth_in_header)
        print("规范化化请求done".center(50, "="))

        # 1.2 创建被签名字的符串StringToSign
        canonical_request_auth_in_header_sha256_hex = self.str_sha256_hex(canonical_request_auth_in_header)  # 规范化请求的256哈希值
        canonical_request_auth_in_url_sha256_hex = self.str_sha256_hex(canonical_request_auth_in_url)        # 规范化请求的256哈希值
        algorithm = AWS4-HMAC-SHA256
        string_tosign_auth_in_header = self.generate_string_tosign(algorithm, datetime_utc, credentialscope, canonical_request_auth_in_header_sha256_hex)
        string_tosign_auth_in_url = self.generate_string_tosign(algorithm, datetime_utc, credentialscope, canonical_request_auth_in_url_sha256_hex)

        print(Header传递签名被签名的字符串.center(50, =))
        print(string_tosign_auth_in_header)
        print(URL传递签名被签名的字符串.center(50, *))
        print(string_tosign_auth_in_url)
        print(被签名的字符串Done!.center(50,=))

        # ========================================2. 创建签名 ======================================
        # 2.1创建签名签名秘钥
        signature_version = AWS4
        signing_key = self.generate_signing_key(signature_version, self.sk, date_utc, self.region, self.service)
        print("签名秘钥:",signing_key)
        # 2.2 创建签名
        signature_for_header = self.generate_signature(signing_key, string_tosign_auth_in_header)
        signature_for_url = self.generate_signature(signing_key, string_tosign_auth_in_url)
        print(Header鉴权的签名:, signature_for_header)
        print(url鉴权的签名:, signature_for_url)
        # ========================================3 请求头或URL 传递签名鉴权和参数 ======================================
        authorization_headers = {
            Authorization: "AWS4-HMAC-SHA256 Credential="
                             + self.ak + /
                             + credentialscope + ", "
                             + "SignedHeaders="
                             + signed_headers + , 
                             + Signature= + signature_for_header,
        }
        # 3.1请求头携带签名,返回带签名的请求头
        authorization_headers.update(signature_headers)

        # 3.2 url携带签名返回带签名的URL查询字符串
        request_query_params = {
            X-Amz-Signature: signature_for_url
        }
        request_query_params.update(auth_params)
        authorization_params = self.generate_canonical_querystring(**request_query_params)
        # print(‘request_query_params: ‘, request_query_params)
        # print(‘authorization_params:‘,authorization_params)
        auth_data = {
            authorization_headers: authorization_headers,
            authorization_params: authorization_params,
            signature_headers: signature_headers,  # get object 在URL里面传递签名 请求头要带的签名头
        }
        return auth_data

    #  通过标准请求头方式传递签名GET文件
    def get_test(self, query_params={}, append_headers={}):
        method = GET                                      # 1.1.1 HTTP方法
        auth_data = self.get_auth_data(method, query_params, append_headers)
        authorization_headers = auth_data.get(authorization_headers)
        auth_query_params = auth_data.get(authorization_params)
        signature_headers = auth_data.get(signature_headers)

        url = self.base_url + ? + self.generate_canonical_querystring(**query_params)
        print(Header鉴权的URL:,url)
        req = requests.get(url, headers=authorization_headers)
        print(req.status_code)
        print(req.text)
        ********************************************************************
        url_for_auth_param = self.base_url
        print("URL鉴权的URL:", url_for_auth_param)
        req = requests.get(url_for_auth_param, headers=signature_headers,params=auth_query_params)
        print(req.status_code)
        print(req.text)
    # def get_auth_in_query_param(self,query_params={}, append_headers={}):
    #     method = ‘GET‘                                      # 1.1.1 HTTP方法
    #     auth_data = self.get_auth_data(method, query_params, append_headers)
    #     auth_query_params = auth_data.get(‘authorization_params‘)
    #     signature_headers = auth_data.get(‘signature_headers‘)
    #     url_for_auth_param = self.protocol + ‘://‘ + self.host + ‘/‘ + ‘?‘ + auth_query_params
    #     print("url_for_auth_param:", url_for_auth_param)
    #     req = requests.get(url_for_auth_param, headers=signature_headers)
    #     print(req.status_code)
    #     print(req.text)


if __name__ == __main__:
    ak = XXXXXXXXXXXXXXXXXXXXXX
    sk = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    service = kcm
    domain = api.ksyun.com
    region = cn-beijing-6
    ksc = KscClient(ak, sk, service, domain, region)
    query_params = {
        Action: GetDownloadLink,
        Version: 2016-03-04,
        CertificateId: kcm2021022216204501
    }
    ksc.get_test(query_params=query_params)

技术分享图片

 

Python 计算AWS4签名,Header鉴权与URL鉴权

原文:https://www.cnblogs.com/zhangmingda/p/15054612.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!