前言
我们在前面的几篇文章中介绍了MP4的视频文件格式,以及MP4文件内封装的AAC音频数据格式和H.264视频码流。
但是上述分析主要停留在知识层面,因此本文主要讲述如何通过代码对上述格式的数据(文件)进行正确的解析,从而获得一些有效的信息。
目前代码已经上传到github上,有兴趣可以自取。
读取MP4文件信息
本次的解析功能主要使用python来实现(主要是因为它比较简单、用起来顺手,换成其他编程语言比如Java,Kotlin,C++等效果也是一样的)。
class Box(object):
    """Generic ISO base media file format (MP4) box.

    Reads the common box header (32-bit size + four-character type) and,
    when present, the 64-bit largesize and the FullBox version/flags fields.
    Subclasses call ``super().printSelf(file)`` first and then parse their
    own payload from the same file object.
    """

    # Field widths in bytes, used for every read from the file.
    one_bytes = 1
    two_bytes = 2
    three_bytes = 3
    four_bytes = 4
    eight_bytes = 8
    # Size of the current box as declared in its header.
    box_size = 0
    # Header bytes consumed so far: 8 for size + type, grown by printSelf
    # when a largesize (+8) or FullBox version/flags (+4) is read.
    box_header_size = 8
    box_type = ''
    # FullBox version field (int); only updated when isFullBox is True.
    box_version = 0
    # Whether the current box is a FullBox.
    isFullBox = False
    # True once size and type are known, so printSelf must not read them again.
    header_read_already = False

    def __init__(self, box_type=None, size=None):
        """Create a box, optionally from an already-read (type, size) header."""
        self.box_size = size
        self.box_type = box_type
        if box_type is not None and size is not None:
            self.header_read_already = True

    def findBoxHeader(self, file):
        """Read the 8-byte box header from *file*.

        Returns:
            (box_type, box_size). At end of file this is ``('', 0)``.
        """
        # Box fields are big-endian; the byte order is passed explicitly
        # because it is only optional from Python 3.11 on.
        self.box_size = int.from_bytes(file.read(self.four_bytes), 'big')
        # latin-1 maps every byte to one character, so a type code holding
        # non-UTF-8 bytes cannot raise UnicodeDecodeError here.
        self.box_type = file.read(self.four_bytes).decode('latin-1')
        # The header is now known; printSelf must not consume it a second time.
        self.header_read_already = True
        return self.box_type, self.box_size

    def print_origin_for_test(self, file):
        """Print the raw payload of the box (debug helper)."""
        if not self.header_read_already:
            self.findBoxHeader(file)
        # The header is already consumed, so only size - header bytes remain.
        # max() keeps a malformed size from turning into read(-n), which
        # would read the whole rest of the file.
        payload_size = max(self.box_size - self.box_header_size, 0)
        last = file.read(payload_size)
        print("{0} box,size:{1} last byte:{2}".format(self.box_type, self.box_size, last))

    def printSelf(self, file):
        """Print the common header fields of this box.

        Reads the header first unless it was supplied to the constructor.
        Also consumes the 64-bit largesize (when size == 1) and, for a
        FullBox, the version and flags, storing the version in
        ``self.box_version`` so subclasses can branch on it.

        Returns:
            (None, None): no following box header was consumed. Container
            subclasses return the (type, size) of a header they read ahead.
        """
        if not self.header_read_already:
            self.findBoxHeader(file)
        print(
            "\n============================================== {0} box ==========================================".format(
                self.box_type))
        print("type:{0} ".format(self.box_type))
        print("size:{0} ".format(self.box_size))
        if self.box_size == 1:
            # size == 1 means the real size follows as a 64-bit integer.
            box_large_size = int.from_bytes(file.read(self.eight_bytes), 'big')
            print("large_size:{0} ".format(box_large_size))
            self.box_header_size = self.box_header_size + 8
            # Keep the real size so remaining-payload arithmetic stays correct.
            self.box_size = box_large_size
        if self.isFullBox:
            self.box_version = int.from_bytes(file.read(self.one_bytes), 'big')
            box_flags = int.from_bytes(file.read(self.three_bytes), 'big')
            print("box_version:{0} ".format(self.box_version))
            print("box_flags:{0} ".format(box_flags))
            self.box_header_size = self.box_header_size + 4
        return None, None
class FtypBox(Box):
    """File type box ('ftyp'): major brand, minor version, compatible brands."""

    # Bytes taken by the 8-byte box header plus major_brand and minor_version.
    used_bytes = 16

    def printSelf(self, file):
        """Print the 'ftyp' payload read from *file*.

        Returns:
            (None, None): no following box header was consumed.
        """
        super().printSelf(file)
        # The first payload field is the major brand (it was previously
        # mislabelled as "minor_brand"); the minor version follows it.
        major_brand = file.read(self.four_bytes).decode()
        minor_version = int.from_bytes(file.read(self.four_bytes), 'big')
        # Everything left in the box is a list of 4-byte compatible brands.
        # A malformed size smaller than the fixed part yields an empty list.
        brands_size = max(self.box_size - self.used_bytes, 0)
        compatible_brand = [
            file.read(self.four_bytes).decode()
            for _ in range(0, brands_size, self.four_bytes)
        ]
        print("major_brand:{0} ".format(major_brand))
        print("minor_version:{0} ".format(minor_version))
        print("compatible_brand:{0} ".format(str(compatible_brand)))
        return None, None
class MoovBox(Box):
    """Movie box ('moov'): a container whose children include 'mvhd' and 'trak'."""

    def printSelf(self, file):
        """Print the 'moov' header, then parse its 'mvhd' and 'trak' children.

        Children of any other type are skipped by their declared size
        instead of ending the walk, so tracks that follow such a child are
        still parsed.

        Returns:
            (box_type, box_size) of the last header read. That header has
            already been consumed from *file*; it is ``('', 0)`` at end of
            file.
        """
        super().printSelf(file)
        header_len = 8
        payload_size = self.box_size - self.box_header_size
        if payload_size >= 0:
            payload_end = file.tell() + payload_size
        else:
            # The declared size gives no usable extent (e.g. size 0, meaning
            # "up to end of file"), so fall back to an unbounded walk.
            payload_end = float('inf')

        child_type, child_size = Box().findBoxHeader(file)
        # A header starting at or past payload_end belongs to a sibling of
        # 'moov', not to a child, and is handed back to the caller.
        while child_type and file.tell() - header_len < payload_end:
            if child_type == 'mvhd':
                child_type, child_size = MvhdBox(child_type, child_size).printSelf(file)
            elif child_type == 'trak':
                child_type, child_size = TrakBox(child_type, child_size).printSelf(file)
            elif child_size >= header_len:
                # Unknown child: jump over its payload.
                file.seek(child_size - header_len, 1)
                child_type, child_size = None, None
            else:
                # Size too small to skip safely (0, 1 or malformed).
                break
            if child_type is None and child_size is None:
                # The child parser did not read ahead, so read the next header.
                child_type, child_size = Box().findBoxHeader(file)
        return child_type, child_size
class MvhdBox(Box):
    """Movie header box ('mvhd'), a FullBox with movie-wide timing fields."""

    def __init__(self, box_type=None, size=None):
        self.isFullBox = True
        super().__init__(box_type, size)

    def printSelf(self, file):
        """Print every 'mvhd' field read from *file*.

        Version 1 uses 64-bit creation/modification/duration fields; any
        other version uses 32-bit ones.

        Returns:
            (None, None): no following box header was consumed.
        """
        super().printSelf(file)

        def read_uint(width):
            # Box fields are big-endian; the explicit byte order also keeps
            # this working on Python versions older than 3.11.
            return int.from_bytes(file.read(width), 'big')

        time_width = self.eight_bytes if self.box_version == 1 else self.four_bytes
        creation_time = read_uint(time_width)
        modification_time = read_uint(time_width)
        timescale = read_uint(self.four_bytes)
        duration = read_uint(time_width)
        # rate is 16.16 fixed point and volume is 8.8 fixed point. Divide by
        # the scale so a fraction such as 0x8000 prints as .5, not as .32768.
        rate = read_uint(self.four_bytes) / 0x10000
        volume = read_uint(self.two_bytes) / 0x100
        reserved = file.read(self.two_bytes)
        reserved_1 = file.read(self.four_bytes)
        reserved_2 = file.read(self.four_bytes)
        matrix = [read_uint(self.four_bytes) for _ in range(9)]
        pre_define = [read_uint(self.four_bytes) for _ in range(6)]
        next_track_ID = read_uint(self.four_bytes)

        print("creation_time:{0} ".format(creation_time))
        print("modification_time:{0} ".format(modification_time))
        print("timescale:{0} ".format(timescale))
        print("duration:{0} ".format(duration))
        print("rate:{0} ".format(rate))
        print("volume:{0} ".format(volume))
        print("reserved:{0} ".format(reserved))
        print("reserved[] :{0} , {1} ".format(reserved_1, reserved_2))
        print("matrix:{0} ".format(matrix))
        print("pre_define:{0} ".format(pre_define))
        print("next_track_ID:{0} ".format(next_track_ID))
        return None, None
class TrakBox(Box):
    """Track box ('trak'): only its leading 'tkhd' child is parsed."""

    def printSelf(self, file):
        """Print the 'trak' header and its 'tkhd' child, then skip the rest.

        Returns:
            (box_type, box_size) of the header that follows this 'trak'.
            That header has already been consumed from *file*.
        """
        super().printSelf(file)
        header_len = 8
        # The 'trak' header is already consumed, so the payload is the
        # declared size minus the header. Skipping the full box size from
        # here would run past the box and swallow the next header.
        payload_start = file.tell()
        payload_end = payload_start + max(self.box_size - self.box_header_size, 0)

        # Look at the first child only if the payload can hold a header.
        if payload_end - payload_start >= header_len:
            child_type, child_size = Box().findBoxHeader(file)
            if child_type == 'tkhd':
                tkhd_box = TkhdBox(child_type, child_size)
                tkhd_box.printSelf(file)
                remain_size = max(payload_end - file.tell(), 0)
                print("\n{0} box remain size======> {1}".format(tkhd_box.box_type, remain_size))

        # Jump over whatever is left of this box without loading it into
        # memory. Seeking to an absolute offset also stays aligned if a
        # child consumed a different byte count than its declared size.
        if file.tell() < payload_end:
            file.seek(payload_end)
        # Read the header of the next box and hand it back to the caller.
        return Box().findBoxHeader(file)
class TkhdBox(Box):
    """Track header box ('tkhd'), a FullBox with per-track properties."""

    def __init__(self, box_type=None, size=None):
        self.isFullBox = True
        super().__init__(box_type, size)

    def printSelf(self, file):
        """Print every 'tkhd' field read from *file*.

        Version 1 uses 64-bit creation/modification/duration fields; any
        other version uses 32-bit ones.

        Returns:
            (None, None): no following box header was consumed.
        """
        super().printSelf(file)

        def read_uint(width):
            # Box fields are big-endian; the explicit byte order also keeps
            # this working on Python versions older than 3.11.
            return int.from_bytes(file.read(width), 'big')

        time_width = self.eight_bytes if self.box_version == 1 else self.four_bytes
        creation_time = read_uint(time_width)
        modification_time = read_uint(time_width)
        track_ID = read_uint(self.four_bytes)
        reserved_32 = read_uint(self.four_bytes)
        duration = read_uint(time_width)
        reserved_1 = read_uint(self.four_bytes)
        reserved_2 = read_uint(self.four_bytes)
        layer = read_uint(self.two_bytes)
        alternate_group = read_uint(self.two_bytes)
        # volume is 8.8 fixed point; divide by the scale so the fraction
        # prints as a real decimal.
        volume = read_uint(self.two_bytes) / 0x100
        reserved_16 = read_uint(self.two_bytes)
        matrix = [read_uint(self.four_bytes) for _ in range(9)]
        # width and height are 16.16 fixed point; keep the integer part only.
        width = read_uint(self.four_bytes) >> 16
        height = read_uint(self.four_bytes) >> 16

        print("creation_time:{0} ".format(creation_time))
        print("modification_time:{0} ".format(modification_time))
        print("track_ID:{0} ".format(track_ID))
        print("reserved_32:{0} ".format(reserved_32))
        print("duration:{0} ".format(duration))
        print("reserved[]:{0} {1} ".format(reserved_1, reserved_2))
        print("layer:{0} ".format(layer))
        print("alternate_group:{0} ".format(alternate_group))
        print("volume:{0} ".format(volume))
        print("reserved_16:{0} ".format(reserved_16))
        print("matrix:{0} ".format(matrix))
        print("width:{0} ".format(width))
        print("height:{0} ".format(height))
        return None, None
主要实现了MP4文件的ftyp/moov/mvhd/trak/tkhd这几个box的解析,读取其中存储的信息。
核心逻辑就是读取box的头8个字节来判断它的类型和大小,然后根据对应的类型进行解析即可。
入口程序如下
def print_MP4(file_name):
    """Walk the top-level boxes of an MP4 file and print 'ftyp' and 'moov'.

    Boxes of any other type are skipped by their declared size rather than
    ending the walk, so a 'moov' that sits after 'free' or 'mdat' is still
    reached.

    Args:
        file_name: path of the MP4 file to read.
    """
    header_len = 8
    with open(file_name, 'rb') as file:
        box_type, box_size = Box().findBoxHeader(file)
        # An empty type means the header read hit end of file.
        while box_type:
            if box_type == 'ftyp':
                box_type, box_size = FtypBox(box_type, box_size).printSelf(file)
            elif box_type == 'moov':
                box_type, box_size = MoovBox(box_type, box_size).printSelf(file)
            else:
                consumed = header_len
                if box_size == 1:
                    # size == 1: the real size follows as a 64-bit integer.
                    box_size = int.from_bytes(file.read(8), 'big')
                    consumed += 8
                if box_size < consumed:
                    # size 0 (box runs to end of file) or a malformed size:
                    # there is nothing further that can be located.
                    break
                file.seek(box_size - consumed, 1)
                box_type, box_size = None, None
            if box_type is None and box_size is None:
                # The parser did not read ahead, so read the next header here.
                box_type, box_size = Box().findBoxHeader(file)
    print("\n=======================read end ==============================")


if __name__ == '__main__':
    print_MP4('sample.mp4')
当然,假如只想读取特定内容则大可不必把box完整读取出来,只需要读取特定位置的内容即可。
读取AAC格式数据
aac不仅仅常用于MP4文件中的音频数据存储,它可以作为单独的音频文件被大家消费。
如果不方便找到一个AAC文件的话,可以从MP4文件中提取出一个AAC文件(使用ffmpeg):
ffmpeg -i sample.mp4 -acodec aac -vn sample.aac
此时我们已经获得了一个sample.aac文件(从sample.mp4中提取的),那么接下来如何读取它的数据从而获得有效的信息呢?
class ADTSHeader(object):
    """Parser for the 7-byte ADTS header at the start of an AAC frame."""

    # Read widths in bytes.
    one_byte = 1
    two_byte = 2
    three_byte = 3

    # Lookup tables for the coded header fields; values not listed are
    # reported as 'reserved' (or as the plain number for channels).
    _PROFILES = {
        0: 'Main Profile',
        1: 'Low Complexity profile (LC) ',
        2: 'Scalable Sampling Rate profile (SSR)',
    }
    _LAYERS = {
        1: 'Layer III',
        2: 'Layer II',
        3: 'Layer I',
    }
    _CHANNELS = {
        6: '5+1',
        7: '7+1',
    }
    _SAMPLING = {
        0: '96khz',
        1: '88.2khz',
        2: '64khz',
        3: '48khz',
        4: '44.1khz',
        5: '32khz',
        6: '24khz',
        7: '22.05khz',
        8: '16khz',
        9: '12khz',
        10: '11.025khz',
        11: '8khz',
        12: '7.35khz',
    }

    def getProfile(self, profile):
        """Return the display name for the 2-bit profile value."""
        return self._PROFILES.get(profile, 'reserved')

    def getLayer(self, layer):
        """Return the display name for the 2-bit layer value."""
        return self._LAYERS.get(layer, 'reserved')

    def getChannelConfiguration(self, chanel):
        """Return the channel layout: '5+1', '7+1', or the count as a string."""
        return self._CHANNELS.get(chanel, str(chanel))

    def getSampling(self, sampling_frequency):
        """Return the sampling rate for a sampling_frequency_index value.

        Index 11 is 8 kHz. It was previously unreachable behind a duplicated
        ``== 10`` test and carried the wrong value.
        """
        return self._SAMPLING.get(sampling_frequency, 'reserved')

    def printSelf(self, file):
        """Read one 7-byte ADTS header from *file* and print its fields."""
        # Bytes 0-1: syncword(12) id(1) layer(2) protection_absent(1).
        result = int.from_bytes(file.read(self.two_byte), 'big')
        syncword = result >> 4
        mpeg_id = (result & 0x0008) >> 3
        layer = (result & 0x0006) >> 1
        protection_absent = (result & 0x0001)

        # Bytes 2-3: profile(2) sampling_frequency_index(4) private_bit(1)
        # channel_configuration(3) original_copy(1) home(1), then the start
        # of the variable header: copyright bits (1+1) and the top 2 bits of
        # aac_frame_length.
        result = int.from_bytes(file.read(self.two_byte), 'big')
        profile = result >> 14
        sampling_frequency_index = (result & 0x3c00) >> 10
        private_bit = (result & 0x0200) >> 9
        channel_configuration = (result & 0x01c0) >> 6
        original_copy = (result & 0x0020) >> 5
        home = (result & 0x0010) >> 4
        copyright_identification_bit = (result & 0x0008) >> 3
        copyright_identification_start = (result & 0x0004) >> 2
        frame_length_high = (result & 0x3)

        # Bytes 4-6: low 11 bits of aac_frame_length, adts_buffer_fullness(11),
        # number_of_raw_data_blocks_in_frame(2).
        result = int.from_bytes(file.read(self.three_byte), 'big')
        aac_frame_length = (result >> 13) | (frame_length_high << 11)
        adts_buffer_fullness = (result & 0x1ffc) >> 2
        number_of_raw_data_blocks_in_frame = (result & 0x3)

        print("================================= adts_fixed_header ==========================")
        print("syncword: {0}".format(hex(syncword)))
        print("id: {0}".format(mpeg_id))
        print("layer: {0} : {1}".format(layer, self.getLayer(layer)))
        print("protection_absent: {0}".format(protection_absent))
        print("profile: {0} ".format(self.getProfile(profile)))
        print("sampling_frequency_index: {0} ".format(self.getSampling(sampling_frequency_index)))
        print("private_bit: {0}".format(private_bit))
        print("channel_configuration: {0} ".format(self.getChannelConfiguration(channel_configuration)))
        print("original_copy: {0}".format(original_copy))
        print("home: {0}".format(home))
        print("================================= adts_variable_header ==========================")
        print("copyright_identification_bit: {0}".format(copyright_identification_bit))
        print("copyright_identification_start: {0}".format(copyright_identification_start))
        print("aac_frame_length: {0}".format(aac_frame_length))
        print("adts_buffer_fullness: {0}".format(hex(adts_buffer_fullness)))
        print("number_of_raw_data_blocks_in_frame: {0}".format(number_of_raw_data_blocks_in_frame))
入口程序如下:
def print_AAC(file_name):
    """Open *file_name* in binary mode and print the ADTS header at its start."""
    with open(file_name, 'rb') as aac_stream:
        header = ADTSHeader()
        header.printSelf(aac_stream)
    print("\n=======================read end ==============================")


if __name__ == '__main__':
    print_AAC('sample.aac')
程序执行之后打印的内容如下:
================================= adts_fixed_header ==========================
syncword: 0xfff
id: 0
layer: 0 : reserved
protection_absent: 1
profile: Low Complexity profile (LC)
sampling_frequency_index: 44.1khz // 44.1khz
private_bit: 0
channel_configuration: 2 // 两个声道
original_copy: 0
home: 0
================================= adts_variable_header ==========================
copyright_identification_bit: 0
copyright_identification_start: 0
aac_frame_length: 378
adts_buffer_fullness: 0x7ff
number_of_raw_data_blocks_in_frame: 0
=======================read end ==============================
然后我们利用ffprobe打印sample.aac文件的基本信息,对照一下看是否一致
> ffprobe -show_streams sample.aac
[STREAM]
index=0
codec_name=aac
codec_long_name=AAC (Advanced Audio Coding)
profile=LC
codec_type=audio
codec_tag_string=[0][0][0][0]
codec_tag=0x0000
sample_fmt=fltp
sample_rate=44100
channels=2
channel_layout=stereo
...
...
[/STREAM]
采样率,声道数,profile这些都是一致的。
读取H.264码流
首先当然是从MP4文件中提取H.264码流数据:
ffmpeg -i sample.mp4 -codec copy -bsf:v h264_mp4toannexb -f h264 sample.264
然后我们就可以按照H.264数据编码格式来读取一些信息了。H.264格式解析见ffmpeg开发——初探H.264
具体的解析逻辑如下:
class NALU(object):
    """Plain record describing one NAL unit found in an H.264 byte stream."""

    # Header fields; -1 / '' mean "not filled in yet".
    forbidden_zero_bit = -1
    nal_ref_idc = -1
    nal_unit_type = -1
    nal_unit_type_str = ''
    # Position and length of the unit inside the file.
    start_in_file = -1
    end_in_file = -1
    size = -1

    def copy_from(self, nalu_obj):
        """Copy every field of *nalu_obj* onto this instance."""
        for field_name in (
            'forbidden_zero_bit',
            'nal_ref_idc',
            'nal_unit_type',
            'nal_unit_type_str',
            'start_in_file',
            'end_in_file',
            'size',
        ):
            setattr(self, field_name, getattr(nalu_obj, field_name))

    def parse_data(self, file):
        """Placeholder for payload parsing; currently does nothing."""
        return None
class NaluDataFinder(object):
    """Locates NAL units in an Annex B H.264 byte stream by their start codes."""

    # Read widths in bytes.
    BYTE_ONE = 1
    BYTE_TWO = 2
    BYTE_THREE = 3
    BYTE_FOUR = 4
    BYTE_10M = 10 * 1024 * 1024

    # Display names for nal_unit_type; anything else is "other-type".
    _NALU_TYPES = {
        0: "unspecified",
        1: "non-IDR slice layer",
        2: "A/B/C slice data",
        3: "A/B/C slice data",
        4: "A/B/C slice data",
        5: "IDR slice layer",
        6: "SEI",
        7: "SPS",
        8: "PPS",
        9: "unit-delimiter",
    }

    def isStartCode(self, file):
        """Check whether a start code begins at the current file position.

        Recognises both 0x000001 and 0x00000001.

        Returns:
            (found, consumed): whether a start code was read, and how many
            bytes were consumed from *file* (0-4). Fewer than 3 consumed
            bytes means the end of the file was reached.
        """
        data_byte = file.read(self.BYTE_THREE)
        if len(data_byte) < self.BYTE_THREE:
            # Not enough bytes left for a start code: end of file.
            return False, len(data_byte)
        byte_num = self.BYTE_THREE
        if data_byte == b'\x00\x00\x01':
            return True, byte_num
        if data_byte == b'\x00\x00\x00':
            fourth = file.read(self.BYTE_ONE)
            if not fourth:
                # End of file after three zero bytes; nothing more was read.
                return False, byte_num
            # A fourth byte was consumed, so it must be counted even when it
            # is 0x00. Otherwise the caller rewinds too little and every
            # later offset drifts by one byte.
            byte_num = byte_num + 1
            if fourth == b'\x01':
                return True, byte_num
        return False, byte_num

    def getNALUType(self, nalu_type):
        """Return a display name for the 5-bit nal_unit_type value."""
        return self._NALU_TYPES.get(nalu_type, "other-type")

    def printSelf(self, file, start_index):
        """Parse the NAL unit that starts at the current file position.

        Reads the one-byte NALU header, then scans forward one byte at a
        time until the next start code or the end of the file.

        Args:
            file: binary file positioned on the first byte after a start code.
            start_index: offset of that byte within the file.

        Returns:
            (isEnd, next_start, nalu_obj): whether the end of the file was
            reached, the offset just past the bytes consumed by the scan,
            and the populated NALU record.
        """
        byte_data = int.from_bytes(file.read(self.BYTE_ONE), 'big')
        nalu_obj = NALU()
        nalu_obj.forbidden_zero_bit = byte_data >> 7
        # nal_ref_idc is the two bits below forbidden_zero_bit.
        nalu_obj.nal_ref_idc = (byte_data & 0x60) >> 5
        nalu_obj.nal_unit_type = byte_data & 0x1f
        nalu_obj.nal_unit_type_str = self.getNALUType(nalu_obj.nal_unit_type)
        nalu_obj.start_in_file = start_index

        nalu_size = 1  # the header byte just read
        is_start_code, read_byte_num = self.isStartCode(file)
        while not is_start_code and read_byte_num >= 3:
            nalu_size = nalu_size + 1
            # Rewind so the next probe starts exactly one byte further on.
            file.seek(1 - read_byte_num, 1)
            is_start_code, read_byte_num = self.isStartCode(file)
        if not is_start_code:
            # The loop ended on a short read: the trailing bytes belong to
            # this unit.
            nalu_size = nalu_size + read_byte_num

        nalu_obj.end_in_file = start_index + nalu_size
        nalu_obj.size = nalu_size
        next_start = start_index + nalu_size + read_byte_num
        isEnd = read_byte_num < 3
        if isEnd:
            print("read file eof ===")
        return isEnd, next_start, nalu_obj
class H264Reader(object):
    """Drives NaluDataFinder across a whole H.264 byte stream."""

    def printSelf(self, file):
        """Locate every NAL unit in *file*.

        Returns True when the file is empty; otherwise falls off the end
        and returns None.

        NOTE(review): the collected NALU list is neither printed nor
        returned, so callers cannot see the result - confirm whether it
        should be exposed.
        """
        finder = NaluDataFinder()
        # Only the consumed byte count of the first probe is used; whether
        # it actually was a start code is not checked.
        _, consumed = finder.isStartCode(file)
        if consumed == 0:
            # Nothing could be read: the file is empty.
            return True

        located = []  # NALU records in stream order
        offset = consumed
        while True:
            finished, offset, nalu = finder.printSelf(file, offset)
            located.append(nalu)
            if finished:
                break
入口程序也是类似:
def print_h264(file_name):
    """Open *file_name* in binary mode and run H264Reader over it."""
    with open(file_name, 'rb') as h264_stream:
        reader = H264Reader()
        reader.printSelf(h264_stream)
    print("\n======================= read end ==============================")


if __name__ == '__main__':
    print_h264('sample.264')
对于H.264数据解析,目前只解析到NALU的层级,定位了h264码流中的每个NALU所在的位置,后面会补充一些具体结构(比如Slice,SPS,PPS等)的解析逻辑,会直接更新在github上。
总结
其实无论解析MP4封装文件,还是AAC音频,h264码流,当我们了解了它们的内部结构的定义之后,解析的逻辑可以称得上是按部就班:读取每个字节,甚至每个bit的数据,把它们按照标准文档的定义解读出来即可。