源引自气象家园大佬,仅作自学使用,使用时需将此py文件放入需要下载的文件夹内。
暂未学会给定下载目录。
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 23 14:45:51 2024
@author: dell
"""
from queue import Queue
from threading import Thread
import cdsapi
import time
from datetime import datetime, timedelta
import os, glob
from multiprocessing import Pool
download_dir = os.getcwd()
def download_ear5(allvar):
print(allvar)
variable = allvar[0]
pressure_level = allvar[1]
year = allvar[2]
month = allvar[3]
day = allvar[4]
hour = allvar[5]
file_path = os.path.join(
download_dir,
variable,
pressure_level,
year,
month,
day,
variable
+ "_"
+ pressure_level
+ "hpa"
+ "_"
+ year
+ month
+ day
+ hour
+ ".nc",
)
if not os.path.exists(
file_path
): # or (os.path.getsize(file_path) < 1024*1024*1.8) :
c = cdsapi.Client() ## 填入账号信息
r = c.retrieve(
"reanalysis-era5-pressure-levels",
{
"product_type": "reanalysis",
"format": "netcdf",
"variable": [variable],
"pressure_level": [pressure_level],
"year": year,
"month": [month],
"day": [day],
"time": [hour + ":00"],
"area": [60, 80, 20, 130],
},
)
r.download(
os.path.join(
download_dir,
variable,
pressure_level,
year,
month,
day,
variable
+ "_"
+ pressure_level
+ "hpa"
+ "_"
+ year
+ month
+ day
+ hour
+ ".nc",
)
)
r.delete()
time.sleep(3)
return
# 下载脚本
class DownloadWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
# 从队列中获取任务并扩展tuple
allvar = self.queue.get()
download_ear5(allvar)
self.queue.task_done()
# 主程序
def main():
start_time = datetime(2018, 1, 1, 0) # 开始时间
end_time = datetime(2022, 12, 31, 0) # 结束时间
# 创建列表,存储起止时间之间的所有时次
datetime_list = []
while start_time <= end_time:
datetime_list.append(start_time)
start_time += timedelta(hours=1)
# 变量列表
variable_list = [
"geopotential",
"temperature",
"u_component_of_wind",
"v_component_of_wind",
]
# 气压层列表
pressure_level_list = ["500", "700", "850"]
# 创建allvar列表,内含variable,pressure_level,year,month,day,hour
allvar = []
for idatetime in datetime_list: # 非并行创建文件夹,同事填充allvar列表
year = idatetime.strftime("%Y")
month = idatetime.strftime("%m")
day = idatetime.strftime("%d")
hour = idatetime.strftime("%H")
for pressure_level in pressure_level_list:
for variable in variable_list:
if not os.path.exists(
os.path.join(
download_dir, variable, pressure_level, year, month, day
)
):
os.makedirs(
os.path.join(
download_dir, variable, pressure_level, year, month, day
)
)
allvar.append([variable, pressure_level, year, month, day, hour])
# 创建一个主进程与工作进程通信
queue = Queue()
# 注意,每个用户同时最多接受4个request https://cds.climate.copernicus.eu/vision
# 创建12个工作线程
for x in range(12):
worker = DownloadWorker(queue)
# 将daemon设置为True将会使主线程退出,即使所有worker都阻塞了
worker.daemon = False
worker.start()
# 将任务以tuple的形式放入队列中
for link in allvar:
queue.put((link))
# 让主线程等待队列完成所有的任务
queue.join()
if __name__ == "__main__":
main()
----------------------------------------------------------------------------------------------------------------------------
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 23 14:45:51 2024
@author: dell
"""
from queue import Queue
from threading import Thread
import cdsapi
import time
from datetime import datetime, timedelta
import os, glob
from multiprocessing import Pool
download_dir = os.getcwd()
def download_ear5(allvar):
print(allvar)
variable = allvar[0]
pressure_level = allvar[1]
year = allvar[2]
month = allvar[3]
day = allvar[4]
hour = allvar[5]
file_path = os.path.join(
download_dir,
variable,
pressure_level,
year,
month,
day,
variable
+ "_"
+ pressure_level
+ "hpa"
+ "_"
+ year
+ month
+ day
+ hour
+ ".nc",
)
if not os.path.exists(
file_path
): # or (os.path.getsize(file_path) < 1024*1024*1.8) :
c = cdsapi.Client()
r = c.retrieve(
"reanalysis-era5-pressure-levels",
{
"product_type": "reanalysis",
"data_format": "netcdf",
"variable": [variable],
"pressure_level": [pressure_level],
"year": [year],
"month": [month],
"day": [day],
"time": [hour + ":00"],
"area": [60, 80, 20, 130],
},
)
r.download(
os.path.join(
download_dir,
variable,
pressure_level,
year,
month,
day,
variable
+ "_"
+ pressure_level
+ "hpa"
+ "_"
+ year
+ month
+ day
+ hour
+ ".nc",
)
)
# r.delete()
time.sleep(3)
return
# 下载脚本
class DownloadWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
# 从队列中获取任务并扩展tuple
allvar = self.queue.get()
download_ear5(allvar)
self.queue.task_done()
# 主程序
def main():
start_time = datetime(2018, 1, 1, 0) # 开始时间
end_time = datetime(2022, 12, 31, 0) # 结束时间
# 创建列表,存储起止时间之间的所有时次
datetime_list = []
while start_time <= end_time:
datetime_list.append(start_time)
start_time += timedelta(hours=1)
# 变量列表
variable_list = [
"geopotential",
"u_component_of_wind",
"v_component_of_wind",
]
# 气压层列表
pressure_level_list = ["500", "700", "850"]
# 创建allvar列表,内含variable,pressure_level,year,month,day,hour
allvar = []
for idatetime in datetime_list: # 非并行创建文件夹,同事填充allvar列表
year = idatetime.strftime("%Y")
month = idatetime.strftime("%m")
day = idatetime.strftime("%d")
hour = idatetime.strftime("%H")
for pressure_level in pressure_level_list:
for variable in variable_list:
if not os.path.exists(
os.path.join(
download_dir, variable, pressure_level, year, month, day
)
):
os.makedirs(
os.path.join(
download_dir, variable, pressure_level, year, month, day
)
)
allvar.append([variable, pressure_level, year, month, day, hour])
# 创建一个主进程与工作进程通信
queue = Queue()
# 注意,每个用户同时最多接受4个request https://cds.climate.copernicus.eu/vision
# 创建12个工作线程
for x in range(12):
worker = DownloadWorker(queue)
# 将daemon设置为True将会使主线程退出,即使所有worker都阻塞了
worker.daemon = False
worker.start()
# 将任务以tuple的形式放入队列中
for link in allvar:
queue.put((link))
# 让主线程等待队列完成所有的任务
queue.join()
if __name__ == "__main__":
main()