"""Router exposing the "/search" endpoint for share-link extraction codes."""
import base64
import binascii
import random

from fastapi import APIRouter, Depends, HTTPException, Query, Form

from app.common import util
from app.common.schema import ApiResultSchema
from app.common.response import ApiResult

router = APIRouter()


@router.post(
    "/search",
    name="查询网盘提取码",
    description="""
#### 测试数据
- url: `0f5e94524cbcf7faRys1SkxMWjRMdks2VDUzeVJVa0s5WGwvWkF6ZmJqNk1qcGJwenN0M1REUT0K`
- device_id: `0f5e94524cbcf7fa`
- timestamp: `1611275090557`
""",
    response_model=ApiResultSchema,
)
async def search(
    url: str = Form(
        ...,
        title="百度网盘分享链接",
        description="百度网盘分享链接,算法:`device_id + base64(aes(url))`",
    ),
    device_id: str = Form(..., title="设备标识", description="设备标识"),
    timestamp: str = Form(..., title="时间戳", description="时间戳"),
):
    """Validate an obfuscated share link and return a hex-encoded 4-digit code.

    The ``url`` form field is expected to be ``device_id`` followed by a
    base64 payload. The payload is base64-decoded, passed to
    ``util.aes_decrypt`` with the module key, and the result must start with
    ``https://pan.baidu.com/``.

    Args:
        url: Obfuscated share link (``device_id`` prefix + base64 payload).
        device_id: Device identifier that prefixes ``url``.
        timestamp: Request timestamp; only checked for presence here.

    Returns:
        ``ApiResult.error_msg(...)`` when a parameter is missing, the payload
        is not valid base64, the decrypted value is empty/blank, or the link
        is not a pan.baidu.com link; otherwise ``ApiResult.success_data``
        with ``{"code": <hex of a random 4-digit number>}``.
    """
    if url is None or device_id is None or timestamp is None:
        return ApiResult.error_msg("参数错误")

    # Strip only the leading device id. A blanket str.replace() would also
    # delete any accidental occurrence inside the base64 payload itself.
    if url.startswith(device_id):
        payload_b64 = url[len(device_id):]
    else:
        payload_b64 = url

    try:
        encrypted = base64.b64decode(payload_b64)
    except (binascii.Error, ValueError):
        # binascii.Error: malformed base64; ValueError: non-ASCII input.
        return ApiResult.error_msg("参数错误")

    # NOTE(review): util.aes_decrypt is defined elsewhere; presumably it
    # returns a str (or None on failure) - confirm against its implementation.
    share_url = util.aes_decrypt(_key, encrypted)
    if not share_url or share_url.isspace():
        return ApiResult.error_msg("参数错误")

    if not share_url.startswith("https://pan.baidu.com/"):
        return ApiResult.error_msg("非百度网盘分享链接")

    # The code is a random 4-digit number; it is not derived from the link.
    code = str(random.randint(1000, 9999))
    code_hex = binascii.hexlify(code.encode("utf-8")).decode("utf-8")
    return ApiResult.success_data({"code": code_hex})


# DATA
# Key handed to util.aes_decrypt for the share-link payload.
# NOTE(review): hard-coded secret in source - consider loading from config.
_key = "FVYL5DONEY7OEIVE".encode("utf-8")