IP代理池2.0版本,加入多进程以及多线程

网络爬虫

浏览数:305

2019-2-26

这次对之前的代理池1.0版本进行了升级,可用性大大增加了,也增加了一些IP源头的获取,包括西刺高匿代理前50页的IP抓取,还有对于TXT文件里面的IP存入数据库的操作,因为楼主是测试了免费的代理之后再也不想用了,就直接去淘宝买了20几块一个月,还能接受,因为返回的IP是在txt文件里,所以有了这个操作。顺便提一下,之前抓取了西刺免费2000多个IP,测出来能用的只有20个左右,妈的,亮瞎我的眼,好了,先看代码框架。

Paste_Image.png

先看封装的数据库操作

from pymongo import MongoClient,errors
from _datetime import datetime,timedelta

class mogo_queue(object):
    OUTSTANDING = 1  ##初始状态
    PROCESSING = 2  ##测试过后的状态

    def __init__(self, db, collection):
        self.client = MongoClient()
        self.database = self.client[db]  # 链接数据库
        self.db = self.database[collection]  # 链接数据库里面这个表
    def __bool__(self):
        """
        这个函数,我的理解是如果下面的表达为真,则整个类为真
        至于有什么用,后面我会注明的(如果我的理解有误,请指点出来谢谢,我也是Python新手)
        $ne的意思是不匹配
        """
        record = self.db.find_one(
            {'status': {'$ne': self.PROCESSING}}
        )
        return True if record else False
    def push_ip_url(self,url):
        self.db.insert({'_id':url})
        print('IP链接{}插入成功'.format(url))
    def find_url(self):#找到所有代理的url
        url_list=[]
        for i in self.db.find():
            url= i['_id']
            url_list.append(url)
        return url_list
    def find_proxy(self):
        proxy_list = []  # 用来接收从数据库查找到的所有代理
        for i in self.db.find():
            proxy = i['proxy']
            proxy_list.append(proxy)
        return proxy_list
    def push_ip(self,ip,port,proxy):#把代理插进数据库的操作
        try:
            self.db.insert({'_id':ip,'port':port,'proxy':proxy,'status':self.OUTSTANDING})
            print(proxy,'代理插入成功')
        except errors.DuplicateKeyError as e:#对于重复的ip不能插入
            print(proxy,'已经存在队列中')
    def find_one_ip(self):
        record = self.db.find_and_modify(
            query={'status': self.OUTSTANDING},#改变状态,防止另外的进程也车市同一个ip
            update={'$set': {'status': self.PROCESSING,}}
        )
        if record:
            return record['proxy']
        else:
            raise KeyError
    def status_setting(self):
        record = self.db.find({'status':self.PROCESSING})#找到所有状态为2的代理,
        #就是之前测试过的,以备重新测试,毕竟很多IP存活率不高
        #print(record)
        for i in record:
            print(i)
            id=i["_id"]
            #query={'status':self.PROCESSING},
            self.db.update({'_id':id},{'$set': {'status': self.OUTSTANDING }})#该状态为1,
            #重新测试
            print('代理',id,'更改成功')
        # if record:
        #     return record
    def delete_proxy(self,proxy):
        """这个函数是更新已完成的URL完成"""
        self.db.delete_one({'proxy': proxy})
        print('无效代理{}删除成功'.format(proxy))

下面是test文件

import requests
from pymongo import MongoClient
import re
from bs4 import BeautifulSoup
from  mogodb_operate import mogo_queue
from proxy_request import  request
url_queue = mogo_queue('ip_database','ip_link_collection')
ip_queue = mogo_queue('ip_database','proxy_collection')

class ip_operator():
    @staticmethod
    def insert_xici_url(page):
        urls = ['http://www.xicidaili.com/nn/{}'.format(str(i)) for i in range(page)]
      #构造西刺网前面page页的URL
        for url in urls:
            #print(url)
            url_queue.push_ip_url(url)#插进URL数据库

    @staticmethod
    def catch_ip_xici():#爬取西刺网前面50页的免费IP
        ip_url=url_queue.find_url()
        for url in ip_url:
            data = request.get(url,3)
            all_data = BeautifulSoup(data.text, 'lxml')
            all_ip = all_data.find_all('tr', class_='odd')
            for i in all_ip:
                ip = i.find_all('td')[1].get_text()  # ip
                port = i.find_all('td')[2].get_text()  # 端口
                proxy = (ip + ':' + port).strip()  # 组成成proxy代理
                ip_queue.push_ip(ip,port,proxy)#插进数据库
    #ip_queue.find_one_ip()
    @staticmethod#本来还想把快代理的抓取也封装进来,
     然后测试了西刺之后,觉得免费IP就算了吧....,这段代码大家可以无视
    def insert_kuaidaili_url(page):
        urls=['http://www.kuaidaili.com/free/inha/{}/'.format(str(i)) for i in range(page)]
        for url in urls:
            url_queue.push_ip_url(url)
    #catch_url_ip()
    @staticmethod
    def insert_ip_text():#把txt文件里面的ip存进数据库
        f= open('C:\\Users\\admin\\Desktop\\ip_daili.txt',encoding='utf-8')
        data = f.read()
        proxy=data.split('\n')
        for i in proxy:
            proxie = i,
            ip =i.split(':')[0]# ip
            port =i.split(":")[1]   # 端口
            #proxie = str(ip)+':'+str(port),
            #print(ip,port,i)
            ip_queue.push_ip(ip,port,i)
        #print(proxy)
        f.close()



ip_operator.catch_ip_xici()
#爬西刺的代理,存进数据库以待检验
ip_operator().insert_ip_text()
#把txt文件里面保存的IP插进数数据库以待检验存进数据库以待检验

接下来是爬虫主程序,写了好久啊,哎,都是因为要爬文书网才写了这玩意

import requests
from pymongo import MongoClient
import threading
from bs4 import BeautifulSoup
import time
import re
from  mogodb_operate import mogo_queue
url = 'http://ip.chinaz.com/getip.aspx'#这个是用于测试IP有效性的网站,
IP正常情况下会返回IP,以及IP所在地址
import multiprocessing
ip_queue = mogo_queue('ip_database','proxy_collection')#链接到储存IP的数据库
def ip_catch(max_threads=9):

    def test_effictive_ip():
        while True:#不断循环,找到数据进行测试
            try:
                proxy = ip_queue.find_one_ip()#提取IP,准备测试
   
                try:
                    proxies = {'http':'http://{}'.format(proxy),
                                            'https':'http://{}'.format(proxy),}
                    html = requests.get(url,proxies=proxies,timeout=1)

                    status_number = re.findall(r'\d\d\d', str(html))[0]#提取网页返回码
                    re_ip = re.findall(r'\{ip',html.text)#有些ip极其恶心,
                    虽然返回的是200数字,
                    表示正常,实则是bad request,这里去除掉
                    #print(re_ip)
                    if status_number==str(200):
                        if re_ip:

                            #检验代理是否能正常使用
                            print('网页返回状态码:',html,proxy,'代理有效,地址是:',html.text)
                        else:
                            ip_queue.delete_proxy(proxy)
                    else:
                        ip_queue.delete_proxy(proxy)
                except:
                    ip_queue.delete_proxy(proxy)
            except KeyError:

                print('队列没有数据了')
                break
                #print(proxy,'代理无效')
    threads = []
    while threads or ip_queue:
        """
                这儿crawl_queue用上了,就是我们__bool__函数的作用,
                为真则代表我们MongoDB队列里面还有IP没检测完,
                也就是状态依然是没有改变,还有没被测试过的IP
                threads 或者 为真都代表我们还没下载完成,程序就会继续执行
        """
        for thread in threads:
            if not thread.is_alive():  ##is_alive是判断是否为空,不是空则在队列中删掉
                    threads.remove(thread)
        while len(threads) < max_threads :  ##线程池中的线程少于max_threads 或者 crawl_qeue时
            thread = threading.Thread(target=test_effictive_ip)  ##创建线程
            thread.setDaemon(True)  ##设置守护线程
            thread.start()  ##启动线程
            threads.append(thread)  ##添加进线程队列
        time.sleep(5)
def process_crawler():
    process = []
    num_cpus = multiprocessing.cpu_count()
    print('将会启动进程数为:', num_cpus)
    for i in range(num_cpus):
        p = multiprocessing.Process(target=ip_catch)  ##创建进程
        p.start()  ##启动进程
        process.append(p)  ##添加进进程队列
    for p in process:
        p.join()  ##等待进程队列里面的进程结束


if __name__ == "__main__":
    ip_queue.status_setting()#重置状态,以便测试
    process_crawler()

于是我在淘宝每次提取5000个IP,都有差不多300个能有,已经满足了,以后找到更好的IP代理源再分享出来,上张运行图和测试过后的图,开始之前是5000,最后只剩几百,源头问题真的很重要

Paste_Image.png
Paste_Image.png