一、一次定位操作背后发生了什么
1
2
3
4
5
6
# 初始化wda_client
import wda
wda_client = wda.Client()
# 调用查找元素
self.wda_client(label='登录')
这个行为会调用BaseClient类的__call__
方法,返回一个Selector对象
1
2
3
4
5
6
7
8
class BaseClient(object)
def __init__(self, url=None, _session_id=None):
self.__timeout = 30.0
def __call__(self, *args, **kwargs):
if 'timeout' not in kwargs:
kwargs['timeout'] = self.__timeout
return Selector(self, *args, **kwargs)
而Selector对象在初始化的时候,接受如下参数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Selector(object):
def __init__(self,
session: Session,
predicate=None,
id=None,
className=None,
type=None,
name=None,
nameContains=None,
nameMatches=None,
text=None,
textContains=None,
textMatches=None,
value=None,
valueContains=None,
label=None,
labelContains=None,
visible=None,
enabled=None,
classChain=None,
xpath=None,
parent_class_chains=[],
timeout=10.0,
index=0):
此时并不会发生查找动作,即不会与wda进行交互。真正的查找发生在Selector.find_element_ids()方法被调用的时候,具体为以下场景:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
s = self.wda_client(label='登录')
s.get()
s.count()
s.exists
class Selector(object):
def count(self):
return len(self.find_element_ids())
def exists(self):
return len(self.find_element_ids()) > self._index
def find_elements(self):
"""
Returns:
Element (list): all the elements
"""
es = []
for element_id in self.find_element_ids():
e = Element(self._session, element_id)
es.append(e)
return es
def get(self, timeout=None, raise_error=True):
"""
Args:
timeout (float): timeout for query element, unit seconds
Default 10s
raise_error (bool): whether to raise error if element not found
Returns:
Element: UI Element
Raises:
WDAElementNotFoundError if raise_error is True else None
"""
start_time = time.time()
if timeout is None:
timeout = self._timeout
while True:
elems = self.find_elements()
if len(elems) > 0:
return elems[0]
if start_time + timeout < time.time():
break
time.sleep(0.5)
if raise_error:
raise WDAElementNotFoundError("element not found",
"timeout %.1f" % timeout)
而find_element_ids()方法的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class Selector(object):
@retry.retry(WDAStaleElementReferenceError, tries=3, delay=.5, jitter=.2)
def find_element_ids(self):
elems = []
if self._id:
return self._wdasearch('id', self._id)
if self._predicate:
return self._wdasearch('predicate string', self._predicate)
if self._xpath:
return self._wdasearch('xpath', self._xpath)
if self._class_chain:
return self._wdasearch('class chain', self._class_chain)
chain = '**' + ''.join(
self._parent_class_chains) + self._gen_class_chain()
if DEBUG:
print('CHAIN:', chain)
return self._wdasearch('class chain', chain)
def _wdasearch(self, using, value):
"""
Returns:
element_ids (list(string)): example ['id1', 'id2']
HTTP example response:
[
{"ELEMENT": "E2FF5B2A-DBDF-4E67-9179-91609480D80A"},
{"ELEMENT": "597B1A1E-70B9-4CBE-ACAD-40943B0A6034"}
]
"""
element_ids = []
for v in self.http.post('/elements', {
'using': using,
'value': value
}).value:
element_ids.append(v['ELEMENT'])
return element_ids
find_element_ids会向wda发送http请求,查找/elements接口,并且失败自动重试三次。
而wda会使用using和value两个字段来共同定位XCUIElement元素。
using支持id、predicate string、xpath、class chain四种,其中我们常用label、name等,都是通过Selector._gen_class_chain方法拼接出来的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Selector(object):
def _gen_class_chain(self):
# just return if aleady exists predicate
if self._predicate:
return '/XCUIElementTypeAny[`' + self._predicate + '`]'
qs = []
if self._name:
qs.append("name == '%s'" % self._name)
if self._name_part:
qs.append("name CONTAINS %r" % self._name_part)
if self._name_regex:
qs.append("name MATCHES %r" % self._name_regex)
if self._label:
qs.append("label == '%s'" % self._label)
if self._label_part:
qs.append("label CONTAINS '%s'" % self._label_part)
if self._value:
qs.append("value == '%s'" % self._value)
if self._value_part:
qs.append("value CONTAINS '%s'" % self._value_part)
if self._visible is not None:
qs.append("visible == %s" % 'true' if self._visible else 'false')
if self._enabled is not None:
qs.append("enabled == %s" % 'true' if self._enabled else 'false')
predicate = ' AND '.join(qs)
chain = '/' + (self._class_name or 'XCUIElementTypeAny')
if predicate:
chain = chain + '[`' + predicate + '`]'
if self._index:
chain = chain + '[%d]' % self._index
return chain
所以向wda发送请求时的body:
1
2
3
4
5
POST /elements
{
'using': 'class chain',
'value': "**/XCUIElementTypeAny[`label == '登录'`]"
}
而响应类似:
1
2
3
4
{
'ELEMENT': '20000000-0000-0000-B8C1-000000000000',
'element-6066-11e4-a52e-4f735466cecf': '20000000-0000-0000-B8C1-000000000000'
}
v[‘ELEMENT’]可以认为是元素id,下次查询动作可以直接通过id来定位元素。
如果是调用Selector.find_elements获取元素id,会将取到的元素保存为一组Element对象。
1
2
3
4
5
6
7
class Element(object):
def __init__(self, session: Session, id: str):
"""
base_url eg: http://localhost:8100/session/$SESSION_ID
"""
self._session = session
self._id = id
而查询Element对象的具体信息,需要调用Element的方法,如:
1
2
3
4
5
6
7
8
class Element(object):
@property
def label(self):
return self._prop('attribute/label')
@property
def accessible(self):
return self._wda_prop("accessible")
每次获取信息的时候都需要与wda交互,调用/element或/wda/element接口
1
2
3
4
5
6
7
8
9
10
11
12
13
class Element(object):
def _req(self, method, url, data=None):
return self.http.fetch(method, '/element/' + self._id + url, data)
def _wda_req(self, method, url, data=None):
return self.http.fetch(method, '/wda/element/' + self._id + url, data)
def _prop(self, key):
return self._req('get', '/' + key.lstrip('/')).value
def _wda_prop(self, key):
ret = self.http.get('/wda/element/%s/%s' % (self._id, key)).value
return ret
wda支持如下操作接口:
而如果对Element元素进行点击,会有两种方式:一种是直接调用wda的click接口,另一种是先获取元素的中心点坐标,然后点击坐标。理论上前一种速度更快。
1
2
3
4
5
6
7
8
9
10
11
12
class Element(object):
def tap(self):
return self._req('post', '/click')
def click(self):
"""
Get element center position and do click, a little slower
"""
# Some one reported, invisible element can not click
# So here, git position and then do tap
x, y = self.bounds.center
self._session.click(x, y)
二、层级设置
wda支持一个设置:snapshotMaxDepth,用于指定查找元素的层级。
对于页面上元素过多,导致查找超时的情况,可以通过修改snapshotMaxDepth来加快查询速度。
在wda中通过/session/$sessionId/appium/settings 接口可以设置snapshotMaxDepth:
python调用appium_settings来修改snapshotMaxDepth
1
2
3
4
5
6
7
8
9
10
11
12
13
wda_client.appium_settings({"snapshotMaxDepth": 30})
class BaseClient(object):
def appium_settings(self, value: Optional[dict] = None) -> dict:
"""
Get and set /session/$sessionId/appium/settings
"""
if value is None:
return self._session_http.get("/appium/settings").value
return self._session_http.post("/appium/settings",
data={
"settings": value
}).value
一个简单的脚本,逐级获取当前页面下所有元素,写入一个csv文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
""" 获取当前页的元素,写入文件 """
import wda
wda_client = wda.Client('http://localhost:%s' % 20002)
s = wda_client.session()
result = {}
def parse(root, result, depth):
if root.get('label'):
alias = root.get('label')
if alias in result.keys():
if not result[alias]['type'] == root.get('type'):
result[alias + '_' + root.get('type')] = {
'label': root.get('label'),
'name': root.get('name'),
'value': root.get('value'),
'type': root.get('type'),
'depth': depth
}
else:
result[alias] = {
'label': alias,
'name': root.get('name'),
'value': root.get('value'),
'type': root.get('type'),
'depth': depth,
}
def get_source(depth):
s.appium_settings({"snapshotMaxDepth": depth})
root = s.source(format='json')
queue = [root]
while queue:
item = queue.pop(0)
parse(item, result, depth)
if item.get('children'):
queue.extend(item.get('children'))
def get_all_source(max_depth):
for i in range(10, max_depth, 10):
get_source(i)
get_source(max_depth)
def write_csv(page):
with open(page + '.csv', 'w', encoding='gbk') as f:
f.write('alias,label,name,value,type,max_depth\n')
for key in result.keys():
i = result[key]
f.write('{},{},{},{},{},{}\n'.format(key, i['label'], i['name'], i['value'], i['type'], i['depth']))
if __name__ == '__main__':
page_name = input('Please input a page:')
while page_name:
d = input('Please input max_depth:')
if d:
max_depth = int(d)
else:
max_depth = 50
get_all_source(max_depth)
write_csv(page_name)
page_name = input('Please input a page:')
三、平衡速度与容错
自动化测试中,需要平衡查找速度与失败重试。
(1)压缩无用操作,直接调用Selector的find_element_ids()方法
1
2
3
4
5
6
7
8
9
# 调用查找元素
s = Selector(label='登录')
elems = s.find_element_ids()
# 根据出现顺序获取Element对象。 order=1,2,3……
elem = elems[order-1]
# click操作
elem.tap()
(2) 指定查找元素的超时时间和重试次数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 弹窗检测:只查一次 internal=0.5,retry=1
# 元素assert 检测:
# 刚操作完,页面有刷新,直到检测到: internal=0.5, retry=5
# 静止页面: internal=0.5,retry=1
# 操作调用检测:
# 静止页面: internal=0.3,retry=5
# 刚操作完: internal=1,retry=5
def find_element_new(self, element=None, **kwargs):
"""
优化后的元素查找。
"""
timeout = kwargs.pop('timeout') if 'timeout' in kwargs else 1.0
order = kwargs.pop('order') if 'order' in kwargs else 1
retry = kwargs.pop('retry') if 'retry' in kwargs else 1
if element:
s = self.wda_client(element.find)
else:
s = self.wda_client(**kwargs)
return s.get(timeout=timeout, retry=retry, raise_error=False, order=order)
(3)其他操作复用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def click_new(self, element: Element = None, **kwargs):
tap = kwargs.pop('tap') if 'tap' in kwargs else True
e = self.find_element_new(element, **kwargs)
if e:
logger.debug('find element result: [%s]', e.label)
e.tap() if tap else e.click()
else:
# 用于错误处理的时候
raise FindElementError('点击失败')
def input_new(self, element: Element = None, context='', **kwargs):
e = self.find_element_new(element, **kwargs)
if e:
logger.debug('find element result: [%s]', e.label)
e.set_text(context)
else:
# 用于错误处理的时候
raise FindElementError('输入内容失败')
def find_matches_new(self, elements: List[Element], **kwargs):
"""
find_matches_new([xxx页.x按钮,xxx页.y按钮],'A标签'={'label':'A','type:'Button'}}
"""
result = {}
for e in elements:
s = self.wda_client(e.find)
find_e = s.find_element_ids() # 只查一次
result[e.find] = find_e
for name in kwargs:
s = self.wda_client(**kwargs[name])
find_e = s.find_element_ids() # 只查一次
result[name] = find_e
return result
(4)智能查找元素,用于已知信息查不到对应元素时。在弹窗查找、当前页判断等处设置smart=False,不启用智能查找。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def find_element_new(self, element: FtElement = None, **kwargs):
"""
优化后的元素查找。
"""
timeout = kwargs.pop('timeout') if 'timeout' in kwargs else 1.0
order = kwargs.pop('order') if 'order' in kwargs else 1
retry = kwargs.pop('retry') if 'retry' in kwargs else 1
smart = kwargs.pop('smart') if 'smart' in kwargs else True
if element:
kwargs = element.find
s = self.wda_client(**kwargs)
find_s = s.get(timeout=timeout, retry=retry, raise_error=False, order=order)
if not smart or find_s:
return find_s
return self.smart_find_element(**kwargs)
def smart_find_element(self, **kwargs):
"""
智能查找元素。当find_element查不到时使用.
(1)去除order
(2)逐步扩大层级至50。 +1、2、4、8、16
(3)label->labelContains,name->nameContains,value->valueContains
(4)排列组合参数,逐渐减少参数个数
(5)raise FindElementError
"""
if 'order' in kwargs:
kwargs.pop('order')
s = self.wda_client(**kwargs)
if s.find_element_ids():
logger.warning('old order is not correct, real order=1')
return s.get(timeout=1.0, retry=1, raise_error=False)
if self._expand_depth_find(s):
return s.get(timeout=1.0, retry=1, raise_error=False)
try_contains = self._expand_contains_find(**kwargs)
if try_contains:
s = self.wda_client(**try_contains)
return s.get(timeout=1.0, retry=1, raise_error=False)
try_combine = self._combine_find(**kwargs)
if try_combine:
s = self.wda_client(**try_combine)
return s.get(timeout=1.0, retry=1, raise_error=False)
raise FindElementError('smart find element fail')
def _check_exist(self, **kwargs):
try_s = self.wda_client(**kwargs)
if try_s.find_element_ids():
return True
return False
def _expand_depth_find(self, selector):
old_depth = self.get_snapshot_max_depth()
depth = old_depth
for i in range(6):
depth += 1 << i
if depth > 50:
break
self.set_snapshot_max_depth(depth)
if selector.find_element_ids():
logger.warning('old depth is not correct, real depth = [%s]', str(depth))
self.set_snapshot_max_depth(old_depth)
return True
self.set_snapshot_max_depth(old_depth)
return False
def _expand_contains_find(self, **kwargs):
if 'label' in kwargs:
kwargs['labelContains'] = kwargs.pop('label')
if 'name' in kwargs:
kwargs['nameContains'] = kwargs.pop('name')
if 'value' in kwargs:
kwargs['valueContains'] = kwargs.pop('value')
if self._check_exist(**kwargs):
logger.warning('label not matches, change to labelContains')
return kwargs
return None
def _combine_find(self, **kwargs):
for i in range(len(kwargs) - 1, 0, -1):
for c in combinations(kwargs.keys(), i):
new_kwargs = {}
for new_key in c:
new_kwargs[new_key] = kwargs[new_key]
if self._check_exist(**new_kwargs):
logger.warning('old param is too much, reserve [%s]', c)
return new_kwargs
return None