系列
开篇
- 这个系列主要用以分析mqadmin常见的比较核心的几个命令,主要包括订阅分组和topic的创建和删除、Topic的权限变更、MQAdmin的启动过程。
- 这篇文章主要是用来分析MQAdmin的启动过程,核心在于namesrv地址的传递以及对应的通信Channel的创建。
MQAdmin启动过后
MQAdmin的启动过程核心逻辑如下
- 注册各类mqadmin对应的命令和处理函数。
- 通过System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr)在当前执行环境中保存rocketmq集群的namesrv地址,所有命令执行都需要namesrv地址。
- MQAdmin启动过程中创建DefaultMQAdminExt对象过程中会通过System.getProperty方法获取namesrv的地址创建Channel对象。
MQAdminStartup
public class MQAdminStartup {
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();
private static String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
public static void main(String[] args) {
main0(args, null);
}
public static void main0(String[] args, RPCHook rpcHook) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
// 注册所有的命令行
initCommand();
try {
initLogback();
switch (args.length) {
case 0:
printHelp();
break;
case 2:
if (args[0].equals("help")) {
SubCommand cmd = findSubCommand(args[1]);
if (cmd != null) {
Options options = ServerUtil.buildCommandlineOptions(new Options());
options = cmd.buildCommandlineOptions(options);
if (options != null) {
ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
}
} else {
System.out.printf("The sub command %s not exist.%n", args[1]);
}
break;
}
case 1:
default:
SubCommand cmd = findSubCommand(args[0]);
if (cmd != null) {
String[] subargs = parseSubArgs(args);
Options options = ServerUtil.buildCommandlineOptions(new Options());
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
return;
}
// mqadmin启动的时候通过-n 参数指定namesrvAddr
// 启动的时候会在系统变量中写入"rocketmq.namesrv.addr"
if (commandLine.hasOption('n')) {
String namesrvAddr = commandLine.getOptionValue('n');
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
cmd.execute(commandLine, options, AclUtils.getAclRPCHook(rocketmqHome + MixAll.ACL_CONF_TOOLS_FILE));
} else {
System.out.printf("The sub command %s not exist.%n", args[0]);
}
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 注册各类mqadmin对应的命令和处理函数
public static void initCommand() {
initCommand(new UpdateTopicSubCommand());
initCommand(new DeleteTopicSubCommand());
initCommand(new UpdateSubGroupSubCommand());
initCommand(new DeleteSubscriptionGroupCommand());
initCommand(new UpdateBrokerConfigSubCommand());
initCommand(new UpdateTopicPermSubCommand());
}
}
- 注册各类mqadmin对应的命令和处理函数。
- System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr)启动的时候会在系统变量中写入"rocketmq.namesrv.addr"。
DefaultMQAdminExt
public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private String adminExtGroup = "admin_ext_group";
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
private long timeoutMillis = 5000;
// 构造函数初始化DefaultMQAdminExtImpl对象
public DefaultMQAdminExt(RPCHook rpcHook) {
this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, rpcHook, timeoutMillis);
}
}
public class ClientConfig {
public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
// ClientConfig的namesrvAddr通过NameServerAddressUtils.getNameServerAddresses()获取
private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();
private String clientIP = RemotingUtil.getLocalAddress();
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
protected String namespace;
protected AccessChannel accessChannel = AccessChannel.LOCAL;
}
public class NameServerAddressUtils {
public static final String INSTANCE_PREFIX = "MQ_INST_";
public static final String INSTANCE_REGEX = INSTANCE_PREFIX + "\\w+_\\w+";
public static final String ENDPOINT_PREFIX = "http://";
public static final Pattern NAMESRV_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + ".*");
public static final Pattern INST_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + INSTANCE_REGEX + "\\..*");
// 负责通过System.getProperty获取namesrv的地址
public static String getNameServerAddresses() {
// NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr";
return System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
}
}
- DefaultMQAdminExt的父类ClientConfig的namesrvAddr是从System.getProperty("rocketmq.namesrv.addr")获取的namesrvAddr地址。
- 创建DefaultMQAdminExtImpl的对象过程中会把DefaultMQAdminExt当作参数传递,该参数同时是ClientConfig对象。
- ClientConfig的对象包含namesrv地址信息。
DefaultMQAdminExtImpl
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
@Override
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.defaultMQAdminExt.changeInstanceNameToPID();
// getOrCreateMQClientInstance会创建mqClientInstance
this.mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The AdminExt service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}
}
}
- DefaultMQAdminExtImpl内部通过MQClientManager创建mqClientInstance对象。
MQClientManager
public class MQClientManager {
private final static InternalLogger log = ClientLogger.getLog();
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
// 通过ClientConfig创建MQClientInstance对象
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
}
- MQClientManager内部创建MQClientInstance对象。
MQClientInstance
public class MQClientInstance {
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
// clientConfig.getNamesrvAddr() 在这里不为空
if (this.clientConfig.getNamesrvAddr() != null) {
// mQClientAPIImpl的updateNameServerAddressList更新namesrv地址
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
}
- clientConfig.getNamesrvAddr()不为空,会更新mQClientAPIImpl的namesrv的地址。
MQClientAPIImpl
public class MQClientAPIImpl {
private final static InternalLogger log = ClientLogger.getLog();
private static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
static {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
}
private final RemotingClient remotingClient;
private final TopAddressing topAddressing;
private final ClientRemotingProcessor clientRemotingProcessor;
private String nameSrvAddr = null;
private ClientConfig clientConfig;
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor;
}
public void updateNameServerAddressList(final String addrs) {
String[] addrArray = addrs.split(";");
List<String> list = Arrays.asList(addrArray);
// 更新remotingClient的namesrv的地址
this.remotingClient.updateNameServerAddressList(list);
}
}
- updateNameServerAddressList会更新remotingClient的namesrv的地址。
NettyRemotingClient
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
// 负责保存namesrv的地址
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
public void updateNameServerAddressList(List<String> addrs) {
List<String> old = this.namesrvAddrList.get();
boolean update = false;
if (!addrs.isEmpty()) {
if (null == old) {
update = true;
} else if (addrs.size() != old.size()) {
update = true;
} else {
for (int i = 0; i < addrs.size() && !update; i++) {
if (!old.contains(addrs.get(i))) {
update = true;
}
}
}
if (update) {
Collections.shuffle(addrs);
log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
this.namesrvAddrList.set(addrs);
}
}
}
private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
// 创建NameserverChannel的时候会取namesrv地址
final List<String> addrList = this.namesrvAddrList.get();
if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);
this.namesrvAddrChoosed.set(newAddr);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null) {
return channelNew;
}
}
throw new RemotingConnectException(addrList.toString());
}
} finally {
this.lockNamesrvChannel.unlock();
}
}
return null;
}
}
- NettyRemotingClient的namesrvAddrList保存namesrv的地址。
- NettyRemotingClient的getAndCreateNameserverChannel方法获取namesrvAddrList的地址创建namesrv的通信channel。