我們的爬蟲程序在執行過程中,可能需要滿足以下條件:
1、可以每天定時執行,爬取指定電商等網站內容。
2、可以對分布式爬蟲進行監控,當爬蟲程序掛掉之後,可以通知管理員。
下面我們來介紹如何實現這兩個功能。
注意:
這裡我們主要演示定時執行和監控功能,所以爬蟲程序只是偽代碼。
簡介:
Quartz是一個完全由java編寫的開源作業調度框架。盡管Quartz框架整合了許多額外功能, 但就其簡易形式,你會發現它非常易用。簡單地創建一個實現org.quartz.Job接口的java類。
Job接口包含唯一的方法:
public void execute(JobExecutionContext context) throws JobExecutionException;
在你的Job接口實現類裡面,添加一些邏輯到execute()方法。一旦你配置好Job實現類並設定好調度時間表,Quartz將密切注意剩余時間。當調度程序確定該是通知你的作業的時候,Quartz框架將調用你Job實現類(作業類)上的execute()方法並允許做它該做的事情。無需報告任何東西給調度器或調用任何特定的東西。僅僅執行任務和結束任務即可。如果配置你的作業在隨後再次被調用,Quartz框架將在恰當的時間再次調用它。
使用格式
首先Quartz Cron 表達式支持到七個域:
名稱 是否必須 允許值 特殊字符
秒 是 0-59 , - * /
分 是 0-59 , - * /
時 是 0-23 , - * /
日 是 1-31 , - * ? / L W
月 是 1-12 或 JAN-DEC , - * /
周 是 1-7 或 SUN-SAT , - * ? / L #
年 否 空 或 1970-2099 , - * /
結構,以這個為例
0 11 11 11 11 ? 每年的11月11號 11點11分觸發(光棍節)
可以看到基本結構是 秒_分_小時_日_月_[周]_[年] 後面的周和年是可選的
其次,通配符,主要的有星號(*);問號(?);減號(-);逗號(,);斜槓(/);L字母;W字母;井號(#).
通配符含義:
星號:表示任意時刻
問號:只能在日或周字段上使用,如官方文檔解釋的那樣,問號(?)的作用是指明該字段‘沒有特定的值
減號:范圍,如 1-5秒
逗號:列表,如 1,5,10 秒
斜槓:等步長序列,如3/13秒 表示 3,16,29,42,55,3,16...
L:僅在日和周上支持,表示允許的最後一個值,注意不要讓范圍和列表與L連用
W:工作日
井號:為給定月份指定具體的工作日實例。把“MON#2”放在周內日期字段中,表示把任務安排在當月的第二個星期一。
使用示例:
Pom依賴:
org.quartz-scheduler quartz1.8.4
模擬的爬蟲程序:
import java.text.SimpleDateFormat; import java.util.Date; //添加quartz依賴 import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; //通過quartz實現定時任務需要實現Job接口 public class SpiderTaobao implements Job{ //重寫Job中的execute方法 public void execute(JobExecutionContext arg0) throws JobExecutionException { //設置時間輸出格式 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH時mm分ss秒"); String time = simpleDateFormat.format(new Date()); //打印爬蟲執行時間 System.out.println(time); //模擬爬蟲程序 System.out.println("爬取淘寶數據。。。。。。。。"); } }
定時器實現爬蟲程序定時:
//添加quartz依賴 import org.quartz.CronTrigger; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.impl.StdSchedulerFactory; public class SpiderTimer { public static void main(String[] args) { try { //1獲取一個默認調度器 Scheduler defaultScheduler = StdSchedulerFactory.getDefaultScheduler(); //2開啟調度器 defaultScheduler.start(); //封裝要調度的任務 String simpleName = SpiderTaobao.class.getSimpleName(); JobDetail jobDetail = new JobDetail(simpleName,Scheduler.DEFAULT_GROUP, SpiderTaobao.class); //表示設置定時操作(每隔5秒執行一次) CronTrigger trigger = new CronTrigger(simpleName,Scheduler.DEFAULT_GROUP, "0/5 * * * * ?"); //3執行調度任務 defaultScheduler.scheduleJob(jobDetail, trigger); } catch (Exception e) { e.printStackTrace(); } } }
執行SpiderTimer 的打印結果:
我們使用Zookeeper對爬蟲程序進行監控。ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。
Zookeeper集群啟動:
切換到集群所有主機Zookeeper的bin目錄下,然後
./zkServer.sh start //每台主機都開啟Zookeeper
開啟Zookeeper客戶端:
切換到一台主機Zookeeper的bin目錄下,然後
./zkCli.sh //啟動客戶端
查看Zookeeper集群中節點信息:(創建臨時節點後,在有效時間內,可以像以下這樣查看臨時節點)
1、我們主要使用Zookeeper中臨時節點的特性實現對爬蟲程序的監控。臨時節點默認生存期為40秒(也可以在代碼中自定義)。
2、我們創建一個監視器程序,監視器永不停止並且一直監控Zookeeper中臨時節點是否存在,如果爬蟲程序掛了,那麼臨時節點會在指定時間消失,監視器發現沒有臨時節點或者有新增節點後,可以執行代碼(例如發郵箱等)通知管理員。
注意程序運行需要先開啟Zookeeper集群
Pom依賴:
org.apache.curator curator-framework2.8.0
爬蟲程序SpiderTaobao:
import java.net.InetAddress; import java.text.SimpleDateFormat; import java.util.Date; //添加curator依賴 import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; public class SpiderTaobao { public SpiderTaobao(){ //指定zk集群的地址 String connectString = "192.168.33.130:2181,192.168.33.131:2181,192.168.33.132:2181"; //1000 :代表是重試時間間隔 3:表示是重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //使用curator創建一個zk鏈接 int sessionTimeoutMs = 2000;//這個值必須在4s--40s之間,表示是鏈接失效的時間 int connectionTimeoutMs = 1000;//鏈接超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs , connectionTimeoutMs , retryPolicy); //啟動鏈接 client.start(); try { InetAddress localHost = InetAddress.getLocalHost(); String ip = localHost.getHostAddress(); client.create() .creatingParentsIfNeeded()//如果父節點不存在,則創建 .withMode(CreateMode.EPHEMERAL)//指定節點類型 .withACL(Ids.OPEN_ACL_UNSAFE)//指定節點的權限信息 .forPath("/Spider/"+ip);//指定節點名稱 } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { SpiderTaobao spiderTaobao = new SpiderTaobao(); //設置時間輸出格式 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH時mm分ss秒"); String time = simpleDateFormat.format(new Date()); //打印爬蟲執行時間 System.out.println(time); //模擬爬蟲程序 System.out.println("爬取淘寶數據。。。。。。。。"); System.out.println("爬取結束"); } }
監視器SpiderWatcher:
import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; /** * 創建一個監視器,這個監視器需要實現watcher接口 * 接口中有一個process方法。 * 當監視器發現監視的節點發生變化的時候,這個process方法會被調用 * * * 所以這個監視器是一個守護進程,也就是說一個永遠不會停止的進程,類似於死循環 * */ public class SpiderWatcher implements Watcher { CuratorFramework client; ListchildrenList; public SpiderWatcher() { //在這需要指定監視的節點 //指定zk集群的地址 String connectString = "192.168.33.130:2181,192.168.33.131:2181,192.168.33.132:2181"; //1000 :代表是重試時間間隔 3:表示是重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //使用curator創建一個zk鏈接 int sessionTimeoutMs = 2000;//這個值必須在4s--40s之間,表示是鏈接失效的時間 int connectionTimeoutMs = 1000;//鏈接超時時間 client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs , connectionTimeoutMs , retryPolicy); //啟動鏈接 client.start(); try { //使用spiderwatcher監視器監視spider節點下面的所有子節點的變化情況(向spider節點注冊監視器,這個監視器需要重復重復注冊) childrenList = client.getChildren().usingWatcher(this).forPath("/Spider"); } catch (Exception e) { e.printStackTrace(); } } public void process(WatchedEvent event) { try { //重復注冊監視器 List newChildrenList = client.getChildren().usingWatcher(this).forPath("/Spider"); for (String node : childrenList) { if(!newChildrenList.contains(node)){ //設置時間輸出格式 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH時mm分ss秒"); String time = simpleDateFormat.format(new Date()); //打印爬蟲執行時間 System.out.println(time); System.out.println("節點消失:"+node); //給管理員發送短信,或者郵件 //發短信的話可以使用一些第三方平台 雲片網 //發郵件的話使用 javamail } } for (String node : newChildrenList) { if(!childrenList.contains(node)){ //設置時間輸出格式 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH時mm分ss秒"); String time = simpleDateFormat.format(new Date()); //打印爬蟲執行時間 System.out.println(time); System.out.println("節點新增:"+node); } } this.childrenList = newChildrenList; } catch (Exception e) { e.printStackTrace(); } } public void start(){ //為了保證讓這個方法一直運行,因為這是一個監視器,不可以掛掉 while(true){ ; } } public static void main(String[] args) { SpiderWatcher spiderWatcher = new SpiderWatcher(); spiderWatcher.start(); } }
監視器啟動後狀態:
監視器捕獲到的程序狀態: