十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护 + 专业推广 + 无忧售后,网站问题一站解决
Python 中怎么实现分布式抓取网页?很多新手对此不是很清楚。为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的读者可以来学习一下,希望你能有所收获。
成都创新互联公司专注为客户提供全方位的互联网综合服务,包括但不限于成都做网站、网站制作、安吉网络推广、重庆小程序开发、安吉网络营销、安吉企业策划、安吉品牌公关、搜索引擎 SEO、人物专访、企业宣传片、企业代运营等,从售前、售中到售后,我们都将竭诚为您服务,您的肯定,是我们最大的嘉奖;成都创新互联公司为所有大学生创业者提供安吉建站搭建服务,24小时服务热线:13518219792,官方网址:www.cdcxhl.com
[python]
view plaincopy
'''''
Created on 2010-9-15
@author: chenggong
'''
import urllib2
import re
import socket
# Output switch read by Tools.writelog: when 0, log messages are transcoded
# to GBK before printing; any other value prints them unchanged.
DEBUG = 0
'''''
工具类
'''
class Tools():
    """Stateless helpers used by the crawler: logging, decoding, regex capture."""

    @staticmethod
    def writelog(level, info, notify=False):
        """Print ``info`` to stdout prefixed with ``[level]``.

        When the module-level ``DEBUG`` flag is 0 the message is transcoded
        to GBK: it is first treated as a UTF-8 byte string, and if that
        fails it is encoded to GBK directly. With any other ``DEBUG`` value
        the message is printed unchanged.

        ``notify`` is accepted for interface compatibility and is not used.
        """
        if DEBUG == 0:
            try:
                text = info.decode('UTF-8').encode('GBK')
            except UnicodeError:
                # Not a UTF-8 byte string (e.g. already unicode with
                # non-ASCII characters): encode it as-is.
                text = info.encode('GBK')
        else:
            text = info
        print("[" + level + "]" + text)

    @staticmethod
    def toUnicode(s, charset):
        """Decode ``s`` with ``charset`` (Python 2 ``unicode`` builtin).

        Returns ``s`` unchanged when ``charset`` is empty, and an empty
        string when the decode fails.
        """
        if charset == "":
            return s
        try:
            return unicode(s, charset)
        except (UnicodeError, LookupError, TypeError):
            # Undecodable bytes, unknown codec name, or ``s`` is not a byte
            # string: drop the text rather than abort the caller.
            return ""

    @staticmethod
    def getFromPatten(patten, src, single=False):
        """Return the text matched by ``patten`` in ``src``.

        The pattern is compiled with ``re.S`` so ``.`` also matches
        newlines. All matches (as returned by ``findall``) are joined with
        a single space and the result is stripped; with ``single=True``
        only the first match is used. Returns "" when nothing matches.
        """
        matches = re.compile(patten, re.S).findall(src)
        if single:
            matches = matches[:1]
        return " ".join(matches).strip()
'''''
网页内容爬虫
'''
class PageGripper():
    """Downloads a page and decodes it with the charset the page declares."""

    URL_OPEN_TIMEOUT = 10  # timeout handed to socket.setdefaulttimeout
    MAX_RETRY = 3  # retries allowed after a URLError before giving up

    def __init__(self):
        # NOTE: this sets the default timeout for every new socket in the
        # process, not only the ones opened by this class.
        socket.setdefaulttimeout(self.URL_OPEN_TIMEOUT)

    def getCharset(self, s):
        """Return the first ``charset=`` value found in ``s``, or "".

        The spelling "utf8" is normalised to "utf-8".
        """
        rst = Tools.getFromPatten(u'charset=(.*?)"', s, True)
        if rst == "utf8":
            rst = "utf-8"
        return rst

    def downloadUrl(self, url):
        """Open ``url`` and return the page text.

        A URLError is retried up to MAX_RETRY times and then re-raised; an
        HTTPError is logged and re-raised immediately. The page is read
        line by line; once a line declares a charset, that charset is used
        to decode the lines read so far and all following lines. If no
        charset is ever found the raw text is returned undecoded.
        """
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(url)
                break
            except urllib2.HTTPError as e:
                # e.code is an int, so it must be formatted before joining.
                Tools.writelog('error', 'HTTP状态错误 code=' + "%d" % e.code)
                raise
            except urllib2.URLError:
                Tools.writelog('warn', '页面访问超时,重试..')
                retry += 1
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', '超过最大重试次数,放弃')
                    raise

        charset = ""
        raw_head = []  # lines read before any charset declaration was seen
        decoded = []
        try:
            while True:
                line = fp.readline()
                if not line:
                    break
                if charset == "":
                    charset = self.getCharset(line)
                    if charset == "":
                        raw_head.append(line)
                        continue
                    # Charset just became known: decode the buffered lines
                    # too, so raw bytes and unicode are never mixed.
                    decoded.extend(Tools.toUnicode(head_line, charset)
                                   for head_line in raw_head)
                    raw_head = []
                decoded.append(Tools.toUnicode(line, charset))
        finally:
            fp.close()
        return "".join(raw_head + decoded)

    def getPageInfo(self, url):
        """Log the start and success of a download and return the page text.

        Exceptions raised by downloadUrl propagate to the caller.
        """
        Tools.writelog("info", "开始抓取网页,url= " + url)
        info = self.downloadUrl(url)
        Tools.writelog("debug", "网页抓取成功")
        return info
'''''
内容提取类
'''
class InfoGripper():
pageGripper = PageGripper()
def __init__(self):
    """Log that the crawler has started; no instance state is created here."""
    Tools.writelog("debug", "爬虫启动")
#抓取标题
# Extract the title from the page text in `data`; a second pattern is tried
# when the first one yields nothing, and the result is stripped.
# NOTE(review): both regex literals below are cut off mid-string (the markup
# inside the patterns appears to have been stripped when this listing was
# published), so this method does not parse as written. The original
# patterns must be recovered before it can be used.
def griptitle(self,data):
title = Tools.getFromPatten(u'box2t sp">
if title == "":
title = Tools.getFromPatten(u'
return title.strip()
#抓取频道
# Extract the channel from the page text in `data`.
# NOTE(review): the second regex literal is cut off mid-string, so this
# method does not parse as written. The first pattern ends in a lazy group,
# which can only capture an empty string; it has probably lost its trailing
# markup as well -- confirm against the original source.
def gripchannel(self,data):
zone = Tools.getFromPatten(u'频道:(.*?)',data,True)
channel = Tools.getFromPatten(u'
return channel
#抓取标签
def griptag(self, data):
    """Return the tags found in ``data`` as one space-separated string.

    The region following the tag label is isolated first, then every
    capture inside that region is collected.
    """
    tag_zone = Tools.getFromPatten(u'标签:(.*?)[^a].*>', data, True)
    # NOTE(review): the inner pattern ends in a lazy group, which can only
    # capture empty strings; it may have lost trailing markup -- confirm.
    return Tools.getFromPatten(u'>(.*?)', tag_zone, False)
#抓取观看次数
def gripviews(self, data):
    """Return every view-count capture in ``data``, joined by spaces."""
    return Tools.getFromPatten(u'已经有(.*?)次观看', data)
#抓取发布时间
def griptime(self, data):
    """Return the first publish-time capture in ``data``, or "" if absent."""
    return Tools.getFromPatten(u'在(.*?)发布', data, True)
#抓取发布者
def gripuser(self, data):
    """Return the first publisher name captured in ``data``, or "" if absent."""
    return Tools.getFromPatten(u'title="点击进入(.*?)的用户空间"', data, True)
#获取页面字符集
def getPageCharset(self, data):
    """Return the first ``charset=`` value in ``data``, or "" if absent.

    The spelling "utf8" is normalised to "utf-8".
    """
    found = Tools.getFromPatten(u'charset=(.*?)"', data, True)
    return "utf-8" if found == "utf8" else found
#获取CC相关数据
def getCCData(self, data):
    """Return ``(siteid, vid)`` of the bokecc player referenced in ``data``.

    Returns ``("", "")`` when the player zone does not mention bokecc.com.
    """
    # NOTE(review): this pattern ends in a lazy group, so the capture is
    # always empty as written; trailing markup was probably lost from the
    # pattern -- confirm against the original source.
    zone = Tools.getFromPatten(u'SWFObject(.*?)', data, True)
    # Search the whole zone with the dots escaped: the previous
    # re.match('.*bokecc.com.*') only looked at the first line and let
    # '.' stand for any character.
    if not re.search(r'bokecc\.com', zone):
        return "", ""
    ccSiteId = Tools.getFromPatten(u'siteid=(.*?)[&,"]', zone, True)
    ccVid = Tools.getFromPatten(u'vid=(.*?)[&,"]', zone, True)
    return ccSiteId, ccVid
#获取站内vid
def gripVideoId(self, data):
    """Return the value assigned to the page's ``var vid`` script variable, or ""."""
    return Tools.getFromPatten(u'var vid = "(.*?)"', data, True)
#获取点击量
def gripViewsAjax(self, vid, url, basedir):
    """Fetch the view count for ``vid`` from the site's statistics URL.

    The URL is built from the host of ``url``, ``basedir`` and ``vid``.
    A URLError is retried up to the page gripper's MAX_RETRY times.

    Returns the ``viewcount`` value from the response with double quotes
    removed, or "" when the request fails or the value is absent.
    """
    host = Tools.getFromPatten(u'http://(.*?)/', url, True)
    ajaxAddr = "http://" + host + basedir + "/index.php/ajax/video_statistic/" + vid
    Tools.writelog('debug', u"开始获取点击量,url=" + ajaxAddr)

    # The retry counter must start at 0, and the limit lives on PageGripper;
    # this class defines no MAX_RETRY of its own.
    retry = 0
    max_retry = self.pageGripper.MAX_RETRY
    while True:
        try:
            fp = urllib2.urlopen(ajaxAddr)
            break
        except urllib2.HTTPError as e:
            Tools.writelog('error', 'HTTP状态错误 code=' + "%d" % e.code)
            return ""
        except urllib2.URLError:
            Tools.writelog('warn', '页面访问超时,重试..')
            retry += 1
            if retry > max_retry:
                Tools.writelog('warn', '超过最大重试次数,放弃')
                return ""

    try:
        content = fp.read()
    finally:
        fp.close()
    views = Tools.getFromPatten(u'"viewcount":(.*?),', content, True)
    return views.replace('"', '')
#从网页内容中爬取点击量
def gripViewsFromData(self, data):
    """Return the first view count embedded in the page text, or "" if absent."""
    return Tools.getFromPatten(u'已经有<.*?>(.*?)<.*?>次观看', data, True)
def gripBaseDir(self, data):
    """Return the value assigned to ``base_dir`` in the page text, or ""."""
    return Tools.getFromPatten(u"base_dir = '(.*?)'", data, True)
#抓取数据
def gripinfo(self, url):
    """Download ``url`` and return a dict of the fields scraped from it.

    Keys: title, channel, tag, release, user, ccsiteId, ccVid, views.
    ``views`` is taken from the page text when present, otherwise from the
    statistics request; it is "error" when neither yields a value.
    A download failure is logged and re-raised.
    """
    try:
        data = self.pageGripper.getPageInfo(url)
    except:
        Tools.writelog("error", url + " 抓取失败")
        raise

    Tools.writelog('info', '开始内容匹配')
    rst = {
        'title': self.griptitle(data),
        'channel': self.gripchannel(data),
        'tag': self.griptag(data),
        'release': self.griptime(data),
        'user': self.gripuser(data),
    }
    cc_site_id, cc_vid = self.getCCData(data)
    rst['ccsiteId'] = cc_site_id
    rst['ccVid'] = cc_vid

    views = self.gripViewsFromData(data)
    if not views:
        # Nothing in the page text: fall back to the statistics request.
        video_id = self.gripVideoId(data)
        base_dir = self.gripBaseDir(data)
        views = self.gripViewsAjax(video_id, url, base_dir)
        if views == "":
            views = "error"
        if views == "error":
            Tools.writelog("error", "获取观看次数失败")
    Tools.writelog("debug", "点击量:" + views)
    rst['views'] = views

    summary = 'title=%s,channel=%s,tag=%s' % (rst['title'], rst['channel'], rst['tag'])
    Tools.writelog('debug', summary)
    return rst
'''''
单元测试
'''
if __name__ == '__main__':
    # Smoke test: crawl a fixed set of sample pages one after another.
    # An exception raised for any page propagates and ends the run.
    urls = [
        'http://008yx.com/xbsp/index.php/video/index/3138',
        'http://vblog.xwhb.com/index.php/video/index/4067',
        'http://demo.ccvms.bokecc.com/index.php/video/index/3968',
        'http://vlog.cnhubei.com/wuhan/20100912_56145.html',
        'http://vlog.cnhubei.com/html/js/30271.html',
        'http://www.ddvtv.com/index.php/video/index/15',
        'http://boke.2500sz.com/index.php/video/index/60605',
        'http://video.zgkqw.com/index.php/video/index/334',
        'http://yule.hitmv.com/html/joke/27041.html',
        'http://www.ddvtv.com/index.php/video/index/11',
        'http://www.zgnyyy.com/index.php/video/index/700',
        'http://www.kdianshi.com/index.php/video/index/5330',
        'http://www.aoyatv.com/index.php/video/index/127',
        'http://v.ourracing.com/html/channel2/64.html',
        'http://v.zheye.net/index.php/video/index/93',
        'http://vblog.thmz.com/index.php/video/index/7616',
        'http://kdianshi.com/index.php/video/index/5330',
        'http://tv.seeyoueveryday.com/index.php/video/index/95146',
        'http://sp.zgyangzhi.com/html/ji/2.html',
        'http://www.xjapan.cc/index.php/video/index/146',
        'http://www.jojy.cn/vod/index.php/video/index/399',
        'http://v.cyzone.cn/index.php/video/index/99',
    ]
    infoGripper = InfoGripper()
    for url in urls:
        infoGripper.gripinfo(url)
WEB服务及任务调度
[python]
view plaincopy
'''''
Created on 2010-9-15
@author: chenggong
'''
# -*- coding: utf-8 -*-
import string,cgi,time
from os import curdir,sep
from BaseHTTPServer import BaseHTTPRequestHandler,HTTPServer
from InfoGripper import *
import re
import MySQLdb
import time
import threading
import urllib
import urllib2
# Port number; presumably the one the HTTP server in this file listens on
# (the server code is not visible here) -- TODO confirm.
PORT = 8079
VERSION = 0.1
# Character set applied to the MySQL connection in DBAdapter.connect.
DBCHARSET = "utf8"

# Parameter names; presumably the request parameters this service accepts
# -- verify against the request handler.
PARAMS = [
    'callback',
    'sessionId',
    'retry',
    'retryInterval',
    'dbhost',
    'dbport',
    'db',
    'dbuser',
    'dbpass',
    'videoId',
]

# Field names; presumably the columns of the `videos` table -- TODO confirm.
DBMAP = [
    'video_id',
    'ccsiteid',
    'ccvid',
    'desc_url',
    'site_id',
    'title',
    'post_time',
    'author',
    'elapse',
    'channel',
    'tags',
    'create_time',
    'check_time',
    'status',
]

# Error code definitions.
ERR_OK = 0
ERR_PARAM = 1
ERR_HTTP_TIMEOUT = 5
ERR_HTTP_STATUS = 6
ERR_DB_CONNECT_FAIL = 8
ERR_DB_SQL_FAIL = 9
ERR_GRIPVIEW = 11
ERR_UNKNOW = 12
'''''
数据库适配器
'''
class DBAdapter(object):
def __init__(self):
self.param = {'ip':'',
'port':0,
'user':'',
'pw':'',
'db':''}
self.connect_once = False #是否连接过数据库
'''''
创建/更新数据库连接池
'''
def connect(self, ip, port, user, pw, db):
    """Open a MySQL connection, reusing the current one if nothing changed.

    When every argument equals the stored parameters this is a no-op.
    Otherwise any previous cursor and connection are closed and a new
    connection and cursor are created with DBCHARSET applied. A failure
    is logged and re-raised.
    """
    requested = {'ip': ip, 'port': port, 'user': user, 'pw': pw, 'db': db}
    if requested == self.param:
        return

    # The password is masked: credentials must not end up in the log.
    Tools.writelog('info', '更换数据库连接池,ip=' + ip + ',port=' + str(port)
                   + ',user=' + user + ',pw=******,db=' + db)
    try:
        if self.connect_once:
            # Forget the old parameters before closing, so a failed
            # reconnect cannot leave this adapter treating the closed
            # connection as current on a later call.
            self.connect_once = False
            self.param.update(ip='', port=0, user='', pw='', db='')
            self.cur.close()
            self.conn.close()
        conn = MySQLdb.connect(user=user, passwd=pw, db=db, host=ip, port=int(port))
        conn.set_character_set(DBCHARSET)
        cur = conn.cursor(MySQLdb.cursors.Cursor)
        # Publish the new state only once every step has succeeded.
        self.conn = conn
        self.cur = cur
        self.connect_once = True
        self.param.update(requested)
    except:
        Tools.writelog('error', u'数据库连接失败', True)
        raise
    else:
        Tools.writelog('info', u'数据库连接成功')
'''''
执行SQL语句
'''
def execute(self, sql):
    """Log ``sql`` and run it on the adapter's cursor.

    Any failure is logged together with the statement and re-raised.
    """
    Tools.writelog('debug', u'执行SQL: ' + sql)
    cursor = self.cur
    try:
        cursor.execute(sql)
    except:
        Tools.writelog('error', u'SQL执行错误:' + sql)
        raise
'''''
查询数据库
'''
def query(self, sql):
    """Run ``sql`` through execute() and return every row from the cursor."""
    self.execute(sql)
    return self.cur.fetchall()
'''''
视频错误
'''
def updateErr(self,videoId):
nowtime = time.strftime('%Y-%m-%d-%H-%M-%S',time.localtime(time.time()))
sql = "UPDATE videos SET "
sql += "check_time='" + nowtime +"',"
sql += "status=-1 "
sql += "WHERE video_id="+videoId
self.execute(sql)
self.conn.commit()
'''''
更新查询结果
'''
def update(self,obj,videoId,isUpdateTitle=True):
Tools.writelog('debug','开始更新数据库')
try:
#更新video表
sql = "UPDATE videos SET "
if(obj['ccsiteId'] !="" ):
sql += "ccsiteid='" + obj['ccsiteId'] + "',"
if(obj['ccVid'] != "" ):
sql += "ccvid='" + obj['ccVid'] + "',"
if isUpdateTitle:
sql += "title='" + obj['title'] + "',"
sql += "post_time='" + obj['release'] + "',"
sql += "author='" + obj['user'] + "',"
sql += "channel='" + obj['channel'] + "',"
sql += "tags='" + obj['tag'] + "',"
nowtime = time.strftime('%Y-%m-%d-%H-%M-%S',time.localtime(time.time()))
sql += "check_time='" + nowtime +"',"
sql += "status=0 "
sql += "WHERE video_id="+videoId
self.execute(sql)
#更新count表
if( obj['views'] != 'error' ):
nowdate = time.strftime('%Y-%m-%d',time.localtime(time.time()))
sql = "SELECT * FROM counts WHERE "
sql += "date = '" + nowdate + "' and video_id=" + videoId
rst = self.query(sql)
if len(rst) > 0:#如果当天已有记录,则更新
sql = "UPDATE counts SET count="+obj['views']
sql +=" WHERE video_id=" + videoId + " AND date='" + nowdate+ "'"
else:#否则插入
sql = "INSERT INTO counts VALUES"
sql += "(null," +videoId+",'"+nowdate+"',"+obj['views'] + ")"
self.execute(sql)