数据发布订阅(配置中心)
何为配置中心
单体应用配置集成在一个系统中,分布式系统中每个系统或多或少都需要做一些配置,如何解决系统参数配置,及动态改参问题?
配置中心。
ZooKeeper 实现配置中心
依靠下列两点来实现配置中心
- znode 能存储数据。
- watch 能监听数据改变。
数据在zookeeper中如何存放
- 一个配置项一个 znode。
- 一个配置文件一个 znode。
示例
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
public class ConfigCenterDemo {
public void put2zk(){
ZkClient zkClient = new ZkClient("192.168.3.111:2181");
zkClient.setZkSerializer(new SerializableSerializer());
String configPath = "/config";
String value = "info_zookeeper_1";
if(zkClient.exists(configPath)){
zkClient.writeData(configPath,value);
}else{
zkClient.createPersistent(configPath,value);
}
zkClient.close();
}
public void putFile2zk() throws IOException {
File file = new File(this.getClass().getResource("/config.xml").getFile());
FileInputStream fileInputStream = new FileInputStream(file);
byte[] datas = new byte[(int) file.length()];
fileInputStream.read(datas);
fileInputStream.close();
ZkClient zkClient = new ZkClient("192.168.3.111:2181");
zkClient.setZkSerializer(new BytesPushThroughSerializer());
String configPath = "/config_1";
if(zkClient.exists(configPath)){
zkClient.writeData(configPath,datas);
}else {
zkClient.createPersistent(configPath,datas);
}
zkClient.close();
}
public void getConfigFromZk(){
ZkClient zkClient = new ZkClient("192.168.3.2181");
zkClient.setZkSerializer(new SerializableSerializer());
String configPath="/config";
String value = zkClient.readData(configPath);
System.out.println("从zk读到的config配置的值:"+value);
// 监控更新
zkClient.subscribeDataChanges(configPath, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("获得更新的配置值:"+data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
}
});
// 为了样式实时获取配置加的等待,实际项目中根据场景写(可用阻塞方式)
try{
Thread.sleep(5*60*100);
}catch (InterruptedException e){
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
ConfigCenterDemo demo = new ConfigCenterDemo();
demo.put2zk();
demo.putFile2zk();
demo.getConfigFromZk();
}
}