百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

干货分享,程序员自建代理ip池,轻松爬取数据不封ip没有反爬虫。

yuyutoo 2025-01-17 13:58 3 浏览 0 评论

代理池主要分为4个模块:存储模块、获取模块、检测模块、接口模块无私分享全套Python爬虫干货,如果你也想学习Python,@ 私信小编获取

存储模块

这里我们使用Redis的有序集合,集合的每一个元素都是不重复的。另外,有序集合的每一个元素都有一个分数字段。

具体代码实现如下(ippool_save.py)

MAX_SCORE = 100 #最高分
MIN_SCORE = 0 #最低分
INITIAL_SCORE = 10  #初始分数
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies' #键名

import redis
from random import choice

class PoolEmptyError():
    def __str__(self):
        return PoolEmptyError

class RedisClient(object):
    def __init__(self,host=REDIS_HOST,port=REDIS_PORT,password=REDIS_PASSWORD):
        '''
        初始化
        :param host:地址
        :param port: 端口号
        :param password: 密码
        '''
        self.db = redis.StrictRedis(host=host,port=port,password=password,decode_responses=True)

    def add(self,proxy,score=INITIAL_SCORE):
        '''
        添加代理,设置初始分数
        :param proxy: 代理
        :param score: 分数
        :return: 添加结果
        '''
        if not self.db.zscore(REDIS_KEY,proxy):
            return self.db.zadd(REDIS_KEY,{proxy:score})

    def random(self):
        '''
        随即获取有效代理,首先尝试获取最高分数代理,如果最高分数不存在,则按照排名获取
        :return:
        '''
        result = self.db.zrangebyscore(REDIS_KEY,MAX_SCORE,MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            result = self.db.zrevrange(REDIS_KEY,0,100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def decrease(self, proxy):
        '''
        代理值减一分,分数小于最小值,则代理删除
        :param proxy: 代理
        :return: 修改后的代理分数
        '''
        score = self.db.zscore(REDIS_KEY,proxy)
        if score and score>MIN_SCORE:
            print("代理",proxy,"当前分数",score,"减1")
            return self.db.zincrby(REDIS_KEY,-1,proxy)
        else:
            print("代理",proxy,"当前分数",score,"移除")
            return self.db.zrem(REDIS_KEY,proxy)

    def exists(self,proxy):
        '''
        判断是否存在
        :param proxy: 代理
        :return: 是否存在
        '''
        return not self.db.zscore(REDIS_KEY,proxy) == None

    def max(self,proxy):
        '''
        将代理设置为MAX_SCORE
        :param proxy: 代理
        :return: 设置结果
        '''
        print("代理",proxy,"可用,设置为",MAX_SCORE)
        return self.db.zadd(REDIS_KEY,{proxy:MAX_SCORE})

    def count(self):
        '''
        获取数量
        :return:数量
        '''
        return self.db.zcard(REDIS_KEY)

    def all(self):
        '''
        获取全部代理
        :return: 全部代理列表
        '''
        return self.db.zrangebyscore(REDIS_KEY,MIN_SCORE,MAX_SCORE)


获取模块

获取模块的逻辑相对简单,首先要定义一个ippool_crawler.py来从各大网站抓取,具体代码如下:

import json
import requests
from lxml import etree
from ippool_save import RedisClient

class ProxyMetaclass(type):
    #参数依次是当前准备创建的类的对象;类的名字;类继承的父类集合;类的方法集合。
    def __new__(cls, name,bases,attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k,v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count+=1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls,name,bases,attrs)

class Crawler(object,metaclass=ProxyMetaclass):
    def __init__(self):
        self.proxy = RedisClient().random()
        self.proxies = {
            'http': 'http://' + self.proxy,
            'https': 'https://' + self.proxy
        }
    def get_proxies(self,callback):
        proxies = []
        for proxy in eval("self.{}()".format(callback)):
            print('成功获取代理',proxy)
            proxies.append(proxy)
        return proxies
        
	

我们还需要定义一个Getter类,用来动态地调用所有以crawl开头的方法,然后获取抓取到的代理,将其加入到数据库存储起来,具体代码如下(ippool_getter.py)

from ippool_save import RedisClient
from ippool_crawler import Crawler

POOL_UPPER_THRESHOLD = 1000

class Getter():
    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()
    def is_over_threshold(self):
        if self.redis.count() >= POOL_UPPER_THRESHOLD:
            return True
        else:
            return False

    def run(self):
        print("获取器开始执行")
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                proxies = self.crawler.get_proxies(callback)
                for proxy in proxies:
                    self.redis.add(proxy)

检测模块

我们已经将各个网站的代理都抓取下来了现在就需要一个检测模块来对所有代理进行多轮检测。

VALID_STATUS_CODES = [200]
TEST_URL = "http://www.baidu.com"
BATCH_TEST_SIZE = 100

from ippool_save import RedisClient
import aiohttp
import asyncio
import time

class Tester(object):
    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self,proxy):
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy,bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://'+ proxy
                print("正在测试",proxy)
                async with session.get(TEST_URL,proxy=real_proxy,timeout=15) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('代理可用',proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('请求响应码不合法',proxy)
            except (TimeoutError,ArithmeticError):
                self.redis.decrease(proxy)
                print('代理请求失败',proxy)

    def run(self):
        print('测试开始运行')
        try:
            proxies = self.redis.all()
            loop = asyncio.get_event_loop()
            for i in range(0,len(proxies),BATCH_TEST_SIZE):
                test_proxies = proxies[i:i+BATCH_TEST_SIZE]
                tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                time.sleep(5)
        except Exception as e:
            print('测试器发生错误', e.args)

接口模块

为了更方便地获取可用代理,我们增加了一个接口模块。

使用Flask来实现这个接口模块,实现代码如下(ippool_api.py)

from flask import Flask,g
from ippool_save import RedisClient

__all__ = ['app']
app = Flask(__name__)

def get_conn():
    if not hasattr(g,'redis'):
        g.redis = RedisClient()
    return  g.redis

@app.route('/')
def index():
    return '<h2>Welcome to Proxy Pool System</h2>'

@app.route('/random')
def get_proxy():
    conn = get_conn()
    return conn.random()

@app.route('/count')
def get_counts():
    conn = get_conn()
    return  str(conn.count())

if __name__ == '__main__':
    app.run()

调度模块

调度模块就是调用以上定义的3个模块,将这3个模块通过多进程的形式运行起来。

最后,只需要调用Scheduler的run()方法即可启动整个代码池。

TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True

from multiprocessing import Process
from ippool_api import app
from ippool_getter import Getter
from ippool_check import Tester
import time

class Scheduler():
    def schedule_tester(self,cycle=TESTER_CYCLE):
     tester = Tester()
     while True:
         print('测试器开始运行')
         tester.run()
         time.sleep(cycle)

    def schedule_getter(self,cycle=GETTER_CYCLE):
        getter = Getter()
        while True:
            print('开始抓取代理')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        app.run()

    def run(self):
        print('代理池开始运行')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()

        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()

        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()

if __name__ == '__main__':
    Scheduler().run()



为了帮助大家更轻松的学好Python,我给大家分享一套Python学习资料,希望对正在学习的你有所帮助!

获取方式:关注并私信小编 “ 学习 ”,即可免费获取!


相关推荐

墨尔本一华裔男子与亚裔男子分别失踪数日 警方寻人

中新网5月15日电据澳洲新快网报道,据澳大利亚维州警察局网站消息,22岁的华裔男子邓跃(Yue‘Peter’Deng,音译)失踪已6天,维州警方于当地时间13日发布寻人通告,寻求公众协助寻找邓跃。华...

网络交友须谨慎!美国犹他州一男子因涉嫌杀害女网友被捕

伊森·洪克斯克(图源网络,侵删)据美国广播公司(ABC)25日报道,美国犹他州一名男子于24日因涉嫌谋杀被捕。警方表示,这名男子主动告知警局,称其杀害了一名在网络交友软件上认识的25岁女子。雷顿警...

一课译词:来龙去脉(来龙去脉 的意思解释)

Mountainranges[Photo/SIPA]“来龙去脉”,汉语成语,本指山脉的走势和去向,现比喻一件事的前因后果(causeandeffectofanevent),可以翻译为“i...

高考重要考点:range(range高考用法)

range可以用作动词,也可以用作名词,含义特别多,在阅读理解中出现的频率很高,还经常作为完形填空的选项,而且在作文中使用是非常好的高级词汇。...

C++20 Ranges:现代范围操作(现代c++白皮书)

1.引言:C++20Ranges库简介C++20引入的Ranges库是C++标准库的重要更新,旨在提供更现代化、表达力更强的方式来处理数据序列(范围,range)。Ranges库基于...

学习VBA,报表做到飞 第二章 数组 2.4 Filter函数

第二章数组2.4Filter函数Filter函数功能与autofilter函数类似,它对一个一维数组进行筛选,返回一个从0开始的数组。...

VBA学习笔记:数组:数组相关函数—Split,Join

Split拆分字符串函数,语法Split(expression,字符,Limit,compare),第1参数为必写,后面3个参数都是可选项。Expression为需要拆分的数据,“字符”就是以哪个字...

VBA如何自定义序列,学会这些方法,让你工作更轻松

No.1在Excel中,自定义序列是一种快速填表机制,如何有效地利用这个方法,可以大大增加工作效率。通常在操作工作表的时候,可能会输入一些很有序的序列,如果一一录入就显得十分笨拙。Excel给出了一种...

Excel VBA入门教程1.3 数组基础(vba数组详解)

1.3数组使用数组和对象时,也要声明,这里说下数组的声明:'确定范围的数组,可以存储b-a+1个数,a、b为整数Dim数组名称(aTob)As数据类型Dimarr...

远程网络调试工具百宝箱-MobaXterm

MobaXterm是一个功能强大的远程网络工具百宝箱,它将所有重要的远程网络工具(SSH、Telnet、X11、RDP、VNC、FTP、MOSH、Serial等)和Unix命令(bash、ls、cat...

AREX:携程新一代自动化回归测试工具的设计与实现

一、背景随着携程机票BU业务规模的不断提高,业务系统日趋复杂,各种问题和挑战也随之而来。对于研发测试团队,面临着各种效能困境,包括业务复杂度高、数据构造工作量大、回归测试全量回归、沟通成本高、测试用例...

Windows、Android、IOS、Web自动化工具选择策略

Windows平台中应用UI自动化测试解决方案AutoIT是开源工具,该工具识别windows的标准控件效果不错,但是当它遇到应用中非标准控件定义的UI元素时往往就无能为力了,这个时候选择silkte...

python自动化工具:pywinauto(python快速上手 自动化)

简介Pywinauto是完全由Python构建的一个模块,可以用于自动化Windows上的GUI应用程序。同时,它支持鼠标、键盘操作,在元素控件树较复杂的界面,可以辅助我们完成自动化操作。我在...

时下最火的 Airtest 如何测试手机 APP?

引言Airtest是网易出品的一款基于图像识别的自动化测试工具,主要应用在手机APP和游戏的测试。一旦使用了这个工具进行APP的自动化,你就会发现自动化测试原来是如此简单!!连接手机要进行...

【推荐】7个最强Appium替代工具,移动App自动化测试必备!

在移动应用开发日益火爆的今天,自动化测试成为了确保应用质量和用户体验的关键环节。Appium作为一款广泛应用的移动应用自动化测试工具,为测试人员所熟知。然而,在不同的测试场景和需求下,还有许多其他优...

取消回复欢迎 发表评论: