项目链接: https://github.com/jhao104/proxy_pool
爬虫代理IP池项目,主要功能为定时采集网上发布的免费代理验证入库,定时验证入库的代理保证代理的可用性,提供API和CLI两种使用方式。同时你也可以扩展代理源以增加代理池IP的质量和数量。
模块组成:获取功能、存储功能、校验功能、接口管理
:star:程序主要是启动了
startServer
的API接口服务、startScheduler
定时服务
startServer->runFlask
:是向外提供了通过proxyHandler来获得Redis中的proxy数据startScheduler->sche.add_task(__runProxyFetch)、sche.add_task(__runProxyCheck)
,
__runProxyFetch:proxy_fetcher.run()->proxy_queue->Checker("raw", proxy_queue)
获得各个代理网站的代理信息后,进行校验,校验成功则入库__runProxyCheck:proxy in proxy_handler.getAll()->proxy_queue->Checker("use", proxy_queue)
:通过proxy_handler拿到库里所有现存的数据后,进行有效性校验,无效的则删除,有效的则更新信息作为存储功能的接口proxyHandler,也是两个API服务与定时服务的中介。程序也是通过存储功能,将核心的两个功能:<u>定时抓取的proxy数据</u>与<u>提供proxy数据给用户使用</u>成功联系在了一起
通过元类实现单例模式:ConfigHandler,其可以在任意模块中以c = ConfigHandler()的形式获得,而不是
ConfigHandler.getInstance()
-
@LazyProperty
懒加载属性的装饰器: 只有用到时才会加载并将值注入到__dict__
、加载一次后值就不再变化、;讲解可见://www.greatytc.com/p/708dc26f9b92——描述符or修饰符实现class LazyProperty(object): # 在被注解类方法被解释器运行的时候就会创建LazyProperty实例并返回 def __init__(self, func): self.func = func """通过python描述符来实现""" def __get__(self, instance, owner): if instance is None: return self else: # 会将结果值通过setattr方法存入到instance对象实例的__dict__中 value = self.func(instance) setattr(instance, self.func.__name__, value) return value class ConfigHandler(withMetaclass(Singleton)): # 返回一个LazyProperty实例 @LazyProperty def serverHost(self): return os.environ.get("HOST", setting.HOST) c = ConfigHandler() # 会触发ConfigHandler.__dict__["serverHost"], 然后接而触发LazyProperty的__get__,value = self.func(instance)会得到真正serverHost函数的值后将其设置在ConfigHandler instance对象的__dict__中,由于对象的__dict__["serverHost"]=value优先级高于类的__dict__["serverHost"]=LazyProperty()对象,因此之后调用得到的是value结果 print(c.serverHost)
__get__
只有访问类属性的时候才会生效,这边是通过setattr将serverHost设置成了ConfigHandler的类属性 -
封装了一个请求工具类
WebRequest
:- 增加了异常处理的功能
- 增加了日志功能
- 请求头会得到随机UA
- 设置重试
-
使用click创建子命令:
- 得到一个click_group
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @click.group(context_settings=CONTEXT_SETTINGS) @click.version_option(version=VERSION) def cli(): """ProxyPool cli工具"""
- 指定group下的子命令
@cli.command(name="server") # 还可以设置参数: @click.option('--count', default=1, help='Number of greetings.') --> def server(count) def server(): """ 启动api服务 """ click.echo(BANNER) startServer() if __name__ == '__main__': cli()
然后通过bash脚本同时开启两个进程
#!/usr/bin/env bash python proxyPool.py server & python proxyPool.py schedule
-
DbClient DB工厂类
class DbClient(withMetaclass(Singleton)): def __init__(self, db_conn): self.parseDbConn(db_conn) self.__initDbClient() @classmethod def parseDbConn(cls, db_conn): db_conf = urlparse(db_conn) cls.db_type = db_conf.scheme.upper().strip() cls.db_host = db_conf.hostname cls.db_port = db_conf.port cls.db_user = db_conf.username cls.db_pwd = db_conf.password cls.db_name = db_conf.path[1:] return cls def __initDbClient(self): """ init DB Client :return: """ __type = None if "SSDB" == self.db_type: __type = "ssdbClient" elif "REDIS" == self.db_type: __type = "redisClient" else: pass assert __type, 'type error, Not support DB type: {}'.format(self.db_type) self.client = getattr(__import__(__type), "%sClient" % self.db_type.title())(host=self.db_host, port=self.db_port, username=self.db_user, password=self.db_pwd, db=self.db_name)
-
继承重写logging.logger,可选参数为
name, level=DEBUG, stream=True, file=True
,让每个功能函数都能生成单独的日志文件,并进行了可选控制。相比单例,日志精度更细,但也使用起来也更麻烦,需要考虑什么地方需要。
-
提供"扩展代理"接口
在ProxyFetcher类中添加自定义的获取代理的静态方法, 该方法需要以生成器(yield)形式返回
host:ip
格式的代理-
添加好方法后,修改setting.py文件中的
PROXY_FETCHER
项下添加自定义方法的名字:PROXY_FETCHER = [ "freeProxy01", "freeProxy02", # .... "freeProxyCustom1" # # 确保名字和你添加方法名字一致 ]
schedule
进程会每隔一段时间抓取一次代理,下次抓取时会自动识别调用你定义的方法。
实现方式:
self.log.info("ProxyFetch : start")
# 从配置中拿执行函数
for fetch_source in self.conf.fetchers:
# 判断ProxyFetcher中是否有定义、是否可调用
fetcher = getattr(ProxyFetcher, fetch_source, None)
if not fetcher:
self.log.error("ProxyFetch - {func}: class method not exists!".format(func=fetch_source))
continue
if not callable(fetcher):
self.log.error("ProxyFetch - {func}: must be class method".format(func=fetch_source))
continue
thread_list.append(_ThreadFetcher(fetch_source, proxy_dict))
for thread in thread_list:
thread.setDaemon(True)
thread.start()
for thread in thread_list:
thread.join()
self.log.info("ProxyFetch - all complete!")
-
Cpython(默认安装的都是Cpython)中Dict和list、tuple都是线程安全的
-
以装饰器的形式将过滤器将入到容器中
class ProxyValidator(withMetaclass(Singleton)): pre_validator = [] http_validator = [] https_validator = [] @classmethod def addPreValidator(cls, func): cls.pre_validator.append(func) return func # 实际上执行了 formatValidator=ProxyValidator.addPreValidator(formatValidator) # 由于addPreValidator返回了func, 所以formatValidator还是原来的addPreValidator, 但在类定义的时候ProxyValidator.pre_validator添加了formatValidator方法 @ProxyValidator.addPreValidator def formatValidator(proxy): """检查代理格式""" verify_regex = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}" _proxy = findall(verify_regex, proxy) return True if len(_proxy) == 1 and _proxy[0] == proxy else False class DoValidator(object): """ 校验执行器 """ @classmethod def validator(cls, proxy): """ 校验入口 Args: proxy: Proxy Object Returns: Proxy Object """ http_r = cls.httpValidator(proxy) https_r = False if not http_r else cls.httpsValidator(proxy) proxy.check_count += 1 proxy.last_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") proxy.last_status = True if http_r else False if http_r: if proxy.fail_count > 0: proxy.fail_count -= 1 proxy.https = True if https_r else False else: proxy.fail_count += 1 return proxy @classmethod def preValidator(cls, proxy): for func in ProxyValidator.pre_validator: if not func(proxy): return False return True
-
-
starter-banner:启动横幅
- reflection: No、adjustment: cewnter、Stretch: Yes、width: 80
- 还不错的font:
- 5lineoblique——好看
- banner3——清楚
- bell——抽象
- big——清晰
- bigchief——等高线版本、艺术
- block——块状
- bulbhead——可爱
- larry3d——立体3d
- ogre——清晰
- puffy——清晰+一点可爱
- slant——清晰+斜体
-
定时器框架apschedule配置:
scheduler = BlockingScheduler(logger=scheduler_log, timezone=timezone) scheduler.add_job(__runProxyCheck, 'interval', minutes=2, id="proxy_check", name="proxy检查") executors = { # job_defaults中的max_instances也受限于max_workers, 所以要大于max_instances;此外max_workers也决定了同时能处理几个同时发生的task 'default': {'type': 'threadpool', 'max_workers': 20}, 'processpool': ProcessPoolExecutor(max_workers=5) } job_defaults = { # 合并将所有这些错过的执行合并为一个, 默认为True。 如果是定时的存储任务的话,参数肯定不同,不能合并所以得手动设置False # 像本项目每隔一段时间抓取到的数据也不太一样,所以无法直接当作一次错误任务合并 'coalesce': False, # 默认情况下,每个作业只允许同时运行一个实例。这意味着,如果作业即将运行,但前一次运行尚未完成,则认为最近一次运行失败。通过在添加作业时使用关键字参数,可以设置调度程序允许同时运行的特定作业的最大实例数。默认为1 'max_instances': 10, # 框架会检查每个错过的执行时间,如果当前还在misfire_grace_time时间内,则会重新尝试执行任务,设高点就可以避免任务被漏掉执行。默认为1 # "misfire_grace_time": 5 该项目未使用,而是采用了多任务实例来规避任务错过执行==>即官方给出两种方案中的另一种。任务错过信息:Run time of job "say (trigger: interval[0:00:02])" was missed by 0:00:03.010383 } scheduler.configure(executors=executors, job_defaults=job_defaults, timezone=timezone) scheduler.start()
job_defaults参数含义见官方文档
注: 经过测试,在add_task中的func如果起了多个线程,其执行不受限于sche的配置
Python中如果只是使用全局变量则不需要用global声明(因为变量搜寻会由内往外),但是如果需要修改则需要用global声明,否则无法找到相应变量
-
生成器:使用了yield关键字的函数就是生成器,生成器是一类特殊的迭代器。
作用:
- 处理大量数据:生成器一次返回一个结果,而不是一次返回所有结果。比如
sum([i for i in range(10000000000000)])
会卡机;sum(i for i in range(10000000000000))
则不会 - 代码更加简洁:可以减少变量、空间
- 迭代器本身的作用
yield关键字有两点作用:
保存当前运行状态(断点),然后暂停执行,即将生成器(函数)挂起;可以使用next()函数让生成器从断点处继续执行,即唤醒生成器(函数)
将yield关键字后面表达式的值作为返回值返回,此时可以理解为起到了return的作用def __runProxyFetch(): for proxy in proxy_fetcher.run(): proxy_queue.put(proxy) class Fetcher(object): name = "fetcher" def run(self): # ... # 相比使用生成推导式 return [p for p in proxy_dict.values() if DoValidator.preValidator(p.proxy)], 使用yield生成器可以节省空间 for _ in proxy_dict.values(): if DoValidator.preValidator(_.proxy): yield _
- 处理大量数据:生成器一次返回一个结果,而不是一次返回所有结果。比如
-
应用部署:
①对apk换源;②设置时区
FROM python:3.6-alpine # .. # apk repository RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories # timezone RUN apk add -U tzdata && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && apk del tzdata # ... ENTRYPOINT [ "sh", "start.sh" ]
docker-compose.yml
: 镜像还没编译好的情况。(如果自己改了功能并启用的话,需要用这种;或者自己发布镜像后用后一种)version: '2' services: proxy_pool: build: . container_name: proxy_pool ports: - "5010:5010" links: - proxy_redis environment: DB_CONN: "redis://@proxy_redis:6379/0" proxy_redis: image: "redis" container_name: proxy_redis
docker-compose.yml
:别人镜像已经编译好并上传version: "3" services: redis: image: redis expose: - 6379 web: restart: always image: jhao104/proxy_pool environment: - DB_CONN=redis://redis:6379/0 ports: - "5010:5010" depends_on: - redis
scheduler的逻辑
项目目录结构默写:
- settings: 配置文件
- main:启动文件
- api:提供获取proxy数据接口
- handler:
- loggerHandler:日志类
- configHandler:单例的配置接口类
- ProxyHandler: Proxy CRUD操作类
- fetcher: 代理数据获取类
- db:
- dbClinet: 存储功能接口类
- redisClient:存储功能实现类
- helper
- scheduler: 定时任务的定义与启动类
- validator: proxy有效性校验类
- check: 具体执行校验逻辑类
- proxy: 获取的proxy数据封装类
- utils:
- lazyProperty: 懒加载描述器
- singleton: 单例管理器类
- six: python2与python3兼容类
- webRequest: 网络请求封装类