手把手教你搭建一个基于Java的分布式爬虫系统
- - DockOne.io【编者的话】在不用爬虫框架的情况,经过多方学习,尝试实现了一个分布式爬虫系统,并且可以将数据保存到不同地方,类似MySQL、HBase等. 基于面向接口的编码思想来开发,因此这个系统具有一定的扩展性,有兴趣的朋友直接看一下代码,就能理解其设计思想,虽然代码目前来说很多地方还是比较紧耦合,但只要花些时间和精力,很多都是可抽取出来并且可配置化的.
# IPProxyRepository.txt
58.60.255.104:8118
219.135.164.245:3128
27.44.171.27:9999
219.135.164.245:3128
58.60.255.104:8118
58.252.6.165:9000
......
// IP地址代理库Map
private static Map IPProxyRepository = new HashMap<>();
private static String[] keysArray = null; // keysArray是为了方便生成随机的代理对象
/**
* 初次使用时使用静态代码块将IP代理库加载进set中
*/
static {
InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream("IPProxyRepository.txt"); // 加载包含代理IP的文本
// 构建缓冲流对象
InputStreamReader isr = new InputStreamReader(in);
BufferedReader bfr = new BufferedReader(isr);
String line = null;
try {
// 循环读每一行,添加进map中
while ((line = bfr.readLine()) != null) {
String[] split = line.split(":"); // 以:作为分隔符,即文本中的数据格式应为192.168.1.1:4893
String host = split[0];
int port = Integer.valueOf(split[1]);
IPProxyRepository.put(host, port);
}
Set keys = IPProxyRepository.keySet();
keysArray = keys.toArray(new String[keys.size()]); // keysArray是为了方便生成随机的代理对象
} catch (IOException e) {
e.printStackTrace();
}
}
CloseableHttpClient httpClient = null;
HttpHost proxy = null;
if (IPProxyRepository.size() > 0) { // 如果ip代理地址库不为空,则设置代理
proxy = getRandomProxy();
httpClient = HttpClients.custom().setProxy(proxy).build(); // 创建httpclient对象
} else {
httpClient = HttpClients.custom().build(); // 创建httpclient对象
}
HttpGet request = new HttpGet(url); // 构建htttp get请求
......
/**
* 随机返回一个代理对象
*
* @return
*/
public static HttpHost getRandomProxy() {
// 随机获取host:port,并构建代理对象
Random random = new Random();
String host = keysArray[random.nextInt(keysArray.length)];
int port = IPProxyRepository.get(host);
HttpHost proxy = new HttpHost(host, port); // 设置http代理
return proxy;
}
/**
* 网页数据下载
*/
public interface IDownload {
/**
* 下载给定url的网页数据
* @param url
* @return
*/
public Page download(String url);
}
/**
* 数据下载实现类
*/
public class HttpGetDownloadImpl implements IDownload {
@Override
public Page download(String url) {
Page page = new Page();
String content = HttpUtil.getHttpContent(url); // 获取网页数据
page.setUrl(url);
page.setContent(content);
return page;
}
}
/**
* 网页数据解析
*/
public interface IParser {
public void parser(Page page);
}
/**
* 解析京东商品的实现类
*/
public class JDHtmlParserImpl implements IParser {
......
}
/**
* 苏宁易购网页解析
*/
public class SNHtmlParserImpl implements IParser {
......
}
/**
* 网页对象,主要包含网页内容和商品数据
*/
public class Page {
private String content; // 网页内容
private String id; // 商品Id
private String source; // 商品来源
private String brand; // 商品品牌
private String title; // 商品标题
private float price; // 商品价格
private int commentCount; // 商品评论数
private String url; // 商品地址
private String imgUrl; // 商品图片地址
private String params; // 商品规格参数
private List urls = new ArrayList<>(); // 解析列表页面时用来保存解析的商品URL的容器
}
-- ----------------------------
-- Table structure for phone
-- ----------------------------
DROP TABLE IF EXISTS `phone`;
CREATE TABLE `phone` (
`id` varchar(30) CHARACTER SET armscii8 NOT NULL COMMENT '商品id',
`source` varchar(30) NOT NULL COMMENT '商品来源,如jd suning gome等',
`brand` varchar(30) DEFAULT NULL COMMENT '手机品牌',
`title` varchar(255) DEFAULT NULL COMMENT '商品页面的手机标题',
`price` float(10,2) DEFAULT NULL COMMENT '手机价格',
`comment_count` varchar(30) DEFAULT NULL COMMENT '手机评论',
`url` varchar(500) DEFAULT NULL COMMENT '手机详细信息地址',
`img_url` varchar(500) DEFAULT NULL COMMENT '图片地址',
`params` text COMMENT '手机参数,json格式存储',
PRIMARY KEY (`id`,`source`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
## cf1 存储 id source price comment brand url
## cf2 存储 title params imgUrl
create 'phone', 'cf1', 'cf2'
## 在HBase shell中查看创建的表
hbase(main):135:0> desc 'phone'
Table phone is ENABLED
phone
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK
_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
'65536', REPLICATION_SCOPE => '0'}
{NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK
_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
'65536', REPLICATION_SCOPE => '0'}
2 row(s) in 0.0350 seconds
/**
* 商品数据的存储
*/
public interface IStore {
public void store(Page page);
}
/**
* 使用dbc数据库连接池将数据写入MySQL表中
*/
public class MySQLStoreImpl implements IStore {
private QueryRunner queryRunner = new QueryRunner(DBCPUtil.getDataSource());
@Override
public void store(Page page) {
String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) values(?, ?, ?, ?, ?, ?, ?, ?, ?)";
try {
queryRunner.update(sql, page.getId(),
page.getSource(),
page.getBrand(),
page.getTitle(),
page.getPrice(),
page.getCommentCount(),
page.getUrl(),
page.getImgUrl(),
page.getParams());
} catch (SQLException e) {
e.printStackTrace();
}
}
}
......
// cf1:price
Put pricePut = new Put(rowKey);
// 必须要做是否为null判断,否则会有空指针异常
pricePut.addColumn(cf1, "price".getBytes(), page.getPrice() != null ? String.valueOf(page.getPrice()).getBytes() : "".getBytes());
puts.add(pricePut);
// cf1:comment
Put commentPut = new Put(rowKey);
commentPut.addColumn(cf1, "comment".getBytes(), page.getCommentCount() != null ? String.valueOf(page.getCommentCount()).getBytes() : "".getBytes());
puts.add(commentPut);
// cf1:brand
Put brandPut = new Put(rowKey);
brandPut.addColumn(cf1, "brand".getBytes(), page.getBrand() != null ? page.getBrand().getBytes() : "".getBytes());
puts.add(brandPut);
......
// 3.注入存储器
iSpider.setStore(new HBaseStoreImpl());
core-site.xml
hbase-site.xml
hdfs-site.xml
// 3.注入存储器
iSpider.setStore(new MySQLStoreImpl());
jd.com.higher
--https://list.jd.com/list.html?cat=9987,653,655&page=1
...
suning.com.higher
--https://list.suning.com/0-20006-0.html
...
jd.com.lower
--https://item.jd.com/23545806622.html
...
suning.com.lower
--https://product.suning.com/0000000000/690128156.html
...
/**
* URL 仓库
* 主要功能:
* 向仓库中添加URL(高优先级的列表,低优先级的商品URL)
* 从仓库中获取URL(优先获取高优先级的URL,如果没有,再获取低优先级的URL)
*
*/
public interface IRepository {
/**
* 获取URL的方法
* 从仓库中获取URL(优先获取高优先级的URL,如果没有,再获取低优先级的URL)
* @return
*/
public String poll();
/**
* 向高优先级列表中添加商品列表URL
* @param highUrl
*/
public void offerHigher(String highUrl);
/**
* 向低优先级列表中添加商品URL
* @param lowUrl
*/
public void offerLower(String lowUrl);
}
/**
* 基于Redis的全网爬虫,随机获取爬虫URL:
*
* Redis中用来保存URL的数据结构如下:
* 1. 需要爬取的域名集合(存储数据类型为set,这个需要先在Redis中添加)
* key
* spider.website.domains
* value(set)
* jd.com suning.com gome.com
* key由常量对象SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY 获得
* 2. 各个域名所对应的高低优先URL队列(存储数据类型为list,这个由爬虫程序解析种子URL后动态添加)
* key
* jd.com.higher
* jd.com.lower
* suning.com.higher
* suning.com.lower
* gome.com.higher
* gome.come.lower
* value(list)
* 相对应需要解析的URL列表
* key由随机的域名 + 常量 SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX或者SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX获得
* 3. 种子URL列表
* key
* spider.seed.urls
* value(list)
* 需要爬取的数据的种子URL
* key由常量SpiderConstants.SPIDER_SEED_URLS_KEY获得
*
* 种子URL列表中的URL会由URL调度器定时向高低优先URL队列中
*/
public class RandomRedisRepositoryImpl implements IRepository {
/**
* 构造方法
*/
public RandomRedisRepositoryImpl() {
init();
}
/**
* 初始化方法,初始化时,先将Redis中存在的高低优先级URL队列全部删除
* 否则上一次URL队列中的URL没有消耗完时,再停止启动跑下一次,就会导致URL仓库中有重复的URL
*/
public void init() {
Jedis jedis = JedisUtil.getJedis();
Set domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);
String higherUrlKey;
String lowerUrlKey;
for(String domain : domains) {
higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;
lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;
jedis.del(higherUrlKey, lowerUrlKey);
}
JedisUtil.returnJedis(jedis);
}
/**
* 从队列中获取URL,目前的策略是:
* 1. 先从高优先级URL队列中获取
* 2. 再从低优先级URL队列中获取
* 对应我们的实际场景,应该是先解析完列表URL再解析商品URL
* 但是需要注意的是,在分布式多线程的环境下,肯定是不能完全保证的,因为在某个时刻高优先级url队列中
* 的URL消耗完了,但实际上程序还在解析下一个高优先级URL,此时,其它线程去获取高优先级队列URL肯定获取不到
* 这时就会去获取低优先级队列中的URL,在实际考虑分析时,这点尤其需要注意
* @return
*/
@Override
public String poll() {
// 从set中随机获取一个顶级域名
Jedis jedis = JedisUtil.getJedis();
String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); // jd.com
String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; // jd.com.higher
String url = jedis.lpop(key);
if(url == null) { // 如果为null,则从低优先级中获取
key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; // jd.com.lower
url = jedis.lpop(key);
}
JedisUtil.returnJedis(jedis);
return url;
}
/**
* 向高优先级URL队列中添加URL
* @param highUrl
*/
@Override
public void offerHigher(String highUrl) {
offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX);
}
/**
* 向低优先URL队列中添加URL
* @param lowUrl
*/
@Override
public void offerLower(String lowUrl) {
offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX);
}
/**
* 添加URL的通用方法,通过offerHigher和offerLower抽象而来
* @param url 需要添加的URL
* @param urlTypeSuffix url类型后缀.higher或.lower
*/
public void offerUrl(String url, String urlTypeSuffix) {
Jedis jedis = JedisUtil.getJedis();
String domain = SpiderUtil.getTopDomain(url); // 获取URL对应的顶级域名,如jd.com
String key = domain + urlTypeSuffix; // 拼接URL队列的key,如jd.com.higher
jedis.lpush(key, url); // 向URL队列中添加URL
JedisUtil.returnJedis(jedis);
}
}
定时器基于Quartz实现,下面是其job的代码:
URL消费完毕后,是否需要循环不断爬取数据根据个人业务需求而不同,因此这一步不是必需的,只是也提供了这样的操作。因为事实上,我们需要爬取的数据也是每隔一段时间就会更新的,如果希望我们爬取的数据也跟着定时更新,那么这时定时器就有非常重要的作用了。不过需要注意的是,一旦决定需要循环重复爬取数据,则在设计存储器实现时需要考虑重复数据的问题,即重复数据应该是更新操作,目前在我设计的存储器不包括这个功能,有兴趣的朋友可以自己实现,只需要在插入数据前判断数据库中是否存在该数据即可。
另外需要注意的一点是,URL定时器是一个独立的进程,需要单独启动。
/**
* 每天定时从URL仓库中获取种子URL,添加进高优先级列表
*/
public class UrlJob implements Job {
// log4j日志记录
private Logger logger = LoggerFactory.getLogger(UrlJob.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
/**
* 1. 从指定URL种子仓库获取种子URL
* 2. 将种子URL添加进高优先级列表
*/
Jedis jedis = JedisUtil.getJedis();
Set seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY); // spider.seed.urls Redis数据类型为set,防止重复添加种子URL
for(String seedUrl : seedUrls) {
String domain = SpiderUtil.getTopDomain(seedUrl); // 种子url的顶级域名
jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl);
logger.info("获取种子:{}", seedUrl);
}
JedisUtil.returnJedis(jedis);
// System.out.println("Scheduler Job Test...");
}
} URL
调度器的实现如下:
{{{/**
* URL定时调度器,定时向URL对应仓库中存放种子URL
*
* 业务规定:每天凌晨1点10分向仓库中存放种子URL
*/
public class UrlJobScheduler {
public UrlJobScheduler() {
init();
}
/**
* 初始化调度器
*/
public void init() {
try {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 如果没有以下start方法的执行,则是不会开启任务的调度
scheduler.start();
String name = "URL_SCHEDULER_JOB";
String group = "URL_SCHEDULER_JOB_GROUP";
JobDetail jobDetail = new JobDetail(name, group, UrlJob.class);
String cronExpression = "0 10 1 * * ?";
Trigger trigger = new CronTrigger(name, group, cronExpression);
// 调度任务
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
UrlJobScheduler urlJobScheduler = new UrlJobScheduler();
urlJobScheduler.start();
}
/**
* 定时调度任务
* 因为我们每天要定时从指定的仓库中获取种子URL,并存放到高优先级的URL列表中
* 所以是一个不间断的程序,所以不能停止
*/
private void start() {
while (true) {
}
}
}
[zk: localhost:2181(CONNECTED) 1] create /ispider ispider
Created /ispider
[zk: localhost:2181(CONNECTED) 0] ls /ispider
[]
[zk: localhost:2181(CONNECTED) 0] ls /ispider
[192.168.43.166]
[zk: localhost:2181(CONNECTED) 0] ls /ispider
[]
/**
* 注册ZooKeeper
*/
private void registerZK() {
String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
int baseSleepTimeMs = 1000;
int maxRetries = 3;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
CuratorFramework curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
curator.start();
String ip = null;
try {
// 向ZooKeeper的具体目录注册写节点创建节点
ip = InetAddress.getLocalHost().getHostAddress();
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/ispider/" + ip, ip.getBytes());
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
public SpiderMonitorTask() {
String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
int baseSleepTimeMs = 1000;
int maxRetries = 3;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
curator.start();
try {
previousNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 这个方法,当监控的ZooKeeper对应的目录一旦有变动,就会被调用
* 得到当前最新的节点状态,将最新的节点状态和初始或者上一次的节点状态作比较,那我们就知道了是由谁引起的节点变化
* @param event
*/
@Override
public void process(WatchedEvent event) {
try {
List currentNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");
// HashSet previousNodesSet = new HashSet<>(previousNodes);
if(currentNodes.size() > previousNodes.size()) { // 最新的节点服务,超过之前的节点服务个数,有新的节点增加进来
for(String node : currentNodes) {
if(!previousNodes.contains(node)) {
// 当前节点就是新增节点
logger.info("----有新的爬虫节点{}新增进来", node);
}
}
} else if(currentNodes.size() < previousNodes.size()) { // 有节点挂了 发送告警邮件或者短信
for(String node : previousNodes) {
if(!currentNodes.contains(node)) {
// 当前节点挂掉了 得需要发邮件
logger.info("----有爬虫节点{}挂掉了", node);
MailUtil.sendMail("有爬虫节点挂掉了,请人工查看爬虫节点的情况,节点信息为:", node);
}
}
} // 挂掉和新增的数目一模一样,上面是不包括这种情况的,有兴趣的朋友可以直接实现包括这种特殊情况的监控
previousNodes = currentNodes; // 更新上一次的节点列表,成为最新的节点列表
} catch (Exception e) {
e.printStackTrace();
}
// 在原生的API需要再做一次监控,因为每一次监控只会生效一次,所以当上面发现变化后,需要再监听一次,这样下一次才能监听到
// 但是在使用curator的API时则不需要这样做
}
mysql> select count(*) from phone;
+----------+
| count(*) |
+----------+
| 12052 |
+----------+
1 row in set
mysql> select count(*) from phone where source='jd.com';
+----------+
| count(*) |
+----------+
| 9578 |
+----------+
1 row in set
mysql> select count(*) from phone where source='suning
.com';
+----------+
| count(*) |
+----------+
| 2474 |
+----------+
1 row in set
hbase(main):225:0* count 'phone'
Current count: 1000, row: 11155386088_jd.com
Current count: 2000, row: 136191393_suning.com
Current count: 3000, row: 16893837301_jd.com
Current count: 4000, row: 19036619855_jd.com
Current count: 5000, row: 1983786945_jd.com
Current count: 6000, row: 1997392141_jd.com
Current count: 7000, row: 21798495372_jd.com
Current count: 8000, row: 24154264902_jd.com
Current count: 9000, row: 25687565618_jd.com
Current count: 10000, row: 26458674797_jd.com
Current count: 11000, row: 617169906_suning.com
Current count: 12000, row: 769705049_suning.com
12348 row(s) in 1.5720 seconds
=> 12348
2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://list.jd.com/list.html?cat=9987,653,655&page=1,消耗时长:590 ms,代理信息:null:null
2018-04-01 21:26:03 [pool-1-thread-1] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析列表页面:https://list.jd.com/list.html?cat=9987,653,655&page=1, 消耗时长:46ms
2018-04-01 21:26:03 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表页面:https://list.suning.com/0-20006-0.html, 消耗时长:49ms
2018-04-01 21:26:04 [pool-1-thread-5] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://item.jd.com/6737464.html,消耗时长:219 ms,代理信息:null:null
2018-04-01 21:26:04 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://list.jd.com/list.html?cat=9987,653,655&page=2&sort=sort_rank_asc&trans=1&JL=6_0_0,消耗时长:276 ms,代理信息:null:null
2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://list.suning.com/0-20006-99.html,消耗时长:300 ms,代理信息:null:null
2018-04-01 21:26:04 [pool-1-thread-4] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析列表页面:https://list.suning.com/0-20006-99.html, 消耗时长:4ms
......
2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://club.jd.com/comment/productCommentSummaries.action?referenceIds=23934388891,消耗时长:176 ms,代理信息:null:null
2018-04-01 21:27:49 [pool-1-thread-3] [cn.xpleaf.spider.core.parser.Impl.JDHtmlParserImpl] [INFO] - 解析商品页面:https://item.jd.com/23934388891.html, 消耗时长:413ms
2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.utils.HttpUtil] [INFO] - 下载网页:https://review.suning.com/ajax/review_satisfy/general-00000000010017793337-0070079092-----satisfy.htm,消耗时长:308 ms,代理信息:null:null
2018-04-01 21:27:49 [pool-1-thread-2] [cn.xpleaf.spider.core.parser.Impl.SNHtmlParserImpl] [INFO] - 解析商品页面:https://product.suning.com/0070079092/10017793337.html, 消耗时长:588ms
......