自制爬蟲框架 - 新聞資訊 - 雲南小程序開發|雲南軟件開發|雲南網站建設-昆明融晨信息技術有限公司

159-8711-8523

雲南網建設/小程序開發/軟件開發

知識

不(bù)管是(shì)網站,軟件還是(shì)小程序,都要(yào / yāo)直接或間接能爲(wéi / wèi)您産生價值,我們在(zài)追求其視覺表現的(de)同時(shí),更側重于(yú)功能的(de)便捷,營銷的(de)便利,運營的(de)高效,讓網站成爲(wéi / wèi)營銷工具,讓軟件能切實提升企業内部管理水平和(hé / huò)效率。優秀的(de)程序爲(wéi / wèi)後期升級提供便捷的(de)支持!

您當前位置>首頁 » 新聞資訊 » 技術分享 >

自制爬蟲框架

發表時(shí)間:2020-10-19

發布人(rén):融晨科技

浏覽次數:39

自制python爬蟲程序模闆(爬蟲小白亦可用)

        • 1.mysql數據庫鏈接
        • 2. 頁面請求過程
        • 3. 數據提取處理
        • 4. 數據保存處理。

??在(zài)平時(shí)揮手大(dà)幹項目的(de)過程中,時(shí)不(bù)時(shí)會有一些小的(de)爬蟲任務需要(yào / yāo)處理,因此專門寫了(le/liǎo)一個(gè)爬蟲框架,基本覆蓋平常用到(dào)的(de)網站,覺得使用效果不(bù)錯,分享出(chū)來(lái)給大(dà)家使用,也(yě)請各路大(dà)神走過路過提些好的(de)意見。
??接下來(lái)爲(wéi / wèi)大(dà)家簡單介紹一下每個(gè)模塊實現過程及思路。本文結束後處會附全部代碼,前面代碼隻是(shì)便于(yú)大(dà)家理解,無需挨個(gè)粘貼。

1.mysql數據庫鏈接

??本程序使用mysql數據庫讀取和(hé / huò)保存數據,爲(wéi / wèi)了(le/liǎo)工作過程中的(de)安全和(hé / huò)方便,我們通過另外一個(gè)程序将數據庫鏈接賬号密碼等數據,保存中windows注冊表中,可通過 win+regedit 調出(chū)查看。(此塊僅适用于(yú)windows系統,若需在(zài)linux上(shàng)使用,則不(bù)使用此模塊鏈接數據庫),本模塊中數據庫鏈接方式見代碼:

    def read_setttings_zhuce(self, file, winn_c_u=winreg.HKEY_CURRENT_USER):
        """
            讀取注冊表中的(de)設置
        """
        parentkey = winreg.OpenKey(winn_c_u, file)
        # 獲取該鍵的(de)所有鍵值,因爲(wéi / wèi)沒有方法可以(yǐ)獲取鍵值的(de)個(gè)數,所以(yǐ)隻能用這(zhè)種方法進行遍曆
        item = dict()
        try:
            i = 0
            while True:
                # EnumValue方法用來(lái)枚舉鍵值,EnumKey用來(lái)枚舉子(zǐ)鍵
                name, value, type = winreg.EnumValue(parentkey, i)
                item[name] = value
                i += 1
        except Exception as e:
            pass
        return item
    
    def __init__(self, start_p):
        # 注意,super().__init__() 一定要(yào / yāo)寫
        # 而(ér)且要(yào / yāo)寫在(zài)最前面,否則會報錯。
        super().__init__()
        self.item_fwq = self.read_setttings_zhuce("Software\lxl\lxl_program")
        # 鏈接數據庫
        self.conn = pymysql.connect(
            user=self.item_fwq["user"], password=self.item_fwq["password"], host=self.item_fwq["host"], port=int(self.item_fwq["port"]),
            database=self.item_fwq["database"], use_unicode=True,
            charset="utf8")
        self.start_p = start_p
        print("數據庫開啓中......")
        # 獲取遊标
        self.cursor = self.conn.cursor()

2. 頁面請求過程

??此處說(shuō)明一下,整個(gè)模塊是(shì)通過dict來(lái)傳遞數據的(de),因此在(zài)使用過程中,可以(yǐ)随時(shí)随地(dì / de)添加我們需要(yào / yāo)傳遞的(de)參數。我們平常用到(dào)的(de)頁面一般是(shì)get或post請求方式,get方式通過修改傳遞的(de)url鏈接即可請求獲取數據,post方式通過data參數傳遞獲取數據。因此将兩種方式分開處理。同時(shí)将請求回來(lái)的(de)數據做deocde解碼處理,一般遇到(dào)的(de)有utf8或者GBK的(de),我寫了(le/liǎo)兩種,如果你們使用過程中出(chū)現其他(tā)的(de)解碼,添加上(shàng)去即可,此處代碼比較low我就(jiù)不(bù)貼在(zài)此處了(le/liǎo),各位結尾處直接複制即可,(我貼幾行重點吧,否則好像顯得此處特殊)。

        item_fwq_ip = read_setttings_zhuce("Software\lxl\lxl_program")
        # 讀取實時(shí)寫入windows注冊表中的(de)ip代理  本人(rén)喜歡使用無憂代理 不(bù)是(shì)打廣告,而(ér)是(shì)品質确實好
        proxies = {"http": "%s" % item_fwq_ip["ip"], "https": "%s" % item_fwq_ip["ip"]}
        headers = {
             "user-agent": item_fwq_ip['user_agent']
        }
        try:
            response = requests.get(url=url, headers=headers, timeout=20).content
            if response:
                return response
        except Exception as f:
            print("重新請求")
        try:
            response = requests.post(url=url, headers=headers, data=data,timeout=20).content
            if response:
                return response
        except Exception as f:
            print("重新請求")

3. 數據提取處理

??頁面請求成功之(zhī)後,會返回三種格式,一種是(shì)html格式,一種是(shì)json格式,還有一種是(shì)我請求不(bù)到(dào)數據返回的(de)無數據結果(未針對此處如何處理,若有需要(yào / yāo),自行處理)。針對html格式我們使用xpath解析數據(本來(lái)想着能不(bù)能通過代碼去自動處理xpath,太忙沒時(shí)間,以(yǐ)後補上(shàng)吧);針對json格式,就(jiù)簡單許多了(le/liǎo),直接對應讀取出(chū)來(lái)即可。兩種格式處理之(zhī)後,将數據以(yǐ)dict格式傳遞至數據保存處理中即可 見代碼:

    def response_json(self, response, meta={}):
        """
            json 格式解析
        """
        list_data = response['result']['data']
        for ds in list_data:
            item = dict()
            """
                此處可以(yǐ)對數據進行處理,若不(bù)需特殊處理的(de) 則直接合并到(dào)item字典中,保存入數據庫
                列: item["pid] = ds['id']
            """
            item = {**item, **meta}
            where_list = ["pid"]  # 此處添加mysql保存判斷條件中查詢的(de)字段 可寫多個(gè)字段
            table_name = 'your_databases_tablename'  # 此處添加你需要(yào / yāo)保存的(de)數據表名稱 注: 若沒有新建數據表, 代碼可自動建立新的(de)數據表
            self.mysql_f_item(item, table_name=table_name, where_list=where_list)
    
    def response_html(self, response, meta={}):
        """
            html 格式解析
        """
        list_response = response.xpath('//div[@class="name"]')
        for resp in list_response:
            item = dict()
            """
                此處可以(yǐ)對數據進行xpath解析處理,保存入數據庫
                列: item["pid] = resp.xpath('./a/@href')[0]
            """
            print(item)
            item = {**item, **meta}
            where_list = ["pid"]  # 此處添加mysql保存判斷條件中查詢的(de)字段 可寫多個(gè)字段
            table_name = "your_databases_tablename" # 此處添加你需要(yào / yāo)保存的(de)數據表名稱 注: 若沒有新建數據表, 代碼可自動建立新的(de)數據表
            self.mysql_f_item(item, table_name, where_list=where_list)

4. 數據保存處理。

??數據庫選用mysql保存,在(zài)此模塊中,我加入了(le/liǎo)自動創建表和(hé / huò)自動拼接sql的(de)功能,傳入一個(gè)數據表名稱,若存在(zài)則進行下一步處理,不(bù)存在(zài)會進行數據表創建,此時(shí)dict中的(de)字段名稱就(jiù)起到(dào)了(le/liǎo)一定的(de)作用,我通過字段中所帶的(de)值,作爲(wéi / wèi)創建字段的(de)類型(此處也(yě)可自行添加);同時(shí)數據保存過程中,有時(shí)會需要(yào / yāo)做判重,通過在(zài)指定列表 where_list 中添加字段即可(默認爲(wéi / wèi)空,不(bù)判重。其他(tā)的(de)沒什麽了(le/liǎo)都是(shì)一些常規操作了(le/liǎo)。見代碼:

        sql = "insert into %s(" % table_name
        for item in lst:
            sql = sql + "`%s`," % item
        sql = sql.strip(',') + ") values ("
        if list_flag is False:
            for item in lst:
                sql = sql + "'{%s}'," % item
        else:
            for i in range(len(lst)):
                sql = sql + "'{0[%s]}'," % i
        sql = sql.strip(',') + ")"
        return sql

                sql_begin = """CREATE TABLE `%s` (  `id` int(11) NOT NULL AUTO_INCREMENT,""" % table_name
        sql_end = """ PRIMARY KEY (`id`)
                    ) ENGINE=%s AUTO_INCREMENT=0 DEFAULT CHARSET=%s;""" % (engine, charset)
        sql_temp = " `%s` varchar(256) DEFAULT NULL,"
        sql_temp_time = "`%s` datetime DEFAULT NULL,"
        sql_temp_content = "`%s` text,"
        sql_temp_sgin = "`%s` enum('0','1') DEFAULT '0',"
        sql = str()
        for item in lst:
            # 生成新的(de)數據表時(shí) 可根據item中的(de)字段名稱 來(lái)決定數據庫中字段的(de)類型
            if "time" in item:
                sql += sql_temp_time % item
            elif "content" in item:
                sql += sql_temp_content % item
            elif "sgin" in item:
                sql += sql_temp_sgin % item
            else:
                sql += sql_temp % (item)

        sql = sql_begin + sql + sql_end
        return sql

??好了(le/liǎo),這(zhè)次就(jiù)寫到(dào)這(zhè)裏吧,如果之(zhī)後對這(zhè)個(gè)模塊做大(dà)的(de)更新或調整再說(shuō)吧。 如果對以(yǐ)上(shàng)代碼有不(bù)懂之(zhī)處,可以(yǐ)發送至郵件 xiang_long_liu@163.com,大(dà)家共同探讨吧。
結尾處付全部代碼:

import requests, winreg, pymysql, re, json
from lxml import etree
from threading import Thread
import settings  # 将服務器數據庫等鏈接方式寫入windows注冊表中,然後再在(zài)該程序中讀取出(chū)來(lái)


def read_setttings_zhuce(file, winn_c_u=winreg.HKEY_CURRENT_USER):
    """
        讀取注冊表中的(de)設置
    """
    parentkey = winreg.OpenKey(winn_c_u, file)
    # 獲取該鍵的(de)所有鍵值,因爲(wéi / wèi)沒有方法可以(yǐ)獲取鍵值的(de)個(gè)數,所以(yǐ)隻能用這(zhè)種方法進行遍曆
    item = dict()
    try:
        i = 0
        while True:
            # EnumValue方法用來(lái)枚舉鍵值,EnumKey用來(lái)枚舉子(zǐ)鍵
            name, value, type = winreg.EnumValue(parentkey, i)
            # print(name, value)
            item[name] = value
            i += 1
    except Exception as e:
        pass
    return item


class ALi_Main(Thread):

    def read_setttings_zhuce(self, file, winn_c_u=winreg.HKEY_CURRENT_USER):
        """
            讀取注冊表中的(de)設置
        """
        parentkey = winreg.OpenKey(winn_c_u, file)
        # 獲取該鍵的(de)所有鍵值,因爲(wéi / wèi)沒有方法可以(yǐ)獲取鍵值的(de)個(gè)數,所以(yǐ)隻能用這(zhè)種方法進行遍曆
        item = dict()
        try:
            i = 0
            while True:
                # EnumValue方法用來(lái)枚舉鍵值,EnumKey用來(lái)枚舉子(zǐ)鍵
                name, value, type = winreg.EnumValue(parentkey, i)
                item[name] = value
                i += 1
        except Exception as e:
            pass
        return item

    def __init__(self, start_p):
        # 注意,super().__init__() 一定要(yào / yāo)寫
        # 而(ér)且要(yào / yāo)寫在(zài)最前面,否則會報錯。
        super().__init__()
        self.item_fwq = self.read_setttings_zhuce("Software\lxl\lxl_program")
        # 鏈接數據庫
        self.conn = pymysql.connect(
            user=self.item_fwq["user"], password=self.item_fwq["password"], host=self.item_fwq["host"], port=int(self.item_fwq["port"]),
            database=self.item_fwq["database"], use_unicode=True,
            charset="utf8")
        self.start_p = start_p
        print("數據庫開啓中......")
        # 獲取遊标
        self.cursor = self.conn.cursor()

    def main(self, url="https://www.baidu.com/", formdata={}, meta={}):
        """
            開關
        """
        response = self.url_f_requests(url, formdata)
        if response != "無結果":
            # 對返回的(de)結果解碼
            response = self.response_decode(response)
            print(response)
            response, fangshi = self.t_f_response_json_html(response)
            if fangshi is "json":
                self.response_json(response, meta)
            elif fangshi is "html":
                self.response_html(response, meta)
            else:
                print(fangshi)
                print("返回的(de)頁面數據有誤請檢查")
        else:
            print("數據無結果,未獲取到(dào)")
            
    def url_f_requests(self, url, formdata):
        """
            get / post 請求發送
        """
        if formdata == {}:
            response = self.requests_url(url)
            print("{INFO}:url以(yǐ) get 方式請求")
            # print(response)
        else:
            response = self.requests_url_post(url, formdata)
            print("{INFO}:url以(yǐ) post 方式請求")
            # print(response)
        return response
            
    def t_f_response_json_html(self, response):
        """
            判斷返回的(de)結果
        """
        try:
            response = json.loads(response)
            print("{INFO}:數據以(yǐ)json格式返回")
            return response, "json"
        except Exception as f:
            try:
                response = etree.HTML(response)
                print("{INFO}:數據以(yǐ)html格式返回")
                return response, "html"
            except Exception as f:
                response = response
                return response, "None"
        
    def response_decode(self, response):
        """
            對返回的(de)結果解碼
        """
        try:
            response = response.decode()
            print("{INFO}:數據以(yǐ)utf-8解碼")
        except Exception as f:
            try:
                response = response.decode("GBK")
                print("{INFO}:數據以(yǐ) GBK 解碼")
            except Exception as f:
                print("{INFO}:數據以(yǐ)未指定解碼方式返回")
                response = response
        return response
        
    def response_json(self, response, meta={}):
        """
            json 格式解析
        """
        list_data = response['result']['data']
        for ds in list_data:
            item = dict()
            """
                此處可以(yǐ)對數據進行處理,若不(bù)需特殊處理的(de) 則直接合并到(dào)item字典中,保存入數據庫
                列: item["pid] = ds['id']
            """
            item = {**item, **meta}
            where_list = ["pid"]  # 此處添加mysql保存判斷條件中查詢的(de)字段 可寫多個(gè)字段
            table_name = 'your_databases_tablename'  # 此處添加你需要(yào / yāo)保存的(de)數據表名稱 注: 若沒有新建數據表, 代碼可自動建立新的(de)數據表
            self.mysql_f_item(item, table_name=table_name, where_list=where_list)
    
    def response_html(self, response, meta={}):
        """
            html 格式解析
        """
        list_response = response.xpath('//div[@class="name"]')
        for resp in list_response:
            item = dict()
            """
                此處可以(yǐ)對數據進行xpath解析處理,保存入數據庫
                列: item["pid] = resp.xpath('./a/@href')[0]
            """
            print(item)
            item = {**item, **meta}
            where_list = ["pid"]  # 此處添加mysql保存判斷條件中查詢的(de)字段 可寫多個(gè)字段
            table_name = "your_databases_tablename" # 此處添加你需要(yào / yāo)保存的(de)數據表名稱 注: 若沒有新建數據表, 代碼可自動建立新的(de)數據表
            self.mysql_f_item(item, table_name, where_list=where_list)
        
    def mysql_f_item(self, item, table_name="new_table_name", where_list=[]):
        """
            保存創建mysql數據庫
        """
        lst = item.keys()
        # print(lst)
        insert_sql = self.create_insert_sql_for_list(table_name=table_name, lst=lst)
        insert_sql = insert_sql.format(**item)
        # print(insert_sql)
        select_sql = self.create_select_sql(table_name=table_name, where_list=where_list)
        select_sql = select_sql.format(**item)
        # print(select_sql)
        self.insert_mysql(insert_sql=insert_sql, select_sql=select_sql, table_name=table_name, lst=lst)
        print("--------------------------------")

    def create_insert_sql_for_list(self, table_name, lst, list_flag=False):
        """
        動态生成sql文(單條)
        :param table_name:表名
        :param lst:插入的(de)數據列表
        :param list_flag: true:代表lst字段是(shì) list嵌套list,   false:代表list嵌套dict
        :return:返回單條插入的(de)sql
        """
        sql = "insert into %s(" % table_name
        for item in lst:
            sql = sql + "`%s`," % item
        sql = sql.strip(',') + ") values ("
        if list_flag is False:
            for item in lst:
                sql = sql + "'{%s}'," % item
        else:
            for i in range(len(lst)):
                sql = sql + "'{0[%s]}'," % i
        sql = sql.strip(',') + ")"
        return sql

    def create_select_sql(self, table_name, where_list):
        """
            動态生成sql文
        """
        if where_list == []:
            return ""
        else:
            sql = 'select id from %s where' % table_name
            for i in range(len(where_list)):
                sql = sql + " `%s` = '{%s}' and " % (where_list[i], where_list[i])
            sql = sql.strip('and ')
            # print(sql)
            return sql

    def create_table(self, table_name, lst, engine='MyISAM', charset='utf8'):
        """
        生成建表sql
        :param table_name:表名
        :param lst:字段列表
        :param engine:數據庫類型
        :param charset:字符集
        :return:sql
        """
        sql_begin = """CREATE TABLE `%s` (  `id` int(11) NOT NULL AUTO_INCREMENT,""" % table_name
        sql_end = """ PRIMARY KEY (`id`)
                    ) ENGINE=%s AUTO_INCREMENT=0 DEFAULT CHARSET=%s;""" % (engine, charset)
        sql_temp = " `%s` varchar(256) DEFAULT NULL,"
        sql_temp_time = "`%s` datetime DEFAULT NULL,"
        sql_temp_content = "`%s` text,"
        sql_temp_sgin = "`%s` enum('0','1') DEFAULT '0',"
        sql = str()
        for item in lst:
            # 生成新的(de)數據表時(shí) 可根據item中的(de)字段名稱 來(lái)決定數據庫中字段的(de)類型
            if "time" in item:
                sql += sql_temp_time % item
            elif "content" in item:
                sql += sql_temp_content % item
            elif "sgin" in item:
                sql += sql_temp_sgin % item
            else:
                sql += sql_temp % (item)

        sql = sql_begin + sql + sql_end
        return sql

    def insert_mysql(self, insert_sql, select_sql='', update_sql='', table_name='', lst=()):
        """
            保存數據
        """
        while True:
            # 獲取遊标
            self.conn.ping(reconnect=True)
            if select_sql:
                try:
                    self.cursor.execute(select_sql)
                    if self.cursor.fetchone() is None:
                        print(insert_sql)
                        try:
                            self.cursor.execute(insert_sql)
                            self.conn.commit()
                            print("數據保存中......")
                            if update_sql:
                                self.cursor.execute(update_sql)
                                self.conn.commit()
                                print("數據更新中......")
                            break
                        except Exception as f:
                            # print(insert_sql)
                            print(f)
                            print("數據保存失敗")
                            break
                    else:
                        print("數據已存在(zài)")
                    break
                except Exception as f:
                    print(f)
                    # 首次執行 創建一個(gè)新的(de)數據表
                    if "Table" in str(f) and "doesn't exist" in str(f):
                        print("*" * 100)
                        print("創建數據庫中......")
                        sql = self.create_table(table_name=table_name, lst=lst)
                        self.cursor.execute(sql)
                        self.conn.commit()
                    else:
                        break
            else:
                try:
                    print(insert_sql)
                    print("數據保存中......")
                    self.cursor.execute(insert_sql)
                    self.conn.commit()
                    break
                except Exception as f:
                    print(f)
                    # 首次執行 創建一個(gè)新的(de)數據表
                    if "Table" in str(f) and "doesn't exist" in str(f):
                        print("*" * 100)
                        print("創建數據庫中......")
                        sql = self.create_table(table_name=table_name, lst=lst)
                        self.cursor.execute(sql)
                        self.conn.commit()
                    else:
                        print("保存失敗")
                        break

    def getDropStr(self, l_strHtml):
        """清洗字符串"""
        strList = re.findall(
            r'[一-龥a-zA-Z0-9,.;?!_\]\'\"\[{}+-\u2014\u2026\uff1b\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]',
            l_strHtml)
        return "".join(strList)

    def requests_url(self, url, data=None):
        """
            發送請求,返回相應
        """
        item_fwq_ip = read_setttings_zhuce("Software\lxl\lxl_program")
        # 讀取實時(shí)寫入windows注冊表中的(de)ip代理  本人(rén)喜歡使用無憂代理 不(bù)是(shì)打廣告,而(ér)是(shì)品質确實好
        proxies = {"http": "%s" % item_fwq_ip["ip"], "https": "%s" % item_fwq_ip["ip"]}
        headers = {
             "user-agent": item_fwq_ip['user_agent']
        }
        try:
            response = requests.get(url=url, headers=headers, timeout=20).content
            if response:
                return response
        except Exception as f:
            print("重新請求")
            i = 0
            while True:
                i += 1
                if i >= 5:
                    return "無結果"
                try:
                    response = requests.get(url=url, headers=headers, proxies=proxies, timeout=20).content
                    if response:
                        return response
                except Exception as f:
                    print("重新請求")
        
    def requests_url_post(self, url, data):
        """
            發送請求,返回相應
        """
        item_fwq_ip = read_setttings_zhuce("Software\lxl\lxl_program")
        # 讀取實時(shí)寫入windows注冊表中的(de)ip代理  本人(rén)喜歡使用無憂代理 不(bù)是(shì)打廣告,而(ér)是(shì)品質确實好
        proxies = {"http": "%s" % item_fwq_ip["ip"], "https": "%s" % item_fwq_ip["ip"]}
        headers = {
            "user-agent": item_fwq_ip['user_agent']
        }
        try:
            response = requests.post(url=url, headers=headers, data=data,timeout=20).content
            if response:
                return response
        except Exception as f:
            print("重新請求")
            i = 0
            while True:
                i += 1
                if i >= 5:
                    return "無結果"
                try:

                    response = requests.post(url=url, headers=headers, data=data, proxies=proxies, timeout=20).content
                    if response:
                        return response
                except Exception as f:
                    print("重新請求")
    
    def __del__(self):
        self.cursor.close()
        self.conn.close()
        print("數據庫關閉中......")


def main_thread(number_p):
    """
        多線程啓動
        若使用多線程爬取是(shì) 将 main 函數改爲(wéi / wèi) run 函數 傳遞參數控制url使用個(gè)數從而(ér)決定多線程條數
    """
    print("多線程啓動程序")
    list_thread = list()
    for p in range(0, number_p+1000, 1000):
        thread = ALi_Main(p)
        list_thread.append(thread)
    
    for threads in list_thread:
        threads.start()
    
    for threads in list_thread:
        threads.join()


if __name__ == '__main__':
    # 初始化
    # settings.main()
    
    alm = ALi_Main(0)

    meta = dict()
    meta["key_name"] = "傳值"
    url = "https://search.sina.com.cn/?range=title&q=" + str(meta["key_name"]) + "&c=news&time=&ie=utf-8&col=&source=&from=&country=&size=&a=&page=1&pf=0&ps=0&dpc=1"
    print(url)
    alm.main(url=url, meta=meta)

相關案例查看更多