前言
我们在前面的几篇文章中介绍了MP4的视频文件格式,以及MP4文件内封装的AAC音频数据格式和H.264视频码流。
但是上述分析主要停留在知识层面,因此本文主要讲述如何通过代码对上述格式的数据(文件)进行正确的解析,从而获得一些有效的信息。
目前代码已经上传到github上,有兴趣可以自取。
读取MP4文件信息
本次的解析功能主要使用Python来实现(主要是因为它比较简单、用起来顺手,使用其他编程语言比如Java、Kotlin、C++等效果是一样的)。
class Box(object):
    """Base parser for the header of an MP4 box.

    A box starts with a 4-byte big-endian size and a 4-character type.
    Subclasses call ``printSelf`` first to consume and print the header,
    then read their own payload fields from the same file object.
    """
    # Common field widths, in bytes.
    one_bytes = 1
    two_bytes = 2
    three_bytes = 3
    four_bytes = 4
    eight_bytes = 8
    # Size of the current box in bytes, as declared in its header.
    box_size = 0
    # Header length: 8 for a plain box; printSelf adds 8 when a 64-bit
    # large size is present and 4 when the box is a FullBox.
    box_header_size = 8
    box_type = ''
    # FullBox version field (int); filled in by printSelf.
    box_version = 0
    # Whether the current box is a FullBox (has version + flags).
    isFullBox = False
    # True when the caller has already consumed the 8-byte size/type header.
    header_read_already = False

    def __init__(self, box_type=None, size=None):
        """Create a box; pass both arguments when the header was already read."""
        self.box_size = size
        self.box_type = box_type
        if box_type is not None and size is not None:
            self.header_read_already = True

    def findBoxHeader(self, file):
        """Read the 8-byte size/type header and return ``(type, size)``.

        At end of file the reads come back empty, giving ``('', 0)``.
        """
        # Explicit byte order: the default only exists since Python 3.11.
        self.box_size = int.from_bytes(file.read(self.four_bytes), 'big')
        # latin-1 maps every byte, so an unusual type code cannot raise.
        self.box_type = file.read(self.four_bytes).decode('latin-1')
        return self.box_type, self.box_size

    def print_origin_for_test(self, file):
        """Debug helper: dump the raw payload that follows the 8-byte header."""
        if not self.header_read_already:
            self.findBoxHeader(file)
        # The size covers the 8 header bytes already consumed; never pass a
        # negative count to read(), which would slurp the rest of the file.
        payload_size = max(self.box_size - 2 * self.four_bytes, 0)
        last = file.read(payload_size)
        print("{0} box,size:{1} last byte:{2}".format(self.box_type, self.box_size, last))

    def printSelf(self, file):
        """Print the header fields and consume the optional header extensions.

        Reads the size/type header unless the caller already did, then the
        64-bit large size (when size == 1) and the FullBox version/flags.
        Updates ``box_size``, ``box_version`` and ``box_header_size``.

        Returns ``(None, None)``: the header of the next box was not read.
        """
        if not self.header_read_already:
            self.findBoxHeader(file)
        print(
            "\n============================================== {0} box ==========================================".format(
                self.box_type))
        print("type:{0}  ".format(self.box_type))
        print("size:{0}  ".format(self.box_size))
        # Recomputed from scratch so repeated calls do not accumulate.
        header_size = 2 * self.four_bytes
        if self.box_size == 1:
            # size == 1 means the real size is stored in the next 8 bytes.
            box_large_size = int.from_bytes(file.read(self.eight_bytes), 'big')
            print("large_size:{0}  ".format(box_large_size))
            self.box_size = box_large_size
            header_size += self.eight_bytes
        if self.isFullBox:
            # Stored on the instance: subclasses branch on self.box_version.
            self.box_version = int.from_bytes(file.read(self.one_bytes), 'big')
            box_flags = int.from_bytes(file.read(self.three_bytes), 'big')
            print("box_version:{0}  ".format(self.box_version))
            print("box_flags:{0}  ".format(box_flags))
            header_size += self.four_bytes
        self.box_header_size = header_size
        # Subclasses that read ahead return the next box's header instead.
        return None, None
class FtypBox(Box):
    """Parser for the 'ftyp' box: major brand, minor version, compatible brands."""
    # Bytes taken by the 8-byte box header plus the two fixed 4-byte fields.
    used_bytes = 16

    def printSelf(self, file):
        """Print the ftyp fields read from ``file``.

        Returns ``(None, None)``: the header of the next box was not read.
        """
        super().printSelf(file)
        # The first field of ftyp is the *major* brand.
        # latin-1 maps every byte, so an unusual brand cannot raise.
        major_brand = file.read(self.four_bytes).decode('latin-1')
        minor_version = int.from_bytes(file.read(self.four_bytes), 'big')
        # Everything left in the box is a list of 4-character brands.
        remaining = max(self.box_size - self.used_bytes, 0)
        brand_count, leftover = divmod(remaining, self.four_bytes)
        compatible_brand = [
            file.read(self.four_bytes).decode('latin-1')
            for _ in range(brand_count)
        ]
        if leftover:
            # Malformed size: consume the tail so the next header stays aligned.
            file.read(leftover)
        print("major_brand:{0}  ".format(major_brand))
        print("minor_version:{0}  ".format(minor_version))
        print("compatible_brand:{0}  ".format(str(compatible_brand)))
        return None, None
class MoovBox(Box):
    """Parser for the 'moov' container: walks its 'mvhd' and 'trak' children."""

    def printSelf(self, file):
        """Print the moov header, then each consecutive mvhd/trak child.

        Returns the ``(type, size)`` header of the first box that is neither
        'mvhd' nor 'trak'; that header has already been consumed from ``file``.
        """
        super().printSelf(file)
        child_parsers = {'mvhd': MvhdBox, 'trak': TrakBox}
        child_type, child_size = Box().findBoxHeader(file)
        while child_type in child_parsers:
            child = child_parsers[child_type](child_type, child_size)
            child_type, child_size = child.printSelf(file)
            # (None, None) means the child did not read ahead, so do it here.
            if child_type is None and child_size is None:
                child_type, child_size = Box().findBoxHeader(file)
        return child_type, child_size
class MvhdBox(Box):
    """Parser for the 'mvhd' (movie header) FullBox."""

    def __init__(self, box_type=None, size=None):
        # Set before any parsing so Box.printSelf consumes version/flags.
        self.isFullBox = True
        super().__init__(box_type, size)

    def printSelf(self, file):
        """Read and print every mvhd field from ``file``.

        Time fields are 8 bytes wide when ``self.box_version`` is 1 and
        4 bytes otherwise. Returns ``(None, None)``: no read-ahead happened.
        """
        super().printSelf(file)

        def read_int(width, signed=False):
            # Explicit byte order: the default only exists since Python 3.11.
            return int.from_bytes(file.read(width), 'big', signed=signed)

        time_width = self.eight_bytes if self.box_version == 1 else self.four_bytes
        creation_time = read_int(time_width)
        modification_time = read_int(time_width)
        timescale = read_int(self.four_bytes)
        duration = read_int(time_width)
        # rate is 16.16 fixed point and volume 8.8 fixed point; dividing
        # yields the real value (printing "int.fraction" showed 1.5 as 1.32768).
        rate = read_int(self.four_bytes) / 0x10000
        volume = read_int(self.two_bytes) / 0x100
        reserved = file.read(self.two_bytes)
        reserved_1 = file.read(self.four_bytes)
        reserved_2 = file.read(self.four_bytes)
        # Matrix entries are signed (negative for rotated/flipped video).
        matrix = [read_int(self.four_bytes, signed=True) for _ in range(9)]
        pre_define = [read_int(self.four_bytes) for _ in range(6)]
        next_track_ID = read_int(self.four_bytes)
        print("creation_time:{0}  ".format(creation_time))
        print("modification_time:{0}  ".format(modification_time))
        print("timescale:{0}  ".format(timescale))
        print("duration:{0}  ".format(duration))
        print("rate:{0}  ".format(rate))
        print("volume:{0}  ".format(volume))
        print("reserved:{0}  ".format(reserved))
        print("reserved[] :{0} , {1} ".format(reserved_1, reserved_2))
        print("matrix:{0}  ".format(matrix))
        print("pre_define:{0}  ".format(pre_define))
        print("next_track_ID:{0}  ".format(next_track_ID))
        return None, None
class TrakBox(Box):
    """Parser for the 'trak' container: prints its 'tkhd' and skips the rest."""

    def printSelf(self, file):
        """Print the trak header and its tkhd child, then skip to the trak's end.

        ``file`` must support ``tell`` and ``seek``.

        Returns the ``(type, size)`` header of the box that follows; that
        header has already been consumed from ``file``.
        """
        super().printSelf(file)
        # The header is consumed at this point, so the payload ends here.
        # The trak's own header must be excluded, otherwise the skip below
        # swallows the next box header and the stream loses alignment.
        trak_end = file.tell() + self.box_size - self.box_header_size
        # Header of the first child box.
        child_type, child_size = Box().findBoxHeader(file)
        while child_type == 'tkhd':
            tkhd_box = TkhdBox(child_type, child_size)
            child_type, child_size = tkhd_box.printSelf(file)
            remaining = max(trak_end - file.tell(), 0)
            print("\n{0} box remain size======> {1}".format(tkhd_box.box_type, remaining))
            # Skip the unparsed children of this trak in one go.
            file.seek(remaining, 1)
            if child_type is None and child_size is None:
                # Read the header of the next box.
                child_type, child_size = Box().findBoxHeader(file)
        return child_type, child_size  # header of the next box
class TkhdBox(Box):
    """Parser for the 'tkhd' (track header) FullBox."""

    def __init__(self, box_type=None, size=None):
        # Set before any parsing so Box.printSelf consumes version/flags.
        self.isFullBox = True
        super().__init__(box_type, size)

    def printSelf(self, file):
        """Read and print every tkhd field from ``file``.

        Time fields are 8 bytes wide when ``self.box_version`` is 1 and
        4 bytes otherwise. Returns ``(None, None)``: no read-ahead happened.
        """
        super().printSelf(file)

        def read_int(width, signed=False):
            # Explicit byte order: the default only exists since Python 3.11.
            return int.from_bytes(file.read(width), 'big', signed=signed)

        time_width = self.eight_bytes if self.box_version == 1 else self.four_bytes
        creation_time = read_int(time_width)
        modification_time = read_int(time_width)
        track_ID = read_int(self.four_bytes)
        reserved_32 = read_int(self.four_bytes)
        duration = read_int(time_width)
        reserved_1 = read_int(self.four_bytes)
        reserved_2 = read_int(self.four_bytes)
        # layer, alternate_group and the matrix entries are signed fields.
        layer = read_int(self.two_bytes, signed=True)
        alternate_group = read_int(self.two_bytes, signed=True)
        # volume is 8.8 fixed point; dividing yields the real value.
        volume = read_int(self.two_bytes) / 0x100
        reserved_16 = read_int(self.two_bytes)
        matrix = [read_int(self.four_bytes, signed=True) for _ in range(9)]
        # width/height are 16.16 fixed point; keep only the integer part.
        width = read_int(self.four_bytes) >> 16
        height = read_int(self.four_bytes) >> 16
        print("creation_time:{0}  ".format(creation_time))
        print("modification_time:{0}  ".format(modification_time))
        print("track_ID:{0}  ".format(track_ID))
        print("reserved_32:{0}  ".format(reserved_32))
        print("duration:{0}  ".format(duration))
        print("reserved[]:{0} {1}  ".format(reserved_1, reserved_2))
        print("layer:{0}  ".format(layer))
        print("alternate_group:{0}  ".format(alternate_group))
        print("volume:{0}  ".format(volume))
        print("reserved_16:{0}  ".format(reserved_16))
        print("matrix:{0}  ".format(matrix))
        print("width:{0}  ".format(width))
        print("height:{0}  ".format(height))
        return None, None
主要实现了MP4文件的ftyp/moov/mvhd/trak/tkhd这几个box的解析,读取其中存储的信息。
核心逻辑就是读取box的头8个字节来判断它的类型和大小,然后根据对应的类型进行解析即可。
入口程序如下
def print_MP4(file_name):
    """Print the ftyp and moov boxes of the MP4 file at ``file_name``.

    Boxes of any other type (for example 'free' or 'mdat') are skipped using
    their declared size, so 'moov' is found even when it is not the second
    box in the file. Parsing stops at end of file, at a box whose size is 0
    (it extends to the end of the file) or at a header that is not a box.
    """
    header_size = 8  # 4-byte size + 4-character type
    with open(file_name, 'rb') as file:
        box_type, box_size = Box().findBoxHeader(file)
        # An empty type means the header read hit end of file.
        while box_type:
            if box_type == 'ftyp':
                box_type, box_size = FtypBox(box_type, box_size).printSelf(file)
            elif box_type == 'moov':
                box_type, box_size = MoovBox(box_type, box_size).printSelf(file)
            else:
                if box_size == 0:
                    break
                if box_size == 1:
                    # The real size is in the 8 bytes that follow the type.
                    large_size = int.from_bytes(file.read(8), 'big')
                    payload_size = large_size - 2 * header_size
                else:
                    payload_size = box_size - header_size
                if payload_size < 0:
                    # Declared size is smaller than the header: not a box.
                    break
                file.seek(payload_size, 1)
                box_type, box_size = None, None
            if box_type is None and box_size is None:
                try:
                    box_type, box_size = Box().findBoxHeader(file)
                except UnicodeDecodeError:
                    # Type bytes are not text: no longer at a box boundary.
                    break
        print("\n=======================read end ==============================")


if __name__ == '__main__':
    print_MP4('sample.mp4')
当然,假如只想读取特定内容则大可不必把box完整读取出来,只需要读取特定位置的内容即可。
读取AAC格式数据
aac不仅仅常用于MP4文件中的音频数据存储,它可以作为单独的音频文件被大家消费。
如果不方便找到一个AAC文件的话,可以从MP4文件中提取出一个AAC文件(使用ffmpeg):
ffmpeg -i sample.mp4 -acodec aac -vn sample.aac
此时我们已经获得了一个sample.aac文件(从sample.mp4中提取的),那么接下来如何读取它的数据从而获得有效的信息呢?
class ADTSHeader(object):
    """Reads and prints the first 7-byte ADTS header of an AAC stream."""
    one_byte = 1
    two_byte = 2
    three_byte = 3

    # Lookup tables for the coded header fields; codes not listed fall back
    # to the default documented on each getter.
    _PROFILES = {
        0: 'Main Profile',
        1: 'Low Complexity profile (LC) ',
        2: 'Scalable Sampling Rate profile (SSR)',
    }
    _LAYERS = {1: 'Layer III', 2: 'Layer II', 3: 'Layer I'}
    _CHANNELS = {6: '5+1', 7: '7+1'}
    _SAMPLING = {
        0: '96khz',
        1: '88.2khz',
        2: '64khz',
        3: '48khz',
        4: '44.1khz',
        5: '32khz',
        6: '24khz',
        7: '22.05khz',
        8: '16khz',
        9: '12khz',
        10: '11.025khz',
        11: '8khz',
        12: '7.35khz',
    }

    def getProfile(self, profile):
        """Return the profile name for the 2-bit code, or 'reserved'."""
        return self._PROFILES.get(profile, 'reserved')

    def getLayer(self, layer):
        """Return the layer name for the 2-bit code, or 'reserved'."""
        return self._LAYERS.get(layer, 'reserved')

    def getChannelConfiguration(self, chanel):
        """Return the channel layout: '5+1'/'7+1' for 6/7, else the number."""
        return self._CHANNELS.get(chanel, str(chanel))

    def getSampling(self, sampling_frequency):
        """Return the sampling rate for the 4-bit index, or 'reserved'."""
        return self._SAMPLING.get(sampling_frequency, 'reserved')

    def printSelf(self, file):
        """Read 7 bytes from ``file`` and print the fixed and variable header."""
        # Explicit byte order: the default only exists since Python 3.11.
        # Bytes 0-1: syncword(12) id(1) layer(2) protection_absent(1).
        result = int.from_bytes(file.read(self.two_byte), 'big')
        syncword = result >> 4
        mpeg_id = (result & 0x0008) >> 3
        layer = (result & 0x0006) >> 1
        protection_absent = (result & 0x0001)
        # Bytes 2-3: profile(2) sampling index(4) private(1) channels(3)
        # original_copy(1) home(1), then the start of the variable header.
        result = int.from_bytes(file.read(self.two_byte), 'big')
        profile = result >> 14
        sampling_frequency_index = (result & 0x3c00) >> 10
        private_bit = (result & 0x0200) >> 9
        channel_configuration = (result & 0x01c0) >> 6
        original_copy = (result & 0x0020) >> 5
        home = (result & 0x0010) >> 4
        # Variable header fields start here.
        copyright_identification_bit = (result & 0x0008) >> 3
        copyright_identification_start = (result & 0x0004) >> 2
        # The top 2 bits of the 13-bit frame length sit at the end of byte 3.
        frame_length_high = (result & 0x3)
        # Bytes 4-6: frame length low 11 bits, buffer fullness(11), blocks(2).
        result = int.from_bytes(file.read(self.three_byte), 'big')
        aac_frame_length = (result >> 13) | (frame_length_high << 11)
        adts_buffer_fullness = (result & 0x1ffc) >> 2
        number_of_raw_data_blocks_in_frame = (result & 0x3)
        print("================================= adts_fixed_header ==========================")
        print("syncword: {0}".format(hex(syncword)))
        print("id: {0}".format(mpeg_id))
        print("layer: {0} : {1}".format(layer, self.getLayer(layer)))
        print("protection_absent: {0}".format(protection_absent))
        print("profile:  {0} ".format(self.getProfile(profile)))
        print("sampling_frequency_index:  {0} ".format(self.getSampling(sampling_frequency_index)))
        print("private_bit: {0}".format(private_bit))
        print("channel_configuration: {0} ".format(self.getChannelConfiguration(channel_configuration)))
        print("original_copy: {0}".format(original_copy))
        print("home: {0}".format(home))
        print("================================= adts_variable_header ==========================")
        print("copyright_identification_bit: {0}".format(copyright_identification_bit))
        print("copyright_identification_start: {0}".format(copyright_identification_start))
        print("aac_frame_length: {0}".format(aac_frame_length))
        print("adts_buffer_fullness: {0}".format(hex(adts_buffer_fullness)))
        print("number_of_raw_data_blocks_in_frame: {0}".format(number_of_raw_data_blocks_in_frame))
入口程序如下:
def print_AAC(file_name):
    """Open ``file_name`` in binary mode and print its first ADTS header."""
    with open(file_name, 'rb') as aac_file:
        header = ADTSHeader()
        header.printSelf(aac_file)
        print("\n=======================read end ==============================")


if __name__ == '__main__':
    print_AAC('sample.aac')
程序执行之后打印的内容如下:
================================= adts_fixed_header ==========================
syncword: 0xfff
id: 0
layer: 0 : reserved
protection_absent: 1
profile:  Low Complexity profile (LC)  
sampling_frequency_index:  44.1khz  // 44.1khz
private_bit: 0
channel_configuration: 2   // 两个声道
original_copy: 0
home: 0
================================= adts_variable_header ==========================
copyright_identification_bit: 0
copyright_identification_start: 0
aac_frame_length: 378
adts_buffer_fullness: 0x7ff
number_of_raw_data_blocks_in_frame: 0
=======================read end ==============================
然后我们利用ffmpeg自带的ffprobe打印sample.aac文件的基本信息,对照一下看是否一致:
> ffprobe -show_streams sample.aac
[STREAM]
index=0
codec_name=aac
codec_long_name=AAC (Advanced Audio Coding)
profile=LC
codec_type=audio
codec_tag_string=[0][0][0][0]
codec_tag=0x0000
sample_fmt=fltp
sample_rate=44100  
channels=2
channel_layout=stereo
...
...
[/STREAM]
采样率,声道数,profile这些都是一致的。
读取H.264码流
首先当然是从MP4文件中提取H.264码流数据:
ffmpeg -i sample.mp4 -codec copy -bsf:v h264_mp4toannexb -f h264 sample.264
然后我们就可以按照H.264数据编码格式来读取一些信息了。H.264格式解析见ffmpeg开发——初探H.264
具体的解析逻辑如下:
class NALU(object):
    """Plain record describing one NAL unit and where it sits in the file."""
    # Header fields default to -1 until a parser fills them in.
    forbidden_zero_bit = nal_ref_idc = nal_unit_type = -1
    nal_unit_type_str = ''
    # Location of the unit in the file, plus its size.
    start_in_file = end_in_file = size = -1

    def copy_from(self, nalu_obj):
        """Copy every field of ``nalu_obj`` onto this instance."""
        field_names = (
            'forbidden_zero_bit',
            'nal_ref_idc',
            'nal_unit_type',
            'nal_unit_type_str',
            'start_in_file',
            'end_in_file',
            'size',
        )
        for field_name in field_names:
            setattr(self, field_name, getattr(nalu_obj, field_name))

    def parse_data(self, file):
        """Placeholder for payload parsing; currently does nothing."""
        return None
class NaluDataFinder(object):
    """Locates NAL units in a start-code-delimited H.264 byte stream."""
    BYTE_ONE = 1
    BYTE_TWO = 2
    BYTE_THREE = 3
    BYTE_FOUR = 4
    BYTE_10M = 10*1024*1024

    # Display names for nal_unit_type values; anything else is "other-type".
    _NALU_TYPE_NAMES = {
        0: "unspecified",
        1: "non-IDR slice layer",
        2: "A/B/C slice data",
        3: "A/B/C slice data",
        4: "A/B/C slice data",
        5: "IDR slice layer",
        6: "SEI",
        7: "SPS",
        8: "PPS",
        9: "unit-delimiter",
    }

    def isStartCode(self, file):
        """Check for a start code at the current position of ``file``.

        Returns ``(found, consumed)``. ``found`` is True for ``00 00 01`` or
        ``00 00 00 01``. ``consumed`` is the number of bytes actually read;
        a value below 3 means end of file was reached.
        """
        prefix = file.read(self.BYTE_THREE)
        if len(prefix) < self.BYTE_THREE:
            # Short read: end of file.
            return False, len(prefix)
        if prefix == b'\x00\x00\x01':
            return True, self.BYTE_THREE
        if prefix == b'\x00\x00\x00':
            fourth = file.read(self.BYTE_ONE)
            if not fourth:
                # EOF right after three zero bytes: only 3 bytes were read.
                return False, self.BYTE_THREE
            # A 0x00 fourth byte is data, not EOF: report all 4 bytes read,
            # otherwise the caller rewinds too little and skips a byte.
            return fourth == b'\x01', self.BYTE_FOUR
        return False, self.BYTE_THREE

    def getNALUType(self, nalu_type):
        """Return a display name for the 5-bit ``nal_unit_type`` value."""
        return self._NALU_TYPE_NAMES.get(nalu_type, "other-type")

    def printSelf(self, file, start_index):
        """Parse one NAL unit whose header byte is at the current position.

        ``start_index`` is the file offset of that header byte. The stream is
        scanned one byte at a time until the next start code or end of file.

        Returns ``(isEnd, next_start, nalu_obj)``: whether end of file was
        reached, the offset just past the following start code, and the
        filled-in NALU record.
        """
        header = file.read(self.BYTE_ONE)
        nalu_obj = NALU()
        # Explicit byte order: the default only exists since Python 3.11.
        byte_data = int.from_bytes(header, 'big')
        nal_unit_type = byte_data & 0x1f
        nalu_obj.forbidden_zero_bit = byte_data >> 7
        # nal_ref_idc is bits 6-5 of the header byte.
        nalu_obj.nal_ref_idc = (byte_data & 0x60) >> 5
        nalu_obj.nal_unit_type = nal_unit_type
        nalu_obj.nal_unit_type_str = self.getNALUType(nal_unit_type)
        nalu_obj.start_in_file = start_index

        # Size of the current NALU; counts the header byte only if present.
        nalu_size = len(header)
        is_start_code, read_byte_num = self.isStartCode(file)
        while not is_start_code and read_byte_num >= self.BYTE_THREE:
            # No start code here: rewind so the net advance is one byte.
            nalu_size += 1
            file.seek(1 - read_byte_num, 1)
            is_start_code, read_byte_num = self.isStartCode(file)
        if not is_start_code:
            # The loop ended on a short read: the tail belongs to this NALU.
            nalu_size += read_byte_num
        nalu_obj.end_in_file = start_index + nalu_size
        nalu_obj.size = nalu_size
        next_start = start_index + nalu_size + read_byte_num
        isEnd = read_byte_num < self.BYTE_THREE
        if isEnd:
            print("read file eof ===")
        return isEnd, next_start, nalu_obj
    
    
    
class H264Reader(object):
    """Walks an H.264 byte stream and collects one NALU record per unit."""

    def printSelf(self, file):
        """Scan ``file`` from its current position, NALU by NALU.

        Returns True when nothing could be read at all; otherwise returns
        None once the finder reports end of file. The collected records are
        kept in a local list only and are not returned.
        """
        finder = NaluDataFinder()
        _, consumed = finder.isStartCode(file)
        if consumed == 0:  # nothing to read
            return True
        # The first NALU starts right after the bytes just consumed.
        offset = consumed
        nalus = []  # list of NALU records
        while True:
            finished, offset, nalu = finder.printSelf(file, offset)
            nalus.append(nalu)
            if finished:
                break
入口程序也是类似:
def print_h264(file_name):
    """Open ``file_name`` in binary mode and scan it with H264Reader."""
    with open(file_name, 'rb') as h264_file:
        reader = H264Reader()
        reader.printSelf(h264_file)
        print("\n======================= read end ==============================")


if __name__ == '__main__':
    print_h264('sample.264')
对于H.264数据解析,目前只解析到NALU的层级,定位了H.264码流中每个NALU所在的位置;后面会补充一些具体结构(比如Slice、SPS、PPS等)的解析逻辑,并直接更新在github上。
总结
其实无论解析MP4封装文件,还是AAC音频、H.264码流,当我们了解了它们内部结构的定义之后,解析的逻辑可以称得上是按部就班:读取每个字节,甚至每个bit的数据,把它们按照标准文档的定义解读出来即可。
