# -*- coding: utf-8 -*-
# @Author: wqshen
# @Email: wqshen91@gmail.com
# @Date: 2023/2/6 10:23
# @Last Modified by: wqshen
import os
import yaml
import numpy as np
import configparser
import pandas as pd
import xarray as xr
from typing import Union
from logzero import logger
from itertools import product
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from pydaas.music.DataQueryClient import DataQueryClient
# [docs]  (documentation-link marker left over from HTML extraction; not code)
class DaasClient(DataQueryClient):
def __init__(self, user: str = None, password: str = None, **kwargs):
    """Create a Daas client.

    Parameters
    ----------
    user : str, optional
        User name. When None, option ``music_user`` of section ``Pb`` in
        the client config file is used.
    password : str, optional
        Password. When None, option ``music_password`` of section ``Pb``
        in the client config file is used.
    **kwargs
        Forwarded to ``DataQueryClient.__init__``. ``config_file`` falls
        back to ``config/client.config`` next to this module.
    """
    conf_dir = fr'{os.path.dirname(__file__)}/config'
    kwargs.setdefault('config_file', fr'{conf_dir}/client.config')
    config_file = kwargs['config_file']
    logger.debug(f"load client.config from {config_file}")
    super().__init__(**kwargs)

    # The same config file also carries the default credentials.
    parser = configparser.ConfigParser()
    parser.read(config_file, 'utf-8')
    if user is None:
        user = parser.get('Pb', 'music_user')
    if password is None:
        password = parser.get('Pb', 'music_password')

    self._user = user
    self._password = password
    self._n_jobs = 1
    # Mapping looked up with ``alias.get(name, name)`` by the ``sel`` methods.
    self.alias = self.load_yaml(fr'{conf_dir}/alias.yaml')
@property
def n_jobs(self) -> int:
    """Number of threads used when data is requested in parallel."""
    return self._n_jobs


@n_jobs.setter
def n_jobs(self, n: int):
    """Set the number of threads used when data is requested in parallel.

    Parameters
    ----------
    n : int
        Thread count.
    """
    self._n_jobs = n
def __enter__(self):
    """Enter a ``with`` block; the client itself is the managed object."""
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave a ``with`` block.

    Nothing is released here. The former ``del self`` only unbound the
    local name and had no effect on the object, so it was dropped.

    Returns
    -------
    None
        A falsy result, so an exception raised inside the ``with`` body
        keeps propagating.
    """
    return None
# [docs]  (documentation-link marker left over from HTML extraction; not code)
@staticmethod
def load_yaml(path_yaml: str, encoding: str = 'utf8') -> dict:
    """Load a YAML configuration file.

    Parameters
    ----------
    path_yaml : str
        Path to the YAML file.
    encoding : str
        File encoding, default 'utf8'.

    Returns
    -------
    dict
        Parsed content. An empty document yields an empty dict rather
        than None, so callers can always use mapping methods on it.

    Raises
    ------
    FileNotFoundError
        If `path_yaml` is not an existing regular file.
    """
    if not os.path.isfile(path_yaml):
        raise FileNotFoundError(path_yaml)
    with open(path_yaml, "r", encoding=encoding) as f:
        # safe_load returns None for an empty document.
        return yaml.safe_load(f) or {}
@property
def user(self) -> str:
    """User name sent with every request."""
    return self._user


@user.setter
def user(self, user: str):
    """Replace the user name sent with every request."""
    self._user = user
@property
def password(self) -> str:
    """Password sent with every request."""
    return self._password


@password.setter
def password(self, pwd: str):
    """Replace the password sent with every request."""
    self._password = pwd
# [docs]  (documentation-link marker left over from HTML extraction; not code)
def sel(self, datasource: Union[str, list], inittime: Union[datetime, slice, list, str] = None,
        fh: Union[int, slice, list] = None, varname: Union[str, list] = None,
        leadtime: Union[datetime, slice, list, str] = None,
        merge: bool = False,
        **kwargs) -> "Union[xr.DataArray, pd.DataFrame, list]":
    """Select variables from one or more data sources.

    Every combination of `datasource`, `inittime`, `fh`, `varname` and
    `leadtime` becomes one request; requests are executed by a thread
    pool with ``n_jobs`` workers.

    Parameters
    ----------
    datasource : str or list
        Data source name(s) from Daas, or alias(es) from config/alias.yaml.
    inittime : datetime, slice, str or list, optional
        Model initial datetime or observation time.
    fh : int, slice or list, optional
        Forecast hour(s).
    varname : str or list, optional
        Variable name(s).
    leadtime : datetime, slice, str or list, optional
        Passed together with `inittime` and `fh` to ``decode_leadtime``.
    merge : bool
        When True, combine the successful results: xarray objects with
        ``xr.merge``, DataFrames with ``pd.concat``. Results of any other
        type are returned as the unmerged list.
    **kwargs
        Forwarded to the reader selected for each data source.

    Returns
    -------
    xarray.DataArray, xarray.Dataset, pandas.DataFrame or list
        The single result when there is exactly one request and `merge`
        is False, the merged object when `merge` is True, otherwise a
        list with one entry per request (None for a failed request).

    Raises
    ------
    Exception
        If every request failed (or there was nothing to request).
    """

    def _as_list(value, scalar_types):
        # None and scalars become a one-element list so `product` can use them.
        if value is None or isinstance(value, scalar_types):
            return [value]
        return value

    if isinstance(datasource, str):
        datasource = [datasource]
    datasource = [self.alias.get(d, d) for d in datasource]
    inittime = _as_list(inittime, (datetime, slice, str))
    leadtime = _as_list(leadtime, (datetime, slice, str))
    fh = _as_list(fh, (int, slice, str))
    varname = _as_list(varname, str)

    requests = list(product(datasource, inittime, fh, varname, leadtime))
    with ThreadPoolExecutor(max_workers=self._n_jobs) as executor:
        datas = list(executor.map(lambda r: self._sel(r, **kwargs), requests))

    if all(d is None for d in datas):
        # Not inside an `except` block, so there is no traceback to attach.
        logger.error("all requests failed.")
        raise Exception("all requests failed.")

    if not merge:
        return datas if len(requests) > 1 else datas[0]

    # Failed requests are None; merge only what actually arrived.
    valid = [d for d in datas if d is not None]
    first = valid[0]
    if isinstance(first, pd.DataFrame):
        return pd.concat(valid)
    if isinstance(first, (xr.DataArray, xr.Dataset)):
        same_time = all(np.array_equal(d.time.values, first.time.values) for d in valid)
        if len(inittime) > 1 and same_time:
            # All results share one `time` value: index them by `inittime`
            # instead and keep the shared value as coordinate `leadtime`.
            shared = xr.DataArray(first.time.values, dims='time')
            valid = [d.set_index(time='inittime').assign_coords(leadtime=shared) for d in valid]
        return xr.merge(valid)
    return datas
def _sel(self, request: Union[list, tuple], **kwargs):
    """Run one request and return its data, or None when the reader fails.

    Parameters
    ----------
    request : list or tuple
        Values in the order (datasource, inittime, fh, varname, leadtime).
    **kwargs
        Forwarded to the reader method.

    Returns
    -------
    object or None
        Whatever the reader returns; None if the reader raised (the
        error is logged, not re-raised).
    """
    logger.debug(request)
    keys = ('datasource', 'inittime', 'fh', 'varname', 'leadtime')
    request = dict(zip(keys, request))
    inittime, fh = self.decode_leadtime(request['inittime'], request['fh'],
                                        request['leadtime'])
    request.update(inittime=inittime, fh=fh)
    logger.debug(request)

    # The reader is chosen by the first '_'-separated token of the data
    # source name, e.g. 'SURF_...' -> self._sel_surf.
    prefix = request['datasource'].split('_')[0].lower()
    interface_method = getattr(self, f"_sel_{prefix}")
    try:
        return interface_method(**request, **kwargs)
    except Exception as e:
        logger.exception("{} - {}".format(request, e))
        return None
# [docs]  (documentation-link marker left over from HTML extraction; not code)
def decode_leadtime(self, inittime=None, fh=None, leadtime=None):
    """Resolve the initial time and forecast hour of a request.

    Whichever of `inittime` / `fh` is missing is derived from the other
    two values when possible:

    * `inittime` missing: ``inittime = leadtime - fh hours``
    * `fh` missing and `leadtime` given: ``fh`` is the whole number of
      hours between `inittime` and `leadtime` (truncated toward zero)

    Parameters
    ----------
    inittime : datetime, optional
        Initial datetime.
    fh : int, optional
        Forecast hour.
    leadtime : datetime, optional
        Datetime equal to ``inittime + fh`` hours.

    Returns
    -------
    tuple
        ``(inittime, fh)``; `fh` stays None when only `inittime` is given.

    Raises
    ------
    ValueError
        If neither `inittime` nor `leadtime` is given, or if `inittime`
        has to be derived from `leadtime` but `fh` is missing.
    """
    if inittime is None and leadtime is None:
        raise ValueError("at least one of inittime and leadtime is required")
    if inittime is None:
        if fh is None:
            raise ValueError("fh is required to derive inittime from leadtime")
        inittime = leadtime - timedelta(hours=fh)
    elif fh is None and leadtime is not None:
        fh = int((leadtime - inittime).total_seconds() / 3600.)
    return inittime, fh
def _sel_file(self, datasource: str, inittime: Union[datetime, slice] = None, **kwargs):
    """Request the file(s) of a data source for a time or a time range.

    Parameters
    ----------
    datasource : str
        Data source name from Daas (or an alias).
    inittime : datetime or slice
        A single time, or a slice whose ``start``/``stop`` bound a range.
    **kwargs
        staIds : station ids; when truthy the ``...AndStaID`` interface
            is used.
        path : handed to the call method as last argument, default './'
            (presumably the download directory -- confirm against
            ``DataQueryClient``).

    Returns
    -------
    object
        ``fileInfos`` of the service response.
    """
    # "callAPI_to_fileList" is the alternative call method noted by the author.
    call_name = "callAPI_to_downFile"
    time_format = '%Y%m%d%H%M%S'

    parameters = {"dataCode": self.alias.get(datasource, datasource)}
    # Interface names have the form get<Type>FileByTime[Range][AndStaID],
    # where <Type> is the capitalised first token of the data source name.
    family = datasource.split('_')[0].capitalize()
    interface = f'get{family}File'

    if isinstance(inittime, slice):
        interface += 'ByTimeRange'
        begin = format(inittime.start, time_format)
        end = format(inittime.stop, time_format)
        parameters["timeRange"] = f"[{begin},{end}]"
    else:
        interface += 'ByTime'
        parameters["time"] = format(inittime, time_format)

    sta_ids = kwargs.get('staIds')
    if sta_ids:
        interface += 'AndStaID'
        parameters["staIds"] = sta_ids

    path = kwargs.pop('path', './')
    call = getattr(self, call_name)
    response = call(self._user, self._password, interface, parameters, path)
    return response.fileInfos
def _sel_nafp(self, datasource: str, inittime: Union[datetime, slice] = None,
              fh: Union[int, slice] = None, varname: str = None,
              **kwargs) -> Union[pd.DataFrame, xr.DataArray]:
    """Select a NAFP (model) variable from Daas.

    Parameters
    ----------
    datasource : str
        Data source name from Daas (or an alias).
    inittime : datetime
        Model initial datetime.
    fh : int or slice, optional
        Forecast lead hour, or a slice giving a lead-hour range.
    varname : str
        Forecast variable name.
    **kwargs
        download : when truthy, the file is requested through
            ``_sel_file`` with this value as ``path`` and its result is
            returned.
        level : forecast level, default 0; None sends levelType "-"
            and no level.
        levelType : level type; defaults to 1 when `level` is 0,
            otherwise 100.
        lat, lon : both slices select a rectangle; both scalars or both
            lists/tuples select point(s); omitted selects the full grid.

    Returns
    -------
    xarray.DataArray or pandas.DataFrame
        DataFrame for point requests, DataArray (with an ``inittime``
        coordinate on dimension ``time``) for grid requests.

    Raises
    ------
    Exception
        ``(errorCode, errorMessage)`` when the service reports an error.
    """
    download = kwargs.pop('download', False)
    if download:
        return self._sel_file(datasource, inittime, path=download, **kwargs)

    default_call = "callAPI_to_gridArray2D"
    level = kwargs.pop('level', 0)
    level_type = kwargs.pop('levelType', 1 if level == 0 else 100)

    parameters = {
        "dataCode": self.alias.get(datasource, datasource),
        "time": f"{inittime:%Y%m%d%H%M%S}",
        "fcstEle": varname,
    }
    if level is None:
        parameters["levelType"] = "-"
    else:
        parameters["levelType"] = f"{level_type}"
        parameters["fcstLevel"] = f"{level}"

    interface = "getNafpAnaEle" if datasource.startswith('NAFP_GRID_ANA') else "getNafpEle"

    lat, lon = kwargs.get('lat'), kwargs.get('lon')
    # Compare with None: a point on the equator or prime meridian is 0,
    # which a truthiness test would mistake for "not given".
    if lat is not None and lon is not None:
        if isinstance(lat, slice):
            interface += 'GridInRect'
            parameters.update(
                {'minLat': f"{lat.start}",
                 'maxLat': f"{lat.stop}",
                 'minLon': f"{lon.start}",
                 'maxLon': f"{lon.stop}"}
            )
        else:
            interface += 'AtPoint'
            if isinstance(lat, (list, tuple)) and isinstance(lon, (list, tuple)):
                points = ','.join(f'{y}/{x}' for y, x in zip(lat, lon))
            else:
                points = f'{lat}/{lon}'
            parameters['latLons'] = points
            default_call = "callAPI_to_array2D"
    else:
        interface += 'Grid'
    interface += 'ByTimeAndLevel'

    valid_time = inittime
    if isinstance(fh, slice):
        interface += 'AndValidtimeRange'
        parameters.update({'minVT': f"{fh.start}", 'maxVT': f"{fh.stop}"})
    elif fh is not None:
        interface += 'AndValidtime'
        parameters['validTime'] = f"{fh}"
        valid_time = inittime + timedelta(hours=fh)

    call = getattr(self, default_call)
    ret = call(self._user, self._password, interface, parameters)
    if ret.request.errorCode != 0:
        logger.debug(ret.request)
        raise Exception(ret.request.errorCode, ret.request.errorMessage)
    logger.debug(ret)

    if default_call == "callAPI_to_array2D":
        # TODO: point requests return a DataFrame while grid requests return
        # a DataArray; the inconsistent return type is confusing.
        return pd.DataFrame(ret.data, columns=list(ret.elementNames))

    if datasource == "NAFP_C3E_FOR_FTM_LOW_ASI":
        # For this data source fcstLevel 1..50 is requested in addition to
        # the first response; the 51 fields are stacked along `number`.
        # NOTE(review): fcstLevel presumably encodes the ensemble member
        # here -- confirm against the data source description.
        members = [ret.data]
        for i in range(1, 51):
            parameters['fcstLevel'] = f"{i}"
            ret = call(self._user, self._password, interface, parameters)
            if ret.request.errorCode != 0:
                logger.debug(ret.request)
                raise Exception(ret.request.errorCode, ret.request.errorMessage)
            members.append(ret.data)
        values = [members]
        dims = ('time', 'number', 'lat', 'lon')
        coords = {'time': [valid_time], 'number': np.arange(51, dtype='i4')}
    else:
        values = [ret.data]
        dims = ('time', 'lat', 'lon')
        coords = {'time': [valid_time]}

    coords['lon'] = np.linspace(ret.startLon, ret.endLon, ret.lonCount)
    coords['lat'] = np.linspace(ret.startLat, ret.endLat, ret.latCount)
    data = xr.DataArray(values, dims=dims, coords=coords, name=varname)
    return data.assign_coords(inittime=xr.DataArray([inittime], dims='time'))
def _sel_surf_grid(self, datasource: str, inittime: Union[datetime, slice] = None,
                   varname: str = None, **kwargs) -> Union[pd.DataFrame, xr.DataArray]:
    """Select a gridded surface (observation) variable from Daas.

    Parameters
    ----------
    datasource : str
        Data source name from Daas (or an alias).
    inittime : datetime
        Observation datetime.
    varname : str
        Observation variable name.
    **kwargs
        lat, lon : both slices select a rectangle; both scalars or both
            lists/tuples select point(s); omitted selects the full grid.

    Returns
    -------
    xarray.DataArray or pandas.DataFrame
        DataFrame for point requests, 2-D (lat, lon) DataArray otherwise.

    Raises
    ------
    Exception
        ``(errorCode, errorMessage)`` when the service reports an error.
    """
    default_call = "callAPI_to_gridArray2D"
    parameters = {
        "dataCode": self.alias.get(datasource, datasource),
        "time": f"{inittime:%Y%m%d%H%M%S}",
        "fcstEle": varname,
    }
    interface = "getSurfEle"

    lat, lon = kwargs.get('lat'), kwargs.get('lon')
    # Compare with None: a point on the equator or prime meridian is 0,
    # which a truthiness test would mistake for "not given".
    if lat is not None and lon is not None:
        if isinstance(lat, slice):
            interface += 'GridInRect'
            parameters.update(
                {'minLat': f"{lat.start}",
                 'maxLat': f"{lat.stop}",
                 'minLon': f"{lon.start}",
                 'maxLon': f"{lon.stop}"}
            )
        else:
            interface += 'AtPoint'
            if isinstance(lat, (list, tuple)) and isinstance(lon, (list, tuple)):
                points = ','.join(f'{y}/{x}' for y, x in zip(lat, lon))
            else:
                points = f'{lat}/{lon}'
            parameters['latLons'] = points
            default_call = "callAPI_to_array2D"
    else:
        interface += 'Grid'
    interface += 'ByTime'

    ret = getattr(self, default_call)(self._user, self._password, interface, parameters)
    if ret.request.errorCode != 0:
        logger.debug(ret.request)
        raise Exception(ret.request.errorCode, ret.request.errorMessage)
    logger.debug(ret)

    if default_call == "callAPI_to_array2D":
        # TODO: point requests return a DataFrame while grid requests return
        # a DataArray; the inconsistent return type is confusing.
        return pd.DataFrame(ret.data, columns=list(ret.elementNames))
    return xr.DataArray(
        ret.data, dims=('lat', 'lon'),
        coords={'lon': np.linspace(ret.startLon, ret.endLon, ret.lonCount),
                'lat': np.linspace(ret.startLat, ret.endLat, ret.latCount)},
        name=varname)
def _sel_surf(self, datasource: str, inittime: Union[str, slice, datetime] = None,
              varname: str = None, **kwargs) -> pd.DataFrame:
    """Select surface observation data.

    Parameters
    ----------
    datasource : str
        Data source name from Daas (or an alias). Names starting with
        'SURF_CMPA' are delegated to ``_sel_surf_grid``.
    inittime : str, datetime or slice
        Observation time, or a slice bounding a time range.
    varname : str
        Element name(s). A 'SUM_', 'MAX_', 'MIN_', 'AVG_' or 'COUNT_'
        prefix switches to the statistics interface.
    **kwargs
        read_from_file : when truthy, return ``_sel_file``'s result instead.
        index_col : comma separated element(s) requested in addition and
            used as index of the result (grouping elements for the
            statistics interface).
        lat, lon : slices bounding a rectangle.
        adminCodes : region code(s), used when no lat/lon is given.
        staIds : station id(s); cannot be combined with lat/lon.
        staLevels, eleValueRanges, limitCnt, orderBy, dataProvinceId :
            passed through when present.
        distinct : passed through for the plain interface.
        statEleValueRanges, hourSeparate, minSeparate : passed through
            for the statistics interface.

    Returns
    -------
    pandas.DataFrame
        Requested observations.

    Raises
    ------
    Exception
        ``(errorCode, errorMessage)`` when the service reports an error,
        or when staIds is combined with lat/lon.
    """
    if kwargs.pop('read_from_file', False):
        return self._sel_file(datasource, inittime)
    if datasource.startswith('SURF_CMPA'):
        return self._sel_surf_grid(datasource, inittime=inittime, varname=varname, **kwargs)

    index_col = kwargs.get('index_col')
    is_stat = varname.startswith(('SUM_', 'MAX_', 'MIN_', 'AVG_', 'COUNT_'))
    if is_stat:
        interface = 'statSurfEle'
        query = {'elements': index_col, 'statEles': varname}
    else:
        interface = 'getSurfEle'
        query = {'elements': varname if index_col is None else f"{varname},{index_col}"}
    parameters = {"dataCode": self.alias.get(datasource, datasource), **query}

    # Spatial filter: rectangle first, administrative region otherwise.
    lat, lon = kwargs.get('lat'), kwargs.get('lon')
    if lat and lon:
        parameters['minLat'] = f"{lat.start}"
        parameters['maxLat'] = f"{lat.stop}"
        parameters['minLon'] = f"{lon.start}"
        parameters['maxLon'] = f"{lon.stop}"
        interface += 'InRect'
    elif kwargs.get('adminCodes'):
        parameters['adminCodes'] = f"{kwargs.get('adminCodes')}"
        interface += 'InRegion'

    # Time filter: anything with start/stop is treated as a range.
    if hasattr(inittime, 'start') and hasattr(inittime, 'stop'):
        if not is_stat:
            interface += 'ByTimeRange'
        parameters['timeRange'] = f"[{inittime.start:%Y%m%d%H%M%S},{inittime.stop:%Y%m%d%H%M%S}]"
    else:
        interface += 'ByTime'
        parameters['times'] = inittime if isinstance(inittime, str) else f"{inittime:%Y%m%d%H%M%S}"

    sta_ids = kwargs.get('staIds')
    if sta_ids:
        if lat or lon:
            raise Exception("lat/lon extent has been given.")
        interface += 'ByStaID' if is_stat else 'AndStaID'
        parameters['staIds'] = sta_ids

    options = ['staLevels', 'eleValueRanges', 'limitCnt', 'orderBy', 'dataProvinceId']
    options += ['statEleValueRanges', 'hourSeparate', 'minSeparate'] if is_stat else ['distinct']
    parameters.update({key: kwargs[key] for key in options if key in kwargs})

    ret = self.callAPI_to_array2D(self._user, self._password, interface, parameters)
    if ret.request.errorCode != 0:
        raise Exception(ret.request.errorCode, ret.request.errorMessage)
    data = pd.DataFrame(ret.data, columns=list(ret.elementNames))
    if index_col is not None:
        data = data.set_index(index_col.split(','))
    return data
def _sel_upar(self, datasource: str, inittime: Union[str, slice, datetime] = None,
              varname: str = None, **kwargs) -> pd.DataFrame:
    """Select upper-air observation data.

    Parameters
    ----------
    datasource : str
        Data source name from Daas (or an alias).
    inittime : str, datetime or slice
        Observation time, or a slice bounding a time range.
    varname : str
        Element name(s).
    **kwargs
        read_from_file : when truthy, return ``_sel_file``'s result
            (``staIds`` is forwarded).
        index_col : comma separated element(s) requested in addition and
            used as index of the result.
        lat, lon : slices bounding a rectangle.
        adminCodes : region code(s), used when no lat/lon is given.
        staIds : station id(s); cannot be combined with lat/lon.
        pLayers : pressure layers; takes precedence over hLayers.
        hLayers : height layers.
        eleValueRanges, limitCnt, orderBy, dataProvinceId :
            passed through when present.

    Returns
    -------
    pandas.DataFrame
        Requested observations.

    Raises
    ------
    Exception
        ``(errorCode, errorMessage)`` when the service reports an error,
        or when staIds is combined with lat/lon.
    """
    if kwargs.pop('read_from_file', False):
        return self._sel_file(datasource, inittime, staIds=kwargs.get('staIds', None))

    index_col = kwargs.get('index_col')
    interface = 'getUparEle'
    parameters = {
        "dataCode": self.alias.get(datasource, datasource),
        "elements": varname if index_col is None else f"{varname},{index_col}",
    }

    # Spatial filter: rectangle first, administrative region otherwise.
    lat, lon = kwargs.get('lat'), kwargs.get('lon')
    if lat and lon:
        parameters['minLat'] = f"{lat.start}"
        parameters['maxLat'] = f"{lat.stop}"
        parameters['minLon'] = f"{lon.start}"
        parameters['maxLon'] = f"{lon.stop}"
        interface += 'InRect'
    elif kwargs.get('adminCodes'):
        parameters['adminCodes'] = f"{kwargs.get('adminCodes')}"
        interface += 'InRegion'

    # Time filter: anything with start/stop is treated as a range.
    if hasattr(inittime, 'start') and hasattr(inittime, 'stop'):
        interface += 'ByTimeRange'
        parameters['timeRange'] = f"[{inittime.start:%Y%m%d%H%M%S},{inittime.stop:%Y%m%d%H%M%S}]"
    else:
        interface += 'ByTime'
        parameters['times'] = inittime if isinstance(inittime, str) else f"{inittime:%Y%m%d%H%M%S}"

    sta_ids = kwargs.get('staIds')
    if sta_ids:
        if lat or lon:
            raise Exception("lat/lon extent has been given.")
        interface += 'AndStaID'
        parameters['staIds'] = sta_ids

    # Vertical filter.
    if kwargs.get('pLayers'):
        interface += 'AndPress'
        parameters['pLayers'] = kwargs.get('pLayers')
    elif kwargs.get('hLayers'):
        interface += 'AndHeight'
        parameters['hLayers'] = kwargs.get('hLayers')

    options = ('eleValueRanges', 'limitCnt', 'orderBy', 'dataProvinceId')
    parameters.update({key: kwargs[key] for key in options if key in kwargs})

    ret = self.callAPI_to_array2D(self._user, self._password, interface, parameters)
    if ret.request.errorCode != 0:
        raise Exception(ret.request.errorCode, ret.request.errorMessage)
    data = pd.DataFrame(ret.data, columns=list(ret.elementNames))
    if index_col is not None:
        data = data.set_index(index_col.split(','))
    return data
def _sel_sevp(self, datasource: str, inittime: Union[str, slice, datetime] = None,
              varname: str = None, **kwargs) -> pd.DataFrame:
    """Select SEVP data, or typhoon data when ``typh*`` filters are given.

    Parameters
    ----------
    datasource : str
        Data source name from Daas (or an alias).
    inittime : str, datetime or slice
        Time, or a slice bounding a time range.
    varname : str
        Element name(s).
    **kwargs
        download : when truthy, the file is requested through
            ``_sel_file`` with this value as ``path`` and its result is
            returned.
        index_col : comma separated element(s) requested in addition and
            used as index of the result.
        typhNames, typhCIds, typhGIds : typhoon filters. Only honoured
            for a time range; any key starting with 'typh' switches to
            the ``getTyphByTimeRange*`` interfaces and is sent along.
        reportCenters : sent with typhoon requests when truthy.
        eleValueRanges, limitCnt, orderBy, dataProvinceId :
            passed through when present.

    Returns
    -------
    pandas.DataFrame
        Requested data.

    Raises
    ------
    Exception
        ``(errorCode, errorMessage)`` when the service reports an error.
    """
    download = kwargs.pop('download', False)
    if download:
        return self._sel_file(datasource, inittime, path=download)

    index_col = kwargs.get('index_col')
    interface = 'getSevpEle'
    parameters = {
        "dataCode": self.alias.get(datasource, datasource),
        "elements": varname if index_col is None else f"{varname},{index_col}",
    }

    if hasattr(inittime, 'start') and hasattr(inittime, 'stop'):
        typh_params = {k: v for k, v in kwargs.items() if k.startswith('typh')}
        if typh_params:
            interface = 'getTyphByTimeRange'
            # When several filters are given the last matching one decides.
            if kwargs.get('typhNames'):
                interface = 'getTyphByTimeRangeAndTyphNames'
            if kwargs.get('typhCIds'):
                interface = 'getTyphByTimeRangeAndTyphCids'
            # This check used to repeat 'typhNames', so the ...AndTyphNames
            # interface could never be selected.
            # NOTE(review): key 'typhGIds' is chosen by analogy with
            # 'typhCIds' -- confirm against the MUSIC interface docs.
            if kwargs.get('typhGIds'):
                interface = 'getTyphByTimeRangeAndTyphGids'
            # The filters select the interface, so they must also be sent.
            parameters.update({k: v for k, v in typh_params.items() if v})
            if kwargs.get('reportCenters'):
                parameters['reportCenters'] = kwargs.get('reportCenters')
        else:
            interface += 'ByTimeRange'
        parameters['timeRange'] = f"[{inittime.start:%Y%m%d%H%M%S},{inittime.stop:%Y%m%d%H%M%S}]"
    else:
        interface += 'ByTime'
        parameters['times'] = inittime if isinstance(inittime, str) else f"{inittime:%Y%m%d%H%M%S}"

    options = ('eleValueRanges', 'limitCnt', 'orderBy', 'dataProvinceId')
    parameters.update({key: kwargs[key] for key in options if key in kwargs})

    ret = self.callAPI_to_array2D(self._user, self._password, interface, parameters)
    if ret.request.errorCode != 0:
        raise Exception(ret.request.errorCode, ret.request.errorMessage)
    data = pd.DataFrame(ret.data, columns=list(ret.elementNames))
    if index_col is not None:
        data = data.set_index(index_col.split(','))
    return data