Python 阿里云 OSS 实现直传签名与回调验证的示例方法

签名

import base64
import json
import time
from datetime import datetime
import hmac
from hashlib import sha1

# Replace with your AccessKeyId.
access_key_id = ''
# Replace with your AccessKeySecret.
access_key_secret = ''
# Host has the form "bucketname.endpoint"; replace with your real values.
host = ''
# URL of the upload-callback server; replace with your own address and port.
callback_url = ""
# Key prefix that every uploaded object must start with.
upload_dir = 'user-dir-prefix/'
# Lifetime of the signed policy, in seconds.
expire_time = 1200
# Absolute expiry as a whole-second Unix timestamp.
expire_syncpoint = int(time.time() + expire_time)

policy_dict = {
  # ISO 8601 UTC timestamp; the epoch value is an int, so there is never a
  # fractional-second part.
  'expiration': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(expire_syncpoint)),
  'conditions': [
    # NOTE(review): the bucket name is hard-coded; replace with your bucket.
    {"bucket": "test-paige"},
    # Restrict uploads to the configured prefix instead of a second,
    # unrelated hard-coded one.
    ['starts-with', '$key', upload_dir]
  ]
}
policy = json.dumps(policy_dict).strip()
policy_encode = base64.b64encode(policy.encode())
# HMAC-SHA1 over the base64 policy, itself base64-encoded. b64encode (unlike
# encodebytes) does not append a trailing newline.
signature = base64.b64encode(hmac.new(access_key_secret.encode(), policy_encode, sha1).digest())

callback_dict = {
  'callbackUrl': callback_url,
  'callbackBody': 'filename=${object}&size=${size}&mimeType=${mimeType}'
          '&height=${imageInfo.height}&width=${imageInfo.width}',
  # The body above is a query string and the callback handler parses it with
  # parse_qs, so it must be declared as form-urlencoded rather than JSON.
  'callbackBodyType': 'application/x-www-form-urlencoded'
}

callback = base64.b64encode(json.dumps(callback_dict).strip().encode()).decode()

# Payload handed to the client that performs the direct upload.
var = {
  'accessid': access_key_id,
  'host': host,
  'policy': policy_encode.decode(),
  'signature': signature.decode().strip(),
  'expire': expire_syncpoint,
  'callback': callback
}

回调验签

import asyncio
import base64
import time
import aiomysql
import rsa
from aiohttp import web, ClientSession
from urllib import parse
import uuid


def success(msg='', data=None):
  """Build a JSON response signalling success (``code`` is 1).

  :param str msg: message placed in the ``msg`` field.
  :param data: payload placed in the ``data`` field; ``None`` becomes ``{}``.
  :return: the response produced by ``web.json_response``.
  """
  if data is None:
    data = {}
  dict_data = {
    'code': 1,
    'msg': msg,
    'data': data
  }
  return web.json_response(dict_data)


def failed(msg='', data=None):
  """Build a JSON response signalling failure (``code`` is 0).

  :param str msg: message placed in the ``msg`` field.
  :param data: payload placed in the ``data`` field; ``None`` becomes ``{}``.
  :return: the response produced by ``web.json_response``.
  """
  if data is None:
    data = {}
  dict_data = {
    'code': 0,
    'msg': msg,
    'data': data
  }
  return web.json_response(dict_data)


# The callback-signing public key must come from Aliyun's key host. A key
# served from anywhere else could belong to an attacker who signed the
# request themselves, so such URLs are rejected before being fetched.
_OSS_PUB_KEY_URL_PREFIXES = (
  'http://gosspublic.alicdn.com/',
  'https://gosspublic.alicdn.com/',
)


async def handle(request):
  """Verify an OSS upload callback and record the uploaded object.

  Reads the base64 ``authorization`` and ``x-oss-pub-key-url`` headers,
  downloads the public key, verifies the RSA signature over
  ``<path>\\n<body>`` and, when it is valid, inserts a row into the
  ``media`` table using the ``mysql_pool`` stored on the application.

  :param web.BaseRequest request: incoming callback request.
  :return: ``success(...)`` carrying the stored record, or ``failed(...)``
    when the headers, signature or callback fields are unusable.
  """
  try:
    authorization = base64.b64decode(request.headers['authorization'].encode())
    pub_key_url = base64.b64decode(request.headers['x-oss-pub-key-url'].encode()).decode()
  except (KeyError, ValueError):
    # Missing header, malformed base64, or a URL that is not valid UTF-8
    # (binascii.Error and UnicodeDecodeError are both ValueError subclasses).
    return failed(msg='验证错误')

  if not pub_key_url.startswith(_OSS_PUB_KEY_URL_PREFIXES):
    return failed(msg='验证错误')

  # Only the key download needs the HTTP session; release it before
  # touching the database.
  async with ClientSession() as session:
    async with session.get(pub_key_url) as resp:
      pub_key_body = await resp.text()
  pubkey = rsa.PublicKey.load_pkcs1_openssl_pem(pub_key_body.encode())

  body = (await request.content.read()).decode()
  # NOTE(review): a query string on the callback URL is not part of the
  # signed string here — confirm the callback URL never carries one.
  auth_str = parse.unquote(request.path) + '\n' + body
  parse_url = parse.parse_qs(body)
  print(parse_url)

  try:
    rsa.verify(auth_str.encode(), authorization, pubkey)
  except rsa.pkcs1.VerificationError:
    return failed(msg='验证错误')

  try:
    url = parse_url['filename'][0]
    mime = parse_url['mimeType'][0]
  except KeyError:
    # parse_qs drops blank values, so an empty field is reported as missing.
    return failed(msg='参数错误')

  media_id = str(uuid.uuid4())
  disk = 'oss'
  # %M is minutes; the previous %I repeated the hour on a 12-hour clock.
  time_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  sql = "INSERT INTO media(id,url,mime,disk,created_at,updated_at) VALUES(%s,%s,%s,%s,%s,%s)"

  pool = request.app['mysql_pool']
  async with pool.acquire() as conn:
    async with conn.cursor() as cur:
      await cur.execute(sql, (media_id, url, mime, disk, time_at, time_at))
      await conn.commit()

  dict_data = {
    'id': media_id,
    'url': url,
    'cdn_url': 'https://cdn.***.net' + '/' + url,
    'mime': mime,
    'disk': disk,
    'created_at': time_at,
    'updated_at': time_at,
  }
  return success(data=dict_data)


async def init(loop):
  """Create the MySQL pool and build the aiohttp application.

  :param loop: event loop forwarded to ``aiomysql.create_pool``.
  :return: a ``web.Application`` with the pool stored under ``'mysql_pool'``,
    a shutdown hook that closes the pool, and the routes registered.
  """
  # Create the connection pool (fill in user, password and db).
  mysql_pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                      user='', password='',
                      db='', loop=loop)

  async def on_shutdown(application):
    """Close the connection pool and wait until it is fully closed.

    :param web.Application application: application being shut down.
    :return: None
    """
    application['mysql_pool'].close()
    # Without this wait the program exits before the pool is really closed
    # and Python reports "RuntimeError: Event loop is closed".
    await application['mysql_pool'].wait_closed()

  application = web.Application()
  application.on_shutdown.append(on_shutdown)
  # Store the pool on the application so handlers can reach it.
  application['mysql_pool'] = mysql_pool
  application.add_routes([web.get('/', handle), web.post('/oss', handle)])
  return application


if __name__ == '__main__':
  # Hand the coroutine to run_app so the MySQL pool is created on the same
  # event loop that serves requests. Building the application on a loop
  # from asyncio.get_event_loop() first leaves the pool bound to a
  # different loop than the one run_app serves on, and the trailing
  # loop.close() was redundant.
  # Passing None presumably lets aiomysql pick up the running loop —
  # confirm against the installed aiomysql version.
  web.run_app(init(None), host='127.0.0.1')

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容