歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

Apache Curator入門實戰

Apache Curator入門實戰

Curator是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。

1.Zookeeper安裝部署

Zookeeper的部署很簡單,如果已經有Java運行環境的話,下載tarball解壓後即可運行。

[root@linuxidc Temp]$ wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
[root@linuxidc Temp]$ tar zxvf zookeeper-3.4.6.tar.gz
[root@linuxidc Temp]$ cd zookeeper-3.4.6

[root@linuxidc zookeeper-3.4.6]$ cp conf/zoo_sample.cfg conf/zoo.cfg
[root@linuxidc zookeeper-3.4.6]$ export ZOOKEEPER_HOME=/usr/local/src/zookeeper-3.4.5
[root@linuxidc zookeeper-3.4.6]$ export PATH=$ZOOKEEPER_HOME/bin:$PATH

[root@linuxidc zookeeper-3.4.6]$ bin/zkServer.sh start
[root@linuxidc zookeeper-3.4.6]$ bin/zkCli.sh -server 127.0.0.1:2181

2.客戶端常用操作

用zkCli.sh連接上Zookeeper服務後,用help能列出所有命令:

[root@BC-VM-edce4ac67d304079868c0bb265337bd4 zookeeper-3.4.6]# bin/zkCli.sh -127.0.0.1:2181
Connecting to localhost:2181
2015-06-11 10:55:14,387 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
    ...

[zk: localhost:2181(CONNECTED) 5] help
ZooKeeper -server host:port cmd args
        connect host:port
        get path [watch]
        ls path [watch]
        set path data [version]
        rmr path
        delquota [-n|-b] path
        quit 
        printwatches on|off
        create [-s] [-e] path data acl
        stat path [watch]
        close 
        ls2 path [watch]
        history 
        listquota path
        setAcl path acl
        getAcl path
        sync path
        redo cmdno
        addauth scheme auth
        delete path [version]
        setquota -n|-b val path

下面就試驗一下常用的命令:

  • create:創建路徑結點。
  • ls:查看路徑下的所有結點。
  • get:獲得結點上的值。
  • set:修改結點上的值。
  • delete:刪除結點。
[zk: localhost:2181(CONNECTED) 6] create /zktest mydata
Created /zktest
[zk: localhost:2181(CONNECTED) 12] ls /
[zktest, zookeeper]
[zk: localhost:2181(CONNECTED) 7] ls /zktest
[]
[zk: localhost:2181(CONNECTED) 13] get /zktest
mydata
cZxid = 0x1c
ctime = Thu Jun 11 10:58:06 CST 2015
mZxid = 0x1c
mtime = Thu Jun 11 10:58:06 CST 2015
pZxid = 0x1c
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
[zk: localhost:2181(CONNECTED) 14] set /zktest junk
cZxid = 0x1c
ctime = Thu Jun 11 10:58:06 CST 2015
mZxid = 0x1f
mtime = Thu Jun 11 10:59:08 CST 2015
pZxid = 0x1c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
[zk: localhost:2181(CONNECTED) 15] delete /zktest
[zk: localhost:2181(CONNECTED) 16] ls /
[zookeeper]

3.用Curator管理Zookeeper

Curator的Maven依賴如下,一般直接使用curator-recipes就行了,如果需要自己封裝一些底層些的功能的話,例如增加連接管理重試機制等,則可以引入curator-framework包。

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.7.0</version>
    </dependency>

3.1 Client操作

利用Curator提供的客戶端API,可以完全實現上面原生客戶端的功能。值得注意的是,Curator采用流式風格API。

package com.cdai.codebase.bigdata.Hadoop.zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;

/**
 * Curator framework's client test.
 * Output:
 *  $ create /zktest hello 
 *  $ ls / 
 *  [zktest, zookeeper]
 *  $ get /zktest 
 *  hello
 *  $ set /zktest world 
 *  $ get /zktest 
 *  world
 *  $ delete /zktest 
 *  $ ls / 
 *  [zookeeper]
 */
public class CuratorClientTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws Exception {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        // 2.Client API test
        // 2.1 Create node
        String data1 = "hello";
        print("create", ZK_PATH, data1);
        client.create().
                creatingParentsIfNeeded().
                forPath(ZK_PATH, data1.getBytes());

        // 2.2 Get node and data
        print("ls", "/");
        print(client.getChildren().forPath("/"));
        print("get", ZK_PATH);
        print(client.getData().forPath(ZK_PATH));

        // 2.3 Modify data
        String data2 = "world";
        print("set", ZK_PATH, data2);
        client.setData().forPath(ZK_PATH, data2.getBytes());
        print("get", ZK_PATH);
        print(client.getData().forPath(ZK_PATH));

        // 2.4 Remove node
        print("delete", ZK_PATH);
        client.delete().forPath(ZK_PATH);
        print("ls", "/");
        print(client.getChildren().forPath("/"));
    }

    private static void print(String... cmds) {
        StringBuilder text = new StringBuilder("$ ");
        for (String cmd : cmds) {
            text.append(cmd).append(" ");
        }
        System.out.println(text.toString());
    }

    private static void print(Object result) {
        System.out.println(
                result instanceof byte[]
                    ? new String((byte[]) result)
                        : result);
    }

}

3.2 監聽器

Curator提供了三種Watcher(Cache)來監聽結點的變化:

  • Path Cache:監視一個路徑下1)孩子結點的創建、2)刪除,3)以及結點數據的更新。產生的事件會傳遞給注冊的PathChildrenCacheListener。
  • Node Cache:監視一個結點的創建、更新、刪除,並將結點的數據緩存在本地。
  • Tree Cache:Path Cache和Node Cache的“合體”,監視路徑下的創建、更新、刪除事件,並緩存路徑下所有孩子結點的數據。

下面就測試一下最簡單的Path Watcher:

package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;

/**
 * Curator framework watch test.
 */
public class CuratorWatcherTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws Exception {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        // 2.Register watcher
        PathChildrenCache watcher = new PathChildrenCache(
                client,
                ZK_PATH,
                true    // if cache data
        );
        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + new String(data.getData()) + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(StartMode.BUILD_INITIAL_CACHE);
        System.out.println("Register zk watcher successfully!");

        Thread.sleep(Integer.MAX_VALUE);
    }

}

下面是在zkCli.sh中操作時Java程序的輸出:

Java: zk client start successfully!
Java: Register zk watcher successfully!

zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydata
Java: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121]

zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdata
Java: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]

zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/hello
Java: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]

4.Curator“菜譜”

既然Maven包叫做curator-recipes,那說明Curator有它獨特的“菜譜”:

  • :包括共享鎖、共享可重入鎖、讀寫鎖等。
  • 選舉:Leader選舉算法。
  • Barrier:阻止分布式計算直至某個條件被滿足的“柵欄”,可以看做JDK Concurrent包中Barrier的分布式實現。
  • 緩存:前面提到過的三種Cache及監聽機制。
  • 持久化結點:連接或Session終止後仍然在Zookeeper中存在的結點。
  • 隊列:分布式隊列、分布式優先級隊列等。

4.1 分布式鎖

分布式編程時,比如最容易碰到的情況就是應用程序在線上多機部署,於是當多個應用同時訪問某一資源時,就需要某種機制去協調它們。例如,現在一台應用正在rebuild緩存內容,要臨時鎖住某個區域暫時不讓訪問;又比如調度程序每次只想一個任務被一台應用執行等等。

下面的程序會啟動兩個線程t1和t2去爭奪鎖,拿到鎖的線程會占用5秒。運行多次可以觀察到,有時是t1先拿到鎖而t2等待,有時又會反過來。Curator會用我們提供的lock路徑的結點作為全局鎖,這個結點的數據類似這種格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次獲得鎖時會生成這種串,釋放鎖時清空數據。

package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.TimeUnit;

/**
 * Curator framework's distributed lock test.
 */
public class CuratorDistrLockTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_LOCK_PATH = "/zktest";

    public static void main(String[] args) throws InterruptedException {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        Thread t1 = new Thread(() -> {
            doWithLock(client);
        }, "t1");
        Thread t2 = new Thread(() -> {
            doWithLock(client);
        }, "t2");

        t1.start();
        t2.start();
    }

    private static void doWithLock(CuratorFramework client) {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + " hold lock");
                Thread.sleep(5000L);
                System.out.println(Thread.currentThread().getName() + " release lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

}

4.2 Leader選舉

當集群裡的某個服務down機時,我們可能要從slave結點裡選出一個作為新的master,這時就需要一套能在分布式環境中自動協調的Leader選舉方法。Curator提供了LeaderSelector監聽器實現Leader選舉功能。同一時刻,只有一個Listener會進入takeLeadership()方法,說明它是當前的Leader。注意:當Listener從takeLeadership()退出時就說明它放棄了“Leader身份”,這時Curator會利用Zookeeper再從剩余的Listener中選出一個新的Leader。autoRequeue()方法使放棄Leadership的Listener有機會重新獲得Leadership,如果不設置的話放棄了的Listener是不會再變成Leader的。

package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;

/**
 * Curator framework's leader election test.
 * Output:
 *  LeaderSelector-2 take leadership!
 *  LeaderSelector-2 relinquish leadership!
 *  LeaderSelector-1 take leadership!
 *  LeaderSelector-1 relinquish leadership!
 *  LeaderSelector-0 take leadership!
 *  LeaderSelector-0 relinquish leadership! 
 *      ...
 */
public class CuratorLeaderTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws InterruptedException {
        LeaderSelectorListener listener = new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println(Thread.currentThread().getName() + " take leadership!");

                // takeLeadership() method should only return when leadership is being relinquished.
                Thread.sleep(5000L);

                System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
            }
        };

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void registerListener(LeaderSelectorListener listener) {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();

        // 2.Ensure path
        try {
            new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 3.Register listener
        LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);
        selector.autoRequeue();
        selector.start();
    }

}

Copyright © Linux教程網 All Rights Reserved