Package lupro
Lupro Library ~~~~~~~~~~~~~~~~~~~~~
Lupro is an asynchronous HTTP library written in Python. It is fully compatible with requests.
usage:
兼容requests
from lupro import lupro r = lupro.get('https://www.python.org') r.status_code 200
其它 lupro.api
请参考 https://github.com/luxuncang/lupro.
:copyright: (c) 2021 by ShengXin Lu.
Expand source code
'''
Lupro Library
~~~~~~~~~~~~~~~~~~~~~
Lupro is an Asynchronous HTTP library, written in Python, It is fully compatible with `requests`.
usage:
* `兼容requests`
>>> from lupro import lupro
>>> r = lupro.get('https://www.python.org')
>>> r.status_code
200
其它 `lupro.api` 请参考 <https://github.com/luxuncang/lupro>.
:copyright: (c) 2021 by ShengXin Lu.
'''
from gevent import monkey
monkey.patch_all() # 先引用
from .hooks import lupros
from .controller import persistence, batch
from .HTTPengine import lupro, analyze
from .api import generator, Batchsubmission, BulkDownload, xpath_Batchanalysis, json_Batchanalysis, re_Batchanalysis, Batchanalysis, async_lupro
from .__version__ import __title__, __description__, __url__, __version__
from .__version__ import __author__, __author_email__, __license__
# Names exported as the package's public API.
__all__ = [
'lupro', # lupro console; HTTP client compatible with requests
'lupros', # lupros request parameters
'batch', # batch task controller
'analyze', # response parser
'persistence', # object persistence controller
'generator',
'Batchsubmission',
'BulkDownload',
'xpath_Batchanalysis',
'json_Batchanalysis',
're_Batchanalysis',
'Batchanalysis',
'async_lupro' # minimal batch requests
]
'''
v1.0.4
* 完全兼容 `requests` 一切操作,只需 `from lupro import lupro as requests` 即可不更改一行代码
* 新增原生异步 `requests`请求 只需 `async_lupro([lupros.get('https://www.python.org')]*10)`
* 标准化函数描述
v1.0.5
* 修复了 `lupro.lupro` 解析方法判断的失误
* 修复了 移除无效代理的方式
v1.0.6 v1.0.7
* 修复了不启用代理且请求错误时的,更新字典的方式
* 文件结构优化
* 新增对象持久化(shelve)
* 新增批量任务自省,冷重启(atexit)
v1.0.8
* 改变了 response.encoding 为空时的编码推测(chardet)
* 优化了html解析器的接口,`response` 类型可直接通过 `class analyze` 和 `class batch` 进行解析
* 新增 css解析器(parsel)
* 新增 httpx 客服端(httpx)
'''
Sub-modules
lupro.HTTPengine
-
lupro HTTP
lupro.analyzesController
-
response解析器
lupro.api
-
lupro api
lupro.config
-
lupro 全局配置
lupro.controller
-
lupro 内部控制器
lupro.hooks
-
HTTP 请求钩子
lupro.publictool
-
lupro 工具箱
lupro.typing
-
自定义类型注释
lupro.useragent
-
UA池
Functions
def Batchanalysis(mold: str, generator: Union[ForwardRef('list[lupro]'), ForwardRef('list[Response]')], analytic: dict, auxiliary=<function original>) ‑> list
-
lupro批量解析
Args
mold
:str
解析方法generator()
:Union[list[lupro], list[Response]]
lupro实例列表 或 Response实例列表analytic
:dict
解析字典auxiliary
:function
自定义解析处理Returns
list[dict]
- 解析列表
Expand source code
def Batchanalysis(mold : str ,generator : Union["list[lupro]", "list[Response]"], analytic : dict, auxiliary = original) -> list:
    '''Parse a batch with the parser selected by `mold`.

    Args:
        `mold` : `str` parser name: 'xpath', 'json' or 're'
        `generator` : `Union[list[lupro], list[Response]]` lupro instances or Response instances
        `analytic` : `dict` parsing dictionary handed to the selected parser
        `auxiliary` : `function` custom post-processing handed to the selected parser
    Returns:
        list[dict] : whatever the selected batch parser returns
    Raises:
        TypeError : `mold` matches none of the known parser names
    '''
    parsers = (
        ('xpath', xpath_Batchanalysis),
        ('json', json_Batchanalysis),
        ('re', re_Batchanalysis),
    )
    for label, parser in parsers:
        if mold == label:
            return parser(generator, analytic, auxiliary)
    raise TypeError('No corresponding parsing method!')
def Batchsubmission(generator) ‑> list
-
Expand source code
def Batchsubmission(generator) -> list:
    '''Run the `task` method of every instance concurrently.

    Args:
        `generator` : `list[lupro]` lupro instances
    Returns:
        list : the `value` of each spawned greenlet, in input order
    '''
    jobs = []
    for instance in generator:
        jobs.append(gevent.spawn(instance.task))
    # Wait for every greenlet before reading its result.
    gevent.joinall(jobs)
    return [job.value for job in jobs]
def BulkDownload(generator) ‑> list
-
Expand source code
def BulkDownload(generator) -> list:
    '''Run the `save_file` method of every instance concurrently.

    Args:
        `generator` : `list[lupro]` lupro instances
    Returns:
        list : the `value` of each spawned greenlet (the saved paths), in input order
    '''
    jobs = []
    for instance in generator:
        jobs.append(gevent.spawn(instance.save_file))
    # Wait for every greenlet before reading its result.
    gevent.joinall(jobs)
    return [job.value for job in jobs]
def async_lupro(generator: list[lupros]) ‑> list
-
Expand source code
def async_lupro(generator : "list[lupros]") -> list:
    '''Send several raw requests concurrently through `HTTP_ENGINE.request`.

    Args:
        `generator` : `list[lupros]` request specs; each is indexed as
            (first positional argument, further positional arguments, keyword arguments)
    Returns:
        list[Response] : the `value` of each spawned greenlet, in input order
    '''
    jobs = []
    for spec in generator:
        jobs.append(gevent.spawn(HTTP_ENGINE.request, spec[0], *spec[1], **spec[2]))
    gevent.joinall(jobs)
    return [job.value for job in jobs]
def generator(instantiation: lupro, url: list, filenameNo: list = []) ‑> list
-
实例化生成器
Args
instantiation
:lupro
lupro模板实例url
:list
链接表filenameNo
:list
filename 序列且此序列会继承instantiation.filename
Returns
list
- lupro实例列表
Expand source code
def generator(instantiation : "lupro", url : list, filenameNo : list = None) -> list:
    '''Build one lupro instance per url from a template instance.

    Args:
        `instantiation` : `lupro` template instance; its `__reprs__()` supplies the constructor arguments
        `url` : `list` links, one per instance to create
        `filenameNo` : `list` one filename suffix per url; when empty or omitted,
            the indexes 0..len(url)-1 are used
    Returns:
        list : lupro instances, in the order of `url`
    Raises:
        ValueError : `filenameNo` is given but its length differs from `url`
    '''
    # The default is None rather than a shared mutable list; any falsy value
    # (None, [], ()) selects the automatic numbering.
    if not filenameNo:
        filenameNo = range(len(url))
    elif len(url) != len(filenameNo):
        raise ValueError('"url" needs to be consistent with "FileNameno"!')
    template = instantiation.__reprs__()
    return [lupro(**requests_dict(template, link, number)) for link, number in zip(url, filenameNo)]
def json_Batchanalysis(generator: Union[ForwardRef('list[lupro]'), ForwardRef('list[Response]')], analytic: dict, auxiliary=<function original>) ‑> list
-
json批量解析器 (json解析器为 dtanys)
Args
generator()
:Union[list[lupro], list[Response]]
lupro实例列表 或 Response实例列表analytic
:dict
解析字典auxiliary
:function
自定义解析处理Returns
list[dict]
- 解析列表
Expand source code
def json_Batchanalysis(generator : Union["list[lupro]", "list[Response]"], analytic : dict, auxiliary = original) -> list:
    '''Parse a batch as json (the json parser is `dtanys`).

    Args:
        `generator` : `Union[list[lupro], list[Response]]` lupro instances or Response instances
        `analytic` : `dict` parsing dictionary
        `auxiliary` : `function` custom post-processing
    Returns:
        list[dict] : one parsed dict per item; [] for an empty batch
    '''
    # The element type is sniffed from the first item, so an empty batch
    # must be answered before indexing.
    if not generator:
        return []
    if isinstance(generator[0], lupro):
        # lupro instances still have to be requested: do that concurrently.
        jobs = [gevent.spawn(i.json_analysis, analytic, auxiliary) for i in generator]
        gevent.joinall(jobs)
        return [job.value for job in jobs]
    return [analyze.json(i, analytic, auxiliary) for i in generator]
def re_Batchanalysis(generator: Union[ForwardRef('list[lupro]'), ForwardRef('list[Response]')], analytic: dict, auxiliary=<function original>) ‑> list
-
正则解析器
Args
generator()
:Union[list[lupro], list[Response]]
lupro实例列表 或 Response实例列表analytic
:dict
{str
:function
} 正则解析字典auxiliary
:function
自定义解析处理Returns
list[dict]
- 解析列表
Expand source code
def re_Batchanalysis(generator : Union["list[lupro]", "list[Response]"], analytic : dict, auxiliary = original) -> list:
    '''Parse a batch with the "re" parser.

    Args:
        `generator` : `Union[list[lupro], list[Response]]` lupro instances or Response instances
        `analytic` : `dict`{`str`:`function`} parsing dictionary
        `auxiliary` : `function` custom post-processing
    Returns:
        list[dict] : one parsed dict per item; [] for an empty batch
    '''
    # The element type is sniffed from the first item, so an empty batch
    # must be answered before indexing.
    if not generator:
        return []
    if isinstance(generator[0], lupro):
        # lupro instances still have to be requested: do that concurrently.
        jobs = [gevent.spawn(i.re_analysis, analytic, auxiliary) for i in generator]
        gevent.joinall(jobs)
        return [job.value for job in jobs]
    return [analyze.re(i, analytic, auxiliary) for i in generator]
def xpath_Batchanalysis(generator: Union[ForwardRef('list[lupro]'), ForwardRef('list[Response]')], analytic: dict, auxiliary=<function original>) ‑> list
-
xpath批量解析器
Args
generator()
:Union[list[lupro], list[Response]]
lupro实例列表 或 Response实例列表analytic
:dict
解析字典auxiliary
:function
自定义解析处理Returns
list[dict]
- 解析列表
Expand source code
def xpath_Batchanalysis(generator : Union["list[lupro]", "list[Response]"], analytic : dict, auxiliary = original) -> list:
    '''Parse a batch with xpath.

    Args:
        `generator` : `Union[list[lupro], list[Response]]` lupro instances or Response instances
        `analytic` : `dict` parsing dictionary
        `auxiliary` : `function` custom post-processing
    Returns:
        list[dict] : one parsed dict per item; [] for an empty batch
    '''
    # The element type is sniffed from the first item, so an empty batch
    # must be answered before indexing.
    if not generator:
        return []
    if isinstance(generator[0], lupro):
        # lupro instances still have to be requested: do that concurrently.
        jobs = [gevent.spawn(i.xpath_analysis, analytic, auxiliary) for i in generator]
        gevent.joinall(jobs)
        return [job.value for job in jobs]
    return [analyze.xpath(i, analytic, auxiliary) for i in generator]
Classes
class analyze
-
response
解析控制器Expand source code
class analyze():
    '''Parsers that turn a `response` into a dict of extracted values.'''

    @staticmethod
    def xpath(response, analytic : dict, auxiliary = original):
        ''' Extract values with xpath expressions.

        Args:
            `response` : `response` response to parse
            `analytic` : `dict[str:str]` result key -> xpath expression
            `auxiliary` : `function` applied to every match
        Returns:
            dict : result key -> list of processed matches
        '''
        html = etree.HTML(responsecoding(response))
        return {key: [auxiliary(node) for node in html.xpath(path)] for key, path in analytic.items()}

    @staticmethod
    def json(response, analytic : dict, auxiliary = original):
        ''' Extract values from a json response.

        Sets `response.encoding` to `response.apparent_encoding`
        ('utf-8' when that is None) before decoding.

        Args:
            `response` : `response` response to parse
            `analytic` : `dict[str:str]` result key -> expression handed to `XDict`
            `auxiliary` : `function` applied to each extracted value
        Returns:
            dict : result key -> processed value
        '''
        # Read the attribute once instead of once for the test and once for the assignment.
        detected = response.apparent_encoding
        response.encoding = 'utf-8' if detected is None else detected
        reDict = {}
        for key, path in analytic.items():
            reDict[key] = auxiliary(XDict(response.json(), path).edict())
        return reDict

    @staticmethod
    def re(response, analytic : dict, auxiliary = original):
        ''' Extract values with caller-supplied functions.

        Args:
            `response` : `response` response to parse
            `analytic` : `dict`{`str`:`function`} result key -> callable; each callable
                receives the parsed document and returns an iterable of matches
            `auxiliary` : `function` applied to every match
        Returns:
            dict : result key -> list of processed matches
        '''
        html = etree.HTML(responsecoding(response))
        return {key: [auxiliary(item) for item in extract(html)] for key, extract in analytic.items()}

    @staticmethod
    def css(response, analytic : dict, auxiliary = original):
        ''' Extract values with css selectors.

        Args:
            `response` : `response` response to parse
            `analytic` : `dict[str:str]` result key -> css selector
            `auxiliary` : `function` applied to every extracted item
        Returns:
            dict : result key -> list of processed matches
        '''
        html = parsel.Selector(responsecoding(response))
        return {key: [auxiliary(item) for item in html.css(selector).extract()] for key, selector in analytic.items()}
Static methods
def css(response, analytic: dict, auxiliary=<function original>)
-
css 解析方法.
Args
response
:response
response响应analytic
:dict[str:str]
css解析字典auxiliary
:function
自定义解析处理Returns
dict
- 解析字典
Expand source code
@staticmethod
def css(response, analytic : dict, auxiliary = original):
    ''' Extract values with css selectors.

    Args:
        `response` : `response` response to parse
        `analytic` : `dict[str:str]` result key -> css selector
        `auxiliary` : `function` applied to every extracted item
    Returns:
        dict : result key -> list of processed matches
    '''
    # The former `res = []` was dead: it was rebound to a dict before any use.
    html = parsel.Selector(responsecoding(response))
    return {key: [auxiliary(item) for item in html.css(selector).extract()] for key, selector in analytic.items()}
def json(response, analytic: dict, auxiliary=<function original>)
-
json 解析方法.
Args
response
:response
response响应analytic
:dict[str:str]
json解析字典auxiliary
:function
自定义解析处理Returns
dict
- 解析字典
Expand source code
@staticmethod
def json(response, analytic : dict, auxiliary = original):
    ''' Extract values from a json response.

    Sets `response.encoding` to `response.apparent_encoding`
    ('utf-8' when that is None) before decoding.

    Args:
        `response` : `response` response to parse
        `analytic` : `dict[str:str]` result key -> expression handed to `XDict`
        `auxiliary` : `function` applied to each extracted value
    Returns:
        dict : result key -> processed value
    '''
    # Read the attribute once instead of once for the test and once for the assignment.
    detected = response.apparent_encoding
    response.encoding = 'utf-8' if detected is None else detected
    reDict = {}
    for key, path in analytic.items():
        reDict[key] = auxiliary(XDict(response.json(), path).edict())
    return reDict
def re(response, analytic: dict, auxiliary=<function original>)
-
re 解析方法.
Args
response
:response
response响应analytic
:dict[str:str]
re解析字典auxiliary
:function
自定义解析处理Returns
dict
- 解析字典
Expand source code
@staticmethod
def re(response, analytic : dict, auxiliary = original):
    ''' Extract values with caller-supplied functions.

    Args:
        `response` : `response` response to parse
        `analytic` : `dict`{`str`:`function`} result key -> callable; each callable
            receives the parsed document and returns an iterable of matches
        `auxiliary` : `function` applied to every match
    Returns:
        dict : result key -> list of processed matches
    '''
    # The former `res = []` was dead: it was rebound to a dict before any use.
    html = etree.HTML(responsecoding(response))
    return {key: [auxiliary(item) for item in extract(html)] for key, extract in analytic.items()}
def xpath(response, analytic: dict, auxiliary=<function original>)
-
xpath 解析方法.
Args
response
:response
response响应analytic
:dict[str:str]
xpath解析字典auxiliary
:function
自定义解析处理Returns
dict
- 解析字典
Expand source code
@staticmethod
def xpath(response, analytic : dict, auxiliary = original):
    ''' Extract values with xpath expressions.

    Args:
        `response` : `response` response to parse
        `analytic` : `dict[str:str]` result key -> xpath expression
        `auxiliary` : `function` applied to every match
    Returns:
        dict : result key -> list of processed matches
    '''
    html = etree.HTML(responsecoding(response))
    return {key: [auxiliary(node) for node in html.xpath(path)] for key, path in analytic.items()}
class batch (instantiation, url: list, filenameNo: list = [])
-
批量任务控制器
实例化生成器
Args
instantiation
:lupro
lupro模板实例url
:list
链接表filenameNo
:list
filename 序列且此序列会继承instantiation.filename
Returns
None
Expand source code
class batch():
    '''Batch task controller: builds lupro instances from a template and runs them together.'''

    def __init__(self, instantiation, url : list, filenameNo : list = []) -> None:
        '''Create the instances of the batch.

        Args:
            `instantiation` : `lupro` template instance
            `url` : `list` links
            `filenameNo` : `list` filename sequence, passed on to `generator`
                (never mutated here)
        Returns:
            None
        '''
        self.name = instantiation.filename
        self.generator = generator(instantiation, url, filenameNo)
        self.filenameNo = [i.filename for i in self.generator]
        # Start the persisted record of this batch as an empty dict.
        persistence.shelve.add(self.name, {})

    # Task introspection
    @reconfig(config = persistence.ENABLED)
    def province(self):
        '''Report which requests of the batch have no persisted result.

        Stores `success`, `task` (the instances still missing) and
        `filenameNo` under `self.name`, then prints a summary.
        '''
        fail, success, task = [], [], []
        dbdict = persistence.shelve.put()
        for i, j in enumerate(self.filenameNo):
            if j not in dbdict:
                fail.append(j)
                task.append(self.generator[i])
                success.append('-') # not perfect
            else:
                success.append(dbdict[j])
        persistence.shelve.add(self.name, {'success' : success, 'task' : task, 'filenameNo' : self.filenameNo})
        print(f"批量任务 {self.name} >> 一共有 {len(self.filenameNo)} 次请求", f"失败了 {len(fail)} 次" ,sep = '\n')
        if fail:
            res = '\n' + '\n'.join(fail)
            print(f"分别是: {res}")

    # Cold restart
    @staticmethod
    def coldheavy(filename) -> None:
        '''Re-submit the instances stored under the batch's `task` entry.

        Args:
            `filename` : `str` the `self.name` of the batch to restart
        Returns:
            None
        '''
        Batchsubmission(persistence.shelve.put()[filename]['task'])

    # Task callback
    @staticmethod
    def callback(filename) -> dict:
        '''Load the persisted results of a batch.

        Args:
            `filename` : `str` the `self.name` of the batch
        Returns:
            dict : each stored `filenameNo` entry -> its persisted value (None when absent)
        '''
        container = persistence.shelve.put()
        return {i: container.get(i) for i in container[filename]['filenameNo']}

    # Batch request
    @abnormal
    def Batchsubmission(self) -> list:
        '''Request every instance of the batch.'''
        atexit.register(batch.province, self)
        return Batchsubmission(self.generator)

    # Batch download
    @abnormal
    def BulkDownload(self) -> list:
        '''Download every instance of the batch.'''
        atexit.register(batch.province, self)
        return BulkDownload(self.generator)

    # Batch parsing
    @abnormal
    def Batchanalysis(self, mold : str , analytic : dict, auxiliary = original) -> list:
        '''Parse the batch with the parser named by `mold`.'''
        atexit.register(batch.province, self)
        return Batchanalysis(mold, self.generator, analytic, auxiliary)

    # xpath batch parsing
    @abnormal
    def xpath_Batchanalysis(self, analytic, auxiliary = original) -> list:
        '''Parse the batch with xpath.'''
        atexit.register(batch.province, self)
        return xpath_Batchanalysis(self.generator, analytic, auxiliary)

    # json batch parsing
    @abnormal
    def json_Batchanalysis(self, analytic, auxiliary = original) -> list:
        '''Parse the batch as json.'''
        atexit.register(batch.province, self)
        # Pass the instance list, like the xpath/re siblings; passing `self`
        # made the module-level function index the controller itself.
        return json_Batchanalysis(self.generator, analytic, auxiliary)

    # Regex batch parsing
    @abnormal
    def re_Batchanalysis(self, analytic, auxiliary = original) -> list:
        '''Parse the batch with the "re" parser.'''
        atexit.register(batch.province, self)
        return re_Batchanalysis(self.generator, analytic, auxiliary)
Static methods
def callback(filename) ‑> list
-
任务反持久化
Args:
filename
:str
冷重启对象的self.name
Returns
list task列表
Expand source code
@staticmethod
def callback(filename) -> dict:
    '''Load the persisted results of a batch.

    Args:
        `filename` : `str` the `self.name` of the batch
    Returns:
        dict : each stored `filenameNo` entry -> its persisted value (None when absent)
    '''
    stored = persistence.shelve.put()
    return {entry: stored.get(entry) for entry in stored[filename]['filenameNo']}
def coldheavy(filename) ‑> NoneType
-
冷重启
Args:
filename
:str
冷重启对象的self.name
Returns
None
Expand source code
@staticmethod
def coldheavy(filename) -> None:
    '''Re-submit the instances stored under the batch's `task` entry.

    Args:
        `filename` : `str` the `self.name` of the batch to restart
    Returns:
        None
    '''
    stored = persistence.shelve.put()
    pending = stored[filename]['task']
    Batchsubmission(pending)
Methods
def Batchanalysis(self, mold: str, analytic: dict, auxiliary=<function original>) ‑> list
-
批量解析
Expand source code
@abnormal
def Batchanalysis(self, mold : str , analytic : dict, auxiliary = original) -> list:
    '''Parse the batch with the parser named by `mold`.

    Registers `batch.province` with `atexit`, then delegates to the
    module-level `Batchanalysis`.
    '''
    atexit.register(batch.province, self)
    parsed = Batchanalysis(mold, self.generator, analytic, auxiliary)
    return parsed
def Batchsubmission(self) ‑> list
-
批量请求
Expand source code
@abnormal
def Batchsubmission(self) -> list:
    '''Request every instance of the batch.

    Registers `batch.province` with `atexit`, then delegates to the
    module-level `Batchsubmission`.
    '''
    atexit.register(batch.province, self)
    responses = Batchsubmission(self.generator)
    return responses
def BulkDownload(self) ‑> list
-
批量下载
Expand source code
@abnormal
def BulkDownload(self) -> list:
    '''Download every instance of the batch.

    Registers `batch.province` with `atexit`, then delegates to the
    module-level `BulkDownload`.
    '''
    atexit.register(batch.province, self)
    paths = BulkDownload(self.generator)
    return paths
def json_Batchanalysis(self, analytic, auxiliary=<function original>) ‑> list
-
json 批量解析
Expand source code
@abnormal
def json_Batchanalysis(self, analytic, auxiliary = original) -> list:
    '''Parse the batch as json.

    Registers `batch.province` with `atexit`, then delegates to the
    module-level `json_Batchanalysis`.
    '''
    atexit.register(batch.province, self)
    # Pass the instance list, like the xpath/re siblings; passing `self`
    # made the module-level function index the controller itself.
    return json_Batchanalysis(self.generator, analytic, auxiliary)
def province(self)
-
任务自省
Expand source code
@reconfig(config = persistence.ENABLED)
def province(self):
    '''Report which requests of the batch have no persisted result.

    Stores `success`, `task` (the instances still missing) and
    `filenameNo` under `self.name`, then prints a summary.
    '''
    stored = persistence.shelve.put()
    fail, success, task = [], [], []
    for position, label in enumerate(self.filenameNo):
        if label in stored:
            success.append(stored[label])
            continue
        fail.append(label)
        task.append(self.generator[position])
        success.append('-') # not perfect
    record = {'success' : success, 'task' : task, 'filenameNo' : self.filenameNo}
    persistence.shelve.add(self.name, record)
    print(f"批量任务 {self.name} >> 一共有 {len(self.filenameNo)} 次请求", f"失败了 {len(fail)} 次" ,sep = '\n')
    if len(fail) != 0:
        res = '\n' + '\n'.join(fail)
        print(f"分别是: {res}")
def re_Batchanalysis(self, analytic, auxiliary=<function original>) ‑> list
-
正则 批量解析
Expand source code
@abnormal
def re_Batchanalysis(self, analytic, auxiliary = original) -> list:
    '''Parse the batch with the "re" parser.

    Registers `batch.province` with `atexit`, then delegates to the
    module-level `re_Batchanalysis`.
    '''
    atexit.register(batch.province, self)
    parsed = re_Batchanalysis(self.generator, analytic, auxiliary)
    return parsed
def xpath_Batchanalysis(self, analytic, auxiliary=<function original>) ‑> list
-
xpath 批量解析
Expand source code
@abnormal
def xpath_Batchanalysis(self, analytic, auxiliary = original) -> list:
    '''Parse the batch with xpath.

    Registers `batch.province` with `atexit`, then delegates to the
    module-level `xpath_Batchanalysis`.
    '''
    atexit.register(batch.province, self)
    parsed = xpath_Batchanalysis(self.generator, analytic, auxiliary)
    return parsed
class lupro (filename: str, lupros: lupros, proxie: bool = False, format: str = 'html', content: int = 200, faultolt: int = 10)
-
lupro
引擎基类初始化
lupro
实例,一个实例代表一个请求或任务.Args
filename
:str
文件路径或请求名称 推荐使用路径命名lupros
:lupros
requests参数字典proxie
:bool
是否使用代理format
:str
保存文件格式content
:int
回调最少字节faultolt
:int
可重试次数Returns
None
Expand source code
class lupro(metaclass = inherit):
    '''`lupro` engine base class; one instance represents one request or task.'''

    # HTTP engine used for requests-compatible access
    __general__ = HTTP_ENGINE
    # Base folder for relative file names
    onFile = RUNFILE
    # Proxy pool
    Proxies = PROXIES
    # Whether the proxy pool is verified before first use
    VERIFY_PROXIES = VERIFY_PROXIES
    # Whether the proxy pool has already been verified
    IS_AGENT_VERIFIED = False

    def __init__(self, filename : str, lupros : lupros, proxie : bool = False, format : str = 'html', content : int = 200,faultolt : int = 10):
        ''' Initialise a `lupro` instance.

        Args:
            `filename` : `str` file path or request name; a path-like name is recommended
            `lupros` : `lupros` request parameters, indexed as (first argument, further arguments, keyword dict)
            `proxie` : `bool` whether to use a proxy
            `format` : `str` file extension used by `save_file`
            `content` : `int` minimum response size in bytes for a request to count as successful
            `faultolt` : `int` number of attempts allowed
        Returns:
            None
        Raises:
            AssertionError : a proxy is requested but the pool is empty
        '''
        self.filename = filename
        self.format = format
        self.faultolt = faultolt
        self.proxie = proxie
        self.content = content
        self.lupros = lupros
        self.args = (lupros[0],*lupros[1])
        self.kw = copy(lupros[2])
        if (not lupro.Proxies) and self.proxie:
            lupro.Proxies = get_proxies()
        # Raised explicitly (same exception type as the former assert) so the
        # check also runs under `python -O`.
        if self.proxie and not lupro.Proxies:
            raise AssertionError('`Proxies` cannot be empty!')
        if 'headers' not in self.kw:
            self.kw['headers'] = {'User-Agent' : get_header()}
        elif 'User-Agent' not in self.kw['headers']:
            # `copy` above is shallow: build a new mapping instead of calling
            # update(), which would write into the caller's headers dict.
            self.kw['headers'] = {**self.kw['headers'], 'User-Agent' : get_header()}
        if lupro.VERIFY_PROXIES and self.proxie and (not lupro.IS_AGENT_VERIFIED):
            print(logging('开始验证代理!'))
            t1 = datetime.now()
            self.authentication()
            lupro.IS_AGENT_VERIFIED = True
            print(logging(f'用时{datetime.now() - t1}'))
            print(logging(f"高质量代理:{len(lupro.Proxies)}个!"))
        if ('proxies' not in self.kw) and self.proxie:
            self.proxie = random.choice(lupro.Proxies)
            # NOTE(review): renew_proxie uses a different mapping for httpx;
            # this always uses the requests form — confirm which is intended.
            self.kw['proxies'] = {'http': f"//{self.proxie}"}

    # Proxy verification
    def authentication(self) -> None:
        '''Check every proxy of the pool concurrently and keep the truthy results.'''
        auth_proxies = [gevent.spawn(verify_proxies, i) for i in lupro.Proxies]
        gevent.joinall(auth_proxies)
        lupro.Proxies = [i.value for i in auth_proxies if i.value]

    # Proxy replacement
    def renew_proxie(self):
        '''Drop the current proxy from the pool and pick another one.'''
        lupro.Proxies.remove(self.proxie)
        # Refill an exhausted pool the same way __init__ fills an empty one,
        # instead of failing in random.choice on an empty list.
        if not lupro.Proxies:
            lupro.Proxies = get_proxies()
        self.proxie = random.choice(lupro.Proxies)
        if lupro.__general__.__name__ == 'requests':
            self.kw['proxies'] = {'http': f"//{self.proxie}"}
        elif lupro.__general__.__name__ == 'httpx':
            self.kw['proxies'] = {'http://': f"http://{self.proxie}"}

    # Request entry point used by every other method
    @endurance
    def task(self):
        '''Send the request, retrying until it succeeds or `faultolt` is used up.

        An attempt succeeds when the status code is 200 and the body has at
        least `self.content` bytes. Every failed attempt, whether an exception
        or a rejected response, consumes one unit of `self.faultolt`.

        Returns:
            the response, or None when no attempt succeeded
        '''
        while self.faultolt > 0:
            print(logging(f"{self.filename} {self.proxie} -----> 开始请求!"))
            try:
                res = getattr(engine, lupro.__general__.__name__)(self)
            except Exception:
                if self.proxie and self.proxie in lupro.Proxies:
                    self.renew_proxie()
                self.faultolt -= 1
                print(logging(f"{self.filename} -----> 更新字典中!{self.faultolt}"))
                continue
            if res.status_code != 200 or len(res.content) < self.content:
                # Rejected responses also spend the budget; otherwise a
                # permanently failing url would be retried without bound.
                self.faultolt -= 1
                continue
            print(logging(f"{self.filename} {len(res.content)} -----> 请求结束!"))
            return res
        print(f'{self.filename} failed.')
        return None

    # xpath parsing
    def xpath_analysis(self, analytic : dict, auxiliary = original) -> dict:
        '''Request, then parse the response with xpath.

        Args:
            `analytic` : `dict[str:str]` xpath parsing dictionary
            `auxiliary` : `function` custom post-processing
        Returns:
            dict : parsed values; {} when the request produced no usable response
        '''
        res = self.task()
        return analyze.xpath(res, analytic, auxiliary) if res else {}

    # json parsing
    def json_analysis(self, analytic : dict, auxiliary = original) -> dict:
        '''Request, then parse the response as json.

        Args:
            `analytic` : `dict[str:str]` json parsing dictionary
            `auxiliary` : `function` custom post-processing
        Returns:
            dict : parsed values; {} when the request produced no usable response
        '''
        res = self.task()
        return analyze.json(res, analytic, auxiliary) if res else {}

    # Regex parsing
    def re_analysis(self, analytic : dict, auxiliary = original) -> dict:
        '''Request, then parse the response with the "re" parser.

        Args:
            `analytic` : `dict`{`str`:`function`} parsing dictionary
            `auxiliary` : `function` custom post-processing
        Returns:
            dict : parsed values; {} when the request produced no usable response
        '''
        res = self.task()
        return analyze.re(res, analytic, auxiliary) if res else {}

    # css parsing
    def css_analysis(self, analytic : dict, auxiliary = original) -> dict:
        '''Request, then parse the response with css selectors.

        Args:
            `analytic` : `dict[str:str]` css parsing dictionary
            `auxiliary` : `function` custom post-processing
        Returns:
            dict : parsed values; {} when the request produced no usable response
        '''
        res = self.task()
        return analyze.css(res, analytic, auxiliary) if res else {}

    # Save to file
    def save_file(self) -> str:
        '''Request and write the response body to `<filename>.<format>`.

        A relative `filename` is resolved against `self.onFile`.

        Returns:
            str : path of the written file; '' when the request produced no usable response
        '''
        res = self.task()
        if not res:
            return ''
        folder, basename = os.path.split(self.filename)
        if not os.path.isabs(self.filename):
            folder = os.path.join(self.onFile, folder)
        # exist_ok avoids the check-then-create race between concurrent downloads.
        os.makedirs(folder, exist_ok=True)
        path = os.path.join(folder, basename + f'.{self.format}')
        detected = res.apparent_encoding
        res.encoding = 'utf-8' if detected is None else detected
        with open(path,mode='wb') as f:
            f.write(res.content)
        return path

    def __repr__(self) -> str:
        return f"<{__name__}.lupro({lupro.__general__.__name__}) object {self.filename}>"

    def __reprs__(self) -> dict:
        '''Return the constructor arguments of this instance as a dict.'''
        return {'filename':self.filename ,'format':self.format , 'proxie':self.proxie, 'faultolt':self.faultolt, 'content' : self.content ,'lupros':self.lupros}
Class variables
var IS_AGENT_VERIFIED
var Proxies
var VERIFY_PROXIES
var onFile
Methods
def authentication(self) ‑> NoneType
-
authentication
为快速代理验证.Expand source code
def authentication(self) -> None:
    '''Check every proxy of the pool concurrently and keep the truthy results.'''
    checks = []
    for proxy in lupro.Proxies:
        checks.append(gevent.spawn(verify_proxies, proxy))
    gevent.joinall(checks)
    lupro.Proxies = [check.value for check in checks if check.value]
def css_analysis(self, analytic: dict, auxiliary=<function original>) ‑> dict
-
实例 css 解析方法.
Args
analytic
:dict[str:str]
css解析字典auxiliary
:function
自定义解析处理Returns
dict
- 解析字典
Expand source code
def css_analysis(self, analytic : dict, auxiliary = original) -> dict:
    '''Request, then parse the response with css selectors.

    Args:
        `analytic` : `dict[str:str]` css parsing dictionary
        `auxiliary` : `function` custom post-processing
    Returns:
        dict : parsed values; {} when the request produced no usable response
    '''
    response = self.task()
    return analyze.css(response, analytic, auxiliary) if response else {}
def json_analysis(self, analytic: dict, auxiliary=<function original>) ‑> dict
-
实例 json 解析方法.
Args
analytic
:dict[str:str]
xpath解析字典auxiliary
:function
自定义解析处理Returns
dict
- 解析字典
Expand source code
def json_analysis(self, analytic : dict, auxiliary = original) -> dict:
    '''Request, then parse the response as json.

    Args:
        `analytic` : `dict[str:str]` json parsing dictionary
        `auxiliary` : `function` custom post-processing
    Returns:
        dict : parsed values; {} when the request produced no usable response
    '''
    response = self.task()
    return analyze.json(response, analytic, auxiliary) if response else {}
def re_analysis(self, analytic: dict, auxiliary=<function original>) ‑> dict
-
实例 正则 解析方法.
Args
analytic
:dict
{str
:function
} 正则解析字典auxiliary
:function
自定义解析处理Returns
dict
- 解析字典
Expand source code
def re_analysis(self, analytic : dict, auxiliary = original) -> dict:
    '''Request, then parse the response with the "re" parser.

    Args:
        `analytic` : `dict`{`str`:`function`} parsing dictionary
        `auxiliary` : `function` custom post-processing
    Returns:
        dict : parsed values; {} when the request produced no usable response
    '''
    response = self.task()
    return analyze.re(response, analytic, auxiliary) if response else {}
def renew_proxie(self)
-
lupro实例 更新字典
Expand source code
def renew_proxie(self):
    '''Drop the current proxy from the pool and pick another one.

    The `proxies` keyword argument is rebuilt in the form matching the
    active engine ('requests' or 'httpx').
    '''
    lupro.Proxies.remove(self.proxie)
    # Refill an exhausted pool the same way __init__ fills an empty one,
    # instead of failing in random.choice on an empty list.
    if not lupro.Proxies:
        lupro.Proxies = get_proxies()
    self.proxie = random.choice(lupro.Proxies)
    engine_name = lupro.__general__.__name__
    if engine_name == 'requests':
        self.kw['proxies'] = {'http': f"//{self.proxie}"}
    elif engine_name == 'httpx':
        self.kw['proxies'] = {'http://': f"http://{self.proxie}"}
def save_file(self) ‑> str
-
保存文件方法,如果
filename
不为绝对路径,则保存文件的路径为当前目录Expand source code
def save_file(self) -> str:
    '''Request and write the response body to `<filename>.<format>`.

    A relative `filename` is resolved against `self.onFile`.

    Returns:
        str : path of the written file; '' when the request produced no usable response
    '''
    res = self.task()
    if not res:
        return ''
    folder, basename = os.path.split(self.filename)
    if not os.path.isabs(self.filename):
        folder = os.path.join(self.onFile, folder)
    # exist_ok avoids the check-then-create race between concurrent downloads.
    os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, basename + f'.{self.format}')
    # Read the attribute once instead of once for the test and once for the assignment.
    detected = res.apparent_encoding
    res.encoding = 'utf-8' if detected is None else detected
    with open(path,mode='wb') as f:
        f.write(res.content)
    return path
def task(self)
-
请求方法 lupro 所有请求接口
Expand source code
@endurance
def task(self):
    '''Send the request, retrying until it succeeds or `faultolt` is used up.

    An attempt succeeds when the status code is 200 and the body has at
    least `self.content` bytes. Every failed attempt, whether an exception
    or a rejected response, consumes one unit of `self.faultolt`.

    Returns:
        the response, or None when no attempt succeeded
    '''
    # A loop replaces the former self-recursion, so retries no longer grow the call stack.
    while self.faultolt > 0:
        print(logging(f"{self.filename} {self.proxie} -----> 开始请求!"))
        try:
            res = getattr(engine, lupro.__general__.__name__)(self)
        except Exception:
            # Not a bare except: interpreter-exit signals must propagate.
            if self.proxie and self.proxie in lupro.Proxies:
                self.renew_proxie()
            self.faultolt -= 1
            print(logging(f"{self.filename} -----> 更新字典中!{self.faultolt}"))
            continue
        if res.status_code != 200 or len(res.content) < self.content:
            # Rejected responses also spend the budget; otherwise a
            # permanently failing url would be retried without bound.
            self.faultolt -= 1
            continue
        print(logging(f"{self.filename} {len(res.content)} -----> 请求结束!"))
        return res
    print(f'{self.filename} failed.')
    return None
def xpath_analysis(self, analytic: dict, auxiliary=<function original>) ‑> dict
-
实例 xpath 解析方法.
Args
analytic
:dict[str:str]
xpath解析字典auxiliary
:function
自定义解析处理Returns
dict
- 解析字典
Expand source code
def xpath_analysis(self, analytic : dict, auxiliary = original) -> dict:
    '''Request, then parse the response with xpath.

    Args:
        `analytic` : `dict[str:str]` xpath parsing dictionary
        `auxiliary` : `function` custom post-processing
    Returns:
        dict : parsed values; {} when the request produced no usable response
    '''
    response = self.task()
    return analyze.xpath(response, analytic, auxiliary) if response else {}
class lupros
-
HTTP参数引擎
辅助字典生成器Expand source code
class lupros(metaclass = inherit):
    '''`HTTP parameter engine`: helper for building request-argument records.'''
    # Supported HTTP client modules, keyed by module name.
    kernel = dict(requests = requests, httpx = httpx)
    # The module selected by the configured HTTP_ENGINE.
    __general__ = kernel[HTTP_ENGINE.__name__]
Class variables
var kernel
class persistence
-
对象持久化控制器
Expand source code
class persistence():
    '''Object persistence controller.'''

    # Whether object persistence is enabled.
    ENABLED = PERSISTENCE_ENABLED

    # Persistence metaclass
    class kernelk(type):
        '''Persistence metaclass (empty placeholder).'''

    # shelve backend
    class shelve():
        '''shelve backend.'''

        # Path of the shelve database file.
        dbfile = PERSISTENCE_PATH

        @classmethod
        def add(cls, key : str, value : Any) -> bool:
            '''Store one object in the shelve file.

            Args:
                `cls` : `persistence.shelve` class object
                `key` : `str` key to store under
                `value` : `Any` object to store
            Returns:
                `bool` : True
            '''
            with shelve.open(persistence.shelve.dbfile) as store:
                store[key] = value
            return True

        @classmethod
        def put(cls):
            '''Read the whole shelve file.

            Args:
                `cls` : `persistence.shelve` class object
            Returns:
                `dict` : copy of every key/value pair currently stored
            '''
            with shelve.open(persistence.shelve.dbfile) as store:
                snapshot = dict(store.items())
            return snapshot

    # ZODB backend
    class ZODB():
        '''ZODB backend (empty placeholder).'''
Class variables
var ENABLED
var ZODB
-
ZODB内核
var kernelk
-
对象持久化元类
var shelve
-
shelve内核