Curl多线程

这个库是目前所有curl相关库中最强的,没有之一。

官网:
http://curlmulti.com
https://github.com/ares333/php-curlmulti

QQ群:215348766
1458616858207

工作中经常需要采集一些内容,所以深入研究了一下PHP CURL多线程,不得不说这个扩展很变态。经过几十次的修改终于写出一个完美的CURL类,有多厉害用“终极”二字形容也不为过。PHP文档中资料甚少,到网上也看到一些类,感觉实在太烂,有些类资源占用很高,有些稍微好点的效率真不敢恭维,说白了就是个原始的多线程而已,根本无法充分利用CPU和带宽。

本类的特点:
运行绝对稳定。
设置一个并发就会始终以这个并发数进行工作,即使通过回调函数添加任务也不影响。
CPU占用极低,绝大部分CPU消耗在用户的回调函数上。
内存利用率高,任务数量较多(15W个任务占用内存会超过256M)可以使用回调函数添加任务,个数自定。
能够最大限度的占用带宽。
链式任务,比如一个任务需要从多个不同的地址采集数据,可以通过回调一气呵成。
能够对CURL错误进行多次尝试,次数自定(大并发一开始容易产生CURL错误,网络状况或对方服务器稳定性也有可能产生CURL错误)。
回调函数相当灵活,可以多种类型任务同时进行(比如下载文件,抓取网页,分析404可以在一个PHP进程中同时进行)。
可以非常容易的定制任务类型,比如检查404,获取redirect的最后url等。
可以设置缓存,挑战产品节操。

不足:
不能充分利用多核CPU(可以开多个进程解决,需要自己处理任务分割等逻辑),可以用pthreads解决!
最大并发500(或512?),经过测试是CURL 内部限制,超过最大并发会导致总是返回失败。
目前没有断点续传功能。
目前任务是原子性的,不能对一个大文件分为几部分分别开线程下载。

我这里有34W张图片需要下载,先下载13W截图如下


每秒钟下载300张左右图片,每张图片300K左右,下载速度约100MB/s(1G的网卡用的差不多了),速度和下载文件总数有非常小的偏差,这个数值是根据CURL的传输字节数计算而来。速度超过100MB/s的时候SSH基本连接不上了。。。
iostat

主要瓶颈在IO上,否则速度会更快。

采集http://www.lyricsmode.com/的歌词速度也很快,一共60多万歌词页,平均每秒钟能稳定采集1200页(边采集边分析边入库,否则3000+页每秒都不稀奇),处理采集的HTML的速度根本追不上采集的速度,瓶颈应该是对方服务器速度,那个没有截图,有兴趣的可以试一下。

================================================================
完整文档如下
================================================================

尽可能展示类的特性。此demo环境是本机,所以性能不能达到最高,只做为使用说明,为了高性能请到服务器调试。选择http://www.1ting.com/作为目标站点,这个站比较典型,而且国内速度也很快。

本文用到两个类(这两个类在demo包中已经包含):
CUrl:多线程类。
PHPQuery:HTML分析,项目地址 http://code.google.com/p/phpquery/。如果google被封这里有较新版本 phpQuery-0.9.5.386.zip。这里用的压缩包中的 phpQuery/phpQuery.php这个单文件类,如果你不会用这个类没关系,用正则分析也可以。

必要的文件包含和初始化实例代码中都省略了。
代码非常完整,可以直接运行,demo请在命令行模式运行(你懂的)。
代码非常简洁。。。
PHPQuery需要php-dom扩展,PHP环境没有的话需要安装。
数据库操作使用PDO,所以也需要PDO扩展。
既然是demo,那么程序上我就一切从简了。
phpQuery很NB,很多变态的用法自己去研究吧,基本可以说无所不能。如果使用phpQuery必须在回调函数中手动调用phpQuery::unloadDocuments();释放,否则phpQuery处理的文档全部都在内存中!!!
并发情况可以用360的连接查看器查看。

CUrl一些必要内容:

  1. CUrl类单线程和多线程都可以使用缓存,并且缓存机制一样。
  2. CUrl类抓取HTML单线程和多线程返回的内容结构一致
    $result=array('info'=>array(),'content'=>'');
    $result['info']是curl_info()的内容,$result['content']是抓取的html文件。
  3. curl配置分为三个级别,优先级由低到高如下,优先级高的会覆盖优先级低的配置。
    默认:如私有方法init()中所示。
    类级别:保存在 $opt这个公有属性中,此CURL对象的所有操作中都会起作用。
    任务级别:多线程任务中添加任务时指定,只在当前任务中起作用。
  4. 类中用到回调函数的地方都使用call_user_func_array(),所以必须先从PHP手册中把这个函数搞明白了。

demo需要的数据表

CREATE TABLE `album` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `artist_id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  `url` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

CREATE TABLE `artist` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `pic` varchar(255) NOT NULL,
  `url` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

CREATE TABLE `songlist` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `artist_id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  `album_url` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

压缩包中有程序运行的所有必要文件,导入curl.sql到test数据库,修改init.php中的配置就可以直接运行了!!
命令行:
php demo1.php
php demo2.php
php demo3.php
...
然后数据表中就有数据了。。。
demo中都没有phpQuery::unloadDocuments()操作,请自行加上。
windows命令行中文可能乱码导致非常奇怪的错误(尤其是demo5),windows命令行默认是GBK编码,可以php中用iconv转码,或修改命令行默认编码(没测试),所有代码在linux下测试完全正确。

完整Demo从此正式开始...

1.单线程用法

CUrl类有两个单线程的方法read(),download()。

$url='http://www.1ting.com/group/group0_2.html';
$result=$curl->read($url);
$html=phpQuery::newDocumentHTML($result['content']);
$li=$html['ul.allSinger li a'];
$st=$db->prepare("insert into artist(name,url) values(?,?)");
foreach($li as $v){
    $v=pq($v);
    $st->execute(array(trim($v->text()),trim($v->attr('href'))));
}

用单线程方法抓取女歌手列表页,用PHPQuery进行分析,取出歌手名子和详情页地址并存入artist表,本次示例抓取了2417个歌手。

2.简单多线程抓取

先介绍一些必要内容,下面展示多线程其他特性的时候就不在描述这些必要内容。

add($url,$p,$f)方法:
添加一个任务到队列

  1. $url是一个数组$url[0]保存目标url,如果$url[1]未设置表示是一个抓取html的任务,如果$url[1]设置了表示是一个下载任务,$url[1]必须是下载文件的本地绝对路径,$url[2]是任务级curl设置。
  2. $p是任务完成后的回调函数 ,$p[0]是需要调用的回调函数,$p[1]是需要传递的参数。抓取和下载任务的回调函数第一个参数总是一个数组,保存有抓取的结果,从第二个参数开始才是$p[1]中设置的参数,抓取任务和下载任务第一个参数的唯一区别就是抓取任务多一个content内容。
  3. $f是错误处理的回调函数,$f[0]是回调函数,$f[1]是需要传递的参数。某个任务由于curl错误并且尝试了n次(公有属性$maxTry设置最大尝试次数,默认3)失败就会调用这个函数,回调函数的第一个参数是错误信息,从第二个参数开始才是$f[1]的内容。 如果没有设置错误回调,错误信息会输出到标准输出。

go()方法:
前序操作都做完了开始正式跑。

status($debug=false)方法:
这个函数应该在回调函数中调用用来显示当前时刻的curl状况,如果$debug为true会显示详细的运行信息。注意:这个函数本身会输出信息,所以前面不要加echo,函数本身已经进行了格式化,所以函数后边不要输出"n" 。

26.69%:当前任务的进度,如果是链式任务这个数字可能会降低,因为链式任务会动态加入大量任务。
645/2417(0):以数量方式显示进度,括号中是缓存命中的数量。
30/s:每秒钟完成的任务数,主要由目标服务器性能,网络性能,本机性能,并发数决定。
550.37KB/s:抓取速度。
11.29MB:已经抓取的数据量。
ETA 58s:在当前速度下任务完成剩余时间。

现在我要抓取那2000多个歌手的专辑列表,用add()方法一次性把这些任务添加到任务队列,go()方法启动,curl首先创建n个并发(并发数在$limit公有属性中设置,默认30),有任务完成就会调用回调函数处理,处理完毕之后丢弃html并从任务队列中拿出一个任务添加到并发进程中,当所有并发任务完成并且任务队列为空的时候curl结束运行。

$baseUrl='http://www.1ting.com';
$artistList=$db->query("select id,url from artist")->fetchAll();
foreach($artistList as $v){
    $url=array($baseUrl.$v['url']);
    $callback=array('demo2_cb1',array($v['id']));
    $curl->add($url,$callback);
}
$curl->go();

//处理歌手详情页的回调函数
function demo2_cb1($r,$id){
    global $db,$curl;
    if($r['info']['http_code']==200){
        $html=phpQuery::newDocumentHTML($r['content']);
        $list=$html['div.albumList ul li a.albumLink'];
        if(!empty($list)){
            $st=$db->prepare('insert into album(artist_id,name,url) values(?,?,?)');
            foreach($list as $v){
                $v=pq($v);
                $st->execute(array($id,trim($v->find('span.albumName')->text()),trim($v->attr('href'))));
            } 
        }
    }
    $curl->status();
}

从之前抓取结果中取所有歌手地址添加到任务队列,go()开始运行。回调函数负责处理抓取结果并入库。

3.链式抓取

假如现在我只需要歌曲列表,不需要其他信息。常规做法是从歌手列表抓取专辑列表并保存到数据库,然后从专辑列表抓取歌曲列表,现在专辑列表入库的操作可以省略了。

$baseUrl='http://www.1ting.com';
$artistList=$db->query("select id,url from artist")->fetchAll();
foreach($artistList as $v){
	$url=array($baseUrl.$v['url']);
	$callback=array('demo3_cb1',array($v['id']));
	$curl->add($url,$callback);
}
$curl->go();

//处理歌手详情页的回调函数
function demo3_cb1($r,$id){
	global $db,$curl,$baseUrl;
	if($r['info']['http_code']==200){
		$html=phpQuery::newDocumentHTML($r['content']);
		$list=$html['div.albumList ul li a.albumLink'];
		if(!empty($list)){
			foreach($list as $v){
				$v=pq($v);
				$url=array($baseUrl.trim($v->attr('href')));
				//继续传递歌手id
				$callback=array('demo3_cb2',array($id,$url[0]));
				$curl->add($url,$callback);
			} 
		}
	}
	$curl->status(0);
}

//处理专辑详情页的回调函数
function demo3_cb2($r,$id,$url){
	global $db,$curl;
	if($r['info']['http_code']==200){
		$html=phpQuery::newDocumentHTML($r['content']);
		$list=$html['#song-list tr td.songTitle a.songName'];
		if(!empty($list)){
			$st=$db->prepare('insert into songlist(artist_id,name,album_url) values(?,?,?)');
			foreach($list as $v){
				$v=pq($v);
				$st->execute(array($id,trim($v->text()),$url));
			}  
		}
	}
}

回调函数可以无限级连接,也可以根据不同情况添加不同的回调,各种神奇用法自己研究吧。

4.超大量任务处理

假设2中抓取专辑列表的任务数量非常大,不能一次性都添加进去,这时候需要通过回调函数动态添加任务。

$baseUrl='http://www.1ting.com';
$curl->task='demo4_addTask';
$curl->go();

//取还没有添加的任务
function demo4_addTask(){
	global $baseUrl,$db,$curl;
	static $lastId=0;
	$limit=100;
	$list=$db->query("select id,url from artist where id>$lastId order by id limit $limit")->fetchAll();
	foreach($list as $v){
		$url=array($baseUrl.$v['url']);
		$callback=array('demo4_cb1',array($v['id']));
		$curl->add($url,$callback);
	}
	$lastId=$v['id'];
}

//处理歌手详情页的回调函数
function demo4_cb1($r,$id){
	global $db,$curl;
	if($r['info']['http_code']==200){
		$html=phpQuery::newDocumentHTML($r['content']);
		$list=$html['div.albumList ul li a.albumLink'];
		if(!empty($list)){
			$st=$db->prepare('insert into album(artist_id,name,url) values(?,?,?)');
			foreach($list as $v){
				$v=pq($v);
				$st->execute(array($id,trim($v->find('span.albumName')->text()),trim($v->attr('href'))));
			} 
		}
	}
	$curl->status();
}

原理比较简单,如果任务队列为空curl就会尝试调用task属性指定的回调函数,具体任务怎么添加由自己控制。

任务总数500变成动态增长的。

5.多线程下载

现在我要多线程下载所有歌手的图片,图片是歌手详情页左上角那一张。

$dir=dirname(__FILE__).'/pic';
$baseUrl='http://www.1ting.com';
$artistList=$db->query("select id,name,url from artist")->fetchAll();
foreach($artistList as $v){
	$url=array($baseUrl.$v['url']);
	$callback=array('demo5_cb1',array($v['id'],$v['name']));
	$curl->add($url,$callback);
}
$curl->go();

//处理歌手详情页的回调函数
function demo5_cb1($r,$id,$name){
	global $db,$curl,$dir;
	if($r['info']['http_code']==200){
		$html=phpQuery::newDocumentHTML($r['content']);
		$list=$html['dl.singerInfo dt img'];
		if(!empty($list)){
			foreach($list as $v){
				$v=pq($v);
				$picUrl=$v->attr('src');
				$ext=pathinfo($picUrl,PATHINFO_EXTENSION);
				if(!empty($name) && !empty($ext)){
					$filename=$name.'.'.$ext;
					$file=$dir.'/'.$filename;
					$url=array($picUrl,$file);
					$callback=array('demo5_cb2',array($id,$filename));
					$curl->add($url,$callback);
				}
			}
		}
	}
	//$curl->status();
}

//图片下载完成回调函数
function demo5_cb2($r,$id,$filename){
	global $db;
	if($r['info']['http_code']==200){
		if($db->exec("update artist set pic='$filename' where id=$id")){
			echo $r['info']['url']."n";		
		}
	}	
}

这下载速度嗷嗷快啊,机器NB你也可以达到100MB/s的神速(不是100Mbit/s)。
注意:下载顺序不是按任务顺序来的,是由每个线程的速度决定的,所以数据表中查看的时候需要给pic字段排下序。

6.自定义任务

除了抓取html,下载各种类型的文件,还可以自定义任何类型的任务。
比如检查链接404,设置代理,检查链接的301次数等等。
现在我要检查歌手页的404情况。因为任务类型统一,使用类级别配置就可以。
任务级别的配置自己研究吧。

$baseUrl='http://www.1ting.com';
$artistList=$db->query("select id,url from artist")->fetchAll();
$curl->opt=array(CURLOPT_NOBODY=>true);
foreach($artistList as $v){
    $url=array($baseUrl.$v['url']);
    $callback=array('demo6_cb1',array($url[0]));
    $curl->add($url,$callback);
}
$curl->go();

//处理歌手详情页的回调函数
function demo6_cb1($r,$url){
    echo $r['info']['http_code']."t".$url."n";
}

没有全部检查,应该没有404页面。。。

7.使用缓存

有时候会遇到这样一种情况,由于网速或使用了代理等原因抓取的速度非常慢,而且由于抓取规则修改调整需要多次抓取,这时候使用缓存可以极大的提高效率。

缓存通过公有属性$cache控制,默认是这个样子
$cache=array('on'=>false,'dir'=>null,'expire'=>86400);

类自动建立hash之后的子目录,子目录数上限16^3,如果有必要$cache['dir']自己再套一层hash目录即可。

还是以2中抓取专辑列表为例。

$baseUrl='http://www.1ting.com';
$artistList=$db->query("select id,url from artist")->fetchAll();
$curl->cache['on']=true;
$curl->cache['dir']=dirname(__FILE__).'/cache';
foreach($artistList as $v){
    $url=array($baseUrl.$v['url']);
    $callback=array('demo7_cb1',array($v['id']));
    $curl->add($url,$callback);
}
$curl->go();

//处理歌手详情页的回调函数
function demo7_cb1($r,$id){
    global $db,$curl;
    if($r['info']['http_code']==200){
        $html=phpQuery::newDocumentHTML($r['content']);
        $list=$html['div.albumList ul li a.albumLink'];
        if(!empty($list)){
            $st=$db->prepare('insert into album(artist_id,name,url) values(?,?,?)');
            foreach($list as $v){
                $v=pq($v);
                $st->execute(array($id,trim($v->find('span.albumName')->text()),trim($v->attr('href'))));
            } 
        }
    }
    $curl->status();
}

第一次抓去了188个,第二次抓取这188个就直接读缓存了。

8.多线程XMLRPC

xmlrpc一般使用一些封装好的类进行操作,这样执行起来比较方便,但是只能单线程。本类可以非常简单的实现xmlrpc多线程调用,经过测试,大并发下比单线程速度提高2个数量级此代码不在demo包中,因为需要apikey和新的数据表,所以只演示原理,用法和结果。

last.fm有一个更正歌手名字的服务,文档看这里 http://www.last.fm/api/show/artist.getCorrection

private $xmlRpcServer = 'http://ws.audioscrobbler.com/2.0/';

/**
 * 利用last的api更正歌手的名字
 */
function artistNameCorrect() {
	$curl = $this->curl;
	$header = array();
	$header[] = "Content-type: text/xml";
	$curl->opt[CURLOPT_HTTPHEADER] = $header;
	$opt = array(CURLOPT_POSTFIELDS => $this->getXmlRpc('artist.getCorrection', array('artist' => 'Capone -N- Noreaga')));
	$curl->add(array($this->xmlRpcServer, null, $opt), array(array($this, 'artistNameCorrectCb1')));
	$curl->go();
}

function artistNameCorrectCb1($r) {
	header("Content-Type: text/xml");
	$r = $r['content'];
	$r = new SimpleXMLElement($r);
	$r = $r->params->param->value->string;
	$r = stripslashes($r);
	$r = new SimpleXMLElement($r);
	echo $r->asXML();
}

//获取请求xml
private function getXmlRpc($method, $param) {
	$member = '';
	foreach ($param as $k => $v) {
		$member.="
		<member>
			<name>$k</name>
			<value>
				<string>$v</string>
			</value>
		</member>";
	}
	$xml = "
	<methodCall>
		<methodName>$method</methodName>
		<params>
			<param>
			<value>
				<struct>
					$member
					<member>
						<name>api_key</name>
						<value>
							<string>{$this->apiKey}</string>
						</value>
					</member>
				</struct>
			</value>
			</param>
		</params>
	</methodCall>";
	return $xml;
}

有了前7步的基础,代码很容易理解,其中细节自己琢磨。回调函数输出如下结果

2013-02-06_144032

因为不断升级,API可能有变化,具体变化查看public方法的注释即可,教程和demo就不再更新。
旧版:CUrl
这是一个框架类命名空间什么的个别地方需要自己调整。

完整demo下载(curl-demo.zip)

内部代码经过n次修改变化很大,目标网站基本已经改版导致demo代码不一定可用,旧版本就不在维护。

已经独立为一个项目,详细信息访问 https://github.com/ares333/CurlMulti(http://curlmulti.com)

Curl多线程》上有162条评论

  1. 你好,我用的是DEMO4,程序结构和演示的是一样的,只是改成我的数据
    在函数demo4_cb1 的$curl->status();这个语句后加了一条
    phpQuery::unloadDocuments();

    但是程序运行到 78万条时
    不动了
    提示 :Allowed memory size of 8388608 bytes exhausted (tried to allocate 1298358 bytes)…… 错误 PHPQUERY LINE 56

    请问是怎么回事呢?谢谢

  2. 请问为什么每次一到2414个任务的时候就提示 ETA 1sKilled,一共有6W个任务的。
    96.98% 2341/2414(0) 167/s 1.97MB/s 27.55MB ETA 1sKilled
    弄了好久一直都是这样,请问这是什么原因呢?

    • 打乱是有优点,但是根据场景吧,我是curl远程获取账户数据,每次顺序都不同,我想直接生成个HTML表格页面,总不能每次都顺序不一样,建议加个属性,比如什么1是随机,2是按顺序什么的,我自己本地改造过了

      • 不需要改造。
        输入一批url,输出对应的结果,哪个url先完成就先处理哪个,按循序的话你直接改成单线程即可,或者手动处理返回的那一批结果。
        库本身是面向对象的组件化编程产物,不会针对某类需求做特殊处理。

  3. centos lnmp环境。
    网页直接访问demo1.php或我修改的php均出现以下错误:
    Warning: curl_setopt_array() [function.curl-setopt-array]: CURLOPT_FOLLOWLOCATION cannot be activated when safe_mode is enabled or an open_basedir is set in /home/wwwroot/vip/caiji/CURL.php on line 422

    Fatal error: Call to undefined function debug_print() in /home/wwwroot/vip/caiji/CURL.php on line 178

    命令行访问只出现这个错误:
    Fatal error: Call to undefined function debug_print() in /home/wwwroot/vip/caiji/CURL.php on line 178

    亲,什么原因呢?

    • 亲,找了一整天,发现貌似是open_basedir的问题。
      把open_basedir关了,还是无法运行demo1.php或我写的采集程序。

      还是有这个错误提示:
      Fatal error: Call to undefined function debug_print() in /home/wwwroot/vip/caiji/CURL.php on line 178

      windows能正常运行啊,这个错误提示在网上也没有找到相关说明。
      哪里错误了呢

      • CURL.php178行是function read($url) 功能的代码。
        小白猜测是不是读取网页都有问题了?
        果然demo1.php只保留curl读取,还出现了这个错误提示。
        就算目标网页很简单,还是无法读取到内容。

        如果是读取baidu,就很正常,有点奇怪。

        亲,是不是centos 上面的curl模块需要升级了?

        • debug_print()是自定义函数替换为user_error()即可。
          我可以肯定类的稳定性,采集了n亿的页面也没出问题。
          类升级了n次,接口等变化很大,具体什么问题自己修改类代码调试。

    • 为所有任务设置header
      $curl->opt[CURLOPT_HTTPHEADER]=’xxx’;
      单独为某一个任务设置header
      $curl->add(array(‘url’=>’xxx’,’opt’=>array(CURLOPT_HTTPHEADER=>’xxx’)));

      本类功能和性能已经完美。

    • 你说的主机应该是运行curl的机器,你说的多IP应该是多网卡多IP
      curl本质就是一个浏览器,主机多网卡访问外网只能有一个起作用,这是由路由规则决定的,浏览器无法选择网卡,所以curl也不行。

      可能的方案:
      如果主机是windows环境可以使用route命令修改路由,这个我亲身实践过,如果要实现ip轮转只能用php调用route命令动态修改路由实现,估计轮询性能好不到哪去。

      如果是linux主机,不知道有没有类似windows的route命令来修改路由,但是可以通过iptables的NAT模块实现,这个亲身实践过,不过这个够你喝一壶的,具体知识可以参考这里http://blog.phpdr.net/iptables%E5%88%9D%E6%8E%A2.html

      通过切换网关还不如把其他IP做成代理更简便快捷,理论上是可以实现的,privoxy或者php自己实现都可以,windows和linux都可以实现,具体细节不讨论,比较麻烦。

  4. windows命令行中文可能乱码导致非常奇怪的错误(尤其是demo5),所有代码在linux下测试完全正确。
    这个转一下码就可以了,
    $filename=iconv(‘utf-8′,’gbk’,$name).’.’.$ext;
    demo5里面我这样就可以保存成中文的名字了。还在研究博主的类,还没看懂,不过试了下pthreads,确实不错,博主试一下,会爱上它的,博主的示例代码 没有unloadducoment,导致内存可以占到2、3g去,而且那时运行速度非常慢。
    不过还是非常感谢强大的博主,提供这么好的东东。

  5. 我是用你的多线程的curl采集类,发现有个问题,就是我采用了链式采集,思路是这样:

    主要是担心第一个回调函数使用了sleep来防止被封IP地址,假如我其中一个子进程正在执行插入数据库,但是突然某个进程调用回调函数的时候刚刚判断到要进行sleep(5),这插入数据库的进程是中断已经正在插入的来休息1秒钟

    或者是某个子进程还在下载的时候,还没下载完,突然间另外一个子进程遇到了sleep(5);程序会怎么处理那个正在加载的子进程,休息了一秒钟之后,之前那个下载进程是重新开始下载还是,继续下载还未下载完的文件,假如被采集的网站不支持续传,会怎么样呢?

    求大神指导一下!调试了一个星期了,终于弄明白您的多线程类大致原理,但是我看了您的博客,没有提及这个sleep(5)对其他进程影响。

    • 运行原理你搞错了!
      PHP没有多进程,PHP只能运行一个进程,php有个扩展pthreads可以实现多线程,没有测试,应该能利用多核CPU。
      curl扩展自己内部维护一个多线程操作,但是和PHP没有关系,PHP始终是以顺序结构运行程序!
      链式采集原理:
      curl类做的工作很单纯,记录任务(每个任务包含url,回调函数,curl配置等相关信息),并发数不够就从任务缓冲区取任务开始执行,只要有任务完成就调用对应的回调函数,回调函数是阻塞的!回调完成之后curl类继续轮询运行中的任务(轮询容易理解,实际上也是一个阻塞操作,不会进行空轮询,这是本类非常重要的一个特性,保证了cpu占用基本为零),有完成的任务继续上述操作,直到所有任务完成,第一个curl->go()才会结束。
      任务缓冲池是一个堆栈数据结构,后添加的任务先得到执行,保证链子中总是最后面的节点先执行。
      curl类本质还是顺序执行,有点类似while循环。

      结论:回调函数是阻塞的,如果sleep(5)会导致所有PHP代码都sleep,但是php的curl扩展可能还在后台做一些事情。本类用的多线程是curl扩展的多线程,php本身还是顺序执行。php的多线程目前只有pthreads扩展可以实现。

  6. 请教下楼主,我在测试addtask时发现总会有部分任务跑不完。最后的报错为Fatal error: Call to a member function fetchAll() on a non-object in XX.PHP on line XX。
    我观察了程序运行的过程,首先会读取所有的数据出来放入taskpool当中,但是当运行了一段时间后taskpool中的任务为29也就是小于30的时候,程序会再调用addtask函数并添加到taskpool中。这个过程是OK的,但是问题出在最后的一个队列上,也就是当taskpool中的任务为29时,程序会去获取调用addtask函数,并且由于SQL返回的结果为空导致报错退出,所以最终active并且在running的29个任务+taskpool中的29个任务就始终都不会被执行到了。
    请问该如何处理这种问题呢?尽管对于全局来说少采几十个无所谓,但看到error始终不舒服。。。

  7. tor的速度是个大问题。
    尝试多端口同时去采集同一网址,一旦某个线程完成抓取就直接返回结果,其他线程直接舍弃。想以此选择N端口中最快的一条线路进行抓取。
    结果发现这种情况,速度反而变得更慢了。
    后直接不用tor,对比并发采集10条相同url跟单线程采集。发现总时间上,并发占了点优势。但是响应时间上完全没优势。
    瓶颈?

    • 没有任何瓶颈。
      tor如果不慢就不正常了,请先了解tor的原理。
      并发占了点优势跟多线程和单线程没关系,和网络稳定性、网速、对方服务器速度、缓存等一系列因素有关系。

  8. 楼主,我问个问题,假如添加了200个任务,$curl->go();之后,是把这200个一波同时发出去。
    假定每个线程平均需要5秒执行完。在第3秒时,有些线程快早1秒已经执行完了,这里运行中的线程假设只有100个了(因为另100个已经执行完)。这时$curl不会自动添加新的任务时来,需必须等这200个全部执行完才添加下一批。是这样吗?

    如果是这样,假如我有10万个任务需要执行,每波200个,过程中的流量是不平滑的,带宽不能充分利用,有什么办法能更平滑的执行?

    • 3.curl配置分为三个级别,优先级由低到高如下,优先级高的会覆盖优先级低的配置。
      默认:如私有方法init()中所示。
      类级别:保存在 $opt这个公有属性中,此CURL对象的所有操作中都会起作用。
      任务级别:多线程任务中添加任务时指定,只在当前任务中起作用。

      $curl->add(array(‘url’=>’http://xxx.com’,’opt’=>array(CURLOPT_PROXYPORT=>’127.0.0.1:8118′)));

  9. 类中的download方法有一个错误,fclose($fp),之前设置参数时没有引用此资源,是直接打开的,所以关闭的时候没有引用的符号,用$fp是错的,要在前面加上一句$fp=fopen($file,’w’);然后把参数中的换成$fp

  10. 博主,您好,我发现大家很少用task,task_param这种方式的多任务调用方法,我觉得大家应该多用,因为这种方法是等添加的任务快执行完,再添加,占资源比较少。

    但是,楼主的addTask函数中存在笔误:
    if(!emtpy($task)) 应该改为 if(!emtpy($this->task))

    call_user_func_array($this->task,$this->$task_param);
    应该改为
    call_user_func_array($this->task,$this->task_param);

      • 楼主,你好,这里我有笔误,应该是empty。
        但是我在使用时,发现一个问题,就是当我使用task和taskparam时,在一个查询数据库的回调函数中,多个线程都会去调用这个回调函数,当没有更多记录时,所有的线程都执行同样的查询,而且设置$this->task=null来避免进一步调用回调函数时,容易出问题,导致程序退出。

  11. 以前没有涉及过多级分层资源的采集,现在才发现博主的采集类,是采集多级分层资源的利器。好象火车头不能采集多级分层资源吧,比如商品信息,有多级商品分类。我想即使火车头能用来采集,效率也是个大问题。如果商品分类数量和商品数量有限,可能不是大问题;当商品分类有n个级别而且数量巨大的时候,不能想像用火车头采集会是什么情况。看来火车头是专为采集文章定做的。

  12.   博主的程序构思精巧,我正在学习。但我发现不同类型的网站,回调函数需要重新编写。我准备搞一个通用的,用规则代替回调函数,这一点比较像火车头。呵呵。

      博主的程序很有学习价值,比如将采集分解为任务的方式进行,这让每个任务获得自由,以便打乱顺序充分发挥多线程优势,并且通过使用回调函数使处理智能化自动化。火车头则比较死板,必须分布进行,每一步之间是手动而非自动的。

      有关任务和任务的调度,以前没有涉及过,跟博主学到不少东西。

    • 启动多个tor实例已经搞定了,现在的问题就是tor貌似很消耗资源的样子,我2G内存的vps开了80个,内存就跑了80%,负载直接飙到10,而且这些实例貌似不能持续运行,大概5分钟之后就神奇的消失了,只剩3个实例,不知道博主有木有遇到这样的情况

      • tor的源码包中包含有一些控制脚本,就是带start,stop,restart参数的那种脚本,用哪个脚本启动tor客户端,没遇到过那个问题。
        tor有两种运行模式,客户端和服务端,负载大估计你开了没必要的东西,tor的技术性问题我也不太懂,你只能去官网看文档。

  13. 楼主,您好,如果我在curl类运行过程中,突然中断php进程,会有什么后果?
    我测试发现,会有内存泄露,我不知道,这种异常中断情况下,curl中的那个taskpool中的添加的大量任务是否会释放内存?我的qq是48019671,还有问题要请教楼主,项目遇到内存问题,急呀?

    • 突然中断php进程不会有任何后果,php程序结束而已。
      类本身没有内存泄露问题,不管什么中断,php结束后会释放所有内存。
      你说的内存泄露问题,应该是两个原因中的一个,1.程序逻辑有问题。2.你使用了phpQuery,这个库每次调用完了都要手动phpQuery::unloadDocuments(),否则处理的内容会堆积在内存中!

  14. 在windows下的CMD或浏览器运行demo1 ,没有任何反应,然后PHP进程一下保持在50%以上.

    看了下是在while($curlInfo = curl_multi_info_read($this->mh,$this->queueNum)){这一行就卡在这不动了

  15. 如果在一个回调函数中再加一个采集,要等这个采集结束后,回调函数才结束,造成,如果一个网站有多层的话,效率就很低下,要等最后一层采集完,才一层一层释放资源.
    同样一个网站,如果把URL都采集好,并行采集是快

    • 这个问题已经考虑到了,并且完美解决,请先看完教程 3.链式抓取。
      不管网站结构有多少层都可以并行执行,链式任务是一个堆栈数据结构,所以总是优先完成最内层的任务。
      目前来说一个PHP进程中只需要一个curl对象可以完成所有可能的任务。
      你说的那种情况完全是程序结构问题导致。
      链式抓取根本不需要提前把url采集好,并且没有比这种方式效率更高的方式!

  16. 楼主,这个怎么防止ip被封,我想采集kuaidi100.com的dhl,fedex,ems,ups等线路的快递跟踪,怎么预防被封ip,请教指导,我看了你说用tor,能否加QQ指导一下。诚心想和 亲亲请教技术上的问题,因为我们公司是深圳的fedex一级代理,希望能自动跟踪包裹.QQ:252341102

    • linux下tor可以做一个socks5代理,启动多个实例可以用多个端口做代理,tor的文档自己google。最近很忙,抽空写一个整套的tor自动化多IP代理,目前还在测试,很不稳定。

        • 写了一个,需要建表和进程同步,还不稳定,失败处理什么的太复杂,不打算搞了,自己多开几个tor写逻辑轮询就行了。

          • 恩。然后。现在的问题是怎么多开实例。研究了一天了。直接/etc/init.d/tor start开一个没问题。网上找了个sh方式多开失败了。端口打开了。但是似乎没能联网。
            代码如下:

            #!/bin/bash

            # Original script from
            # http://blog.databigbang.com/distributed-scraping-with-multiple-tor-circuits/

            base_socks_port=9050
            base_control_port=15000

            # Create data directory if it doesn’t exist
            if [ ! -d “data” ]; then
            mkdir “data”
            fi

            TOR_INSTANCES=”$1″

            if [ ! $TOR_INSTANCES ] || [ $TOR_INSTANCES -lt 1 ]; then
            echo “Please supply an instance count”
            echo “Example: ./multi-tor.sh 5”
            exit 1
            fi

            for i in $(seq $TOR_INSTANCES)
            do
            j=$((i+1))
            socks_port=$((base_socks_port+i))
            control_port=$((base_control_port+i))
            if [ ! -d “data/tor$i” ]; then
            echo “Creating directory data/tor$i”
            mkdir “data/tor$i”
            fi

            # Take into account that authentication for the control port is disabled. Must be used in secure and controlled environments
            echo “Running: tor –RunAsDaemon 1 –CookieAuthentication 0 –HashedControlPassword \”\” –ControlPort $control_port –PidFile tor$i.pid –SocksPort $socks_port –DataDirectory data/tor$i”

            tor –RunAsDaemon 1 –CookieAuthentication 0 –HashedControlPassword “” –ControlPort $control_port –PidFile tor$i.pid –SocksPort $socks_port –DataDirectory data/tor$i
            done

            博主能指点下么?

            • /usr/bin/tor -f /srv/tor/torrc –pidfile /srv/tor/10090/tor.pid –log “notice file /srv/tor/10090/tor.log” –runasdaemon 1 –datadirectory /srv/tor/10090/data –WarnUnsafeSocks 0 –SocksPort 10090 –SocksListenAddress 127.0.0.1 –ControlPort 10091 –HashedControlPassword 16:478A547917EEB05B60DDF0895AA0C2A06D1F5E55832E48F521E01AE2CB –user _tor

              用这个命令启动一个tor,/srv/tor/torrc这个配置文件为空即可(配置文件必须有),修改端口可以启动多个进程,多个进程tor的ip不同,具体命令行参数含义见官方文档,如何切换IP本博客搜索”tor切换IP”。有其他问题看官方文档或google自行解决。

  17. 类非常好用,谢谢分享,不过在大数据量处理时,会遇到内存溢出现象,不知道各位有没有遇到,报如下错误,Fatal error: out of dynamic memory in yy_create_buffer()
    我在名两行下执行demo2的时候就已经报错了,楼主是否也在优化下,感觉有些地方的循环太多,有待优化下。谢谢

    • 从没有遇到过这种错误,一次性添加15W个url内存占用300MB左右,执行demo2不可能内存溢出,除非php配置有问题。类中的每一个循环都是必要的,建议你到googlecode下载最新版本的类。
      类唯一需要优化的地方是curl线程的复用,也就是tcp连接的复用,现在性能,稳定性完全满足要求加上精力有限就先不修改了。
      估计你的代码可能有些问题,可以贴出来帮你测试一下。

      • 多谢你的回复,我是在CI框架中使用的,不知道是不是与框架本身有关,经常性的报如下错误
        A PHP Error was encountered

        Severity: Warning
        Message: curl_getinfo(): 83 is not a valid cURL handle resource
        Filename: libraries/CURL.php
        Line Number: 173

        Fatal error: Call to undefined function debug_print() in /data/www/app/libraries/CURL.php on line 178

        • debug_print()是对user_error()的封装,直接用查找替换就可以。

          你把curl先关的逻辑代码发到我邮箱,我看下什么问题。aresrr@126.com,包括sql,可能有非法的url,CURL不对url格式进行验证。

            • 通过测试,类没有问题。
              有几点情况必须注意,脚本必须在linux命令行下运行(cygwin没有测试),windows命令行肯定不行,windows下php 5.3.10 $this->curl->read($url)这一步就出问题。
              我的linux环境用的是centos 5.4 final,PHP 5.4.11 (cli) (built: Jan 16 2013 17:14:04) 。
              另外代码中显示状态的函数调用不对,$curl->status()应该在回调函数中调用,这样可以每一秒更新一次状态。修改后的回调函数如下:

              public function test2($res, $url,$curl) {
              	$lists = array();
              	if($res['info']['http_code']==200){
              		phpQuery::clear();
              		$html = phpQuery::newDocumentHTML($res['content']);
              		$list = $html['div.albumList ul li a.albumLink'];
              		if(!empty($list)){
              			foreach($list as $v){
              				$data = array();
              				$v = pq($v);
              				$data['albumName'] = trim($v->find('span.albumName')->text());
              				$data['url'] = trim($v->attr('href'));
              		
              				$lists[] = $data;
              			}
              		}
              	}
              	$curl->status();
              	phpQuery::unloadDocuments();
              	file_put_contents(ROOTPATH.'logs/'.basename($url), print_r($lists, true));
              }

              主程序37行改成了这样:$callback = array(array($this->test, ‘test2’), array($info[‘url’],&$this->curl));
              另外一点一定注意!如果使用phpQuery必须在回调函数中手动调用phpQuery::unloadDocuments();释放,否则phpQuery处理的文档全部都在内存中!!!
              我的虚拟机只分配了128MB内存,跑完你的脚本速度非常快,内存占用完全可以忽略,结果如下:
              ———————————-
              100.00% 100/100(0) 20/s 535.14KB/s 2.61MB ETA 0s
              PHP Fatal error: Call to undefined method CI_Profiler::slowsql_log() in /mnt/hgfs/www_root/test/snatch/system/core/Output.php on line 444

              Fatal error: Call to undefined method CI_Profiler::slowsql_log() in /mnt/hgfs/www_root/test/snatch/system/core/Output.php on line 444
              [root@vm-centos5 www]#
              ————————————
              必定是你的配置有问题。

            • 如果感觉类有问题,建议把逻辑从框架拿出来用最原始的php程序测试一下,因为这类问题基本都是和类本身无关的问题导致。

  18. 那啥,我首先用addTask添加采集url
    $href=“”;
    $url=array($href);
    $callback=array(‘demo2_cb2’,array($asin,$href,$reviewpagenum,$r_tb_name));
    $curl->add($url,$callback);
    ————-上面这个是addTask中执行————-
    然后在demo2_cb2中,根据采集内容返回的结果,如果有解析出来的url需要抓取。

    我再用$curl->add($url,$callback);添加解析出来的url,会怎么样?

    • 这就是教程中第3条提到的。
      这么做是没有任何问题的,$curl->add()添加一个任务到$curl的内部数组,为了保持一个固定的并发数$curl始终从这个内部数组中取任务。
      仔细看一下上面“3.链式抓取”

  19. 不知道怎么解决了,求助博主。

    我有个大库,需要抓取的种子网址有10万左右,这10万个种子网址。
    假设格式为 a.htm
    需要去访问,然后每个种子网址根据返回的内容来确定是否要继续抓取二级网址,即
    a.html?page=1,a.html?page=2
    这样。。我写了代码,但是发现抓取的速度似乎并不能提升。
    博主能给点思路吗?

    • 补上其他情况:
      当然a.htm抓取完后,我回到数据库更新a.htm的状态为已更新。

      但是在这种情况下,我用curl->task来添加每次添加10个种子词到队列中,发现会重复添加种子词。

      即,第一次添加了a.htm,b.html几个种子地址,本来他们需要抓取a.html?page=1等页面的,但是还没抓完,那个addtask又再次执行,又把a.htm,b.html加入了队列,这就造成了无限死循环。貌似。。。

      • 有可能,$curl->task在并发数不够$curl->limit的时候被调用,a.htm,b.html抓取进行时a.htm,b.html数据库状态不会被更新,一旦并发不够就会重复添加造成浪费(CURL类本身没有url排重功能)。
        把你的代码贴出来,我可以帮你看下有什么问题。

        • 针对那个url重复添加的问题,我自己在task调用的函数中,搞了个array,把添加过的url存起来,然后下次自动添加url时先在这个数据进行判断。

          • 有时候url个数有几百万,用array占内存较多,我是这么搞的,设置一个
            static $lastId;每次取数据这么取:select id,url from table where id>$lastId order by id asc limit 100;然后设置$lastId为结果中最后一条记录的id。

    • 抓取速度由网络情况和被抓取服务器共同决定,类本身性能是最优的,可以尝试$curl->limit调大一些。可以先检查一下程序逻辑是否有问题。刚才站内的CURL更新到了最新版本,解决了一个特殊情况下的性能问题。

        • 有两种方法:1.在任何代码可以执行到的地方$curl->task=null即可,建议在$curl->task指向的回调函数中操作。
          2.$curl->task回调中实际通过$curl->add()添加任务,不执行这个方法就可以了,也就是$curl->task空跑就行了,这样不优美,不建议这么干。

      • 想问一下,比如我建立了$curl,然后,调用了$curl->task=’addUrl’

        在这个 addUrl中,再建立$curl_1,然后通过这个$curl_1来处理任务,这样效率会怎么样?下面是代码

        $curl=new CURL();
        $curl->task=’addNewFetchItem’;
        $curl->go();

        //添加新的url
        function addNewFetchItem()
        {
        global $curl_2,$curl, $sql_connect,$item_tablename,$cid,$r_table_count,$has_success,$has_fetch_array;
        $limit=1;
        $sql = “select url,asin from `$item_tablename` where is_fetch = 0 order by id limit $limit;”;
        $result = mysql_query($sql, $sql_connect);//执行查询
        while($row = mysql_fetch_array($result))
        {
        $href=”xxxxxxx”;
        //判断网址是否已加入队列,如果已加入,则不添加,跳过
        if(!in_array($asin,$has_fetch_array))
        {
        $r_tb_name=getRiviewTable($cid,$r_table_count);
        getFirstReviewPage($asin,$tag,$href,$r_tb_name);
        }
        else
        {
        echo “url is feting ,do no add again”;
        echo “”;
        }
        }
        }

        function getFirstReviewPage($asin,$tag,$href,$r_tb_name)
        {
        global $curl, $sql_connect,$item_tablename,$cid,$has_fetch_array;

        $has_fetch_array[]=$asin;
        $result=$curl->read($href);
        $info=$result[‘content’];
        if (strpos($info,”“))
        {
        insertReivews($info,$asin,$r_tb_name);//如果有评论,则进行处理
        $reviewpagenum= getReviewPagenum($info);//得到评论页数
        if($reviewpagenum>1)
        {
        $curl_2= new CURL();
        for($i=2;$iadd($url,$callback);
        }
        echo “”;
        $curl_2->go();
        }
        else
        {
        }
        }
        else
        {
        }
        }

        • 这代码逻辑比较乱,具体逻辑没仔细看,但是getFirstReviewPage()中$curl_2= new CURL();肯定是有问题的,性能如何没有测试,但肯定不是最优,CURL类本身可以复用,在一个PHP进程中理论上一个CURL对象完全可以做所有的事情。

        • 只需要启动多个tor客户端监听不同端口就可以了,然后用PHP写控制逻辑。
          一个作为客户端最小配置的tor进程大约占用20MB~30MB空间。
          不重启客户端用PHP换IP,http://blog.phpdr.net/tor-change-identity.html

          我是这么干的:
          [root@ares tor]# ps aux|grep tor
          _tor 31431 20.8 6.7 21876 17200 ? S 20:34 0:01 /usr/bin/tor -f /srv/tor/torrc –pidfile /srv/tor/10090/tor.pid –log notice file /srv/tor/10090/tor.log –runasdaemon 1 –datadirectory /srv/tor/10090/data –WarnUnsafeSocks 0 –SocksPort 10090 –SocksListenAddress 127.0.0.1 –ControlPort 10091 –HashedControlPassword 16:478A547917EEB05B60DDF0895AA0C2A06D1F5E55832E48F521E01AE2CB –user _tor

  20. 不错的类啊,我原来写了简单的类,前段用了rolling curl .但是并发大的时候,很多页面下载不完整。
    现在想换成楼主的类,可以向楼主请教下实际操作吗,我的流程有点不同,试了下不成功。

  21. 博主你好 关注你这个类 好久了
    自己也能简单应用 但是按你的介绍来看 应用的很不完善
    球博主写个demo 有可能的话发个邮件 谢谢

发表评论

电子邮件地址不会被公开。

ERROR: si-captcha.php plugin: GD image support not detected in PHP!

Contact your web host and ask them to enable GD image support for PHP.

ERROR: si-captcha.php plugin: imagepng function not detected in PHP!

Contact your web host and ask them to enable imagepng for PHP.