每次想调用feign接口的时候都要上eureka查一下ip和端口,所以封装了两个请求方法,对输入的url提取服务名并自动查找eureka上的ip端口,方便接口测试,也可以在接口自动化框架中使用
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import json
from urllib.parse import urljoin
class ServiceNotFoundError(Exception):
""" 在eureka找不到服务时抛出这个异常 """
pass
class NoInstanceError(Exception):
""" 服务没有实例,抛出这个异常 """
pass
# eureka地址
env_test = 'http://{地址}:{端口}'
active = env_test
# 防止建立过多连接,使用session来请求接口
session = requests.Session()
session.keep_alive = False
# 最多重试3次,间隔0.5s
adapter = HTTPAdapter(max_retries=Retry(connect=3, backoff_factor=0.5))
session.mount('http://', adapter)
session.mount('https://', adapter)
def get_service_host(service):
"""
从eureka获取指定服务的ip和端口,如果有多个节点,只返回第一个
:param service: 注册到eureka服务名,如 draw-course-backend
:return: 第一个节点的服务ip和端口,如 http://172.16.51.6:8889/
"""
headers = {'Accept': 'application/json'}
res = session.get(urljoin(active, '/eureka/apps/%s' % service), headers=headers)
if res.status_code != 200:
raise ServiceNotFoundError('The "%s" service maybe not be registered in eureka!' % service)
content = str(res.content, encoding='utf-8')
content = json.loads(content)
instance = content['application']['instance']
if not instance:
raise NoInstanceError('No "%s" service instance available' % service)
instance = instance[0]
return instance['homePageUrl']
def translate_url(url):
"""
把链接的服务名转换成ip地址
:param url: /draw-course-backend/a/b/c (或 draw-course-backend/a/b/c)
:return: 新的链接,如 http://172.16.51.6:8889/a/b/c
"""
routes = str(url).split('/')
service = routes.pop(0)
if service == '':
service = routes.pop(0)
host = get_service_host(service)
url = urljoin(host, '/'.join(routes))
print('translate_url: %s' % url)
return url
def fget(url, **kwargs):
""" 对feign接口发起get请求 """
url = translate_url(url)
try:
return session.get(url, timeout=10, **kwargs)
except Exception as e:
print(e)
def fpost(url, **kwargs):
""" 对feign接口发起post请求 """
url = translate_url(url)
try:
return session.post(url, timeout=10, **kwargs)
except Exception as e:
print(e)