1000字范文,内容丰富有趣,学习的好帮手!
1000字范文 > Python批量转存百度网盘资源

Python批量转存百度网盘资源

时间:2019-02-16 16:47:10

相关推荐

Python批量转存百度网盘资源

一. 参考程式

"""Save Baidu Netdisk share links into a folder of the logged-in account.

Paste the Cookie of a logged-in browser session into ``headers["Cookie"]``
and call :func:`savebp` once per share link.
"""
# NOTE(review): nothing in this script uses tkinter; the wildcard import is
# kept only because the original listing has it.
from tkinter import *  # noqa: F401,F403
import re
import sys
import time
import urllib.parse

import requests
import urllib3
from retrying import retry

# NOTE(review): the published listing had the scheme and host stripped from
# every request URL and from the Host/Referer headers. They are restored here
# as the Baidu Netdisk web host, which is consistent with the 25-character
# "https://pan.baidu.com/s/1" prefix that check_links() slices off a share
# link -- confirm before relying on it.
BASE_URL = 'https://pan.baidu.com'

# Matches an existing BDCLND cookie entry. The value stops at the first ';' or
# whitespace so that neighbouring cookies are never swallowed.
BDCLND_PATTERN = re.compile(r'BDCLND=[^;\s]*;?', re.IGNORECASE)

# Credentials supplied by the user; savebp() copies them into request_header.
headers = {
    "Cookie": r'',
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
}

# Headers sent with every request. 'Cookie' and 'User-Agent' are filled in by
# savebp(); check_links() later adds/updates the BDCLND entry of the cookie.
request_header = {
    'Host': 'pan.baidu.com',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Sec-Fetch-Dest': 'document',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'Sec-Fetch-Site': 'same-site',
    'Sec-Fetch-Mode': 'navigate',
    'Referer': BASE_URL,
    'Accept-Encoding': 'gzip, deflate, br',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7,en-GB;q=0.6,ru;q=0.5',
}

# Every request below passes verify=False, so silence the resulting warnings.
urllib3.disable_warnings()
s = requests.session()
# Ignore proxy settings and credentials taken from the environment.
s.trust_env = False


@retry(stop_max_attempt_number=5, wait_fixed=1000)
def get_bdstoken():
    """Fetch the ``bdstoken`` of the logged-in session.

    Returns:
        The token string when the API answers ``errno == 0``, otherwise the
        non-zero integer ``errno``.
    """
    url = BASE_URL + '/api/gettemplatevariable?clienttype=0&app_id=250528&web=1&fields=[%22bdstoken%22,%22token%22,%22uk%22,%22isdocuser%22,%22servertime%22]'
    response = s.get(url=url, headers=request_header, timeout=20, allow_redirects=True, verify=False)
    payload = response.json()  # parse the body once instead of per access
    return payload['errno'] if payload['errno'] != 0 else payload['result']['bdstoken']


@retry(stop_max_attempt_number=5, wait_fixed=1000)
def get_dir_list(bdstoken, parent_dir='/'):
    """List the entries of ``parent_dir`` in the account.

    Args:
        bdstoken: Token returned by :func:`get_bdstoken`.
        parent_dir: Directory to list. Defaults to the root, because
            create_dir() and transfer_files() both address ``'/' + dir_name``.
            NOTE(review): the original listing queried ``/upload`` here, which
            cannot tell whether ``/<dir_name>`` already exists -- confirm.

    Returns:
        The ``list`` field of the API answer when ``errno == 0``, otherwise
        the non-zero integer ``errno``.
    """
    url = (BASE_URL + '/api/list?order=time&desc=1&showempty=0&web=1&page=1&num=1000&dir='
           + urllib.parse.quote(parent_dir, safe='') + '&bdstoken=' + bdstoken)
    response = s.get(url=url, headers=request_header, timeout=15, allow_redirects=False, verify=False)
    payload = response.json()
    return payload['errno'] if payload['errno'] != 0 else payload['list']


@retry(stop_max_attempt_number=5, wait_fixed=1000)
def create_dir(dir_name, bdstoken):
    """Create the directory ``'/' + dir_name`` and return the API ``errno``."""
    url = BASE_URL + '/api/create?a=commit&bdstoken=' + bdstoken
    post_data = {'path': "/" + dir_name, 'isdir': '1', 'block_list': '[]', }
    response = s.post(url=url, headers=request_header, data=post_data, timeout=15, allow_redirects=False, verify=False)
    return response.json()['errno']


@retry(stop_max_attempt_number=6, wait_fixed=2000)
def check_links(link_url, pass_code, bdstoken):
    """Verify a share link and collect what is needed to transfer it.

    Args:
        link_url: Share link. Characters 25..47 are sent as the ``surl``
            parameter, i.e. the link is expected to start with a
            25-character prefix.
        pass_code: Extraction code; when empty the verification request is
            skipped.
        bdstoken: Token returned by :func:`get_bdstoken`.

    Returns:
        On success a list ``[shareid, share_uk, fs_id_list, title]``.
        Otherwise a failure marker: the ``errno`` of the verification
        request, ``1`` when no share id is found in the page, ``2`` when no
        ``share_uk`` is found, or -- when no ``fs_id`` is found -- the page
        title (``3`` if the page has no title).
    """
    if pass_code:
        t_str = str(int(round(time.time() * 1000)))
        check_url = (BASE_URL + '/share/verify?surl=' + link_url[25:48] + '&bdstoken=' + bdstoken
                     + '&t=' + t_str + '&channel=chunlei&web=1&clienttype=0')
        post_data = {'pwd': pass_code, 'vcode': '', 'vcode_str': '', }
        response_post = s.post(url=check_url, headers=request_header, data=post_data, timeout=10, allow_redirects=False, verify=False)
        verify_reply = response_post.json()
        if verify_reply['errno'] != 0:
            return verify_reply['errno']
        bdclnd = verify_reply['randsk']

        # Store the returned token in the cookie as BDCLND, replacing a
        # previous entry if there is one. A function replacement is used so
        # the token is inserted verbatim, never read as a regex template.
        cookie = request_header.get('Cookie', '')
        if BDCLND_PATTERN.search(cookie):
            request_header['Cookie'] = BDCLND_PATTERN.sub(lambda _match: 'BDCLND=' + bdclnd + ';', cookie)
        else:
            request_header['Cookie'] = cookie + ';BDCLND=' + bdclnd

    # Fetch the share page and scrape the identifiers out of its source.
    response = s.get(url=link_url, headers=request_header, timeout=15, allow_redirects=True, verify=False).content.decode("utf-8")
    shareid_list = re.findall(r'"shareid":(\d+?),"', response)
    user_id_list = re.findall(r'"share_uk":"(\d+?)","', response)
    fs_id_list = re.findall(r'"fs_id":(\d+?),"', response)
    info_title_list = re.findall('<title>(.+)</title>', response)

    if not shareid_list:
        return 1
    if not user_id_list:
        return 2
    if not fs_id_list:
        return info_title_list[0] if info_title_list else 3
    # A page without <title> used to raise IndexError here; fall back to ''.
    title = info_title_list[0] if info_title_list else ''
    return [shareid_list[0], user_id_list[0], fs_id_list, title.replace("_免费高速下载|百度网盘-分享无限制", "")]


@retry(stop_max_attempt_number=20, wait_fixed=2000)
def transfer_files(check_links_reason, dir_name, bdstoken):
    """Transfer the shared files into ``'/' + dir_name``.

    Args:
        check_links_reason: The success list returned by :func:`check_links`.
        dir_name: Destination directory name (relative to the root).
        bdstoken: Token returned by :func:`get_bdstoken`.

    Returns:
        The ``errno`` field of the API answer.
    """
    url = (BASE_URL + '/share/transfer?shareid=' + check_links_reason[0] + '&from=' + check_links_reason[1]
           + '&bdstoken=' + bdstoken + '&channel=chunlei&web=1&clienttype=0')
    fs_id = ','.join(check_links_reason[2])
    post_data = {'fsidlist': '[' + fs_id + ']', 'path': '/' + dir_name, }
    response = s.post(url=url, headers=request_header, data=post_data, timeout=15, allow_redirects=False, verify=False)
    return response.json()['errno']


def _abort(message):
    """Print ``message`` and stop the script via ``sys.exit()``."""
    print(message)
    sys.exit()


def savebp(dir_name, url):
    """Save one share link into the directory ``dir_name``.

    Args:
        dir_name: Destination directory; created when it is non-empty and
            not present in the directory listing.
        url: Share link, either ``<link>?pwd=<code>`` or
            ``<link> 提取码:<code>``. A link without a code is accepted and
            checked without verification.

    Returns:
        ``"/<dir_name>/<title>"`` on success, otherwise ``None``.

    Raises:
        SystemExit: when the cookie is unusable, or when the token, the
            directory listing or the directory creation fails. These calls
            sit inside ``try``/``except Exception``, which does not catch
            ``SystemExit``, so the whole script stops.
    """
    cookie = headers['Cookie']
    request_header['Cookie'] = cookie
    request_header['User-Agent'] = headers['User-Agent']
    try:
        # The cookie must consist of code points below 256 and contain BAIDUID.
        if any(ord(word) not in range(256) for word in cookie) or 'BAIDUID=' not in cookie:
            _abort('Cookie 格式不正确,请检查后重试')

        bdstoken = get_bdstoken()
        if isinstance(bdstoken, int):
            _abort(f'获取 bdstoken 失败,错误代码:{bdstoken}')

        dir_list_json = get_dir_list(bdstoken)
        if not isinstance(dir_list_json, list):
            _abort(f'获取目录列表失败,错误代码:{dir_list_json}')

        dir_list = [dir_json['server_filename'] for dir_json in dir_list_json]
        if dir_name and dir_name not in dir_list:
            create_dir_reason = create_dir(dir_name, bdstoken)
            if create_dir_reason != 0:
                _abort(f'创建目录失败,错误代码:{create_dir_reason}')

        # Normalise "<link>?pwd=<code>" and "<link> 提取码:<code>" to
        # "<link> <code>". NOTE(review): the listing had "[::]"; the
        # duplicated colon suggests the full-width one was lost, so both
        # forms are accepted here.
        url_code = url.replace("?pwd=", " ")
        cleaned = re.sub(r'提取码*[::](.*)', r'\1', url_code.lstrip())
        # partition() instead of unpacking split(): a link without a code
        # yields an empty pass code instead of raising ValueError.
        link_url_org, _, pass_code_org = cleaned.partition(' ')
        link_url = link_url_org.strip()[:47]
        pass_code = pass_code_org.strip()[:4]

        check_links_reason = check_links(link_url, pass_code, bdstoken)
        if not isinstance(check_links_reason, list):
            print(f'链接检查失败:{url_code} 原因:{check_links_reason}')
            return None

        transfer_files_reason = transfer_files(check_links_reason, dir_name, bdstoken)
        if transfer_files_reason != 0:
            print(f'转存失败:{url_code} 错误代码:{transfer_files_reason}')
            return None

        filepath = "/" + dir_name + "/" + check_links_reason[3]
        print(f'转存成功:{filepath} {url_code}')
        return filepath
    except Exception as e:
        print(e)
    return None


# Save a single share link (the asterisks are placeholders).
url = "https://pan.baidu.com/s/*******************?pwd=****"
savebp("test", url)

# Save several share links.
urls = [
    "https://pan.baidu.com/s/*******************?pwd=****",
    "https://pan.baidu.com/s/*******************?pwd=****",
    "https://pan.baidu.com/s/*******************?pwd=****",
]
for url in urls:
    savebp("test", url)

二. 使用方法

首先使用浏览器登录百度网盘,然后打开浏览器的开发者工具,并切换到“网络”(Network)选项卡

刷新网页后,在请求列表中找到发往 pan.baidu.com 的请求,并复制其请求头中的 Cookie

将拿到的Cookie填入headers中,并将需要转存的链接添加到 url 或 urls 中,注意链接格式

# Cookie: replace 'Your Cookie' with the value copied from the browser.
headers = {
    "Cookie": r'Your Cookie',
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
}

# NOTE(review): the published snippet showed the links without scheme/host;
# the host is restored below because the script slices the link at fixed
# offsets -- confirm. The asterisks are placeholders for the share id and
# the extraction code.

# Save a single share link.
url = "https://pan.baidu.com/s/*******************?pwd=****"
savebp("test", url)

# Save several share links.
urls = [
    "https://pan.baidu.com/s/*******************?pwd=****",
    "https://pan.baidu.com/s/*******************?pwd=****",
    "https://pan.baidu.com/s/*******************?pwd=****",
]
for url in urls:
    # Arguments: destination directory, share link.
    savebp("test", url)

注:请确保每个链接的格式都与上面的示例保持一致

三. 注意事项

需要将以下程序保存为 retrying.py,并与主程序放在同一目录下

"""Retry helper: the ``retry`` decorator and the ``Retrying`` engine behind it.

All wait and delay values are expressed in milliseconds.
"""
import functools
import random
import sys
import time
import traceback

try:
    # No longer used below (functools.wraps and with_traceback replace it).
    # The import is kept, but optional, so ``retrying.six`` still resolves
    # wherever six is installed.
    import six  # noqa: F401
except ImportError:
    pass

# sys.maxint / 2, since Python 3.2 doesn't have a sys.maxint...
MAX_WAIT = 1073741823


def retry(*dargs, **dkw):
    """
    Decorator function that instantiates the Retrying object
    @param *dargs: positional arguments passed to Retrying object
    @param **dkw: keyword arguments passed to the Retrying object
    """
    # support both @retry and @retry() as valid syntax
    if len(dargs) == 1 and callable(dargs[0]):
        def wrap_simple(f):

            @functools.wraps(f)
            def wrapped_f(*args, **kw):
                return Retrying().call(f, *args, **kw)

            return wrapped_f

        return wrap_simple(dargs[0])

    else:
        def wrap(f):

            @functools.wraps(f)
            def wrapped_f(*args, **kw):
                return Retrying(*dargs, **dkw).call(f, *args, **kw)

            return wrapped_f

        return wrap


class Retrying(object):
    """Call a function repeatedly until it is accepted or a stop rule fires.

    A stop or wait rule is active only when one of its keyword arguments is
    passed explicitly. The fallback values stored on the instance do not
    activate a rule by themselves. With no stop rule at all, ``call`` keeps
    retrying for as long as attempts are rejected.
    """

    def __init__(self,
                 stop=None, wait=None,
                 stop_max_attempt_number=None,
                 stop_max_delay=None,
                 wait_fixed=None,
                 wait_random_min=None, wait_random_max=None,
                 wait_incrementing_start=None, wait_incrementing_increment=None,
                 wait_exponential_multiplier=None, wait_exponential_max=None,
                 retry_on_exception=None,
                 retry_on_result=None,
                 wrap_exception=False,
                 stop_func=None,
                 wait_func=None,
                 wait_jitter_max=None):
        """
        @param stop: name of a method of this object to use as the stop rule
        @param wait: name of a method of this object to use as the wait rule
        @param stop_max_attempt_number: stop once this many attempts were made
        @param stop_max_delay: stop once this many ms passed since the start
        @param wait_fixed: ms to sleep between attempts
        @param wait_random_min: lower bound (ms) of a random sleep
        @param wait_random_max: upper bound (ms) of a random sleep
        @param wait_incrementing_start: first sleep (ms) of an incrementing wait
        @param wait_incrementing_increment: ms added per further attempt
        @param wait_exponential_multiplier: factor applied to 2 ** attempt
        @param wait_exponential_max: cap (ms) of the exponential sleep
        @param retry_on_exception: predicate(exception) -> True to retry;
            by default every caught exception is retried
        @param retry_on_result: predicate(result) -> True to retry;
            by default no result is retried
        @param wrap_exception: raise RetryError instead of the last exception
        @param stop_func: callable(attempts, delay_ms) overriding the stop rule
        @param wait_func: callable(attempts, delay_ms) overriding the wait rule
        @param wait_jitter_max: upper bound (ms) of random jitter added to
            every sleep
        """
        self._stop_max_attempt_number = 5 if stop_max_attempt_number is None else stop_max_attempt_number
        self._stop_max_delay = 100 if stop_max_delay is None else stop_max_delay
        self._wait_fixed = 1000 if wait_fixed is None else wait_fixed
        self._wait_random_min = 0 if wait_random_min is None else wait_random_min
        self._wait_random_max = 1000 if wait_random_max is None else wait_random_max
        self._wait_incrementing_start = 0 if wait_incrementing_start is None else wait_incrementing_start
        self._wait_incrementing_increment = 100 if wait_incrementing_increment is None else wait_incrementing_increment
        self._wait_exponential_multiplier = 1 if wait_exponential_multiplier is None else wait_exponential_multiplier
        self._wait_exponential_max = MAX_WAIT if wait_exponential_max is None else wait_exponential_max
        self._wait_jitter_max = 0 if wait_jitter_max is None else wait_jitter_max

        # TODO add chaining of stop behaviors
        # stop behavior
        stop_funcs = []
        if stop_max_attempt_number is not None:
            stop_funcs.append(self.stop_after_attempt)

        if stop_max_delay is not None:
            stop_funcs.append(self.stop_after_delay)

        if stop_func is not None:
            self.stop = stop_func

        elif stop is None:
            # Stop as soon as any active rule says so.
            self.stop = lambda attempts, delay: any(f(attempts, delay) for f in stop_funcs)

        else:
            self.stop = getattr(self, stop)

        # TODO add chaining of wait behaviors
        # wait behavior
        wait_funcs = [lambda *args, **kwargs: 0]
        if wait_fixed is not None:
            wait_funcs.append(self.fixed_sleep)

        if wait_random_min is not None or wait_random_max is not None:
            wait_funcs.append(self.random_sleep)

        if wait_incrementing_start is not None or wait_incrementing_increment is not None:
            wait_funcs.append(self.incrementing_sleep)

        if wait_exponential_multiplier is not None or wait_exponential_max is not None:
            wait_funcs.append(self.exponential_sleep)

        if wait_func is not None:
            self.wait = wait_func

        elif wait is None:
            # Sleep for the longest time requested by any active rule.
            self.wait = lambda attempts, delay: max(f(attempts, delay) for f in wait_funcs)

        else:
            self.wait = getattr(self, wait)

        # retry on exception filter
        if retry_on_exception is None:
            self._retry_on_exception = self.always_reject
        else:
            self._retry_on_exception = retry_on_exception

        # TODO simplify retrying by Exception types
        # retry on result filter
        if retry_on_result is None:
            self._retry_on_result = self.never_reject
        else:
            self._retry_on_result = retry_on_result

        self._wrap_exception = wrap_exception

    def stop_after_attempt(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Stop after the previous attempt >= stop_max_attempt_number."""
        return previous_attempt_number >= self._stop_max_attempt_number

    def stop_after_delay(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Stop after the time from the first attempt >= stop_max_delay."""
        return delay_since_first_attempt_ms >= self._stop_max_delay

    def no_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Don't sleep at all before retrying."""
        return 0

    def fixed_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Sleep a fixed amount of time between each retry."""
        return self._wait_fixed

    def random_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Sleep a random amount of time between wait_random_min and wait_random_max"""
        return random.randint(self._wait_random_min, self._wait_random_max)

    def incrementing_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """
        Sleep an incremental amount of time after each attempt, starting at
        wait_incrementing_start and incrementing by wait_incrementing_increment
        """
        result = self._wait_incrementing_start + (self._wait_incrementing_increment * (previous_attempt_number - 1))
        if result < 0:
            result = 0
        return result

    def exponential_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Sleep multiplier * 2 ** attempt, clamped to [0, wait_exponential_max]."""
        exp = 2 ** previous_attempt_number
        result = self._wait_exponential_multiplier * exp
        if result > self._wait_exponential_max:
            result = self._wait_exponential_max
        if result < 0:
            result = 0
        return result

    def never_reject(self, result):
        """Default result filter: accept every result."""
        return False

    def always_reject(self, result):
        """Default exception filter: retry on every caught exception."""
        return True

    def should_reject(self, attempt):
        """Return True when ``attempt`` has to be retried.

        A failed attempt is judged by the exception filter (which receives
        the exception instance), a successful one by the result filter.
        """
        reject = False
        if attempt.has_exception:
            reject |= self._retry_on_exception(attempt.value[1])
        else:
            reject |= self._retry_on_result(attempt.value)

        return reject

    def call(self, fn, *args, **kwargs):
        """Run ``fn(*args, **kwargs)`` until an attempt is accepted.

        Returns the accepted result. When the stop rule fires on a rejected
        attempt, the last exception is re-raised, or ``RetryError`` is raised
        if ``wrap_exception`` is set or the attempt returned a rejected value.
        An exception that the exception filter does not reject is re-raised
        immediately (wrapped when ``wrap_exception`` is set).
        """
        start_time = int(round(time.time() * 1000))
        attempt_number = 1
        while True:
            try:
                attempt = Attempt(fn(*args, **kwargs), attempt_number, False)
            except Exception:
                # Deliberately not a bare ``except``: KeyboardInterrupt and
                # SystemExit must propagate at once instead of being retried.
                tb = sys.exc_info()
                attempt = Attempt(tb, attempt_number, True)

            if not self.should_reject(attempt):
                return attempt.get(self._wrap_exception)

            delay_since_first_attempt_ms = int(round(time.time() * 1000)) - start_time
            if self.stop(attempt_number, delay_since_first_attempt_ms):
                if not self._wrap_exception and attempt.has_exception:
                    # get() on an attempt with an exception should cause it to be raised, but raise just in case
                    raise attempt.get()
                else:
                    raise RetryError(attempt)
            else:
                sleep = self.wait(attempt_number, delay_since_first_attempt_ms)
                if self._wait_jitter_max:
                    jitter = random.random() * self._wait_jitter_max
                    sleep = sleep + max(0, jitter)
                time.sleep(sleep / 1000.0)

            attempt_number += 1


class Attempt(object):
    """
    An Attempt encapsulates a call to a target function that may end as a
    normal return value from the function or an Exception depending on what
    occurred during the execution.
    """

    def __init__(self, value, attempt_number, has_exception):
        # value: the return value, or the sys.exc_info() triple on failure.
        self.value = value
        self.attempt_number = attempt_number
        self.has_exception = has_exception

    def get(self, wrap_exception=False):
        """
        Return the return value of this Attempt instance or raise an Exception.
        If wrap_exception is true, this Attempt is wrapped inside of a
        RetryError before being raised.
        """
        if self.has_exception:
            if wrap_exception:
                raise RetryError(self)
            else:
                # Re-raise the stored exception with its original traceback.
                raise self.value[1].with_traceback(self.value[2])
        else:
            return self.value

    def __repr__(self):
        if self.has_exception:
            return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, "".join(traceback.format_tb(self.value[2])))
        else:
            return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)


class RetryError(Exception):
    """
    A RetryError encapsulates the last Attempt instance right before giving up.
    """

    def __init__(self, last_attempt):
        self.last_attempt = last_attempt

    def __str__(self):
        return "RetryError[{0}]".format(self.last_attempt)

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。