Package lupro

Lupro Library ~~~~~~~~~~~~~~~~~~~~~

Lupro is an Asynchronous HTTP library, written in Python, It is fully compatible with requests. usage:

  • 兼容requests

from lupro import lupro r = lupro.get('https://www.python.org') r.status_code 200

其它 lupro.api 请参考 https://github.com/luxuncang/lupro.

:copyright: (c) 2021 by ShengXin Lu.

Expand source code
'''
Lupro Library
~~~~~~~~~~~~~~~~~~~~~

Lupro is an Asynchronous HTTP library, written in Python, It is fully compatible with `requests`.
usage:

* `兼容requests`

   >>> from lupro import lupro
   >>> r = lupro.get('https://www.python.org')
   >>> r.status_code
   200

其它 `lupro.api` 请参考 <https://github.com/luxuncang/lupro>.

:copyright: (c) 2021 by ShengXin Lu.
'''

from gevent import monkey
monkey.patch_all()  # 先引用
from .hooks import lupros
from .controller import persistence, batch
from .HTTPengine import lupro, analyze
from .api import generator, Batchsubmission, BulkDownload, xpath_Batchanalysis, json_Batchanalysis, re_Batchanalysis, Batchanalysis, async_lupro
from .__version__ import __title__, __description__, __url__, __version__
from .__version__ import __author__, __author_email__, __license__

__all__ = [
   'lupro',    # lupro 控制台 兼容HTTP客服端
   'lupros',   # lupros 请求参数
   'batch',    # 批量任务控制器
   'analyze',  # 解析器
   'persistence', # 对象持久化控制器
   'generator', 
   'Batchsubmission', 
   'BulkDownload', 
   'xpath_Batchanalysis', 
   'json_Batchanalysis', 
   're_Batchanalysis', 
   'Batchanalysis', 
   'async_lupro' # 极简批量请求
]

'''
v1.0.4
* 完全兼容 `requests` 一切操作,只需 `from lupro import lupro as requests` 即可不更改一行代码
* 新增原生异步 `requests`请求 只需 `async_lupro([lupros.get('https://www.python.org')]*10)`
* 标准化函数描述

v1.0.5
* 修复了 `lupro.lupro` 解析方法判断的失误
* 修复了 移除无效代理的方式

v1.0.6 v1.0.7
* 修复了不启用代理且请求错误时的,更新字典的方式
* 文件结构优化
* 新增对象持久化(shelve)
* 新增批量任务自省,冷重启(atexit)

v1.0.8
* 改变了 response.encoding 为空时的编码推测(chardet)
* 优化了html解析器的接口,`response` 类型可直接通过 `class analyze` 和 `class batch` 进行解析
* 新增 css解析器(parsel)
* 新增 httpx 客服端(httpx)
'''

Sub-modules

lupro.HTTPengine

lupro HTTP

lupro.analyzesController

response解析器

lupro.api

lupro api

lupro.config

lurpo 全局配置

lupro.controller

lupro 内部控制器

lupro.hooks

HTTP 请求钩子

lupro.publictool

lupro 工具箱

lupro.typing

自定义类型注释

lupro.useragent

UA池

Functions

def Batchanalysis(mold: str, generator: Union[ForwardRef('list[lupro]'), ForwardRef('list[Response]')], analytic: dict, auxiliary=<function original>) ‑> list

lupro批量解析

Args

mold : str 解析方法 generator() : Union[list[lupro], list[Response]] lupro实例列表 或 Response实例列表 analytic : dict 解析字典 auxiliary : function 自定义解析处理

Returns

list[dict]
解析列表
Expand source code
def Batchanalysis(mold : str ,generator : Union["list[lupro]", "list[Response]"], analytic : dict, auxiliary = original) -> list:
    '''lupro批量解析

    Args:
        `mold` : `str` 解析方法
        `generator` : `Union[list[lupro], list[Response]]` lupro实例列表 或 Response实例列表
        `analytic` : `dict` 解析字典
        `auxiliary` : `function` 自定义解析处理

    Returns:
        list[dict] : 解析列表
    '''
    if mold == 'xpath':
        return xpath_Batchanalysis(generator, analytic, auxiliary)
    elif mold == 'json':
        return json_Batchanalysis(generator, analytic, auxiliary)
    elif mold == 're':
        return re_Batchanalysis(generator, analytic, auxiliary)
    else:
        raise TypeError('No corresponding parsing method!')
def Batchsubmission(generator) ‑> list

通过实例列表的批量请求

Args: generator() : list[lupro] lupro实例列表

Returns

list
Response列表
Expand source code
def Batchsubmission(generator) -> list:
    '''通过实例列表的批量请求
    
    Args:
        `generator` : `list[lupro]` lupro实例列表
    
    Returns:
        list : Response列表
    '''
    a = [gevent.spawn(i.task,) for i in generator]
    gevent.joinall(a)
    return ([i.value for i in a])
def BulkDownload(generator) ‑> list

通过实例列表的批量下载

Args: generator() : list[lupro] lupro实例列表

Returns

list
path 列表
Expand source code
def BulkDownload(generator) -> list:
    '''通过实例列表的批量下载
    
    Args:
        `generator` : `list[lupro]` lupro实例列表
    
    Returns:
        list : path 列表
    '''
    a = [gevent.spawn(i.save_file,) for i in generator]
    gevent.joinall(a)
    return ([i.value for i in a])
def async_lupro(generator: list[lupros]) ‑> list

原生异步requests请求

Args

generator() : list[lupros] lupros实例列表

Returns

list[Response]
Response列表
Expand source code
def async_lupro(generator : "list[lupros]") -> list:
    '''原生异步`requests`请求

    Args:
        `generator` : `list[lupros]` lupros实例列表
    
    Returns:
        list[Response] : Response列表
    '''

    a = [gevent.spawn(HTTP_ENGINE.request,i[0],*i[1],**i[2]) for i in generator]
    gevent.joinall(a)
    return ([i.value for i in a])
def generator(instantiation: lupro, url: list, filenameNo: list = []) ‑> list

实例化生成器

Args

instantiation : lupro lupro模板实例 url : list 链接表 filenameNo : list filename 序列且此序列会继承 instantiation.filename

Returns

list
lupro实例列表
Expand source code
def generator(instantiation : lupro, url : list, filenameNo : list = []) -> list:
    '''实例化生成器

    Args:
        `instantiation` : `lupro` lupro模板实例
        `url` : `list` 链接表
        `filenameNo` : `list` filename 序列且此序列会继承 `instantiation.filename` 

    Returns:
        list : lupro实例列表
    '''
    if not filenameNo:
        filenameNo = range(len(url))
    else:
        if not len(url)==len(filenameNo):
            raise ValueError('"url" needs to be consistent with "FileNameno"!')
    repr = instantiation.__reprs__()
    res = []
    for i,j in enumerate(url):
        res.append(lupro(**requests_dict(repr,j,filenameNo[i])))
    return res
def json_Batchanalysis(generator: Union[ForwardRef('list[lupro]'), ForwardRef('list[Response]')], analytic: dict, auxiliary=<function original>) ‑> list

json批量解析器 dtanys\>

Args

generator() : Union[list[lupro], list[Response]] lupro实例列表 或 Response实例列表 analytic : dict 解析字典 auxiliary : function 自定义解析处理

Returns

list[dict]
解析列表
Expand source code
def json_Batchanalysis(generator : Union["list[lupro]", "list[Response]"], analytic : dict, auxiliary = original) -> list:
    '''json批量解析器 <json解析器为 `dtanys`>

    Args:
        `generator` : `Union[list[lupro], list[Response]]` lupro实例列表 或 Response实例列表
        `analytic` : `dict` 解析字典
        `auxiliary` : `function` 自定义解析处理
    
    Returns:
        list[dict] : 解析列表
    '''
    if isinstance(generator[0], lupro):
        a = [gevent.spawn(i.json_analysis, analytic, auxiliary) for i in generator]
        gevent.joinall(a)
        return ([i.value for i in a])
    else:
        return [analyze.json(i, analytic, auxiliary) for i in generator]
def re_Batchanalysis(generator: Union[ForwardRef('list[lupro]'), ForwardRef('list[Response]')], analytic: dict, auxiliary=<function original>) ‑> list

正则解析器

Args

generator() : Union[list[lupro], list[Response]] lupro实例列表 或 Response实例列表 analytic : dict{str:function} 正则解析字典 auxiliary : function 自定义解析处理

Returns

list[dict]
解析列表
Expand source code
def re_Batchanalysis(generator : Union["list[lupro]", "list[Response]"], analytic : dict, auxiliary = original) -> list:
    '''正则解析器

    Args:
        `generator` : `Union[list[lupro], list[Response]]` lupro实例列表 或 Response实例列表
        `analytic` : `dict`{`str`:`function`} 正则解析字典 
        `auxiliary` : `function` 自定义解析处理
    
    Returns:
        list[dict] : 解析列表
    '''
    if isinstance(generator[0], lupro):
        a = [gevent.spawn(i.re_analysis, analytic, auxiliary) for i in generator]
        gevent.joinall(a)
        return ([i.value for i in a])
    else:
        return [analyze.re(i, analytic, auxiliary) for i in generator]
def xpath_Batchanalysis(generator: Union[ForwardRef('list[lupro]'), ForwardRef('list[Response]')], analytic: dict, auxiliary=<function original>) ‑> list

xpath批量解析器

Args

generator() : Union[list[lupro], list[Response]] lupro实例列表 或 Response实例列表 analytic : dict 解析字典 auxiliary : function 自定义解析处理

Returns

list[dict]
解析列表
Expand source code
def xpath_Batchanalysis(generator : Union["list[lupro]", "list[Response]"], analytic : dict, auxiliary = original) -> list:
    ''' xpath批量解析器

    Args:
        `generator` : `Union[list[lupro], list[Response]]` lupro实例列表 或 Response实例列表
        `analytic` : `dict` 解析字典
        `auxiliary` : `function` 自定义解析处理
        
    Returns:
        list[dict] : 解析列表
    '''
    if isinstance(generator[0], lupro):
        a = [gevent.spawn(i.xpath_analysis, analytic, auxiliary) for i in generator]
        gevent.joinall(a)
        return ([i.value for i in a])
    else:
        return [analyze.xpath(i, analytic, auxiliary) for i in generator]

Classes

class analyze

response 解析控制器

Expand source code
class analyze():
    '''`response` 解析控制器'''

    @staticmethod
    def xpath(response, analytic : dict, auxiliary = original):
        ''' xpath 解析方法.

        Args:
            `response` : `response` response响应
            `analytic` : `dict[str:str]` xpath解析字典
            `auxiliary` : `function` 自定义解析处理
        
        Returns:
            dict : 解析字典
        '''
        html = etree.HTML(responsecoding(response))
        res = {}
        for i, j in analytic.items():
            res[i] = [auxiliary(r) for r in html.xpath(j)]
        return res

    @staticmethod
    def json(response, analytic : dict, auxiliary = original):
        ''' json 解析方法.

        Args:
            `response` : `response` response响应
            `analytic` : `dict[str:str]` json解析字典
            `auxiliary` : `function` 自定义解析处理
        
        Returns:
            dict : 解析字典
        '''
        if response.apparent_encoding == None:
            response.encoding = 'utf-8'
        else:
            response.encoding = response.apparent_encoding
        reDict = {}
        for i,j in analytic.items():
            reDict[i] = auxiliary(XDict(response.json(),j).edict())
        return reDict

    @staticmethod
    def re(response, analytic : dict, auxiliary = original):
        ''' re 解析方法.

        Args:
            `response` : `response` response响应
            `analytic` : `dict[str:str]` re解析字典
            `auxiliary` : `function` 自定义解析处理
        
        Returns:
            dict : 解析字典
        '''
        res = []
        html = etree.HTML(responsecoding(response))
        res = {}
        for i, j in analytic.items():
            res[i]=[auxiliary(r) for r in j(html)]
        return res

    @staticmethod
    def css(response, analytic : dict, auxiliary = original):
        ''' css 解析方法.

        Args:
            `response` : `response` response响应
            `analytic` : `dict[str:str]` css解析字典
            `auxiliary` : `function` 自定义解析处理
        
        Returns:
            dict : 解析字典
        '''
        res = []
        html = parsel.Selector(responsecoding(response))
        res = {}
        for i, j in analytic.items():
            res[i] = [auxiliary(r) for r in html.css(j).extract()]
        return res        

Static methods

def css(response, analytic: dict, auxiliary=<function original>)

css 解析方法.

Args

response : response response响应 analytic : dict[str:str] css解析字典 auxiliary : function 自定义解析处理

Returns

dict
解析字典
Expand source code
@staticmethod
def css(response, analytic : dict, auxiliary = original):
    ''' css 解析方法.

    Args:
        `response` : `response` response响应
        `analytic` : `dict[str:str]` css解析字典
        `auxiliary` : `function` 自定义解析处理
    
    Returns:
        dict : 解析字典
    '''
    res = []
    html = parsel.Selector(responsecoding(response))
    res = {}
    for i, j in analytic.items():
        res[i] = [auxiliary(r) for r in html.css(j).extract()]
    return res        
def json(response, analytic: dict, auxiliary=<function original>)

json 解析方法.

Args

response : response response响应 analytic : dict[str:str] json解析字典 auxiliary : function 自定义解析处理

Returns

dict
解析字典
Expand source code
@staticmethod
def json(response, analytic : dict, auxiliary = original):
    ''' json 解析方法.

    Args:
        `response` : `response` response响应
        `analytic` : `dict[str:str]` json解析字典
        `auxiliary` : `function` 自定义解析处理
    
    Returns:
        dict : 解析字典
    '''
    if response.apparent_encoding == None:
        response.encoding = 'utf-8'
    else:
        response.encoding = response.apparent_encoding
    reDict = {}
    for i,j in analytic.items():
        reDict[i] = auxiliary(XDict(response.json(),j).edict())
    return reDict
def re(response, analytic: dict, auxiliary=<function original>)

re 解析方法.

Args

response : response response响应 analytic : dict[str:str] re解析字典 auxiliary : function 自定义解析处理

Returns

dict
解析字典
Expand source code
@staticmethod
def re(response, analytic : dict, auxiliary = original):
    ''' re 解析方法.

    Args:
        `response` : `response` response响应
        `analytic` : `dict[str:str]` re解析字典
        `auxiliary` : `function` 自定义解析处理
    
    Returns:
        dict : 解析字典
    '''
    res = []
    html = etree.HTML(responsecoding(response))
    res = {}
    for i, j in analytic.items():
        res[i]=[auxiliary(r) for r in j(html)]
    return res
def xpath(response, analytic: dict, auxiliary=<function original>)

xpath 解析方法.

Args

response : response response响应 analytic : dict[str:str] xpath解析字典 auxiliary : function 自定义解析处理

Returns

dict
解析字典
Expand source code
@staticmethod
def xpath(response, analytic : dict, auxiliary = original):
    ''' xpath 解析方法.

    Args:
        `response` : `response` response响应
        `analytic` : `dict[str:str]` xpath解析字典
        `auxiliary` : `function` 自定义解析处理
    
    Returns:
        dict : 解析字典
    '''
    html = etree.HTML(responsecoding(response))
    res = {}
    for i, j in analytic.items():
        res[i] = [auxiliary(r) for r in html.xpath(j)]
    return res
class batch (instantiation, url: list, filenameNo: list = [])

批量任务控制器

实例化生成器

Args

instantiation : lupro lupro模板实例 url : list 链接表 filenameNo : list filename 序列且此序列会继承 instantiation.filename

Returns

None

Expand source code
class batch():
    '''批量任务控制器'''

    def __init__(self, instantiation, url : list, filenameNo : list = []) -> list:
        '''实例化生成器

        Args:
            `instantiation` : `lupro` lupro模板实例
            `url` : `list` 链接表
            `filenameNo` : `list` filename 序列且此序列会继承 `instantiation.filename` 

        Returns:
            None
        '''
        self.name = instantiation.filename
        self.generator = generator(instantiation, url, filenameNo)
        self.filenameNo = [i.filename for i in self.generator]
        persistence.shelve.add(self.name, {})
    
    # 任务自省
    @reconfig(config = persistence.ENABLED)
    def province(self):
        '''任务自省'''
        fail, success, task = [], [], []
        dbdict = persistence.shelve.put()
        for i,j in enumerate(self.filenameNo):
            if not j in dbdict:
                fail.append(j)
                task.append(self.generator[i])
                success.append('-') # not perfect
            else:
                success.append(dbdict[j])
        persistence.shelve.add(self.name, {'success' : success, 'task' : task, 'filenameNo' : self.filenameNo})

        print(f"批量任务 {self.name} >> 一共有 {len(self.filenameNo)} 次请求", f"失败了 {len(fail)} 次" ,sep = '\n')
        if not len(fail)==0:
            res = '\n' + '\n'.join(fail)
            print(f"分别是: {res}")

    # 冷重启
    @staticmethod
    def coldheavy(filename) -> None:
        '''冷重启

        Args:
            `filename` : `str` 冷重启对象的 `self.name`
            
        Returns:
            None
        '''
        Batchsubmission(persistence.shelve.put()[filename]['task'])

    # 任务回调
    @staticmethod
    def callback(filename) -> list:
        '''任务反持久化

        Args:
            `filename` : `str` 冷重启对象的 `self.name`
            
        Returns:
            list task列表
        '''
        container = persistence.shelve.put()
        return {i:container.get(i) for i in container[filename]['filenameNo']}
        
    # 批量请求
    @abnormal
    def Batchsubmission(self) -> list:
        '''批量请求'''
        atexit.register(batch.province, self)
        return Batchsubmission(self.generator)
    
    # 批量下载
    @abnormal
    def BulkDownload(self) -> list:
        '''批量下载'''
        atexit.register(batch.province, self)
        return BulkDownload(self.generator)
    
    # 批量解析
    @abnormal
    def Batchanalysis(self, mold : str , analytic : dict, auxiliary = original) -> list:
        '''批量解析'''
        atexit.register(batch.province, self)
        return Batchanalysis(mold, self.generator, analytic, auxiliary)
    
    # xpath 批量解析
    @abnormal
    def xpath_Batchanalysis(self, analytic, auxiliary = original) -> list:
        '''xpath 批量解析'''
        atexit.register(batch.province, self)
        return xpath_Batchanalysis(self.generator, analytic, auxiliary)
    
    # json 批量解析
    @abnormal
    def json_Batchanalysis(self, analytic, auxiliary = original) -> list:
        '''json 批量解析'''
        atexit.register(batch.province, self)
        return json_Batchanalysis(self, analytic, auxiliary)    
    
    # 正则 批量解析
    @abnormal
    def re_Batchanalysis(self, analytic, auxiliary = original) -> list:
        '''正则 批量解析'''
        atexit.register(batch.province, self)
        return re_Batchanalysis(self.generator, analytic, auxiliary)

Static methods

def callback(filename) ‑> list

任务反持久化

Args: filename : str 冷重启对象的 self.name

Returns

list task列表

Expand source code
@staticmethod
def callback(filename) -> list:
    '''任务反持久化

    Args:
        `filename` : `str` 冷重启对象的 `self.name`
        
    Returns:
        list task列表
    '''
    container = persistence.shelve.put()
    return {i:container.get(i) for i in container[filename]['filenameNo']}
def coldheavy(filename) ‑> NoneType

冷重启

Args: filename : str 冷重启对象的 self.name

Returns

None

Expand source code
@staticmethod
def coldheavy(filename) -> None:
    '''冷重启

    Args:
        `filename` : `str` 冷重启对象的 `self.name`
        
    Returns:
        None
    '''
    Batchsubmission(persistence.shelve.put()[filename]['task'])

Methods

def Batchanalysis(self, mold: str, analytic: dict, auxiliary=<function original>) ‑> list

批量解析

Expand source code
@abnormal
def Batchanalysis(self, mold : str , analytic : dict, auxiliary = original) -> list:
    '''批量解析'''
    atexit.register(batch.province, self)
    return Batchanalysis(mold, self.generator, analytic, auxiliary)
def Batchsubmission(self) ‑> list

批量请求

Expand source code
@abnormal
def Batchsubmission(self) -> list:
    '''批量请求'''
    atexit.register(batch.province, self)
    return Batchsubmission(self.generator)
def BulkDownload(self) ‑> list

批量下载

Expand source code
@abnormal
def BulkDownload(self) -> list:
    '''批量下载'''
    atexit.register(batch.province, self)
    return BulkDownload(self.generator)
def json_Batchanalysis(self, analytic, auxiliary=<function original>) ‑> list

json 批量解析

Expand source code
@abnormal
def json_Batchanalysis(self, analytic, auxiliary = original) -> list:
    '''json 批量解析'''
    atexit.register(batch.province, self)
    return json_Batchanalysis(self, analytic, auxiliary)    
def province(self)

任务自省

Expand source code
@reconfig(config = persistence.ENABLED)
def province(self):
    '''任务自省'''
    fail, success, task = [], [], []
    dbdict = persistence.shelve.put()
    for i,j in enumerate(self.filenameNo):
        if not j in dbdict:
            fail.append(j)
            task.append(self.generator[i])
            success.append('-') # not perfect
        else:
            success.append(dbdict[j])
    persistence.shelve.add(self.name, {'success' : success, 'task' : task, 'filenameNo' : self.filenameNo})

    print(f"批量任务 {self.name} >> 一共有 {len(self.filenameNo)} 次请求", f"失败了 {len(fail)} 次" ,sep = '\n')
    if not len(fail)==0:
        res = '\n' + '\n'.join(fail)
        print(f"分别是: {res}")
def re_Batchanalysis(self, analytic, auxiliary=<function original>) ‑> list

正则 批量解析

Expand source code
@abnormal
def re_Batchanalysis(self, analytic, auxiliary = original) -> list:
    '''正则 批量解析'''
    atexit.register(batch.province, self)
    return re_Batchanalysis(self.generator, analytic, auxiliary)
def xpath_Batchanalysis(self, analytic, auxiliary=<function original>) ‑> list

xpath 批量解析

Expand source code
@abnormal
def xpath_Batchanalysis(self, analytic, auxiliary = original) -> list:
    '''xpath 批量解析'''
    atexit.register(batch.province, self)
    return xpath_Batchanalysis(self.generator, analytic, auxiliary)
class lupro (filename: str, lupros: lupros, proxie: bool = False, format: str = 'html', content: int = 200, faultolt: int = 10)

lupro 引擎基类

初始化 lupro 实例,一个实例代表一个请求或任务.

Args

filename : str 文件路径或请求名称 推荐使用路径命名 lupros : lupros requests参数字典 proxie : bool 是否使用代理 format : str 保存文件格式 content : int 回调最少字节 faultolt : int 可重试次数

Returns

None

Expand source code
class lupro(metaclass = inherit):
    '''`lupro` 引擎基类'''

    # 兼容 `requests`
    __general__ = HTTP_ENGINE

    # 当前文件夹
    onFile = RUNFILE

    # 代理池
    Proxies = PROXIES

    # 是否验证代理池
    VERIFY_PROXIES = VERIFY_PROXIES

    # 是否已验证代理
    IS_AGENT_VERIFIED = False

    def __init__(self, filename : str, lupros : lupros, proxie : bool = False, format : str = 'html', content : int = 200,faultolt : int = 10):
        ''' 初始化 `lupro` 实例,一个实例代表一个请求或任务.

        Args:
            `filename` : `str` 文件路径或请求名称 推荐使用路径命名
            `lupros` : `lupros` requests参数字典
            `proxie` : `bool` 是否使用代理
            `format` : `str` 保存文件格式
            `content` : `int` 回调最少字节
            `faultolt` : `int` 可重试次数
        
        Returns:
            None
        '''
        self.filename = filename    # 文件路径或请求名称 推荐使用路径命名
        self.format = format        # 保存文件格式
        self.faultolt = faultolt    # 可重试次数
        self.proxie = proxie        # 是否使用代理
        self.content = content      # 回调最少字节
        self.lupros = lupros
        self.args = (lupros[0],*lupros[1])
        self.kw = copy(lupros[2])

        if (not lupro.Proxies) and self.proxie:
            lupro.Proxies = get_proxies()

        assert (not self.proxie) or (self.proxie and lupro.Proxies),'`Proxies` cannot be empty!'

        if not 'headers' in self.kw:
            self.kw['headers'] = {'User-Agent' : get_header()}
        elif not 'User-Agent' in self.kw['headers']:
            self.kw['headers'].update({'User-Agent' : get_header()})
        
        if lupro.VERIFY_PROXIES and self.proxie and (not lupro.IS_AGENT_VERIFIED):
            print(logging('开始验证代理!'))
            t1 = datetime.now()
            self.authentication()
            lupro.IS_AGENT_VERIFIED = True
            print(logging(f'用时{datetime.now() - t1}'))
            print(logging(f"高质量代理:{len(lupro.Proxies)}个!"))

        if (not 'proxies' in self.kw) and self.proxie:
            self.proxie = random.choice(lupro.Proxies)
            self.kw['proxies'] = {'http': f"//{self.proxie}"}
    
    # 验证代理
    def authentication(self) -> None:
        '''`authentication` 为快速代理验证.'''

        auth_proxies = [gevent.spawn(verify_proxies, i) for i in lupro.Proxies]
        gevent.joinall(auth_proxies)
        lupro.Proxies = [i.value for i in auth_proxies if i.value]

    # 更新字典
    def renew_proxie(self):
        '''lupro实例 更新字典'''
        lupro.Proxies.remove(self.proxie)
        self.proxie = random.choice(lupro.Proxies)
        if lupro.__general__.__name__ == 'requests':
            self.kw['proxies'] = {'http': f"//{self.proxie}"}
        elif lupro.__general__.__name__ == 'httpx':
            self.kw['proxies'] = {'http://': f"http://{self.proxie}"}

    # 请求方法 lupro 所有请求
    @endurance
    def task(self):
        '''请求方法 lupro 所有请求接口'''
        def ask(self):
            if self.faultolt <= 0:
                print(f'{self.filename} failed.')
                return None
            print(logging(f"{self.filename} {self.proxie} -----> 开始请求!"))
            try:
                res = getattr(engine, lupro.__general__.__name__)(self)
            except:
                if self.proxie and self.proxie in lupro.Proxies:
                    self.renew_proxie()
                self.faultolt -= 1
                print(logging(f"{self.filename} -----> 更新字典中!{self.faultolt}"))
                return ask(self)
            if not res.status_code == 200:
                return ask(self)
            if len(res.content) < self.content:
                return ask(self)
            print(logging(f"{self.filename} {len(res.content)} -----> 请求结束!")) 
            return res     
        return ask(self)
    
    # xpath 解析
    def xpath_analysis(self, analytic : dict, auxiliary = original) -> dict:
        '''实例 xpath 解析方法.

        Args:
            `analytic` : `dict[str:str]` xpath解析字典
            `auxiliary` : `function` 自定义解析处理
        
        Returns:
            dict : 解析字典
        '''
        res = self.task()
        if not res:
            return {}
        return analyze.xpath(res, analytic, auxiliary)

    # json 解析
    def json_analysis(self, analytic : dict, auxiliary = original) -> dict:
        '''实例 json 解析方法.

        Args:
            `analytic` : `dict[str:str]` xpath解析字典
            `auxiliary` : `function` 自定义解析处理
        
        Returns:
            dict : 解析字典
        '''
        res = self.task()
        if not res:
            return {}
        return analyze.json(res, analytic, auxiliary)

    # 正则 解析
    def re_analysis(self, analytic : dict, auxiliary = original) -> dict:
        '''实例 正则 解析方法.

        Args:
            `analytic` : `dict`{`str`:`function`} 正则解析字典 
            `auxiliary` : `function` 自定义解析处理
        
        Returns:
            dict : 解析字典
        '''
        res = self.task()
        if not res:
            return {}
        return analyze.re(res, analytic, auxiliary)

    # css 解析
    def css_analysis(self, analytic : dict, auxiliary = original) -> dict:
        '''实例 css 解析方法.

        Args:
            `analytic` : `dict[str:str]` css解析字典
            `auxiliary` : `function` 自定义解析处理
        
        Returns:
            dict : 解析字典
        '''
        res = self.task()
        if not res:
            return {}
        return analyze.css(res, analytic, auxiliary)

    # 保存文件路径
    def save_file(self) -> str:
        '''保存文件方法,如果 `filename` 不为绝对路径,则保存文件的路径为当前目录'''

        res = self.task()
        if not res:
            return ''
        if os.path.isabs(self.filename):
            path = os.path.split(self.filename)[0]
        else:
            path = os.path.join(self.onFile,os.path.split(self.filename)[0])
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(path,os.path.split(self.filename)[1]+f'.{self.format}')
        if res.apparent_encoding == None:
            res.encoding = 'utf-8'
        else:
            res.encoding = res.apparent_encoding
        with open(path,mode='wb') as f:
            f.write(res.content)
        return path

    def __repr__(self) -> str:
        return f"<{__name__}.lupro({lupro.__general__.__name__}) object {self.filename}>"

    def __reprs__(self) -> dict:
        ''' `__reprs__` 为 `lupro` 实例化参数'''
        return {'filename':self.filename ,'format':self.format , 'proxie':self.proxie, 'faultolt':self.faultolt, 'content' : self.content ,'lupros':self.lupros}

Class variables

var IS_AGENT_VERIFIED
var Proxies
var VERIFY_PROXIES
var onFile

Methods

def authentication(self) ‑> NoneType

authentication 为快速代理验证.

Expand source code
def authentication(self) -> None:
    '''`authentication` 为快速代理验证.'''

    auth_proxies = [gevent.spawn(verify_proxies, i) for i in lupro.Proxies]
    gevent.joinall(auth_proxies)
    lupro.Proxies = [i.value for i in auth_proxies if i.value]
def css_analysis(self, analytic: dict, auxiliary=<function original>) ‑> dict

实例 css 解析方法.

Args

analytic : dict[str:str] css解析字典 auxiliary : function 自定义解析处理

Returns

dict
解析字典
Expand source code
def css_analysis(self, analytic : dict, auxiliary = original) -> dict:
    '''实例 css 解析方法.

    Args:
        `analytic` : `dict[str:str]` css解析字典
        `auxiliary` : `function` 自定义解析处理
    
    Returns:
        dict : 解析字典
    '''
    res = self.task()
    if not res:
        return {}
    return analyze.css(res, analytic, auxiliary)
def json_analysis(self, analytic: dict, auxiliary=<function original>) ‑> dict

实例 json 解析方法.

Args

analytic : dict[str:str] xpath解析字典 auxiliary : function 自定义解析处理

Returns

dict
解析字典
Expand source code
def json_analysis(self, analytic : dict, auxiliary = original) -> dict:
    '''实例 json 解析方法.

    Args:
        `analytic` : `dict[str:str]` xpath解析字典
        `auxiliary` : `function` 自定义解析处理
    
    Returns:
        dict : 解析字典
    '''
    res = self.task()
    if not res:
        return {}
    return analyze.json(res, analytic, auxiliary)
def re_analysis(self, analytic: dict, auxiliary=<function original>) ‑> dict

实例 正则 解析方法.

Args

analytic : dict{str:function} 正则解析字典 auxiliary : function 自定义解析处理

Returns

dict
解析字典
Expand source code
def re_analysis(self, analytic : dict, auxiliary = original) -> dict:
    '''实例 正则 解析方法.

    Args:
        `analytic` : `dict`{`str`:`function`} 正则解析字典 
        `auxiliary` : `function` 自定义解析处理
    
    Returns:
        dict : 解析字典
    '''
    res = self.task()
    if not res:
        return {}
    return analyze.re(res, analytic, auxiliary)
def renew_proxie(self)

lupro实例 更新字典

Expand source code
def renew_proxie(self):
    '''lupro实例 更新字典'''
    lupro.Proxies.remove(self.proxie)
    self.proxie = random.choice(lupro.Proxies)
    if lupro.__general__.__name__ == 'requests':
        self.kw['proxies'] = {'http': f"//{self.proxie}"}
    elif lupro.__general__.__name__ == 'httpx':
        self.kw['proxies'] = {'http://': f"http://{self.proxie}"}
def save_file(self) ‑> str

保存文件方法,如果 filename 不为绝对路径,则保存文件的路径为当前目录

Expand source code
def save_file(self) -> str:
    '''保存文件方法,如果 `filename` 不为绝对路径,则保存文件的路径为当前目录'''

    res = self.task()
    if not res:
        return ''
    if os.path.isabs(self.filename):
        path = os.path.split(self.filename)[0]
    else:
        path = os.path.join(self.onFile,os.path.split(self.filename)[0])
    if not os.path.exists(path):
        os.makedirs(path)
    path = os.path.join(path,os.path.split(self.filename)[1]+f'.{self.format}')
    if res.apparent_encoding == None:
        res.encoding = 'utf-8'
    else:
        res.encoding = res.apparent_encoding
    with open(path,mode='wb') as f:
        f.write(res.content)
    return path
def task(self)

请求方法 lupro 所有请求接口

Expand source code
@endurance
def task(self):
    '''请求方法 lupro 所有请求接口'''
    def ask(self):
        if self.faultolt <= 0:
            print(f'{self.filename} failed.')
            return None
        print(logging(f"{self.filename} {self.proxie} -----> 开始请求!"))
        try:
            res = getattr(engine, lupro.__general__.__name__)(self)
        except:
            if self.proxie and self.proxie in lupro.Proxies:
                self.renew_proxie()
            self.faultolt -= 1
            print(logging(f"{self.filename} -----> 更新字典中!{self.faultolt}"))
            return ask(self)
        if not res.status_code == 200:
            return ask(self)
        if len(res.content) < self.content:
            return ask(self)
        print(logging(f"{self.filename} {len(res.content)} -----> 请求结束!")) 
        return res     
    return ask(self)
def xpath_analysis(self, analytic: dict, auxiliary=<function original>) ‑> dict

实例 xpath 解析方法.

Args

analytic : dict[str:str] xpath解析字典 auxiliary : function 自定义解析处理

Returns

dict
解析字典
Expand source code
def xpath_analysis(self, analytic : dict, auxiliary = original) -> dict:
    '''实例 xpath 解析方法.

    Args:
        `analytic` : `dict[str:str]` xpath解析字典
        `auxiliary` : `function` 自定义解析处理
    
    Returns:
        dict : 解析字典
    '''
    res = self.task()
    if not res:
        return {}
    return analyze.xpath(res, analytic, auxiliary)
class lupros

HTTP参数引擎 辅助字典生成器

Expand source code
class lupros(metaclass = inherit):
    '''`HTTP参数引擎` 辅助字典生成器'''

    kernel = {'requests': requests, 'httpx' : httpx}

    __general__ = kernel[HTTP_ENGINE.__name__]

Class variables

var kernel
class persistence

对象持久化控制器

Expand source code
class persistence():
    '''对象持久化控制器'''

    '''是否启用对象持久化'''
    ENABLED = PERSISTENCE_ENABLED

    # 对象持久化元类
    class kernelk(type):
        '''对象持久化元类'''
        pass

    # shelve内核
    class shelve():
        '''shelve内核'''

        '''对象持久化存储路径'''
        dbfile = PERSISTENCE_PATH

        @classmethod
        def add(cls, key : str, value : Any) -> bool:
            '''持久化一个新对象

            Args:
                `cls` : `persistence.shelve` 类对象
                `key` : `str` 新增对象的键
                `value` : `Any` 新增对象的值
            
            Returns:
                `bool` : True
            '''

            with shelve.open(persistence.shelve.dbfile) as f:
                f[key] = value
            return True
        
        @classmethod
        def put(cls):
            '''持久化一个新对象

            Args:
                `cls` : `persistence.shelve` 类对象
            
            Returns:
                `shelve` : 当前路径shelve对象字典
            '''

            with shelve.open(persistence.shelve.dbfile) as f:
                res = {i:j for i,j in f.items()}
            return res

    # ZODB内核
    class ZODB():
        '''ZODB内核'''

        pass  

Class variables

var ENABLED
var ZODB

ZODB内核

var kernelk

对象持久化元类

var shelve

shelve内核