OTA包的制作流程从ota_from_target_files脚本开始,该脚本被直接执行时最先运行的入口代码如下:
if __name__ == '__main__':
    try:
        # gmake on macOS leaks file descriptors into the processes it
        # spawns, so close the inherited ones before doing any other work.
        common.CloseInheritedPipes()
        # Build the OTA package from the command-line arguments.
        main(sys.argv[1:])
    except common.ExternalError as e:
        # print("") emits the same blank line as a bare Python 2 `print`
        # statement while also being valid Python 3 syntax.
        print("")
        print(" ERROR: %s" % (e,))
        print("")
        sys.exit(1)
    finally:
        # Runs on every path, including the sys.exit(1) above.
        common.Cleanup()
脚本入口函数main
def main(argv):
    """Script entry point; argv is the command line without the program name.

    Only the nested option handler is shown in this snippet; the remaining
    statements of main follow it in the rest of the walkthrough.
    """

    def option_handler(o, a):
        """Store one script-specific command-line option on OPTIONS.

        Args:
          o: the option flag as reported by getopt (e.g. "-k" or "--block").
          a: the option's argument string ("" for flags without one).

        Returns:
          True if the option was recognized and recorded, False otherwise so
          that the caller can report it as unknown.

        Raises:
          ValueError: if --worker_threads is not an integer or
            --stash_threshold is not a float.
        """
        if o == "--board_config":
            pass   # deprecated
        elif o in ("-k", "--package_key"):
            OPTIONS.package_key = a
        elif o in ("-i", "--incremental_from"):
            OPTIONS.incremental_source = a
        elif o == "--full_radio":
            OPTIONS.full_radio = True
        elif o == "--full_bootloader":
            OPTIONS.full_bootloader = True
        elif o in ("-w", "--wipe_user_data"):
            OPTIONS.wipe_user_data = True
        elif o in ("-n", "--no_prereq"):
            OPTIONS.omit_prereq = True
        elif o in ("-o", "--oem_settings"):
            OPTIONS.oem_source = a
        elif o in ("-e", "--extra_script"):
            OPTIONS.extra_script = a
        elif o in ("-a", "--aslr_mode"):
            # Only these exact spellings enable ASLR; anything else disables.
            if a in ("on", "On", "true", "True", "yes", "Yes"):
                OPTIONS.aslr_mode = True
            else:
                OPTIONS.aslr_mode = False
        elif o in ("-t", "--worker_threads"):
            if a.isdigit():
                OPTIONS.worker_threads = int(a)
            else:
                raise ValueError("Cannot parse value %r for option %r - only "
                                 "integers are allowed." % (a, o))
        elif o in ("-2", "--two_step"):
            OPTIONS.two_step = True
        elif o == "--no_signing":
            OPTIONS.no_signing = True
        elif o == "--verify":
            OPTIONS.verify = True
        elif o == "--block":
            OPTIONS.block_based = True
        elif o in ("-b", "--binary"):
            OPTIONS.updater_binary = a
        elif o in ("--no_fallback_to_full",):
            OPTIONS.fallback_to_full = False
        elif o == "--stash_threshold":
            try:
                OPTIONS.stash_threshold = float(a)
            except ValueError:
                raise ValueError("Cannot parse value %r for option %r - "
                                 "expecting a float" % (a, o))
        else:
            return False
        return True
解析命令行参数:带-或者--的选项由ParseOptions和上面的option_handler处理并保存到OPTIONS中;返回值args是个列表,只保存剩下的没有加-或者--的位置参数(后面要求恰好两个:args[0]是目标文件包,args[1]是输出的OTA包路径)
# Script-specific flags handed to getopt by common.ParseOptions.  A trailing
# ":" (short form) or "=" (long form) marks an option that takes an argument.
short_option_spec = "b:k:i:d:wne:t:a:2o:"
long_option_names = [
    "board_config=",
    "package_key=",
    "incremental_from=",
    "full_radio",
    "full_bootloader",
    "wipe_user_data",
    "no_prereq",
    "extra_script=",
    "worker_threads=",
    "aslr_mode=",
    "two_step",
    "no_signing",
    "block",
    "binary=",
    "oem_settings=",
    "verify",
    "no_fallback_to_full",
    "stash_threshold=",
]
# args receives whatever is left on the command line once flags are consumed.
args = common.ParseOptions(
    argv,
    __doc__,
    extra_opts=short_option_spec,
    extra_long_opts=long_option_names,
    extra_option_handler=option_handler)
common.py中的ParseOptions,主要是调用python中的getopt模块中的getopt函数来解析参数
def ParseOptions(argv,
                 docstring,
                 extra_opts="", extra_long_opts=(),
                 extra_option_handler=None):
    """Parse the options in argv and return any arguments that aren't flags.

    Options shared by the release tools are stored on OPTIONS here; every
    other option is passed to extra_option_handler.

    Args:
      argv: list of command-line argument strings to parse.
      docstring: the calling module's docstring, displayed for errors and -h.
      extra_opts: additional short-option spec in getopt syntax.
      extra_long_opts: additional long-option names in getopt syntax.
      extra_option_handler: callable (o, a) returning True when it handled
        the caller-defined option o with argument a.

    Returns:
      The list of arguments left over once all flags have been consumed.

    Raises:
      SystemExit: with status 2 on a getopt parse error, or a normal exit
        after showing usage for -h/--help.
      ValueError: if the -x/--extra argument is not of the form key=value.
      AssertionError: if an option is accepted by getopt but handled neither
        here nor by extra_option_handler.
    """
    try:
        opts, args = getopt.getopt(
            argv, "hvp:s:x:" + extra_opts,
            ["help", "verbose", "path=", "signapk_path=",
             "extra_signapk_args=", "java_path=", "java_args=",
             "public_key_suffix=", "private_key_suffix=",
             "boot_signer_path=", "boot_signer_args=",
             "verity_signer_path=", "verity_signer_args=",
             "device_specific=", "extra="] +
            list(extra_long_opts))
    except getopt.GetoptError as err:
        Usage(docstring)
        # Single-argument print: same "** message **" line on Python 2,
        # and valid syntax on Python 3.
        print("** %s **" % (err,))
        sys.exit(2)

    for o, a in opts:
        if o in ("-h", "--help"):
            Usage(docstring)
            sys.exit()
        elif o in ("-v", "--verbose"):
            OPTIONS.verbose = True
        elif o in ("-p", "--path"):
            OPTIONS.search_path = a
        elif o in ("--signapk_path",):
            OPTIONS.signapk_path = a
        elif o in ("--extra_signapk_args",):
            OPTIONS.extra_signapk_args = shlex.split(a)
        elif o in ("--java_path",):
            OPTIONS.java_path = a
        elif o in ("--java_args",):
            OPTIONS.java_args = a
        elif o in ("--public_key_suffix",):
            OPTIONS.public_key_suffix = a
        elif o in ("--private_key_suffix",):
            OPTIONS.private_key_suffix = a
        elif o in ("--boot_signer_path",):
            OPTIONS.boot_signer_path = a
        elif o in ("--boot_signer_args",):
            OPTIONS.boot_signer_args = shlex.split(a)
        elif o in ("--verity_signer_path",):
            OPTIONS.verity_signer_path = a
        elif o in ("--verity_signer_args",):
            OPTIONS.verity_signer_args = shlex.split(a)
        elif o in ("-s", "--device_specific"):
            OPTIONS.device_specific = a
        elif o in ("-x", "--extra"):
            # Fail with a readable message instead of an unpacking error
            # when the "=" separator is missing.
            if "=" not in a:
                raise ValueError("Cannot parse value %r for option %r - "
                                 "expecting key=value" % (a, o))
            key, value = a.split("=", 1)
            OPTIONS.extras[key] = value
        else:
            if extra_option_handler is None or not extra_option_handler(o, a):
                # Raised explicitly (rather than `assert False`) so the check
                # is not stripped when Python runs with -O.
                raise AssertionError("unknown option \"%s\"" % (o,))

    if OPTIONS.search_path:
        # Put <search_path>/bin at the front of PATH.
        os.environ["PATH"] = (os.path.join(OPTIONS.search_path, "bin") +
                              os.pathsep + os.environ["PATH"])

    return args
# Exactly two positional arguments are required: args[0] is unzipped as the
# target-files package and args[1] is used as the output package path.
if len(args) != 2:
    common.Usage(__doc__)
    sys.exit(1)

# -e/--extra_script stores a file path; replace it with the file's contents.
if OPTIONS.extra_script is not None:
    # Context manager closes the handle promptly instead of leaking it.
    with open(OPTIONS.extra_script) as script_file:
        OPTIONS.extra_script = script_file.read()
解压目标文件包
# Announce the step, then unpack the target-files archive named by args[0].
print("unzipping target target-files...")
target_unpack_result = common.UnzipTemp(args[0])
# UnzipTemp yields (temporary directory, ZipFile opened for reading).
OPTIONS.input_tmp, input_zip = target_unpack_result
common.py中的UnzipTemp函数,主要是对目标文件包进行解压,返回值是解压所用的临时目录tmp的路径,以及以只读方式打开的目标文件包对应的zipfile.ZipFile对象。
def UnzipTemp(filename, pattern=None):
    """Unzip the given archive into a temporary directory and return the name.

    If filename is of the form "foo.zip+bar.zip", unzip foo.zip into a
    temp dir, then unzip bar.zip into that_dir/BOOTABLE_IMAGES.

    The temporary directory is appended to OPTIONS.tempfiles.

    Args:
      filename: path of the archive, or "foo.zip+bar.zip" as described above.
      pattern: optional extra argument appended to the unzip command line.

    Returns:
      (tempdir, zipobj) where zipobj is a zipfile.ZipFile (of the main
      file), open for reading.

    Raises:
      ExternalError: if the unzip command exits with a non-zero status.
    """
    tmp = tempfile.mkdtemp(prefix="targetfiles-")
    OPTIONS.tempfiles.append(tmp)

    def unzip_to_dir(archive_path, dirname):
        """Extract archive_path into dirname using the external unzip tool."""
        cmd = ["unzip", "-o", "-q", archive_path, "-d", dirname]
        if pattern is not None:
            cmd.append(pattern)
        # Launch unzip via Run and wait for it to finish.
        p = Run(cmd, stdout=subprocess.PIPE)
        p.communicate()
        if p.returncode != 0:
            raise ExternalError("failed to unzip input target-files \"%s\"" %
                                (archive_path,))

    m = re.match(r"^(.*[.]zip)\+(.*[.]zip)$", filename, re.IGNORECASE)
    if m:
        unzip_to_dir(m.group(1), tmp)
        unzip_to_dir(m.group(2), os.path.join(tmp, "BOOTABLE_IMAGES"))
        # Only the first archive is reopened as the returned ZipFile.
        filename = m.group(1)
    else:
        unzip_to_dir(filename, tmp)

    return tmp, zipfile.ZipFile(filename, "r")
# The directory the input package was unpacked into doubles as the target tree.
OPTIONS.target_tmp = OPTIONS.input_tmp
# Parse the package's metadata (META/misc_info.txt and related files) into a
# dict kept on OPTIONS.
OPTIONS.info_dict = common.LoadInfoDict(input_zip)
将目标文件包中META/misc_info.txt的信息进行解析保存
def LoadInfoDict(input_file):
    """Read and parse the META/misc_info.txt key/value pairs from the
    input target files and return a dict.

    Args:
      input_file: either an open zipfile.ZipFile of the target files, or any
        other value, which is treated as the path of a directory holding the
        unpacked target files.

    Returns:
      A dict of the parsed values.  The size-like keys, "blocksize",
      "recovery_api_version" and "fstab_version" are converted to int when
      present, and "fstab" and "build.prop" are added from LoadRecoveryFSTab
      and LoadBuildProp.

    Raises:
      ValueError: if no recovery API version can be found.
      IOError: if a file in an unpacked directory exists but cannot be read.
    """

    def read_helper(fn):
        """Return the contents of member fn; raise KeyError if it is absent."""
        if isinstance(input_file, zipfile.ZipFile):
            return input_file.read(fn)
        else:
            path = os.path.join(input_file, *fn.split("/"))
            try:
                with open(path) as f:
                    return f.read()
            except IOError as e:
                if e.errno == errno.ENOENT:
                    # Mirror ZipFile.read(), which raises KeyError for a
                    # missing member.
                    raise KeyError(fn)
                # Any other I/O failure is a real error: propagate it rather
                # than silently returning None.
                raise

    d = {}
    try:
        d = LoadDictionaryFromLines(
            read_helper("META/misc_info.txt").split("\n"))
    except KeyError:
        # ok if misc_info.txt doesn't exist
        pass

    # backwards compatibility: These values used to be in their own
    # files.  Look for them, in case we're processing an old
    # target_files zip.

    if "mkyaffs2_extra_flags" not in d:
        try:
            d["mkyaffs2_extra_flags"] = read_helper(
                "META/mkyaffs2-extra-flags.txt").strip()
        except KeyError:
            # ok if flags don't exist
            pass

    if "recovery_api_version" not in d:
        try:
            d["recovery_api_version"] = read_helper(
                "META/recovery-api-version.txt").strip()
        except KeyError:
            raise ValueError(
                "can't find recovery API version in input target-files")

    if "tool_extensions" not in d:
        try:
            d["tool_extensions"] = read_helper(
                "META/tool-extensions.txt").strip()
        except KeyError:
            # ok if extensions don't exist
            pass

    if "fstab_version" not in d:
        d["fstab_version"] = "1"

    try:
        data = read_helper("META/imagesizes.txt")
        for line in data.split("\n"):
            if not line:
                continue
            name, value = line.split(" ", 1)
            if not value:
                continue
            # "blocksize" keeps its name; every other entry gets a "_size"
            # suffix.
            if name == "blocksize":
                d[name] = value
            else:
                d[name + "_size"] = value
    except KeyError:
        pass

    def makeint(key):
        """Convert d[key] to int in place; base 0 accepts 0x/0o prefixes."""
        if key in d:
            d[key] = int(d[key], 0)

    makeint("recovery_api_version")
    makeint("blocksize")
    makeint("system_size")
    makeint("vendor_size")
    makeint("userdata_size")
    makeint("cache_size")
    makeint("recovery_size")
    makeint("boot_size")
    makeint("fstab_version")

    d["fstab"] = LoadRecoveryFSTab(read_helper, d["fstab_version"])
    d["build.prop"] = LoadBuildProp(read_helper)
    return d
def LoadDictionaryFromLines(lines):
    """Build a dict from an iterable of "name=value" text lines.

    Each line is stripped of surrounding whitespace first.  Blank lines,
    lines starting with "#", and lines without "=" are ignored.  A line is
    split at its first "=" only, so the value may itself contain "=";
    whitespace next to the "=" is kept.  A later line overrides an earlier
    one with the same name.

    Args:
      lines: iterable of strings.

    Returns:
      A dict mapping each name to its value (both strings).
    """
    d = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if "=" in line:
            name, value = line.split("=", 1)
            d[name] = value
    return d
# If this image was originally labelled with SELinux contexts, make sure we
# also apply the labels in our new image. During building, the "file_contexts"
# is in the out/ directory tree, but for repacking from target-files.zip it's
# in the root directory of the ramdisk.
if "selinux_fc" in OPTIONS.info_dict:
    OPTIONS.info_dict["selinux_fc"] = os.path.join(
        OPTIONS.input_tmp, "BOOT", "RAMDISK", "file_contexts")

if OPTIONS.verbose:
    print("--- target info ---")
    # Dump the info dict parsed from the target package's metadata.
    common.DumpInfoDict(OPTIONS.info_dict)

# If the caller explicitly specified the device-specific extensions
# path via -s/--device_specific, use that.  Otherwise, use
# META/releasetools.py if it is present in the target target_files.
# Otherwise, take the path of the file from 'tool_extensions' in the
# info dict and look for that in the local filesystem, relative to
# the current directory.
if OPTIONS.device_specific is None:
    from_input = os.path.join(OPTIONS.input_tmp, "META", "releasetools.py")
    if os.path.exists(from_input):
        print("(using device-specific extensions from target_files)")
        OPTIONS.device_specific = from_input
    else:
        OPTIONS.device_specific = OPTIONS.info_dict.get(
            "tool_extensions", None)

if OPTIONS.device_specific is not None:
    OPTIONS.device_specific = os.path.abspath(OPTIONS.device_specific)

# The loop body runs at most twice: a second pass happens only when an
# incremental build fails and falls back to a full OTA.
while True:

    if OPTIONS.no_signing:
        # Unsigned output is written straight to the destination path.
        if os.path.exists(args[1]):
            os.unlink(args[1])
        output_zip = zipfile.ZipFile(args[1], "w",
                                     compression=zipfile.ZIP_DEFLATED)
    else:
        # Output is staged in a temporary file and signed into args[1] later.
        temp_zip_file = tempfile.NamedTemporaryFile()
        output_zip = zipfile.ZipFile(temp_zip_file, "w",
                                     compression=zipfile.ZIP_DEFLATED)

    cache_size = OPTIONS.info_dict.get("cache_size", None)
    if cache_size is None:
        raise RuntimeError("can't determine the cache partition size")
    OPTIONS.cache_size = cache_size

    if OPTIONS.incremental_source is None:
        WriteFullOTAPackage(input_zip, output_zip)
        if OPTIONS.package_key is None:
            OPTIONS.package_key = OPTIONS.info_dict.get(
                "default_system_dev_certificate",
                "build/target/product/security/testkey")
        common.ZipClose(output_zip)
        break

    else:
        # Unpack the source target-files package and load its metadata.
        print("unzipping source target-files...")
        OPTIONS.source_tmp, source_zip = common.UnzipTemp(
            OPTIONS.incremental_source)
        OPTIONS.target_info_dict = OPTIONS.info_dict
        OPTIONS.source_info_dict = common.LoadInfoDict(source_zip)
        if "selinux_fc" in OPTIONS.source_info_dict:
            OPTIONS.source_info_dict["selinux_fc"] = os.path.join(
                OPTIONS.source_tmp, "BOOT", "RAMDISK", "file_contexts")
        if OPTIONS.package_key is None:
            OPTIONS.package_key = OPTIONS.source_info_dict.get(
                "default_system_dev_certificate",
                "build/target/product/security/testkey")
        if OPTIONS.verbose:
            print("--- source info ---")
            common.DumpInfoDict(OPTIONS.source_info_dict)
        try:
            # Build the incremental OTA package.
            WriteIncrementalOTAPackage(input_zip, source_zip, output_zip)
            common.ZipClose(output_zip)
            break
        except ValueError:
            if not OPTIONS.fallback_to_full:
                raise
            print("--- failed to build incremental; falling back to full ---")
            # Clearing incremental_source selects the full-OTA branch on the
            # next pass.
            OPTIONS.incremental_source = None
            common.ZipClose(output_zip)
            if not OPTIONS.no_signing:
                # The next pass opens a fresh temporary file; close the
                # abandoned one now instead of leaking it.
                temp_zip_file.close()

if not OPTIONS.no_signing:
    SignOutput(temp_zip_file.name, args[1])
    temp_zip_file.close()

print("done.")