Hadoop 源码学习笔记(3)--Hdfs的RPC通信框架

前言

单个 Hdfs 集群中可能存在成百上千个 DataNode ,但默认情况下 NameNode 只有一个 , 各个节点不断的进行内部通信,如果不能快速的处理掉通信消息,可能会导致掉节点,或者数据传输缓慢等问题。因此Hdfs内部集群对内部RPC通信具有较高的性能要求。

本文会对 Hdfs 集群的RPC通信框架进行分析,看看它是如何保证节点通信的效率。

Protobuf 简介

在 Hdfs 中,为了提升内部通信的传输效率,整个RPC通信框架使用 Google 的 Protobuf 序列化框架进行数据传输。为了方便后续理解,这里先对 Protobuf 进行简单介绍。

数据传输

Protobuf 首先是一个跨语言的数据传输框架。把它和 XML 和 JSON进行对比可以看出:

语言 特点 可读性 数据Size 解析效率
Protobuf 将数据内容解析成纯字节形式传输 数据以字节形式存在,不具备可读性 占用数据量少 直接读取数据内容,效率高
XML和JSON 引入额外文本构造出格式化数据 额外文本使得数据具备良好的可读性 数据以字符形式存在,且额外文本占用大量空间 需要解析剔除额外数据,效率低

XML和Json都是将数据封装成一个格式化文本,因此在必要的传输数据之外,还有大量的额外文本进行状态描述。而 Protobuf 通过将数据字段序列化成为一串不可读的字节码,同XML和Json相比,对于同样的数据,它所需要传输的数据量更小,解析的速度更快。

Protobuf 也是一门天生的跨语言数据传输框架。 对于不同的语言,都用同一个 .proto 的文件进行数据描述,如下:

message Person {
  string name = 1;
  int32 id = 2;  // Unique ID number for this person.
  string email = 3;
}

代码中的 Person 数据,可以通过 Google 或者三方的 protobuf 处理工具,被转化为特定编程语言下的数据对象。

例如,在Java代码中,通过 .proto 文件生成一个 AddressBook 数据类,那么生成的 Java 文件中会自带 mergeFromwriteTo 方法如下:

// 从输入流中反序列化数据
AddressBook.Builder addressBook = AddressBook.newBuilder();
addressBook.mergeFrom(new FileInputStream(args[0]));

// 序列化数据到输入流
FileOutputStream output = new FileOutputStream(args[0]);
addressBook.build().writeTo(output);

通过Protobuf内部的IO逻辑,我们可以将指定的数据转化为少量的字节码进行传输,从而提升整体的传输效率。

对于任意语言,只要以同样的方式记录和读取同一份字节码数据就可以得到同样的数据对象,从而保证序列化数据的可还原性。同时,在数据的序列化过程中,由于没有额外文本的参与,也不需要保持数据在传输过程中的可读性,因此对于同一个数据,Protobuf拥有比XML和Json更小的数据量和更快的解析速度。

RPC 调用

Protobuf 除了实现数据的传输作用以外,还实现了一套RPC远程调用框架。

定义一个 .proto 文件如下

option java_generic_services = true;

service ReconfigurationProtocolService {
    rpc getReconfigurationStatus(GetReconfigurationStatusRequestProto)
      returns(GetReconfigurationStatusResponseProto);
}

使用Protobuf编译工具进行处理之后,可以得到一个 ReconfigurationProtocolService 接口,例如上方代码对应的接口中会有一个叫做 getReconfigurationStatus,参数类型为GetReconfigurationStatusRequestProto, 返回值为GetReconfigurationStatusResponseProto 的方法。

// 构造BlockingService
ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
        = new ReconfigurationProtocolServerSideTranslatorPB(this);
BlockingService reconfigurationPbService = ReconfigurationProtocolService
        .newReflectiveBlockingService(reconfigurationProtocolXlator);

// 调用BlockingService
service.callBlockingMethod(methodDescriptor, null, param);

在 Java 文件中,通过动态代理得到一个BlockingService对象,内部包裹一个实现了 ReconfigurationProtocolService.BlockingInterface 接口的对象。

当需要使用RPC服务时,系统通过传输需要调用的方法名和相关的调用参数,使用 BlockingService::callBlockingMethod,就可以在Server端解析调用逻辑,实现RPC远程调用。

RPC通信的逻辑实现

总览

言归正传,我们回到 Hdfs 的内部通信机制本身。

RPC操作

如上图中,ProxyImpl是对同一个RPC调用接口的实现类,当Proxy中的接口被调用时,通过Client发送消息到 ServerServer 会按照标准数据格式进行解析,再调用Server侧的 Impl方法进行执行,并返回结果数据。Client 发送消息到 Server 的过程对于接口访问而言是透明的,对于使用者来说,他在本地执行 Proxy 的接口,会得到具有相同接口的 Impl 的调用结果。

不同的RPC框架的具体实现逻辑不尽相同,在Hdfs中,RPC.Server类扮演RPC框架中的 Server 角色,处理响应内部通信请求; Client 类扮演RPC框架中的 Client 角色,负责调用消息的发送和结果数据接收。

接下来会针对 Server 和 Client 的进行代码逻辑的走读。

Server

RPC.Server的源码路径是 $src/hadooop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

RPC 类中有一个 Builder 类负责构造 RPC.Server,在构造方法中我们看到:

public Server build() throws IOException, HadoopIllegalArgumentException {
    return getProtocolEngine(this.protocol, this.conf).getServer(
          this.protocol, this.instance, this.bindAddress, this.port,
          this.numHandlers, this.numReaders, this.queueSizePerHandler,
          this.verbose, this.conf, this.secretManager, this.portRangeConfig);
}

默认情况下,通过 getProtocolEngine 都是得到一个 ProtobufRpcEngine 对象,再通过ProtobufRpcEngine::getServer构造出 ProtobufRpcEngine.Server 对象。

ProtobufRpcEngine.ServerServer 的子类,整个内部通信机制在 Server 类中就已经实现了,下面是 Server 中的数据处理流程。

Server通信逻辑

Server类中使用了四种类型的线程类,分别是Listener,Reader,HandlerResponder。如上图所示,为了方便表示各个线程间的通信逻辑,使用泳道代表着对应类型的线程类操作时锁使用的关键方法。

Listener

Listener 作为单线程任务负责监听指定端口的socketACCEPT 请求,当新的 socket链接到来时,将其封装成一个 Connection 对象,通过addConnection添加Reader的处理队列中。

Server 中只有一个 Listener 线程负责接收新的socket请求,但有多个 Reader 线程,在Listener::doAccept 中会根据以下代码尽可能将 Connection 平均分配到各个 Reader中,让多个线程可以同时读取不同的 socket 数据,从而避免Listener单线程引起的性能瓶颈。

Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
}

Reader

Reader负责内部通信数据的解析工作,它不断尝试从Connection所包装的socket对象中读取数据。当发现某个 socket 可读时,通过 readAndProcess-> processOneRpc 处理到来的消息。

private void processOneRpc(ByteBuffer bb) throws IOException, WrappedRpcServerException, InterruptedException {
    final RpcWritable.Buffer buffer = RpcWritable.Buffer.wrap(bb);
    final RpcRequestHeaderProto header = getMessage(RpcRequestHeaderProto.getDefaultInstance(), buffer);
    callId = header.getCallId();
    if (callId < 0) { 
        processRpcOutOfBandRequest(header, buffer);
    } else if(!connectionContextRead) {
        throw new WrappedRpcServerException();
    } else {
        processRpcRequest(header, buffer);
    }
}

从上面的代码可以看出,每次从 socket 请求传来的数据请求都必然带着一个 RpcRequestHeaderProto 对象,这个对象中封装着后续参数的相关信息,就像 Http 协议中的头信息。

当 socket 初次建立链接时,需要通过 procesRpcOutOfBandRequest 进行链接初始化,初始化时的 callId < 0。初始化完成之后,后续请求通过 processRpcRequest 进行消费。

private void processRpcRequest(RpcRequestHeaderProto header,
    RpcWritable.Buffer buffer) throws WrappedRpcServerException,
    InterruptedException {
    Class<? extends Writable> rpcRequestClass =  getRpcRequestWrapper(header.getRpcKind());
    Writable rpcRequest;
    rpcRequest = buffer.newInstance(rpcRequestClass, conf);
    RpcCall call = new RpcCall(this, header.getCallId(),
          header.getRetryCount(), rpcRequest,
          ProtoUtil.convert(header.getRpcKind()),
          header.getClientId().toByteArray(), traceScope, callerContext);
    queueCall(call);
}

这里根据RpcRequestHeaderProto中包含的body类型解析出对应的数据类,将其封装成一个 RpcCall 对象,放入 Handler 的消费队列中。

Handler

Handler 线程负责具体指令的执行工作。

final Call call = callQueue.take(); // pop the queue; maybe blocked here
CurCall.set(call);
// always update the current call context
CallerContext.setCurrent(call.callerContext);
UserGroupInformation remoteUser = call.getRemoteUser();
if (remoteUser != null) {
    remoteUser.doAs(call);
} else {
    call.run();
}

Handler 的循环队列中,不断从 callQueue 中获取需要消费的任务信息,然后通过 call.run() 进行任务执行。

@Override
public Void run() throws Exception {
    Writable value = null;
    ResponseParams responseParams = new ResponseParams();
    value = call(rpcKind, connection.protocolName, rpcRequest, timestamp);
    if (!isResponseDeferred()) {
        setupResponse(this, responseParams.returnStatus, responseParams.detailedErr, value, responseParams.errorClass, responseParams.error);
        sendResponse();
    }
}    

RpcCall::run 中我们看到,系统实际上是通过Server::call方法执行的,这个方法在 RPC.Server 中被实现。

static { // Register the rpcRequest deserializer for ProtobufRpcEngine
    org.apache.hadoop.ipc.Server.registerProtocolEngine(
        RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class,
        new Server.ProtoBufRpcInvoker());
}

@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
        Writable rpcRequest, long receiveTime) throws Exception {
    return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
          receiveTime);
}

// Server.ProtoBufRpcInvoker
public Writable call(RPC.Server server, String connectionProtocolName,
          Writable writableRequest, long receiveTime) throws Exception {
    RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
    RequestHeaderProto rpcRequest = request.getRequestHeader();
    String methodName = rpcRequest.getMethodName();
    
    String declaringClassProtoName = 
            rpcRequest.getDeclaringClassProtocolName();
    long clientVersion = rpcRequest.getClientProtocolVersion();
    
    ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, 
                              declaringClassProtoName, clientVersion);
    BlockingService service = (BlockingService) protocolImpl.protocolImpl;
    result = service.callBlockingMethod(methodDescriptor, null, param);
}

从源码中可以看到,RPC.Server::call经过层层路径,最终在Server.ProtoBufRpcInvoker 根据传入的数据找到对应的BlockingService,利用 Protobuf (这里没有使用Protobuf内置的RpcChannel,而是自己手动调用BlockingService::callBlockingMethod)实现方法的调用。

Responder

Reponder 线程的 while 循环中,我们看到当socket可写时,会尝试调用 doAsyncWrite->processResponse 进行写入操作

private boolean processResponse(LinkedList<RpcCall> responseQueue,
                                    boolean inHandler) throws IOException {
    call = responseQueue.removeFirst();
    SocketChannel channel = call.connection.channel;
    int numBytes = channelWrite(channel, call.rpcResponse);
    if (numBytes < 0) {
        return true;
    }
    if (!call.rpcResponse.hasRemaining()) {
        ...
    } else {
        call.connection.responseQueue.addFirst(call);
    }
    return done;
}

private int channelWrite(WritableByteChannel channel, 
                           ByteBuffer buffer) throws IOException {
    int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
                 channel.write(buffer) : channelIO(null, channel, buffer);
    if (count > 0) {
        rpcMetrics.incrSentBytes(count);
    }
    return count;
}

Responder会将得到的 response 写入socket 的输出流中,返回给Client。

Client

Client 的源码路径是 $src/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {

    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}

Client 端通过 ProtobufRpcEngine::getProxy 构建出一个动态代理的接口对象。当 Client 访问接口时,通过 Invoker 类通知 Client 发送请求给 Server。

public Message invoke(Object proxy, final Method method, Object[] args) throws ServiceException {
    RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
    final Message theRequest = (Message) args[1];
    final RpcWritable.Buffer val;
    val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
            fallbackToSimpleAuth);
    return getReturnMessage(method, val);
}

Invoker 会根据访问接口的签名信息构造出一个 RequestHeaderProto 对象,在上一小节中,我们看到当 Server 接收到 socket 信息时,会先读取这个 RequestHeaderProto,了解当前调用的方法名称,然后进行后续分发。

RequestHeaderProto 对象随着 Message 对象一起被封装成一个 Call 对象传递给 Client 进行发送,每一个 Call 对象会有一个唯一的 callId, 便于在接收到返回信息中,返回给指定的 Call

Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      ConnectionId remoteId, int serviceClass,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {
  final Connection connection = getConnection(remoteId, call, serviceClass,
  fallbackToSimpleAuth);
  connection.sendRpcRequest(call);
}

private Connection getConnection(ConnectionId remoteId,
      Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
  connection = connections.get(remoteId);
  Connection existing = connections.putIfAbsent(remoteId, connection);
  if (connection == null) {
        connection = new Connection(remoteId, serviceClass);
  }
  connection.setupIOstreams(fallbackToSimpleAuth);
  return connection;
}

Client 有一个 connectionsConnection 队列负责同各个节点的NameNode 进行通信,首次构造 Connection 对象后,通过 setupIOstreams初始化链接信息,同时发送相关的设置信息到 Server::processRpcOutOfBandRequest 中进行Server侧的初始化。

当有一个可用的Connection 后,通过 connection::sendRpcRequest将请求发送给对应的Server

同时Connection 也是一个线程类,在 setupIOstreams 的时候会启动接收线程。接收线程在收到消息之后,根据消息中的唯一callId将返回数据返回给指定的 Call 对象,完成整个 Client 的通信流程。

NameNode 和 DataNode的心跳逻辑

接下来,以 NameNodeDataNode的心跳发送机制为例,举例说明内部通信的流程。

在 Hdfs 中,心跳是单向的,总是由DataNode主动上报当前状态到NameNode中,因此对于心跳而言,NameNode是Server,DataNode是Client。

DataNode

在前一篇文章中,我介绍了DataNode 在启动的时候,会构造一个 BlockPoolManager 对象,在 BlockPoolManager 中有一个 BPOfferService的集合对象。

BPOfferService(List<InetSocketAddress> nnAddrs, List<InetSocketAddress> lifelineNnAddrs, DataNode dn) {
    for (int i = 0; i < nnAddrs.size(); ++i) {
        this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
          lifelineNnAddrs.get(i), this));
    }
}

void start() {
    for (BPServiceActor actor : bpServices) {
        actor.start();
    }
}

每一个BPOfferService对应着一个 NameService , 对于 NameService 的每一个 NameNode 节点,会对应 BPServiceActor 的Runnable类。在启动BPOfferService的时候,其实就是启动每一个BPServiceActor类。

void start() {
    bpThread = new Thread(this, formatThreadName("heartbeating", nnAddr));
    bpThread.start();
}

@Override
public void run() {
    connectToNNAndHandshake();
    while (shouldRun()) {
        offerService();
    }
}

private void offerService() throws Exception {
    while (shouldRun()) {
        final long startTime = scheduler.monotonicNow();
         final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
         HeartbeatResponse resp = null;
         if (sendHeartbeat) {
            resp = sendHeartBeat(requestBlockReportLease);
         }
         ....
    }
}

BPServiceActor类本身是一个Runnable的实现类,在线程循环中,先链接到NameNode ,再在 while 循环中不断offerService

offerService中,通过 sendHeartBeat 进行周期性的心跳发送。

private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    bpNamenode = dn.connectToNN(nnAddr);

    // First phase of the handshake with NN - get the namespace
    // info.
    NamespaceInfo nsInfo = retrieveNamespaceInfo();

    // Verify that this matches the other NN in this HA pair.
    // This also initializes our block pool in the DN if we are
    // the first NN connection for this BP.
    bpos.verifyAndSetNamespaceInfo(this, nsInfo);
    
    // Second phase of the handshake with the NN.
    register(nsInfo);
}

HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
      throws IOException {
    scheduler.scheduleNextHeartbeat();
    scheduler.updateLastHeartbeatTime(monotonicNow());
    return bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getXceiverCount(),
        numFailedVolumes,
        volumeFailureSummary,
        requestBlockReportLease);
}

// DatanodeProtocolClientSideTranslatorPB.java
@Override
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
      StorageReport[] reports, long cacheCapacity, long cacheUsed,
      int xmitsInProgress, int xceiverCount, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary,
      boolean requestFullBlockReportLease) throws IOException {
  HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
        .setRegistration(PBHelper.convert(registration))
        .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
        .setFailedVolumes(failedVolumes)
        .setRequestFullBlockReportLease(requestFullBlockReportLease);
  resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
  return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()),
        rollingUpdateStatus, resp.getFullBlockReportLeaseId());
}

connectToNNAndHandshake中,通过ProtobufRpcEngine::getProxy 获得一个bpNamenode 的RPC代理类,调用 bpNamenode.sendHeartbeat时,通过动态代理将消息通过 Client 发送出去。

NameNode

DataNode发送了心跳之后,对应的NameNode会接收到一条对应的请求信息。

通过走读代码,我们找到了同样实现 DatanodeProtocolService 接口的是DatanodeProtocolServerSideTranslatorPB 类。

public HeartbeatResponseProto sendHeartbeat(RpcController controller,
      HeartbeatRequestProto request) throws ServiceException {
  return namesystem.handleHeartbeat(nodeReg, report,
        dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
        failedVolumes, volumeFailureSummary, requestFullBlockReportLease);
}

DatanodeProtocolServerSideTranslatorPB::sendHeartbeat 中通过事件分发将心跳事件交给 FSNamesystem 进行消费,从而完成了 DataNodeNameNode 的心跳事件。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355

推荐阅读更多精彩内容